Source code for deriva.core.datapath

"""Definitions and implementations for data-path expressions to query and manipulate (insert, update, delete)."""

from . import urlquote
import copy
from datetime import date
import itertools
import logging
import re
from requests import HTTPError
import warnings
from . import DEFAULT_HEADERS, ermrest_model as _erm

__all__ = ['DataPathException', 'Min', 'Max', 'Sum', 'Avg', 'Cnt', 'CntD', 'Array', 'ArrayD', 'Bin']

logger = logging.getLogger(__name__)
"""Logger for this module"""

_system_defaults = {'RID', 'RCT', 'RCB', 'RMT', 'RMB'}
"""Set of system default column names"""


def deprecated(f):
    """A simple 'deprecated' function decorator."""
    def wrapper(*args, **kwargs):
        warnings.warn("'%s' has been deprecated" % f.__name__, DeprecationWarning, stacklevel=2)
        return f(*args, **kwargs)
    return wrapper


def from_catalog(catalog):
    """Wraps an ErmrestCatalog object for use in datapath expressions.

    :param catalog: an ErmrestCatalog object
    :return: a datapath._CatalogWrapper object
    """
    return _CatalogWrapper(catalog)
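
# Typical usage sketch (the host name and catalog id below are placeholders, and
# `get_credential` assumes a credential already exists for that host):
#
#   from deriva.core import ErmrestCatalog, get_credential
#   catalog = ErmrestCatalog('https', 'www.example.org', '1', get_credential('www.example.org'))
#   pb = from_catalog(catalog)
#   pb.schemas              # mapping of schema name to schema wrapper
#   pb.My_Schema            # schemas are also reachable as (sanitized) attributes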


def _isidentifier(a):
    """Tests if string is a valid python identifier.

    This function is intended for internal usage within this module.

    :param a: a string
    """
    if hasattr(a, 'isidentifier'):
        return a.isidentifier()
    else:
        return re.match("[_A-Za-z][_a-zA-Z0-9]*$", a) is not None


def _identifier_for_name(name, *reserveds):
    """Makes an identifier from a given name and disambiguates if it is reserved.

    1. replace invalid identifier characters with '_'
    2. prepend with '_' if first character is a digit
    3. append a disambiguating positive integer if it is reserved

    :param name: a string of any format
    :param *reserveds: iterable collections of reserved strings
    :return: a valid identifier string for the given name
    """
    assert len(name) > 0, 'empty strings are not allowed'

    # replace invalid characters with '_'s
    identifier = re.sub("[^_a-zA-Z0-9]", "_", name)

    # prepend with '_' if it starts with a digit
    if identifier[0].isdigit():
        identifier = '_' + identifier

    # append a disambiguating positive integer if it is reserved
    disambiguator = 1
    ambiguous = identifier
    while any(identifier in reserved for reserved in reserveds):
        identifier = ambiguous + str(disambiguator)
        disambiguator += 1

    return identifier
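
# Worked examples of the rules above (results follow directly from the code):
#
#   _identifier_for_name('My Column!')         ->  'My_Column_'   (rule 1)
#   _identifier_for_name('2019 Data')          ->  '_2019_Data'   (rules 1 and 2)
#   _identifier_for_name('items', ['items'])   ->  'items1'       (rule 3)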


def _make_identifier_to_name_mapping(names, reserved):
    """Makes a dictionary of (valid) identifiers to (original) names.

    Try to favor the names that require the least modification:
    1. add all names that are valid identifiers and do not conflict with reserved names
    2. add all names that are valid identifiers but do conflict with reserved names by appending a disambiguator
    3. add an unambiguous identifier made from the name, when the name is not already a valid identifier

    :param names: iterable collection of strings
    :param reserved: iterable collection of reserved identifiers
    :return: a dictionary to map from identifier to name
    """
    reserved = set(reserved)
    assert all(_isidentifier(r) for r in reserved), 'all reserved names must be valid identifiers'

    mappings = {  # first, add all non-offending names
        name: name
        for name in names if _isidentifier(name) and name not in reserved
    }
    mappings.update({  # second, add all names that conflict with reserved strings
        name + '1': name
        for name in names if name in reserved and name + '1' not in mappings
    })
    invalid_names = set(names) - mappings.keys()

    # third, convert and disambiguate remaining names
    for name in invalid_names:
        mappings[_identifier_for_name(name, mappings.keys(), reserved)] = name

    return mappings
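
# Worked example of the precedence above ('keys' stands in for a reserved
# identifier; the column names are hypothetical):
#
#   _make_identifier_to_name_mapping(['Name', 'My Col', '2019'], reserved={'keys'})
#   ->  {'Name': 'Name', 'My_Col': 'My Col', '_2019': '2019'}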


def _http_error_message(e):
    """Returns a formatted error message from the raw HTTPError.
    """
    return '\n'.join(e.response.text.splitlines()[1:]) + '\n' + str(e)


class DataPathException (Exception):
    """Exception in a datapath expression."""
    def __init__(self, message, reason=None):
        super(DataPathException, self).__init__(message, reason)
        self.message = message
        self.reason = reason

    def __str__(self):
        return self.message
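
# Handling sketch: catalog-side 4xx errors raised while building or fetching a
# path surface as DataPathException, with the server's message in `.message` and
# the originating requests.HTTPError in `.reason`. Here `path` stands in for any
# DataPath instance:
#
#   try:
#       rows = path.entities().fetch()
#   except DataPathException as e:
#       logger.error("query failed: %s", e.message)
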
class _CatalogWrapper (object): """Wraps a Catalog for datapath expressions. """ def __init__(self, catalog): """Creates the _CatalogWrapper. :param catalog: ErmrestCatalog object """ super(_CatalogWrapper, self).__init__() self._wrapped_catalog = catalog self._wrapped_model = catalog.getCatalogModel() self.schemas = { k: _SchemaWrapper(self, v) for k, v in self._wrapped_model.schemas.items() } self._identifiers = _make_identifier_to_name_mapping( self.schemas.keys(), super(_CatalogWrapper, self).__dir__()) def __dir__(self): return itertools.chain( super(_CatalogWrapper, self).__dir__(), self._identifiers.keys() ) def __getattr__(self, a): if a in self._identifiers: return self.schemas[self._identifiers[a]] elif hasattr(super(_CatalogWrapper, self), a): return getattr(super(_CatalogWrapper, self), a) else: raise AttributeError("'%s' object for catalog '%s' has no attribute or schema '%s'" % (type(self).__name__, self._wrapped_catalog.catalog_id, a)) @classmethod def compose(cls, *paths): """Compose path fragments into a path. The root of any path fragment must be found in the table instances of the currently composed path from left to right, _but_ it does not have to be the current context (last table instance) of the last left hand path. Paths must not have overlapping table instances with the currently composed path from left to right, except for each subsequent path's root table instance which _must_ be defined in one of the left hand paths. No input path in 'paths' will be mutated. :param paths: instances of `DataPath` :return: a new `DataPath` instance composed from the 'paths' """ if not paths: raise ValueError("No input path(s) given") if not all(isinstance(path, DataPath) for path in paths): raise TypeError("Input 'paths' must be an instance of %s" % type(DataPath).__name__) base = copy.deepcopy(paths[0]) for path in paths[1:]: base.merge(path) return base class _SchemaWrapper (object): """Wraps a Schema for datapath expressions. """ def __init__(self, catalog, schema): """Creates the _SchemaWrapper. :param catalog: the catalog wrapper to which this schema wrapper belongs :param schema: the wrapped schema object """ super(_SchemaWrapper, self).__init__() self._catalog = catalog self._wrapped_schema = schema self._name = schema.name self.tables = { k: _TableWrapper(self, v) for k, v in schema.tables.items() } self._identifiers = _make_identifier_to_name_mapping( self.tables.keys(), super(_SchemaWrapper, self).__dir__()) def __dir__(self): return itertools.chain( super(_SchemaWrapper, self).__dir__(), self._identifiers.keys() ) def __getattr__(self, a): if a in self._identifiers: return self.tables[self._identifiers[a]] elif hasattr(super(_SchemaWrapper, self), a): return getattr(super(_SchemaWrapper, self), a) else: raise AttributeError("'%s' object for schema '%s' has no attribute or table '%s'" % (type(self).__name__, self._name, a)) @deprecated def describe(self): """Provides a description of the model element. :return: a user-friendly string representation of the model element. """ s = "_SchemaWrapper name: '%s'\nList of tables:\n" % self._name if len(self.tables) == 0: s += "none" else: s += "\n".join(" '%s'" % tname for tname in self.tables) return s @deprecated def _repr_html_(self): return self.describe() class DataPath (object): """Represents a datapath expression. 
""" def __init__(self, root): assert isinstance(root, _TableAlias) self._path_expression = _Root(root) self._root = root self._base_uri = root._schema._catalog._wrapped_catalog._server_uri self._table_instances = dict() # map of alias_name => _TableAlias object self._context = None self._identifiers = {} self._bind_table_instance(root) def __dir__(self): return itertools.chain( super(DataPath, self).__dir__(), self._identifiers.keys() ) def __getattr__(self, a): if a in self._identifiers: return self._table_instances[self._identifiers[a]] elif hasattr(super(DataPath, self), a): return getattr(super(DataPath, self), a) else: raise AttributeError("'%s' object has no attribute or table instance '%s'" % (type(self).__name__, a)) def __deepcopy__(self, memodict={}): cp = DataPath(copy.deepcopy(self._root, memo=memodict)) for alias in copy.deepcopy(self._table_instances, memo=memodict).values(): if alias != cp._root: cp._bind_table_instance(alias) cp._context = cp._table_instances[self._context._name] cp._path_expression = copy.deepcopy(self._path_expression, memo=memodict) assert not cp._table_instances.keys() - set(cp._identifiers) assert cp._table_instances.keys() == self._table_instances.keys() assert cp._identifiers.keys() == self._identifiers.keys() assert cp._root._name in cp._table_instances assert cp._root == cp._table_instances[cp._root._name] assert cp._root != self._root assert cp._root._name == self._root._name assert cp._context != self._context assert cp._context._name == self._context._name assert str(cp._path_expression) == str(self._path_expression) assert cp._path_expression != self._path_expression return cp @property def table_instances(self): """Collection of the table instances in this datapath expression.""" return self._table_instances @property def context(self): """Context (i.e., last bound table instance) of this datapath expression.""" return self._context @context.setter def context(self, value): """Updates the context of this datapath expression (must be a table instance bound to this expression).""" if not isinstance(value, _TableAlias): raise TypeError('context must be a table alias object') if value._name not in self._table_instances: raise ValueError('table alias must be bound in this path') if self._context != value: self._path_expression = _ResetContext(self._path_expression, value) self._context = value @property def uri(self): """The current URI serialization of this datapath expression.""" return self._base_uri + str(self._path_expression) def _contextualized_uri(self, context): """Returns a path uri for the specified context. :param context: a table instance that is bound to this path :return: string representation of the path uri """ assert isinstance(context, _TableAlias) assert context._name in self._table_instances if self._context != context: return self._base_uri + str(_ResetContext(self._path_expression, context)) else: return self.uri def _bind_table_instance(self, alias): """Binds a new table instance into this path. """ assert isinstance(alias, _TableAlias) alias._bind(self) self._table_instances[alias._name] = self._context = alias self._identifiers[_identifier_for_name(alias._name, self._identifiers.keys(), super(DataPath, self).__dir__())] = alias._name def delete(self): """Deletes the entity set referenced by the data path. 
""" try: path = str(self._path_expression) logger.debug("Deleting: {p}".format(p=path)) self._root._schema._catalog._wrapped_catalog.delete(path) except HTTPError as e: logger.debug(e.response.text) if 400 <= e.response.status_code < 500: raise DataPathException(_http_error_message(e), e) else: raise e def filter(self, filter_expression): """Filters the path based on the specified formula. :param filter_expression: should be a valid _Predicate object :return: self """ assert isinstance(filter_expression, _Predicate) self._path_expression = _Filter(self._path_expression, filter_expression) return self def link(self, right, on=None, join_type=''): """Links this path with another table. To link a table with an unambigious relationship where table A is related to table B via a single foreign key reference, the `on` clause is not. ``` # let A and B be variables for tables from the catalog path = A.link(B) ``` To link tables with more than one foreign key reference between them, use explicit `on` clause. ``` # let A.c1 be a column that is a simple foreign key to B.c1 that is a simple key in B path = A.link(B, on=(A.c1 == B.c1)) ``` To link tables with foreign keys on composite keys, use a conjunction of 2 or more equality comparisons in the `on` clause. ``` # let A.c1, A.c2 be columns that form a foreign key to B.c1, B.c2 that are a composite key in B path = A.link(B, on=((A.c1 == B.c1) & (A.c2 == B.c2))) ``` Alternatively, use an `ermrest_model.ForeignKey` object to link the table to the path. Both "inbound" and "outbound" foreign keys are supported by the `link` method. ``` # let fk be a foreign key object from table A to table B (or from table B to table A) path = A.link(B, on=fk) ``` By default links use inner join semantics on the foreign key / key equality comparison. The `join_type` parameter can be used to specify `left`, `right`, or `full` outer join semantics. :param right: the right hand table of the link expression :param on: an equality comparison between key and foreign key columns, a conjunction of such comparisons, or a foreign key object :param join_type: the join type of this link which may be 'left', 'right', 'full' outer joins or '' for inner join link by default. :return: self """ if not isinstance(right, _TableWrapper): raise TypeError("'right' must be a '_TableWrapper' instance") if on and not ( isinstance(on, _ComparisonPredicate) or (isinstance(on, _ConjunctionPredicate) and on.is_valid_join_condition) or isinstance(on, _erm.ForeignKey) ): raise TypeError("'on' must be a comparison, conjuction of comparisons, or foreign key object") if join_type and on is None: raise ValueError("'on' must be specified for outer joins") if right._schema._catalog != self._root._schema._catalog: raise ValueError("'right' is from a different catalog. 
Cannot link across catalogs.") if isinstance(right, _TableAlias) and right._name in self._table_instances: raise ValueError("'right' is a table alias that has already been used.") else: # Generate an unused alias name for the table table_name = right._name alias_name = table_name counter = 1 while alias_name in self._table_instances: counter += 1 alias_name = table_name + str(counter) right = right.alias(alias_name) if on is None: # if 'on' not given, default to the 'right' table on = right elif isinstance(on, _erm.ForeignKey): catalog = self._root._schema._catalog fk = on # determine 'direction' -- inbound or outbound path_context_table = self.context._base_table._wrapped_table if (path_context_table.schema.name, path_context_table.name) == (fk.table.schema.name, fk.table.name): fkcols = zip(fk.foreign_key_columns, fk.referenced_columns) elif (path_context_table.schema.name, path_context_table.name) == (fk.pk_table.schema.name, fk.pk_table.name): fkcols = zip(fk.referenced_columns, fk.foreign_key_columns) else: raise ValueError('"%s" is not an inbound or outbound foreign key for the path\'s context, table "%s"' % (fk.constraint_name, path_context_table.name)) # compose join condition on = None for lcol, rcol in fkcols: lcol = catalog.schemas[lcol.table.schema.name].tables[lcol.table.name].columns[lcol.name] rcol = catalog.schemas[rcol.table.schema.name].tables[rcol.table.name].columns[rcol.name] if on: on = on & (lcol == rcol) else: on = lcol == rcol # Extend path expression self._path_expression = _Link(self._path_expression, on, right, join_type) # Bind alias and this data path self._bind_table_instance(right) return self def entities(self): """Returns a results set of whole entities from this data path's current context. ``` results1 = my_path.entities() ``` :return: a result set of entities where each element is a whole entity per the table definition and policy. """ return self._query() def aggregates(self, *functions): """Returns a results set of computed aggregates from this data path. By using the built-in subclasses of the `AggregateFunction` class, including `Min`, `Max`, `Sum`, `Avg`, `Cnt`, `CntD`, `Array`, and `ArrayD`, aggregates can be computed and fetched. These aggregates must be passed as named parameters since they require _alias names_. ``` results1 = my_path.aggregates(Min(col1).alias('mincol1'), Array(col2).alias('arrcol2')) results2 = my_path.aggregates(Min(col1), Array(col2)) # Error! Aggregates must be aliased. results3 = my_path.aggregates(col1, Array(col2).alias('arrcol2')) # Error! Cannot mix columns and aggregate functions. ``` :param functions: aliased aggregate functions :return: a results set with a single row of results. """ return self._query(mode=_Project.AGGREGATE, projection=list(functions)) def attributes(self, *attributes): """Returns a results set of attributes projected and optionally renamed from this data path. ``` results1 = my_path.attributes(col1, col2) # fetch a subset of attributes of the path results2 = my_path.attributes(col1.alias('col_1'), col2.alias('col_2')) # fetch and rename the attributes results3 = my_path.attributes(col1, col2.alias('col_2')) # rename some but not others ``` :param attributes: a list of Columns. :return: a results set of the projected attributes from this data path. """ return self._query(mode=_Project.ATTRIBUTE, projection=list(attributes)) def groupby(self, *keys): """Returns an attribute group object. 
The attribute group object returned by this method can be used to get a results set of computed aggregates for groups of attributes from this data path. With a single group key: ``` results1 = my_path.groupby(col1).attributes(Min(col2).alias('min_col1'), Array(col3).alias('arr_col2')) ``` With more than one group key: ``` results2 = my_path.groupby(col1, col2).attributes(Min(col3).alias('min_col1'), Array(col4).alias('arr_col2')) ``` With aliased group keys: ``` results3 = my_path.groupby(col1.alias('key_one'), col2.alias('keyTwo'))\ .attributes(Min(col3).alias('min_col1'), Array(col4).alias('arr_col2')) ``` With binning: ``` results3 = my_path.groupby(col1.alias('key_one'), Bin(col2;10;0;9999).alias('my_bin'))\ .attributes(Min(col3).alias('min_col1'), Array(col4).alias('arr_col2')) ``` :param keys: a list of columns, aliased columns, or aliased bins, to be used as the grouping key. :return: an attribute group that supports an `.attributes(...)` method that accepts columns, aliased columns, and/or aliased aggregate functions as its arguments. """ return _AttributeGroup(self, self._query, keys) def _query(self, mode='entity', projection=[], group_key=[], context=None): """Internal method for querying the data path from the perspective of the given 'context'. :param mode: a valid mode in Project.MODES :param projection: a projection list. :param group_key: a group key list (only for attributegroup queries). :param context: optional context for the query. :return: a results set. """ assert context is None or isinstance(context, _TableAlias) catalog = self._root._schema._catalog._wrapped_catalog expression = self._path_expression if context: expression = _ResetContext(expression, context) if mode != _Project.ENTITY: expression = _Project(expression, mode, projection, group_key) base_path = str(expression) def fetcher(limit=None, sort=None, headers=DEFAULT_HEADERS): assert limit is None or isinstance(limit, int) assert sort is None or hasattr(sort, '__iter__') limiting = '?limit=%d' % limit if limit else '' sorting = '@sort(' + ','.join([col._uname for col in sort]) + ')' if sort else '' path = base_path + sorting + limiting logger.debug("Fetching " + path) try: resp = catalog.get(path, headers=headers) return resp.json() except HTTPError as e: logger.debug(e.response.text) if 400 <= e.response.status_code < 500: raise DataPathException(_http_error_message(e), e) else: raise e return _ResultSet(self._base_uri + base_path, fetcher) def merge(self, path): """Merges the current path with the given path. The right-hand 'path' must be rooted on a `_TableAlias` object that exists (by alias name) within this path (the left-hand path). It _must not_ have other shared table aliases. 
:param path: a `DataPath` object rooted on a table alias that can be found in this path :return: this path merged with the given (right-hand) path """ if not isinstance(path, DataPath): raise TypeError("'path' must be an instance of %s" % type(self).__name__) if path._root._name not in self._table_instances: raise ValueError("right-hand path root not found in this path's table instances") if not path._root._equivalent(self._table_instances[path._root._name]): raise ValueError("right-hand path root is not equivalent to the matching table instance in this path") if self._table_instances.keys() & path._table_instances.keys() != {path._root._name}: raise ValueError("overlapping table instances found in right-hand path") # update this path as rebased right-hand path temp = copy.deepcopy(path._path_expression) temp.rebase(self._path_expression, self._table_instances[path._root._name]) self._path_expression = temp # copy and bind table instances from right-hand path for alias in path._table_instances: if alias not in self.table_instances: self._bind_table_instance(copy.deepcopy(path._table_instances[alias])) # set the context self._context = self._table_instances[path._context._name] return self class _ResultSet (object): """A set of results for various queries or data manipulations. The result set is produced by a path. The results may be explicitly fetched. The result set behaves like a container. If the result set has not been fetched explicitly, on first use of container operations, it will be implicitly fetched from the catalog. """ def __init__(self, uri, fetcher_fn): """Initializes the _ResultSet. :param uri: the uri for the entity set in the catalog. :param fetcher_fn: a function that fetches the entities from the catalog. """ assert fetcher_fn is not None self._fetcher_fn = fetcher_fn self._results_doc = None self._sort_keys = None self.uri = uri @property def _results(self): if self._results_doc is None: self.fetch() return self._results_doc def __len__(self): return len(self._results) def __getitem__(self, item): return self._results[item] def __iter__(self): return iter(self._results) def sort(self, *attributes): """Orders the results set by the given attributes. :param keys: Columns, column aliases, or aggregate function aliases. The sort attributes must be projected by the originating query. :return: self """ if not attributes: raise ValueError("No sort attributes given.") if not all(isinstance(a, _ColumnWrapper) or isinstance(a, _ColumnAlias) or isinstance(a, _AggregateFunctionAlias) or isinstance(a, _SortDescending) for a in attributes): raise TypeError("Sort keys must be column, column alias, or aggregate function alias") self._sort_keys = attributes self._results_doc = None return self def fetch(self, limit=None, headers=DEFAULT_HEADERS): """Fetches the results from the catalog. :param limit: maximum number of results to fetch from the catalog. :param headers: headers to send in request to server :return: self """ limit = int(limit) if limit else None self._results_doc = self._fetcher_fn(limit, self._sort_keys, headers) logger.debug("Fetched %d entities" % len(self._results_doc)) return self class _TableWrapper (object): """Wraps a Table for datapath expressions. """ def __init__(self, schema, table): """Creates a _TableWrapper object. 
:param schema: the schema objec to which this table belongs :param table: the wrapped table """ self._schema = schema self._wrapped_table = table self._name = table.name self._uname = urlquote(table.name) self._fqname = "%s:%s" % (urlquote(self._schema._name), self._uname) self._instancename = '*' self._projection_name = self._instancename self._fromname = self._fqname self.column_definitions = { v.name: _ColumnWrapper(self, v) for v in table.column_definitions } self._identifiers = _make_identifier_to_name_mapping( self.column_definitions.keys(), super(_TableWrapper, self).__dir__()) def __dir__(self): return itertools.chain( super(_TableWrapper, self).__dir__(), self._identifiers.keys() ) def __getattr__(self, a): if a in self._identifiers: return self.column_definitions[self._identifiers[a]] elif hasattr(super(_TableWrapper, self), a): return getattr(super(_TableWrapper, self), a) else: raise AttributeError("'%s' object for table '%s' has no attribute or column '%s'" % (type(self).__name__, self._wrapped_table.name, a)) @deprecated def describe(self): """Provides a description of the model element. :return: a user-friendly string representation of the model element. """ s = "_TableWrapper name: '%s'\nList of columns:\n" % self._name if len(self.column_definitions) == 0: s += "none" else: s += "\n".join(" %s" % col._name for col in self.column_definitions.values()) return s @deprecated def _repr_html_(self): return self.describe() @property def columns(self): """Sugared access to self.column_definitions""" return self.column_definitions @property def path(self): """Always a new DataPath instance that is rooted at this table. Note that this table will be automatically aliased using its own table name. """ return DataPath(self.alias(self._name)) @property def _contextualized_path(self): """Returns the path as contextualized for this table instance. Conditionally updates the context of the path to which this table instance is bound. """ return self.path @property @deprecated def uri(self): return self.path.uri def alias(self, alias_name): """Returns a table alias object. :param alias_name: a string to use as the alias name """ return _TableAlias(self, alias_name) def filter(self, filter_expression): """See the docs for this method in `DataPath` for more information.""" return self._contextualized_path.filter(filter_expression) def link(self, right, on=None, join_type=''): """See the docs for this method in `DataPath` for more information.""" return self._contextualized_path.link(right, on, join_type) def _query(self, mode='entity', projection=[], group_key=[], context=None): """Invokes query on the path for this table.""" return self.path._query(mode, projection, group_key=group_key, context=context) def entities(self): """Returns a results set of whole entities from this data path's current context. See the docs for this method in `DataPath` for more information. """ return self._query() def aggregates(self, *functions): """Returns a results set of computed aggregates from this data path. See the docs for this method in `DataPath` for more information. """ return self._query(mode=_Project.AGGREGATE, projection=list(functions)) def attributes(self, *attributes): """Returns a results set of attributes projected and optionally renamed from this data path. See the docs for this method in `DataPath` for more information. """ return self._query(mode=_Project.ATTRIBUTE, projection=list(attributes)) def groupby(self, *keys): """Returns an attribute group object. 
See the docs for this method in `DataPath` for more information. """ return _AttributeGroup(self, self._query, keys) def insert(self, entities, defaults=set(), nondefaults=set(), add_system_defaults=True, on_conflict_skip=False): """Inserts entities into the table. :param entities: an iterable collection of entities (i.e., rows) to be inserted into the table. :param defaults: optional, set of column names to be assigned the default expression value. :param nondefaults: optional, set of columns names to override implicit system defaults :param add_system_defaults: flag to add system columns to the set of default columns. :param on_conflict_skip: flag to skip entities that violate uniqueness constraints. :return a collection of newly created entities. """ # empty entities will be accepted but results are therefore an empty entity set if not entities: return _ResultSet(self.path.uri, lambda ignore1, ignore2, ignore3: []) options = [] if on_conflict_skip: options.append('onconflict=skip') if defaults or add_system_defaults: defaults_enc = {urlquote(cname) for cname in defaults} if add_system_defaults: defaults_enc |= _system_defaults - nondefaults options.append("defaults={cols}".format(cols=','.join(defaults_enc))) if nondefaults: nondefaults_enc = {urlquote(cname) for cname in nondefaults} options.append("nondefaults={cols}".format(cols=','.join(nondefaults_enc))) path = '/entity/' + self._fqname if options: path += "?" + "&".join(options) logger.debug("Inserting entities to path: {path}".format(path=path)) # JSONEncoder does not handle general iterable objects, so we have to make sure its an acceptable collection if not hasattr(entities, '__iter__'): raise TypeError('entities is not iterable') entities = entities if isinstance(entities, (list, tuple)) else list(entities) # test the first entity element to make sure that it looks like a dictionary if not hasattr(entities[0], 'keys'): raise TypeError('entities[0] does not look like a dictionary -- does not have a "keys()" method') try: resp = self._schema._catalog._wrapped_catalog.post(path, json=entities, headers={'Content-Type': 'application/json'}) return _ResultSet(self.path.uri, lambda ignore1, ignore2, ignore3: resp.json()) except HTTPError as e: logger.debug(e.response.text) if 400 <= e.response.status_code < 500: raise DataPathException(_http_error_message(e), e) else: raise e def update(self, entities, correlation={'RID'}, targets=None): """Update entities of a table. For more information see the ERMrest protocol for the `attributegroup` interface. By default, this method will correlate the input data (entities) based on the `RID` column of the table. By default, the method will use all column names found in the first row of the `entities` input, which are not found in the `correlation` set and not defined as 'system columns' by ERMrest, as the targets if `targets` is not set. :param entities: an iterable collection of entities (i.e., rows) to be updated in the table. :param correlation: an iterable collection of column names used to correlate input set to the set of rows to be updated in the catalog. E.g., `{'col name'}` or `{mytable.mycolumn}` will work if you pass a _ColumnWrapper object. :param targets: an iterable collection of column names used as the targets of the update operation. :return: a collection of updated entities as returned by the corresponding ERMrest interface. 
""" # empty entities will be accepted but results are therefore an empty entity set if not entities: return _ResultSet(self.path.uri, lambda ignore1, ignore2, ignore3: []) # JSONEncoder does not handle general iterable objects, so we have to make sure its an acceptable collection if not hasattr(entities, '__iter__'): raise TypeError('entities is not iterable') entities = entities if isinstance(entities, (list, tuple)) else list(entities) # test the first entity element to make sure that it looks like a dictionary if not hasattr(entities[0], 'keys'): raise TypeError('entities[0] does not look like a dictionary -- does not have a "keys()" method') # Form the correlation keys and the targets correlation_cnames = {urlquote(str(c)) for c in correlation} if targets: target_cnames = {urlquote(str(t)) for t in targets} else: exclusions = correlation_cnames | _system_defaults target_cnames = {urlquote(str(t)) for t in entities[0].keys() if urlquote(str(t)) not in exclusions} # test if there are any targets after excluding for correlation keys and system columns if not target_cnames: raise ValueError('No "targets" for the update. There must be at least one column as a target of the update,' ' and targets cannot overlap with "correlation" keys and system columns.') # Form the path path = '/attributegroup/{table}/{correlation};{targets}'.format( table=self._fqname, correlation=','.join(correlation_cnames), targets=','.join(target_cnames) ) try: resp = self._schema._catalog._wrapped_catalog.put(path, json=entities, headers={'Content-Type': 'application/json'}) return _ResultSet(self.path.uri, lambda ignore1, ignore2, ignore3: resp.json()) except HTTPError as e: logger.debug(e.response.text) if 400 <= e.response.status_code < 500: raise DataPathException(_http_error_message(e), e) else: raise e class _TableAlias (_TableWrapper): """Represents a table alias in datapath expressions. """ def __init__(self, base_table, alias_name): """Initializes the table alias. :param base_table: the base table to be given an alias name :param alias_name: the alias name """ assert isinstance(base_table, _TableWrapper) super(_TableAlias, self).__init__(base_table._schema, base_table._wrapped_table) self._parent = None self._base_table = base_table self._name = alias_name self._uname = urlquote(alias_name) self._fqname = self._base_table._fqname self._instancename = self._uname + ":*" self._projection_name = self._instancename self._fromname = "%s:=%s" % (self._uname, self._base_table._fqname) def __deepcopy__(self, memodict={}): # deep copy implementation of a table alias should not make copies of model objects (ie, the base table) return _TableAlias(self._base_table, self._name) def _equivalent(self, alias): """Equivalence comparison between table aliases. :param alias: another table alias :return: True, if the base table and alias name match, else False """ if not isinstance(alias, _TableAlias): raise TypeError("'alias' must be an instance of '%s'" % type(self).__name__) return self._name == alias._name and self._base_table == alias._base_table @property def path(self): """Returns the parent path for this alias. 
""" if not self._parent: self._parent = DataPath(self) return self._parent def _bind(self, parent_path): """Binds this table instance to the given parent path.""" if self._parent: raise ValueError("Cannot bind a table instance that has already been bound.") elif not isinstance(parent_path, DataPath): raise TypeError("value must be a DataPath instance.") self._parent = parent_path @property def _contextualized_path(self): """Returns the path as contextualized for this table instance. Conditionally updates the context of the path to which this table instance is bound. """ path = self.path if path.context != self: path.context = self return path @property @deprecated def uri(self): return self.path._contextualized_uri(self) def _query(self, mode='entity', projection=[], group_key=[], context=None): """Overridden method to set context of query to this table instance.""" return self.path._query(mode, projection, group_key=group_key, context=self) class _ColumnWrapper (object): """Wraps a Column for datapath expressions. """ def __init__(self, table, column): """Creates a _ColumnWrapper object. :param table: the table to which this column belongs :param column: the wrapped column """ super(_ColumnWrapper, self).__init__() self._table = table self._wrapped_column = column self._name = column.name self._uname = urlquote(self._name) @property def _fqname(self): """Late binding needed for table alias instances.""" return "%s:%s" % (self._table._fqname, self._uname) @property def _instancename(self): """Late binding needed for table alias instances.""" return "%s:%s" % (self._table._uname, self._uname) if isinstance(self._table, _TableAlias) else self._uname @property def _projection_name(self): """Late binding needed for table alias instances.""" return self._instancename @deprecated def describe(self): """Provides a description of the model element. :return: a user-friendly string representation of the model element. """ return "_ColumnWrapper name: '%s'\tType: %s\tComment: '%s'" % \ (self._name, self._wrapped_column.type.typename, self._wrapped_column.comment) @deprecated def _repr_html_(self): return self.describe() @property def desc(self): """A descending sort modifier based on this column.""" return _SortDescending(self) def __str__(self): return self._name def eq(self, other): """Returns an 'equality' comparison predicate. :param other: `None` or any other literal value. :return: a filter predicate object """ if other is None: return _ComparisonPredicate(self, "::null::", '') else: return _ComparisonPredicate(self, "=", other) __eq__ = eq def lt(self, other): """Returns a 'less than' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return _ComparisonPredicate(self, "::lt::", other) __lt__ = lt def le(self, other): """Returns a 'less than or equal' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return _ComparisonPredicate(self, "::leq::", other) __le__ = le def gt(self, other): """Returns a 'greater than' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return _ComparisonPredicate(self, "::gt::", other) __gt__ = gt def ge(self, other): """Returns a 'greater than or equal' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return _ComparisonPredicate(self, "::geq::", other) __ge__ = ge def regexp(self, other): """Returns a 'regular expression' comparison predicate. :param other: a _string_ literal value. 
:return: a filter predicate object """ if not isinstance(other, str): logger.warning("'regexp' method comparison only supports string literals.") return _ComparisonPredicate(self, "::regexp::", other) def ciregexp(self, other): """Returns a 'case-insensitive regular expression' comparison predicate. :param other: a _string_ literal value. :return: a filter predicate object """ if not isinstance(other, str): logger.warning("'ciregexp' method comparison only supports string literals.") return _ComparisonPredicate(self, "::ciregexp::", other) def ts(self, other): """Returns a 'text search' comparison predicate. :param other: a _string_ literal value. :return: a filter predicate object """ if not isinstance(other, str): logger.warning("'ts' method comparison only supports string literals.") return _ComparisonPredicate(self, "::ts::", other) def alias(self, name): """Returns an alias for this column.""" return _ColumnAlias(self, name) class _ColumnAlias (object): """Represents an (output) alias for a column instance in a datapath expression. """ def __init__(self, base_column, alias_name): """Initializes the column alias. :param base_column: the base column to be given an alias name :param alias_name: the alias name """ assert isinstance(base_column, _ColumnWrapper) super(_ColumnAlias, self).__init__() self._name = alias_name self._base_column = base_column self._uname = urlquote(self._name) @property def _projection_name(self): """Late binding needed for table alias instances.""" return "%s:=%s" % (self._uname, self._base_column._instancename) def __deepcopy__(self, memodict={}): # deep copy implementation of a column alias should not make copies of model objects (ie, the base column) return _ColumnAlias(self._base_column, self._name) @deprecated def describe(self): """Provides a description of the model element. :return: a user-friendly string representation of the model element. """ return "_ColumnWrapper name: '%s'\tAlias for: %s" % \ (self._name, self._base_column.describe()) @deprecated def _repr_html_(self): return self.describe() @property def desc(self): """A descending sort modifier based on this column.""" return _SortDescending(self) def __str__(self): return self._name def eq(self, other): """Returns an 'equality' comparison predicate. :param other: `None` or any other literal value. :return: a filter predicate object """ return self._base_column.eq(other) __eq__ = eq def lt(self, other): """Returns a 'less than' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return self._base_column.lt(other) __lt__ = lt def le(self, other): """Returns a 'less than or equal' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return self._base_column.le(other) __le__ = le def gt(self, other): """Returns a 'greater than' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return self._base_column.gt(other) __gt__ = gt def ge(self, other): """Returns a 'greater than or equal' comparison predicate. :param other: a literal value. :return: a filter predicate object """ return self._base_column.ge(other) __ge__ = ge def regexp(self, other): """Returns a 'regular expression' comparison predicate. :param other: a _string_ literal value. :return: a filter predicate object """ return self._base_column.regexp(other) def ciregexp(self, other): """Returns a 'case-insensitive regular expression' comparison predicate. :param other: a _string_ literal value. 
:return: a filter predicate object """ return self._base_column.ciregexp(other) def ts(self, other): """Returns a 'text search' comparison predicate. :param other: a _string_ literal value. :return: a filter predicate object """ return self._base_column.ts(other) class _SortDescending (object): """A descending sort condition.""" def __init__(self, attr): """Creates sort descending object. :param attr: a column, column alias, or aggrfn alias object """ assert isinstance(attr, _ColumnWrapper) or isinstance(attr, _ColumnAlias) or isinstance(attr, _AggregateFunctionAlias) self._attr = attr self._uname = urlquote(self._attr._uname) + "::desc::" class _PathOperator (object): def __init__(self, r): assert isinstance(r, _PathOperator) or isinstance(r, _TableAlias) if isinstance(r, _Project): raise Exception("Cannot extend a path after an attribute projection") self._r = r def __deepcopy__(self, memodict={}): return type(self)(copy.deepcopy(self._r, memo=memodict)) @property def _path(self): assert isinstance(self._r, _PathOperator) return self._r._path @property def _mode(self): assert isinstance(self._r, _PathOperator) return self._r._mode def __str__(self): return "/%s/%s" % (self._mode, self._path) def rebase(self, base, root_context): """Rebases the current path expression to begin as a reset context following 'base'. :param base: a valid path expresion :param root_context: root context on which to rebase this path expression :return: rebased expresion _or_ a new `_ResetContext` instance if `self` was the root """ assert isinstance(base, _PathOperator) assert isinstance(root_context, _TableAlias) if isinstance(self, _Root): return _ResetContext(base, self._table) else: pathobj = self while not isinstance(pathobj._r, _Root): pathobj = self._r assert root_context._equivalent(pathobj._r._table) pathobj._r = _ResetContext(base, root_context) return self class _Root (_PathOperator): def __init__(self, r): super(_Root, self).__init__(r) assert isinstance(r, _TableAlias) self._table = r @property def _path(self): return self._table._fromname @property def _mode(self): return 'entity' class _ResetContext (_PathOperator): def __init__(self, r, alias): if isinstance(r, _ResetContext): r = r._r # discard the previous context reset operator super(_ResetContext, self).__init__(r) assert isinstance(alias, _TableAlias) self._alias = alias def __deepcopy__(self, memodict={}): return _ResetContext(copy.deepcopy(self._r, memo=memodict), copy.deepcopy(self._alias, memo=memodict)) @property def _path(self): assert isinstance(self._r, _PathOperator) return "%s/$%s" % (self._r._path, self._alias._uname) class _Filter(_PathOperator): def __init__(self, r, formula): super(_Filter, self).__init__(r) assert isinstance(formula, _Predicate) self._formula = formula def __deepcopy__(self, memodict={}): return _Filter(copy.deepcopy(self._r, memo=memodict), copy.deepcopy(self._formula, memo=memodict)) @property def _path(self): assert isinstance(self._r, _PathOperator) return "%s/%s" % (self._r._path, str(self._formula)) class _Project (_PathOperator): """Projection path component.""" ENTITY = 'entity' ATTRIBUTE = 'attribute' AGGREGATE = 'aggregate' ATTRGROUP = 'attributegroup' MODES = (ENTITY, ATTRIBUTE, AGGREGATE, ATTRGROUP) def __init__(self, r, mode=ENTITY, projection=[], group_key=[]): """Initializes the projection component. :param r: the parent path component. :param r: the 'mode' of the projection (entity, attribute, etc.) :param projection: projection list. :param group_key: grouping keys list. 
""" super(_Project, self).__init__(r) assert mode in self.MODES assert mode == self.ENTITY or mode == self.ATTRGROUP or len(projection) > 0 assert mode != self.ATTRGROUP or len(group_key) > 0 self._projection_mode = mode self._projection = [] self._group_key = [] if mode == self.ATTRIBUTE: if not all(isinstance(obj, _TableWrapper) or isinstance(obj, _TableAlias) or isinstance(obj, _ColumnWrapper) or isinstance(obj, _ColumnAlias) for obj in projection): raise TypeError("Only columns or column aliases can be retrieved by an 'attribute' query.") elif mode == self.AGGREGATE: if not all(isinstance(obj, _AggregateFunctionAlias) for obj in projection): raise TypeError("Only aggregate function aliases can be retrieved by an 'aggregate' query.") elif mode == self.ATTRGROUP: if not all(isinstance(obj, _ColumnWrapper) or isinstance(obj, _ColumnAlias) or isinstance(obj, _AggregateFunctionAlias) for obj in projection): raise TypeError("Only columns, column aliases, or aggregate function aliases can be retrieved by an 'attributegroup' query.") if not all(isinstance(obj, _ColumnWrapper) or isinstance(obj, _ColumnAlias) or isinstance(obj, _AggregateFunctionAlias) for obj in group_key): raise TypeError("Only column aliases or aggregate function aliases can be used to group an 'attributegroup' query.") self._group_key = [obj._projection_name for obj in group_key] self._projection = [obj._projection_name for obj in projection] def __deepcopy__(self, memodict={}): cp = super(_Project, self).__deepcopy__(memodict=memodict) cp._projection_mode = self._projection_mode cp._projection = copy.deepcopy(self._projection, memo=memodict) cp._group_key = copy.deepcopy(self._group_key, memo=memodict) return cp @property def _path(self): assert isinstance(self._r, _PathOperator) projection = ','.join(self._projection) if self._projection_mode == self.ATTRGROUP: assert self._group_key grouping = ','.join(self._group_key) return "%s/%s;%s" % (self._r._path, grouping, projection) else: return "%s/%s" % (self._r._path, projection) @property def _mode(self): return self._projection_mode class _Link (_PathOperator): def __init__(self, r, on, as_=None, join_type=''): """Initialize the _Link operator :param r: parent path operator :param on: a table alias, a comparison predicate, or a conjunction of comparisons :param as_: table alias :param join_type: left, right or full for outer join semantics, or '' for inner join semantics """ super(_Link, self).__init__(r) assert isinstance(on, _ComparisonPredicate) or isinstance(on, _TableAlias) or ( isinstance(on, _ConjunctionPredicate) and on.is_valid_join_condition), "Invalid join 'on' clause" assert as_ is None or isinstance(as_, _TableAlias) assert join_type == '' or (join_type in ('left', 'right', 'full') and isinstance(on, _Predicate)) self._on = on self._as = as_ self._join_type = join_type def __deepcopy__(self, memodict={}): return _Link( copy.deepcopy(self._r, memo=memodict), copy.deepcopy(self._on, memo=memodict), as_=copy.deepcopy(self._as, memo=memodict), join_type=self._join_type ) @property def _path(self): assert isinstance(self._r, _PathOperator) assign = '' if self._as is None else "%s:=" % self._as._uname if isinstance(self._on, _TableWrapper): cond = self._on._fqname elif isinstance(self._on, _ComparisonPredicate): cond = str(self._on) elif isinstance(self._on, _ConjunctionPredicate): cond = self._on.as_join_condition else: raise DataPathException("Invalid join condition: " + str(self._on)) return "%s/%s%s%s" % (self._r._path, assign, self._join_type, cond) class 
_Predicate (object): """Common base class for all predicate types.""" def and_(self, other): """Returns a conjunction predicate. :param other: a predicate object. :return: a junction predicate object. """ if not isinstance(other, _Predicate): raise TypeError("Invalid comparison with object that is not a _Predicate instance.") return _ConjunctionPredicate([self, other]) __and__ = and_ def or_(self, other): """Returns a disjunction predicate. :param other: a predicate object. :return: a junction predicate object. """ if not isinstance(other, _Predicate): raise TypeError("Invalid comparison with object that is not a _Predicate instance.") return _DisjunctionPredicate([self, other]) __or__ = or_ def negate(self): """Returns a negation predicate. This predicate is wrapped in a negation predicate which is returned to the caller. :return: a negation predicate object. """ return _NegationPredicate(self) __invert__ = negate class _ComparisonPredicate (_Predicate): """Comparison (left-operand operator right-operand) predicate""" def __init__(self, lop, op, rop): super(_ComparisonPredicate, self).__init__() assert isinstance(lop, _ColumnWrapper) assert isinstance(rop, _ColumnWrapper) or isinstance(rop, int) or \ isinstance(rop, float) or isinstance(rop, str) or \ isinstance(rop, date) assert isinstance(op, str) self._lop = lop self._op = op self._rop = rop def __deepcopy__(self, memodict={}): # deep copy of predicate should not deep copy the model object references (i.e., _ColumnWrapper objects) return _ComparisonPredicate(self._lop, self._op, self._rop) @property def is_equality(self): return self._op == '=' @property def left(self): return self._lop @property def right(self): return self._rop def __str__(self): if isinstance(self._rop, _ColumnWrapper): # The only valid circumstance for a _ColumnWrapper rop is in a link 'on' predicate for simple key/fkey joins return "(%s)=(%s)" % (self._lop._instancename, self._rop._fqname) else: # All other comparisons are serialized per the usual form return "%s%s%s" % (self._lop._instancename, self._op, urlquote(str(self._rop))) class _JunctionPredicate (_Predicate): """Junction (and/or) of child predicates.""" def __init__(self, op, operands): super(_JunctionPredicate, self).__init__() assert operands and hasattr(operands, '__iter__') and len(operands) > 1 assert all(isinstance(operand, _Predicate) for operand in operands) assert isinstance(op, str) self._operands = operands self._op = op def __str__(self): return self._op.join(["(%s)" % operand for operand in self._operands]) class _ConjunctionPredicate (_JunctionPredicate): """Conjunction (and) or child predicates.""" def __init__(self, operands): super(_ConjunctionPredicate, self).__init__('&', operands) def and_(self, other): return _ConjunctionPredicate(self._operands + [other]) @property def is_valid_join_condition(self): """Tests if this conjunction is a valid join condition.""" return all(isinstance(o, _ComparisonPredicate) and o.is_equality for o in self._operands) @property def as_join_condition(self): """Returns the conjunction in the 'join condition' serialized format.""" lhs = [] rhs = [] for operand in self._operands: assert isinstance(operand, _ComparisonPredicate) and operand.is_equality assert isinstance(operand.left, _ColumnWrapper) assert isinstance(operand.right, _ColumnWrapper) lhs.append(operand.left) rhs.append(operand.right) return "({left})=({right})".format( left=",".join(lop._instancename for lop in lhs), right=",".join(rop._fqname for rop in rhs) ) class _DisjunctionPredicate 
(_JunctionPredicate): """Disjunction (or) of child predicates.""" def __init__(self, operands): super(_DisjunctionPredicate, self).__init__(';', operands) def or_(self, other): return _DisjunctionPredicate(self._operands + [other]) class _NegationPredicate (_Predicate): """Negates the child predicate.""" def __init__(self, child): super(_NegationPredicate, self).__init__() assert isinstance(child, _Predicate) self._child = child def __str__(self): return "!(%s)" % self._child class AggregateFunction (object): """Base class of all aggregate functions.""" def __init__(self, fn_name, arg): """Initializes the aggregate function. :param fn_name: name of the function per ERMrest specification. :param arg: argument of the function per ERMrest specification. """ super(AggregateFunction, self).__init__() self._fn_name = fn_name self._arg = arg def __str__(self): return "%s(%s)" % (self._fn_name, self._arg) @property def _instancename(self): return "%s(%s)" % (self._fn_name, self._arg._instancename) def alias(self, alias_name): """Returns an (output) alias for this aggregate function instance.""" return _AggregateFunctionAlias(self, alias_name)


class Min (AggregateFunction):
    """Aggregate function for minimum non-NULL value."""
    def __init__(self, arg):
        super(Min, self).__init__('min', arg)


class Max (AggregateFunction):
    """Aggregate function for maximum non-NULL value."""
    def __init__(self, arg):
        super(Max, self).__init__('max', arg)


class Sum (AggregateFunction):
    """Aggregate function for sum of non-NULL values."""
    def __init__(self, arg):
        super(Sum, self).__init__('sum', arg)


class Avg (AggregateFunction):
    """Aggregate function for average of non-NULL values."""
    def __init__(self, arg):
        super(Avg, self).__init__('avg', arg)


class Cnt (AggregateFunction):
    """Aggregate function for count of non-NULL values."""
    def __init__(self, arg):
        super(Cnt, self).__init__('cnt', arg)


class CntD (AggregateFunction):
    """Aggregate function for count of distinct non-NULL values."""
    def __init__(self, arg):
        super(CntD, self).__init__('cnt_d', arg)


class Array (AggregateFunction):
    """Aggregate function for an array containing all values (including NULL)."""
    def __init__(self, arg):
        super(Array, self).__init__('array', arg)


class ArrayD (AggregateFunction):
    """Aggregate function for an array containing distinct values (including NULL)."""
    def __init__(self, arg):
        super(ArrayD, self).__init__('array_d', arg)


class Bin (AggregateFunction):
    """Binning function."""
    def __init__(self, arg, nbins, minval=None, maxval=None):
        """Initialize the bin function.

        If `minval` or `maxval` are not given, they will be set based on the min and/or max values for the
        column (the `arg` parameter) as determined by issuing an aggregate query over the current data path.

        :param arg: a column or aliased column instance
        :param nbins: number of bins
        :param minval: minimum value (optional)
        :param maxval: maximum value (optional)
        """
        super(Bin, self).__init__('bin', arg)
        if not (isinstance(arg, _ColumnWrapper) or isinstance(arg, _ColumnAlias)):
            raise TypeError("Bin argument must be a column or column alias")
        self.nbins = nbins
        self.minval = minval
        self.maxval = maxval

    def __str__(self):
        return "%s(%s;%s;%s;%s)" % (self._fn_name, self._arg, self.nbins, self.minval, self.maxval)

    @property
    def _instancename(self):
        return "%s(%s;%s;%s;%s)" % (self._fn_name, self._arg._instancename, self.nbins, self.minval, self.maxval)
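
# Usage sketch for the aggregate and binning functions above (`my_table` stands
# in for any wrapped table; the column names are placeholders):
#
#   path = my_table.path
#   totals = path.aggregates(Cnt(my_table.columns['RID']).alias('n'),
#                            Sum(my_table.columns['Length']).alias('total_length'))
#   by_bucket = path.groupby(Bin(my_table.columns['Length'], 10).alias('bucket')) \
#                   .attributes(Cnt(my_table.columns['RID']).alias('n'))
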

class _AggregateFunctionAlias (object):
    """Alias for aggregate functions."""
    def __init__(self, fn, alias_name):
        """Initializes the aggregate function alias.

        :param fn: aggregate function instance
        :param alias_name: alias name
        """
        super(_AggregateFunctionAlias, self).__init__()
        assert isinstance(fn, AggregateFunction)
        self._fn = fn
        self._name = alias_name
        self._uname = urlquote(self._name)

    def __str__(self):
        return str(self._fn)

    @property
    def _projection_name(self):
        """In a projection, the object uses this name."""
        return "%s:=%s" % (self._uname, self._fn._instancename)

    @property
    def desc(self):
        """A descending sort modifier based on this alias."""
        return _SortDescending(self)


class _AttributeGroup (object):
    """A computed attribute group."""
    def __init__(self, source, queryfn, keys):
        """Initializes an attribute group instance.

        :param source: the source object for the group (DataPath, _TableWrapper, _TableAlias)
        :param queryfn: a query function that takes mode, projection, and group_key parameters
        :param keys: an iterable collection of group keys
        """
        super(_AttributeGroup, self).__init__()
        assert any(isinstance(source, valid_type) for valid_type in [DataPath, _TableWrapper, _TableAlias])
        assert isinstance(keys, tuple)
        if not keys:
            raise ValueError("No groupby keys.")
        self._source = source
        self._queryfn = queryfn
        self._grouping_keys = list(keys)

    def attributes(self, *attributes):
        """Returns a results set of attributes projected and optionally renamed from this group.

        :param attributes: the columns, aliased columns, and/or aliased aggregate functions to be retrieved
            for this group.
        :return: a results set of the projected attributes from this group.
        """
        self._resolve_binning_ranges()
        return self._queryfn(mode=_Project.ATTRGROUP, projection=list(attributes), group_key=self._grouping_keys)

    def _resolve_binning_ranges(self):
        """Helper method to resolve any unspecified binning ranges."""
        for key in self._grouping_keys:
            if isinstance(key, _AggregateFunctionAlias) and isinstance(key._fn, Bin):
                bin = key._fn
                aggrs = []
                if bin.minval is None:
                    aggrs.append(Min(bin._arg).alias('minval'))
                if bin.maxval is None:
                    aggrs.append(Max(bin._arg).alias('maxval'))
                if aggrs:
                    result = self._source.aggregates(*aggrs)[0]
                    bin.minval = result.get('minval', bin.minval)
                    bin.maxval = result.get('maxval', bin.maxval)
                    if (bin.minval is None) or (bin.maxval is None):
                        raise ValueError('Automatic determination of binning bounds failed.')
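

# A minimal end-to-end sketch of the API defined in this module. It is never
# called here; the host, catalog id, schema, table, and column names are
# placeholders that must be replaced with values from a real catalog.
def _example_datapath_usage():  # pragma: no cover - documentation sketch only
    from deriva.core import ErmrestCatalog, get_credential

    host = 'www.example.org'                     # placeholder server
    catalog = ErmrestCatalog('https', host, '1', get_credential(host))
    pb = from_catalog(catalog)

    # resolve a wrapped table (placeholder schema and table names)
    experiment = pb.schemas['isa'].tables['experiment']

    # filter the table, then fetch whole entities sorted by RID
    path = experiment.filter(experiment.columns['Type'] == 'imaging')
    results = path.entities().sort(experiment.columns['RID']).fetch(limit=5)

    # project and rename a few attributes instead of fetching whole entities
    subset = path.attributes(experiment.columns['RID'],
                             experiment.columns['Type'].alias('Experiment_Type'))
    return list(results), list(subset)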