import podpac
from collections import OrderedDict

import traitlets as tl
import numpy as np
import xarray as xr
import scipy.signal

from podpac.core.settings import settings
from podpac.core.coordinates import Coordinates, UniformCoordinates1d, ArrayCoordinates1d
from podpac.core.coordinates import add_coord
from podpac.core.node import Node
from podpac.core.algorithm.algorithm import UnaryAlgorithm
from podpac.core.utils import common_doc, ArrayTrait, NodeTrait
from podpac.core.node import COMMON_NODE_DOC

[docs]class Convolution(UnaryAlgorithm): """Compute a general convolution over a source node. This node automatically resizes the requested coordinates to avoid edge effects. Attributes ---------- source : podpac.Node Source node on which convolution will be performed. kernel : np.ndarray, optional The convolution kernel. This kernel must include the dimensions of source node outputs. The dimensions for this array are labelled by `kernel_dims`. Any dimensions not in the source nodes outputs will be summed over. kernel_dims : list, optional A list of the dimensions for the kernel axes. If the dimensions in this list do not match the coordinates in the source, then any extra dimensions in the kernel are removed by adding all the values over that axis dimensions in the source are not convolved with any kernel. kernel_type : str, optional If kernel is not defined, kernel_type will create a kernel based on the inputs, and it will have the same number of axes as kernel_dims. The format for the created kernels is '<kernel_type>, <kernel_size>, <kernel_params>'. Any kernel defined in `scipy.signal` as well as `mean` can be used. For example: kernel_type = 'mean, 8' or kernel_type = 'gaussian,16,8' are both valid. Note: These kernels are automatically normalized such that kernel.sum() == 1 """ kernel = ArrayTrait(dtype=float).tag(attr=True) kernel_dims = tl.List().tag(attr=True) # Takes one or the other which is hard to implement in a GUI kernel_type = tl.List().tag(attr=True) def _first_init(self, kernel=None, kernel_dims=None, kernel_type=None, kernel_ndim=None, **kwargs): if kernel_dims is None: raise TypeError("Convolution expected 'kernel_dims' to be specified when giving a 'kernel' array") if kernel is not None and kernel_type is not None: raise TypeError("Convolution expected 'kernel' or 'kernel_type', not both") if kernel is None: if kernel_type is None: raise TypeError("Convolution requires 'kernel' array or 'kernel_type' string") kernel = self._make_kernel(kernel_type, len(kernel_dims)) if len(kernel_dims) != len(np.array(kernel).shape): raise TypeError( "The kernel_dims should contain the same number of dimensions as the number of axes in 'kernel', but len(kernel_dims) {} != len(kernel.shape) {}".format( len(kernel_dims), len(np.array(kernel).shape) ) ) kwargs["kernel"] = kernel kwargs["kernel_dims"] = kernel_dims return super(Convolution, self)._first_init(**kwargs) @common_doc(COMMON_DOC) def _eval(self, coordinates, output=None, _selector=None): """Evaluates this nodes using the supplied coordinates. Parameters ---------- coordinates : podpac.Coordinates {requested_coordinates} output : podpac.UnitsDataArray, optional {eval_output} _selector: callable(coordinates, request_coordinates) {eval_selector} Returns ------- {eval_return} """ # The size of this kernel is used to figure out the expanded size full_kernel = self.kernel # expand the coordinates # The next line effectively drops extra coordinates, so we have to add those later in case the # source is some sort of reduction Node. kernel_dims = [kd for kd in coordinates.dims if kd in self.kernel_dims] missing_dims = [kd for kd in coordinates.dims if kd not in self.kernel_dims] exp_coords = [] exp_slice = [] for dim in kernel_dims: coord = coordinates[dim] s = full_kernel.shape[self.kernel_dims.index(dim)] if s == 1 or not isinstance(coord, (UniformCoordinates1d, ArrayCoordinates1d)): exp_coords.append(coord) exp_slice.append(slice(None)) continue if isinstance(coord, UniformCoordinates1d): s_start = -s // 2 s_end = max(s // 2 - ((s + 1) % 2), 1) # The 1e-14 is for floating point error because if endpoint is slightly # in front of step * N then the endpoint is excluded # ALSO: MUST use size instead of step otherwise floating point error # makes the xarray arrays not align. The following HAS to be true: # np.diff(coord.coordinates).mean() == coord.step exp_coords.append( UniformCoordinates1d( add_coord(coord.start, s_start * coord.step), add_coord(coord.stop, s_end * coord.step + 1e-14 * coord.step), size=coord.size - s_start + s_end, # HAVE to use size, see note above ** ) ) exp_slice.append(slice(-s_start, -s_end)) elif isinstance(coord, ArrayCoordinates1d): if not coord.is_monotonic or coord.size < 2: exp_coords.append(coord) exp_slice.append(slice(None)) continue arr_coords = coord.coordinates delta_start = arr_coords[1] - arr_coords[0] extra_start = np.arange(arr_coords[0] - delta_start * (s // 2), arr_coords[0], delta_start) delta_end = arr_coords[-1] - arr_coords[-2] # The 1e-14 is for floating point error to make sure endpoint is included extra_end = np.arange( arr_coords[-1] + delta_end, arr_coords[-1] + delta_end * (s // 2) + delta_end * 1e-14, delta_end ) arr_coords = np.concatenate([extra_start, arr_coords, extra_end]) exp_coords.append(ArrayCoordinates1d(arr_coords, ** exp_slice.append(slice(extra_start.size, -extra_end.size)) # Add missing dims back in -- this is needed in case the source is a reduce node. exp_coords += [coordinates[d] for d in missing_dims] # Create expanded coordinates exp_slice = tuple(exp_slice) expanded_coordinates = Coordinates(exp_coords,, validate_crs=False) if settings["DEBUG"]: self._expanded_coordinates = expanded_coordinates # evaluate source using expanded coordinates, convolve, and then slice out original coordinates source = self.source.eval(expanded_coordinates, _selector=_selector) kernel_dims_u = kernel_dims kernel_dims = self.kernel_dims sum_dims = [d for d in kernel_dims if d not in source.dims] # Sum out the extra dims full_kernel = full_kernel.sum(axis=tuple([kernel_dims.index(d) for d in sum_dims])) exp_slice = [exp_slice[i] for i in range(len(kernel_dims_u)) if kernel_dims_u[i] not in sum_dims] kernel_dims = [d for d in kernel_dims if d in source.dims] # Put the kernel axes in the correct order # The (if d in kernel_dims) takes care of "output", which can be optionally present full_kernel = full_kernel.transpose([kernel_dims.index(d) for d in source.dims if (d in kernel_dims)]) # Check for extra dimensions in the source and reshape the kernel appropriately if any([d not in kernel_dims for d in source.dims if d != "output"]): new_axis = [] new_exp_slice = [] for d in source.dims: if d in kernel_dims: new_axis.append(slice(None)) new_exp_slice.append(exp_slice[kernel_dims.index(d)]) else: new_axis.append(None) new_exp_slice.append(slice(None)) full_kernel = full_kernel[tuple(new_axis)] exp_slice = new_exp_slice if np.any(np.isnan(source)): method = "direct" else: method = "auto" if ("output" not in source.dims) or ("output" in source.dims and "output" in kernel_dims): result = scipy.signal.convolve(source, full_kernel, mode="same", method=method) else: # source with multiple outputs result = np.stack( [ scipy.signal.convolve(source.sel(output=output), full_kernel, mode="same", method=method) for output in source.coords["output"] ], axis=source.dims.index("output"), ) result = result[tuple(exp_slice)] if output is None: missing_dims = [d for d in coordinates.dims if d not in source.dims] output = self.create_output_array(coordinates.drop(missing_dims), data=result) else: output[:] = result return output @staticmethod def _make_kernel(kernel_type, ndim): ktype = kernel_type.split(",")[0] size = int(kernel_type.split(",")[1]) if ktype == "mean": k = np.ones([size] * ndim) else: args = [float(a) for a in kernel_type.split(",")[2:]] f = getattr(scipy.signal, ktype) k1d = f(size, *args) k = k1d.copy() for i in range(ndim - 1): k = np.tensordot(k, k1d, 0) return k / k.sum()