Source code for podpac.core.coordinates.uniform_coordinates1d
from __future__ import division, unicode_literals, print_function, absolute_import
import copy
from collections import OrderedDict
import numpy as np
import traitlets as tl
from collections import OrderedDict
from podpac.core.coordinates.utils import (
make_coord_value,
make_coord_delta,
add_coord,
divide_delta,
timedelta_divisible,
lower_precision_time_bounds,
)
from podpac.core.coordinates.coordinates1d import Coordinates1d
from podpac.core.coordinates.array_coordinates1d import ArrayCoordinates1d
from podpac.core.utils import cached_property
[docs]class UniformCoordinates1d(Coordinates1d):
"""
1-dimensional array of uniformly-spaced coordinates defined by a start, stop, and step.
UniformCoordinates1d efficiently stores a uniformly-spaced coordinate array; the full coordinates array is only
calculated when needed. For numerical coordinates, the start, stop, and step are converted to ``float``. For time
coordinates, the start and stop are converted to numpy ``datetime64``, and the step is converted to numpy
``timedelta64``. For convenience, podpac automatically converts datetime strings such as ``'2018-01-01'`` to
``datetime64`` and timedelta strings such as ``'1,D'`` to ``timedelta64``.
UniformCoordinates1d can also be created by specifying the size instead of the step.
Parameters
----------
start : float or datetime64
Start coordinate.
stop : float or datetime64
Stop coordinate. Unless fix_stop_val == True at creation, this may not always be
exactly equal to what the user specified. Internally we ensure that stop = start + step * (size - 1)
step : float or timedelta64
Signed, non-zero step between coordinates. Note, the specified step my be changed internally to satisfy floating point consistency.
That is, the consistent step will ensure that step = (stop - start) / (size - 1)
name : str
Dimension name, one of 'lat', 'lon', 'time', 'alt'.
coordinates : array, read-only
Full array of coordinate values.
See Also
--------
:class:`Coordinates1d`, :class:`ArrayCoordinates1d`, :class:`crange`, :class:`clinspace`
"""
start = tl.Union([tl.Float(), tl.Instance(np.datetime64)], read_only=True)
start.__doc__ = ":float, datetime64: Start coordinate."
stop = tl.Union([tl.Float(), tl.Instance(np.datetime64)], read_only=True)
stop.__doc__ = ":float, datetime64: Stop coordinate."
step = tl.Union([tl.Float(), tl.Instance(np.timedelta64)], read_only=True)
step.__doc__ = ":float, timedelta64: Signed, non-zero step between coordinates."
[docs] def __init__(self, start, stop, step=None, size=None, name=None, fix_stop_val=False):
"""
Create uniformly-spaced 1d coordinates from a `start`, `stop`, and `step` or `size`.
Parameters
----------
start : float or datetime64
Start coordinate.
stop : float or datetime64
Stop coordinate.
step : float or timedelta64
Signed, nonzero step between coordinates (either step or size required).
size : int
Number of coordinates (either step or size required).
name : str, optional
Dimension name, one of 'lat', 'lon', 'time', or 'alt'.
fix_stop_val : bool, optional
Default is False. If True, the constructor will modify the step to be consistent
instead of the stop value. Otherwise, the stop value *may* be modified to ensure that
stop = start + step * size
Notes
------
When the user specifies fix_stop_val, then `stop` will always be exact as specified by the user.
For floating point coordinates, the specified `step` my be changed internally to satisfy floating point consistency.
That is, for consistency `step = (stop - start) / (size - 1)`
"""
if step is not None and size is not None:
raise TypeError("only one of 'step' and 'size' is allowed")
elif step is None and size is None:
raise TypeError("'step' or 'size' is required")
# validate and set start, stop, and step
start = make_coord_value(start)
stop = make_coord_value(stop)
if step is not None:
step = make_coord_delta(step)
elif isinstance(size, (int, np.compat.long, np.integer)) and not isinstance(size, np.timedelta64):
step = divide_delta(stop - start, size - 1)
else:
raise TypeError("size must be an integer, not '%s'" % type(size))
if isinstance(start, float) and isinstance(stop, float) and isinstance(step, float):
fstep = step
elif isinstance(start, np.datetime64) and isinstance(stop, np.datetime64) and isinstance(step, np.timedelta64):
fstep = step.astype(float)
else:
raise TypeError(
"UniformCoordinates1d mismatching types (start '%s', stop '%s', step '%s')."
% (type(start), type(stop), type(step))
)
if fstep == 0:
raise ValueError("Uniformcoordinates1d step cannot be zero")
if fstep <= 0 and start < stop:
raise ValueError("UniformCoordinates1d step must be greater than zero if start < stop.")
if fstep >= 0 and start > stop:
raise ValueError("UniformCoordinates1d step must be less than zero if start > stop.")
self.set_trait("start", start)
self.set_trait("stop", stop)
self.set_trait("step", step)
if not fix_stop_val: # Need to make sure that 'stop' is consistent with self.coordinates[-1]
self.set_trait("stop", add_coord(self.start, (self.size - 1) * self.step))
# Make sure step is floating-point error consistent in all cases
# This is only needed when the type is float
if fstep == step and self.size > 1:
step = divide_delta(self.stop - self.start, self.size - 1)
self.set_trait("step", step)
# set common properties
super(UniformCoordinates1d, self).__init__(name=name)
def __eq__(self, other):
if not self._eq_base(other):
return False
if isinstance(other, UniformCoordinates1d):
if self.dtype == float:
if not np.allclose([self.start, self.stop, self.step], [other.start, other.stop, other.step]):
return False
elif self.start != other.start or self.stop != other.stop or self.step != other.step:
return False
if isinstance(other, ArrayCoordinates1d):
if self.dtype == float:
if not np.allclose(self.coordinates, other.coordinates):
return False
else:
if not np.array_equal(self.coordinates, other.coordinates):
return False
return True
# ------------------------------------------------------------------------------------------------------------------
# Alternate Constructors
# ------------------------------------------------------------------------------------------------------------------
[docs] @classmethod
def from_tuple(cls, items, **kwargs):
if not isinstance(items, tuple) or len(items) != 3:
raise ValueError(
"UniformCoordinates1d.from_tuple expects a tuple of (start, stop, step/size), got %s" % (items,)
)
elif isinstance(items[2], int):
return cls(items[0], items[1], size=items[2], **kwargs)
else:
step = make_coord_delta(items[2])
return cls(items[0], items[1], step, **kwargs)
[docs] @classmethod
def from_definition(cls, d):
"""
Create uniformly-spaced 1d Coordinates from a coordinates definition.
The definition must contain the coordinate start, stop, and step or size::
c = UniformCoordinates1d.from_definition({
"start": 1,
"stop": 10,
"step": 0.5
})
c = UniformCoordinates1d.from_definition({
"start": 1,
"stop": 10,
"size": 21
})
The definition may also contain any of the 1d Coordinates properties::
c = UniformCoordinates1d.from_definition({
"start": 1,
"stop": 10,
"step": 0.5,
"name": "lat"
})
Arguments
---------
d : dict
uniform 1d coordinates definition
Returns
-------
:class:`UniformCoordinates1d`
uniformly-spaced 1d Coordinates
See Also
--------
definition
"""
if "start" not in d:
raise ValueError('UniformCoordinates1d definition requires "start" property')
if "stop" not in d:
raise ValueError('UniformCoordinates1d definition requires "stop" property')
start = d["start"]
stop = d["stop"]
kwargs = {k: v for k, v in d.items() if k not in ["start", "stop"]}
return cls(start, stop, **kwargs)
# -----------------------------------------------------------------------------------------------------------------
# Standard methods, array-like
# -----------------------------------------------------------------------------------------------------------------
def __getitem__(self, index):
# fallback for non-slices
if not isinstance(index, slice):
# The following 3 lines is copied from ArrayCoordinates1d.__getitem__
if self.ndim == 1 and np.ndim(index) > 1 and np.array(index).dtype == int:
index = np.array(index).flatten().tolist()
return ArrayCoordinates1d(self.coordinates[index], **self.properties)
# start, stop, step
if index.start is None:
start = self.start
elif index.start >= 0:
start = add_coord(self.start, self.step * min(index.start, self.size - 1))
else:
start = add_coord(self.start, self.step * max(0, self.size + index.start))
if index.stop is None:
stop = self.stop
elif index.stop >= 0:
stop = add_coord(self.start, self.step * (min(index.stop, self.size) - 1))
else:
stop = add_coord(self.start, self.step * max(0, self.size + index.stop - 1))
if index.step is None:
step = self.step
else:
step = index.step * self.step
if index.step < 0:
start, stop = stop, start
# empty slice
if ((start > stop) and np.array(step).astype(float) > 0) or (
(start < stop) and np.array(step).astype(float) < 0
):
return ArrayCoordinates1d([], **self.properties)
return UniformCoordinates1d(start, stop, step, **self.properties)
def __contains__(self, item):
# overrides the Coordinates1d.__contains__ method with optimizations for uniform coordinates.
try:
item = make_coord_value(item)
except:
return False
if type(item) != self.dtype:
return False
if item < self.bounds[0] or item > self.bounds[1]:
return False
if self.dtype == np.datetime64:
return timedelta_divisible(item - self.start, self.step)
else:
return (item - self.start) % self.step == 0
# ------------------------------------------------------------------------------------------------------------------
# Properties
# ------------------------------------------------------------------------------------------------------------------
@cached_property
def coordinates(self):
""":array, read-only: Coordinate values."""
coordinates = add_coord(self.start, np.arange(0, self.size) * self.step)
# coordinates.setflags(write=False) # This breaks the 002-open-point-file example
return coordinates
@property
def ndim(self):
return 1
@property
def shape(self):
return (self.size,)
@property
def size(self):
"""Number of coordinates."""
dname = np.array(self.step).dtype.name
if dname == "timedelta64[Y]":
dyear = self.stop.item().year - self.start.item().year
if dyear > 0 and self.stop.item().month < self.start.item().month:
dyear -= 1
range_ = dyear
step = self.step.item()
elif dname == "timedelta64[M]":
dyear = self.stop.item().year - self.start.item().year
dmonth = self.stop.item().month - self.start.item().month
range_ = 12 * dyear + dmonth
step = self.step.item()
else:
range_ = self.stop - self.start
step = self.step
return max(0, int(np.floor(range_ / step + 1e-10) + 1))
@property
def dtype(self):
""":type: Coordinates dtype.
``float`` for numerical coordinates and numpy ``datetime64`` for datetime coordinates.
"""
return type(self.start)
@property
def is_monotonic(self):
return True
@property
def is_descending(self):
if self.start == self.stop:
return None
return self.stop < self.start
@property
def is_uniform(self):
return True
@property
def bounds(self):
"""Low and high coordinate bounds."""
lo = self.start
hi = add_coord(self.start, self.step * (self.size - 1))
if self.is_descending:
lo, hi = hi, lo
return lo, hi
@property
def argbounds(self):
if self.is_descending:
return -1, 0
else:
return 0, -1
def _get_definition(self, full=True):
d = OrderedDict()
d["start"] = self.start
d["stop"] = self.stop
d["step"] = self.step
d.update(self._full_properties if full else self.properties)
return d
# ------------------------------------------------------------------------------------------------------------------
# Methods
# ------------------------------------------------------------------------------------------------------------------
[docs] def copy(self):
"""
Make a deep copy of the uniform 1d Coordinates.
Returns
-------
:class:`UniformCoordinates1d`
Copy of the coordinates.
"""
kwargs = self.properties
return UniformCoordinates1d(self.start, self.stop, self.step, **kwargs)
[docs] def unique(self, return_index=False):
"""
Return the coordinates (uniform coordinates are already unique).
Arguments
---------
return_index : bool, optional
If True, return index for the unique coordinates in addition to the coordinates. Default False.
Returns
-------
unique : :class:`ArrayCoordinates1d`
New ArrayCoordinates1d object with unique, sorted coordinate values.
unique_index : list of indices
index
"""
if return_index:
return self.copy(), np.arange(self.size).tolist()
else:
return self.copy()
[docs] def simplify(self):
"""Get the simplified/optimized representation of these coordinates.
Returns
-------
simplified : UniformCoordinates1d
These coordinates (the coordinates are already simplified).
"""
return self.copy()
[docs] def flatten(self):
"""
Return a copy of the uniform coordinates, for consistency.
Returns
-------
:class:`UniformCoordinates1d`
Flattened coordinates.
"""
return self.copy()
[docs] def reshape(self, newshape):
return ArrayCoordinates1d(self.coordinates, **self.properties).reshape(newshape)
[docs] def issubset(self, other):
"""Report whether other coordinates contains these coordinates.
Arguments
---------
other : Coordinates, Coordinates1d
Other coordinates to check
Returns
-------
issubset : bool
True if these coordinates are a subset of the other coordinates.
Notes
-----
This overrides the Coordinates1d.issubset method with optimizations for uniform coordinates.
"""
from podpac.core.coordinates import Coordinates
if isinstance(other, Coordinates):
if self.name not in other.dims:
return False
other = other[self.name]
# use Coordinates1d implementation when the other coordinates are not uniform
if not other.is_uniform:
return super(UniformCoordinates1d, self).issubset(other)
# use Coordinates1d implementation when the steps cannot be compared (e.g. months and days)
try:
self.step / other.step
except TypeError:
return super(UniformCoordinates1d, self).issubset(other)
# short-cuts that don't require checking coordinates
if self.dtype != other.dtype:
return False
if self.bounds[0] < other.bounds[0] or self.bounds[1] > other.bounds[1]:
return False
# check start and step
if self.start not in other:
return False
if self.size == 1:
return True
if self.dtype == np.datetime64:
return timedelta_divisible(self.step, other.step)
else:
return self.step % other.step == 0
def _select(self, bounds, return_index, outer):
# TODO is there an easier way to do this with the new outer flag?
my_bounds = self.bounds
# If the bounds are of instance datetime64, then the comparison should happen at the lowest precision
if self.dtype == np.datetime64:
my_bounds, bounds = lower_precision_time_bounds(my_bounds, bounds, outer)
lo = max(bounds[0], my_bounds[0])
hi = min(bounds[1], my_bounds[1])
fmin = (lo - my_bounds[0]) / np.abs(self.step)
fmax = (hi - my_bounds[0]) / np.abs(self.step)
imin = int(np.ceil(fmin))
imax = int(np.floor(fmax))
if outer:
if imin != fmin:
imin -= 1
if imax != fmax:
imax += 1
imax = np.clip(imax + 1, 0, self.size)
imin = np.clip(imin, 0, self.size)
# empty case
if imin >= imax:
return self._select_empty(return_index)
if self.is_descending:
imax, imin = self.size - imin, self.size - imax
I = slice(imin, imax)
if return_index:
return self[I], I
else:
return self[I]