Source code for podpac.core.node

"""
Node Summary
"""

from __future__ import division, print_function, absolute_import

import re
import functools
import json
import inspect
import importlib
import warnings
from collections import OrderedDict
from copy import deepcopy
import logging

import numpy as np
import traitlets as tl
import six

import podpac
from podpac.core.settings import settings
from podpac.core.units import ureg, UnitsDataArray
from podpac.core.utils import common_doc
from podpac.core.utils import JSONEncoder
from podpac.core.utils import cached_property
from podpac.core.utils import trait_is_defined
from podpac.core.utils import _get_query_params_from_url, _get_from_url, _get_param
from podpac.core.utils import probe_node
from podpac.core.utils import NodeTrait
from podpac.core.utils import hash_alg
from podpac.core.coordinates import Coordinates
from podpac.core.style import Style
from podpac.core.cache import CacheCtrl, get_default_cache_ctrl, make_cache_ctrl, S3CacheStore, DiskCacheStore
from podpac.core.managers.multi_threading import thread_manager

_logger = logging.getLogger(__name__)

COMMON_NODE_DOC = {
    "requested_coordinates": """The set of coordinates requested by a user. The Node will be evaluated using these coordinates.""",
    "eval_output": """Default is None. Optional input array used to store the output data. When supplied, the node will not
            allocate its own memory for the output array. This array needs to have the correct dimensions,
            coordinates, and coordinate reference system.""",
    "eval_selector": """The selector function is an optimization that enables nodes to only select data needed by an interpolator.
            It returns a new Coordinates object, and an index object that indexes into the `coordinates` parameter
            If not provided, the Coordinates.intersect() method will be used instead.""",
    "eval_return": """
        :class:`podpac.UnitsDataArray`
            Unit-aware xarray DataArray containing the results of the node evaluation.
        """,
    "hash_return": "A unique hash capturing the coordinates and parameters used to evaluate the node. ",
    "outdir": "Optional output directory. Uses :attr:`podpac.settings[.cache_path` by default",
    "definition_return": """
        OrderedDict
            Dictionary containing the location of the Node, the name of the plugin (if required), as well as any
            parameters and attributes that were tagged by children.
        """,
    "arr_init_type": """How to initialize the array. Options are:
                nan: uses np.full(..., np.nan) (Default option)
                empty: uses np.empty
                zeros: uses np.zeros
                ones: uses np.ones
                full: uses np.full(..., fillval)
                data: uses the fillval as the input array
        """,
    "arr_fillval": "used if init_type=='full' or 'data', default = 0",
    "arr_style": "The style to use for plotting. Uses self.style by default",
    "arr_no_style": "Default is False. If True, self.style will not be assigned to arr.attr['layer_style']",
    "arr_shape": "Shape of array. Uses self.shape by default.",
    "arr_coords": "Input to UnitsDataArray (i.e. an xarray coords dictionary/list)",
    "arr_dims": "Input to UnitsDataArray (i.e. an xarray dims list of strings)",
    "arr_units": "Default is self.units The Units for the data contained in the DataArray.",
    "arr_dtype": "Default is np.float. Datatype used by default",
    "arr_kwargs": "Dictioary of any additional keyword arguments that will be passed to UnitsDataArray.",
    "arr_return": """
        :class:`podpac.UnitsDataArray`
            Unit-aware xarray DataArray of the desired size initialized using the method specified.
        """,
}

COMMON_DOC = COMMON_NODE_DOC.copy()
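
# Example (editor's sketch): the @common_doc decorator substitutes named placeholders in a
# docstring with the entries of COMMON_DOC, so shared parameter descriptions are written once.
# The method below is illustrative; see Node.eval for a real use:
#
#     @common_doc(COMMON_DOC)
#     def eval(self, coordinates, **kwargs):
#         """
#         Parameters
#         ----------
#         coordinates : podpac.Coordinates
#             {requested_coordinates}   # replaced by COMMON_DOC["requested_coordinates"]
#         """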


class NodeException(Exception):
    """Base class for exceptions when using podpac nodes"""

    pass

class NodeDefinitionError(NodeException):
    """Raised for node definition errors, such as when the definition is circular or is not yet available."""

    pass

@common_doc(COMMON_DOC)
class Node(tl.HasTraits):
    """The base class for all Nodes, which defines the common interface for everything.

    Attributes
    ----------
    cache_output: bool
        Should the node's output be cached? If not provided or None, uses default based on settings
        (CACHE_NODE_OUTPUT_DEFAULT for general Nodes, and CACHE_DATASOURCE_OUTPUT_DEFAULT for DataSource nodes).
        If True, outputs will be cached and retrieved from cache. If False, outputs will not be cached OR
        retrieved from cache (even if they exist in cache).
    force_eval: bool
        Default is False. Should the node's cached output be updated from the source data? If True it ignores the
        cache when computing outputs but puts results into the cache (thereby updating the cache).
    cache_ctrl: :class:`podpac.core.cache.cache.CacheCtrl`
        Class that controls caching. If not provided, uses default based on settings.
    dtype : type
        The numpy datatype of the output. Currently only ``float`` is supported.
    style : :class:`podpac.Style`
        Object describing how the output of a node should be displayed. This attribute is planned for deprecation
        in the future.
    units : str
        The units of the output data. Must be pint compatible.
    outputs : list
        For multiple-output nodes, the names of the outputs. Default is ``None`` for standard nodes.
    output : str
        For multiple-output nodes only, specifies a particular output to evaluate, if desired. Must be one of
        ``outputs``.

    Notes
    -----
    Additional attributes are available for debugging after evaluation, including:

    * ``_requested_coordinates``: the requested coordinates of the most recent call to eval
    * ``_output``: the output of the most recent call to eval
    * ``_from_cache``: whether the most recent call to eval used the cache
    * ``_multi_threaded``: whether the most recent call to eval was executed using multiple threads
    """

    outputs = tl.List(trait=tl.Unicode(), allow_none=True).tag(attr=True)
    output = tl.Unicode(default_value=None, allow_none=True).tag(attr=True)
    units = tl.Unicode(default_value=None, allow_none=True).tag(attr=True, hidden=True)
    style = tl.Instance(Style)

    dtype = tl.Enum([float], default_value=float)
    cache_output = tl.Bool()
    force_eval = tl.Bool(False)
    cache_ctrl = tl.Instance(CacheCtrl, allow_none=True)

    # list of attribute names, used by __repr__ and __str__ to display minimal info about the node
    # e.g. data sources use ['source']
    _repr_keys = []

    @tl.default("outputs")
    def _default_outputs(self):
        return None

    @tl.validate("output")
    def _validate_output(self, d):
        if d["value"] is not None:
            if self.outputs is None:
                raise TypeError("Invalid output '%s' (output must be None for single-output nodes)." % self.output)
            if d["value"] not in self.outputs:
                raise ValueError("Invalid output '%s' (available outputs are %s)" % (self.output, self.outputs))
        return d["value"]

    @tl.default("style")
    def _default_style(self):
        return Style()

    @tl.validate("units")
    def _validate_units(self, d):
        ureg.Unit(d["value"])  # will throw an exception if this is not a valid pint Unit
        return d["value"]

    @tl.default("cache_output")
    def _cache_output_default(self):
        return settings["CACHE_NODE_OUTPUT_DEFAULT"]

    @tl.default("cache_ctrl")
    def _cache_ctrl_default(self):
        return get_default_cache_ctrl()

    # debugging
    _requested_coordinates = tl.Instance(Coordinates, allow_none=True)
    _output = tl.Instance(UnitsDataArray, allow_none=True)
    _from_cache = tl.Bool(allow_none=True, default_value=None)

    # Flag that is True if the Node was run multi-threaded, or None if the question doesn't apply
    _multi_threaded = tl.Bool(allow_none=True, default_value=None)

    # util
    _definition_guard = False
    _traits_initialized_guard = False

    def __init__(self, **kwargs):
        """Do not overwrite me"""

        # Shortcut for users to make setting the cache_ctrl simpler:
        if "cache_ctrl" in kwargs and isinstance(kwargs["cache_ctrl"], list):
            kwargs["cache_ctrl"] = make_cache_ctrl(kwargs["cache_ctrl"])

        tkwargs = self._first_init(**kwargs)

        # make tagged "readonly" and "attr" traits read_only, and set them using set_trait
        # NOTE: The set_trait is required because this sets the traits read_only at the *class* level;
        #       on subsequent initializations, they will already be read_only.
        with self.hold_trait_notifications():
            for name, trait in self.traits().items():
                if settings["DEBUG"]:
                    trait.read_only = False
                elif trait.metadata.get("readonly") or trait.metadata.get("attr"):
                    if name in tkwargs:
                        self.set_trait(name, tkwargs.pop(name))
                    trait.read_only = True

        # Call traitlets constructor
        super(Node, self).__init__(**tkwargs)

        self._traits_initialized_guard = True

        self.init()

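    # Example (editor's sketch, hedged): the list shortcut above lets users pass cache store
    # names instead of a CacheCtrl instance. `MyNode` is a hypothetical Node subclass:
    #
    #     node = MyNode(cache_ctrl=["ram", "disk"])            # converted via make_cache_ctrl
    #     node = MyNode(cache_ctrl=make_cache_ctrl(["ram"]))   # equivalent explicit form
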
    def _first_init(self, **kwargs):
        """Only overwrite me if absolutely necessary

        Parameters
        ----------
        **kwargs
            Keyword arguments provided by user when class was instantiated.

        Returns
        -------
        dict
            Keyword arguments that will be passed to the standard initialization function.
        """
        return kwargs

    def init(self):
        """Overwrite this method if a node needs to do any additional initialization after the standard
        initialization."""
        pass

    @property
    def attrs(self):
        """List of node attributes"""
        return [name for name in self.traits() if self.trait_metadata(name, "attr")]

    @property
    def _repr_info(self):
        keys = self._repr_keys[:]
        if self.trait_is_defined("output") and self.output is not None:
            if "output" not in keys:
                keys.append("output")
        elif self.trait_is_defined("outputs") and self.outputs is not None:
            if "outputs" not in keys:
                keys.append("outputs")
        return ", ".join("%s=%s" % (key, repr(getattr(self, key))) for key in keys)

    def __repr__(self):
        return "<%s(%s)>" % (self.__class__.__name__, self._repr_info)

    def __str__(self):
        return "<%s(%s) attrs: %s>" % (self.__class__.__name__, self._repr_info, ", ".join(self.attrs))

    @common_doc(COMMON_DOC)
    def eval(self, coordinates, **kwargs):
        """
        Evaluate the node at the given coordinates.

        Parameters
        ----------
        coordinates : podpac.Coordinates
            {requested_coordinates}
        **kwargs : dict
            Additional keyword arguments passed down the node pipelines, used internally

        Returns
        -------
        output : {eval_return}
        """
        output = kwargs.get("output", None)

        # check crs compatibility
        if output is not None and "crs" in output.attrs and output.attrs["crs"] != coordinates.crs:
            raise ValueError(
                "Output coordinate reference system ({}) does not match ".format(output.crs)
                + "requested Coordinates coordinate reference system ({})".format(coordinates.crs)
            )

        if settings["DEBUG"]:
            self._requested_coordinates = coordinates

        item = "output"

        # get standardized coordinates for caching
        cache_coordinates = coordinates.transpose(*sorted(coordinates.dims)).simplify()

        if not self.force_eval and self.cache_output and self.has_cache(item, cache_coordinates):
            data = self.get_cache(item, cache_coordinates)
            if output is not None:
                order = [dim for dim in output.dims if dim not in data.dims] + list(data.dims)
                output.transpose(*order)[:] = data
            self._from_cache = True
        else:
            data = self._eval(coordinates, **kwargs)
            if self.cache_output:
                self.put_cache(data, item, cache_coordinates)
            self._from_cache = False

        # extract single output, if necessary
        # subclasses should extract single outputs themselves if possible, but this provides a backup
        if "output" in data.dims and self.output is not None:
            data = data.sel(output=self.output)

        # transpose data to match the dims order of the requested coordinates
        order = [dim for dim in coordinates.xdims if dim in data.dims]
        if "output" in data.dims:
            order.append("output")
        data = data.part_transpose(order)

        if settings["DEBUG"]:
            self._output = data

        # Add style information
        data.attrs["layer_style"] = self.style
        if self.units is not None:
            data.attrs["units"] = self.units

        # Add crs if it is missing
        if "crs" not in data.attrs:
            data.attrs["crs"] = coordinates.crs

        return data

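    # Example (editor's sketch, hedged): typical evaluation of any Node subclass. Here `node`
    # stands in for a concrete instance (e.g. a DataSource or Algorithm node):
    #
    #     import podpac
    #     coords = podpac.Coordinates(
    #         [podpac.clinspace(45, 40, 16, "lat"), podpac.clinspace(-70, -65, 16, "lon")]
    #     )
    #     output = node.eval(coords)                  # UnitsDataArray with dims ("lat", "lon")
    #     output2 = node.eval(coords, output=output)  # reuse an existing output array
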
    def _eval(self, coordinates, output=None, _selector=None):
        raise NotImplementedError

    def eval_group(self, group):
        """
        Evaluate the node for each of the coordinates in the group.

        Parameters
        ----------
        group : podpac.CoordinatesGroup
            Group of coordinates to evaluate.

        Returns
        -------
        outputs : list
            evaluation output, list of UnitsDataArray objects
        """
        return [self.eval(coords) for coords in group]

    def find_coordinates(self):
        """
        Get all available coordinates for the Node. Implemented in child classes.

        Returns
        -------
        coord_list : list
            list of available coordinates (Coordinates objects)
        """
        raise NotImplementedError

    def get_bounds(self, crs="default"):
        """Get the full available coordinate bounds for the Node.

        Arguments
        ---------
        crs : str
            Desired CRS for the bounds. If not specified, the default CRS in the podpac settings is used. Optional.

        Returns
        -------
        bounds : dict
            Bounds for each dimension. Keys are dimension names and values are tuples (min, max).
        crs : str
            The CRS for the bounds.
        """
        if crs == "default":
            crs = podpac.settings["DEFAULT_CRS"]

        bounds = {}
        for coords in self.find_coordinates():
            ct = coords.transform(crs)
            for dim, (lo, hi) in ct.bounds.items():
                if dim not in bounds:
                    bounds[dim] = (lo, hi)
                else:
                    bounds[dim] = (min(lo, bounds[dim][0]), max(hi, bounds[dim][1]))

        return bounds, crs

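    # Example (editor's sketch, hedged): get_bounds merges the bounds of every Coordinates
    # object returned by find_coordinates(). For a node with lat/lon coordinates:
    #
    #     bounds, crs = node.get_bounds()
    #     # bounds -> {"lat": (40.0, 45.0), "lon": (-70.0, -65.0)}   (illustrative values)
    #     # crs    -> the value of podpac.settings["DEFAULT_CRS"], e.g. "EPSG:4326"
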
    @common_doc(COMMON_DOC)
    def create_output_array(self, coords, data=np.nan, attrs=None, outputs=None, **kwargs):
        """
        Initialize an output data array

        Parameters
        ----------
        coords : podpac.Coordinates
            {arr_coords}
        data : None, number, or array-like (optional)
            {arr_init_type}
        attrs : dict
            Attributes to add to output -- UnitsDataArray.create uses the 'crs' portion contained in here
        outputs : list[string], optional
            Default is self.outputs. List of strings listing the outputs
        **kwargs
            {arr_kwargs}

        Returns
        -------
        {arr_return}
        """
        if attrs is None:
            attrs = {}
        if "layer_style" not in attrs:
            attrs["layer_style"] = self.style
        if "crs" not in attrs:
            attrs["crs"] = coords.crs
        if "units" not in attrs and self.units is not None:
            attrs["units"] = ureg.Unit(self.units)
        if "geotransform" not in attrs:
            try:
                attrs["geotransform"] = coords.geotransform
            except (TypeError, AttributeError):
                pass
        if outputs is None:
            outputs = self.outputs
        if outputs == []:
            outputs = None

        return UnitsDataArray.create(coords, data=data, outputs=outputs, dtype=self.dtype, attrs=attrs, **kwargs)

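    # Example (editor's sketch, hedged): subclasses typically use create_output_array inside
    # _eval to allocate a correctly-shaped, attribute-carrying array for the result:
    #
    #     def _eval(self, coordinates, output=None, _selector=None):
    #         if output is None:
    #             output = self.create_output_array(coordinates)  # filled with np.nan
    #         output[:] = 42.0  # hypothetical computation
    #         return output
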
    def trait_is_defined(self, name):
        return trait_is_defined(self, name)

    def probe(self, lat=None, lon=None, time=None, alt=None, crs=None):
        """Evaluates every part of a node / pipeline at a point and records which nodes are actively being used.

        Parameters
        ----------
        lat : float, optional
            Default is None. The latitude location
        lon : float, optional
            Default is None. The longitude location
        time : float, np.datetime64, optional
            Default is None. The time
        alt : float, optional
            Default is None. The altitude location
        crs : str, optional
            Default is None. The CRS of the request.

        Returns
        -------
        dict
            A dictionary that contains the following for each node:

            * "active": bool,  # If the node is being used or not
            * "value": float,  # The value of the node evaluated at that point
            * "inputs": list,  # List of names of input nodes (based on definition)
            * "name": str,  # node.style.name or self.base_ref if the style name is empty
            * "node_hash": str,  # The node's hash
        """
        return probe_node(self, lat, lon, time, alt, crs)

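    # Example (editor's sketch, hedged): probing a pipeline at a single point to see which
    # nodes contributed to the result (keys as listed in the docstring above):
    #
    #     report = node.probe(lat=42.0, lon=-70.0, time=np.datetime64("2020-01-01"))
    #     for node_hash, info in report.items():
    #         print(info["name"], info["active"], info["value"])
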
    # -----------------------------------------------------------------------------------------------------------------
    # Serialization
    # -----------------------------------------------------------------------------------------------------------------

    @property
    def base_ref(self):
        """
        Default reference/name in node definitions

        Returns
        -------
        str
            Name of the node in node definitions
        """
        return self.__class__.__name__

    @property
    def _base_definition(self):
        d = OrderedDict()

        # node and plugin
        if self.__module__ == "podpac":
            d["node"] = self.__class__.__name__
        elif self.__module__.startswith("podpac."):
            _, module = self.__module__.split(".", 1)
            d["node"] = "%s.%s" % (module, self.__class__.__name__)
        else:
            d["plugin"] = self.__module__
            d["node"] = self.__class__.__name__

        # attrs/inputs
        attrs = {}
        inputs = {}
        for name in self.attrs:
            value = getattr(self, name)
            if (
                isinstance(value, Node)
                or (
                    isinstance(value, (list, tuple, np.ndarray))
                    and (len(value) > 0)
                    and all(isinstance(elem, Node) for elem in value)
                )
                or (
                    isinstance(value, dict)
                    and (len(value) > 0)
                    and all(isinstance(elem, Node) for elem in value.values())
                )
            ):
                inputs[name] = value
            else:
                attrs[name] = value

        if "units" in attrs and attrs["units"] is None:
            del attrs["units"]

        if "outputs" in attrs and attrs["outputs"] is None:
            del attrs["outputs"]

        if "output" in attrs and attrs["output"] is None:
            del attrs["output"]

        if attrs:
            d["attrs"] = attrs

        if inputs:
            d["inputs"] = inputs

        # style
        if self.style.definition:
            d["style"] = self.style.definition

        return d

    @cached_property
    def definition(self):
        """
        Full node definition.

        Returns
        -------
        OrderedDict
            Dictionary-formatted node definition.
        """

        if getattr(self, "_definition_guard", False):
            raise NodeDefinitionError("node definition has a circular dependency")

        if not getattr(self, "_traits_initialized_guard", False):
            raise NodeDefinitionError("node is not yet fully initialized")

        try:
            self._definition_guard = True

            nodes = []
            refs = []
            definitions = []

            def add_node(node):
                for ref, n in zip(refs, nodes):
                    if node == n:
                        return ref

                # get base definition
                d = node._base_definition

                if "inputs" in d:
                    # sort and shallow copy
                    d["inputs"] = OrderedDict([(key, d["inputs"][key]) for key in sorted(d["inputs"].keys())])

                    # replace nodes with references, adding nodes depth first
                    for key, value in d["inputs"].items():
                        if isinstance(value, Node):
                            d["inputs"][key] = add_node(value)
                        elif isinstance(value, (list, tuple, np.ndarray)):
                            d["inputs"][key] = [add_node(item) for item in value]
                        elif isinstance(value, dict):
                            d["inputs"][key] = {k: add_node(v) for k, v in value.items()}
                        else:
                            raise TypeError("Invalid input '%s' of type '%s'" % (key, type(value)))

                if "attrs" in d:
                    # sort and shallow copy
                    d["attrs"] = OrderedDict([(key, d["attrs"][key]) for key in sorted(d["attrs"].keys())])

                # get base ref and then ensure it is unique
                ref = node.base_ref
                while ref in refs:
                    if re.search("_[1-9][0-9]*$", ref):
                        ref, i = ref.rsplit("_", 1)
                        i = int(i)
                    else:
                        i = 0
                    ref = "%s_%d" % (ref, i + 1)

                nodes.append(node)
                refs.append(ref)
                definitions.append(d)

                return ref

            # add top level node
            add_node(self)

            # finalize, verify serializable, and return
            definition = OrderedDict(zip(refs, definitions))
            definition["podpac_version"] = podpac.__version__
            json.dumps(definition, cls=JSONEncoder)

            return definition

        finally:
            self._definition_guard = False

    @property
    def json(self):
        """Definition for this node in JSON format."""
        return json.dumps(self.definition, separators=(",", ":"), cls=JSONEncoder)

    @property
    def json_pretty(self):
        """Definition for this node in JSON format, with indentation suitable for display."""
        return json.dumps(self.definition, indent=4, cls=JSONEncoder)

    @cached_property
    def hash(self):
        """hash for this node, used in caching and to determine equality."""

        # deepcopy so that the cached definition property is not modified by the deletes below
        d = deepcopy(self.definition)

        # omit version
        if "podpac_version" in d:
            del d["podpac_version"]

        # omit style in every node
        for k in d:
            if "style" in d[k]:
                del d[k]["style"]

        s = json.dumps(d, separators=(",", ":"), cls=JSONEncoder)
        return hash_alg(s.encode("utf-8")).hexdigest()

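    # Example (editor's sketch, hedged): the serialization properties build on each other.
    # `definition` is an OrderedDict of {ref: node definition} pairs plus a "podpac_version"
    # entry; `json`/`json_pretty` serialize it; `hash` digests it with the version and style
    # removed, so restyled copies of a node compare equal:
    #
    #     d = node.definition                           # one entry per node in the pipeline
    #     s = node.json_pretty                          # human-readable JSON string
    #     node.hash == Node.from_json(node.json).hash   # round trip preserves the hash
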
    def save(self, path):
        """
        Write node to file.

        Arguments
        ---------
        path : str
            path to write to

        See Also
        --------
        load : load podpac Node from file.
        """
        with open(path, "w") as f:
            json.dump(self.definition, f, separators=(",", ":"), cls=JSONEncoder)

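    # Example (editor's sketch, hedged): save/load round trip through a JSON file:
    #
    #     node.save("my_pipeline.json")
    #     node2 = podpac.Node.load("my_pipeline.json")
    #     assert node2 == node  # equality is hash-based (see __eq__ below)
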
    def __eq__(self, other):
        if not isinstance(other, Node):
            return False
        return self.hash == other.hash

    def __ne__(self, other):
        if not isinstance(other, Node):
            return True
        return self.hash != other.hash

    # -----------------------------------------------------------------------------------------------------------------
    # Caching Interface
    # -----------------------------------------------------------------------------------------------------------------

    def get_cache(self, key, coordinates=None):
        """
        Get cached data for this node.

        Parameters
        ----------
        key : str
            Key for the cached data, e.g. 'output'
        coordinates : podpac.Coordinates, optional
            Coordinates for which the cached data should be retrieved. Omit for coordinate-independent data.

        Returns
        -------
        data : any
            The cached data.

        Raises
        ------
        NodeException
            Cached data not found.
        """
        try:
            self.definition
        except NodeDefinitionError as e:
            raise NodeException("Cache unavailable, %s (key='%s')" % (e.args[0], key))

        if self.cache_ctrl is None or not self.has_cache(key, coordinates=coordinates):
            raise NodeException("cached data not found for key '%s' and coordinates %s" % (key, coordinates))

        return self.cache_ctrl.get(self, key, coordinates=coordinates)

    def put_cache(self, data, key, coordinates=None, expires=None, overwrite=True):
        """
        Cache data for this node.

        Parameters
        ----------
        data : any
            The data to cache.
        key : str
            Unique key for the data, e.g. 'output'
        coordinates : podpac.Coordinates, optional
            Coordinates that the cached data depends on. Omit for coordinate-independent data.
        expires : float, datetime, timedelta
            Expiration date. If a timedelta is supplied, the expiration date will be calculated from the
            current time.
        overwrite : bool, optional
            Overwrite existing data, default True.

        Raises
        ------
        NodeException
            Cached data already exists (and overwrite is False)
        """
        try:
            self.definition
        except NodeDefinitionError as e:
            raise NodeException("Cache unavailable, %s (key='%s')" % (e.args[0], key))

        if self.cache_ctrl is None:
            return

        if not overwrite and self.has_cache(key, coordinates=coordinates):
            raise NodeException("Cached data already exists for key '%s' and coordinates %s" % (key, coordinates))

        with thread_manager.cache_lock:
            self.cache_ctrl.put(self, data, key, coordinates=coordinates, expires=expires, update=overwrite)

    def has_cache(self, key, coordinates=None):
        """
        Check for cached data for this node.

        Parameters
        ----------
        key : str
            Key for the cached data, e.g. 'output'
        coordinates : podpac.Coordinates, optional
            Coordinates for which the cached data should be retrieved. Omit for coordinate-independent data.

        Returns
        -------
        bool
            True if there is cached data for this node, key, and coordinates.
        """
        try:
            self.definition
        except NodeDefinitionError as e:
            raise NodeException("Cache unavailable, %s (key='%s')" % (e.args[0], key))

        if self.cache_ctrl is None:
            return False

        with thread_manager.cache_lock:
            return self.cache_ctrl.has(self, key, coordinates=coordinates)

    def rem_cache(self, key, coordinates=None, mode="all"):
        """
        Clear cached data for this node.

        Parameters
        ----------
        key : str
            Delete cached objects with this key. If `'*'`, cached data is deleted for all keys.
        coordinates : podpac.Coordinates, str, optional
            Default is None. Delete cached objects for these coordinates. If `'*'`, cached data is deleted for all
            coordinates, including coordinate-independent data. If None, will only affect coordinate-independent
            data.
        mode : str, optional
            Specify which cache stores are affected. Default 'all'.

        See Also
        --------
        `podpac.core.cache.cache.CacheCtrl.clear` to remove ALL cache for ALL nodes.
        """
        try:
            self.definition
        except NodeDefinitionError as e:
            raise NodeException("Cache unavailable, %s (key='%s')" % (e.args[0], key))

        if self.cache_ctrl is None:
            return

        self.cache_ctrl.rem(self, item=key, coordinates=coordinates, mode=mode)

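    # Example (editor's sketch, hedged): the four methods above form the node-level caching
    # API. A typical coordinate-independent round trip, with a hypothetical key and payload:
    #
    #     node.put_cache({"mean": 3.2}, "stats")
    #     if node.has_cache("stats"):
    #         stats = node.get_cache("stats")
    #     node.rem_cache("stats")                # remove just this key
    #     node.rem_cache("*", coordinates="*")   # remove everything cached for this node
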
    # --------------------------------------------------------#
    #  Class Methods (Deserialization)
    # --------------------------------------------------------#

    @classmethod
    def from_definition(cls, definition):
        """
        Create podpac Node from a dictionary definition.

        Arguments
        ---------
        definition : dict
            node definition

        Returns
        -------
        :class:`Node`
            podpac Node

        See Also
        --------
        definition : node definition as a dictionary
        from_json : create podpac node from a JSON definition
        load : create a node from file
        """
        if "podpac_version" in definition and definition["podpac_version"] != podpac.__version__:
            warnings.warn(
                "node definition version mismatch "
                "(this node was created with podpac version '%s', "
                "but your current podpac version is '%s')" % (definition["podpac_version"], podpac.__version__)
            )

        if len(definition) == 0:
            raise ValueError("Invalid definition: definition cannot be empty.")

        # parse node definitions in order
        nodes = OrderedDict()
        for name, d in definition.items():
            if name == "podpac_version":
                continue

            if "node" not in d:
                raise ValueError("Invalid definition for node '%s': 'node' property required" % name)

            # get node class
            module_root = d.get("plugin", "podpac")
            node_string = "%s.%s" % (module_root, d["node"])
            module_name, node_name = node_string.rsplit(".", 1)
            try:
                module = importlib.import_module(module_name)
            except ImportError:
                raise ValueError("Invalid definition for node '%s': no module found '%s'" % (name, module_name))
            try:
                node_class = getattr(module, node_name)
            except AttributeError:
                raise ValueError(
                    "Invalid definition for node '%s': class '%s' not found in module '%s'"
                    % (name, node_name, module_name)
                )

            # parse and configure kwargs
            kwargs = {}
            for k, v in d.get("attrs", {}).items():
                kwargs[k] = v

            for k, v in d.get("inputs", {}).items():
                kwargs[k] = _lookup_input(nodes, name, v)

            for k, v in d.get("lookup_attrs", {}).items():
                kwargs[k] = _lookup_attr(nodes, name, v)

            if "style" in d:
                style_class = getattr(node_class, "style", Style)
                if isinstance(style_class, tl.TraitType):
                    # Look through the class to see if there is a custom initializer for style
                    for attr in dir(node_class):
                        atr = getattr(node_class, attr)
                        if not isinstance(atr, tl.traitlets.DefaultHandler) or atr.trait_name != "style":
                            continue
                        try:
                            style_class = atr(node_class)
                        except Exception:
                            # couldn't make style from class
                            try:
                                style_class = atr(node_class())
                            except Exception:
                                # couldn't make style from class instance
                                style_class = style_class.klass
                try:
                    kwargs["style"] = style_class.from_definition(d["style"])
                except Exception:
                    # couldn't make style from inferred style class
                    kwargs["style"] = Style.from_definition(d["style"])

            for k in d:
                if k not in ["node", "inputs", "attrs", "lookup_attrs", "plugin", "style"]:
                    raise ValueError("Invalid definition for node '%s': unexpected property '%s'" % (name, k))

            nodes[name] = node_class(**kwargs)

        return list(nodes.values())[-1]

    @classmethod
    def from_json(cls, s):
        """
        Create podpac Node from a JSON definition.

        Arguments
        ---------
        s : str
            JSON-formatted node definition

        Returns
        -------
        :class:`Node`
            podpac Node

        See Also
        --------
        json : node definition as a JSON string
        load : create a node from file
        """
        d = json.loads(s, object_pairs_hook=OrderedDict)
        return cls.from_definition(d)

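    # Example (editor's sketch, hedged): a minimal single-node JSON definition. The node path
    # is illustrative; any importable Node subclass works the same way:
    #
    #     s = '''
    #     {
    #         "mynode": {
    #             "node": "algorithm.Arange"
    #         }
    #     }
    #     '''
    #     node = podpac.Node.from_json(s)
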
    @classmethod
    def load(cls, path):
        """
        Create podpac Node from file.

        Arguments
        ---------
        path : str
            path to text file containing a JSON-formatted node definition

        Returns
        -------
        :class:`Node`
            podpac Node

        See Also
        --------
        save : save a node to file
        """
        with open(path) as f:
            d = json.load(f, object_pairs_hook=OrderedDict)
        return cls.from_definition(d)

    @classmethod
    def from_url(cls, url):
        """
        Create podpac Node from a WMS/WCS request.

        Arguments
        ---------
        url : str, dict
            The raw WMS/WCS request url, or a dictionary of query parameters

        Returns
        -------
        :class:`Node`
            A full Node with sub-nodes based on the definition of the node from the URL

        Notes
        -----
        The request can specify the PODPAC node by four different mechanisms:

        * Direct node name: PODPAC will look for an appropriate node in podpac.datalib
        * JSON definition passed using the 'PARAMS' query string: Need to specify the special LAYER/COVERAGE
          value of "%PARAMS%"
        * By pointing at the JSON definition retrievable with an http GET request:
          e.g. by setting LAYER/COVERAGE value to https://my-site.org/pipeline_definition.json
        * By pointing at the JSON definition retrievable from an S3 bucket that the user has access to:
          e.g. by setting LAYER/COVERAGE value to s3://my-bucket-name/pipeline_definition.json
        """
        query_params = _get_query_params_from_url(url)

        if _get_param(query_params, "SERVICE") == "WMS":
            layer = _get_param(query_params, "LAYERS")
        elif _get_param(query_params, "SERVICE") == "WCS":
            layer = _get_param(query_params, "COVERAGE")

        d = None
        if layer.startswith("https://"):
            d = _get_from_url(layer).json()
        elif layer.startswith("s3://"):
            parts = layer.split("/")
            bucket = parts[2]
            key = "/".join(parts[3:])
            s3 = S3CacheStore(s3_bucket=bucket)
            s = s3._load(key)
        elif layer == "%PARAMS%":
            s = _get_param(query_params, "PARAMS")
        else:
            p = _get_param(query_params, "PARAMS")
            if p is None:
                p = "{}"
            if not isinstance(p, dict):
                p = json.loads(p)
            return cls.from_name_params(layer, p)

        if d is None:
            d = json.loads(s, object_pairs_hook=OrderedDict)

        return cls.from_definition(d)

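    # Example (editor's sketch, hedged): parsing a direct-node-name WMS request. The server
    # and layer values are illustrative:
    #
    #     url = (
    #         "https://server.org/wms?SERVICE=WMS&REQUEST=GetMap"
    #         "&LAYERS=algorithm.Arange&PARAMS={}"
    #     )
    #     node = podpac.Node.from_url(url)  # dispatches to from_name_params("algorithm.Arange", {})
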
    @classmethod
    def from_name_params(cls, name, params=None):
        """
        Create podpac Node from a WMS/WCS request.

        Arguments
        ---------
        name : str
            The name of the PODPAC Node / Layer
        params : dict, optional
            Default is None. Dictionary of parameters to modify node attributes, style, or completely/partially
            define the node. This dictionary can either be a `Node.definition` or `Node.definition['attrs']`.
            Note, the specified `name` always takes precedence over anything defined in `params`
            (e.g. params['node'] won't be used).

        Returns
        -------
        :class:`Node`
            A full Node with sub-nodes based on the definition of the node from the node name and parameters
        """
        layer = name
        p = params

        d = None
        if p is None:
            p = {}

        definition = {}
        # If one of the special names is in the params list, then add params to the root layer
        if "node" in p or "plugin" in p or "style" in p or "attrs" in p:
            definition.update(p)
        else:
            definition["attrs"] = p
        definition.update({"node": layer})  # The user-specified node name ALWAYS takes precedence.
        d = OrderedDict({layer.replace(".", "-"): definition})

        return cls.from_definition(d)

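    # Example (editor's sketch, hedged): from_name_params wraps the name and params into a
    # definition dict and defers to from_definition. The node path is illustrative:
    #
    #     node = podpac.Node.from_name_params("algorithm.Arange")
    #     # equivalent to:
    #     # from_definition({"algorithm-Arange": {"node": "algorithm.Arange", "attrs": {}}})
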
    @classmethod
    def get_ui_spec(cls, help_as_html=False):
        """Get spec of node attributes for building a ui

        Parameters
        ----------
        help_as_html : bool, optional
            Default is False. If True, the docstrings will be converted to html before storing in the spec.

        Returns
        -------
        dict
            Spec for this node that is readily json-serializable
        """
        filter = []
        spec = {"help": cls.__doc__, "module": cls.__module__ + "." + cls.__name__, "attrs": {}, "style": {}}

        # Strip out starting spaces in the help text so that markdown parsing works correctly
        if spec["help"] is None:
            spec["help"] = "No help text to display."
        spec["help"] = spec["help"].replace("\n    ", "\n")

        if help_as_html:
            from numpydoc.docscrape_sphinx import SphinxDocString
            from docutils.core import publish_string

            tmp = SphinxDocString(spec["help"])
            tmp2 = publish_string(str(tmp), writer_name="html")
            slc = slice(tmp2.index(b'<div class="document">'), tmp2.index(b"</body>"))
            spec["help"] = tmp2[slc].decode()

        # find any default values that are defined by function with decorators
        # e.g. using @tl.default("trait_name")
        #            def _default_trait_name(self): ...
        function_defaults = {}
        for attr in dir(cls):
            atr = getattr(cls, attr)
            if not isinstance(atr, tl.traitlets.DefaultHandler):
                continue
            try:
                try:
                    def_val = atr(cls())
                except Exception:
                    def_val = atr(cls)
                if isinstance(def_val, NodeTrait):
                    def_val = def_val.name
                    print("Changing Nodetrait to string")
                # if "NodeTrait" not in str(atr(cls)):
                function_defaults[atr.trait_name] = def_val
            except Exception:
                _logger.warning(
                    "For node {}: Failed to generate default from function for trait {}".format(
                        cls.__name__, atr.trait_name
                    )
                )

        for attr in dir(cls):
            if attr in filter:
                continue
            attrt = getattr(cls, attr)
            if not isinstance(attrt, tl.TraitType):
                continue
            if not attrt.metadata.get("attr", False):
                continue

            type_ = attrt.__class__.__name__
            try:
                schema = getattr(attrt, "_schema")
            except AttributeError:
                schema = None
            type_extra = str(attrt)
            if type_ == "Union":
                type_ = [t.__class__.__name__ for t in attrt.trait_types]
                type_extra = "Union"
            elif type_ == "Instance":
                type_ = attrt.klass.__name__
                if type_ == "Node":
                    type_ = "NodeTrait"
                type_extra = attrt.klass
            elif type_ == "Dict" and schema is None:
                try:
                    schema = {
                        "key": getattr(attrt, "_key_trait").__class__.__name__,
                        "value": getattr(attrt, "_value_trait").__class__.__name__,
                    }
                except Exception:
                    print("Could not find schema for", attrt, " of type", type_)
                    schema = None

            required = attrt.metadata.get("required", False)
            hidden = attrt.metadata.get("hidden", False)
            if attr in function_defaults:
                default_val = function_defaults[attr]
            else:
                default_val = attrt.default()
            if not isinstance(type_extra, str):
                type_extra = str(type_extra)
            try:
                if np.isnan(default_val):
                    default_val = "nan"
            except Exception:
                pass
            if default_val == tl.Undefined:
                default_val = None

            spec["attrs"][attr] = {
                "type": type_,
                "type_str": type_extra,  # May remove this if not needed
                "values": getattr(attrt, "values", None),
                "default": default_val,
                "help": attrt.help,
                "required": required,
                "hidden": hidden,
                "schema": schema,
            }

        try:
            # load the style from the cls (this returns the style JSON)
            style_json = json.loads(cls().style.json)
        except Exception:
            style_json = {}
        spec["style"] = style_json
        # this does not work, because node not created yet?

        """ I will manually define generic defaults here. Eventually we may want to dig into
        this and create node specific styling. This will have to be done under each node.
        But may be difficult to add style to each node?

        Example: podpac.core.algorithm.utility.SinCoords.Style ----> returns a tl.Instance
        BUT if I do: podpac.core.algorithm.utility.SinCoords().style.json ---> outputs style
        (ERROR if no parentheses are given). So how can this be done without instantiating the
        class? Will need to ask @MPU how to define a node specific style.
        """
        # spec["style"] = {
        #     "name": "?",
        #     "units": "m",
        #     "clim": [-1.0, 1.0],
        #     "colormap": "jet",
        #     "enumeration_legend": "?",
        #     "enumeration_colors": "?",
        #     "default_enumeration_legend": "unknown",
        #     "default_enumeration_color": (0.2, 0.2, 0.2),
        # }

        spec.update(getattr(cls, "_ui_spec", {}))
        return spec

def _lookup_input(nodes, name, value):
    # containers
    if isinstance(value, list):
        return [_lookup_input(nodes, name, elem) for elem in value]

    if isinstance(value, dict):
        return {k: _lookup_input(nodes, name, v) for k, v in value.items()}

    # node reference
    if not isinstance(value, six.string_types):
        raise ValueError(
            "Invalid definition for node '%s': invalid reference '%s' of type '%s' in inputs"
            % (name, value, type(value))
        )

    if not value in nodes:
        raise ValueError(
            "Invalid definition for node '%s': reference to nonexistent node '%s' in inputs" % (name, value)
        )

    node = nodes[value]

    # copy in debug mode
    if settings["DEBUG"]:
        node = deepcopy(node)

    return node


def _lookup_attr(nodes, name, value):
    # containers
    if isinstance(value, list):
        return [_lookup_attr(nodes, name, elem) for elem in value]

    if isinstance(value, dict):
        return {k: _lookup_attr(nodes, name, v) for k, v in value.items()}

    if not isinstance(value, six.string_types):
        raise ValueError(
            "Invalid definition for node '%s': invalid reference '%s' of type '%s' in lookup_attrs"
            % (name, value, type(value))
        )

    # node
    elems = value.split(".")
    if elems[0] not in nodes:
        raise ValueError(
            "Invalid definition for node '%s': reference to nonexistent node '%s' in lookup_attrs" % (name, elems[0])
        )

    # subattrs
    attr = nodes[elems[0]]
    for n in elems[1:]:
        if not hasattr(attr, n):
            raise ValueError(
                "Invalid definition for node '%s': reference to nonexistent attribute '%s' in lookup_attrs value '%s'"
                % (name, n, value)
            )
        attr = getattr(attr, n)

    # copy in debug mode
    if settings["DEBUG"]:
        attr = deepcopy(attr)

    return attr


# --------------------------------------------------------#
#  Mixins
# --------------------------------------------------------#


class NoCacheMixin(tl.HasTraits):
    """Mixin to use no cache by default."""

    cache_ctrl = tl.Instance(CacheCtrl, allow_none=True)

    @tl.default("cache_ctrl")
    def _cache_ctrl_default(self):
        return CacheCtrl([])


class DiskCacheMixin(tl.HasTraits):
    """Mixin to add disk caching to the Node by default."""

    cache_ctrl = tl.Instance(CacheCtrl, allow_none=True)

    @tl.default("cache_ctrl")
    def _cache_ctrl_default(self):
        # get the default cache_ctrl and add a disk cache store if necessary
        default_ctrl = get_default_cache_ctrl()
        stores = default_ctrl._cache_stores
        if not any(isinstance(store, DiskCacheStore) for store in default_ctrl._cache_stores):
            stores.append(DiskCacheStore())
        return CacheCtrl(stores)


# --------------------------------------------------------#
#  Decorators
# --------------------------------------------------------#