# Source code for podpac.core.data.types

"""
Type Summary

Attributes
----------
WCS_DEFAULT_CRS : str
    Default coordinate reference system used for WCS requests ('EPSG:4326').
WCS_DEFAULT_VERSION : str
    Default WCS protocol version used for WCS requests ('1.0.0').
"""

from __future__ import division, unicode_literals, print_function, absolute_import

import warnings
import os
import re
from io import BytesIO
from collections import OrderedDict, defaultdict
from six import string_types
import logging

import numpy as np
import traitlets as tl
import pandas as pd  # Core dependency of xarray
import xarray as xr

# Helper utility for optional imports
from lazy_import import lazy_module, lazy_class

# Internal dependencies
from podpac.core import authentication
from podpac.core.node import Node
from podpac.core.settings import settings
from podpac.core.utils import common_doc, trait_is_defined, ArrayTrait, NodeTrait
from podpac.core.data.datasource import COMMON_DATA_DOC, DataSource
from podpac.core.coordinates import Coordinates, UniformCoordinates1d, ArrayCoordinates1d, StackedCoordinates
from podpac.core.coordinates.utils import Dimension
from podpac.core.algorithm.algorithm import Algorithm
from podpac.core.data.interpolation import interpolation_trait

# Optional dependencies
bs4 = lazy_module("bs4")
# Not used directly, but bs4 uses it internally, so check that it is available
lxml = lazy_module("lxml")
pydap = lazy_module("pydap")
lazy_module("pydap.client")
lazy_module("pydap.model")
rasterio = lazy_module("rasterio")
h5py = lazy_module("h5py")
boto3 = lazy_module("boto3")
requests = lazy_module("requests")
zarr = lazy_module("zarr")
zarrGroup = lazy_class("zarr.Group")
s3fs = lazy_module("s3fs")
# esri
RasterToNumPyArray = lazy_module("arcpy.RasterToNumPyArray")
urllib3 = lazy_module("urllib3")
certifi = lazy_module("certifi")

# Set up logging
_logger = logging.getLogger(__name__)


class Array(DataSource):
    """Create a DataSource from an array.

    Attributes
    ----------
    source : np.ndarray
        Numpy array containing the source data

    Notes
    ------
    `native_coordinates` needs to be supplied by the user when instantiating this node.
    """

    source = ArrayTrait().tag(readonly=True)
    native_coordinates = tl.Instance(Coordinates, allow_none=False).tag(attr=True)

    @tl.validate("source")
    def _validate_source(self, d):
        a = d["value"]
        try:
            a.astype(float)
        except:
            raise ValueError("Array source must be numerical")
        return a

    def _first_init(self, **kwargs):
        # If Array is being created from Node.from_definition or Node.from_json, then we have to handle the
        # native coordinates specifically. This is special. No other DataSource node needs to deserialize
        # native_coordinates in this way because it is implemented specifically in the node through get_coordinates
        if isinstance(kwargs.get("native_coordinates"), OrderedDict):
            kwargs["native_coordinates"] = Coordinates.from_definition(kwargs["native_coordinates"])
        elif isinstance(kwargs.get("native_coordinates"), string_types):
            kwargs["native_coordinates"] = Coordinates.from_json(kwargs["native_coordinates"])

        return kwargs

    @common_doc(COMMON_DATA_DOC)
    def get_data(self, coordinates, coordinates_index):
        """{get_data}
        """
        s = coordinates_index
        d = self.create_output_array(coordinates, data=self.source[s])
        return d

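
# Usage sketch (not part of the original source; a minimal example assuming a 2x3 array
# with matching lat/lon coordinates):
#
#   import numpy as np
#   import podpac
#
#   coords = podpac.Coordinates([[0, 1], [0, 1, 2]], dims=["lat", "lon"])
#   node = Array(source=np.arange(6).reshape(2, 3), native_coordinates=coords)
#   output = node.eval(coords)  # UnitsDataArray with the same shape as `coords`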

@common_doc(COMMON_DATA_DOC)
class PyDAP(DataSource):
    """Create a DataSource from an OpenDAP server feed.

    Attributes
    ----------
    auth_class : :class:`podpac.authentication.Session`
        :class:`requests.Session` derived class providing authentication credentials.
        When username and password are provided, an auth_session is created using this class.
    auth_session : :class:`podpac.authentication.Session`
        Instance of the auth_class. This is created if username and password are supplied, but
        this object can also be supplied directly
    datakey : str
        Pydap 'key' for the data to be retrieved from the server. Datasource may have multiple keys,
        so this key determines which variable is returned from the source.
    dataset : pydap.model.DatasetType
        The open pydap dataset. This is provided for troubleshooting.
    native_coordinates : Coordinates
        {native_coordinates}
    password : str, optional
        Password used for authenticating against OpenDAP server.
        WARNING: this is stored as plain-text, provide auth_session instead if you have security concerns.
    source : str
        URL of the OpenDAP server.
    username : str, optional
        Username used for authenticating against OpenDAP server.
        WARNING: this is stored as plain-text, provide auth_session instead if you have security concerns.
    """

    source = tl.Unicode().tag(readonly=True)
    dataset = tl.Instance("pydap.model.DatasetType").tag(readonly=True)

    # node attrs
    datakey = tl.Unicode().tag(attr=True)

    # optional inputs
    auth_class = tl.Type(authentication.Session)
    auth_session = tl.Instance(authentication.Session, allow_none=True)
    username = tl.Unicode(default_value=None, allow_none=True)
    password = tl.Unicode(default_value=None, allow_none=True)

    @tl.default("auth_session")
    def _auth_session_default(self):
        # requires username and password
        if not self.username or not self.password:
            return None

        # requires auth_class
        # TODO: default auth_class?
        if not self.auth_class:
            return None

        # instantiate and check url
        try:
            session = self.auth_class(username=self.username, password=self.password)
            session.get(self.source + ".dds")
        except:
            # TODO: catch a 403 error
            return None

        return session

    @tl.default("dataset")
    def _open_dataset(self):
        """Opens the pydap dataset at the source URL.

        Returns
        -------
        pydap.model.DatasetType
            The open pydap dataset, or None if it could not be opened.
        """

        # auth session
        # if self.auth_session:
        try:
            dataset = self._open_url()
        except Exception:
            # TODO: handle a 403 error
            # TODO: Check Url (probably inefficient...)
            try:
                self.auth_session.get(self.source + ".dds")
                dataset = self._open_url()
            except Exception:
                # TODO: handle 403 error
                print("Warning, dataset could not be opened. Check login credentials.")
                dataset = None

        return dataset

    def _open_url(self):
        return pydap.client.open_url(self.source, session=self.auth_session)

    @common_doc(COMMON_DATA_DOC)
    def get_native_coordinates(self):
        """{get_native_coordinates}

        Raises
        ------
        NotImplementedError
            DAP has no mechanism for creating coordinates automatically, so this is left up to child classes.
        """
        raise NotImplementedError(
            "DAP has no mechanism for creating coordinates"
            + ", so this is left up to child class "
            + "implementations."
        )

    @common_doc(COMMON_DATA_DOC)
    def get_data(self, coordinates, coordinates_index):
        """{get_data}
        """
        data = self.dataset[self.datakey][tuple(coordinates_index)]
        # PyDAP 3.2.1 gives a numpy array for the above, whereas 3.2.2 needs the .data attribute to get a numpy array
        if not isinstance(data, np.ndarray) and hasattr(data, "data"):
            data = data.data
        d = self.create_output_array(coordinates, data=data.reshape(coordinates.shape))
        return d

    @property
    def keys(self):
        """The list of available keys from the OpenDAP dataset.

        Returns
        -------
        List
            The list of available keys from the OpenDAP dataset. Any of these keys can be set as self.datakey
        """
        return self.dataset.keys()

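
# Usage sketch (not part of the original source; the URL and datakey below are hypothetical
# and must be replaced with a real OpenDAP endpoint and variable name):
#
#   node = PyDAP(
#       source="https://example.com/opendap/dataset",  # hypothetical OpenDAP URL
#       datakey="soil_moisture",                       # hypothetical variable key
#   )
#   print(node.keys)  # list the variables available on the server
#
# Note that PyDAP.get_native_coordinates is not implemented here; a subclass must define
# the coordinates for the feed before the node can be evaluated.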

@common_doc(COMMON_DATA_DOC)
class CSV(DataSource):
    """Create a DataSource from a .csv file.

    This class assumes that the data has a storage format such as:
    header 1, header 2, header 3, ...
    row1_data1, row1_data2, row1_data3, ...
    row2_data1, row2_data2, row2_data3, ...

    Attributes
    ----------
    native_coordinates : Coordinates
        {native_coordinates}
    source : str
        Path to the data source
    alt_col : str or int
        Column number or column title containing altitude data
    lat_col : str or int
        Column number or column title containing latitude data
    lon_col : str or int
        Column number or column title containing longitude data
    time_col : str or int
        Column number or column title containing time data
    data_col : str or int
        Column number or column title containing output data
    dims : list[str]
        Default is ['alt', 'lat', 'lon', 'time']. List of dimensions tested. This list determines the order of the
        stacked dimensions.
    dataset : pd.DataFrame
        Raw Pandas DataFrame used to read the data
    """

    source = tl.Unicode().tag(readonly=True)
    dataset = tl.Instance(pd.DataFrame).tag(readonly=True)

    # node attrs
    dims = tl.List(default_value=["alt", "lat", "lon", "time"]).tag(attr=True)
    alt_col = tl.Union([tl.Unicode(), tl.Int()]).tag(attr=True)
    lat_col = tl.Union([tl.Unicode(), tl.Int()]).tag(attr=True)
    lon_col = tl.Union([tl.Unicode(), tl.Int()]).tag(attr=True)
    time_col = tl.Union([tl.Unicode(), tl.Int()]).tag(attr=True)
    data_col = tl.Union([tl.Unicode(), tl.Int()]).tag(attr=True)

    def _first_init(self, **kwargs):
        # The first part of the test makes sure this is the CSV parent class.
        # It's assumed that derived classes will define alt_col etc. for specialized readers.
        if type(self) == CSV and not (
            ("alt_col" in kwargs) or ("time_col" in kwargs) or ("lon_col" in kwargs) or ("lat_col" in kwargs)
        ):
            raise TypeError("CSV requires at least one of time_col, alt_col, lat_col, or lon_col.")

        return super(CSV, self)._first_init(**kwargs)

    @property
    def _alt_col(self):
        if isinstance(self.alt_col, int):
            return self.alt_col
        return self.dataset.columns.get_loc(self.alt_col)

    @property
    def _lat_col(self):
        if isinstance(self.lat_col, int):
            return self.lat_col
        return self.dataset.columns.get_loc(self.lat_col)

    @property
    def _lon_col(self):
        if isinstance(self.lon_col, int):
            return self.lon_col
        return self.dataset.columns.get_loc(self.lon_col)

    @property
    def _time_col(self):
        if isinstance(self.time_col, int):
            return self.time_col
        return self.dataset.columns.get_loc(self.time_col)

    @property
    def _data_col(self):
        if isinstance(self.data_col, int):
            return self.data_col
        return self.dataset.columns.get_loc(self.data_col)

    @tl.default("dataset")
    def _open_dataset(self):
        """Opens the data source

        Returns
        -------
        pd.DataFrame
            pd.read_csv(source)
        """
        return pd.read_csv(self.source, parse_dates=True, infer_datetime_format=True)

    @common_doc(COMMON_DATA_DOC)
    def get_native_coordinates(self):
        """{get_native_coordinates}

        The default implementation builds the coordinates from the columns named by `alt_col`, `lat_col`, `lon_col`,
        and `time_col`, stacked in the order given by `dims`. Child classes may have to overload this method for
        other layouts.
        """
        coords = []
        for d in self.dims:
            if trait_is_defined(self, d + "_col") or (
                d + "_col" not in self.trait_names() and hasattr(self, d + "_col")
            ):
                i = getattr(self, "_{}_col".format(d))
                if d == "time":
                    c = np.array(self.dataset.iloc[:, i], np.datetime64)
                else:
                    c = np.array(self.dataset.iloc[:, i])
                coords.append(ArrayCoordinates1d(c, name=d))
        if len(coords) > 1:
            coords = [StackedCoordinates(coords)]
        return Coordinates(coords)

    @common_doc(COMMON_DATA_DOC)
    def get_data(self, coordinates, coordinates_index):
        """{get_data}
        """
        d = self.dataset.iloc[coordinates_index[0], self._data_col]
        return self.create_output_array(coordinates, data=d)

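
# Usage sketch (not part of the original source; the file path and column names are hypothetical):
#
#   node = CSV(
#       source="measurements.csv",  # hypothetical CSV with 'latitude', 'longitude', 'value' columns
#       lat_col="latitude",
#       lon_col="longitude",
#       data_col="value",
#       dims=["lat", "lon"],
#   )
#   output = node.eval(node.native_coordinates)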

@common_doc(COMMON_DATA_DOC)
class Rasterio(DataSource):
    """Create a DataSource using Rasterio.

    Parameters
    ----------
    source : str, :class:`io.BytesIO`
        Path to the data source
    band : int
        The 'band' or index for the variable being accessed in files such as GeoTIFFs

    Attributes
    ----------
    dataset : :class:`rasterio._io.RasterReader`
        A reference to the datasource opened by rasterio
    native_coordinates : :class:`podpac.Coordinates`
        {native_coordinates}

    Notes
    ------
    The source could be a path to an s3 bucket file, e.g.:
    s3://landsat-pds/L8/139/045/LC81390452014295LGN00/LC81390452014295LGN00_B1.TIF

    In that case, make sure to set the environmental variable:
    * Windows: set CURL_CA_BUNDLE=<path_to_conda_env>\Library\ssl\cacert.pem
    * Linux: export CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
    """

    source = tl.Union([tl.Unicode(), tl.Instance(BytesIO)]).tag(readonly=True)
    dataset = tl.Any().tag(readonly=True)

    # node attrs
    band = tl.CInt(1).tag(attr=True)

    @tl.default("dataset")
    def _open_dataset(self):
        """Opens the data source

        Returns
        -------
        :class:`rasterio.io.DatasetReader`
            Rasterio dataset
        """

        # TODO: dataset should not open by default
        # prefer with as: syntax

        if isinstance(self.source, BytesIO):
            # https://rasterio.readthedocs.io/en/latest/topics/memory-files.html
            # TODO: this is still not working quite right - likely need to work
            # out the BytesIO format or how we are going to read/write in memory
            with rasterio.MemoryFile(self.source) as memfile:
                return memfile.open(driver="GTiff")

        # local file
        else:
            return rasterio.open(self.source)

    def close_dataset(self):
        """Closes the file for the datasource
        """
        self.dataset.close()

    @common_doc(COMMON_DATA_DOC)
    def get_native_coordinates(self):
        """{get_native_coordinates}

        The default implementation tries to find the lat/lon coordinates based on dataset.affine.
        It cannot determine the alt or time dimensions, so child classes may have to overload this method.
        """

        # check to see if the coordinates are rotated using affine
        affine = self.dataset.transform
        if affine[1] != 0.0 or affine[3] != 0.0:
            raise NotImplementedError("Rotated coordinates are not yet supported")

        try:
            crs = self.dataset.crs["init"].upper()
        except:
            crs = None

        # get bounds
        left, bottom, right, top = self.dataset.bounds

        # rasterio reads data upside-down from coordinate conventions, so lat goes from top to bottom
        lat = UniformCoordinates1d(top, bottom, size=self.dataset.height, name="lat")
        lon = UniformCoordinates1d(left, right, size=self.dataset.width, name="lon")
        return Coordinates([lat, lon], dims=["lat", "lon"], crs=crs)

    @common_doc(COMMON_DATA_DOC)
    def get_data(self, coordinates, coordinates_index):
        """{get_data}
        """
        data = self.create_output_array(coordinates)
        slc = coordinates_index

        # read data within coordinates_index window
        window = ((slc[0].start, slc[0].stop), (slc[1].start, slc[1].stop))
        raster_data = self.dataset.read(self.band, out_shape=tuple(coordinates.shape), window=window)

        # set raster data to output array
        data.data.ravel()[:] = raster_data.ravel()
        return data

    @property
    def band_count(self):
        """The number of bands

        Returns
        -------
        int
            The number of bands in the dataset
        """
        if not hasattr(self, "_band_count"):
            self._band_count = self.dataset.count
        return self._band_count

    @property
    def band_descriptions(self):
        """A description of each band contained in dataset.tags

        Returns
        -------
        OrderedDict
            Dictionary of band_number: band_description pairs. The band_description values are a dictionary, each
            containing a number of keys -- depending on the metadata
        """
        if not hasattr(self, "_band_descriptions"):
            self._band_descriptions = OrderedDict((i, self.dataset.tags(i + 1)) for i in range(self.band_count))
        return self._band_descriptions

    @property
    def band_keys(self):
        """An alternative view of band_descriptions based on the keys present in the metadata

        Returns
        -------
        dict
            Dictionary of metadata keys, where the values are the value of the key for each band.
            For example, band_keys['TIME'] = ['2015', '2016', '2017'] for a dataset with three bands.
        """
        if not hasattr(self, "_band_keys"):
            keys = {k for i in range(self.band_count) for k in self.band_descriptions[i]}  # set
            self._band_keys = {k: [self.band_descriptions[i].get(k) for i in range(self.band_count)] for k in keys}
        return self._band_keys

    def get_band_numbers(self, key, value):
        """Return the bands that have a key equal to a specified value.

        Parameters
        ----------
        key : str / list
            Key present in the metadata of the band. Can be a single key, or a list of keys.
        value : str / list
            Value of the key that should be returned. Can be a single value, or a list of values

        Returns
        -------
        np.ndarray
            An array of band numbers that match the criteria
        """
        if (not hasattr(key, "__iter__") or isinstance(key, string_types)) and (
            not hasattr(value, "__iter__") or isinstance(value, string_types)
        ):
            key = [key]
            value = [value]

        match = np.ones(self.band_count, bool)
        for k, v in zip(key, value):
            match = match & (np.array(self.band_keys[k]) == v)
        matches = np.where(match)[0] + 1

        return matches

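
# Usage sketch (not part of the original source; the GeoTIFF path is hypothetical):
#
#   node = Rasterio(source="elevation.tif", band=1)  # hypothetical single-band GeoTIFF
#   coords = node.native_coordinates                 # lat/lon grid read from the file's affine transform
#   output = node.eval(coords)
#   node.close_dataset()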

class DatasetCoordinatedMixin(tl.HasTraits):
    latkey = tl.Unicode(allow_none=True, default_value="lat").tag(attr=True)
    lonkey = tl.Unicode(allow_none=True, default_value="lon").tag(attr=True)
    timekey = tl.Unicode(allow_none=True, default_value="time").tag(attr=True)
    altkey = tl.Unicode(allow_none=True, default_value="alt").tag(attr=True)
    dims = tl.List(trait=Dimension()).tag(attr=True)
    crs = tl.Unicode(allow_none=True, default_value=None).tag(attr=True)
    cf_time = tl.Bool(False).tag(attr=True)
    cf_units = tl.Unicode(allow_none=True).tag(attr=True)
    cf_calendar = tl.Unicode(allow_none=True).tag(attr=True)

    @common_doc(COMMON_DATA_DOC)
    def get_native_coordinates(self):
        """{get_native_coordinates}
        """
        cs = []
        for dim in self.dims:
            if dim == "lat":
                cs.append(self.get_lat())
            elif dim == "lon":
                cs.append(self.get_lon())
            elif dim == "time":
                cs.append(self.get_time())
            elif dim == "alt":
                cs.append(self.get_alt())

        return Coordinates(cs, dims=self.dims, crs=self.crs)

    def get_lat(self):
        """
        Get the native lat coordinates. Subclasses should customize as needed.

        Returns
        -------
        lat : array-like or Coordinates1d
            native latitude coordinates.
        """
        return self.dataset[self.latkey]

    def get_lon(self):
        """
        Get the native lon coordinates. Subclasses should customize as needed.

        Returns
        -------
        lon : array-like or Coordinates1d
            native longitude coordinates.
        """
        return self.dataset[self.lonkey]

    def get_time(self):
        """
        Get the native time coordinates. Subclasses should customize as needed.

        Returns
        -------
        time : array-like or Coordinates1d
            native time coordinates.
        """
        values = self.dataset[self.timekey]
        if self.cf_time:
            values = xr.coding.times.decode_cf_datetime(values, self.cf_units, self.cf_calendar)
        return values

    def get_alt(self):
        """
        Get the native alt coordinates. Subclasses should customize as needed.

        Returns
        -------
        alt : array-like or Coordinates1d
            native altitude coordinates.
        """
        return self.dataset[self.altkey]


@common_doc(COMMON_DATA_DOC)
class H5PY(DatasetCoordinatedMixin, DataSource):
    """Create a DataSource node using h5py.

    Attributes
    ----------
    datakey : str
        The 'key' for the data to be retrieved from the file. Datasource may have multiple keys, so this key
        determines which variable is returned from the source.
    dataset : h5py.File
        The h5py File object used for opening the file
    native_coordinates : Coordinates
        {native_coordinates}
    source : str
        Path to the data source
    dims : list
        Dimensions of the data, in the order stored in the file. For example, if self.dataset[datakey] has shape
        (1, 2, 3) and the (time, lon, lat) dimensions have sizes (1, 2, 3) then dims should be
        ['time', 'lon', 'lat']
    file_mode : str, optional
        Default is 'r'. The mode used to open the HDF5 file. Options are r, r+, w, w- or x, a (see h5py.File).
    """

    source = tl.Unicode().tag(readonly=True)
    dataset = tl.Any().tag(readonly=True)
    file_mode = tl.Unicode(default_value="r").tag(readonly=True)

    # node attrs
    datakey = tl.Unicode().tag(attr=True)

    @tl.default("dataset")
    def _open_dataset(self):
        """Opens the data source

        Returns
        -------
        h5py.File
            h5py.File(self.source, self.file_mode)
        """

        # TODO: dataset should not open by default
        # prefer with as: syntax
        return h5py.File(self.source, self.file_mode)

    def close_dataset(self):
        """Closes the file for the datasource
        """
        self.dataset.close()

    @common_doc(COMMON_DATA_DOC)
    def get_data(self, coordinates, coordinates_index):
        """{get_data}
        """
        data = self.create_output_array(coordinates)
        slc = coordinates_index
        a = self.dataset[self.datakey][slc]
        data.data.ravel()[:] = a.ravel()
        return data

    @property
    def keys(self):
        return H5PY._find_h5py_keys(self.dataset)

    def attrs(self, key="/"):
        """Return the attributes of the dataset or group at the given key.

        Parameters
        ----------
        key : str, optional
            Dataset or group key for which attributes will be summarized. Default is the root group '/'.
        """
        return dict(self.dataset[key].attrs)

    @staticmethod
    def _find_h5py_keys(obj, keys=None):
        # use None instead of a mutable default argument so collected keys do not persist across calls
        if keys is None:
            keys = []
        if isinstance(obj, (h5py.Group, h5py.File)):
            for k in obj.keys():
                keys = H5PY._find_h5py_keys(obj[k], keys)
        else:
            keys.append(obj.name)
            return keys
        keys = list(set(keys))
        keys.sort()
        return keys

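
# Usage sketch (not part of the original source; the file path, datakey, and coordinate keys are hypothetical):
#
#   node = H5PY(
#       source="data.h5",          # hypothetical HDF5 file
#       datakey="/soil_moisture",  # hypothetical dataset key; see node.keys for what is available
#       dims=["lat", "lon"],
#       latkey="/lat",
#       lonkey="/lon",
#   )
#   output = node.eval(node.native_coordinates)
#   node.close_dataset()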

class Zarr(DatasetCoordinatedMixin, DataSource):
    source = tl.Unicode(default_value=None, allow_none=True).tag(readonly=True)
    dataset = tl.Any().tag(readonly=True)
    file_mode = tl.Unicode(default_value="r").tag(attr=True)

    # node attrs
    datakey = tl.Unicode().tag(attr=True)

    # optional inputs
    access_key_id = tl.Unicode()
    secret_access_key = tl.Unicode()
    region_name = tl.Unicode()

    @tl.default("access_key_id")
    def _get_access_key_id(self):
        return settings["AWS_ACCESS_KEY_ID"]

    @tl.default("secret_access_key")
    def _get_secret_access_key(self):
        return settings["AWS_SECRET_ACCESS_KEY"]

    @tl.default("region_name")
    def _get_region_name(self):
        return settings["AWS_REGION_NAME"]

    def init(self):
        # check that source or dataset is provided
        if self.source is None:
            self.dataset

        # check dim keys
        for dim in self.dims:
            if dim == "lat":
                if self.latkey is None:
                    raise TypeError("Zarr node 'latkey' is required for dims %s" % self.dims)
                if self.latkey not in self.dataset:
                    raise ValueError("Zarr lat key '%s' not found" % self.latkey)
            elif dim == "lon":
                if self.lonkey is None:
                    raise TypeError("Zarr node 'lonkey' is required for dims %s" % self.dims)
                if self.lonkey not in self.dataset:
                    raise ValueError("Zarr lon key '%s' not found" % self.lonkey)
            elif dim == "time":
                if self.timekey is None:
                    raise TypeError("Zarr node 'timekey' is required for dims %s" % self.dims)
                if self.timekey not in self.dataset:
                    raise ValueError("Zarr time key '%s' not found" % self.timekey)
            elif dim == "alt":
                if self.altkey is None:
                    raise TypeError("Zarr node 'altkey' is required for dims %s" % self.dims)
                if self.altkey not in self.dataset:
                    raise ValueError("Zarr alt key '%s' not found" % self.altkey)

        # check data key
        if self.datakey not in self.dataset:
            raise ValueError("Zarr data key '%s' not found" % self.datakey)

    @tl.default("dataset")
    def _open_dataset(self):
        if self.source is None:
            raise TypeError("Zarr node requires 'source' or 'dataset'")

        if self.source.startswith("s3://"):
            # remove the 's3://' prefix to get the store root
            # (str.strip would strip *characters*, not the prefix, so slice instead)
            root = self.source[len("s3://"):]
            kwargs = {"region_name": self.region_name}
            s3 = s3fs.S3FileSystem(key=self.access_key_id, secret=self.secret_access_key, client_kwargs=kwargs)
            s3map = s3fs.S3Map(root=root, s3=s3, check=False)
            store = s3map
        else:
            store = str(self.source)  # has to be a string in Python2.7 for local files

        try:
            return zarr.open(store, mode=self.file_mode)
        except ValueError:
            raise ValueError("No Zarr store found at path '%s'" % self.source)

    @common_doc(COMMON_DATA_DOC)
    def get_data(self, coordinates, coordinates_index):
        """{get_data}
        """
        data = self.create_output_array(coordinates)
        a = self.dataset[self.datakey][coordinates_index]
        data.data.ravel()[:] = a.ravel()
        return data

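
# Usage sketch (not part of the original source; the store path and keys are hypothetical):
#
#   node = Zarr(
#       source="s3://example-bucket/soil_moisture.zarr",  # hypothetical S3 Zarr store (local paths also work)
#       datakey="sm",
#       dims=["time", "lat", "lon"],
#   )
#   output = node.eval(node.native_coordinates)
#
# AWS credentials default to settings["AWS_ACCESS_KEY_ID"], settings["AWS_SECRET_ACCESS_KEY"], and
# settings["AWS_REGION_NAME"] unless access_key_id / secret_access_key / region_name are passed explicitly.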

WCS_DEFAULT_VERSION = "1.0.0"
WCS_DEFAULT_CRS = "EPSG:4326"


class WCS(DataSource):
    """Create a DataSource from an OGC-compliant WCS service

    Attributes
    ----------
    crs : str
        Default is EPSG:4326 (WGS84 Geodetic). EPSG number for the coordinate reference system that the data should
        be returned in.
    layer_name : str
        Name of the WCS layer that should be fetched from the server
    source : str
        URL of the WCS server endpoint
    version : str
        Default is 1.0.0. WCS version string.
    wcs_coordinates : Coordinates
        The coordinates of the WCS source
    """

    source = tl.Unicode().tag(readonly=True)
    wcs_coordinates = tl.Instance(Coordinates).tag(readonly=True)  # default below

    # node attrs
    layer_name = tl.Unicode().tag(attr=True)
    version = tl.Unicode(WCS_DEFAULT_VERSION).tag(attr=True)
    crs = tl.Unicode(WCS_DEFAULT_CRS).tag(attr=True)

    _get_capabilities_qs = tl.Unicode("SERVICE=WCS&REQUEST=DescribeCoverage&" "VERSION={version}&COVERAGE={layer}")
    _get_data_qs = tl.Unicode(
        "SERVICE=WCS&VERSION={version}&REQUEST=GetCoverage&"
        "FORMAT=GeoTIFF&COVERAGE={layer}&"
        "BBOX={w},{s},{e},{n}&CRS={crs}&RESPONSE_CRS={crs}&"
        "WIDTH={width}&HEIGHT={height}&TIME={time}"
    )

    # TODO: This should be capabilities_url, not get_
    @property
    def get_capabilities_url(self):
        """Constructs the url that requests the WCS capabilities

        Returns
        -------
        str
            The url that requests the WCS capabilities
        """
        return self.source + "?" + self._get_capabilities_qs.format(version=self.version, layer=self.layer_name)

    @tl.default("wcs_coordinates")
    def get_wcs_coordinates(self):
        """Retrieves the native coordinates reported by the WCS service.

        Returns
        -------
        Coordinates
            The native coordinates reported by the WCS service.

        Notes
        -------
        This assumes a `time`, `lat`, `lon` order for the coordinates, and currently doesn't handle `alt` coordinates

        Raises
        ------
        Exception
            Raises this if the required dependencies are not installed.
        """
        if requests is not None:
            capabilities = requests.get(self.get_capabilities_url)
            if capabilities.status_code != 200:
                raise Exception("Could not get capabilities from WCS server")
            capabilities = capabilities.text

        # TODO: remove support urllib3 - requests is sufficient
        elif urllib3 is not None:
            if certifi is not None:
                http = urllib3.PoolManager(ca_certs=certifi.where())
            else:
                http = urllib3.PoolManager()
            r = http.request("GET", self.get_capabilities_url)
            capabilities = r.data
            if r.status != 200:
                raise Exception("Could not get capabilities from WCS server:" + self.get_capabilities_url)
        else:
            raise Exception("Do not have a URL request library to get WCS data.")

        if (
            lxml is not None
        ):  # could skip using lxml and always use html.parser instead, which seems to work but lxml might be faster
            capabilities = bs4.BeautifulSoup(capabilities, "lxml")
        else:
            capabilities = bs4.BeautifulSoup(capabilities, "html.parser")

        domain = capabilities.find("wcs:spatialdomain")
        pos = domain.find("gml:envelope").get_text().split()
        lonlat = np.array(pos, float).reshape(2, 2)
        grid_env = domain.find("gml:gridenvelope")
        low = np.array(grid_env.find("gml:low").text.split(), int)
        high = np.array(grid_env.find("gml:high").text.split(), int)
        size = high - low
        dlondlat = (lonlat[1, :] - lonlat[0, :]) / size
        bottom = lonlat[:, 1].min() + dlondlat[1] / 2
        top = lonlat[:, 1].max() - dlondlat[1] / 2
        left = lonlat[:, 0].min() + dlondlat[0] / 2
        right = lonlat[:, 0].max() - dlondlat[0] / 2

        timedomain = capabilities.find("wcs:temporaldomain")
        if timedomain is None:
            return Coordinates(
                [
                    UniformCoordinates1d(top, bottom, size=size[1], name="lat"),
                    UniformCoordinates1d(left, right, size=size[0], name="lon"),
                ]
            )

        date_re = re.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}")
        times = str(timedomain).replace("<gml:timeposition>", "").replace("</gml:timeposition>", "").split("\n")
        times = np.array([t for t in times if date_re.match(t)], np.datetime64)

        if len(times) == 0:
            return Coordinates(
                [
                    UniformCoordinates1d(top, bottom, size=size[1], name="lat"),
                    UniformCoordinates1d(left, right, size=size[0], name="lon"),
                ]
            )

        return Coordinates(
            [
                ArrayCoordinates1d(times, name="time"),
                UniformCoordinates1d(top, bottom, size=size[1], name="lat"),
                UniformCoordinates1d(left, right, size=size[0], name="lon"),
            ]
        )

    @property
    @common_doc(COMMON_DATA_DOC)
    def native_coordinates(self):
        """{native_coordinates}

        Returns
        -------
        Coordinates
            {native_coordinates}

        Notes
        ------
        This is a little tricky and doesn't fit into the usual PODPAC method, as the service is actually doing the
        data wrangling for us...
        """

        # TODO update so that we don't rely on _requested_coordinates if possible
        if not self._requested_coordinates:
            return self.wcs_coordinates

        cs = []
        for dim in self.wcs_coordinates.dims:
            if dim in self._requested_coordinates.dims:
                c = self._requested_coordinates[dim]
                if c.size == 1:
                    cs.append(ArrayCoordinates1d(c.coordinates[0], name=dim))
                elif isinstance(c, UniformCoordinates1d):
                    cs.append(UniformCoordinates1d(c.bounds[0], c.bounds[1], abs(c.step), name=dim))
                else:
                    # TODO: generalize/fix this
                    # WCS calls require a regular grid (otherwise we would have to make multiple WCS calls)
                    cs.append(UniformCoordinates1d(c.bounds[0], c.bounds[1], size=c.size, name=dim))
            else:
                cs.append(self.wcs_coordinates[dim])
        c = Coordinates(cs)
        return c

    def get_data(self, coordinates, coordinates_index):
        """{get_data}

        Raises
        ------
        Exception
            Raises this if there is a network error or required dependencies are not installed.
        """
        output = self.create_output_array(coordinates)
        dotime = "time" in self.wcs_coordinates.dims

        if "time" in coordinates.dims and dotime:
            sd = np.timedelta64(0, "s")
            times = [str(t + sd) for t in coordinates["time"].coordinates]
        else:
            times = [""]

        if len(times) > 1:
            for i, time in enumerate(times):
                url = (
                    self.source
                    + "?"
                    + self._get_data_qs.format(
                        version=self.version,
                        layer=self.layer_name,
                        w=min(coordinates["lon"].area_bounds),
                        e=max(coordinates["lon"].area_bounds),
                        s=min(coordinates["lat"].area_bounds),
                        n=max(coordinates["lat"].area_bounds),
                        width=coordinates["lon"].size,
                        height=coordinates["lat"].size,
                        time=time,
                        crs=self.crs,
                    )
                )

                if not dotime:
                    url = url.replace("&TIME=", "")

                if requests is not None:
                    data = requests.get(url)
                    if data.status_code != 200:
                        raise Exception("Could not get data from WCS server:" + url)
                    io = BytesIO(bytearray(data.content))
                    content = data.content

                # TODO: remove support urllib3 - requests is sufficient
                elif urllib3 is not None:
                    if certifi is not None:
                        http = urllib3.PoolManager(ca_certs=certifi.where())
                    else:
                        http = urllib3.PoolManager()
                    r = http.request("GET", url)
                    if r.status != 200:
                        raise Exception("Could not get data from WCS server:" + url)
                    content = r.data
                    io = BytesIO(bytearray(r.data))
                else:
                    raise Exception("Do not have a URL request library to get WCS data.")

                try:
                    try:  # This works with rasterio v1.0a8 or greater, but not on python 2
                        with rasterio.open(io) as dataset:
                            output.data[i, ...] = dataset.read()
                    except Exception as e:  # Probably python 2
                        print(e)
                        tmppath = os.path.join(settings["DISK_CACHE_DIR"], "wcs_temp.tiff")

                        if not os.path.exists(os.path.split(tmppath)[0]):
                            os.makedirs(os.path.split(tmppath)[0])

                        # TODO: close tmppath? os does this on remove?
                        open(tmppath, "wb").write(content)

                        with rasterio.open(tmppath) as dataset:
                            output.data[i, ...] = dataset.read()

                        os.remove(tmppath)  # Clean up

                except ImportError:
                    # Writing the data to a temporary tiff and reading it from there is hacky
                    # However reading directly from r.data or io doesn't work
                    # Should improve in the future
                    open("temp.tiff", "wb").write(r.data)
                    output.data[i, ...] = RasterToNumPyArray("temp.tiff")
        else:
            time = times[0]

            url = (
                self.source
                + "?"
                + self._get_data_qs.format(
                    version=self.version,
                    layer=self.layer_name,
                    w=min(coordinates["lon"].area_bounds),
                    e=max(coordinates["lon"].area_bounds),
                    s=min(coordinates["lat"].area_bounds),
                    n=max(coordinates["lat"].area_bounds),
                    width=coordinates["lon"].size,
                    height=coordinates["lat"].size,
                    time=time,
                    crs=self.crs,
                )
            )
            if not dotime:
                url = url.replace("&TIME=", "")

            if requests is not None:
                data = requests.get(url)
                if data.status_code != 200:
                    raise Exception("Could not get data from WCS server:" + url)
                io = BytesIO(bytearray(data.content))
                content = data.content

            # TODO: remove support urllib3 - requests is sufficient
            elif urllib3 is not None:
                if certifi is not None:
                    http = urllib3.PoolManager(ca_certs=certifi.where())
                else:
                    http = urllib3.PoolManager()
                r = http.request("GET", url)
                if r.status != 200:
                    raise Exception("Could not get data from WCS server:" + url)
                content = r.data
                io = BytesIO(bytearray(r.data))
            else:
                raise Exception("Do not have a URL request library to get WCS data.")

            try:
                try:  # This works with rasterio v1.0a8 or greater, but not on python 2
                    with rasterio.open(io) as dataset:
                        if dotime:
                            output.data[0, ...] = dataset.read()
                        else:
                            output.data[:] = dataset.read()
                except Exception as e:  # Probably python 2
                    print(e)
                    tmppath = os.path.join(settings["DISK_CACHE_DIR"], "wcs_temp.tiff")
                    if not os.path.exists(os.path.split(tmppath)[0]):
                        os.makedirs(os.path.split(tmppath)[0])
                    open(tmppath, "wb").write(content)
                    with rasterio.open(tmppath) as dataset:
                        output.data[:] = dataset.read()
                    os.remove(tmppath)  # Clean up
            except ImportError:
                # Writing the data to a temporary tiff and reading it from there is hacky
                # However reading directly from r.data or io doesn't work
                # Should improve in the future
                open("temp.tiff", "wb").write(r.data)
                try:
                    output.data[:] = RasterToNumPyArray("temp.tiff")
                except:
                    raise Exception("Rasterio or Arcpy not available to read WCS feed.")

        if not coordinates["lat"].is_descending:
            if dotime:
                output.data[:] = output.data[:, ::-1, :]
            else:
                output.data[:] = output.data[::-1, :]

        return output

    @property
    def base_ref(self):
        """Default reference for this node, based on the layer name.

        Returns
        -------
        str
            The layer name without its prefix.
        """
        return self.layer_name.rsplit(".", 1)[1]

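
# Usage sketch (not part of the original source; the server URL and layer name are hypothetical):
#
#   node = WCS(
#       source="https://example.com/geoserver/wcs",  # hypothetical WCS endpoint
#       layer_name="topo:elevation",                 # hypothetical coverage/layer name
#   )
#   coords = node.wcs_coordinates  # coordinates reported by the service
#   output = node.eval(coords)     # issues GetCoverage requests and assembles the result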

class ReprojectedSource(DataSource):
    """Create a DataSource with a different resolution from another Node. This can be used to bilinearly interpolate
    a dataset after averaging over a larger area.

    Attributes
    ----------
    source : Node
        The source node
    source_interpolation : str
        Type of interpolation method to use for the source node
    reprojected_coordinates : Coordinates
        Coordinates where the source node should be evaluated.
    """

    source = NodeTrait().tag(readonly=True)

    # node attrs
    source_interpolation = interpolation_trait().tag(attr=True)
    reprojected_coordinates = tl.Instance(Coordinates).tag(attr=True)

    def _first_init(self, **kwargs):
        if "reprojected_coordinates" in kwargs:
            if isinstance(kwargs["reprojected_coordinates"], dict):
                kwargs["reprojected_coordinates"] = Coordinates.from_definition(kwargs["reprojected_coordinates"])
            elif isinstance(kwargs["reprojected_coordinates"], string_types):
                kwargs["reprojected_coordinates"] = Coordinates.from_json(kwargs["reprojected_coordinates"])

        return super(ReprojectedSource, self)._first_init(**kwargs)

    @common_doc(COMMON_DATA_DOC)
    def get_native_coordinates(self):
        """{get_native_coordinates}
        """
        if isinstance(self.source, DataSource):
            sc = self.source.native_coordinates
        else:  # Otherwise we cannot guarantee that native_coordinates exist
            sc = self.reprojected_coordinates
        rc = self.reprojected_coordinates
        coords = [rc[dim] if dim in rc.dims else sc[dim] for dim in sc.dims]
        return Coordinates(coords)

    @common_doc(COMMON_DATA_DOC)
    def get_data(self, coordinates, coordinates_index):
        """{get_data}
        """
        if hasattr(self.source, "interpolation") and self.source_interpolation is not None:
            si = self.source.interpolation
            self.source.interpolation = self.source_interpolation
        elif self.source_interpolation is not None:
            _logger.warning(
                "ReprojectedSource cannot set the 'source_interpolation'"
                " since self.source does not have an 'interpolation' "
                " attribute. \n type(self.source): %s\nself.source: %s" % (str(type(self.source)), str(self.source))
            )
        data = self.source.eval(coordinates)
        if hasattr(self.source, "interpolation") and self.source_interpolation is not None:
            self.source.interpolation = si
        # The following is needed in case the source is an algorithm
        # or compositor node that doesn't have all the dimensions of
        # the reprojected coordinates
        # TODO: What if data has coordinates that reprojected_coordinates doesn't have
        keep_dims = list(data.coords.keys())
        drop_dims = [d for d in coordinates.dims if d not in keep_dims]
        coordinates.drop(drop_dims)
        return data

    @property
    def base_ref(self):
        """Default reference for this node, derived from the source node's reference.

        Returns
        -------
        str
            '<source base_ref>_reprojected'
        """
        return "{}_reprojected".format(self.source.base_ref)

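
# Usage sketch (not part of the original source; builds on the hypothetical Array example above, and
# 'bilinear' is assumed to be an available interpolation method name):
#
#   import numpy as np
#   import podpac
#
#   fine = podpac.Coordinates([np.linspace(0, 1, 100), np.linspace(0, 1, 100)], dims=["lat", "lon"])
#   coarse = podpac.Coordinates([np.linspace(0, 1, 10), np.linspace(0, 1, 10)], dims=["lat", "lon"])
#   source = Array(source=np.random.rand(100, 100), native_coordinates=fine)
#   node = ReprojectedSource(source=source, reprojected_coordinates=coarse, source_interpolation="bilinear")
#   output = node.eval(coarse)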

@common_doc(COMMON_DATA_DOC)
class Dataset(DataSource):
    """Create a DataSource node using xarray.open_dataset.

    Attributes
    ----------
    datakey : str
        The 'key' for the data to be retrieved from the file. Datasource may have multiple keys, so this key
        determines which variable is returned from the source.
    dataset : xarray.Dataset, optional
        The xarray dataset from which to retrieve data. If not specified, will be automatically created from
        the 'source'
    native_coordinates : Coordinates
        {native_coordinates}
    source : str
        Path to the data source
    extra_dim : dict
        In cases where the data contain dimensions other than ['lat', 'lon', 'time', 'alt'], these dimensions
        need to be selected. For example, if the data contains ['lat', 'lon', 'channel'], the second channel can
        be selected using `extra_dim=dict(channel=1)`
    """

    source = tl.Any(allow_none=True).tag(readonly=True)
    dataset = tl.Instance(xr.Dataset).tag(readonly=True)

    # node attrs
    extra_dim = tl.Dict({}).tag(attr=True)
    datakey = tl.Unicode().tag(attr=True)

    @tl.default("dataset")
    def _dataset_default(self):
        return xr.open_dataset(self.source)

    @property
    @common_doc(COMMON_DATA_DOC)
    def native_coordinates(self):
        """{native_coordinates}
        """
        # we have to remove any dimensions not in 'lat', 'lon', 'time', 'alt' for the 'get_data' machinery to work
        coords = self.dataset[self.datakey].coords
        crds = []
        dims = []
        for d in coords.dims:
            if d not in ["lat", "lon", "time", "alt"]:
                continue
            crds.append(coords[d].data)
            dims.append(d)

        return Coordinates(crds, dims)

    def get_data(self, coordinates, coordinates_index):
        return self.create_output_array(
            coordinates, self.dataset[self.datakey][self.extra_dim].data[coordinates_index]
        )

    @property
    def keys(self):
        """The list of available keys from the xarray dataset.

        Returns
        -------
        List
            The list of available keys from the xarray dataset. Any of these keys can be set as self.datakey.
        """
        return list(self.dataset.keys())

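
# Usage sketch (not part of the original source; the NetCDF path and variable name are hypothetical):
#
#   node = Dataset(
#       source="forecast.nc",   # hypothetical NetCDF file readable by xarray.open_dataset
#       datakey="temperature",  # hypothetical variable; see node.keys for what is available
#   )
#   output = node.eval(node.native_coordinates)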