"""
Generic Data Source Class
DataSource is the root class for all other podpac defined data sources,
including user defined data sources.
"""
from __future__ import division, unicode_literals, print_function, absolute_import
from collections import OrderedDict
from copy import deepcopy
import warnings
import logging
import numpy as np
import xarray as xr
import traitlets as tl
# Internal imports
from podpac.core.settings import settings
from podpac.core.units import UnitsDataArray
from podpac.core.coordinates import Coordinates, Coordinates1d, StackedCoordinates
from podpac.core.coordinates.utils import VALID_DIMENSION_NAMES, make_coord_delta, make_coord_delta_array
from podpac.core.node import Node
from podpac.core.utils import common_doc, cached_property
from podpac.core.node import COMMON_NODE_DOC
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Documentation fragments shared by DataSource docstrings through ``common_doc``.
# The text inside each string is kept flush-left on purpose: it is substituted verbatim
# into docstrings, so any indentation added here would become part of the rendered text.
# ``interpolation_long`` embeds the ``{interpolation}`` placeholder, which must be preserved.
DATA_DOC = {
    "coordinates": "The coordinates of the data source.",
    "get_data": """
This method must be defined by the data source implementing the DataSource class.
When data source nodes are evaluated, this method is called with request coordinates and coordinate indexes.
The implementing method can choose which input provides the most efficient method of getting data
(i.e. via coordinates or via the index of the coordinates).
Coordinates and coordinate indexes may be strided or subsets of the
source data, but all coordinates and coordinate indexes will match 1:1 with the subset data.
This method may return a numpy array, an xarray DataArray, or a podpac UnitsDataArray.
If a numpy array or xarray DataArray is returned, :meth:`podpac.data.DataSource.evaluate` will
cast the data into a `UnitsDataArray` using the requested source coordinates.
If a podpac UnitsDataArray is passed back, the :meth:`podpac.data.DataSource.evaluate`
method will not do any further processing.
The inherited Node method `create_output_array` can be used to generate the template UnitsDataArray
in your DataSource.
See :meth:`podpac.Node.create_output_array` for more details.
Parameters
----------
coordinates : :class:`podpac.Coordinates`
The coordinates that need to be retrieved from the data source using the coordinate system of the data
source
coordinates_index : List
A list of slices or a boolean array that give the indices of the data that needs to be retrieved from
the data source. The values in the coordinates_index will vary depending on the `coordinate_index_type`
defined for the data source.
Returns
--------
np.ndarray, xr.DataArray, :class:`podpac.UnitsDataArray`
A subset of the returned data. If a numpy array or xarray DataArray is returned,
the data will be cast into UnitsDataArray using the returned data to fill values
at the requested source coordinates.
""",
    "get_coordinates": """
Returns a Coordinates object that describes the coordinates of the data source.
In most cases, this method is defined by the data source implementing the DataSource class.
If method is not implemented by the data source, it will try to return ``self.coordinates``
if ``self.coordinates`` is not None.
Otherwise, this method will raise a NotImplementedError.
Returns
--------
:class:`podpac.Coordinates`
The coordinates describing the data source array.
Notes
------
Need to pay attention to:
- the order of the dimensions
- the stacking of the dimension
- the type of coordinates
Coordinates should be non-nan and non-repeating for best compatibility
""",
    "interpolation": """
Interpolation definition for the data source.
By default, the interpolation method is set to `podpac.settings["DEFAULT_INTERPOLATION"]` which defaults to `'nearest'` for all dimensions.
""",
    "interpolation_long": """
{interpolation}
If input is a string, it must match one of the interpolation shortcuts defined in
:attr:`podpac.data.INTERPOLATION_SHORTCUTS`. The interpolation method associated
with this string will be applied to all dimensions at the same time.
If input is a dict or list of dict, the dict or dict elements must adhere to the following format:
The key ``'method'`` defining the interpolation method name.
If the interpolation method is not one of :attr:`podpac.data.INTERPOLATION_SHORTCUTS`, a
second key ``'interpolators'`` must be defined with a list of
:class:`podpac.interpolators.Interpolator` classes to use in order of usage.
The dictionary may contain an optional ``'params'`` key which contains a dict of parameters to pass along to
the :class:`podpac.interpolators.Interpolator` classes associated with the interpolation method.
The dict may contain the key ``'dims'`` which specifies dimension names (i.e. ``'time'`` or ``('lat', 'lon')`` ).
If the dictionary does not contain a key for all unstacked dimensions of the source coordinates, the
:attr:`podpac.data.INTERPOLATION_DEFAULT` value will be used.
All dimension keys must be unstacked even if the underlying coordinate dimensions are stacked.
Any extra dimensions included but not found in the source coordinates will be ignored.
The dict may contain a key ``'params'`` that can be used to configure the :class:`podpac.interpolators.Interpolator` classes associated with the interpolation method.
If input is a :class:`podpac.data.Interpolation` class, this Interpolation
class will be used without modification.
""",
}
# Documentation table used by the DataSource docstrings: start from a copy of the generic
# node documentation (so COMMON_NODE_DOC itself is left untouched) and layer the data-source
# entries on top; on a shared key the DATA_DOC entry wins.
COMMON_DATA_DOC = COMMON_NODE_DOC.copy()
COMMON_DATA_DOC.update(DATA_DOC) # inherit and overwrite with DATA_DOC
[docs]@common_doc(COMMON_DATA_DOC)
class DataSource(Node):
"""Base node for any data obtained directly from a single source.
Parameters
----------
source : Any
The location of the source. Depending on the child node this can be a filepath,
numpy array, or dictionary as a few examples.
coordinates : :class:`podpac.Coordinates`
{coordinates}
nan_vals : List, optional
List of values from source data that should be interpreted as 'no data' or 'nans'
nan_val : Any, optional
Value written into the data wherever one of ``nan_vals`` is found. Default is ``np.nan``.
boundary : dict, optional
Boundary by dimension name. Scalar entries must be valid, non-negative coordinate deltas.
1-d (non-centered) and 2-d (non-uniform) entries are currently rejected with NotImplementedError.
coordinate_index_type : str, optional
Type of index to use for data source. Possible values are ``['slice', 'numpy', 'xarray']``
Default is 'numpy', which allows a tuple of integer indices.
cache_coordinates : bool
Whether to cache coordinates using the podpac ``cache_ctrl``. Default False.
cache_output : bool
Should the node's output be cached? If not provided or None, uses default based on
settings["CACHE_DATASOURCE_OUTPUT_DEFAULT"]. If True, outputs will be cached and retrieved from cache. If False,
outputs will not be cached OR retrieved from cache (even if they exist in cache).
Notes
-----
Custom DataSource Nodes must implement the :meth:`get_data` and :meth:`get_coordinates` methods.
"""
# public traits tagged ``attr=True``
nan_vals = tl.List().tag(attr=True)
nan_val = tl.Any(np.nan).tag(attr=True)
# validated by ``_validate_boundary``
boundary = tl.Dict().tag(attr=True)
coordinate_index_type = tl.Enum(
["slice", "numpy", "xarray"],
default_value="numpy",
).tag(attr=True)
# caching switches (not tagged as attrs); ``cache_output`` gets its default from the settings
cache_coordinates = tl.Bool(False)
cache_output = tl.Bool()
# privates
# backing store for the ``coordinates`` property; read-only, so it is written with ``set_trait``
_coordinates = tl.Instance(Coordinates, allow_none=True, default_value=None, read_only=True)
# debug attributes
# ``_requested_coordinates`` is stored by ``eval``; the others are stored by ``_eval`` when
# settings["DEBUG"] is truthy
_requested_coordinates = tl.Instance(Coordinates, allow_none=True)
_requested_source_coordinates = tl.Instance(Coordinates, allow_none=True)
_requested_source_coordinates_index = tl.Instance(tuple, allow_none=True)
_requested_source_boundary = tl.Instance(dict, allow_none=True)
_requested_source_data = tl.Instance(UnitsDataArray, allow_none=True)
_evaluated_coordinates = tl.Instance(Coordinates, allow_none=True)
@tl.validate("boundary")
def _validate_boundary(self, d):
    """Validate a proposed ``boundary`` value.

    Every key must be a valid dimension name. What happens next depends on the
    dimensionality of the entry:

    * scalar: must convert to a coordinate delta that is not negative
    * 1-d (non-centered): elements are passed through ``make_coord_delta_array``, then rejected
    * 2-d (non-uniform): each row is passed through ``make_coord_delta_array``, then rejected

    Parameters
    ----------
    d : dict
        Validation proposal; the candidate boundary is read from ``d["value"]``.

    Returns
    -------
    dict
        The proposed boundary, unchanged.

    Raises
    ------
    ValueError
        Unknown dimension name, or a scalar entry that is not a valid non-negative coordinate delta.
    NotImplementedError
        The entry is a 1-d or 2-d boundary, which is not supported yet.
    """
    proposed = d["value"]

    for dim, spec in proposed.items():
        if dim not in VALID_DIMENSION_NAMES:
            raise ValueError("Invalid dimension '%s' in boundary" % dim)

        # the three cases are mutually exclusive, so the rank only needs to be computed once
        rank = np.array(spec).ndim

        if rank == 0:
            try:
                delta = make_coord_delta(spec)
            except ValueError:
                raise ValueError(
                    "Invalid boundary for dimension '%s' ('%s' is not a valid coordinate delta)" % (dim, spec)
                )
            if np.array(delta).astype(float) < 0:
                raise ValueError("Invalid boundary for dimension '%s' (%s < 0)" % (dim, delta))
        elif rank == 1:
            # convert first so that a malformed entry is reported before the "not supported" error
            make_coord_delta_array(spec)
            raise NotImplementedError("Non-centered boundary not yet supported for dimension '%s'" % dim)
        elif rank == 2:
            for row in spec:
                make_coord_delta_array(row)
            raise NotImplementedError("Non-uniform boundary not yet supported for dimension '%s'" % dim)

    return proposed
@tl.default("cache_output")
def _cache_output_default(self):
    """Default for ``cache_output``: the ``CACHE_DATASOURCE_OUTPUT_DEFAULT`` podpac setting."""
    configured_default = settings["CACHE_DATASOURCE_OUTPUT_DEFAULT"]
    return configured_default
# ------------------------------------------------------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------------------------------------------------------
@property
def coordinates(self):
    """{coordinates}"""
    # 1. already resolved on this instance
    resolved = self._coordinates
    if resolved is not None:
        return resolved

    # 2. available in the cache (only consulted when coordinate caching is enabled)
    if self.cache_coordinates and self.has_cache("coordinates"):
        from_cache = self.get_cache("coordinates")
        self.set_trait("_coordinates", from_cache)
        return from_cache

    # 3. ask the data source, remember the answer, and cache it if enabled
    from_source = self.get_coordinates()
    self.set_trait("_coordinates", from_source)
    if self.cache_coordinates:
        self.put_cache(from_source, "coordinates")
    return from_source
@property
def dims(self):
    """Dimensions of the data source, as reported by its coordinates."""
    source_coordinates = self.coordinates
    return source_coordinates.dims
@property
def udims(self):
    """Unstacked dimensions (``udims``) of the data source, as reported by its coordinates."""
    source_coordinates = self.coordinates
    return source_coordinates.udims
@property
def _crs(self):
    """Coordinate reference system of the data source, as reported by its coordinates."""
    source_coordinates = self.coordinates
    return source_coordinates.crs
# ------------------------------------------------------------------------------------------------------------------
# Private Methods
# ------------------------------------------------------------------------------------------------------------------
def _get_data(self, rc, rci):
    """Wrapper for `self.get_data` with pre and post processing

    Calls ``get_data``, normalizes the result to a UnitsDataArray, selects the configured
    single output if the data has an ``output`` dimension, and replaces the configured
    ``nan_vals`` with ``nan_val``.

    Parameters
    ----------
    rc : :class:`podpac.Coordinates`
        Requested source coordinates, passed to ``get_data``.
    rci
        Index of the requested source coordinates, passed to ``get_data``.

    Returns
    -------
    podpac.core.units.UnitsDataArray
        A UnitsDataArray returned by ``get_data`` is used as-is; numpy arrays and xarray
        DataArrays are wrapped with ``create_output_array`` using ``rc``.

    Raises
    ------
    TypeError
        Raised if unknown data is passed by from self.get_data
    NotImplementedError
        Raised if get_data is not implemented by data source subclass
    """
    # get data from data source at requested source coordinates and requested source coordinates index
    data = self.get_data(rc, rci)

    # convert data into UnitsDataArray depending on format
    # TODO: what other processing needs to happen here?
    if isinstance(data, UnitsDataArray):
        udata_array = data
    elif isinstance(data, xr.DataArray):
        # only the raw values are used; the DataArray's own coordinates are discarded
        # TODO: check order of coordinates here
        udata_array = self.create_output_array(rc, data=data.data)
    elif isinstance(data, np.ndarray):
        udata_array = self.create_output_array(rc, data=data)
    else:
        raise TypeError(
            "Unknown data type passed back from "
            + "{}.get_data(): {}. ".format(type(self).__name__, type(data))
            + "Must be one of numpy.ndarray, xarray.DataArray, or podpac.UnitsDataArray"
        )

    # extract single output, if necessary
    # subclasses should extract single outputs themselves if possible, but this provides a backup
    if "output" in udata_array.dims and self.output is not None:
        udata_array = udata_array.sel(output=self.output)

    # fill nan_vals in data array
    # With nothing to replace, skip the pass entirely: this avoids a full scan of the data
    # and avoids an in-place write that would have no effect.
    if self.nan_vals:
        udata_array.data[np.isin(udata_array.data, self.nan_vals)] = self.nan_val

    return udata_array
# ------------------------------------------------------------------------------------------------------------------
# Methods
# ------------------------------------------------------------------------------------------------------------------
def get_source_data(self, bounds=None):
    """
    Get source data, without interpolation.

    Arguments
    ---------
    bounds : dict
        Dictionary of bounds by dimension, optional.
        Keys must be dimension names, and values are (min, max) tuples, e.g. ``{'lat': (10, 20)}``.
        When omitted, no bounds are applied (an empty dictionary is used).

    Returns
    -------
    data : UnitsDataArray
        Source data
    """
    # A fresh dict per call: a mutable ``{}`` default would be shared between all calls.
    if bounds is None:
        bounds = {}

    coords, index = self.coordinates.select(bounds, return_index=True)
    return self._get_data(coords, index)
def eval(self, coordinates, **kwargs):
    """
    Wraps the super Node.eval method in order to cache with the correct coordinates.

    The output is independent of the crs or any extra dimensions, so the request is stripped of
    extra dimensions and transformed into the source crs before it reaches the super eval method
    (where caching happens). If a crs transform was applied, the result is rebuilt on coordinates
    transformed back into the requested crs.

    Arguments
    ---------
    coordinates : :class:`podpac.Coordinates`
        Requested coordinates.
    **kwargs
        Passed through unchanged to the super eval method.

    Returns
    -------
    output
        The super eval output, re-created with ``create_output_array`` when the requested crs
        differs from the crs used for the evaluation.

    Raises
    ------
    ValueError
        If a source dimension (or every dimension of a stacked source dimension) is missing
        from the requested coordinates.
    """
    # every source dimension has to be available in the request
    for c in self.coordinates.values():
        if isinstance(c, Coordinates1d):
            if c.name not in coordinates.udims:
                raise ValueError("Cannot evaluate these coordinates, missing dim '%s'" % c.name)
        elif isinstance(c, StackedCoordinates):
            if not any(dim in coordinates.udims for dim in c.udims):
                raise ValueError("Cannot evaluate these coordinates, missing at least one dim in '%s'" % c.name)

    # keep a handle on the request exactly as it came in
    requested_coordinates = coordinates

    # This is needed for the interpolation mixin to avoid floating-point discrepancies
    # between the requested coordinates and the evaluated coordinates
    self._requested_coordinates = requested_coordinates

    # collect the requested dimensions that the source does not have, then drop them
    source_udims = self.udims
    extra = []
    for c in coordinates.values():
        if isinstance(c, Coordinates1d) and c.name not in source_udims:
            extra.append(c.name)
        elif isinstance(c, StackedCoordinates) and not any(dim in source_udims for dim in c.dims):
            extra.append(c.name)
    coordinates = coordinates.drop(extra)

    # evaluate in the native crs of the source
    if coordinates.crs.lower() != self._crs.lower():
        coordinates = coordinates.transform(self._crs)

    # note: super().eval (not self._eval)
    # This call already sub-selects an 'output' if specified
    output = super().eval(coordinates, **kwargs)

    # nothing more to do when the evaluation crs is the requested crs
    if coordinates.crs.lower() == requested_coordinates.crs.lower():
        return output

    # Otherwise transform back to the requested crs.
    # The already-selected output, if it exists, determines the `outputs` argument.
    try:
        outputs = output["output"].data.tolist()
        if isinstance(outputs, str):
            # a single output (not a dim): an empty list makes the create function
            # receive outputs=None, which is what we want in this case
            outputs = []
    except KeyError:
        # 'output' does not exist in the data, so outputs should be empty
        outputs = []
    except Exception:
        # any other failure: fall back to the outputs defined on the node
        outputs = self.outputs

    coords = Coordinates.from_xarray(output, crs=output.attrs.get("crs", None))

    # the coords.transform below can cause floating point discrepancies between
    # the requested coordinates and the output coordinates. This is handled in the
    # InterpolationMixin using self._requested_coordinates
    restored_coords = coords.transform(requested_coordinates.crs)
    output = self.create_output_array(restored_coords, data=output.data, outputs=outputs)
    return output
@common_doc(COMMON_DATA_DOC)
def _eval(self, coordinates, output=None, _selector=None):
"""Evaluates this node using the supplied coordinates.
The source coordinates that are needed for the requested coordinates are selected (using the `_selector`
if one is supplied, otherwise by intersecting with the requested coordinates). The selected source
coordinates and their index are passed to `get_data()`, returning the source data at those
coordinates. The source data becomes the output, or is copied into the supplied `output`. When
settings["DEBUG"] is set, the intermediate values are also stored on the node
(`_requested_source_coordinates`, `_requested_source_coordinates_index`, `_requested_source_data`, ...).
Parameters
----------
coordinates : :class:`podpac.Coordinates`
{requested_coordinates}
An exception is raised if the requested coordinates are missing dimensions in the DataSource.
Extra dimensions in the requested coordinates are dropped.
output : :class:`podpac.UnitsDataArray`, optional
{eval_output}
_selector: callable(coordinates, request_coordinates)
{eval_selector}
Returns
-------
{eval_return}
Raises
------
ValueError
Cannot evaluate these coordinates
"""
# NOTE(review): the missing-dimension check and the dropping of extra dimensions described above
# are performed in `eval`, not in this method.
log.debug("Evaluating {} data source".format(self.__class__.__name__))
# Use the selector
if _selector is not None:
(rsc, rsci) = _selector(self.coordinates, coordinates, index_type=self.coordinate_index_type)
else:
# get source coordinates that are within the requested coordinates bounds
(rsc, rsci) = self.coordinates.intersect(coordinates, outer=True, return_index=True)
# if requested coordinates and coordinates do not intersect, shortcut with nan UnitsDataArray
if rsc.size == 0:
# NOTE(review): indentation is lost in this copy of the file. The `else` below presumably pairs
# with `if output is None` (a supplied output is filled with nan, a newly created one only has
# its single output selected) -- confirm against the original source.
if output is None:
output = self.create_output_array(rsc)
if "output" in output.dims and self.output is not None:
output = output.sel(output=self.output)
else:
output[:] = np.nan
# no source data was read, so the boundary and data debug attributes are reset to None
if settings["DEBUG"]:
self._evaluated_coordinates = coordinates
self._requested_source_coordinates = rsc
self._requested_source_coordinates_index = rsci
self._requested_source_boundary = None
self._requested_source_data = None
self._output = output
return output
# get data from data source
rsd = self._get_data(rsc, rsci)
if output is None:
# if requested_coordinates.crs.lower() != coordinates.crs.lower():
# if rsc.shape == rsd.shape:
# rsd = self.create_output_array(rsc, data=rsd.data)
# else:
# crds = Coordinates.from_xarray(rsd, crs=data.attrs.get("crs", None))
# rsd = self.create_output_array(crds.transform(rsc.crs), data=rsd.data)
output = rsd
else:
# copy the values into the caller's array in place
output.data[:] = rsd.data
# get indexed boundary
rsb = self._get_boundary(rsci)
# attach the indexed boundary and the bounds of the full source coordinates to the output
output.attrs["boundary_data"] = rsb
output.attrs["bounds"] = self.coordinates.bounds
# save output to private for debugging
if settings["DEBUG"]:
self._evaluated_coordinates = coordinates
self._requested_source_coordinates = rsc
self._requested_source_coordinates_index = rsci
self._requested_source_boundary = rsb
self._requested_source_data = rsd
self._output = output
return output
def find_coordinates(self):
    """
    List the coordinates available from this Node.

    A DataSource has exactly one set of coordinates, so the list always has a single entry.

    Returns
    -------
    coords_list : list
        singleton list containing the coordinates (Coordinates object)
    """
    source_coordinates = self.coordinates
    return [source_coordinates]
def get_bounds(self, crs="default"):
    """Return the full coordinate bounds available from the Node, in the given crs.

    Arguments
    ---------
    crs : str
        Desired CRS for the bounds. Optional.
        ``'default'`` (the default value) resolves to podpac.settings["DEFAULT_CRS"], and ``'source'``
        resolves to the native crs of the source coordinates. Any other value is used as given.

    Returns
    -------
    bounds : dict
        Bounds for each dimension. Keys are dimension names and values are tuples (min, max).
    crs : str
        The crs for the bounds.
    """
    # resolve the two keywords; anything else is already a crs
    if crs == "default":
        target_crs = settings["DEFAULT_CRS"]
    elif crs == "source":
        target_crs = self.coordinates.crs
    else:
        target_crs = crs

    transformed = self.coordinates.transform(target_crs)
    return transformed.bounds, target_crs
@common_doc(COMMON_DATA_DOC)
def get_data(self, coordinates, coordinates_index):
    """{get_data}
    Raises
    ------
    NotImplementedError
        This needs to be implemented by derived classes
    """
    # Name the concrete class so the error points at the subclass that is missing the hook.
    raise NotImplementedError(
        "{}.get_data() must be implemented by the data source subclass".format(type(self).__name__)
    )
@common_doc(COMMON_DATA_DOC)
def get_coordinates(self):
    """{get_coordinates}
    Raises
    ------
    NotImplementedError
        This needs to be implemented by derived classes
    """
    # NOTE(review): the shared `get_coordinates` documentation text mentions falling back to
    # `self.coordinates`; this base implementation has no such fallback and always raises.
    # Name the concrete class so the error points at the subclass that is missing the hook.
    raise NotImplementedError(
        "{}.get_coordinates() must be implemented by the data source subclass".format(type(self).__name__)
    )
def set_coordinates(self, coordinates, force=False):
    """Set the coordinates. Used by Compositors as an optimization.

    Arguments
    ---------
    coordinates : :class:`podpac.Coordinates`
        Coordinates to set. Usually these are coordinates that are shared across compositor sources.
    force : bool
        If True, the coordinates are always set. If False (default), they are only set when
        the ``_coordinates`` trait is not defined yet.

    NOTE: This is only currently used by SMAPCompositor. It should potentially be moved to the SMAPSource.
    """
    # leave existing coordinates alone unless the caller insists
    if not force and self.trait_is_defined("_coordinates"):
        return

    self.set_trait("_coordinates", coordinates)
def _get_boundary(self, index):
    """
    Select the boundary for the given coordinates index. Only non-uniform boundary arrays need to be indexed.

    Arguments
    ---------
    index : tuple
        Coordinates index (e.g. coordinates_index), with one entry per source coordinate.
        If None, the full boundary is returned.

    Returns
    -------
    boundary : dict
        Indexed boundary. Uniform boundaries are unchanged and non-uniform boundary arrays are indexed.
        Dimensions without a boundary entry are omitted.
    """
    if index is None:
        return self.boundary

    boundary = {}
    for c, I in zip(self.coordinates.values(), index):
        # the index of a coordinate applies to each of its dimensions
        for dim in c.dims:
            if dim not in self.boundary:
                continue

            values = np.array(self.boundary[dim])
            if values.ndim == 2:
                # Convert to an array BEFORE indexing: a nested list cannot be indexed with a
                # numpy integer or boolean index, while an array accepts slices and arrays alike.
                boundary[dim] = values[I]
            else:
                boundary[dim] = self.boundary[dim]

    return boundary