diff --git a/docs/source/api/index.rst b/docs/source/api/index.rst index 8b74e40a..d3da2779 100644 --- a/docs/source/api/index.rst +++ b/docs/source/api/index.rst @@ -48,6 +48,8 @@ Basic Operators MPIVStack MPIStackedVStack MPIHStack + MPIHalo + Derivatives ~~~~~~~~~~~ @@ -71,6 +73,7 @@ Signal Processing :toctree: generated/ MPIFredholm1 + MPINonStationaryConvolve1D MPIFFT2D MPIFFTND diff --git a/docs/source/gpu.rst b/docs/source/gpu.rst index dfabb46e..aab702ef 100644 --- a/docs/source/gpu.rst +++ b/docs/source/gpu.rst @@ -184,9 +184,7 @@ GPU+MPI, and GPU+NCCL): * - :class:`pylops_mpi.basicoperators.MPIFirstDerivative` - ✅ - ✅ - - ✅ - - ✅ - - ✅ + - ✅ * - :class:`pylops_mpi.basicoperators.MPISecondDerivative` - ✅ - ✅ diff --git a/examples/plot_halo.py b/examples/plot_halo.py new file mode 100644 index 00000000..ae5af41c --- /dev/null +++ b/examples/plot_halo.py @@ -0,0 +1,636 @@ +""" +Halo +==== +This example demonstrates how to use the :py:class:`pylops_mpi.basicoperators.MPIHalo` +operator. + +This operator is specifically designed to extend pylops-mpi's capabilities in terms of +chunking of N-dimensional distributed arrays when solving inverse problems with +pylops-mpi's operators and solvers. As a matter of fact, whilst +:py:class:`pylops_mpi.DistributedArray` allows one to create N-dimensional array +distributed over a given axis (defined via the ``axis`` paramater), only +1-dimensional distributed arrays can be used with pylops-mpi's operators and solvers. + +Moreover, several operators require accessing neighbouring values, which for values at +the edges of a local array belong to neighouring ranks. The process of obtaining ghost +cells (or halos) from neighouring ranks is implemented in the ``add_ghost_cells`` method +of the :py:class:`pylops_mpi.DistributedArray` class; however, once again as the chunking +is so far limited to only one axis, also haloing happens in a single dimension. + +The :py:class:`pylops_mpi.basicoperators.MPIHalo` operator allows to internally convert a +1-dimensional distributed array into a N-dimensional distributed array (whilst never +physically changing the actual shape of local arrays) and extracting a number of user-defined +ghost cells over all axes. Moreover, even for the case when we are interested to chunk the +distributed array over a single axis, having an operator that can be chained to any other +pre-existent operator may open doors to an easier implementation of new operators compared +to having access to ``add_ghost_cells`` method to be used directly in the matvec/rmatvec +implementation of an operator. + +""" +from matplotlib import pyplot as plt + +import sys +import math +import time +import numpy as np +import pylops + +from pylops.utils.wavelets import ricker +from pylops_mpi.basicoperators.Halo import MPIHalo, halo_block_split +from mpi4py import MPI +from scipy.signal.windows import gaussian + +import pylops_mpi + +plt.close("all") +np.random.seed(42) + +comm = MPI.COMM_WORLD +rank = comm.Get_rank() +size = comm.Get_size() + +def pause(comm, t=4): + sys.stdout.flush() + comm.barrier() + time.sleep(t) + +def local_extent_from_slice(local_shape, local_slice, halo): + lefts = [] + rights = [] + if isinstance(halo, int): + for sl in local_slice: + lefts.append(halo if (sl.start or 0) > 0 else 0) + rights.append(halo if sl.stop is not None else 0) + else: + for sl, hal in zip(local_slice, halo): + lefts.append(hal if (sl.start or 0) > 0 else 0) + rights.append(hal if sl.stop is not None else 0) + extent = tuple(dim + l + r for dim, l, r in zip(local_shape, lefts, rights)) + return extent, lefts, rights + + +############################################################################### +# Let’s start by considering a 1-dimensional distributed array. We are +# interested to compute a first derivative: however, because we are required +# to use a stencil of size >=2, at the edges of the local arrays we are +# required to borrow cells from neighbouring ranks. As already mentioned, the +# :py:class:`pylops_mpi.basicoperators.MPIFirstDerivative` operator does that +# under the hood using the ``add_ghost_cells`` method of the +# :py:class:`pylops_mpi.DistributedArray` class; however, we will see how we +# can now achieve the same without having to re-implement the derivative operator +# in pylops-mpi. Instead, we will simply combine the +# :py:class:`pylops.FirstDerivative` operator with the +# :py:class:`pylops_mpi.basicoperators.MPIBlockDiag` and +# :py:class:`pylops_mpi.basicoperators.MPIHalo` operators. +# +# To begin with, let's however simply create and apply a +# :py:class:`pylops_mpi.basicoperators.MPIHalo` operator with a halo of 1 +# (apart from the edges of the global array, where no halo is added) + + +# Halo operator +nlocal = 64 +n = nlocal * size +proc_grid_shape = (size, ) # number of partitions over each axis + +halo = 1 if size > 1 else 0 # for Sphinx-gallery as it runs with 1 rank +halo_op = MPIHalo( + dims=(n, ), + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm +) + +# Global array +x = np.arange(n).astype(np.float64) + +# Distributed array +x_dist = pylops_mpi.DistributedArray( + global_shape=n, + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) +x_slice = halo_block_split((n, ), comm, proc_grid_shape) +x_local = x[x_slice] +x_dist.local_array[:] = x_local + +# Apply halo +x_dist_halo = halo_op @ x_dist + +if rank == 0: + print('1D Halo with halo=1') +pause(comm, t=1) +print(f"Rank {rank} - shape before/after haloing: {x_dist.local_array.size}" + f"/{x_dist_halo.local_array.size}") + + +############################################################################### +# Let’s now instead compute the derivative and compare it with the result of +# the :py:class:`pylops_mpi.basicoperators.MPIFirstDerivative` operator. + +# Derivative operator (using Halo) +if size == 1: + local_shape_with_halo = x_local.size # for Sphinx-gallery +else: + if rank == 0 or rank == size - 1: + local_shape_with_halo = (x_local.size + 1, ) + else: + local_shape_with_halo = (x_local.size + 2, ) +DOp = pylops.FirstDerivative(dims=local_shape_with_halo, axis=0) +DOp_dist = pylops_mpi.MPIBlockDiag([DOp, ]) +Op_dist = halo_op.H @ DOp_dist @ halo_op + +y_dist = Op_dist @ x_dist + +# Derivative operator (original) +DOp1_dist = pylops_mpi.MPIFirstDerivative(dims=n) + +y1_dist = DOp1_dist @ x_dist + +assert np.allclose( + y_dist.local_array, + y1_dist.local_array) + + +############################################################################### +# So good so far, we can reproduce a 2-nd order, centered first derivative +# by combining basic pylops and pylops-mpi operators. Let's see if we can do the +# same with a 5-th order derivative that requires having an halo of 2. + +pause(comm, t=1) + +halo = 2 if size > 1 else 0 # for Sphinx-gallery as it runs with 1 rank +halo_op = MPIHalo( + dims=(n, ), + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm +) + +# Apply halo +x_dist_halo = halo_op @ x_dist + +if rank == 0: + print('1D Halo with halo=2') +pause(comm, t=1) +print(f"Rank {rank} - shape before/after haloing: {x_dist.local_array.size}" + f"/{x_dist_halo.local_array.size}") + +# Derivative operator (using Halo) +DOp = pylops.FirstDerivative(dims=x_dist_halo.local_array.size, axis=0, order=5) +DOp_dist = pylops_mpi.MPIBlockDiag([DOp, ]) +Op_dist = halo_op.H @ DOp_dist @ halo_op + +y_dist = Op_dist @ x_dist + +# Derivative operator (original) +DOp1_dist = pylops_mpi.MPIFirstDerivative(dims=n, order=5) + +y1_dist = DOp1_dist @ x_dist + +assert np.allclose( + y_dist.local_array, + y1_dist.local_array) + + +############################################################################### +# Let's move to something a bit more complicated. We assume to have a +# 2-dimensional array that is chunked over both axes. Note that since we +# partition the array over both axes, the number of ranks must be a power of +# 2 of an integer number. First, we simply want to multiply each element by +# a scalar; whilst this does not really require an halo, we will see how we +# can create local :py:class:`pylops.basicoperators.Diagonal` operators of the +# correct size. + + +# Input and halo partition +dims = (n, n) +size_2 = math.pow(size, 1 / 2) +power_of_2 = size_2 == int(size_2) + +if not power_of_2: + pause(comm, t=1) + if rank == 0: + print(f"Number of ranks = {size} is not a power of 2 of an " + "integer number, skipping example with 2-dimensional array") +else: + # Halo operator + size_2 = int(size_2) + proc_grid_shape = (size_2, size_2) # number of partitions over each axis + + halo = 1 if size > 1 else 0 # for Sphinx-gallery as it runs with 1 rank + halo_op = MPIHalo( + dims=dims, + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm + ) + + # Global array + x = np.arange(np.prod(dims)).astype(np.float64).reshape(dims) + + # Local array + x_slice = halo_block_split(dims, comm, proc_grid_shape) + x_local = x[x_slice] + x_local_shape = x_local.shape + xhalo_local_shape, lefts, rights = \ + local_extent_from_slice(x_local_shape, x_slice, halo) + xhalo_local_size = np.prod(xhalo_local_shape) + + # Distributed array + x_dist = pylops_mpi.DistributedArray( + global_shape=np.prod(dims), + local_shapes=comm.allgather(np.prod(x_local_shape)), + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) + x_dist.local_array[:] = x_local.flatten() + + # Apply halo + x_dist_halo = halo_op @ x_dist + + if rank == 0: + print('2D Halo with halo=1') + pause(comm, t=1) + print(f"Rank {rank} - shape before/after haloing: {x_local_shape}" + f"/{xhalo_local_shape}") + + # Diagonal operator acting also on the halo, then halo is removed + DOp = pylops.Diagonal(2 * np.ones(xhalo_local_size)) + DOp_dist = pylops_mpi.MPIBlockDiag([DOp, ]) + + Op_dist = halo_op.H @ DOp_dist @ halo_op + y_dist = Op_dist @ x_dist + + assert np.allclose(2 * x_dist.local_array, y_dist.local_array) + + +############################################################################### +# Next, we turn our attention back to the first derivative; this time, since +# we assume that the 2-dimensional array is chunked over both axes, we will see +# that we can leverage the halo to compute the derivative over either of the +# axes + +if power_of_2: + halo_op = MPIHalo( + dims=dims, + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm + ) + + for axis in [0, 1]: + DOp = pylops.FirstDerivative(dims=xhalo_local_shape, axis=axis) + DOp_dist = pylops_mpi.MPIBlockDiag([DOp, ]) + + Op_dist = halo_op.H @ DOp_dist @ halo_op + y_dist = Op_dist @ x_dist + + DOp1 = pylops.FirstDerivative(dims=dims, axis=axis) + y = DOp1 @ x + + core_slices = tuple(slice(left, None if right == 0 else -right) + for left, right in zip(lefts, rights)) + + # Check that derivative on entire 2d object is the same as the one on blocks with halo + assert np.allclose(y[x_slice][core_slices], + y_dist.local_array.reshape(x_local_shape)[core_slices]) + + +############################################################################### +# And we repeat the same with a 3-dimensional array + + +# Input and halo partition +dims = (n, n, n) +size_3 = math.pow(size, 1 / 3) +power_of_3 = size_3 == int(size_3) + +if not power_of_3: + pause(comm, t=1) + if rank == 0: + print(f"Number of ranks = {size} is not a power of 3 of an " + "integer number, skipping example with 3-dimensional array") +else: + # Halo operator + size_3 = int(size_3) + proc_grid_shape = (size_3, size_3, size_3) # number of partitions over each axis + + halo = 1 + halo_op = MPIHalo( + dims=dims, + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm + ) + + # Global array + x = np.arange(np.prod(dims)).astype(np.float64).reshape(dims) + + # Local array + x_slice = halo_block_split(dims, comm, proc_grid_shape) + x_local = x[x_slice] + x_local_shape = x_local.shape + xhalo_local_shape, lefts, rights = local_extent_from_slice(x_local_shape, x_slice, halo) + xhalo_local_size = np.prod(xhalo_local_shape) + + if rank == 0: + print('3D Halo with halo=1') + pause(comm, t=1) + print(f"Rank {rank} - shape before/after haloing: {x_local_shape}" + f"/{xhalo_local_shape}") + + # Distributed array + x_dist = pylops_mpi.DistributedArray( + global_shape=np.prod(dims), + local_shapes=comm.allgather(np.prod(x_local_shape)), + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) + x_dist.local_array[:] = x_local.flatten() + + for axis in [0, 1, 2]: + DOp = pylops.FirstDerivative(dims=xhalo_local_shape, axis=axis) + DOp_dist = pylops_mpi.MPIBlockDiag([DOp, ]) + + Op_dist = halo_op.H @ DOp_dist @ halo_op + y_dist = Op_dist @ x_dist + + DOp1 = pylops.FirstDerivative(dims=dims, axis=axis) + y = DOp1 @ x + + core_slices = tuple(slice(left, None if right == 0 else -right) + for left, right in zip(lefts, rights)) + + # Check that derivative on entire 2d object is the same as the one on blocks with halo + assert np.allclose(y[x_slice][core_slices], + y_dist.local_array.reshape(x_local_shape)[core_slices]) + + + +############################################################################### +# Next, we move to something more interesting. We will use the +# :py:class:`pylops_mpi.basicoperators.MPIHalo` operator to create a distributed +# non-stationary convolutional operator acting on 1-dimensional array. This will +# be ultimately equivalent to PyLops' +# :py:class:`pylops.signalprocessing.NonStationaryConvolve1D`, however both the +# input array and the filters will be distibuted. What makes this operator +# interesting is that we need to handle convolutions at the edges between +# different ranks and to do that we will halo the input array of a number of +# samples equal to the distance between two filters and we will also borrow the +# filtering from the next/previous rank. + + +# Input signal operator +nlocal = 64 +nfilters_local = 2 +n = nlocal * size +proc_grid_shape = (size, ) + +# Filters +ntw = 16 +dt = 0.004 +tw = np.arange(ntw) * dt +fs = np.arange(nfilters_local * size) * 8 + 20 +wavs = np.stack([ricker(tw, f0=f)[0] for f in fs]) + +# Filters centers (selected such that they are symmetric on either +# side of the edges of the distributed array between ranks) +n_between_h = nlocal // nfilters_local +ih = nlocal // (2 * nfilters_local) + \ + np.arange(0, nlocal * size, n_between_h) + +pause(comm, t=1) +if rank == 0: + print(f"Filters centers: {ih}") + +# Input signal +t = np.arange(n) * dt +x = np.zeros(n, dtype=np.float64) +x[ih] = 1.0 + +# Halo operator +halo = n_between_h if size > 1 else 0 # for Sphinx-gallery as it runs with 1 rank +halo_op = MPIHalo( + dims=(n, ), + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm +) + +# Distributed array +x_dist = pylops_mpi.DistributedArray( + global_shape=n, + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) +x_slice = halo_block_split((n, ), comm, proc_grid_shape) +x_local = x[x_slice] +x_dist.local_array[:] = x_local + +x_local_shape = x_local.shape +xhalo_local_shape, lefts, rights = \ + local_extent_from_slice(x_local_shape, x_slice, halo) +pause(comm, t=1) +print(f"Rank {rank} - shape before/after haloing: {x_local_shape}" + f"/{xhalo_local_shape}") + +# Create operators +if size == 1: + # for Sphinx-gallery + COp = pylops.signalprocessing.NonStationaryConvolve1D( + dims=nlocal + halo, hs=wavs, ih=ih) +else: + if rank == 0: + # Only one extra filters on the right + COp = pylops.signalprocessing.NonStationaryConvolve1D( + dims=nlocal + halo, hs=wavs[:nfilters_local + 1], + ih=ih[:nfilters_local + 1] + ) + elif rank == size - 1: + # Only one extra filters on the left + COp = pylops.signalprocessing.NonStationaryConvolve1D( + dims=nlocal + halo, hs=wavs[-nfilters_local - 1:], + ih=ih[-nfilters_local - 1:] - x_slice[0].start + halo + ) + else: + # Two extra filters on either side + COp = pylops.signalprocessing.NonStationaryConvolve1D( + dims=nlocal + 2 * halo, hs=wavs[nfilters_local * rank - 1:nfilters_local * (rank + 1) + 1], + ih=ih[nfilters_local * rank - 1:nfilters_local * (rank + 1) + 1] - x_slice[0].start + halo + ) + +# Create and apply total operator +COp_dist = pylops_mpi.MPIBlockDiag([COp, ]) +Op_dist = halo_op.H @ COp_dist @ halo_op + +y_dist = Op_dist @ x_dist +xadj_dist = Op_dist.H @ y_dist + +y_dist = y_dist.asarray() +xadj_dist = xadj_dist.asarray() + +# Create and apply benchmark serial operator +COp = pylops.signalprocessing.NonStationaryConvolve1D( + dims=n, hs=wavs, ih=ih +) + +y = COp @ x +xadj = COp.H @ y + +############################################################################### +# Let's display the results + +if rank == 0: + plt.figure(figsize=(10, 3)) + plt.plot(t, x, "k", label="Input") + plt.plot(t, y, "b", label="Forward (serial)") + plt.plot(t, y_dist, "--r", label="Forward (distr)") + plt.xlabel("Time [sec]") + plt.xlim(0, t[-1]) + plt.legend() + plt.tight_layout() + + plt.figure(figsize=(10, 3)) + plt.plot(t, x, "k", label="Input") + plt.plot(t, xadj, "b", label="Adjoint (serial)") + plt.plot(t, xadj_dist, "--r", label="Adjoint (distr)") + plt.xlabel("Time [sec]") + plt.xlim(0, t[-1]) + plt.legend() + plt.tight_layout() + + +############################################################################### +# We repeat the same exercise for a 2-dimensional array using PyLops' +# :py:class:`pylops.signalprocessing.NonStationaryConvolve2D`; here +# the input array and the filters are distibuted along the slowest axis. + + +# Input signal operator +nlocal = (32, 64) +nfilters_local = (2, 4) +proc_grid_shape = (size, 1) +n = (nlocal[0] * proc_grid_shape[0], + nlocal[1] * proc_grid_shape[1]) + +# Filters +ntw = 15 +dt = 0.004 +tw = np.arange(ntw) * dt +fs = np.arange(nfilters_local[1] * proc_grid_shape[1]) * 8 + 20 + +wavy = gaussian(ntw, 2.0) +wavs = np.zeros((nfilters_local[0] * proc_grid_shape[0], + nfilters_local[1] * proc_grid_shape[1], + ntw, ntw * 2 - 1)) +for ify in range(nfilters_local[0] * proc_grid_shape[0]): + for ifx in range(nfilters_local[1] * proc_grid_shape[1]): + wavx = ricker(tw, f0=fs[ifx])[0] + wav = np.outer(wavy, wavx[None].T) + wavs[ify, ifx] = wav + +# Filters centers (selected such that they are symmetric on either +# side of the edges of the distributed array between ranks) +n_between_hy = nlocal[0] // nfilters_local[0] +n_between_hx = nlocal[1] // nfilters_local[1] + +ihy = nlocal[0] // (2 * nfilters_local[0]) + np.arange(0, nlocal[0] * proc_grid_shape[0], n_between_hy) +ihx = nlocal[1] // (2 * nfilters_local[1]) + np.arange(0, nlocal[1] * proc_grid_shape[1], n_between_hx) + +pause(comm, t=1) +if rank == 0: + print(f"Filters centers - 1st axis: {ihy}, 2nd axis: {ihx}") + +# Input signal +x = np.zeros(n, dtype=np.float64) +for ih in ihy: + x[ih, ihx] = 1.0 + +# Halo operator +halo = (n_between_hy, 0) if size > 1 else (0, 0) # for Sphinx-gallery as it runs with 1 rank +halo_op = MPIHalo( + dims=n, + halo=n_between_hy, # STRANGE! + proc_grid_shape=proc_grid_shape, + comm=comm +) + +# Distributed array +x_dist = pylops_mpi.DistributedArray( + global_shape=np.prod(n), + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) +x_slice = halo_block_split(n, comm, proc_grid_shape) +x_local = x[x_slice] +x_dist.local_array[:] = x_local.ravel() + +# Apply halo +x_dist_halo = halo_op @ x_dist + +x_local_shape = x_local.shape +xhalo_local_shape, lefts, rights = \ + local_extent_from_slice(x_local_shape, x_slice, halo) +xhalo_local_size = np.prod(xhalo_local_shape) +pause(comm, t=1) +print(f"Rank {rank} - shape before/after haloing: {x_local_shape}" + f"/{xhalo_local_shape}") + +# Create operators +if size == 1: + # for Sphinx-gallery + COp = pylops.signalprocessing.NonStationaryConvolve2D( + dims=(nlocal[0] + halo[0], n[1]), hs=wavs, ihx=ihy, ihz=ihx) +else: + if rank == 0: + COp = pylops.signalprocessing.NonStationaryConvolve2D( + dims=(nlocal[0] + halo[0], n[1]), + hs=wavs[:nfilters_local[0] + 1], + ihx=ihy[:nfilters_local[0] + 1], + ihz=ihx + ) + elif rank == size - 1: + COp = pylops.signalprocessing.NonStationaryConvolve2D( + dims=(nlocal[0] + halo[0], n[1]), + hs=wavs[-nfilters_local[0] - 1:], + ihx=ihy[-nfilters_local[0] - 1:] - x_slice[0].start + halo[0], + ihz=ihx + ) + else: + COp = pylops.signalprocessing.NonStationaryConvolve2D( + dims=(nlocal[0] + 2 * halo[0], n[1]), + hs=wavs[nfilters_local[0] * rank - 1:nfilters_local[0] * (rank + 1) + 1], + ihx=ihy[nfilters_local[0] * rank - 1:nfilters_local[0] * (rank + 1) + 1] - x_slice[0].start + halo[0], + ihz=ihx + ) + +# Create and apply total operator +COp_dist = pylops_mpi.MPIBlockDiag([COp, ]) +Op_dist = halo_op.H @ COp_dist @ halo_op + +y_dist = Op_dist @ x_dist +xadj_dist = Op_dist.H @ y_dist + +y_dist = y_dist.asarray().reshape(n) +xadj_dist = xadj_dist.asarray().reshape(n) + +# Create and apply benchmark serial operator +COp = pylops.signalprocessing.NonStationaryConvolve2D( + dims=n, hs=wavs, ihx=ihy, ihz=ihx +) + +y = COp @ x +xadj = COp.H @ y + +############################################################################### +# Let's display the results + +if rank == 0: + fig, axs = plt.subplots(2, 2, sharex=True, sharey=True, figsize=(10, 8)) + axs[0, 0].imshow(y_dist, cmap="gray", vmin=-1, vmax=1) + axs[0, 0].set_title("Forward") + axs[0, 1].imshow(y_dist - y, cmap="gray", vmin=-.1, vmax=.1) + axs[0, 1].set_title("Forward (distr. - serial)") + axs[1, 0].imshow(xadj_dist, cmap="gray", vmin=-1, vmax=1) + axs[1, 0].set_title("Adjoint") + axs[1, 1].imshow(xadj_dist - xadj, cmap="gray", vmin=-.1, vmax=.1) + axs[1, 1].set_title("Adjoint (distr. - serial)") + for ax in axs.flatten(): + ax.axis("tight") + fig.tight_layout() diff --git a/examples/plot_nonstatconv.py b/examples/plot_nonstatconv.py new file mode 100644 index 00000000..e1c63f96 --- /dev/null +++ b/examples/plot_nonstatconv.py @@ -0,0 +1,234 @@ +""" +Non-Stationary Convolution +========================== +This example demonstrates how to use the +:py:class:`pylops_mpi.signalprocessing.MPINonStationaryConvolve1D` +operator to apply 1d non-stationary convolution to 1-dimensional or +N-dimensional distributed arrays over the distributed axis. + +This operator is effectively equivalent to PyLops' +:py:class:`pylops.signalprocessing.NonStationaryConvolve1D`, however both the +input array and the filters will be distibuted. What makes this operator +interesting is that we need to handle convolutions at the edges between +different ranks and to do that we will halo the input array of a number of +samples equal to the distance between two filters and we will also borrow the +filtering from the next/previous rank. This is internally achieved by means +of the :py:class:`pylops_mpi.basicoperators.MPIHalo` operator. + +""" +from matplotlib import pyplot as plt + +import sys +import time +import numpy as np +import pylops + +from pylops.utils.wavelets import ricker +from mpi4py import MPI + +import pylops_mpi + +plt.close("all") +np.random.seed(42) + +comm = MPI.COMM_WORLD +rank = comm.Get_rank() +size = comm.Get_size() + +def pause(comm, t=4): + sys.stdout.flush() + comm.barrier() + time.sleep(t) + +def local_extent_from_slice(local_shape, local_slice, halo): + lefts = [] + rights = [] + if isinstance(halo, int): + for sl in local_slice: + lefts.append(halo if (sl.start or 0) > 0 else 0) + rights.append(halo if sl.stop is not None else 0) + else: + for sl, hal in zip(local_slice, halo): + lefts.append(hal if (sl.start or 0) > 0 else 0) + rights.append(hal if sl.stop is not None else 0) + extent = tuple(dim + l + r for dim, l, r in zip(local_shape, lefts, rights)) + return extent, lefts, rights + + +############################################################################### +# Let’s start by creating a 1-dimensional distruted array as well as the +# filters + +# Input signal dimensions +nlocal = 64 +nfilters_local = 4 +n = nlocal * size +proc_grid_shape = (size, ) + +# Filters +ntw = 16 +dt = 0.004 +tw = np.arange(ntw) * dt +fs = np.arange(nfilters_local * size) * 8 + 20 +wavs = np.stack([ricker(tw, f0=f)[0] for f in fs]) + +# Filters centers (selected such that they are symmetric on either +# side of the edges of the distributed array between ranks) +n_between_h = nlocal // nfilters_local +ih = nlocal // (2 * nfilters_local) + \ + np.arange(0, nlocal * size, n_between_h) + +pause(comm, t=1) +if rank == 0: + print(f"Filters centers: {ih}") + +# Input signal +t = np.arange(n) * dt +x = np.zeros(n, dtype=np.float64) +x[ih] = 1.0 + +# Distributed array +x_dist = pylops_mpi.DistributedArray( + global_shape=n, + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) +x_dist.local_array[:] = x[nlocal * rank: nlocal * (rank + 1)] + +# Create operator +COp_dist = pylops_mpi.signalprocessing.MPINonStationaryConvolve1D( + n, wavs, ih, base_comm=comm) + +# Apply operator +y_dist = COp_dist @ x_dist +xadj_dist = COp_dist.H @ y_dist + +y_dist = y_dist.asarray() +xadj_dist = xadj_dist.asarray() + +# Create and apply benchmark serial operator +COp = pylops.signalprocessing.NonStationaryConvolve1D( + dims=n, hs=wavs, ih=ih +) + +y = COp @ x +xadj = COp.H @ y + +############################################################################### +# Let's display the results + +if rank == 0: + plt.figure(figsize=(10, 3)) + plt.plot(t, x, "k", label="Input") + plt.plot(t, y, "b", label="Forward (serial)") + plt.plot(t, y_dist, "--r", label="Forward (distr)") + plt.xlabel("Time [sec]") + plt.xlim(0, t[-1]) + plt.legend() + plt.tight_layout() + + plt.figure(figsize=(10, 3)) + plt.plot(t, x, "k", label="Input") + plt.plot(t, xadj, "b", label="Adjoint (serial)") + plt.plot(t, xadj_dist, "--r", label="Adjoint (distr)") + plt.xlabel("Time [sec]") + plt.xlim(0, t[-1]) + plt.legend() + plt.tight_layout() + +############################################################################### +# Whilst, up until now we can considered 1D signals, +# :py:class:`pylops.signalprocessing.NonStationaryConvolve1D` can also operate +# on NDarray, applying non-stationary filters over one axis. Let's consider +# here a 2D signal with the filters applied over the first axis. + +# Input signal +t = np.arange(n) * dt +x = np.zeros((n, 20), dtype=np.float64) +x[ih] = 1.0 + +# Distributed array +x_dist = pylops_mpi.DistributedArray( + global_shape=n * 20, + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) +x_dist.local_array[:] = x[nlocal * rank: nlocal * (rank + 1)].ravel() + +# Create operator +COp_dist = pylops_mpi.signalprocessing.MPINonStationaryConvolve1D( + (n, 20), wavs, ih, axis=0, base_comm=comm) + +# Apply operator +y_dist = COp_dist @ x_dist +xadj_dist = COp_dist.H @ y_dist + +y_dist = y_dist.asarray().reshape(n, 20) +xadj_dist = xadj_dist.asarray().reshape(n, 20) + +# Create and apply benchmark serial operator +COp = pylops.signalprocessing.NonStationaryConvolve1D( + dims=(n, 20), hs=wavs, ih=ih, axis=0, +) + +y = COp @ x +xadj = COp.H @ y + +if rank == 0: + fig, axs = plt.subplots(2, 3, figsize=(12, 7)) + fig.suptitle("Non-stationary convolution on axis=0") + axs[0][0].imshow(x, cmap="gray", vmin=-1, vmax=1) + axs[0][1].imshow(y_dist, cmap="gray", vmin=-1, vmax=1) + axs[0][2].imshow(xadj_dist, cmap="gray", vmin=-1, vmax=1) + axs[1][0].axis("off") + axs[1][1].imshow(y_dist - y, cmap="gray", vmin=-1, vmax=1) + axs[1][2].imshow(xadj_dist - xadj, cmap="gray", vmin=-1, vmax=1) + for ax in axs.ravel(): + ax.axis("tight") + +############################################################################### +# And on 2D signal with the filters applied over the second axis. + +# Input signal +t = np.arange(n) * dt +x = np.zeros((20, n), dtype=np.float64) +x[:, ih] = 1.0 + +# Distributed array +x_dist = pylops_mpi.DistributedArray( + global_shape=n * 20, + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER) +x_dist.local_array[:] = x[:, nlocal * rank: nlocal * (rank + 1)].ravel() + +# Create operator +COp_dist = pylops_mpi.signalprocessing.MPINonStationaryConvolve1D( + (20, n), wavs, ih, axis=-1, base_comm=comm) + +# Apply operator +y_dist = COp_dist @ x_dist +xadj_dist = COp_dist.H @ y_dist + +y_dist = y_dist.asarray() +xadj_dist = xadj_dist.asarray() + +y_dist = y_dist.reshape(size, 20, nlocal).transpose(1, 0, 2).reshape(20, -1) +xadj_dist = xadj_dist.reshape(size, 20, nlocal).transpose(1, 0, 2).reshape(20, -1) + +# Create and apply benchmark serial operator +COp = pylops.signalprocessing.NonStationaryConvolve1D( + (20, n), wavs, ih, axis=-1, +) + +y = COp @ x +xadj = COp.H @ y + +if rank == 0: + fig, axs = plt.subplots(2, 3, figsize=(12, 7)) + fig.suptitle("Non-stationary convolution on axis=0") + axs[0][0].imshow(x, cmap="gray", vmin=-1, vmax=1) + axs[0][1].imshow(y_dist, cmap="gray", vmin=-1, vmax=1) + axs[0][2].imshow(xadj_dist, cmap="gray", vmin=-1, vmax=1) + axs[1][0].axis("off") + axs[1][1].imshow(y_dist - y, cmap="gray", vmin=-1, vmax=1) + axs[1][2].imshow(xadj_dist - xadj, cmap="gray", vmin=-1, vmax=1) + for ax in axs.ravel(): + ax.axis("tight") diff --git a/pylops_mpi/basicoperators/Halo.py b/pylops_mpi/basicoperators/Halo.py new file mode 100644 index 00000000..923655d2 --- /dev/null +++ b/pylops_mpi/basicoperators/Halo.py @@ -0,0 +1,422 @@ +import math +from typing import Any, Dict, Optional, Tuple, Union + +import numpy as np +from mpi4py import MPI +from pylops.utils.backend import get_module + +from pylops_mpi import DistributedArray, MPILinearOperator, Partition +from pylops_mpi.Distributed import DistributedMixIn + + +def halo_block_split( + global_shape: tuple, + comm: MPI.Comm, + grid_shape: Optional[tuple] = None, +) -> tuple: + r"""Split a global array over a Cartesian process grid. + + Compute the local slice owned by the calling rank when ``global_shape`` is + distributed over ``grid_shape``. This helper follows the same Cartesian + partitioning used internally by :class:`MPIHalo`. + + Parameters + ---------- + global_shape : :obj:`tuple` + Shape of the global array before flattening. + comm : :obj:`mpi4py.MPI.Comm` + MPI communicator containing the ranks in the process grid. + grid_shape : :obj:`tuple`, optional + Number of ranks along each array axis. When ``None``, all ranks are + placed along the last axis. + + Returns + ------- + local_slice : :obj:`tuple` + Tuple of :class:`slice` objects selecting the local block owned by the + calling rank. + + Raises + ------ + ValueError + If ``grid_shape`` does not contain exactly ``comm.Get_size()`` ranks. + + """ + ndim = len(global_shape) + size = comm.Get_size() + # default: put all ranks on the last axis + if grid_shape is None: + grid_shape = (1,) * (ndim - 1) + (size,) + if math.prod(grid_shape) != size: + raise ValueError(f"grid_shape {grid_shape} does not match comm size {size}") + + cart = comm.Create_cart(grid_shape, periods=[False] * ndim, reorder=True) + coords = cart.Get_coords(cart.Get_rank()) + + slices = [] + for gdim, procs_on_axis, coord in zip(global_shape, grid_shape, coords): + block_size = math.ceil(gdim / procs_on_axis) + start = coord * block_size + end = min(start + block_size, gdim) + if coord == procs_on_axis - 1: + sl = slice(start, None) + else: + sl = slice(start, end) + slices.append(sl) + return tuple(slices) + + +class MPIHalo(DistributedMixIn, MPILinearOperator): + r"""MPI Halo + + Apply haloing to all dimensions of a flattened, 1-dimensional + :class:`pylops_mpi.DistributedArray` after local reshaping to a + N-dimensional array. + + The Halo operator is applied over a Cartesian process grid, where each + rank owns a local block of the global N-dimensional array. + + Parameters + ---------- + dims : :obj:`tuple` + Number of samples for each dimension. + halo : :obj:`int` or :obj:`tuple` + Number of halo samples to add around each local block. A scalar value + applies the same halo to both sides of every axis. A tuple of length + ``ndim`` applies a symmetric halo per axis. A tuple of length + ``2 * ndim`` specifies the halo to apply at the start and at the end + for each axis. + proc_grid_shape : :obj:`tuple` + Number of MPI ranks along each dimension. + comm : :obj:`mpi4py.MPI.Comm`, optional + MPI Base Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``. + dtype : :obj:`str`, optional + Type of elements in the input array. + + Attributes + ---------- + shape : :obj:`tuple` + Operator shape + + Notes + ----- + The MPIHalo operator extends each rank's local array with a **halo** (ghost cells) + of width ``halo`` along the haloed axes, providing the neighboring rank's data for + stencil-like operators. Ranks are arranged in an N-dimensional Cartesian grid as + provided by ``proc_grid_shape``, whereby each rank owns a contiguous block of the + global array. The ``halo`` is normalised to a tuple of length ``2 * ndim``, containing + one ``(minus, plus)`` halo-width pair for each axis. The tuple is flattened in + axis order as + + .. math:: + (h_{0,-}, h_{0,+}, h_{1,-}, h_{1,+}, \ldots) + + where :math:`h_{i,-}`` and :math:`h_{i,+}`` represent the halo widths on the + negative and positive side of the i-th axis, respectively. For convenience, + ``halo`` may be provided as a scalar when the same symmetric halo is + required along all axes, or as a tuple of length ``ndim`` when symmetric halos + of different width are applied to different axis. Ghost cells on the global + boundary of an axis are zero by default. + + In the forward mode, each rank exchanges boundary slices with its left and + right neighbors along each axis via ``MPI_Sendrecv``. Ranks at a global + boundary have ``MPI.PROC_NULL`` as their neighbor on that side, so those + ghost regions remain zero and no exchange is attempted. Once the exchange + is complete, local PyLops operators can be applied independently + on each rank's extended block, typically wrapped into a + :class:`pylops_mpi.basicoperators.MPIBlockDiag` operator. + + In the adjoint mode, the reverse operation is performed the original + local domain is extracted by removing the ghost cells. + + Finally, note that the Halo operator is not linear operator per se; instead, + it is meant to sandwitch any linear operator to implement equivalent + behaviours to the serial version of such an operator. + + """ + + def __init__( + self, + dims: tuple, + halo: Union[int, tuple], + proc_grid_shape: Optional[tuple] = None, + comm: MPI.Comm = MPI.COMM_WORLD, + dtype: Any = np.float64, + ) -> None: + self.global_dims = tuple(dims) + self.ndim = len(dims) + + self.comm = comm + self.dtype = dtype + + if proc_grid_shape is None: + proc_grid_shape = (1,) * (self.ndim - 1) + (self.comm.Get_size(),) + self.proc_grid_shape = tuple(proc_grid_shape) + if math.prod(self.proc_grid_shape) != self.comm.Get_size(): + raise ValueError( + f"grid_shape {self.proc_grid_shape} does not match comm size {self.comm.Get_size()}" + ) + + self.cart_comm, self.neigh = self._build_topo() + self.halo = self._parse_halo(halo) + + self.local_dims = self._calc_local_dims() + self.local_extent = self._calc_local_extent() + self._validate_exchange_widths() + self._local_dim_sizes = [] + # For uneven global dimensions, MPIHalo's Cartesian block sizes differ + # from DistributedArray's default flat split. Store those sizes so + # _rmatvec can build an adjoint output with the same local ownership + # as the original Halo input. + comm_group = self.comm.Get_group() + cart_group = self.cart_comm.Get_group() + for rank in range(self.comm.Get_size()): + cart_rank = MPI.Group.Translate_ranks(comm_group, [rank], cart_group)[0] + coords = self.cart_comm.Get_coords(cart_rank) + local_size = 1 + for gdim, coord, grid_procs in zip(self.global_dims, coords, self.proc_grid_shape): + block_size = math.ceil(gdim / grid_procs) + start = coord * block_size + end = min(start + block_size, gdim) + local_size *= end - start + self._local_dim_sizes.append(local_size) + + self._local_extent_sizes = self._allgather( + self.comm, + None, + int(np.prod(self.local_extent)), + ) + + self.shape = ( + int(np.sum(self._local_extent_sizes)), + int(np.prod(self.global_dims)), + ) + super().__init__(shape=self.shape, dtype=np.dtype(dtype), base_comm=comm) + + def _parse_halo(self, h: Union[int, tuple]) -> tuple: + """Normalize halo input to a 2 * ndim tuple of per-side widths for each axis of the N-dimensional array. + + Accepts a scalar, a tuple of length-1, one value per axis (the same value is assigned to both sides), + or explicit minus/plus pairs for each axis. + """ + if isinstance(h, (int, np.int64, np.int32)): + halo = (h,) * (2 * self.ndim) + trimmed = list(halo) + for ax in range(self.ndim): + if trimmed[2 * ax] and self.neigh[("-", ax)] == MPI.PROC_NULL: + trimmed[2 * ax] = 0 + if trimmed[2 * ax + 1] and self.neigh[("+", ax)] == MPI.PROC_NULL: + trimmed[2 * ax + 1] = 0 + halo = tuple(trimmed) + if any(h < 0 for h in halo): + raise ValueError("Halo widths must be non-negative") + return halo + + h = tuple(h) + if len(h) == 1: + halo = h * (2 * self.ndim) + elif len(h) == self.ndim: + halo = sum(tuple([(d, d) for d in h]), ()) + elif len(h) == 2 * self.ndim: + halo = h + else: + raise ValueError(f"Invalid halo length {len(h)} for ndim={self.ndim}") + if any(h < 0 for h in halo): + raise ValueError("Halo widths must be non-negative") + return halo + + def _build_topo(self) -> Tuple[MPI.Comm, Dict[Tuple[str, int], int]]: + """Create the Cartesian communicator and map neighboring ranks on the distribution axis.""" + cart_comm = self.comm.Create_cart( + self.proc_grid_shape, + periods=[False] * self.ndim, + reorder=True, + ) + neigh = {} + for ax in range(self.ndim): + before, after = cart_comm.Shift(ax, 1) + neigh[("-", ax)] = before + neigh[("+", ax)] = after + return cart_comm, neigh + + def _calc_local_dims(self) -> tuple: + """Compute this rank's local block shape before halo padding.""" + rank = self.cart_comm.Get_rank() + coords = self.cart_comm.Get_coords(rank) + local = [] + for ax, (gdim, coord, grid_procs) in enumerate( + zip(self.global_dims, coords, self.proc_grid_shape) + ): + block_size = math.ceil(gdim / grid_procs) + start = coord * block_size + end = min(start + block_size, gdim) + local.append(end - start) + return tuple(local) + + def _calc_local_extent(self) -> tuple: + """Compute this rank's local block shape after halo padding.""" + ext = [] + for ax in range(self.ndim): + minus_halo, plus_halo = self.halo[2 * ax], self.halo[2 * ax + 1] + ext.append(self.local_dims[ax] + minus_halo + plus_halo) + return tuple(ext) + + def _validate_local_array_shape( + self, x: DistributedArray, expected_shape: tuple, name: str + ) -> None: + """Raise if a distributed input does not match this rank's expected local shape.""" + local_shapes = self.cart_comm.allgather( + (x.local_array.size, int(np.prod(expected_shape)), expected_shape) + ) + for rank, (actual_size, expected_size, shape) in enumerate(local_shapes): + if actual_size != expected_size: + raise ValueError( + "MPIHalo input local shapes do not match the Cartesian block " + f"decomposition: rank {rank}: {name} local array has size " + f"{actual_size}, expected {expected_size} for local shape {shape}" + ) + + def _validate_exchange_widths(self) -> None: + """ + Raise if the requested halos cannot be exchanged with one-hop neighbors. + For example: + - Halo width Larger than local block size or that of the remote neighbors. + """ + width_error = 1 + mismatch_error = 2 + local_error = 0 + for ax in range(self.ndim): + before, after = self.halo[2 * ax], self.halo[2 * ax + 1] + minus_nbr, plus_nbr = self.neigh[("-", ax)], self.neigh[("+", ax)] + local_dim = self.local_dims[ax] + + if before > local_dim and minus_nbr != MPI.PROC_NULL: + local_error |= width_error + if after > local_dim and plus_nbr != MPI.PROC_NULL: + local_error |= width_error + + plus_neighbor_before = self.cart_comm.sendrecv( + before, dest=minus_nbr, source=plus_nbr + ) + minus_neighbor_after = self.cart_comm.sendrecv(after, dest=plus_nbr, source=minus_nbr) + if plus_nbr != MPI.PROC_NULL and after != plus_neighbor_before: + local_error |= mismatch_error + if minus_nbr != MPI.PROC_NULL and before != minus_neighbor_after: + local_error |= mismatch_error + + global_error = self.cart_comm.allreduce(local_error, op=MPI.BOR) + if global_error & width_error: + raise ValueError( + "MPIHalo halo widths are not supported by the current one-hop " + "exchange: halo width exceeds local block size" + ) + if global_error & mismatch_error: + raise ValueError( + "MPIHalo halo widths are not supported by the current one-hop " + "exchange: halo width does not match neighbor halo width" + ) + + def _exchange_along_axis(self, ncp: Any, arr: Any, axis: int, before: int, after: int, engine: str) -> None: + """Exchange boundary/halo slices with neighboring ranks along one axis.""" + minus_nbr, plus_nbr = self.neigh[("-", axis)], self.neigh[("+", axis)] + # slice definitions + slicer = [slice(None)] * self.ndim + # send before + if before and minus_nbr != MPI.PROC_NULL: + snd_s = slicer.copy() + snd_s[axis] = slice(before, 2 * before) + snd = arr[tuple(snd_s)].copy() + rcv = ncp.empty_like(snd) + rcv = self._sendrecv( + self.cart_comm, + None, + snd, + rcv, + dest=minus_nbr, + source=minus_nbr, + engine=engine, + ) + rcv_s = slicer.copy() + rcv_s[axis] = slice(0, before) + arr[tuple(rcv_s)] = rcv + # send after + if after and plus_nbr != MPI.PROC_NULL: + snd_s = slicer.copy() + snd_s[axis] = slice(-2 * after, -after) + rcv_s = slicer.copy() + rcv_s[axis] = slice(-after, None) + snd = arr[tuple(snd_s)].copy() + rcv = ncp.empty_like(snd) + rcv = self._sendrecv( + self.cart_comm, + None, + snd, + rcv, + dest=plus_nbr, + source=plus_nbr, + engine=engine, + ) + arr[tuple(rcv_s)] = rcv + + def _matvec(self, x: DistributedArray) -> DistributedArray: + ncp = get_module(x.engine) + if x.partition != Partition.SCATTER: + raise ValueError( + f"x should have partition={Partition.SCATTER} Got {x.partition} instead..." + ) + + self._validate_local_array_shape(x, self.local_dims, "x") + + y = DistributedArray( + global_shape=self.shape[0], + partition=Partition.SCATTER, + local_shapes=self._local_extent_sizes, + base_comm=x.base_comm, + base_comm_nccl=x.base_comm_nccl, + engine=x.engine, + dtype=self.dtype, + ) + + core = x.local_array.reshape(self.local_dims) + halo_arr = ncp.zeros(self.local_extent, dtype=self.dtype) + # insert core + core_slices = [ + slice(left, left + ldim) + for left, ldim in zip(self.halo[::2], self.local_dims) + ] + halo_arr[tuple(core_slices)] = core + + # exchange along each axis + for ax in range(self.ndim): + before, after = self.halo[2 * ax], self.halo[2 * ax + 1] + self._exchange_along_axis( + ncp, halo_arr, axis=ax, before=before, after=after, engine=x.engine + ) + + y[:] = halo_arr.ravel() + return y + + def _rmatvec(self, x: DistributedArray) -> DistributedArray: + if x.partition != Partition.SCATTER: + raise ValueError( + f"x should have partition={Partition.SCATTER} Got {x.partition} instead..." + ) + self._validate_local_array_shape(x, self.local_extent, "x") + + res = DistributedArray( + global_shape=self.shape[1], + partition=Partition.SCATTER, + local_shapes=self._local_dim_sizes, + base_comm=x.base_comm, + base_comm_nccl=x.base_comm_nccl, + engine=x.engine, + dtype=self.dtype, + ) + arr = x.local_array.reshape(self.local_extent) + core_slices = [ + slice(left, left + ldim) + for left, ldim in zip(self.halo[::2], self.local_dims) + ] + core = arr[tuple(core_slices)] + res[:] = core.ravel() + return res diff --git a/pylops_mpi/basicoperators/__init__.py b/pylops_mpi/basicoperators/__init__.py index 8db5a988..d3c707ef 100644 --- a/pylops_mpi/basicoperators/__init__.py +++ b/pylops_mpi/basicoperators/__init__.py @@ -17,6 +17,7 @@ MPISecondDerivative Second Derivative operator MPILaplacian Laplacian operator MPIGradient Gradient operator + MPIHalo Halo operator """ @@ -28,6 +29,8 @@ from .SecondDerivative import * from .Laplacian import * from .Gradient import * +from .Halo import * + __all__ = [ "MPIMatrixMult", @@ -39,5 +42,6 @@ "MPIFirstDerivative", "MPISecondDerivative", "MPILaplacian", - "MPIGradient" + "MPIGradient", + "MPIHalo" ] diff --git a/pylops_mpi/signalprocessing/NonStatConvolve1d.py b/pylops_mpi/signalprocessing/NonStatConvolve1d.py new file mode 100644 index 00000000..a2d26970 --- /dev/null +++ b/pylops_mpi/signalprocessing/NonStatConvolve1d.py @@ -0,0 +1,189 @@ +from typing import Union + +import numpy as np + +from mpi4py import MPI +from pylops.signalprocessing import NonStationaryConvolve1D +from pylops.utils._internal import _value_or_sized_to_tuple +from pylops.utils.backend import to_numpy +from pylops.utils.typing import DTypeLike, InputDimsLike, NDArray + +from pylops_mpi import MPILinearOperator +from pylops_mpi.basicoperators.BlockDiag import MPIBlockDiag +from pylops_mpi.basicoperators.Halo import MPIHalo, halo_block_split + + +def MPINonStationaryConvolve1D( + dims: Union[int, InputDimsLike], + hs: NDArray, + ih: InputDimsLike, + axis: int = -1, + base_comm: MPI.Comm = MPI.COMM_WORLD, + dtype: DTypeLike = "float64", +) -> MPILinearOperator: + r"""1D non-stationary convolution operator. + + Apply distributed non-stationary one-dimensional convolution. + A varying compact filter is provided on a coarser grid and on-the-fly + interpolation is applied in forward and adjoint modes. + + Alongside distributing the input array across different ranks, the + filters are also distributed and filters operating at the edges of the + local arrays are replicated on both ranks either side of the edge. + + .. note:: + + Currently the + :obj:`pylops_mpi.signalprocessing.MPINonStationaryConvolve1D` + requires that shape of the local arrays of the input + :obj:`pylops_mpi.DistributedArray` are be the same for all ranks. + + Parameters + ---------- + dims : :obj:`list` or :obj:`int` + Number of samples for each dimension of the input + :obj:`pylops_mpi.DistributedArray`. + hs : :obj:`numpy.ndarray` + Bank of 1d compact filters of size :math:`n_\text{filts} \times n_h`. + Filters must have odd number of samples and are assumed to be + centered in the middle of the filter support. + ih : :obj:`tuple` + Indices of the locations of the filters ``hs`` in the model (and data). Note + that the filters must be regularly sampled, i.e. :math:`dh=\text{diff}(ih)=\text{const.}` + axis : :obj:`int`, optional + Axis along which convolution is applied + base_comm : :obj:`mpi4py.MPI.Comm`, optional + MPI Base Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``. + dtype : :obj:`str`, optional + Type of elements in input array. + + Attributes + ---------- + shape : :obj:`tuple` + Operator shape + + Raises + ------ + ValueError + If filters ``hs`` have even size + ValueError + If ``ih`` is not regularly sampled + ValueError + If ``ih`` is outside the bounds defined by ``dims[axis]`` + + Notes + ----- + The MPINonStationaryConvolve1D operator applies non-stationary + one-dimensional convolution between the input signal :math:`d(t)` + and a bank of compact filter kernels :math:`h(t; t_i)`. Assume + the input signal is composed of :math:`N=16` samples, and distributed + across :math:`N=2` ranks (with each local signal composes of + :math:`N=8` samples); similarly, consider :math:`N=4` filters + at locations :math:`t_2` and :math:`t_6` in the first rank and + :math:`t_10` and :math:`t_14` in the second rank. Each rank applies + an halo of :math:`N=4` samples to include the following/preceding + filter, and applies locally a + :class:pylops.signalprocessingNonStationaryConvolve1D` operator; + finally the halo is removed from each local convolved signal. + + """ + # TODO: need to adapt operator to handle NDarrays + rank = base_comm.Get_rank() + size = base_comm.Get_size() + dims = _value_or_sized_to_tuple(dims) + ih = to_numpy(ih) + + # Checks for local operator + if hs.shape[1] % 2 == 0: + raise ValueError("filters hs must have odd length") + if len(np.unique(np.diff(ih))) > 1: + raise ValueError( + "the indices of filters 'ih' are must be regularly sampled" + ) + if min(ih) < 0 or max(ih) >= dims[axis]: + raise ValueError( + "the indices of filters 'ih' must be larger than 0 and " + "smaller than `dims`" + ) + + # Additional check for distributed operarator + if dims[axis] % size: + raise ValueError( + f"number of input samples {dims[0]} is not divisible by " + f"the number of ranks ({size})" + ) + + # Identify halo as the maximum distance between any partition of + # the distributed array into local arrays and the closest filter + # outside of the partition plus half of the size of the filter + dims_local = dims[axis] // size + starts_local = np.arange(0, dims[axis], dims_local) + start_local = starts_local[rank] + end_local = start_local + dims_local - 1 + ihidx_local = np.where((ih >= start_local) & (ih <= end_local))[0] + if len(ihidx_local) == 0: + raise ValueError(f"rank {rank} has zerof filters!") + + ihdiff = np.diff(ih)[0] + ih_local = ih[ihidx_local] + dist_start_local = 0 if rank == 0 else ihdiff - (ih_local[0] - start_local) + dist_end_local = 0 if rank == (size - 1) else ihdiff - (end_local - ih_local[-1]) + dist_start = np.max(np.array(base_comm.allgather(dist_start_local))) + dist_end = np.max(np.array(base_comm.allgather(dist_end_local))) + halo = max([dist_start, dist_end]) + (hs.shape[1] // 2 + 1) + if size == 1: + # to allow operator to work also with size=1 + halo = 0 + + # Create halo operator + proc_grid_shape = [1, ] * len(dims) + proc_grid_shape[axis] = size + HOp = MPIHalo( + dims=dims, + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=base_comm, + dtype=dtype + ) + + if size == 1: + # to allow operator to work also with size=1 + dims_ns = list(dims) + dims_ns[axis] = dims_local + halo + COp = NonStationaryConvolve1D( + dims=dims_ns, hs=hs, ih=ih, axis=axis, dtype=dtype) + else: + x_slice = halo_block_split(dims if len(dims) == 1 else (dims[axis], ), + base_comm, (size, )) + if rank == 0: + dims_ns = list(dims) + dims_ns[axis] = dims_local + halo + COp = NonStationaryConvolve1D( + dims=dims_ns, hs=hs[:ihidx_local[-1] + 2], + ih=ih[:ihidx_local[-1] + 2], + axis=axis, + dtype=dtype + ) + elif rank == size - 1: + dims_ns = list(dims) + dims_ns[axis] = dims_local + halo + COp = NonStationaryConvolve1D( + dims=dims_ns, hs=hs[ihidx_local[0] - 1:], + ih=ih[ihidx_local[0] - 1:] - x_slice[0].start + halo, + axis=axis, + dtype=dtype + ) + else: + dims_ns = list(dims) + dims_ns[axis] = dims_local + 2 * halo + COp = NonStationaryConvolve1D( + dims=dims_ns, hs=hs[ihidx_local[0] - 1: ihidx_local[-1] + 2], + ih=ih[ihidx_local[0] - 1: ihidx_local[-1] + 2] - x_slice[0].start + halo, + axis=axis, + dtype=dtype + ) + + COp_full = MPIBlockDiag([COp, ]) + Op = HOp.H @ COp_full @ HOp + + return Op diff --git a/pylops_mpi/signalprocessing/__init__.py b/pylops_mpi/signalprocessing/__init__.py index 209b64e4..87662206 100644 --- a/pylops_mpi/signalprocessing/__init__.py +++ b/pylops_mpi/signalprocessing/__init__.py @@ -8,18 +8,20 @@ A list of operators present in pylops_mpi.signalprocessing : MPIFredholm1 Fredholm integral of first kind. + MPINonStationaryConvolve1D 1D non-stationary convolution operator. MPIFFT2D Two-dimensional Fast-Fourier Transform MPIFFTND N-dimensional Fast-Fourier Transform - """ from .Fredholm1 import * +from .NonStatConvolve1d import * from .FFT2D import * from .FFTND import * __all__ = [ "MPIFredholm1", + "MPINonStationaryConvolve1D", "MPIFFT2D", "MPIFFTND", ] diff --git a/tests/test_halo.py b/tests/test_halo.py new file mode 100644 index 00000000..d923979c --- /dev/null +++ b/tests/test_halo.py @@ -0,0 +1,427 @@ +"""Test the the Halo class + Designed to run with n processes + $ mpiexec -n 10 pytest test_halo.py --with-mpi +""" +import os +import math + +if int(os.environ.get("TEST_CUPY_PYLOPS", 0)): + import cupy as np + from cupy.testing import assert_allclose + + backend = "cupy" +else: + import numpy as np + from numpy.testing import assert_allclose + + backend = "numpy" +from mpi4py import MPI +import pytest + +import pylops +import pylops_mpi +from pylops_mpi.basicoperators import MPIHalo, halo_block_split +from pylops_mpi.utils.dottest import dottest + +np.random.seed(42) +comm = MPI.COMM_WORLD +rank = comm.Get_rank() +size = comm.Get_size() +if backend == "cupy": + device_id = rank % np.cuda.runtime.getDeviceCount() + np.cuda.Device(device_id).use() + + +par1 = {"dims": (16 * size,), "proc_grid_shape": (size,)} +par2 = {"dims": (16 * size, 8), "proc_grid_shape": (size, 1)} +par3 = {"dims": (8, 16 * size), "proc_grid_shape": (1, size)} +par4 = {"dims": (16 * size, 8, 8), "proc_grid_shape": (size, 1, 1)} +par5 = {"dims": (8, 16 * size, 8), "proc_grid_shape": (1, size, 1)} +par6 = {"dims": (8, 8, 16 * size), "proc_grid_shape": (1, 1, size)} + +uneven_par1 = {"dims": (16 * size + 1,), "proc_grid_shape": (size,)} +uneven_par2 = {"dims": (16 * size + 1, 8), "proc_grid_shape": (size, 1)} +uneven_par3 = {"dims": (8, 16 * size + 1), "proc_grid_shape": (1, size)} +uneven_par4 = {"dims": (8, 8, 16 * size + 1), "proc_grid_shape": (1, 1, size)} + + +def _expected_haloed_block(global_array, dims, proc_grid_shape, halo): + slices = halo_block_split(dims, comm, proc_grid_shape) + local_dims = tuple( + (dims[ax] if sl.stop is None else sl.stop) - sl.start + for ax, sl in enumerate(slices) + ) + expected_shape = tuple( + local_dims[ax] + halo[2 * ax] + halo[2 * ax + 1] + for ax in range(len(local_dims)) + ) + expected = np.zeros(expected_shape, dtype=global_array.dtype) + + source_slices = [] + target_slices = [] + for ax, sl in enumerate(slices): + local_start = sl.start + local_stop = dims[ax] if sl.stop is None else sl.stop + before, after = halo[2 * ax], halo[2 * ax + 1] + + source_start = max(0, local_start - before) + source_stop = min(dims[ax], local_stop + after) + target_start = before - (local_start - source_start) + target_stop = target_start + (source_stop - source_start) + + source_slices.append(slice(source_start, source_stop)) + target_slices.append(slice(target_start, target_stop)) + + expected[tuple(target_slices)] = global_array[tuple(source_slices)] + + return expected + + +@pytest.mark.mpi(min_size=2) +def test_halo_invalid_grid_shape(): + with pytest.raises(ValueError, match="does not match comm size"): + MPIHalo( + dims=(16 * size,), + halo=1, + proc_grid_shape=(size + 1,), + comm=comm, + dtype=np.float64, + ) + + +@pytest.mark.mpi(min_size=2) +def test_halo_invalid_halo_shape(): + with pytest.raises(ValueError, match="Invalid halo length"): + MPIHalo( + dims=(16 * size,), + halo=(1, 2, 3), + proc_grid_shape=(size,), + comm=comm, + dtype=np.float64, + ) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("halo", [-1, (-1,), (1, -1)]) +def test_halo_invalid_negative_halo(halo): + with pytest.raises(ValueError, match="non-negative"): + MPIHalo( + dims=(16 * size,), + halo=halo, + proc_grid_shape=(size,), + comm=comm, + dtype=np.float64, + ) + + +@pytest.mark.mpi(min_size=2) +def test_halo_rejects_halo_wider_than_local_block(): + dims = (4 * size,) + with pytest.raises(ValueError, match="exceeds local block size"): + MPIHalo( + dims=dims, + halo=5, + proc_grid_shape=(size,), + comm=comm, + dtype=np.float64, + ) + + +@pytest.mark.mpi(min_size=2) +def test_halo_invalid_asymmetric_distributed_halo(): + dims = (16 * size,) + with pytest.raises(ValueError, match="does not match neighbor"): + MPIHalo( + dims=dims, + halo=(1, 2), + proc_grid_shape=(size,), + comm=comm, + dtype=np.float64, + ) + + +@pytest.mark.mpi(min_size=2) +def test_halo_rejects_broadcast_input(): + dims = (16 * size,) + halo_op = MPIHalo( + dims=dims, + halo=1, + proc_grid_shape=(size,), + comm=comm, + dtype=np.float64, + ) + x_dist = pylops_mpi.DistributedArray( + global_shape=math.prod(dims), + base_comm=comm, + partition=pylops_mpi.Partition.BROADCAST, + engine=backend, + dtype=np.float64, + ) + y_dist = pylops_mpi.DistributedArray( + global_shape=halo_op.shape[0], + base_comm=comm, + partition=pylops_mpi.Partition.BROADCAST, + engine=backend, + dtype=np.float64, + ) + + with pytest.raises(ValueError, match=f"{pylops_mpi.Partition.SCATTER}"): + halo_op @ x_dist + with pytest.raises(ValueError, match=f"{pylops_mpi.Partition.SCATTER}"): + halo_op.H @ y_dist + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par", [(par1), (par2), (par3), (par4), (par5), (par6)]) +@pytest.mark.parametrize("halo_kind", ["scalar", "ndim_tuple", "per_side_tuple"]) +def test_halo(par, halo_kind): + dims, proc_grid_shape = par["dims"], par["proc_grid_shape"] + ndim = len(dims) + + if halo_kind == "scalar": + halo = 1 + cart = comm.Create_cart(proc_grid_shape, periods=[False] * ndim, reorder=True) + trimmed = [halo] * (2 * ndim) + for ax in range(ndim): + before, after = cart.Shift(ax, 1) + if before == MPI.PROC_NULL: + trimmed[2 * ax] = 0 + if after == MPI.PROC_NULL: + trimmed[2 * ax + 1] = 0 + expected_halo = tuple(trimmed) + elif halo_kind == "ndim_tuple": + halo = tuple(range(1, ndim + 1)) + expected_halo = tuple(value for value in halo for _ in range(2)) + else: + halo = tuple(value for value in range(1, ndim + 1) for _ in range(2)) + expected_halo = halo + + halo_op = MPIHalo( + dims=dims, + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm, + dtype=np.float64, + ) + + global_array = np.arange(math.prod(dims), dtype=np.float64).reshape(dims) + local_block = global_array[halo_block_split(dims, comm, proc_grid_shape)] + expected_block = _expected_haloed_block( + global_array, + dims, + proc_grid_shape, + expected_halo, + ) + + x_dist = pylops_mpi.DistributedArray( + global_shape=math.prod(dims), + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER, + engine=backend, + dtype=np.float64, + ) + x_dist[:] = local_block.ravel() + + y_dist = halo_op @ x_dist + assert y_dist.local_array.shape == (expected_block.size,) + assert y_dist.global_shape == (sum(comm.allgather(expected_block.size)),) + assert_allclose(y_dist.local_array, expected_block.ravel(), rtol=1e-14) + + x_adj_dist = halo_op.H @ y_dist + assert x_adj_dist.global_shape == (math.prod(dims),) + assert_allclose(x_adj_dist.local_array, local_block.ravel(), rtol=1e-14) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par", [(uneven_par1), (uneven_par2), (uneven_par3), (uneven_par4)]) +def test_halo_uneven_global_size(par): + dims, proc_grid_shape = par["dims"], par["proc_grid_shape"] + ndim = len(dims) + halo = 1 + cart = comm.Create_cart(proc_grid_shape, periods=[False] * ndim, reorder=True) + expected_halo = [halo] * (2 * ndim) + for ax in range(ndim): + before, after = cart.Shift(ax, 1) + if before == MPI.PROC_NULL: + expected_halo[2 * ax] = 0 + if after == MPI.PROC_NULL: + expected_halo[2 * ax + 1] = 0 + + halo_op = MPIHalo( + dims=dims, + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm, + dtype=np.float64, + ) + + global_array = np.arange(math.prod(dims), dtype=np.float64).reshape(dims) + local_block = global_array[halo_block_split(dims, comm, proc_grid_shape)] + expected_block = _expected_haloed_block( + global_array, + dims, + proc_grid_shape, + tuple(expected_halo), + ) + local_shapes = comm.allgather(local_block.size) + + x_dist = pylops_mpi.DistributedArray( + global_shape=math.prod(dims), + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER, + local_shapes=local_shapes, + engine=backend, + dtype=np.float64, + ) + x_dist[:] = local_block.ravel() + + y_dist = halo_op @ x_dist + assert y_dist.local_array.shape == (expected_block.size,) + assert_allclose(y_dist.local_array, expected_block.ravel(), rtol=1e-14) + + x_adj_dist = halo_op.H @ y_dist + assert x_adj_dist.local_array.shape == (local_block.size,) + assert_allclose(x_adj_dist.local_array, local_block.ravel(), rtol=1e-14) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par", [(par1), (par2), (par3), (par4), (par5), (par6)]) +def test_halo_tuple_boundary_zeros_match_scalar(par): + dims, proc_grid_shape = par["dims"], par["proc_grid_shape"] + ndim = len(dims) + + cart = comm.Create_cart(proc_grid_shape, periods=[False] * ndim, reorder=True) + tuple_halo = [1] * (2 * ndim) + for ax in range(ndim): + before, after = cart.Shift(ax, 1) + if before == MPI.PROC_NULL: + tuple_halo[2 * ax] = 0 + if after == MPI.PROC_NULL: + tuple_halo[2 * ax + 1] = 0 + + scalar_halo_op = MPIHalo( + dims=dims, + halo=1, + proc_grid_shape=proc_grid_shape, + comm=comm, + dtype=np.float64, + ) + tuple_halo_op = MPIHalo( + dims=dims, + halo=tuple(tuple_halo), + proc_grid_shape=proc_grid_shape, + comm=comm, + dtype=np.float64, + ) + + global_array = np.arange(math.prod(dims), dtype=np.float64).reshape(dims) + local_block = global_array[halo_block_split(dims, comm, proc_grid_shape)] + x_dist = pylops_mpi.DistributedArray( + global_shape=math.prod(dims), + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER, + engine=backend, + dtype=np.float64, + ) + x_dist[:] = local_block.ravel() + + y_scalar_dist = scalar_halo_op @ x_dist + y_tuple_dist = tuple_halo_op @ x_dist + assert y_tuple_dist.global_shape == y_scalar_dist.global_shape + assert y_tuple_dist.local_array.shape == y_scalar_dist.local_array.shape + assert_allclose(y_tuple_dist.local_array, y_scalar_dist.local_array, rtol=1e-14) + + x_scalar_adj_dist = scalar_halo_op.H @ y_scalar_dist + x_tuple_adj_dist = tuple_halo_op.H @ y_tuple_dist + assert_allclose( + x_tuple_adj_dist.local_array, + x_scalar_adj_dist.local_array, + rtol=1e-14, + ) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par", [(par1), (par2), (par3), (par4), (par5), (par6)]) +@pytest.mark.parametrize("dtype", [np.float64, np.complex128]) +def test_halo_first_derivative(par, dtype): + dims, proc_grid_shape = par["dims"], par["proc_grid_shape"] + axis = proc_grid_shape.index(size) + n = math.prod(dims) + halo = 1 + + halo_op = MPIHalo( + dims=dims, + halo=halo, + proc_grid_shape=proc_grid_shape, + comm=comm, + dtype=dtype, + ) + + x_dist = pylops_mpi.DistributedArray( + global_shape=n, + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER, + engine=backend, + dtype=dtype, + ) + + y_dist = pylops_mpi.DistributedArray( + global_shape=n, + base_comm=comm, + partition=pylops_mpi.Partition.SCATTER, + engine=backend, + dtype=dtype, + ) + + base = np.arange(n, dtype=np.float64).reshape(dims) + model_global = base + data_global = (base + 1.0) / n + if np.dtype(dtype).kind == "c": + model_global = model_global + 1j * (base + 1.0) + data_global = data_global + 1j * ((base + 2.0) / n) + model_global = model_global.astype(dtype) + data_global = data_global.astype(dtype) + local_slices = halo_block_split(dims, comm, proc_grid_shape) + x_dist[:] = model_global[local_slices].ravel() + y_dist[:] = data_global[local_slices].ravel() + + cart = comm.Create_cart(proc_grid_shape, periods=[False] * len(dims), reorder=True) + expected_halo = [halo] * (2 * len(dims)) + for ax in range(len(dims)): + before, after = cart.Shift(ax, 1) + if before == MPI.PROC_NULL: expected_halo[2 * ax] = 0 + if after == MPI.PROC_NULL: expected_halo[2 * ax + 1] = 0 + + local_dims = tuple( + (dims[ax] if sl.stop is None else sl.stop) - sl.start + for ax, sl in enumerate(local_slices) + ) + local_extent = tuple( + local_dims[ax] + expected_halo[2 * ax] + expected_halo[2 * ax + 1] + for ax in range(len(dims)) + ) + + DOp = pylops.FirstDerivative( + dims=local_extent, + axis=axis, + kind="forward", + dtype=dtype, + ) + DOp_dist = pylops_mpi.MPIBlockDiag([DOp], base_comm=comm, dtype=dtype) + Op_dist = halo_op.H @ DOp_dist @ halo_op + + dottest(Op_dist, x_dist, y_dist, n, n) + + y_dist = Op_dist @ x_dist + y_adj_dist = Op_dist.H @ x_dist + + DOp_serial = pylops.FirstDerivative( + dims=dims, + axis=axis, + kind="forward", + dtype=dtype, + ) + y_serial = (DOp_serial @ model_global.ravel()).reshape(dims) + y_adj_serial = (DOp_serial.H @ model_global.ravel()).reshape(dims) + assert_allclose(y_dist.local_array, y_serial[local_slices].ravel(), rtol=1e-14) + assert_allclose(y_adj_dist.local_array, y_adj_serial[local_slices].ravel(), rtol=1e-14) diff --git a/tests/test_nonstatconvolve.py b/tests/test_nonstatconvolve.py new file mode 100644 index 00000000..29d6bcc4 --- /dev/null +++ b/tests/test_nonstatconvolve.py @@ -0,0 +1,126 @@ +"""Test the MPINonStationaryConvolve1D class + Designed to run with n processes + $ mpiexec -n 10 pytest test_nonstatconvolve.py --with-mpi +""" +import os + +if int(os.environ.get("TEST_CUPY_PYLOPS", 0)): + import cupy as np + from cupy.testing import assert_allclose + from cupyx.scipy.signal.windows import triang + backend = "cupy" +else: + import numpy as np + from numpy.testing import assert_allclose + from scipy.signal.windows import triang + backend = "numpy" +from mpi4py import MPI +import pytest +import pylops + +import pylops_mpi +from pylops_mpi import DistributedArray +from pylops_mpi.signalprocessing import MPINonStationaryConvolve1D +from pylops_mpi.utils.dottest import dottest + +np.random.seed(42) +size = MPI.COMM_WORLD.Get_size() +rank = MPI.COMM_WORLD.Get_rank() +if backend == "cupy": + device_id = rank % np.cuda.runtime.getDeviceCount() + np.cuda.Device(device_id).use() + +# filters +nfilts = (5, 7) +nfilts3 = (5, 5, 7) +filts_local = 2 + +h1 = triang(nfilts[0], sym=True) +h1s = np.tile(h1[None], (size * filts_local, 1)) + +par1_1d = { + "nz": 32, + "nx": 64, + "axis": 0, + "dtype": np.float64, +} # first direction +par2_1d = { + "nz": 32, + "nx": 64, + "axis": 1, + "dtype": np.float64, +} # second direction + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par", [(par1_1d)]) +def test_even_filter(par): + """Check error is raised if filter has even size""" + with pytest.raises(ValueError, match="filters hs must have odd length"): + n_between_h = par["nz"] // filts_local + ih = par["nz"] // (2 * filts_local) + \ + np.arange(0, par["nz"] * size, n_between_h) + + _ = MPINonStationaryConvolve1D( + dims=par["nx"], + hs=h1s[..., :-1], + ih=ih, + ) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par", [(par1_1d)]) +def test_ih_irregular(par): + """Check error is raised if ih (or ihx/ihz) are irregularly sampled""" + with pytest.raises(ValueError, match="must be regularly sampled"): + n_between_h = par["nz"] // filts_local + ih = par["nz"] // (2 * filts_local) + \ + np.arange(0, par["nz"] * size, n_between_h) + ih[0] = ih[0] + 1 + _ = MPINonStationaryConvolve1D( + dims=par["nx"], + hs=h1s, + ih=ih, + ) + + +@pytest.mark.parametrize("par", [(par1_1d), (par2_1d)]) +def test_NonStationaryConvolve1D(par): + """Dot-test and inversion for NonStationaryConvolve1D operator""" + # 1D + if par["axis"] == 0: + n_between_h = par["nz"] // filts_local + ih = par["nz"] // (2 * filts_local) + \ + np.arange(0, par["nz"] * size, n_between_h) + Cop_MPI = MPINonStationaryConvolve1D( + dims=par["nz"] * size, + hs=h1s, + ih=ih, + dtype=par["dtype"] + ) + + x_global = np.zeros(size * par["nz"], dtype=np.float64) + x_global[ih] = 1.0 + + x = DistributedArray(global_shape=size * par["nz"], + partition=pylops_mpi.Partition.SCATTER, + dtype=par["dtype"], engine=backend) + x.local_array[:] = x_global[par["nz"] * rank: par["nz"] * (rank + 1)] + # Forward + y_dist = Cop_MPI @ x + y = y_dist.asarray() + # Adjoint + y_adj_dist = Cop_MPI.H @ y_dist + y_adj = y_adj_dist.asarray() + # Dot test + dottest(Cop_MPI, x, y_dist, size * par["nz"], size * par["nz"]) + + if rank == 0: + Cop = pylops.signalprocessing.NonStationaryConvolve1D( + dims=par["nz"] * size, hs=h1s, ih=ih, + dtype=par['dtype']) + assert Cop_MPI.shape == Cop.shape + y_np = Cop @ x_global + y_adj_np = Cop.H @ y_np + assert_allclose(y, y_np, rtol=1e-14) + assert_allclose(y_adj, y_adj_np, rtol=1e-14)