Source code for podpac.core.coordinates.uniform_coordinates1d

from __future__ import division, unicode_literals, print_function, absolute_import

import copy
from collections import OrderedDict

import numpy as np
import traitlets as tl
from collections import OrderedDict

from podpac.core.coordinates.utils import (
    make_coord_value,
    make_coord_delta,
    add_coord,
    divide_delta,
    timedelta_divisible,
    lower_precision_time_bounds,
)
from podpac.core.coordinates.coordinates1d import Coordinates1d
from podpac.core.coordinates.array_coordinates1d import ArrayCoordinates1d
from podpac.core.utils import cached_property


[docs]class UniformCoordinates1d(Coordinates1d): """ 1-dimensional array of uniformly-spaced coordinates defined by a start, stop, and step. UniformCoordinates1d efficiently stores a uniformly-spaced coordinate array; the full coordinates array is only calculated when needed. For numerical coordinates, the start, stop, and step are converted to ``float``. For time coordinates, the start and stop are converted to numpy ``datetime64``, and the step is converted to numpy ``timedelta64``. For convenience, podpac automatically converts datetime strings such as ``'2018-01-01'`` to ``datetime64`` and timedelta strings such as ``'1,D'`` to ``timedelta64``. UniformCoordinates1d can also be created by specifying the size instead of the step. Parameters ---------- start : float or datetime64 Start coordinate. stop : float or datetime64 Stop coordinate. Unless fix_stop_val == True at creation, this may not always be exactly equal to what the user specified. Internally we ensure that stop = start + step * (size - 1) step : float or timedelta64 Signed, non-zero step between coordinates. Note, the specified step my be changed internally to satisfy floating point consistency. That is, the consistent step will ensure that step = (stop - start) / (size - 1) name : str Dimension name, one of 'lat', 'lon', 'time', 'alt'. coordinates : array, read-only Full array of coordinate values. See Also -------- :class:`Coordinates1d`, :class:`ArrayCoordinates1d`, :class:`crange`, :class:`clinspace` """ start = tl.Union([tl.Float(), tl.Instance(np.datetime64)], read_only=True) start.__doc__ = ":float, datetime64: Start coordinate." stop = tl.Union([tl.Float(), tl.Instance(np.datetime64)], read_only=True) stop.__doc__ = ":float, datetime64: Stop coordinate." step = tl.Union([tl.Float(), tl.Instance(np.timedelta64)], read_only=True) step.__doc__ = ":float, timedelta64: Signed, non-zero step between coordinates."
[docs] def __init__(self, start, stop, step=None, size=None, name=None, fix_stop_val=False): """ Create uniformly-spaced 1d coordinates from a `start`, `stop`, and `step` or `size`. Parameters ---------- start : float or datetime64 Start coordinate. stop : float or datetime64 Stop coordinate. step : float or timedelta64 Signed, nonzero step between coordinates (either step or size required). size : int Number of coordinates (either step or size required). name : str, optional Dimension name, one of 'lat', 'lon', 'time', or 'alt'. fix_stop_val : bool, optional Default is False. If True, the constructor will modify the step to be consistent instead of the stop value. Otherwise, the stop value *may* be modified to ensure that stop = start + step * size Notes ------ When the user specifies fix_stop_val, then `stop` will always be exact as specified by the user. For floating point coordinates, the specified `step` my be changed internally to satisfy floating point consistency. That is, for consistency `step = (stop - start) / (size - 1)` """ if step is not None and size is not None: raise TypeError("only one of 'step' and 'size' is allowed") elif step is None and size is None: raise TypeError("'step' or 'size' is required") # validate and set start, stop, and step start = make_coord_value(start) stop = make_coord_value(stop) if step is not None: step = make_coord_delta(step) elif isinstance(size, (int, np.compat.long, np.integer)) and not isinstance(size, np.timedelta64): step = divide_delta(stop - start, size - 1) else: raise TypeError("size must be an integer, not '%s'" % type(size)) if isinstance(start, float) and isinstance(stop, float) and isinstance(step, float): fstep = step elif isinstance(start, np.datetime64) and isinstance(stop, np.datetime64) and isinstance(step, np.timedelta64): fstep = step.astype(float) else: raise TypeError( "UniformCoordinates1d mismatching types (start '%s', stop '%s', step '%s')." % (type(start), type(stop), type(step)) ) if fstep == 0: raise ValueError("Uniformcoordinates1d step cannot be zero") if fstep <= 0 and start < stop: raise ValueError("UniformCoordinates1d step must be greater than zero if start < stop.") if fstep >= 0 and start > stop: raise ValueError("UniformCoordinates1d step must be less than zero if start > stop.") self.set_trait("start", start) self.set_trait("stop", stop) self.set_trait("step", step) if not fix_stop_val: # Need to make sure that 'stop' is consistent with self.coordinates[-1] self.set_trait("stop", add_coord(self.start, (self.size - 1) * self.step)) # Make sure step is floating-point error consistent in all cases # This is only needed when the type is float if fstep == step and self.size > 1: step = divide_delta(self.stop - self.start, self.size - 1) self.set_trait("step", step) # set common properties super(UniformCoordinates1d, self).__init__(name=name)
def __eq__(self, other): if not self._eq_base(other): return False if isinstance(other, UniformCoordinates1d): if self.dtype == float: if not np.allclose([self.start, self.stop, self.step], [other.start, other.stop, other.step]): return False elif self.start != other.start or self.stop != other.stop or self.step != other.step: return False if isinstance(other, ArrayCoordinates1d): if self.dtype == float: if not np.allclose(self.coordinates, other.coordinates): return False else: if not np.array_equal(self.coordinates, other.coordinates): return False return True # ------------------------------------------------------------------------------------------------------------------ # Alternate Constructors # ------------------------------------------------------------------------------------------------------------------
[docs] @classmethod def from_tuple(cls, items, **kwargs): if not isinstance(items, tuple) or len(items) != 3: raise ValueError( "UniformCoordinates1d.from_tuple expects a tuple of (start, stop, step/size), got %s" % (items,) ) elif isinstance(items[2], int): return cls(items[0], items[1], size=items[2], **kwargs) else: step = make_coord_delta(items[2]) return cls(items[0], items[1], step, **kwargs)
[docs] @classmethod def from_definition(cls, d): """ Create uniformly-spaced 1d Coordinates from a coordinates definition. The definition must contain the coordinate start, stop, and step or size:: c = UniformCoordinates1d.from_definition({ "start": 1, "stop": 10, "step": 0.5 }) c = UniformCoordinates1d.from_definition({ "start": 1, "stop": 10, "size": 21 }) The definition may also contain any of the 1d Coordinates properties:: c = UniformCoordinates1d.from_definition({ "start": 1, "stop": 10, "step": 0.5, "name": "lat" }) Arguments --------- d : dict uniform 1d coordinates definition Returns ------- :class:`UniformCoordinates1d` uniformly-spaced 1d Coordinates See Also -------- definition """ if "start" not in d: raise ValueError('UniformCoordinates1d definition requires "start" property') if "stop" not in d: raise ValueError('UniformCoordinates1d definition requires "stop" property') start = d["start"] stop = d["stop"] kwargs = {k: v for k, v in d.items() if k not in ["start", "stop"]} return cls(start, stop, **kwargs)
# ----------------------------------------------------------------------------------------------------------------- # Standard methods, array-like # ----------------------------------------------------------------------------------------------------------------- def __getitem__(self, index): # fallback for non-slices if not isinstance(index, slice): # The following 3 lines is copied from ArrayCoordinates1d.__getitem__ if self.ndim == 1 and np.ndim(index) > 1 and np.array(index).dtype == int: index = np.array(index).flatten().tolist() return ArrayCoordinates1d(self.coordinates[index], **self.properties) # start, stop, step if index.start is None: start = self.start elif index.start >= 0: start = add_coord(self.start, self.step * min(index.start, self.size - 1)) else: start = add_coord(self.start, self.step * max(0, self.size + index.start)) if index.stop is None: stop = self.stop elif index.stop >= 0: stop = add_coord(self.start, self.step * (min(index.stop, self.size) - 1)) else: stop = add_coord(self.start, self.step * max(0, self.size + index.stop - 1)) if index.step is None: step = self.step else: step = index.step * self.step if index.step < 0: start, stop = stop, start # empty slice if ((start > stop) and np.array(step).astype(float) > 0) or ( (start < stop) and np.array(step).astype(float) < 0 ): return ArrayCoordinates1d([], **self.properties) return UniformCoordinates1d(start, stop, step, **self.properties) def __contains__(self, item): # overrides the Coordinates1d.__contains__ method with optimizations for uniform coordinates. try: item = make_coord_value(item) except: return False if type(item) != self.dtype: return False if item < self.bounds[0] or item > self.bounds[1]: return False if self.dtype == np.datetime64: return timedelta_divisible(item - self.start, self.step) else: return (item - self.start) % self.step == 0 # ------------------------------------------------------------------------------------------------------------------ # Properties # ------------------------------------------------------------------------------------------------------------------ @cached_property def coordinates(self): """:array, read-only: Coordinate values.""" coordinates = add_coord(self.start, np.arange(0, self.size) * self.step) # coordinates.setflags(write=False) # This breaks the 002-open-point-file example return coordinates @property def ndim(self): return 1 @property def shape(self): return (self.size,) @property def size(self): """Number of coordinates.""" dname = np.array(self.step).dtype.name if dname == "timedelta64[Y]": dyear = self.stop.item().year - self.start.item().year if dyear > 0 and self.stop.item().month < self.start.item().month: dyear -= 1 range_ = dyear step = self.step.item() elif dname == "timedelta64[M]": dyear = self.stop.item().year - self.start.item().year dmonth = self.stop.item().month - self.start.item().month range_ = 12 * dyear + dmonth step = self.step.item() else: range_ = self.stop - self.start step = self.step return max(0, int(np.floor(range_ / step + 1e-10) + 1)) @property def dtype(self): """:type: Coordinates dtype. ``float`` for numerical coordinates and numpy ``datetime64`` for datetime coordinates. """ return type(self.start) @property def is_monotonic(self): return True @property def is_descending(self): if self.start == self.stop: return None return self.stop < self.start @property def is_uniform(self): return True @property def bounds(self): """Low and high coordinate bounds.""" lo = self.start hi = add_coord(self.start, self.step * (self.size - 1)) if self.is_descending: lo, hi = hi, lo return lo, hi @property def argbounds(self): if self.is_descending: return -1, 0 else: return 0, -1 def _get_definition(self, full=True): d = OrderedDict() d["start"] = self.start d["stop"] = self.stop d["step"] = self.step d.update(self._full_properties if full else self.properties) return d # ------------------------------------------------------------------------------------------------------------------ # Methods # ------------------------------------------------------------------------------------------------------------------
[docs] def copy(self): """ Make a deep copy of the uniform 1d Coordinates. Returns ------- :class:`UniformCoordinates1d` Copy of the coordinates. """ kwargs = self.properties return UniformCoordinates1d(self.start, self.stop, self.step, **kwargs)
[docs] def unique(self, return_index=False): """ Return the coordinates (uniform coordinates are already unique). Arguments --------- return_index : bool, optional If True, return index for the unique coordinates in addition to the coordinates. Default False. Returns ------- unique : :class:`ArrayCoordinates1d` New ArrayCoordinates1d object with unique, sorted coordinate values. unique_index : list of indices index """ if return_index: return self.copy(), np.arange(self.size).tolist() else: return self.copy()
[docs] def simplify(self): """Get the simplified/optimized representation of these coordinates. Returns ------- simplified : UniformCoordinates1d These coordinates (the coordinates are already simplified). """ return self.copy()
[docs] def flatten(self): """ Return a copy of the uniform coordinates, for consistency. Returns ------- :class:`UniformCoordinates1d` Flattened coordinates. """ return self.copy()
[docs] def reshape(self, newshape): return ArrayCoordinates1d(self.coordinates, **self.properties).reshape(newshape)
[docs] def issubset(self, other): """Report whether other coordinates contains these coordinates. Arguments --------- other : Coordinates, Coordinates1d Other coordinates to check Returns ------- issubset : bool True if these coordinates are a subset of the other coordinates. Notes ----- This overrides the Coordinates1d.issubset method with optimizations for uniform coordinates. """ from podpac.core.coordinates import Coordinates if isinstance(other, Coordinates): if self.name not in other.dims: return False other = other[self.name] # use Coordinates1d implementation when the other coordinates are not uniform if not other.is_uniform: return super(UniformCoordinates1d, self).issubset(other) # use Coordinates1d implementation when the steps cannot be compared (e.g. months and days) try: self.step / other.step except TypeError: return super(UniformCoordinates1d, self).issubset(other) # short-cuts that don't require checking coordinates if self.dtype != other.dtype: return False if self.bounds[0] < other.bounds[0] or self.bounds[1] > other.bounds[1]: return False # check start and step if self.start not in other: return False if self.size == 1: return True if self.dtype == np.datetime64: return timedelta_divisible(self.step, other.step) else: return self.step % other.step == 0
def _select(self, bounds, return_index, outer): # TODO is there an easier way to do this with the new outer flag? my_bounds = self.bounds # If the bounds are of instance datetime64, then the comparison should happen at the lowest precision if self.dtype == np.datetime64: my_bounds, bounds = lower_precision_time_bounds(my_bounds, bounds, outer) lo = max(bounds[0], my_bounds[0]) hi = min(bounds[1], my_bounds[1]) fmin = (lo - my_bounds[0]) / np.abs(self.step) fmax = (hi - my_bounds[0]) / np.abs(self.step) imin = int(np.ceil(fmin)) imax = int(np.floor(fmax)) if outer: if imin != fmin: imin -= 1 if imax != fmax: imax += 1 imax = np.clip(imax + 1, 0, self.size) imin = np.clip(imin, 0, self.size) # empty case if imin >= imax: return self._select_empty(return_index) if self.is_descending: imax, imin = self.size - imin, self.size - imax I = slice(imin, imax) if return_index: return self[I], I else: return self[I]