Source code for mstc.processing.operation

"""Higher order operations initialized with a component or components."""
from functools import reduce

from .core import MultipleOperation, SingleOperation


[docs]class Compose(MultipleOperation): """Implement a pipeline to execute a sequence of components. Propagating attributes."""
[docs] def __init__(self, components, **kwargs): """ Initialize a pipeline. Args: components (iterable): an iterable containing components. kwargs (dict): arguments to pass to Encoder as attributes. """ super(Compose, self).__init__( components=components, attributes=kwargs )
[docs] def __call__(self, an_object): """ Execute a composition of components. Args: an_object (object): an input for the composition. Returns: a xr.DataArray or iterable of xr.DataArray generated from the composition. """ attributes = self.attributes # propagate attributes for component in self.components: component.attributes.update(attributes) an_object = component(an_object) attributes.update(component.attributes) # after call return an_object
[docs]class Broadcast(MultipleOperation): """Broadcast an input using multiple components."""
[docs] def __init__(self, components, **kwargs): """ Initialize the operation. Args: components (iterable): an iterable containing components. kwargs (dict): arguments to pass to Brancher as attributes. """ super(Broadcast, self).__init__( components=components, attributes=kwargs )
[docs] def __call__(self, an_object): """ Broadcast an object into an iterable of xr.DataArrays using multiple components. Args: an_object (object): an object. Returns: an iterable of xr.DataArrays. """ operation_attributes = self.attributes.copy() # collect attributes for component in self.components: self.attributes.update(component.attributes) component.attributes.update(operation_attributes) yield component(an_object)
[docs]class BroadcastMap(MultipleOperation): """Broadcast input to multiple Map components initialized on the fly."""
[docs] def __init__(self, components, **kwargs): """ Initialize the operation. Args: components (iterable): an iterable containing components. kwargs (dict): arguments to pass to Brancher as attributes. """ super(BroadcastMap, self).__init__( components=components, attributes=kwargs )
[docs] def __call__(self, an_iterable): """ Apply each object to all components. Args: an_iterable (iterable): an iterable of objects. Returns: an iterable of xr.DataArrays. """ an_iterable = list(an_iterable) operation_attributes = self.attributes.copy() # collect attributes for component in self.components: self.attributes.update(component.attributes) component.attributes.update(operation_attributes) for an_object in Map(component, **operation_attributes)(an_iterable): # noqa yield an_object
[docs]class ZipMap(MultipleOperation): """Map component of an iterable to respective xr.DataArray of an iterable. """
[docs] def __init__(self, components, **kwargs): """ Initialize the zip. Args: components (iterable): an iterable containing components. kwargs (dict): arguments to pass to components as attributes. """ super(ZipMap, self).__init__( components=components, attributes=kwargs )
[docs] def __call__(self, an_iterable): """ Encoding an iterable to an iterable of xr.DataArrays, the attributes are added to xr.DataArray. Args: an_iterable (iterable): an_iterable. Returns: an iterable of xr.DataArrays. """ operation_attributes = self.attributes.copy() # collect attributes for component, an_object in zip(self.components, an_iterable): self.attributes.update(component.attributes) component.attributes.update(operation_attributes) yield component(an_object)
[docs]class Map(SingleOperation): """Apply component to all objects in iterable"""
[docs] def __init__(self, component, **kwargs): """ Initialize the reduction. Args: component (Component): a component accepting iterable. kwargs (dict): arguments to pass to component as attributes. """ super(Map, self).__init__( component=component, attributes=kwargs )
[docs] def __call__(self, an_iterable): """ Map the component over an iterable using the standard map. Args: an_iterable (iterable): an iterable of objects. Returns: a map object (iterable). """ return map(self.component.__call__, an_iterable)
[docs]class Reduce(SingleOperation): """Apply component with iterable input with single returned object."""
[docs] def __init__(self, component, **kwargs): """ Initialize the reduction. Args: component (Component): a component accepting iterable. kwargs (dict): arguments to pass to component as attributes. """ super(Reduce, self).__init__( component=component, attributes=kwargs )
[docs] def __call__(self, an_iterable): """ Reduce an iterable to a single object using the standard reduce. Args: an_iterable (iterable): an iterable of objects. Returns: an object. """ return reduce(self.component.__call__, an_iterable)