"""
Stats Summary
"""
from __future__ import division, unicode_literals, print_function, absolute_import
import warnings
from operator import mul
from functools import reduce
import logging
import xarray as xr
import numpy as np
import scipy.stats
import traitlets as tl
from six import string_types
# Internal dependencies
import podpac
from podpac.core.coordinates import Coordinates
from podpac.core.node import Node
from podpac.core.algorithm.algorithm import UnaryAlgorithm, Algorithm
from podpac.core.utils import common_doc, NodeTrait
from podpac.core.node import COMMON_NODE_DOC
COMMON_DOC = COMMON_NODE_DOC.copy()
# Set up logging
_log = logging.getLogger(__name__)
class Reduce(UnaryAlgorithm):
"""Base node for statistical algorithms
Attributes
----------
dims : list
List of strings that give the dimensions which should be reduced
source : podpac.Node
The source node that will be reduced.
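Example
-------
A minimal usage sketch (``source_node`` and ``coords`` are hypothetical; ``Mean`` is one
of the concrete subclasses defined below)::

    node = Mean(source=source_node, dims=["time"])
    output = node.eval(coords)  # reduced over time; the remaining dims are preserved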
"""
from podpac.core.utils import DimsTrait
dims = DimsTrait(allow_none=True, default_value=None).tag(attr=True)
_reduced_coordinates = tl.Instance(Coordinates, allow_none=True)
_dims = tl.List(trait=tl.Unicode())
def _first_init(self, **kwargs):
if "dims" in kwargs and isinstance(kwargs["dims"], string_types):
kwargs["dims"] = [kwargs["dims"]]
return super(Reduce, self)._first_init(**kwargs)
def dims_axes(self, output):
"""Finds the indices for the dimensions that will be reduced. This is passed to numpy.
Parameters
----------
output : UnitsDataArray
The output array with the reduced dimensions
Returns
-------
list
List of integers giving the axes of the dimensions that will be reduced
"""
axes = [i for i in range(len(output.dims)) if output.dims[i] in self._dims]
return axes
@property
def chunk_size(self):
"""Size of chunks for parallel processing or large arrays that do not fit in memory
Returns
-------
int
Size of chunks
"""
chunk_size = podpac.settings["CHUNK_SIZE"]
if chunk_size == "auto":
return 1024**2 # TODO
else:
return chunk_size
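# Usage sketch (assumes the standard dict-like podpac.settings interface):
#     podpac.settings["CHUNK_SIZE"] = 1000000  # max elements per chunk
#     podpac.settings["CHUNK_SIZE"] = None      # disable chunked evaluation entirely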
def _get_chunk_shape(self, coords):
"""Shape of chunks for parallel processing or large arrays that do not fit in memory.
Returns
-------
list
List of integers giving the shape of each chunk.
"""
if self.chunk_size is None:
return None
chunk_size = self.chunk_size
d = {k: coords[k].size for k in coords.dims if k not in self._dims}
s = reduce(mul, d.values(), 1)
for dim in self._dims:
n = chunk_size // s
if n == 0:
d[dim] = 1
elif n < coords[dim].size:
d[dim] = n
else:
d[dim] = coords[dim].size
s *= d[dim]
return [d[dim] for dim in coords.dims]
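# Illustrative example (hypothetical numbers): reducing over 'time' with chunk_size=1000
# and coordinates of shape (lat, lon, time) = (10, 10, 365), the non-reduced dims give
# s = 100, so the time dim gets n = 1000 // 100 = 10 and the chunk shape is [10, 10, 10]:
# every chunk covers the full output footprint but only 10 time steps at once.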
def _reshape(self, x):
"""
Transpose and reshape a DataArray to put the reduce dimensions together
as axis 0. This is useful for example for scipy.stats.skew and kurtosis
which only calculate over a single axis, by default 0.
Parameters
----------
x : xr.DataArray
Input DataArray
Returns
-------
a : np.array
Transposed and reshaped array
"""
if self._dims is None:
return x.data.flatten()
n = len(self._dims)
dims = list(self._dims) + [d for d in x.dims if d not in self._dims]
x = x.transpose(*dims)
a = x.data.reshape(-1, *x.shape[n:])
return a
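# Illustrative example (hypothetical shapes): reducing ('lat', 'time') on an array with
# dims ('lat', 'lon', 'time') and shape (10, 20, 30) transposes to ('lat', 'time', 'lon')
# and reshapes to (300, 20), so axis 0 holds all of the reduce-dimension samples.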
def iteroutputs(self, coordinates, _selector):
"""Generator for the chunks of the output
Yields
------
UnitsDataArray
Output for this chunk
"""
chunk_shape = self._get_chunk_shape(coordinates)
for chunk in coordinates.iterchunks(chunk_shape):
yield self.source.eval(chunk, _selector=_selector)
@common_doc(COMMON_DOC)
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
----------
coordinates : podpac.Coordinates
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
-------
{eval_return}
"""
self._requested_coordinates = coordinates
if self.dims:
self._dims = [dim for dim in self.dims if dim in coordinates.dims]
else:
self._dims = list(coordinates.dims)
self._reduced_coordinates = coordinates.drop(self._dims)
if output is None:
output = self.create_output_array(self._reduced_coordinates)
if self.chunk_size and self.chunk_size < reduce(mul, coordinates.shape, 1):
try:
result = self.reduce_chunked(self.iteroutputs(coordinates, _selector), output)
except NotImplementedError:
warnings.warn("No reduce_chunked method defined, using one-step reduce")
source_output = self.source.eval(coordinates, _selector=_selector)
result = self.reduce(source_output)
else:
source_output = self.source.eval(coordinates, _selector=_selector)
result = self.reduce(source_output)
if output.shape == ():
output.data = result
else:
output[:] = result
return output
def reduce(self, x):
"""
Reduce a full array, e.g. x.mean(dims).
Must be defined in each child.
Parameters
----------
x : UnitsDataArray
Array that needs to be reduced.
Raises
------
NotImplementedError
Must be defined in each child.
"""
raise NotImplementedError
def reduce_chunked(self, xs, output):
"""
Reduce a list of xs with a memory-efficient iterative algorithm.
Optionally defined in each child.
Parameters
----------
xs : list, generator
List of UnitsDataArrays that need to be reduced together.
Returns
-------
UnitsDataArray
Reduced output.
"""
raise NotImplementedError
class ReduceOrthogonal(Reduce):
"""
Extended Reduce class that enables chunks that are smaller than the reduced
output array.
The base Reduce node ensures that each chunk is at least as big as the
reduced output, which works for statistics that can be calculated in O(1)
space. For statistics that require O(n) space, the node must iterate
through the Coordinates orthogonally to the reduce dimension, using chunks
that only cover a portion of the output array.
"""
def _get_chunk_shape(self, coords):
"""Shape of chunks for parallel processing or large arrays that do not fit in memory.
Returns
-------
list
List of integers giving the shape of each chunk.
"""
if self.chunk_size is None:
return None
chunk_size = self.chunk_size
# here, the minimum size is the reduce-dimensions size
d = {k: coords[k].size for k in self._dims}
s = reduce(mul, d.values(), 1)
for dim in coords.dims[::-1]:
if dim in self._dims:
continue
n = chunk_size // s
if n == 0:
d[dim] = 1
elif n < coords[dim].size:
d[dim] = n
else:
d[dim] = coords[dim].size
s *= d[dim]
return [d[dim] for dim in coords.dims]
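# Illustrative example (hypothetical numbers): reducing over 'time' with chunk_size=1000
# and coordinates of shape (lat, lon, time) = (100, 100, 365), the reduce dims give
# s = 365, so lon gets n = 1000 // 365 = 2, lat gets 1, and the chunk shape is
# [1, 2, 365]: each chunk spans the full time axis but only a small lat/lon footprint.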
def iteroutputs(self, coordinates, selector):
"""Generator for the chunks of the output
Yields
------
UnitsDataArray
Output for this chunk
"""
chunk_shape = self._get_chunk_shape(coordinates)
for chunk, slices in coordinates.iterchunks(chunk_shape, return_slices=True):
yield self.source.eval(chunk, _selector=selector), slices
def reduce_chunked(self, xs, output):
"""
Reduce a list of xs with a memory-efficient iterative algorithm.
Optionally defined in each child.
Parameters
----------
xs : list, generator
List of UnitsDataArrays that need to be reduced together.
Returns
-------
UnitsDataArray
Reduced output.
"""
# special case for full reduce
if not self._reduced_coordinates.dims:
x, xslices = next(xs)
return self.reduce(x)
y = xr.full_like(output, np.nan)
for x, xslices in xs:
yslc = tuple(xslices[self._requested_coordinates.dims.index(dim)] for dim in self._reduced_coordinates.dims)
y.data[yslc] = self.reduce(x)
return y
class Min(Reduce):
"""Computes the minimum across dimension(s)"""
def reduce(self, x):
"""Computes the minimum across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Minimum of the source data over dims
"""
return x.min(dim=self._dims)
def reduce_chunked(self, xs, output):
"""Computes the minimum across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Minimum of the source data over dims
"""
# note: np.fmin ignores NaNs, np.minimum propagates NaNs
y = xr.full_like(output, np.nan)
for x in xs:
y = np.fmin(y, x.min(dim=self._dims))
return y
class Max(Reduce):
"""Computes the maximum across dimension(s)"""
def reduce(self, x):
"""Computes the maximum across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Maximum of the source data over dims
"""
return x.max(dim=self._dims)
def reduce_chunked(self, xs, output):
"""Computes the maximum across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Maximum of the source data over dims
"""
# note: np.fmax ignores NaNs, np.maximum propagates NaNs
y = xr.full_like(output, np.nan)
for x in xs:
y = np.fmax(y, x.max(dim=self._dims))
return y
class Sum(Reduce):
"""Computes the sum across dimension(s)"""
def reduce(self, x):
"""Computes the sum across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Sum of the source data over dims
"""
return x.sum(dim=self._dims)
def reduce_chunked(self, xs, output):
"""Computes the sum across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Sum of the source data over dims
"""
s = xr.zeros_like(output)
for x in xs:
s += x.sum(dim=self._dims)
return s
class Count(Reduce):
"""Counts the finite values across dimension(s)"""
def reduce(self, x):
"""Counts the finite values across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Number of finite values of the source data over dims
"""
return np.isfinite(x).sum(dim=self._dims)
def reduce_chunked(self, xs, output):
"""Counts the finite values across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Number of finite values of the source data over dims
"""
n = xr.zeros_like(output)
for x in xs:
n += np.isfinite(x).sum(dim=self._dims)
return n
class Mean(Reduce):
"""Computes the mean across dimension(s)"""
def reduce(self, x):
"""Computes the mean across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Mean of the source data over dims
"""
return x.mean(dim=self._dims)
def reduce_chunked(self, xs, output):
"""Computes the mean across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Mean of the source data over dims
"""
s = xr.zeros_like(output)
n = xr.zeros_like(output)
for x in xs:
# TODO efficiency
s += x.sum(dim=self._dims)
n += np.isfinite(x).sum(dim=self._dims)
output = s / n
return output
class Variance(Reduce):
"""Computes the variance across dimension(s)"""
def reduce(self, x):
"""Computes the variance across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Variance of the source data over dims
"""
return x.var(dim=self._dims)
def reduce_chunked(self, xs, output):
"""Computes the variance across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Variance of the source data over dims
"""
n = xr.zeros_like(output)
m = xr.zeros_like(output)
m2 = xr.zeros_like(output)
# Welford, adapted to handle multiple data points in each iteration
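# Per chunk, the vectorized updates are:
#   n  <- n + (finite count in the chunk)
#   m  <- m + sum((x - m_old) / n)
#   m2 <- m2 + sum((x - m_old) * (x - m_new))
# so the final (population) variance is m2 / n.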
for x in xs:
n += np.isfinite(x).sum(dim=self._dims)
d = x - m
m += (d / n).sum(dim=self._dims)
d2 = x - m
m2 += (d * d2).sum(dim=self._dims)
return m2 / n
class Skew(Reduce):
"""
Computes the skew across dimension(s)
TODO NaN behavior when there is NO data (currently different in reduce and reduce_chunked)
"""
def reduce(self, x):
"""Computes the skew across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Skew of the source data over dims
"""
# N = np.isfinite(x).sum(dim=self._dims)
# M1 = x.mean(dim=self._dims)
# E = x - M1
# E2 = E**2
# E3 = E2*E
# M2 = (E2).sum(dim=self._dims)
# M3 = (E3).sum(dim=self._dims)
# skew = self.skew(M3, M2, N)
a = self._reshape(x)
skew = scipy.stats.skew(a, nan_policy="omit")
return skew
def reduce_chunked(self, xs, output):
"""Computes the skew across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Skew of the source data over dims
"""
N = xr.zeros_like(output)
M1 = xr.zeros_like(output)
M2 = xr.zeros_like(output)
M3 = xr.zeros_like(output)
check_empty = True
for x in xs:
Nx = np.isfinite(x).sum(dim=self._dims)
M1x = x.mean(dim=self._dims)
Ex = x - M1x
Ex2 = Ex**2
Ex3 = Ex2 * Ex
M2x = (Ex2).sum(dim=self._dims)
M3x = (Ex3).sum(dim=self._dims)
# premask to omit NaNs
b = Nx.data > 0
Nx = Nx.data[b]
M1x = M1x.data[b]
M2x = M2x.data[b]
M3x = M3x.data[b]
Nb = N.data[b]
M1b = M1.data[b]
M2b = M2.data[b]
# merge
d = M1x - M1b
n = Nb + Nx
NNx = Nb * Nx
M3.data[b] += M3x + d**3 * NNx * (Nb - Nx) / n**2 + 3 * d * (Nb * M2x - Nx * M2b) / n
M2.data[b] += M2x + d**2 * NNx / n
M1.data[b] += d * Nx / n
N.data[b] = n
# calculate skew
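# This is the biased sample skewness g1 = sqrt(N) * M3 / M2**1.5, which should match
# scipy.stats.skew with its default bias=True (used in self.reduce above).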
skew = np.sqrt(N) * M3 / np.sqrt(M2**3)
return skew
class Kurtosis(Reduce):
"""Computes the kurtosis across dimension(s)
TODO NaN behavior when there is NO data (currently different in reduce and reduce_chunked)
"""
def reduce(self, x):
"""Computes the kurtosis across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Kurtosis of the source data over dims
"""
# N = np.isfinite(x).sum(dim=self._dims)
# M1 = x.mean(dim=self._dims)
# E = x - M1
# E2 = E**2
# E4 = E2**2
# M2 = (E2).sum(dim=self._dims)
# M4 = (E4).sum(dim=self._dims)
# kurtosis = N * M4 / M2**2 - 3
a = self._reshape(x)
kurtosis = scipy.stats.kurtosis(a, nan_policy="omit")
return kurtosis
def reduce_chunked(self, xs, output):
"""Computes the kurtosis across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Kurtosis of the source data over dims
"""
N = xr.zeros_like(output)
M1 = xr.zeros_like(output)
M2 = xr.zeros_like(output)
M3 = xr.zeros_like(output)
M4 = xr.zeros_like(output)
for x in xs:
Nx = np.isfinite(x).sum(dim=self._dims)
M1x = x.mean(dim=self._dims)
Ex = x - M1x
Ex2 = Ex**2
Ex3 = Ex2 * Ex
Ex4 = Ex2**2
M2x = (Ex2).sum(dim=self._dims)
M3x = (Ex3).sum(dim=self._dims)
M4x = (Ex4).sum(dim=self._dims)
# premask to omit NaNs
b = Nx.data > 0
Nx = Nx.data[b]
M1x = M1x.data[b]
M2x = M2x.data[b]
M3x = M3x.data[b]
M4x = M4x.data[b]
Nb = N.data[b]
M1b = M1.data[b]
M2b = M2.data[b]
M3b = M3.data[b]
# merge
d = M1x - M1b
n = Nb + Nx
NNx = Nb * Nx
M4.data[b] += (
M4x
+ d**4 * NNx * (Nb**2 - NNx + Nx**2) / n**3
+ 6 * d**2 * (Nb**2 * M2x + Nx**2 * M2b) / n**2
+ 4 * d * (Nb * M3x - Nx * M3b) / n
)
M3.data[b] += M3x + d**3 * NNx * (Nb - Nx) / n**2 + 3 * d * (Nb * M2x - Nx * M2b) / n
M2.data[b] += M2x + d**2 * NNx / n
M1.data[b] += d * Nx / n
N.data[b] = n
# calculate kurtosis
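# This is the biased excess (Fisher) kurtosis g2 = N * M4 / M2**2 - 3, which should
# match scipy.stats.kurtosis with its defaults fisher=True, bias=True (used in
# self.reduce above).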
kurtosis = N * M4 / M2**2 - 3
return kurtosis
class StandardDeviation(Variance):
"""Computes the standard deviation across dimension(s)"""
def reduce(self, x):
"""Computes the standard deviation across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Standard deviation of the source data over dims
"""
return x.std(dim=self._dims)
def reduce_chunked(self, xs, output):
"""Computes the standard deviation across a chunk
Parameters
----------
xs : iterable
Iterable of sources
Returns
-------
UnitsDataArray
Standard deviation of the source data over dims
"""
var = super(StandardDeviation, self).reduce_chunked(xs, output)
return np.sqrt(var)
class Percentile(ReduceOrthogonal):
"""Computes the percentile across dimension(s)
Attributes
----------
percentile : float
The percentile to compute, in the range [0, 100]. Default is 50 (the median).
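Example
-------
A usage sketch (``source_node`` and ``coords`` are hypothetical)::

    node = Percentile(source=source_node, percentile=90, dims=["time"])
    p90 = node.eval(coords)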
"""
percentile = tl.Float(default_value=50.0).tag(attr=True)
def reduce(self, x):
"""Computes the percentile across dimension(s)
Parameters
----------
x : UnitsDataArray
Source data.
Returns
-------
UnitsDataArray
Percentile of the source data over dims
"""
return np.nanpercentile(x, self.percentile, self.dims_axes(x))
# =============================================================================
# Time-Grouped Reduce
# =============================================================================
_REDUCE_FUNCTIONS = ["all", "any", "count", "max", "mean", "median", "min", "prod", "std", "sum", "var", "custom"]
class GroupReduce(UnaryAlgorithm):
"""
Group a time-dependent source node and compute a statistic for each group.
Attributes
----------
custom_reduce_fn : function
required if reduce_fn is 'custom'.
groupby : str
datetime sub-accessor used for grouping: 'dayofyear', 'weekofyear', 'season', or 'month'.
reduce_fn : str
builtin xarray groupby reduce function, or 'custom'.
source : podpac.Node
Source node
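Example
-------
A usage sketch (``source_node`` and ``coords`` are hypothetical)::

    node = GroupReduce(source=source_node, groupby="month", reduce_fn="mean")
    monthly_means = node.eval(coords)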
"""
_repr_keys = ["source", "groupby", "reduce_fn"]
coordinates_source = NodeTrait(allow_none=True).tag(attr=True)
# see https://github.com/pydata/xarray/blob/eeb109d9181c84dfb93356c5f14045d839ee64cb/xarray/core/accessors.py#L61
groupby = tl.CaselessStrEnum(["dayofyear", "weekofyear", "season", "month"], allow_none=True).tag(attr=True)
reduce_fn = tl.CaselessStrEnum(_REDUCE_FUNCTIONS).tag(attr=True)
custom_reduce_fn = tl.Any(allow_none=True, default_value=None).tag(attr=True)
_source_coordinates = tl.Instance(Coordinates)
@tl.default("coordinates_source")
def _default_coordinates_source(self):
return self.source
@common_doc(COMMON_DOC)
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
----------
coordinates : podpac.Coordinates
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
-------
{eval_return}
Raises
------
ValueError
If the source is not time-dependent (required by this node).
"""
source_output = self.source.eval(coordinates)
# group
grouped = source_output.groupby("time.%s" % self.groupby)
# reduce
if self.reduce_fn == "custom":
out = grouped.apply(self.custom_reduce_fn, "time")
else:
# standard, e.g. grouped.median('time')
out = getattr(grouped, self.reduce_fn)("time")
out = out.rename({self.groupby: "time"})
if output is None:
coords = podpac.coordinates.merge_dims(
[coordinates.drop("time"), Coordinates([out.coords["time"]], ["time"])]
)
coords = coords.transpose(*out.dims)
output = self.create_output_array(coords, data=out.data)
else:
output.data[:] = out.data[:]
## map
# eval_time = xr.DataArray(coordinates.coords["time"])
# E = getattr(eval_time.dt, self.groupby)
# out = out.sel(**{self.groupby: E}).rename({self.groupby: "time"})
# output[:] = out.transpose(*output.dims).data
return output
@property
def base_ref(self):
"""
Default node reference/name in node definitions
Returns
-------
str
Default node reference/name in node definitions
"""
return "%s.%s.%s" % (self.source.base_ref, self.groupby, self.reduce_fn)
class ResampleReduce(UnaryAlgorithm):
"""
Resample a time-dependent source node along the time dimension using a statistical reduce operation.
Attributes
----------
custom_reduce_fn : function
required if reduce_fn is 'custom'.
resample : str
Time resampling frequency string (a pandas offset alias, e.g. '1D' or '1M') passed to xarray's resample.
reduce_fn : str
builtin xarray groupby reduce function, or 'custom'.
source : podpac.Node
Source node
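Example
-------
A usage sketch (``source_node`` and ``coords`` are hypothetical; "1M" is a pandas
month-end offset alias)::

    node = ResampleReduce(source=source_node, resample="1M", reduce_fn="max")
    monthly_max = node.eval(coords)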
"""
_repr_keys = ["source", "resample", "reduce_fn"]
coordinates_source = NodeTrait(allow_none=True).tag(attr=True)
# see https://github.com/pydata/xarray/blob/eeb109d9181c84dfb93356c5f14045d839ee64cb/xarray/core/accessors.py#L61
resample = tl.Unicode().tag(attr=True)
reduce_fn = tl.CaselessStrEnum(_REDUCE_FUNCTIONS).tag(attr=True)
custom_reduce_fn = tl.Any(allow_none=True, default_value=None).tag(attr=True)
_source_coordinates = tl.Instance(Coordinates)
@tl.default("coordinates_source")
def _default_coordinates_source(self):
return self.source
@common_doc(COMMON_DOC)
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this nodes using the supplied coordinates.
Parameters
----------
coordinates : podpac.Coordinates
{requested_coordinates}
output : podpac.UnitsDataArray, optional
{eval_output}
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
-------
{eval_return}
Raises
------
ValueError
If the source is not time-dependent (required by this node).
"""
source_output = self.source.eval(coordinates, _selector=_selector)
# group
grouped = source_output.resample(time=self.resample)
# reduce
if self.reduce_fn == "custom":
out = grouped.reduce(self.custom_reduce_fn)
else:
# standard, e.g. grouped.median('time')
out = getattr(grouped, self.reduce_fn)()
if output is None:
output = podpac.UnitsDataArray(out)
output.attrs = source_output.attrs
else:
output.data[:] = out.data[:]
## map
# eval_time = xr.DataArray(coordinates.coords["time"])
# E = getattr(eval_time.dt, self.groupby)
# out = out.sel(**{self.groupby: E}).rename({self.groupby: "time"})
# output[:] = out.transpose(*output.dims).data
return output
@property
def base_ref(self):
"""
Default node reference/name in node definitions
Returns
-------
str
Default node reference/name in node definitions
"""
return "%s.%s.%s" % (self.source.base_ref, self.resample, self.reduce_fn)
class DayOfYear(GroupReduce):
"""
Group a time-dependent source node by day of year and compute a statistic for each group.
Attributes
----------
custom_reduce_fn : function
required if reduce_fn is 'custom'.
reduce_fn : str
builtin xarray groupby reduce function, or 'custom'.
source : podpac.Node
Source node
"""
groupby = "dayofyear"
class DayOfYearWindow(Algorithm):
"""
This applies a function over a moving window around day-of-year in the requested coordinates.
It includes the ability to rescale the inputs/outputs. Note that if the input coordinates include multiple years, the
moving window will include all of the data inside the day-of-year window.
Users need to implement the 'function' method.
Attributes
-----------
source: podpac.Node
The source node from which the statistics will be computed
window: int, optional
Default is 0. The size of the window over which to compute the distribution. The window is
always centered on the day-of-year, so the total number of days is always odd. For example,
window=2 and window=3 both compute the distribution over [x - 1, x, x + 1] and report it as
the result for x, where x is a day of the year.
scale_max: podpac.Node, optional
Default is None. A source node used to scale the maximum value of the source so that it
falls within [0, 1]. If None, self.scale_float[1] is used instead.
scale_min: podpac.Node, optional
Default is None. A source node used to scale the minimum value of the source so that it
falls within [0, 1]. If None, self.scale_float[0] is used instead.
scale_float: list, optional
Default is None. A pair of floats [min, max] used to scale the source so that it falls
within [0, 1]. Ignored for the maximum or minimum if scale_max or scale_min is defined.
If neither this nor scale_max/scale_min is defined, the data is not scaled in any way.
rescale: bool, optional
Default is False. If True, the output is rescaled back to the original units using the
same scale_min/scale_max (or scale_float) values after the function is applied.
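Example
-------
A minimal subclass sketch (hypothetical; reports the mean of each day-of-year window)::

    class DayOfYearMean(DayOfYearWindow):
        def function(self, data, output):
            return np.nanmean(data)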
"""
source = tl.Instance(podpac.Node).tag(attr=True)
window = tl.Int(0).tag(attr=True)
scale_max = tl.Instance(podpac.Node, default_value=None, allow_none=True).tag(attr=True)
scale_min = tl.Instance(podpac.Node, default_value=None, allow_none=True).tag(attr=True)
scale_float = tl.List(default_value=None, allow_none=True).tag(attr=True)
rescale = tl.Bool(False).tag(attr=True)
def algorithm(self, inputs, coordinates):
win = self.window // 2
source = inputs["source"]
# Scale the source to range [0, 1], required for the beta distribution
if "scale_max" in inputs:
scale_max = inputs["scale_max"]
elif self.scale_float and self.scale_float[1] is not None:
scale_max = self.scale_float[1]
else:
scale_max = None
if "scale_min" in inputs:
scale_min = inputs["scale_min"]
elif self.scale_float and self.scale_float[0] is not None:
scale_min = self.scale_float[0]
else:
scale_min = None
_log.debug("scale_min: {}\nscale_max: {}".format(scale_min, scale_max))
if scale_min is not None and scale_max is not None:
source = (source.copy() - scale_min) / (scale_max - scale_min)
with np.errstate(invalid="ignore"):
source.data[(source.data < 0) | (source.data > 1)] = np.nan
# Make the output coordinates with day-of-year as time
coords = xr.Dataset({"time": coordinates["time"].coordinates})
dsdoy = np.sort(np.unique(coords.time.dt.dayofyear))
latlon_coords = coordinates.drop("time")
time_coords = podpac.Coordinates([dsdoy], ["time"])
coords = podpac.coordinates.merge_dims([latlon_coords, time_coords])
coords = coords.transpose(*coordinates.dims)
output = self.create_output_array(coords)
# if all-nan input, no need to calculate
if np.all(np.isnan(source)):
return output
# convert source time coords to day-of-year as well
sdoy = source.time.dt.dayofyear
# loop over each day of year and compute window
for i, doy in enumerate(dsdoy):
_log.debug("Working on doy {doy} ({i}/{ld})".format(doy=doy, i=i + 1, ld=len(dsdoy)))
# If either the start or end runs over the year, we need to do an OR on the bool index
# ----->s....<=e------ .in -out
# ..<=e----------->s..
do_or = False
start = doy - win
if start < 1:
start += 365
do_or = True
end = doy + win
if end > 365:
end -= 365
do_or = True
if do_or:
I = (sdoy >= start) | (sdoy <= end)
else:
I = (sdoy >= start) & (sdoy <= end)
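# Illustrative example (hypothetical numbers): doy=3 with win=5 gives start=363 and
# end=8, so the OR mask selects days {363..365, 1..8}, wrapping around the year end.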
# Scipy's beta function doesn't support multi-dimensional arrays, so we have to loop over lat/lon/alt
lat_f = lon_f = alt_f = [None]
dims = ["lat", "lon", "alt"]
if "lat" in source.dims:
lat_f = source["lat"].data
if "lon" in source.dims:
lon_f = source["lon"].data
if "alt" in source.dims:
alt_f = source["alt"].data
for alt in alt_f:
for lat in lat_f:
for lon in lon_f:
# _log.debug("lat, lon, alt = {}, {}, {}".format(lat, lon, alt))
loc_dict = {k: v for k, v in zip(dims, [lat, lon, alt]) if v is not None}
data = source.sel(time=I, **loc_dict).dropna("time").data
if np.all(np.isnan(data)):
continue
# Fit function to the particular point
output.loc[loc_dict][{"time": i}] = self.function(data, output.loc[loc_dict][{"time": i}])
# Rescale the outputs
if self.rescale:
output = self.rescale_outputs(output, scale_max, scale_min)
return output
def function(self, data, output):
raise NotImplementedError(
"Child classes need to implement this function. It is applied over the data and needs"
" to populate the output."
)
def rescale_outputs(self, output, scale_max, scale_min):
output = (output * (scale_max - scale_min)) + scale_min
return output