# Source code for deriva.utils.catalog.components.deriva_model

import logging
import logging.config
import pprint
import time
from collections import namedtuple, OrderedDict
from collections.abc import MutableMapping
import copy
from enum import Enum
from urllib.parse import urlparse
from requests.exceptions import HTTPError

# test
import deriva.core.ermrest_model as em
import tabulate
from deriva.core import ErmrestCatalog, get_credential
from deriva.core.ermrest_config import tag as chaise_tags, KeyedList

# Register the catalog-config tag in the tag table imported from deriva.core.ermrest_config.
# This mutates that shared object; later code in this module reads it as chaise_tags.catalog_config.
chaise_tags['catalog_config'] = 'tag:isrd.isi.edu,2019:catalog-config'

# The same tag URI, exposed as a module-level constant.
CATALOG_CONFIG__TAG = 'tag:isrd.isi.edu,2019:catalog-config'

# Module-level logger, named after this module's import name.
logger = logging.getLogger(__name__)
# Make sure we only have one stream handler: a handler is attached only when the logger has none yet.
if len(logger.handlers) == 0:
    handler = logging.StreamHandler()
    logger.addHandler(handler)
logger.setLevel(logging.INFO)

def timeit(method):
    """
    Decorator that measures the wall-clock run time of ``method``.

    When the decorated call is given a ``log_time`` keyword argument (a dictionary), the elapsed time in
    whole milliseconds is stored in that dictionary under the key given by the ``log_name`` keyword
    argument, or under the upper-cased function name when ``log_name`` is absent.  Otherwise the elapsed
    time is written to the module logger at INFO level.

    ``log_time`` and ``log_name`` are passed through to ``method`` unchanged, so the decorated callable
    must accept them (for example via ``**kwargs``).

    :param method: The callable to be timed.
    :return: A wrapper that calls ``method`` and returns its result unchanged.
    """
    from functools import wraps

    @wraps(method)
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        if 'log_time' in kw:
            # dict.get() accepts a single default; the previous three-argument call raised TypeError.
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = int((te - ts) * 1000)
        else:
            logger.info('%r  %2.2f ms', method.__name__, (te - ts) * 1000)
        return result
    return timed


class DerivaMethodFilter:
    """
    Logging filter that selects records by the name of the function that emitted them.

    :param include: ``True`` to pass every record except those whose function name is listed in
        ``exclude``; otherwise a collection of function names, in which case only records emitted
        from those functions pass.
    :param exclude: Function names to suppress.  Only consulted when ``include`` is ``True``.
    """
    def __init__(self, include=True, exclude=None):
        self.include = include
        # A fresh list per instance; a literal [] default would be shared by every filter.
        self.exclude = [] if exclude is None else exclude

    def filter(self, record):
        """
        Decide whether ``record`` should be emitted.

        :param record: A log record; only its ``funcName`` attribute is examined.
        :return: True if the record passes the filter.
        """
        if self.include is True:
            return record.funcName not in self.exclude
        return record.funcName in self.include


# Dictionary-schema logging configuration for the 'deriva_model' logger hierarchy.
#
# Each entry under 'filters' names DerivaMethodFilter as its factory via the '()' key; the remaining
# keys are the keyword arguments for that filter.  Only 'method_filter' is attached (to the console
# handler); the other filters are referenced solely from the commented-out 'filters' lines in the
# per-class logger entries below.  To enable one, uncomment the matching 'level'/'filters' lines,
# e.g. add filters: ['sourcespec_filter'] to a logger entry.
#
# NOTE(review): the module-level `logger` is obtained with logging.getLogger(__name__), while this
# configuration targets loggers named 'deriva_model' and 'deriva_model.<ClassName>' -- confirm
# whether the module-level logger is meant to be covered by this configuration.
logger_config = {
    'disable_existing_loggers': False,
    'version': 1,
    'filters': {
        # include=True with no exclude list: every record passes.
        'method_filter': {
            '()': DerivaMethodFilter,
            'include': True
        },
        'model_filter': {
            '()': DerivaMethodFilter,
            'include': ['model_element']
        },
        'foreign_key_filter': {
            '()': DerivaMethodFilter,
            'include': ['delete']
        },
        # Because 'include' is a list here, DerivaMethodFilter.filter() never consults 'exclude'.
        'table_filter': {
            '()': DerivaMethodFilter,
            'exclude': ['_foreign_key', '_referenced', '_key_in_columns'],
            'include': ['copy_columns']
        },
        'visiblesources_filter': {
            '()': DerivaMethodFilter,
            'include': ['insert_sources']
        },
        'sourcespec_filter': {
            '()': DerivaMethodFilter,
            'include': ['rename_column']
        }

    },
    'formatters': {
        # '{'-style format string: level, logger name, emitting function, then the message.
        'class': {
            'style': '{',
            'format': '{levelname} {name}.{funcName}: {message}'
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'formatter': 'class',
            'filters': ['method_filter'],
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        # Root of the hierarchy used by DerivaLogging; does not propagate to ancestor loggers.
        'deriva_model': {
            'handlers': ['console'],
            'level': 'INFO',
            'propagate': False
        },
        # One entry per class; all are currently empty, so no per-class overrides are set here.
        'deriva_model.DerivaModel': {
            #     'level': 'DEBUG',
            #     'filters': ['model_filter']
        },
        'deriva_model.DerivaCatalog': {
            #   'level': 'DEBUG',
        },
        'deriva_model.DerivaColumnMap': {
            #    'level': 'DEBUG'
        },
        'deriva_model.DerivaSchema': {
            #    'level': 'DEBUG'
        },
        'deriva_model.DerivaVisibleSources': {
            #  'level': 'DEBUG',
            # 'filters': ['visiblesources_filter']
        },
        'deriva_model.DerivaSourceSpec': {
            #       'level': 'DEBUG',
            #   'filters': ['sourcespec_filter']
        },
        'deriva_model.DerivaTable': {
         #          'level': 'DEBUG',
    #           'filters': ['table_filter']
        },
        'deriva_model.DerivaColumn': {
            #   'level': 'DEBUG'
        },
        'deriva_model.DerivaKey': {
            #     'level': 'DEBUG'
        },
        'deriva_model.DerivaForeignKey': {
            #     'level': 'DEBUG',
            #   'filters': ['foreign_key_filter']
        }
    },
}


# Apply the configuration at import time.
logging.config.dictConfig(logger_config)


class DerivaLogging:
    """
    Base class that gives every instance a ``logger`` attribute named ``deriva_model.<ClassName>``,
    where ``<ClassName>`` is the concrete class of the instance.
    """
    def __init__(self, **kwargs):
        # Keyword arguments are accepted but not used.
        logger_name = '.'.join(('deriva_model', type(self).__name__))
        self.logger = logging.getLogger(logger_name)


class DerivaCatalogError(Exception):
    """
    Base class for the errors raised by this module.

    :param obj: The object on which the failing operation was attempted.
    :param msg: Human readable description of the problem.
    """
    def __init__(self, obj, msg):
        # Hand the message to Exception so that str(error) and tracebacks display it.
        super().__init__(msg)
        self.msg = msg
        self.obj = obj


class DerivaModelError(DerivaCatalogError):
    """
    Catalog error raised by DerivaModel.model_element; adds no behavior to DerivaCatalogError.
    """
    def __init__(self, obj, msg):
        super().__init__(obj, msg)


class DerivaSourceError(DerivaCatalogError):
    """
    Catalog error subtype with the same constructor and attributes as DerivaCatalogError.
    """
    def __init__(self, obj, msg):
        super().__init__(obj, msg)


class DerivaKeyError(DerivaCatalogError):
    """
    Catalog error subtype with the same constructor and attributes as DerivaCatalogError.
    """
    def __init__(self, obj, msg):
        super().__init__(obj, msg)


class DerivaForeignKeyError(DerivaCatalogError):
    """
    Catalog error subtype with the same constructor and attributes as DerivaCatalogError.
    """
    def __init__(self, obj, msg):
        super().__init__(obj, msg)


class DerivaTableError(DerivaCatalogError):
    """
    Catalog error subtype with the same constructor and attributes as DerivaCatalogError.
    """
    def __init__(self, obj, msg):
        super().__init__(obj, msg)


class DerivaContext(Enum):
    """
    Names of the contexts that may appear as keys in context-keyed annotations.

    Each member's value is the string used in the annotation itself, so ``DerivaContext(name)``
    raises ValueError for a string that is not a known context name.
    """
    compact = "compact"
    compact_brief = "compact/brief"
    compact_select = "compact/select"
    detailed = "detailed"
    entry = "entry"
    entry_edit = "entry/edit"
    entry_create = "entry/create"
    filter = "filter"
    row_name = "row_name"
    row_name_title = "row_name/title"
    row_name_compact = "row_name/compact"
    row_name_detailed = "row_name/detailed"
    star = "*"
    # Pseudo-context: DerivaModel.contexts is built from every member except this one.
    all = "all"


class DerivaModel(DerivaLogging):
    """
    Representation of a deriva model.  Is primarily used as a resource manager to group catalog operations so
    as to minimize network round trips.  For example:

    ```
    with DerivaModel(catalog):
        table = schema.create_table('MyTable',[])
        table.display = 'My Nice Table'
    ```

    Contexts may be nested.  The catalog's ``nesting`` counter tracks the depth and the catalog's
    ``_apply()`` is called only when the outermost context exits.
    """
    # Every context except the "all" pseudo-context.
    contexts = {i for i in DerivaContext if i is not DerivaContext("all")}

    def __init__(self, catalog):
        """
        :param catalog: The catalog object whose ``nesting`` counter and ``_apply()`` are used.
        """
        super().__init__()
        self.catalog = catalog

    def __enter__(self):
        if self.catalog.nesting == 0:
            self.logger.debug('entering model changes %s', self.catalog_model())
        self.catalog.nesting += 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.catalog.nesting -= 1
        # Only the outermost context pushes the accumulated changes.
        if self.catalog.nesting == 0:
            self.logger.debug('applying changes to model %s', self.catalog_model())
            self.catalog._apply()

    def model_element(self, obj):
        """
        Return the underlying model object wrapped by ``obj``.

        :param obj: A DerivaCatalog, DerivaColumn, DerivaKey, DerivaForeignKey, DerivaTable or DerivaSchema.
        :return: The wrapped model object.
        :raises DerivaModelError: If ``obj`` is not one of the supported types or its wrapped model
            object is missing.
        """
        self.logger.debug('type %s', type(obj).__name__)
        if isinstance(obj, DerivaCatalog):
            m = obj.model_instance
        elif isinstance(obj, DerivaColumn):
            m = obj.column
        elif isinstance(obj, DerivaKey):
            m = obj.key
        elif isinstance(obj, DerivaForeignKey):
            m = obj.fkey
        elif isinstance(obj, DerivaTable):
            m = obj.table
        elif isinstance(obj, DerivaSchema):
            m = obj.schema
        else:
            # Previously this case fell through and failed with UnboundLocalError on `m`.
            raise DerivaModelError(self, 'Model not found for object {}'.format(obj))
        if not m:
            raise DerivaModelError(self, 'Model not found for object {}'.format(obj))
        return m

    def catalog_model(self):
        """
        :return: The ``model_instance`` attribute of the catalog this context manages.
        """
        return self.catalog.model_instance
class DerivaACL(MutableMapping):
    """
    Mutable-mapping view over the ACL dictionary of a catalog object.

    Assignments and deletions are made inside a DerivaModel context, so the change is applied to the
    catalog when the outermost context exits.
    """
    # ACL names accepted for each object type, keyed by the string returned from object_type().
    # NOTE(review): there is no 'DerivaKey' entry, so __setitem__/validate raise KeyError for keys --
    # confirm whether that is intended.
    acl_matrix = {
        'DerivaCatalog': {'owner', 'create', 'select', 'insert', 'update', 'write', 'delete', 'enumerate'},
        'DerivaCatalogConfigure': {'owner', 'create', 'select', 'insert', 'update', 'write', 'delete', 'enumerate'},
        'DerivaSchema': {'owner', 'create', 'select', 'insert', 'update', 'write', 'delete', 'enumerate'},
        'DerivaTable': {'owner', 'create', 'select', 'insert', 'update', 'write', 'delete', 'enumerate'},
        'DerivaTableConfigure': {'owner', 'create', 'select', 'insert', 'update', 'write', 'delete', 'enumerate'},
        'DerivaColumn': {'owner', 'create', 'select', 'insert', 'update', 'write', 'delete', 'enumerate'},
        'DerivaForeignKey': {'owner', 'create', 'select', 'insert', 'update', 'write', 'delete', 'enumerate'}
    }

    def __init__(self, obj):
        """
        :param obj: Object providing ``catalog``, ``get_acls()`` and ``object_type()``.
        """
        self._catalog = obj.catalog
        self._acls = obj.get_acls()
        self._obj_type = obj.object_type()

    def __setitem__(self, key, value):
        # TODO This needs to properly work with subclassing
        if key not in DerivaACL.acl_matrix[self._obj_type]:
            raise DerivaCatalogError(self, msg='Invalid ACL: {}'.format(key))
        with DerivaModel(self._catalog):
            self._acls[key] = value

    def __delitem__(self, key):
        with DerivaModel(self._catalog):
            self._acls.pop(key)

    def __getitem__(self, key):
        return self._acls[key]

    def __iter__(self):
        return iter(self._acls)

    def __len__(self):
        return len(self._acls)

    def __repr__(self):
        return self._acls.__repr__()

    def __str__(self):
        return self._acls.__str__()

    @property
    def value(self):
        """
        :return: The underlying ACL dictionary.
        """
        return self._acls

    def validate(self, obj):
        """
        Check that every ACL name is permitted for this object type.

        :param obj: The object being validated; only its ``name`` is used, for the log message.
        :return: True if all ACL names are valid, otherwise False (and the problem is logged).
        """
        if set(self._acls.keys()) <= DerivaACL.acl_matrix[self._obj_type]:
            return True
        logger.info('Invalid ACL: %s %s', obj.name, self)
        return False


class DerivaACLBinding(MutableMapping):
    """
    Mutable-mapping view over the ACL bindings of a catalog object.  Assignments and deletions are
    made inside a DerivaModel context.
    """
    # Binding names listed per object type.  Within the visible code only the keys of this matrix
    # are consulted (by DerivaCore.acl_bindings).
    acl_binding_matrix = {
        'DerivaTable': {'owner', 'create', 'select', 'update', 'write', 'delete', 'enumerate'},
        'DerivaColumn': {'owner', 'create', 'select', 'update', 'write', 'delete', 'enumerate'},
        'DerivaForeignKey': {'owner', 'insert', 'update'}
    }

    def __init__(self, obj):
        """
        :param obj: Object providing ``catalog`` and ``get_acl_bindings()``.
        """
        self._catalog = obj.catalog
        self._acl_bindings = obj.get_acl_bindings()

    def __setitem__(self, key, value):
        with DerivaModel(self._catalog):
            self._acl_bindings[key] = value

    def __delitem__(self, key):
        with DerivaModel(self._catalog):
            self._acl_bindings.pop(key)

    def __getitem__(self, key):
        return self._acl_bindings[key]

    def __iter__(self):
        return iter(self._acl_bindings)

    def __len__(self):
        return len(self._acl_bindings)

    def __repr__(self):
        return self._acl_bindings.__repr__()

    def __str__(self):
        return self._acl_bindings.__str__()

    @property
    def value(self):
        """
        :return: The underlying ACL bindings object.
        """
        return self._acl_bindings

    def validate(self, obj):
        """
        Check that the bindings are held in a dictionary.

        :param obj: The object being validated; only its ``name`` is used, for the log message.
        :return: True if the bindings are a dict, otherwise False (and the problem is logged).
        """
        if isinstance(self._acl_bindings, dict):
            return True
        logger.info('Invalid acl_binding %s %s', obj.name, self)
        return False


class DerivaAnnotations(MutableMapping):
    """
    Class used to represent an annotation.  Main reason for this class is to make sure apply function is called
    when needed.
    """
    # Tag values known when this class is defined; used to reject unknown tags on assignment.
    annotation_tags = {v for v in chaise_tags.values()}

    def __init__(self, obj):
        """
        :param obj: Object providing ``catalog`` and accepted by DerivaModel.model_element().
        """
        self.catalog = obj.catalog
        m = DerivaModel(self.catalog)
        self.annotations = m.model_element(obj).annotations

    def __setitem__(self, key, value):
        if key not in DerivaAnnotations.annotation_tags:
            raise DerivaCatalogError(self, msg='Unknow annotation tag: {}'.format(key))
        with DerivaModel(self.catalog):
            self.annotations[key] = value

    def __delitem__(self, key):
        with DerivaModel(self.catalog):
            self.annotations.pop(key)

    def __getitem__(self, key):
        return self.annotations[key]

    def __iter__(self):
        return iter(self.annotations)

    def __len__(self):
        return len(self.annotations)

    def __repr__(self):
        return self.annotations.__repr__()

    def __str__(self):
        return self.annotations.__str__()

    def validate(self, obj):
        """
        Validate the annotations attached to ``obj``.  Problems are logged rather than raised.

        The foreign_key, column_display, asset, bulk_upload, export and chaise_config annotations are
        accepted without further checks.

        :param obj: The object that owns these annotations.
        :return: A true value if every check passed, otherwise a false value.
        """
        rval = True
        for t in self.annotations:
            if t not in chaise_tags.values():
                logger.info('Invalid annotation tag %s', t)
                rval = False
            if t == chaise_tags.display:
                rval = obj.validate_display() and rval
            if t == chaise_tags.visible_columns:
                if isinstance(obj, DerivaTable):
                    rval = obj.visible_columns.validate() and rval
                else:
                    logger.info('visible_columns annotation on non-table element %s', obj.name)
                    rval = False
            if t == chaise_tags.visible_foreign_keys:
                if isinstance(obj, DerivaTable):
                    rval = obj.visible_foreign_keys.validate() and rval
                else:
                    logger.info('visible_foreign_keys annotation on non-table element %s', obj.name)
                    # Treated as a failure, consistent with the other table-only annotations.
                    rval = False
            if t == chaise_tags.table_display:
                if isinstance(obj, DerivaTable):
                    rval = obj.validate_table_display() and rval
                else:
                    logger.info('table_display annotation on non-table element %s', obj.name)
                    rval = False
        return rval


class DerivaCore(DerivaLogging):
    """
    Common base for the catalog, schema, table, column and key wrappers.  Provides access to the
    annotations, ACLs and ACL bindings of the wrapped model element.
    """
    def __init__(self, catalog):
        """
        :param catalog: The catalog object this element belongs to.
        """
        self.catalog = catalog
        super().__init__()

    @property
    def annotations(self):
        """
        Get/Set a Deriva Annotation.

        :return: A DerivaAnnotations view of this element's annotations.
        """
        return DerivaAnnotations(self)

    @annotations.setter
    def annotations(self, value):
        # The context manager must be bound to `m`; without it the body raised NameError.
        with DerivaModel(self.catalog) as m:
            m.model_element(self).annotations.clear()
            m.model_element(self).annotations.update(value)

    @property
    def acls(self):
        """
        Get/Set a Deriva ACL.

        :return: A DerivaACL view of this element's ACLs.
        """
        return DerivaACL(self)

    @acls.setter
    def acls(self, value):
        with DerivaModel(self.catalog) as m:
            m.model_element(self).acls.clear()
            m.model_element(self).acls.update(value)

    @property
    def acl_bindings(self):
        """
        Get/Set the Deriva ACL bindings.

        :return: A DerivaACLBinding view of this element's ACL bindings.
        :raises DerivaCatalogError: If ACL bindings are not defined for this object type.
        """
        if self.object_type() not in DerivaACLBinding.acl_binding_matrix:
            raise DerivaCatalogError(self, msg='ACL Bindings not defined for {}'.format(type(self).__name__))
        return DerivaACLBinding(self)

    @acl_bindings.setter
    def acl_bindings(self, value):
        if self.object_type() not in DerivaACLBinding.acl_binding_matrix:
            raise DerivaCatalogError(self, msg='ACL Bindings not defined for {}'.format(type(self).__name__))
        with DerivaModel(self.catalog) as m:
            m.model_element(self).acl_bindings.clear()
            m.model_element(self).acl_bindings.update(value)

    def get_acls(self):
        """
        Get dictionary form of ACL

        :return: The ``acls`` attribute of the wrapped model element.
        """
        with DerivaModel(self.catalog) as m:
            return m.model_element(self).acls

    def get_acl_bindings(self):
        """
        Get dictionary form of acl_bindings

        :return: The ``acl_bindings`` attribute of the wrapped model element.
        """
        with DerivaModel(self.catalog) as m:
            return m.model_element(self).acl_bindings

    def object_type(self):
        """
        :return: The name of the first matching wrapper base class (so subclasses report their base
            type), or the concrete class name when none match.
        """
        if isinstance(self, DerivaCatalog):
            obj_type = 'DerivaCatalog'
        elif isinstance(self, DerivaSchema):
            obj_type = 'DerivaSchema'
        elif isinstance(self, DerivaTable):
            obj_type = 'DerivaTable'
        elif isinstance(self, DerivaColumn):
            obj_type = 'DerivaColumn'
        elif isinstance(self, DerivaKey):
            obj_type = 'DerivaKey'
        elif isinstance(self, DerivaForeignKey):
            obj_type = 'DerivaForeignKey'
        else:
            obj_type = type(self).__name__
        return obj_type
class DerivaCatalog(DerivaCore):
    """
    A Deriva catalog.  Operations on the catalog will alter both the ERMrest service as well as the annotations
    used by Chaise.
    """

    def __init__(self, host, scheme='https', catalog_id=1, ermrest_catalog=None):
        """
        Initialize a DerivaCatalog.

        :param host: Name of the server hosting the deriva catalog service
        :param scheme: Scheme to be used for connecting to the host, defaults to https
        :param catalog_id: The identifier for the catalog in the server.  Is an integer
        :param ermrest_catalog: An existing ErmrestCatalog to use; when omitted a new one is created
            from scheme, host and catalog_id using the stored credential for the host.
        """
        # Must exist before DerivaModel contexts are entered on this catalog.
        self.nesting = 0
        super().__init__(self)
        self.ermrest_catalog = (
            ermrest_catalog if ermrest_catalog
            else ErmrestCatalog(scheme, host, catalog_id, credentials=get_credential(host))
        )
        self.model_instance = self.ermrest_catalog.getCatalogModel()
        # Maps underlying model objects to their Deriva* wrapper objects.
        self.model_map = {}
        self._map_model()

    def __str__(self):
        return '\n'.join(self.schemas)

    def __getitem__(self, schema_name):
        return self.schemas.__getitem__(schema_name)

    def __iter__(self):
        # Iterates schema names (the keys of the schemas dictionary).
        return self.schemas.__iter__()

    def __contains__(self, item):
        return self.schemas.__contains__(item)

    def _repr_html_(self):
        return (
            '<b>Catalog: {}</b><br>'.format(self.name) +
            tabulate.tabulate(
                [[i.name, i.comment] for i in self.schemas.values()],
                headers=['Schema Name', 'Comment'],
                tablefmt="html",
                showindex=True,
                stralign='left')
        )

    @property
    def host(self):
        """
        Get catalog host.

        :return: Hostname of the current catalog
        """
        return urlparse(self.ermrest_catalog.get_server_uri()).hostname

    @property
    def catalog_id(self):
        """
        Get catalog id

        :return: catalog identifier
        """
        return self.ermrest_catalog.catalog_id

    @property
    def server_uri(self):
        """
        URI for the catalog server

        :return: server uri
        """
        return self.ermrest_catalog.get_server_uri()

    @property
    def schemas(self):
        """
        Return the schemas contained in the catalog.

        :return: A new dictionary mapping schema name to DerivaSchema.
        """
        return {k: self.model_map[v] for k, v in self.model_instance.schemas.items()}

    @property
    def navbar_menu(self):
        """
        Get/Set the navigation bar menu.

        :return: The 'navbarMenu' entry of the chaise-config annotation.
        """
        return self.annotations[chaise_tags.chaise_config]['navbarMenu']

    @navbar_menu.setter
    def navbar_menu(self, value):
        if not isinstance(value, dict):
            raise ValueError('Menu must be a dictionary')
        if chaise_tags.chaise_config not in self.annotations:
            self.annotations[chaise_tags.chaise_config] = {'navbarMenu': value}
        else:
            # NOTE(review): this mutates the existing annotation value in place, which does not go
            # through DerivaAnnotations.__setitem__ -- confirm the change is applied as intended.
            self.annotations[chaise_tags.chaise_config]['navbarMenu'] = value

    @property
    def bulk_upload(self):
        """
        Get/Set the bulk upload annotation.

        :return: The value of the bulk-upload annotation.
        """
        return self.annotations[chaise_tags.bulk_upload]

    # The setter must carry the property's own name; it was previously defined as `navbar_menu`,
    # which replaced the navbar_menu property and left bulk_upload read-only.
    @bulk_upload.setter
    def bulk_upload(self, value):
        if not isinstance(value, dict):
            raise ValueError('Bulk upload annotation must be a dictionary')
        self.annotations[chaise_tags.bulk_upload] = value

    @property
    def name(self):
        """
        :return: The 'name' entry of the catalog-config annotation, or 'unknown' when the catalog
            has no such annotation.
        """
        return self.model_instance.annotations.get(chaise_tags.catalog_config, {'name': 'unknown'})['name']

    def _apply(self):
        """
        Push any pending annotation updates to the server.  Should not be need to be called except when
        things get messed up.

        :return:
        """
        self.logger.debug('%s', self.model_instance)
        self.model_instance.apply()

    def describe(self):
        """
        Print the schema names of this catalog.
        """
        print(self)

    def refresh(self):
        """
        Refresh any cached model values from the server.

        Must not be called while a DerivaModel context is active on this catalog.

        :return:
        """
        assert (self.nesting == 0)
        logger.debug('Refreshing model')
        server_url = urlparse(self.ermrest_catalog.get_server_uri())
        # The catalog identifier is taken from the last path component of the server URI.
        catalog_id = server_url.path.split('/')[-1]
        self.ermrest_catalog = ErmrestCatalog(server_url.scheme,
                                              server_url.hostname,
                                              catalog_id,
                                              credentials=get_credential(server_url.hostname))
        self.model_instance = self.ermrest_catalog.getCatalogModel()
        self.model_map = {}
        self._map_model()

    def getPathBuilder(self):
        """
        :return: The path builder of the underlying ERMrest catalog.
        """
        return self.ermrest_catalog.getPathBuilder()

    def schema(self, schema_name):
        """
        :param schema_name: Name of the schema.
        :return: The DerivaSchema object for the named schema.
        """
        return self.model_map[self.model_instance.schemas[schema_name]]

    def create_schema(self, schema_name, comment=None, acls=None, annotations=None):
        """
        Create a new schema in this catalog.

        :param schema_name: The name of the schema
        :param comment: A comment for the schema
        :param acls: ACLs for the schema; defaults to an empty dictionary.
        :param annotations: Schema annotations; defaults to an empty dictionary.
        :return: A DerivaSchema object
        :raises DerivaCatalogError: If creating the schema raises ValueError, which is reported as
            the schema already existing.
        """
        self.logger.debug('name: %s', schema_name)
        # Fresh dictionaries per call rather than shared mutable defaults.
        acls = {} if acls is None else acls
        annotations = {} if annotations is None else annotations
        try:
            s = self.model_instance.create_schema(
                em.Schema.define(
                    schema_name,
                    comment=comment,
                    acls=acls,
                    annotations=annotations
                )
            )
        except ValueError as e:
            raise DerivaCatalogError(self, 'Schema {} already exists'.format(schema_name)) from e
        self.model_map[s] = DerivaSchema(self, s)
        return self.schema(schema_name)

    def get_groups(self):
        """
        :return: The 'groups' entry of the catalog-config annotation.
        :raises DerivaCatalogError: If the catalog has no catalog-config annotation.
        """
        if chaise_tags.catalog_config in self.annotations:
            return self.annotations[chaise_tags.catalog_config]['groups']
        else:
            raise DerivaCatalogError(self, msg='Attempting to configure table before catalog is configured')

    def _map_model(self):
        """
        Create a DerivaSchema wrapper for every schema in the model and record it in model_map.

        :return:
        """
        for s in self.model_instance.schemas.values():
            self.model_map[s] = DerivaSchema(self, s)

    def validate(self):
        """
        Validate all of the objects in the catalog.

        :return: A true value if everything validated, otherwise a false value.
        """
        rval = self.annotations.validate(self)
        rval = self.acls.validate(self) and rval
        # Iterate the schema objects; iterating the dictionary itself yields only their names.
        for s in self.schemas.values():
            logger.info('Validating %s', s.name)
            # TODO Validate schema attributes.
            rval = s.validate() and rval
        return rval

    def validate_display(self):
        """
        Validate the display annotation of the catalog.

        :return: True.  No checks are implemented yet; returning True (rather than None) keeps an
            otherwise valid catalog from being reported as invalid.
        """
        # TODO implement
        return True

    def rename_visible_columns(self, column_map, validate=False):
        """
        Apply a column rename to the visible_columns of every table in every schema.

        :param column_map: Passed to each table's visible_columns.rename_columns().
        :param validate: Passed through to rename_columns().
        """
        for s in self.schemas.values():
            for t in s.tables.values():
                try:
                    t.visible_columns = t.visible_columns.rename_columns(column_map, validate=validate)
                except DerivaSourceError:
                    # Best effort: tables whose rename raises DerivaSourceError are left unchanged.
                    pass
class DerivaSchema(DerivaCore):
    """
    Wrapper around a schema of the catalog model.  Tables can be looked up by name with indexing.
    """

    def __init__(self, catalog, schema):
        """
        :param catalog: The DerivaCatalog that owns this schema.
        :param schema: The underlying model schema object.
        """
        super().__init__(catalog)
        self.schema_name = schema.name
        self.schema = schema
        self._map_model()

    def __str__(self):
        return '\n'.join(self.tables)

    def __getitem__(self, table_name):
        return self.tables.__getitem__(table_name)

    def __iter__(self):
        # Iterates table names (the keys of the tables dictionary).
        return self.tables.__iter__()

    def __contains__(self, table_name):
        # The result must be returned; without it every membership test evaluated to False.
        return self.tables.__contains__(table_name)

    def _repr_html_(self):
        return (
            '<b>Schema: {}</b><br>'.format(self.name) +
            tabulate.tabulate(
                # Iterate the table objects; the dictionary itself yields only names.
                [[i.name, i.comment] for i in self.tables.values()],
                headers=['Table Name', 'Comment'],
                tablefmt="html",
                showindex=True,
                stralign='left')
        )

    @property
    def name(self):
        return self.schema.name

    @property
    def comment(self):
        return self.schema.comment

    @comment.setter
    def comment(self, value):
        with DerivaModel(self.catalog):
            self.schema.comment = value

    @property
    def tables(self):
        """
        :return: A new dictionary mapping table name to DerivaTable.
        """
        return {k: self.catalog.model_map[v] for k, v in self.schema.tables.items()}

    @property
    def display(self):
        return self.annotations[chaise_tags.display]

    @display.setter
    def display(self, value):
        self.annotations[chaise_tags.display] = value

    def _map_model(self):
        """
        Create a DerivaTable wrapper for every table in the schema and record it in the catalog's
        model_map.

        :return:
        """
        for t in self.schema.tables.values():
            self.catalog.model_map[t] = DerivaTable(self.catalog, t)

    def _create_table(self, table_def):
        """
        Create a table from a table definition and register its wrapper in the catalog's model_map.

        :param table_def: Table definition passed to the underlying schema's create_table().
        :return: The new DerivaTable object.
        """
        with DerivaModel(self.catalog):
            t = self.schema.create_table(table_def)
            table = self.catalog.model_map[t] = DerivaTable(self.catalog, t)
            table.deleted = False  # Table may have been previously been deleted.
            return table

    def describe(self):
        """
        Print the table names of this schema.
        """
        print(self)

    def drop(self):
        """
        Drop the underlying schema and remove it from the catalog's model_map.
        """
        self.schema.drop()
        del self.catalog.model_map[self.schema]

    def table(self, table_name):
        """
        Return a DerivaTable object for the named table.

        :param table_name: Name of the table.
        :return: The DerivaTable object.
        """
        # Uses the same attributes as the `tables` property (catalog.model_map).
        return self.catalog.model_map[self.schema.tables[table_name]]

    def create_table(self, table_name, column_defs,
                     key_defs=None, fkey_defs=None,
                     comment=None,
                     acls=None,
                     acl_bindings=None,
                     annotations=None,
                     default_config=True):
        """
        Create a new table from the provided arguments.  Keys and foreign keys whose 'names' entry
        is an empty list are given a name built from the table name and their column names.  After
        creation the visible-columns and visible-foreign-keys annotations are initialised from the
        table's sources.

        :param table_name: The name of the new table to be created.
        :param column_defs: List of column definitions; each must have a 'name' entry.
        :param key_defs: List of key definitions; defaults to an empty list.
        :param fkey_defs: List of foreign key definitions; defaults to an empty list.
        :param comment: Comment for the table.
        :param acls: ACLs for the table; defaults to an empty dictionary.
        :param acl_bindings: ACL bindings for the table; defaults to an empty dictionary.
        :param annotations: Annotations for the table; defaults to an empty dictionary.
        :param default_config: Not used by this method.
        :return: The new DerivaTable object.
        """
        self.logger.debug('table_name: %s', table_name)
        # Fresh containers per call rather than shared mutable defaults.
        key_defs = [] if key_defs is None else key_defs
        fkey_defs = [] if fkey_defs is None else fkey_defs
        acls = {} if acls is None else acls
        acl_bindings = {} if acl_bindings is None else acl_bindings
        annotations = {} if annotations is None else annotations

        column_names = [c['name'] for c in column_defs]

        def update_key_name(tname, k):
            # Columns appear in the name in table-definition order.
            cannonical_name = '_'.join([
                tname,
                '_'.join([c for c in column_names if c in k['unique_columns']]),
                'key'
            ])
            return {**k, 'names': [[self.name, cannonical_name]]} if k['names'] == [] else k

        def update_fkey_name(tname, fk):
            key_columns = [c['column_name'] for c in fk['foreign_key_columns']]
            cannonical_name = '_'.join([
                tname,
                '_'.join([c for c in column_names if c in key_columns]),
                'fkey'
            ])
            return {**fk, 'names': [[self.name, cannonical_name]]} if fk['names'] == [] else fk

        with DerivaModel(self.catalog):
            table = self._create_table(
                em.Table.define(
                    table_name, column_defs,
                    key_defs=[update_key_name(table_name, k) for k in key_defs],
                    fkey_defs=[update_fkey_name(table_name, fk) for fk in fkey_defs],
                    comment=comment,
                    acls=acls, acl_bindings=acl_bindings,
                    annotations=annotations)
            )

            # Add the new inbound references to the tables this table points at.
            for fkey in table.foreign_keys:
                _, _, inbound_sources = fkey.referenced_table.sources()
                fkey.referenced_table.visible_foreign_keys.insert_sources(inbound_sources)

            column_sources, outbound_sources, inbound_sources = table.sources(merge_outbound=True)
            table.visible_columns.insert_context(DerivaContext('*'), column_sources)
            table.visible_columns.insert_context(DerivaContext('entry'), column_sources)
            table.visible_foreign_keys.insert_context(DerivaContext('*'), inbound_sources)
        return table

    def create_asset(self, table_name,
                     column_defs=None, key_defs=None, fkey_defs=None,
                     comment=None,
                     acls=None,
                     acl_bindings=None,
                     annotations=None,
                     file_pattern='.*',
                     extensions=None):
        """
        Create an asset table.  This function creates a new table that has the standard asset columns in
        addition to columns provided by the caller.

        :param table_name: Name of the asset table.
        :param column_defs: Additional columns; each must provide definition().  Defaults to an empty list.
        :param key_defs: Additional keys; each must provide update_table().  Defaults to an empty list.
        :param fkey_defs: Additional foreign keys; each must provide update_table().  Defaults to an empty list.
        :param comment: Comment for the table.
        :param acls: ACLs for the table; defaults to an empty dictionary.
        :param acl_bindings: ACL bindings for the table; defaults to an empty dictionary.
        :param annotations: Annotations for the table; defaults to an empty dictionary.
        :param file_pattern: Passed to the new table's _create_upload_spec().
        :param extensions: Passed to the new table's _create_upload_spec(); defaults to an empty list.
        :return: A DerivaTable object
        """
        self.logger.debug('table_name: %s', table_name)
        # Fresh containers per call rather than shared mutable defaults.
        column_defs = [] if column_defs is None else column_defs
        key_defs = [] if key_defs is None else key_defs
        fkey_defs = [] if fkey_defs is None else fkey_defs
        acls = {} if acls is None else acls
        acl_bindings = {} if acl_bindings is None else acl_bindings
        annotations = {} if annotations is None else annotations
        extensions = [] if extensions is None else extensions

        # Now that we know the table name, patch up the key and fkey defs to have the correct name.
        with DerivaModel(self.catalog):
            proto_table = namedtuple('ProtoTable', ['catalog', 'schema', 'schema_name', 'name', 'columns'])
            for k in key_defs:
                k.update_table(proto_table(self.catalog, self.schema, self.name, table_name, column_defs))
            for k in fkey_defs:
                k.update_table(proto_table(self.catalog, self.schema, self.name, table_name, column_defs))

            asset_table = self._create_table(em.Table.define_asset(
                self.schema_name,
                table_name,
                key_defs=[key.definition() if isinstance(key, DerivaKey) else key for key in key_defs],
                fkey_defs=[fkey.definition() if type(fkey) is DerivaForeignKey else fkey for fkey in fkey_defs],
                column_defs=[col.definition() for col in column_defs],
                annotations=annotations,
                acls=acls, acl_bindings=acl_bindings,
                comment=comment)
            )

            # Render URL and Filename as markdown links to the asset URL.
            asset_table.columns['URL'].annotations[chaise_tags.column_display] = \
                {'*': {'markdown_pattern': '[**{{URL}}**]({{{URL}}})'}}
            asset_table.columns['Filename'].annotations[chaise_tags.column_display] = \
                {'*': {'markdown_pattern': '[**{{Filename}}**]({{{URL}}})'}}
            asset_table.columns['Length'].annotations[chaise_tags.generated] = True
            asset_table.columns['MD5'].annotations[chaise_tags.generated] = True
            asset_table.columns['URL'].annotations[chaise_tags.generated] = True

            asset_table._create_upload_spec(file_pattern, extensions)
        return asset_table

    def create_vocabulary(self, vocab_name, curie_template, uri_template='/id/{RID}',
                          column_defs=None,
                          key_defs=None, fkey_defs=None,
                          comment=None,
                          acls=None, acl_bindings=None,
                          annotations=None
                          ):
        """
        Create a vocabulary table that can be used to reference externally defined terms. This function provides
        the option to add additional columns to the table, as well as set access control and additional table
        annotations.

        :param vocab_name: Name of the vocabulary table to be created.
        :param curie_template: Default shortform name for the term, in the form of 'NAMESPACE:{RID}',
        :param uri_template: URI template for the terms.
        :param column_defs: Additional columns to be added to the vocabulary table.
        :param key_defs: Additional key definitions.
        :param fkey_defs: Additional foreign key definitions.
        :param comment: Comment string.
        :param acls: ACLs for the table.
        :param acl_bindings: ACL bindings for the table.
        :param annotations: Annotations for the table.
        :return: A DerivaTable object
        """
        # Fresh containers per call rather than shared mutable defaults.
        column_defs = [] if column_defs is None else column_defs
        key_defs = [] if key_defs is None else key_defs
        fkey_defs = [] if fkey_defs is None else fkey_defs
        acls = {} if acls is None else acls
        acl_bindings = {} if acl_bindings is None else acl_bindings
        annotations = {} if annotations is None else annotations
        return self._create_table(
            em.Table.define_vocabulary(vocab_name, curie_template, uri_template=uri_template,
                                       column_defs=column_defs,
                                       key_defs=key_defs, fkey_defs=fkey_defs, comment=comment,
                                       acls=acls, acl_bindings=acl_bindings,
                                       annotations=annotations)
        )

    def validate(self):
        """
        Validate the ACLs and annotations of this schema and then every table it contains.  Problems
        are reported through the logger.

        :return: A true value if all values are valid, otherwise a false value.
        """
        rval = self.validate_display()
        rval = self.acls.validate(self) and rval
        rval = self.annotations.validate(self) and rval
        # Iterate the table objects; iterating the dictionary itself yields only their names.
        for t in self.tables.values():
            logger.info('Validating table %s', t.name)
            rval = t.validate() and rval
        return rval

    def validate_display(self):
        """
        :return: True.  No checks are implemented yet.
        """
        # TODO Finish....
        return True
class DerivaColumnMap(DerivaLogging, OrderedDict):
    """
    The DerivaColumnMap class is used to define a mapping between columns.  This mapping is used to define how
    columns are renamed.  A column map is an ordered dictionary that reflects the order that the columns should be
    added to the table.  The key of the dictionary is the *current* column name.  The value may take several forms:

    * A string which is the new name of the column.  In this case, all attributes of the column are preserved.
    * A Deriva DerivaColumnDef or ermrest_model Column object, which provides all of the values for the new column.
    * A dictionary that can specify standard column attributes (name, type, nullok, default, comment, acls,
      acl_bindings).  In addition the attribute **fill** can be provided, which is a value to be used to fill in
      missing values when mapping a column that has nullok=True to a column that has nullok=False
    """

    def __init__(self, table, column_map, dest_table):
        """
        :param table: The table whose columns are being mapped.
        :param column_map: Mapping in any of the forms described on the class.
        :param dest_table: The table the mapped columns, keys and foreign keys are defined on.
        """
        self.table = table
        self.dest_table = dest_table
        super().__init__()
        # Log the table name when there is a table (the condition was previously inverted).
        self.logger.debug('table: %s dest_table: %s column_map: %s ',
                          table.name if table else None,
                          dest_table.name if dest_table else None,
                          column_map)
        self.update(self._normalize_column_map(table, column_map, dest_table))

    def _normalize_column_map(self, table, column_map, dest_table):
        """
        Put a column map into a standard format which is a dictionary in the form of
        {source-name: DerivaColumnDef} where source-name can be in the form of a column or key name.
        A simplified format which is just the SrcCol:DestCol is converted.
        dest_table is used to specify the target table of the mapping if it is not included as part of the
        DerivaColumnSpec.  Entries for each column in columns are also added.

        Once the column_map is normalized, mappings for keys and foreign keys are added based on the columns
        that are being mapped.  We use ordered dictionaries to make the order of the columns consistent with
        the order of the columns, then the order of the column map.

        :param table: The table whose columns, keys and foreign keys are consulted.
        :param column_map: The mapping to normalize.
        :param dest_table: The table the new definitions are attached to.
        :return: An OrderedDict of column entries followed by key and foreign key entries.
        """

        def _normalize_column(k, v):
            """
            The form of a column can be one of:
                column_name: DerivaColumnDef|em.Column
                new_column: typename|dict
                column: new_name
            These are all put into a standard form of name: DerivaColumnDef, with the table
            attribute set to dest_table if provided.

            :param k: Name of the column being mapped
            :param v: Either the name of the new column or a dictionary of new column attributes.
            :return: A (name, definition) pair.
            """
            self.logger.debug('column: %s', k)
            if not type(k) is str:
                k = k.name

            # Already a definition object: use it as is.
            if isinstance(v, (DerivaColumn, DerivaKey, DerivaForeignKey)):
                return k, v

            try:
                # Get the existing column definition if it exists.
                col = table[k]  # Get current definition for the column
                name = v if type(v) is str else v.get('name', k)  # Name may be provided in v, if not use k.
            except DerivaCatalogError:
                # Column is new, so create a default definition for it. If value is a string, then its the type.
                col = DerivaColumn(**{'define': True,
                                      'name': k,
                                      'table': dest_table,
                                      **({'type': v} if type(v) is str else v)})
                name = k

            # We have a column remap in the form of col: new_name or col: dictionary
            # Create a proper dictionary spec for the value adding in a table entry in the case if needed.
            args = {'define': True,
                    'name': name,
                    'table': dest_table,
                    'type': col.type,
                    'nullok': col.nullok,
                    'default': col.default,
                    'fill': col.fill,
                    'comment': col.comment,
                    'acls': col.acls,
                    'acl_bindings': col.acl_bindings}
            return k, DerivaColumn(**args)

        # Go through the columns in order and add map entries, converting any map entries that are just column names
        # or dictionaries to DerivaColumnDefs
        column_map = OrderedDict(_normalize_column(k, v) for k, v in column_map.items())

        # Collect up all of the column name maps.
        column_name_map = OrderedDict((k, v.name) for k, v in column_map.items())

        self.logger.debug('column_map: %s column_name_map %s', column_map, column_name_map)
        self.logger.debug('keys: %s \nkey_columns: %s \n mapped_keys %s \n%s \nfkeys %s \n mapped fkeys %s',
                          [key.name for key in table.keys],
                          [[c.name for c in key.columns] for key in table.keys],
                          [[column_name_map.get(c.name, c.name) for c in key.columns] for key in table.keys],
                          [fkey.name for fkey in table.foreign_keys],
                          [[c.name for c in key.columns] for key in table.foreign_keys],
                          [[column_name_map.get(c.name, c.name) for c in fkey.columns]
                           for fkey in table.foreign_keys],
                          )

        # Get new key and fkey definitions by mapping to new column names.
        column_map.update(
            {key.name:
                DerivaKey(define=True,
                          table=dest_table,
                          columns=[column_name_map.get(c.name, c.name) for c in key.columns],
                          comment=key.comment,
                          annotations=key.annotations
                          )
             for key in table.keys
             if table._key_in_columns(column_name_map.keys(), key.columns, rename=(table == dest_table))
             }
        )

        column_map.update(
            {
                fkey.name:
                    DerivaForeignKey(define=True,
                                     table=dest_table,
                                     columns=[column_name_map.get(c.name, c.name) for c in fkey.columns],
                                     dest_table=fkey.referenced_table,
                                     dest_columns=[c.name for c in fkey.referenced_columns],
                                     comment=fkey.comment,
                                     acls=fkey.acls,
                                     acl_bindings=fkey.acl_bindings
                                     )
                for fkey in table.foreign_keys
                if table._key_in_columns(column_name_map.keys(), fkey.columns, rename=(table == dest_table))
            }
        )
        self.logger.debug('normalized column map %s', {k: v.name for k, v in column_map.items()})
        return column_map

    def get_columns(self):
        """
        :return: The entries of this map whose value is a DerivaColumn.
        """
        return OrderedDict((k, v) for k, v in self.items() if isinstance(v, DerivaColumn))

    def get_keys(self):
        """
        :return: The entries of this map whose value is a DerivaKey.
        """
        return OrderedDict((k, v) for k, v in self.items() if isinstance(v, DerivaKey))

    def get_foreign_keys(self):
        """
        :return: The entries of this map whose value is a DerivaForeignKey.
        """
        return OrderedDict((k, v) for k, v in self.items() if isinstance(v, DerivaForeignKey))

    def get_names(self):
        """
        :return: Mapping from each source name to the ``name`` of its definition; entries whose
            name is empty or None are omitted.
        """
        field = 'name'
        return OrderedDict((k, getattr(v, field)) for k, v in self.items() if getattr(v, field))
class DerivaVisibleSources(DerivaLogging):
    """
    Wrapper around one source-list annotation (selected by ``tag``) of a table.

    The annotation maps a context name to a list of source specs.  The ``filter`` context is special:
    its value is a dictionary whose ``and`` entry holds the list.  Every accessor reads and writes
    ``self.table.annotations[self.tag]`` directly.
    """

    def __init__(self, table, tag):
        """
        :param table: Table object whose annotation is wrapped.
        :param tag: Annotation tag that holds the source lists.
        """
        super().__init__()
        self.table = table
        self.tag = tag
        self.logger.debug('table: %s tag: %s', table.name, tag)

    def __str__(self):
        return pprint.pformat(self.table.annotations[self.tag])

    def __repr__(self):
        return self.table.annotations[self.tag].__repr__()

    def __getitem__(self, item):
        return self.table.annotations[self.tag][item]

    def __setitem__(self, instance, value):
        # The update is performed inside a DerivaModel context, like the other mutating operations.
        with DerivaModel(self.table.catalog):
            self.table.annotations[self.tag].update({instance: value})

    def __delitem__(self, item):
        del self.table.annotations[self.tag][item]

    def __iter__(self):
        return self.table.annotations[self.tag].__iter__()

    def to_json(self):
        pass

    def validate(self):
        """
        Check every context name and every source spec stored under the tag.  Problems are logged at
        INFO level rather than raised.

        :return: True if the tag is absent or everything is valid, False otherwise.
        """
        if self.tag not in self.table.annotations:
            return True
        rval = True
        for c, l in self.table.annotations[self.tag].items():
            try:
                DerivaContext(c)  # Make sure that we have a valid context value.
            except ValueError:
                rval = False
                logger.info('Invalid context name %s', c)
            if c == 'filter':
                if self.tag == chaise_tags.visible_foreign_keys:
                    rval = False
                    logger.info('Filter context not allowed in visible_foreign_key annotation.')
                    continue
                try:
                    l = l['and']
                except (TypeError, KeyError):
                    # Either not a mapping at all, or a mapping without the required 'and' entry.
                    logger.info('Invalid filter specification %s', l)
                    rval = False
                    continue
            for j in l:
                try:
                    DerivaSourceSpec(self.table, j)
                except DerivaCatalogError as e:
                    logger.info('Invalid source specification %s %s', self.tag, e.msg)
                    rval = False
        return rval

    def clean(self, dryrun=False):
        """
        Rebuild the annotation keeping only the source specs that can be constructed (and validated)
        as a DerivaSourceSpec.  Each dropped spec is reported with ``print``.

        :param dryrun: If True, report what would be removed but do not store the cleaned annotation.
        """
        new_vs = {}
        for c, l in self.table.annotations[self.tag].items():
            new_context = []
            if c == 'filter':
                l = l['and']
            for j in l:
                try:
                    new_context.append(DerivaSourceSpec(self.table, j).spec)
                except DerivaCatalogError:
                    print("Removing {} {}".format(c, j))
            new_vs.update({c: {'and': new_context} if c == 'filter' else new_context})
        if not dryrun:
            with DerivaModel(self.table.catalog):
                self.table.annotations[self.tag] = new_vs

    @staticmethod
    def _normalize_positions(positions):
        """
        A position can be in the form:
            {context: {key:list}, context: {key:list} ...}
            {key:list, ...}
            {context,context}
        where context can be all.  Convert these into a standard format:
            { context: {key:list} ...}

        :param positions: position list
        :return: normalized position.
        """

        def remove_new_columns(plist):
            # Drop entries whose key column is also the first column of its own list.
            return OrderedDict((k, v) for k, v in plist.items() if k != v[0])

        # If just a set of contexts, convert to normal form.
        if isinstance(positions, set) or positions == {}:
            return OrderedDict(
                (DerivaContext(j), {})
                for i in positions
                for j in (DerivaModel.contexts if DerivaContext(i) is DerivaContext("all") else [i])
            )

        try:
            # Map all contexts to enum values...
            return OrderedDict(
                (DerivaContext(j), remove_new_columns(v))
                for k, v in positions.items()
                for j in (DerivaModel.contexts if DerivaContext(k) is DerivaContext("all") else [k])
            )
        except ValueError:
            # Keys are not valid context name, so we must have keylist dictionary.
            return OrderedDict((k, remove_new_columns(positions)) for k in DerivaModel.contexts)

    def insert_context(self, context, sources=[], replace=False):
        """
        Add a context to the annotation, creating the annotation if it does not exist yet.

        :param context: Context name.  For 'filter', sources must be a dictionary with an 'and' list.
        :param sources: Source specs to store for the context.  Each is normalized and validated
                        through DerivaSourceSpec before anything is written.
        :param replace: If True, overwrite a context that is already present.
        """
        # Map over sources and make sure that they are all ok before we insert...
        if context == 'filter':
            if sources == []:
                sources = {'and': []}
            else:
                sources = {'and': [DerivaSourceSpec(self.table, s).spec for s in sources['and']]}
        else:
            sources = [DerivaSourceSpec(self.table, s).spec for s in sources]
        self.logger.debug('context: %s %s sources: %s', self.tag, context, sources)

        # check for valid context.
        context = DerivaContext(context)
        if self.tag not in self.table.annotations:
            self.table.annotations[self.tag] = {context.value: sources}
        elif context.value not in self.table.annotations[self.tag] or replace:
            self.table.annotations[self.tag][context.value] = sources
        return

    def insert_sources(self, source_list, positions={}):
        """
        Insert a set of columns into a source list.  If column is included in a foreign-key, make source an
        outgoing spec.

        :param source_list: Source specs (objects with ``spec`` and ``column_name``) to be included.
        :param positions: Where to insert the sources, in any form accepted by _normalize_positions.
                          An empty dictionary means every context.
        :return:
        """
        positions = self._normalize_positions({'all'} if positions == {} else positions)
        self.logger.debug('positions: %s', positions)
        self.logger.debug('table: %s sources: %s', self.table.name, [i.spec for i in source_list])

        with DerivaModel(self.table.catalog):
            # Identify any columns that are references to assets and collect up associated columns.
            skip_columns = []
            for col in [i.column_name for i in source_list]:
                self.logger.debug('source col %s', col)
                if col == 'pseudo_column':
                    continue
                column_annotations = self.table[col].annotations
                if chaise_tags.asset in column_annotations:
                    # Read the asset annotation from the same mapping that was just tested.
                    skip_columns.extend(column_annotations[chaise_tags.asset].values())

            sources = {}
            try:
                s = self.table.annotations[self.tag]
            except KeyError:
                s = self.table.annotations[self.tag] = {}

            for context, context_list in s.items():
                if DerivaContext(context) not in positions.keys():
                    continue
                if context == 'filter':
                    context_list = context_list['and']

                # Get list of column names that are in the spec, mapping back simple FK references.
                self.logger.debug('source_specs %s %s', self.table.name, [i.spec for i in source_list])
                self.logger.debug('context %s %s', context, [i for i in context_list])
                self.logger.debug('referenced_by %s', [i.name for i in self.table.referenced_by])

                # Built per context on purpose: each context gets its own spec objects, so later
                # in-place edits of one context's entries cannot leak into another context.
                source_specs = [DerivaSourceSpec(self.table, i, validate=False) for i in source_list]
                new_context = [
                    DerivaSourceSpec(self.table, i, validate=False, src_tag=self.tag).spec for i in context_list
                ]
                self.logger.debug('getting source names %s %s', source_specs, new_context)

                for source in source_specs:
                    self.logger.debug('source: %s %s', source.spec, source.spec in new_context)
                    if (context == 'entry' and source.column_name in skip_columns) or source.spec in new_context:
                        # Skip over asset columns in entry context and make sure we don't have repeat column specs.
                        continue
                    new_context.append(source.spec)
                sources[context] = {'and': new_context} if context == 'filter' else new_context

            self.logger.debug('updated sources: %s', pprint.pformat(sources))
            sources = self._reorder_sources(sources, positions)
            self.logger.debug('reordered sources: source:%s', sources)

            # All is good, so update the visible columns annotation.
            self.logger.debug('updated annotations: source:%s %s', sources,
                              pprint.pformat(self.table.annotations.get(self.tag, {})))
            self.table.annotations[self.tag] = {**self.table.annotations.get(self.tag, {}), **sources}
            self.logger.debug('annotations updated: %s', self.table.annotations[self.tag])

    def rename_columns(self, column_map, validate=False):
        """
        Go through a list of visible specs and rename the spec, returning a new visible column spec.
        The stored annotation is not modified.

        :param column_map: Map from current names to the renamed elements.
        :param validate: Passed to DerivaSourceSpec when each existing spec is parsed.
        :return: Dictionary of context -> renamed source list.
        :raises DerivaSourceError: if the table has no annotation under this tag.
        """
        if self.tag not in self.table.annotations:
            raise DerivaSourceError(self, msg='tag {} does not exist'.format(self.tag))
        self.logger.debug('column_map %s %s', column_map, pprint.pformat(self.table.annotations[self.tag]))

        # For each context, go through the source specs and rename columns
        new_vc = {}
        for context, vc_list in self.table.annotations[self.tag].items():
            if context == 'filter':
                vc_list = vc_list['and']
            renamed_list = [
                DerivaSourceSpec(self.table, i, validate=validate).rename_column(column_map) for i in vc_list
            ]
            new_vc[context] = {'and': renamed_list} if context == 'filter' else renamed_list
        self.logger.debug('renamed %s', new_vc)
        return new_vc

    def copy_visible_source(self, from_context):
        pass

    def make_outbound(self, column, contexts=None):
        """
        Go through the contexts associated with the source list and look for a spec for column and convert this
        from a basic column spec to an outbound source spec.  Matching entries are updated in place.

        :param column: column to convert to outbound spec
        :param contexts: List of contexts to apply transformation to.  If empty or None, use all contexts.
        :return:
        """
        contexts = contexts if contexts else []
        self.logger.debug('tag: %s columns: %s vc before %s', self.tag, column, self.table.annotations[self.tag])
        context_names = [i.value for i in (DerivaContext if contexts == [] else contexts)]

        for context, vc_list in self.table.annotations[self.tag].items():
            if context not in context_names:
                continue
            if context == 'filter':
                vc_list = vc_list['and']
            for s in vc_list:
                # Get the spec for the current element.
                try:
                    spec = DerivaSourceSpec(self.table, s, validate=False, src_tag=self.tag)
                except DerivaSourceError:
                    continue
                if spec.column_name == column:
                    # Convert the column spec to an outbound spec and write it back into the entry.
                    spec.make_outbound()
                    s.update(spec.spec)
        self.logger.debug('done %s', self.table.annotations[self.tag])

    def make_column(self, column, contexts=[], validate=True):
        """
        Inverse of make_outbound: convert outbound specs that refer to column back into plain column
        specs.  Matching entries are updated in place.

        :param column: Name of the column behind the outbound spec.
        :param contexts: List of contexts to apply transformation to.  If empty, use all contexts.
        :param validate: Passed to DerivaSourceSpec.make_column.
        """
        self.logger.debug('tag: %s columns: %s vc before %s', self.tag, column, self.table.annotations[self.tag])
        context_names = [i.value for i in (DerivaContext if contexts == [] else contexts)]

        for context, vc_list in self.table.annotations[self.tag].items():
            if context == 'filter':
                vc_list = vc_list['and']
            if context not in context_names:
                continue
            for s in vc_list:
                try:
                    spec = DerivaSourceSpec(self.table, s, validate=False, src_tag=self.tag)
                except DerivaSourceError:
                    # Spec is not correct....
                    continue
                if spec.column_name == column:
                    # Convert the outbound spec back to a column spec and write it into the entry.
                    spec.make_column(validate)
                    s.update(spec.spec)
        self.logger.debug('done %s', self.table.annotations[self.tag])

    def _reorder_sources(self, sources, positions):
        """
        Reorder the columns in a visible columns specification.  Order is determined by the positions argument.
        The form of this is a dictionary whose elements are:
            context: {key_column: column_list, key_column:column_list}
        The columns in the specified context are then reordered so that the columns in the column list follow
        the column in order.  Key column specs are processed in order specified.  The context name 'all' can be
        used to indicate that the order should be applied to all contexts currently in the visible_columns
        annotation.  The context name can also be omitted and positions can be in the form of
        {key_column: columnlist, ...} and the context all is implied.

        :param sources: Dictionary of context -> source list to reorder.
        :param positions: Position specification as described above.
        :return: New dictionary with the reordered lists; contexts not named in positions are unchanged.
        """
        if positions == {}:
            return sources

        # Set up positions to apply to all contexts if you have {key_column: column_list} form.
        positions = self._normalize_positions(positions)
        self.logger.debug('normized positions %s', positions)

        new_sources = {}
        for context, source_list in sources.items():
            deriva_context = DerivaContext(context)
            if deriva_context not in positions.keys():
                continue
            if deriva_context == DerivaContext('filter'):
                source_list = source_list['and']

            # Get the list of column names for the spec.  Pseudo columns get their index appended so
            # that every entry has a distinct name to look up below.
            source_names = []
            for i, source in enumerate(source_list):
                name = DerivaSourceSpec(self.table, source, validate=False, src_tag=self.tag).column_name
                source_names.append(name + str(i) if name == 'pseudo_column' else name)
            self.logger.debug('source_names %s', source_names)

            # Now build up a map that has the indexes of the reordered columns.  Include the columns in order
            # Unless they are in the column_list, in which case, insert them immediately after the key column.
            reordered_names = source_names[:]
            for key_col, column_list in positions[deriva_context].items():
                if not (set(column_list + [key_col]) <= set(source_names)):
                    # The column we are looking for is not in this source list.
                    continue
                reordered_names = [
                    j
                    for i in reordered_names if i not in column_list
                    for j in [i] + (column_list if i == key_col else [])
                ]

            source_list = [source_list[source_names.index(i)] for i in reordered_names]
            new_sources[context] = {'and': source_list} if context == 'filter' else source_list
        return {**sources, **new_sources}

    def delete_visible_source(self, columns, contexts=[]):
        """
        Delete the named columns from a visible source list.  Does nothing if the tag is not present.

        :param columns: A column name or a list of column names.
        :param contexts: Contexts to delete the sources from.  If empty, use all contexts.
        :return:
        """
        if self.tag not in self.table.annotations:
            return
        self.logger.debug('tag: %s columns: %s vc before %s',
                          self.tag, columns, self.table.annotations.get(self.tag, None))
        context_names = [i.value for i in (DerivaContext if contexts == [] else contexts)]
        self.logger.debug('context names %s', context_names)
        columns = [columns] if isinstance(columns, str) else columns

        for context, vc_list in self.table.annotations[self.tag].items():
            if context not in context_names:
                continue
            if context == 'filter':
                vc_list = vc_list['and']
            for col in columns:
                # Columns may have already been deleted, so do not validate.
                col_spec = DerivaSourceSpec(self.table, col, validate=False)
                self.logger.debug('checking %s %s %s', col, col_spec, vc_list)
                if col_spec.spec in vc_list:
                    self.logger.debug('deleting %s', col)
                    vc_list.remove(col_spec.spec)
        self.logger.debug('vc after %s', self.table.annotations[self.tag])

    def reorder_visible_source(self, positions):
        """
        Reorder the stored source lists according to positions (see _reorder_sources).

        :param positions: Position specification.
        """
        vc = self._reorder_sources(self.table.annotations[self.tag], positions)
        self.table.annotations[self.tag].update({**self.table.annotations[self.tag], **vc})


class DerivaVisibleColumns(DerivaVisibleSources):
    """Source lists stored under the visible-columns annotation of a table."""

    def __init__(self, table):
        # The tag is spelled 'visible_columns' everywhere else in this module.
        super().__init__(table, chaise_tags.visible_columns)


class DerivaVisibleForeignKeys(DerivaVisibleSources):
    """Source lists stored under the visible-foreign-keys annotation of a table."""

    def __init__(self, table):
        super().__init__(table, chaise_tags.visible_foreign_keys)


class DerivaSourceSpec(DerivaLogging):
    """
    A single source specification, kept in normalized ``{'source': ...}`` form in ``self.spec``.

    ``self.column_name`` is the column the spec refers to: the column itself for a plain column spec,
    the single column of the foreign key for a two element outbound spec, and the literal
    'pseudo_column' for any other path.
    """

    def __init__(self, table, spec, validate=True, src_tag=chaise_tags.visible_columns):
        """
        :param table: Table that the spec is relative to.
        :param spec: A column name, a constraint name, an old style [schema, constraint] pair, a
                     dictionary in source form, or another DerivaSourceSpec (which is deep-copied).
        :param validate: If True, check that the columns and keys named in the spec exist.
        :param src_tag: Annotation the spec belongs to; selects the default direction for an old
                        style pair that matches no known constraint.
        :raises DerivaSourceError: if the spec is malformed, or invalid while validate is True.
        """
        super().__init__()
        self.logger.debug('table: %s spec: %s', table.name, spec)
        self.table = table
        self.tag = src_tag

        if isinstance(spec, DerivaSourceSpec):
            self.spec = copy.deepcopy(spec.spec)
            self.tag = spec.tag
        else:
            self.spec = self._normalize_source_spec(spec, src_tag)
        self.logger.debug('normalized: %s', self.spec)

        if validate:
            self.validate()
        try:
            self.column_name = self._referenced_columns()
        except DerivaSourceError:
            if validate:
                raise
            # Without validation, an unresolvable outbound spec is treated as a pseudo column.
            self.column_name = 'pseudo_column'
        self.logger.debug('initialized: table %s spec: %s', table.name, self.spec)

    def __str__(self):
        return pprint.pformat(self.spec)

    @property
    def source(self):
        return self.spec['source']

    @source.setter
    def source(self, value):
        self.spec['source'] = value

    def source_type(self):
        """
        Classify the source entry.

        :return: 'column' for a plain column name; 'inbound' or 'outbound' for a two element path
                 whose first element has that key; None for anything else.
        """
        if isinstance(self.source, str):
            return 'column'
        if isinstance(self.source, (list, tuple)) and len(self.source) == 2:
            if 'inbound' in self.source[0]:
                return 'inbound'
            if 'outbound' in self.source[0]:
                return 'outbound'
        return None

    def _referenced_columns(self):
        """
        Return the column name that is referenced in the source spec.  For a plain column spec this is
        the column itself.  For a two element outbound spec the foreign key is looked up and its column
        name is returned if the key has exactly one column, otherwise None.  Any other path yields the
        value 'pseudo_column'.

        :return: Column name, None, or 'pseudo_column'.
        :raises DerivaSourceError: if an outbound spec names a foreign key that does not exist.
        """
        if isinstance(self.source, str):
            return self.source
        if len(self.source) == 2 and 'outbound' in self.source[0]:
            t = self.source[0]['outbound'][1]
            try:
                fk_cols = self.table.foreign_key[t].columns
            except DerivaForeignKeyError:
                raise DerivaSourceError(self, msg='Outbound source with non-existent foreign key: {}'.format(t))
            return list(fk_cols)[0].name if len(fk_cols) == 1 else None
        return 'pseudo_column'

    def validate(self):
        """
        Check the values of a normalized spec and make sure that all of the columns and keys in the source exist.

        :return: The normalized spec.
        :raises DerivaSourceError: if a column or path element is not valid.
        """
        spec = self._normalize_source_spec(self.spec, None)
        source_entry = spec['source']
        if isinstance(source_entry, str):
            if source_entry not in [i.name for i in self.table.columns]:
                raise DerivaSourceError(self, 'Invalid source entry {}'.format(spec))
        else:
            # We have a path of FKs so follow the path to make sure that all of the constraints line up.
            path_table = self.table
            for c in source_entry[0:-1]:
                if 'inbound' in c and len(c['inbound']) == 2:
                    self.logger.debug('validating inbound table: %s: context: %s refererenced_by: %s',
                                      path_table.name, c, [i.name for i in path_table.referenced_by])
                    path_table = path_table.referenced_by[c['inbound'][1]].table
                elif 'outbound' in c and len(c['outbound']) == 2:
                    self.logger.debug('validating outbound %s: %s', path_table.name, c)
                    path_table = path_table.foreign_key[c['outbound'][1]].referenced_table
                else:
                    raise DerivaSourceError(self, 'Invalid source entry {}'.format(c))

            # The last element of the path must be a column of the table the path ends in.
            try:
                if not path_table.columns[source_entry[-1]]:
                    raise DerivaSourceError(self, 'Invalid source entry {}'.format(source_entry[-1]))
            except (TypeError, AttributeError):
                raise DerivaSourceError(self, 'Invalid source entry {}'.format(source_entry[-1]))
        return spec

    def _normalize_source_spec(self, spec, src_tag):
        """
        Convert a source spec into a uniform form using the new source notations.

        :param spec: Column name, constraint name, [schema, constraint] pair, or source dictionary.
        :param src_tag: Annotation tag, used to choose the default direction for an unknown pair.
        :return: Dictionary with a 'source' entry.
        :raises DerivaSourceError: if the spec cannot be interpreted.
        """
        self.logger.debug('%s %s', self.table.name, spec)
        if isinstance(spec, str):
            if spec in [c.name for c in self.table.columns]:
                spec = {'source': spec}
            elif spec in self.table.foreign_keys:
                # TODO this is not right
                spec = {'source': [{'outbound': (self.table.schema_name, spec)}, 'RID']}
            elif spec in self.table.referenced_by:
                spec = {'source': [{'inbound': (self.table.schema_name, spec)}, 'RID']}
            else:
                raise DerivaSourceError(self, 'Invalid source entry {}'.format(spec))
        # Check for old style foreign key notation and turn into inbound or outbound source.
        elif isinstance(spec, (tuple, list)) and len(spec) == 2:
            if spec[1] in self.table.keys:
                return {'source': next(iter(self.table.keys[spec[1]].columns)).name}
            elif spec[1] in self.table.foreign_keys:
                return {'source': [{'outbound': tuple(spec)}, 'RID']}
            elif spec[1] in self.table.referenced_by:
                return {'source': [{'inbound': tuple(spec)}, 'RID']}
            else:
                default_direction = 'inbound' if src_tag == chaise_tags.visible_foreign_keys else 'outbound'
                return {'source': [{default_direction: tuple(spec)}, 'RID']}
        else:
            # We have a spec that is already in source form.
            # every element of pseudo column source except the last must be either an inbound or outbound spec.
            try:
                if not (isinstance(spec['source'], str) or
                        all(map(lambda x: len(x.get('inbound', x.get('outbound', []))) == 2,
                                spec['source'][0:-1]))):
                    raise DerivaSourceError(self, 'Invalid source entry {}'.format(spec))
            except (TypeError, KeyError):
                raise DerivaSourceError(self, 'Invalid source entry {}'.format(spec))
        return spec

    def rename_column(self, column_map):
        """
        Compute the spec that results from applying the renames in column_map.  The object itself is
        not modified.

        :param column_map: Map from current column/constraint names to the renamed elements.
        :return: A spec dictionary (``self.spec`` itself when nothing needs renaming).
        """
        self.logger.debug('spec: %s map %s ', self.spec, column_map)
        if self.column_name != 'pseudo_column':
            if self.column_name not in column_map:
                return self.spec

            # See if the column is used as a simple foreign key.
            try:
                fkey_name = self.table.foreign_keys[self.column_name].name
                self.logger.debug('column %s foreign key name %s %s',
                                  self.source, fkey_name, fkey_name in column_map)
            except DerivaCatalogError:
                fkey_name = None

            # If we are renaming a column, and it is used in a foreign_key, then make the spec be a outbound
            # source using the FK.  Otherwise, just rename the column in the spec.
            if fkey_name and fkey_name in column_map:
                return {'source': [
                    {'outbound': (column_map[fkey_name].referenced_table.schema_name,
                                  column_map[fkey_name].name)},
                    'RID'
                ]}
            return {'source': column_map[self.column_name].name}

        # We have a list of inbound/outbound specs.  Go through the list and replace any names that are in the map.
        self.logger.debug('Looking for rename in source path: %s', self.source[:-1])
        source = []
        for s in self.source[:-1]:
            direction = next(iter(s))  # inbound or outbound
            key_schema, key_name = next(iter(s.values()))
            if key_schema == column_map.table.schema_name and key_name in column_map:
                source.append({direction: (key_schema, column_map[key_name].name)})
            else:
                source.append(s)
        # Append the final path element itself; appending the slice would nest a list inside the path.
        source.append(self.source[-1])
        return {**self.spec, **{'source': source}}

    def make_outbound(self, validate=True):
        """
        Convert a plain column spec into an outbound spec on the foreign key found for that column.
        Specs of any other type are left unchanged.

        :param validate: If True, validate the spec afterwards.
        :return: self
        """
        # Compare by value; identity comparison against a string literal is not reliable.
        if self.source_type() == 'column':
            col_name = self.table.foreign_key[self.source].name
            self.spec.update(self._normalize_source_spec([self.table.schema_name, col_name], self.tag))
        if validate:
            self.validate()
        return self

    def make_column(self, validate=True):
        """
        Convert a outbound spec on a foreign key to a column spec.  Specs of any other type are left
        unchanged.

        :param validate: If True, validate the spec afterwards.
        :return: self
        """
        # Get the fk_name from the spec and then change spec to be the key column.
        if self.source_type() == 'outbound':
            fk_name = self.source[0]['outbound'][1]
            self.spec.update(
                self._normalize_source_spec(next(iter(self.table.foreign_keys[fk_name].columns)).name, self.tag)
            )
        if validate:
            self.validate()
        return self
class DerivaColumn(DerivaCore):
    """
    Class that represents columns in Deriva catalog.
    """

    def __init__(self, catalog, column):
        """
        Wrap a column object and register the wrapper in the catalog's model map.

        :param catalog: Catalog object, passed on to the base class initializer.
        :param column: The underlying column object that this instance wraps.
        """
        super().__init__(catalog)
        self.column = column
        self.catalog.model_map[column] = self

    def __str__(self):
        summary = [
            '{}: {}'.format(self.name, self.type.typename),
            '\tnullok: {} default: {}'.format(self.nullok, self.default),
            '\tcomment: {}'.format(self.comment),
            '\tacls: {}'.format(self.acls),
            '\tacl_bindings: {}'.format(self.acl_bindings),
        ]
        return '\n'.join(summary)

    @classmethod
    def define(cls, name, type, nullok=True, default=None, fill=None, comment=None,
               acls={}, acl_bindings={}, annotations={}):
        """
        Build a column definition by delegating to em.Column.define.

        :param name: Column name.
        :param type: Key into em.builtin_types selecting the column type.
        :param fill: Accepted but not passed on to em.Column.define.
        :return: Whatever em.Column.define returns.
        """
        column_type = em.builtin_types[type]
        return em.Column.define(
            name,
            column_type,
            nullok=nullok,
            default=default,
            comment=comment,
            acls=acls,
            acl_bindings=acl_bindings,
            annotations=annotations,
        )

    @property
    def name(self):
        return self.column.name

    @property
    def type(self):
        return self.column.type

    @type.setter
    def type(self, type_value):
        # Only a column that is still a definition may change its type.
        if not isinstance(self.column, DerivaColumn._DerivaColumnDef):
            raise DerivaCatalogError(self, 'Cannot alter defined column type')
        self.column.type = em.builtin_types[type_value]

    @property
    def nullok(self):
        return self.column.nullok

    @nullok.setter
    def nullok(self, nullok):
        # Only a column that is still a definition may change nullok.
        if not isinstance(self.column, DerivaColumn._DerivaColumnDef):
            raise DerivaCatalogError(self, 'Cannot alter nullok in defined column')
        self.column.nullok = nullok

    @property
    def default(self):
        return self.column.default

    @property
    def fill(self):
        if isinstance(self.column, DerivaColumn._DerivaColumnDef):
            return self.column.fill
        return None

    @property
    def comment(self):
        return self.column.comment

    @comment.setter
    def comment(self, comment):
        self.column.comment = comment

    @property
    def display(self):
        return self.annotations[chaise_tags.display]

    @display.setter
    def display(self, value):
        self.annotations[chaise_tags.display] = value

    @property
    def column_display(self):
        return self.annotations[chaise_tags.column_display]

    @column_display.setter
    def column_display(self, value):
        self.annotations[chaise_tags.column_display] = value

    def get_acls(self):
        return self.column.acls

    def get_acl_bindings(self):
        return self.column.acl_bindings

    def update_table(self, table):
        """
        Attach the column to table, unless it is already attached to one.

        :param table: Table object providing ``catalog`` and ``schema_name``.
        """
        if self.table:
            return
        self.catalog = table.catalog
        self.table = table
        self.schema = self.catalog[table.schema_name]

    def drop(self):
        """
        Delete a single column.

        :return:
        """
        self.table.drop_columns(self)

    def validate(self):
        """
        Validate annotations, ACLs and ACL bindings.  All three checks are always run.

        :return: Truthy only if every check succeeded.
        """
        annotations_ok = self.annotations.validate(self)
        acls_ok = self.acls.validate(self)
        bindings_ok = self.acl_bindings.validate(self)
        return bindings_ok and (acls_ok and annotations_ok)

    def validate_display(self):
        # TODO Need to finish
        return True
class DerivaKey(DerivaCore):
    """
    Class that represents a key constraint in a Deriva catalog.
    """

    def __init__(self, catalog, key):
        """
        Wrap a key object and register the wrapper in the catalog's model map.

        :param catalog: Catalog in which this key exists
        :param key: The underlying key object that this instance wraps.
        """
        super().__init__(catalog)
        self.key = key
        self.catalog.model_map[key] = self

    def __str__(self):
        return '{name}:{columns}\n\tcomment: {comment}\n\tannotations: {annotations}'.format(
            name=self.name,
            columns=[i.name for i in self.columns],
            comment=self.comment,
            annotations=[a for a in self.annotations])

    @property
    def name(self):
        # Fall back to the 'names' list when the wrapped key has no 'name' attribute.
        try:
            return self.key.name
        except AttributeError:
            return self.key.names[0][1] if len(self.key.names) == 1 else None

    @property
    def table(self):
        return self.catalog.model_map[self.key.table]

    @property
    def full_name(self):
        try:
            return self.table.schema_name, self.key.name
        except AttributeError:
            return self.key.names[0]

    @property
    def columns(self):
        # Get the columns in the same order as the column declarations of the table.
        key_columns = [self.catalog.model_map[kc] for kc in self.key.columns]
        return [c for c in self.table.columns if c in key_columns]

    @property
    def comment(self):
        return self.key.comment

    @comment.setter
    def comment(self, comment):
        if isinstance(self.key, DerivaKey._DerivaKeyDef):
            self.key.comment = comment
        else:
            raise DerivaCatalogError(self, 'Cannot alter defined key type')

    @property
    def display(self):
        # Read the same annotation that the setter below writes.
        return self.annotations[chaise_tags.display]

    @display.setter
    def display(self, value):
        self.annotations[chaise_tags.display] = value

    @property
    def key_display(self):
        return self.annotations[chaise_tags.key_display]

    @key_display.setter
    def key_display(self, value):
        self.annotations[chaise_tags.key_display] = value

    @staticmethod
    def define(columns, name=[], comment=None, annotations={}):
        """
        Build a key definition by delegating to em.Key.define.

        :return: Whatever em.Key.define returns.
        """
        return em.Key.define(columns, name, comment, annotations)

    def update_table(self, table):
        # NOTE(review): 'table' is a read-only property in this class, so the assignment below looks
        # like it cannot succeed if it is ever reached - confirm whether this method is still used.
        if self.table:
            return
        self.catalog = table.catalog
        self.table = table
        self.schema = self.catalog[table.schema_name]
        self.key.update_name(table)

    def drop(self):
        """
        Drop the wrapped key inside a DerivaModel context and clear the reference to it.

        :raises DerivaKeyError: if the drop fails with an HTTPError.
        """
        try:
            with DerivaModel(self.table.catalog):
                self.key.drop()
        except HTTPError as e:
            raise DerivaKeyError(self, msg=str(e)) from e
        self.key = None

    def get_acls(self):
        return self.key.acls

    def validate(self):
        return self.annotations.validate(self)

    def validate_display(self):
        # TODO finish
        return True
class DerivaForeignKey(DerivaCore):
    """
    Class that represents a foreign key constraint in a Deriva catalog.
    """

    def __init__(self, catalog, fkey):
        """
        Wrap a foreign key object and register the wrapper in the catalog's model map.

        :param catalog: Catalog object, passed on to the base class initializer.
        :param fkey: The underlying foreign key object that this instance wraps.
        """
        super().__init__(catalog)
        self.fkey = fkey
        # Register the wrapper (not the raw key), as the column and key wrappers do.
        self.catalog.model_map[fkey] = self

    def __str__(self):
        return '\n'.join([
            '{}: {}'.format(self.name, self.columns),
            '\tcomment: {}'.format(self.comment),
            '\tannotations: {}'.format(self.annotations)
        ])

    @staticmethod
    def define(columns, dest_schema, dest_table, dest_columns,
               name=None,
               comment=None,
               on_update='NO ACTION',
               on_delete='NO ACTION',
               acls={},
               acl_bindings={},
               annotations={}):
        """
        Build a foreign key definition by delegating to em.ForeignKey.define.

        :param name: Optional constraint name; passed on as a one element constraint_names list.
        :return: Whatever em.ForeignKey.define returns.
        """
        return em.ForeignKey.define(columns, dest_schema, dest_table, dest_columns,
                                    constraint_names=[name] if name else [],
                                    comment=comment,
                                    on_update=on_update,
                                    on_delete=on_delete,
                                    acls=acls,
                                    acl_bindings=acl_bindings,
                                    annotations=annotations)

    @property
    def name(self):
        return self.fkey.name[1]

    @name.setter
    def name(self, value):
        self.fkey.name = [self.table.schema, value]

    @property
    def full_name(self):
        try:
            return (self.table.schema_name, self.fkey.name)
        except AttributeError:
            return self.fkey.names[0]

    @property
    def table(self):
        return self.catalog.model_map[self.fkey.table]

    @property
    def columns(self):
        # Order the key columns the way they are declared in the table.
        columns = [c for c in self.fkey.table.columns if c in self.fkey.foreign_key_columns]
        assert len(columns) == len(self.fkey.foreign_key_columns)
        return [self.catalog.model_map[c] for c in columns]

    @property
    def referenced_table(self):
        return self.catalog.model_map[self.referenced_columns[0].column.table]

    @property
    def referenced_columns(self):
        # Follow the order of self.columns so that both lists line up position by position.
        col_map = self.fkey.column_map
        return [self.catalog.model_map[col_map[c.column]] for c in self.columns]

    @property
    def column_map(self):
        return {self.catalog.model_map[k]: self.catalog.model_map[v] for k, v in self.fkey.column_map.items()}

    @property
    def comment(self):
        return self.fkey.comment

    @comment.setter
    def comment(self, comment):
        # NOTE(review): the sibling key class tests against its definition type here; testing the
        # wrapped key against DerivaForeignKey itself may never be true - confirm the intended type.
        if isinstance(self.fkey, DerivaForeignKey):
            self.fkey.comment = comment
        else:
            raise DerivaCatalogError(self, 'Cannot alter defined key type')

    @property
    def on_update(self):
        return self.fkey.on_update

    @property
    def on_delete(self):
        return self.fkey.on_delete

    def standardize_name(self):
        """Rename the key to '<table>_<column>_..._fkey', built from the names of its columns."""
        # str.join needs strings, so use the column names rather than the column objects.
        self.name = '{}_'.format(self.fkey.table.name) + '_'.join([c.name for c in self.columns] + ['fkey'])

    def drop(self):
        """
        Remove the key from the referenced table's visible foreign keys, turn the visible-column
        entry of a single-column key back into a plain column spec, and drop the wrapped key.
        """
        referenced_table = self.referenced_table
        columns = self.columns
        column = next(iter(columns)) if len(columns) == 1 else False
        self.logger.debug('demoting visible column %s', column)

        referenced_table.visible_foreign_keys.delete_visible_source(self.name)
        del (referenced_table.referenced_by[self.name])

        if column:
            self.table.visible_columns.make_column(column.name, validate=False)

        with DerivaModel(self.table.catalog):
            self.fkey.drop()
        self.fkey = None

    def get_acls(self):
        with DerivaModel(self.catalog):
            return self.fkey.acls

    def get_acl_bindings(self):
        with DerivaModel(self.catalog):
            return self.fkey.acl_bindings

    def definition(self):
        # Key will either be a DerivaForeignKey or an ermrest fkey.
        try:
            return self.fkey.definition(self)
        except AttributeError:
            return self.fkey

    def validate(self):
        """
        Validate annotations, ACLs and ACL bindings.  All three checks are always run.

        :return: Truthy only if every check succeeded.
        """
        rval = self.annotations.validate(self)
        rval = self.acls.validate(self) and rval
        rval = self.acl_bindings.validate(self) and rval
        return rval

    def validate_display(self):
        # TODO need to finish....
        return True
[docs]class DerivaTable(DerivaCore): def __init__(self, catalog, table): DerivaCore.__init__(self, catalog) self.table = table self.deleted = False self._map_model() def __getitem__(self, column_name): return self.column(column_name) def __iter__(self): return self.columns.__iter__() def _repr_html_(self): rep = '\n'.join([ '<b>Table: <a href={}, target="_blank"> {}</b></a><br><br>'.format(self.chaise_uri, self.name), tabulate.tabulate( [[i.name, i.type.typename, i.nullok, i.default] for i in self.columns], headers=['Column Name', 'Type', 'NullOK', 'Default'], colalign=('center', 'center', 'center', 'center'), tablefmt='html'), '<br>', 'Keys:', tabulate.tabulate([[i.name, [c.name for c in i.columns]] for i in self.keys], headers=['Key Name', 'Key Columns'], colalign=('center','center'), tablefmt='html'), '<br>', 'Foreign Keys:', tabulate.tabulate( [[i.name, [c.name for c in i.columns], '->', '{} {}'.format(i.referenced_table.name, [c.name for c in i.referenced_columns])] for i in self.foreign_keys], headers=['Key Name', 'Key Columns', '', 'Referenced Table', 'Referenced Columns'], colalign=('center', 'center', 'center' 'center', 'center'), tablefmt='html'), '<br>', 'Referenced By:', tabulate.tabulate( [ [i.name, [c.name for c in i.referenced_columns], '<-', '{}:{}:'.format(i.table.schema_name, i.table.name), [c.name for c in i.columns] ] for i in self.referenced_by], headers=['Key Name', 'Key Columns', '', '', 'Referenced Columns'], colalign=('center','center', 'center', 'center', 'center'), tablefmt='html') ] ) return rep.replace('center','left') def __str__(self): return '\n'.join([ 'Table {}'.format(self.name), tabulate.tabulate( [[i.name, i.type.typename, i.nullok, i.default] for i in self.columns], headers=['Name', 'Type', 'NullOK', 'Default'] ), '\n', 'Keys:', tabulate.tabulate([[i.name[1], [c.name for c in i.columns]] for i in self.keys], headers=['Name', 'Columns']), '\n', 'Foreign Keys:', tabulate.tabulate( [[i.name, [c.name for c in i.columns], '->', 
i.referenced_table.name, [c.name for c in i.referenced_columns]] for i in self.foreign_keys], headers=['Name', 'Columns', '', 'Referenced Table', 'Referenced Columns']), '\n\n', 'Referenced By:', tabulate.tabulate( [ [i.name, [c.name for c in i.referenced_columns], '<-', '{}:{}:'.format(i.table.schema.name, i.table.name), [c.name for c in i.columns] ] for i in self.referenced_by], headers=['Name', 'Columns', '', '', 'Referenced Columns']) ] ) @property def chaise_uri(self): p = urlparse(self.catalog.server_uri) return '{}://{}/chaise/recordset/#{}/{}:{}'.format( p.scheme, p.hostname, self.catalog.catalog_id, self.schema_name, self.name) @property def name(self): return self.table.name @property def comment(self): return self.table.comment @comment.setter def comment(self, value): with DerivaModel(self.catalog): self.table.comment = value @property def display(self): return self.annotations[chaise_tags.display] @display.setter def display(self, value): self.annotations[chaise_tags.display] = value @property def table_display(self): return self.annotations[chaise_tags.table_display] @table_display.setter def table_display(self, value): self.annotations[chaise_tags.table_display] = value @property def visible_columns(self): try: return DerivaVisibleSources(self, chaise_tags.visible_columns) except KeyError: raise DerivaSourceError(self, msg='Visible columns not defined') @visible_columns.setter def visible_columns(self, vcs): with DerivaModel(self.catalog): self.table.visible_columns = vcs @property def visible_foreign_keys(self): return DerivaVisibleSources(self, chaise_tags.visible_foreign_keys) @visible_foreign_keys.setter def visible_foreign_keys(self, keys): with DerivaModel(self.catalog): self.table.visible_foreign_keys = keys @property def schema(self): return self.catalog.model_map[self.table.schema] @property def columns(self): return KeyedList([self.catalog.model_map[c] for c in self.table.column_definitions]) @property def keys(self): return KeyedList([ 
self.catalog.model_map[k] for k in self.table.keys]) @property def foreign_key(self): return self.foreign_keys @property def foreign_keys(self): return KeyedList([self.catalog.model_map[k] for k in self.table.foreign_keys]) @property def referenced_by(self): return KeyedList([self.catalog.model_map[fk] for fk in self.table.referenced_by])
[docs] def key_referenced(self, columns): """ Given a set of columns that are a key, return the list of foreign keys that reference those columns. :param columns: :return: """ if not self.key[columns]: raise DerivaCatalogError(self,msg='Argument to key_referenced is not a key') columns = set(columns) return [fk for fk in self.referenced_by if {i.name for i in fk.referenced_columns} == columns]
def _map_model(self):
    """
    Register a wrapper object in ``self.catalog.model_map`` for every column, key and foreign key
    of the underlying table, in that order.
    """
    catalog = self.catalog
    for column in self.table.column_definitions:
        catalog.model_map[column] = DerivaColumn(catalog, column)
    for key in self.table.keys:
        catalog.model_map[key] = DerivaKey(catalog, key)
    for fkey in self.table.foreign_keys:
        catalog.model_map[fkey] = DerivaForeignKey(catalog, fkey)


def _referenced(self, fkey_id, referenced_by):
    """
    Resolve ``fkey_id`` against ``referenced_by`` and wrap the result in a DerivaForeignKey.

    ``fkey_id`` may be an ERMrest foreign key, which is used directly. Otherwise it is first tried
    as an index into ``referenced_by`` after conversion to a tuple, and if that fails it is tried
    as the second element of a ``(schema name, fkey_id)`` pair for every schema in the catalog.

    :param fkey_id: An ``em.ForeignKey``, or a value identifying one in ``referenced_by``.
    :param referenced_by: Collection of incoming foreign keys, indexable by name tuple.
    :return: A DerivaForeignKey built on the table that holds the foreign key columns.
    :raises DerivaCatalogError: If no foreign key could be resolved.
    """
    self.logger.debug('fkey_id: %s referenced_by: %s', fkey_id, [fk.names for fk in referenced_by])

    if isinstance(fkey_id, em.ForeignKey):
        found = fkey_id
    else:
        found = None
        # We have either a constraint name or a column name.
        try:
            # See if we already have a key name
            found = referenced_by[tuple(fkey_id)]
        except (KeyError, TypeError):
            # Every schema is tried; a later match replaces an earlier one.
            for schema in self.catalog:
                try:
                    found = referenced_by[(schema.name, fkey_id)]
                except (TypeError, KeyError):
                    pass

    if not found:
        raise DerivaCatalogError(self, 'referenced by requires name or key type: {}'.format(fkey_id))

    # Now find the schema and table of the referring table
    referring_table = found.foreign_key_columns[0].table
    src_schema = referring_table.schema.name
    src_table = found.foreign_key_columns[0].table.name
    self.logger.debug('creating fkey... %s', found.names[0])
    return DerivaForeignKey(self.table.catalog[src_schema][src_table], found)


@property
def key(self):
    """Alias for ``keys``."""
    return self.keys


@property
def datapath(self):
    """The datapath table object for this table, obtained from the catalog's path builder."""
    builder = self.catalog.getPathBuilder()
    schema_path = builder.schemas[self.table.schema.name]
    return schema_path.tables[self.name]


@property
def path(self):
    """The ``path`` attribute of this table's datapath object."""
    table_path = self.datapath
    return table_path.path


def _column_names(self):
    """Return the names of this table's columns as a list."""
    names = []
    for column in self.columns:
        names.append(column.name)
    return names
def create_key(self, columns, name=None, comment=None, annotations=None):
    """
    Define a key on this table and create it.

    :param columns: Columns that make up the key; passed through to DerivaKey.
    :param name: Optional key name; passed through to DerivaKey.
    :param comment: Optional comment; passed through to DerivaKey.
    :param annotations: Optional annotations dictionary. Defaults to a new empty dictionary.
    """
    # A literal ``{}`` default is created once at definition time and shared by every call, so
    # anything DerivaKey stored in it would leak into later keys. Build a fresh one per call.
    if annotations is None:
        annotations = {}

    key = DerivaKey(self, columns, name, comment, annotations, define=True)
    self.logger.debug('creating key....')
    key.create()
def column(self, column_name):
    """
    Look up the wrapper registered in the catalog's model map for one of this table's columns.

    :param column_name: Value used to index the underlying table.
    :return: The ``self.catalog.model_map`` entry for the element found under ``column_name``.
    """
    model_map = self.catalog.model_map
    table_column = self.table[column_name]
    return model_map[table_column]
def validate(self):
    """
    Run every validator for this table inside a DerivaModel context.

    Annotations, keys, foreign keys, columns, ACLs and ACL bindings are validated in that order.
    Every validator is always invoked; results are combined with ``and``.

    :return: The combined result of all validators.
    """
    # Deferred calls keep both the attribute lookups and the validations in the original order.
    remaining_checks = (
        lambda: self.keys.validate(),
        lambda: self.foreign_keys.validate(),
        lambda: self.columns.validate(),
        lambda: self.acls.validate(self),
        lambda: self.acl_bindings.validate(self),
    )
    with DerivaModel(self.catalog):
        outcome = self.annotations.validate(self)
        for check in remaining_checks:
            # The check is on the left of ``and`` so it runs even after an earlier failure.
            outcome = check() and outcome
    return outcome
def validate_display(self):
    """
    Placeholder validator for the display annotation.

    :return: Always True; no checks are implemented yet.
    """
    # TODO FInish....
    return True
def validate_table_display(self):
    """
    Pass every key of the table-display annotation to DerivaContext.

    Any exception raised by DerivaContext for a key propagates to the caller.

    :return: True when every key was accepted.
    """
    for context_name in self.table_display.keys():
        DerivaContext(context_name)
    return True
def _column_map(self, column_map, dest_table):
    """
    Build a DerivaColumnMap with this table as the source.

    :param column_map: Mapping specification handed to DerivaColumnMap.
    :param dest_table: Destination table handed to DerivaColumnMap.
    :return: The new DerivaColumnMap.
    """
    mapping = DerivaColumnMap(self, column_map, dest_table)
    return mapping
def entities(self):
    """
    Delegate to ``entities()`` on this table's datapath.

    :return: Whatever the datapath's ``entities()`` returns.
    """
    table_path = self.datapath
    return table_path.entities()
def attributes(self, *attributes, **renamed_attributes):
    """
    Delegate to ``attributes()`` on this table's datapath.

    :param attributes: Positional arguments forwarded unchanged.
    :param renamed_attributes: Keyword arguments forwarded unchanged.
    :return: Whatever the datapath's ``attributes()`` returns.
    """
    table_path = self.datapath
    return table_path.attributes(*attributes, **renamed_attributes)
def create_foreign_key(self,
                       columns, referenced_table, referenced_columns,
                       name=None,
                       comment=None,
                       on_update='NO ACTION',
                       on_delete='NO ACTION',
                       acls=None,
                       acl_bindings=None,
                       annotations=None,
                       position=None):
    """
    Create a foreign key from this table to ``referenced_table`` and update visible sources.

    The inbound source for the new key is inserted into the referenced table's visible foreign
    keys, and for a single-column key the column in this table's visible columns is converted
    to an outbound source.

    :param columns: Column names in current table that are used for the foreign key
    :param referenced_table: Deriva table that is being referenced by this foreign key
    :param referenced_columns: Columns of ``referenced_table`` that the foreign key targets.
    :param name: Name of the foreign key. When omitted, a name of the form
        ``<table>_<col>_<col>_fkey`` is built from the table name and ``columns`` in table order.
    :param comment: Comment for the foreign key.
    :param on_update: Passed through to the foreign key definition.
    :param on_delete: Passed through to the foreign key definition.
    :param acls: ACLs, defaults to {}
    :param acl_bindings: defaults to {}
    :param annotations: defaults to {}
    :param position: Passed to ``insert_sources`` for the inbound source, defaults to {}
    :return: A DerivaForeignKey for the new foreign key.
    """
    # One normalisation per argument; fresh containers so calls never share state.
    acls = {} if acls is None else acls
    acl_bindings = {} if acl_bindings is None else acl_bindings
    annotations = {} if annotations is None else annotations
    position = {} if position is None else position

    self.logger.debug('table: %s columns: %s %s referenced_columns: %s referenced_by: %s',
                      self.name, columns, referenced_table.name, referenced_columns,
                      [i.name for i in referenced_table.referenced_by])

    with DerivaModel(self.catalog):
        if name is None:
            # Use the table's own column order so the generated name is stable regardless of
            # the order in which the caller listed the columns.
            ordered_columns = [c.name for c in self.columns if c.name in columns]
            # Previously this concatenated a str with a list and raised TypeError.
            name = '{}_{}_fkey'.format(self.table.name, '_'.join(ordered_columns))

        fkey = self.create_fkey(
            DerivaForeignKey.define(columns,
                                    referenced_table, referenced_columns,
                                    comment=comment,
                                    acls=acls,
                                    acl_bindings=acl_bindings,
                                    name=name,
                                    on_update=on_update,
                                    on_delete=on_delete,
                                    annotations=annotations,
                                    define=True)
        )

        # Pick out the inbound source for this key only.
        _, _, inbound_sources = referenced_table.sources(filter=[fkey.name])
        self.logger.debug('inbound sources %s', [s.spec for s in inbound_sources])
        self.logger.debug('inbound sources %s', [c.name for c in referenced_table.referenced_by])
        referenced_table.visible_foreign_keys.insert_sources(inbound_sources, position)

        if len(columns) == 1:
            self.visible_columns.make_outbound(columns[0])
        self.logger.debug('new vc %s', self.visible_columns)

    return DerivaForeignKey(self, name)
def sources(self, merge_outbound=False, filter=None):
    """
    Build source specs for this table's columns, foreign keys and incoming foreign keys.

    :param merge_outbound: If True, a column that is the only column of a foreign key gets an
        outbound source through that key instead of a plain column source.
    :param filter: Names of columns or keys to include. A false value (None or empty) includes
        everything.
    :return: A triple of DerivaSourceSpec lists: columns, foreign keys, incoming foreign keys.
    """

    def full_key_name(k):
        return (k.table.schema.name, k.name)

    def selected(name):
        return not filter or name in filter

    # Map the column of every single-column foreign key to that foreign key.
    simple_fkeys = {}
    for fk in self.foreign_keys:
        if len(fk.columns) == 1:
            simple_fkeys[[c.name for c in fk.columns][0]] = fk

    # TODO We should check to see if target is vocabulary and if so use ID rather then RID
    column_sources = []
    for col in self.columns:
        if not selected(col.name):
            continue
        if col.name in simple_fkeys and merge_outbound:
            source = [{'outbound': full_key_name(simple_fkeys[col.name])}, 'RID']
        else:
            source = col.name
        column_sources.append(DerivaSourceSpec(self, {'source': source}))

    outbound_sources = []
    for fk in self.foreign_keys:
        if selected(fk.name):
            outbound_sources.append(
                DerivaSourceSpec(self, {'source': [{'outbound': full_key_name(fk)}, 'RID']}))

    inbound_sources = []
    for fk in self.referenced_by:
        if selected(fk.name):
            inbound_sources.append(
                DerivaSourceSpec(self, {'source': [{'inbound': full_key_name(fk)}, 'RID']}))

    return column_sources, outbound_sources, inbound_sources
@staticmethod
def _rename_markdown_pattern(pattern, column_map):
    """
    Apply the column renames in ``column_map`` to a markdown pattern string.

    :param pattern: Template string to rewrite.
    :param column_map: Column map providing ``get_names()``, ``table`` and ``dest_table``.
    :return: The rewritten pattern.
    """
    # Look for column names {{columnname}} in the template and update.
    # The search text is the single-braced '{name}', which also occurs inside '{{name}}'.
    # TODO handle: 'markdown_pattern': '{{{$fkeys.Beta_Cell.XRay_Tomography_Data_File_Type_FKey.rowName}}}'
    for k, v in column_map.get_names().items():
        pattern = pattern.replace('{{{}}}'.format(k), '{{{}}}'.format(v))
        pattern = pattern.replace('fkeys.{}.{}'.format(column_map.table.schema_name, k),
                                  'fkeys.{}.{}'.format(column_map.dest_table.schema_name, v))
    return pattern


@staticmethod
def _rename_columns_in_display(dval, column_map):
    """
    Return a copy of a display annotation with column names updated in the ``markdown_name``
    and ``row_markdown_pattern`` entries; all other entries are copied unchanged.
    """
    return {
        k: DerivaTable._rename_markdown_pattern(v, column_map)
        if (k == 'markdown_name' or k == 'row_markdown_pattern') else v
        for k, v in dval.items()
    }


@staticmethod
def _rename_columns_in_context_display(dval, column_map):
    """
    Return a copy of a per-context display annotation with column names updated in every value
    of every context.
    """
    return {context: {k: DerivaTable._rename_markdown_pattern(v, column_map) for k, v in cvalue.items()}
            for context, cvalue in dval.items()}


def _rename_columns_in_annotations(self, column_map, skip_annotations=(), validate=False):
    """
    Build a new annotations dictionary for this table with renamed columns applied.

    The catalog's visible columns are renamed first. Display annotations and per-context display
    annotations are rewritten; every other annotation is copied as-is.

    :param column_map: Column map describing the renames.
    :param skip_annotations: Annotation tags to copy without rewriting. Only membership is
        tested, so any container works.
    :param validate: Passed through to ``self.catalog.rename_visible_columns``.
    :return: Dictionary of annotation tag to (possibly rewritten) annotation value.
    """
    new_annotations = {}
    self.catalog.rename_visible_columns(column_map, validate=validate)
    for k, v in self.annotations.items():
        if k in skip_annotations:
            renamed = v
        elif k == chaise_tags.display:
            renamed = self._rename_columns_in_display(v, column_map)
        elif (k == chaise_tags.table_display or
              k == chaise_tags.column_display):
            renamed = DerivaTable._rename_columns_in_context_display(v, column_map)
        else:
            renamed = v
        new_annotations[k] = renamed
    return new_annotations


def _rename_columns_in_column_annotations(self, annotation, column_map):
    """Return ``annotation`` unchanged; no rewriting of column annotations is implemented."""
    return annotation


def _key_in_columns(self, columns, key_columns, rename=False):
    """
    Given a set of columns and a key, return true if the key is in that column set. If we are
    simply renaming columns, rather than moving them to a new table, not all of the columns in a
    composite key have to be present as we still have the other columns available to us.

    :param columns: List of columns in a table that are being altered
    :param key_columns: list of columns in the key
    :param rename: true if you are renaming columns within a single table, rather than deleting
        or moving them.
    :return: True if the key overlaps ``columns``, False if there is no overlap.
    :raises DerivaCatalogError: If ``rename`` is false and only part of a composite key is in
        ``columns``.
    """
    overlap = set(columns).intersection({k.name for k in key_columns})
    self.logger.debug('columns %s key_columns %s overlap %s', columns, {k.name for k in key_columns}, overlap)
    if len(overlap) == 0:
        return False
    if (not rename) and (len(overlap) < len(key_columns)):
        raise DerivaCatalogError(self, msg='Cannot rename part of compound key {}'.format(key_columns))
    return True


def _check_composite_keys(self, columns, rename=False):
    """
    Check every key and outgoing foreign key of this table to make sure that altering
    ``columns`` would not break up a composite key.

    :param columns: list of columns that you want to check.
    :param rename: true if you are renaming columns within a single table, rather than deleting
        or moving them.
    :raises DerivaCatalogError: Propagated from ``_key_in_columns``.
    """
    columns = set(columns)
    self.logger.debug('columns %s, %s', columns, rename)
    for i in self.keys:
        self.logger.debug('key %s', [k.name for k in i.columns])
        self._key_in_columns(columns, i.columns, rename)

    for fk in self.foreign_keys:
        self.logger.debug('foreign_key %s %s', fk.table.name, [i.name for i in fk.columns])
        # One check per foreign key is sufficient; the call was previously repeated verbatim.
        self._key_in_columns(columns, fk.columns, rename)


def _copy_keys(self, column_map):
    """
    Create the keys and foreign keys described by ``column_map``.

    :param column_map: Column map providing ``get_keys()`` and ``get_foreign_keys()``.
    """
    for k, key_def in column_map.get_keys().items():
        self.logger.debug('from key_name %s to key_name: %s', k, key_def.name)
        key_def.create()

    for k, fkey_def in column_map.get_foreign_keys().items():
        self.logger.debug('fro fkey_name %s to %s', k, fkey_def.name)
        fkey_def.create()


def _delete_columns_in_display(self, annotation, columns):
    """Always raises: removing a column from a display annotation is not supported."""
    raise DerivaCatalogError(self, 'Cannot delete column from display annotation')


def _delete_columns_from_annotations(self, columns, column_specs):
    """
    Remove references to deleted columns from this table's annotations.

    :param columns: Columns being deleted; used for the display annotation.
    :param column_specs: Source specs of the deleted columns; removed from the visible-columns
        and visible-foreign-keys annotations.
    :raises DerivaCatalogError: If the table has a display annotation.
    """
    for k, v in self.annotations.items():
        if k == chaise_tags.display:
            self._delete_columns_in_display(v, columns)
        elif k == chaise_tags.visible_columns or k == chaise_tags.visible_foreign_keys:
            DerivaVisibleSources(self, k).delete_visible_source(column_specs)


def _create_upload_spec(self, file_pattern, extensions):
    """
    Configure the catalog's bulk upload annotation with asset mappings for this table.

    Two mappings are written: one for metadata records stored as csv/json files and one for the
    asset files themselves. Existing mappings that target this table are replaced.

    The default configuration assumes that the assets are in a directory named with the table
    name for the metadata and that they either are in a subdirectory named by the key value, or
    that they are in a file whose name starts with the key value.

    :param file_pattern: Value stored as ``file_pattern`` of the asset mapping.
    :param extensions: File extensions accepted for assets; a false value accepts any extension.
    :return:
    """
    extension_pattern = '^.*[.](?P<file_ext>{})$'.format('|'.join(extensions if extensions else ['.*']))

    # NOTE(review): placeholder value; the key column is not derived from the table here.
    key_column = 'foo'
    this_table = [self.schema_name, self.name]

    spec = [
        # Any metadata is in a file named /records/schema_name/tablename.[csv|json]
        {
            'default_columns': ['RID', 'RCB', 'RMB', 'RCT', 'RMT'],
            'ext_pattern': '^.*[.](?P<file_ext>json|csv)$',
            'asset_type': 'table',
            'file_pattern': '^((?!/assets/).)*/records/(?P<schema>%s?)/(?P<table>%s)[.]' %
                            (self.schema_name, self.name),
            'target_table': [self.schema_name, self.name],
        },
        # Assets are in format assets/schema_name/table_name/correlation_key/file.ext
        {
            'checksum_types': ['md5'],
            'column_map': {
                'URL': '{URI}',
                'Length': '{file_size}',
                self.name: '{table_rid}',
                'Filename': '{file_name}',
                'MD5': '{md5}',
            },
            'dir_pattern': '^.*/(?P<schema>%s)/(?P<table>%s)/(?P<key_column>.*)/' %
                           (self.schema_name, self.name),
            'ext_pattern': extension_pattern,
            'file_pattern': file_pattern,
            'hatrac_templates': {'hatrac_uri': '/hatrac/{schema}/{table}/{md5}.{file_name}'},
            'target_table': [self.schema_name, self.name],
            # Look for rows in the metadata table with matching key column values.
            'metadata_query_templates': [
                '/attribute/D:={schema}:{table}/%s={key_column}/table_rid:=D:RID' % key_column],
            # Rows in the asset table should have a FK reference to the RID for the matching metadata row
            'record_query_template':
                '/entity/{schema}:{table}/{table}={table_rid}/MD5={md5}/URL={URI_urlencoded}',
            'hatrac_options': {'versioned_uris': True},
        }
    ]

    # The last thing we should do is update the upload spec to accommodate this new asset table.
    if chaise_tags.bulk_upload not in self.catalog.annotations:
        self.catalog.annotations.update({
            chaise_tags.bulk_upload: {
                'asset_mappings': [],
                'version_update_url': 'https://github.com/informatics-isi-edu/deriva-qt/releases',
                'version_compatibility': [['>=0.4.3', '<1.0.0']]
            }
        })

    # Clean out any old upload specs if there are any and add the new specs.
    # The former condition "A or (A and asset_type == 'table')" is logically just A.
    upload_annotations = self.catalog.annotations[chaise_tags.bulk_upload]
    upload_annotations['asset_mappings'] = \
        [i for i in upload_annotations['asset_mappings'] if i.get('target_table', []) != this_table] + spec
def delete_columns(self, columns):
    """
    Drop a set of columns from a table, cleaning up visible columns and keys. You cannot delete
    columns if they are being used by a foreign key in another table, or if they are part of a
    composite key and you are only deleting a subset of the columns.

    :param columns: A column name, a DerivaColumn, or a list of column names and/or DerivaColumn
        instances for the current table.
    :raises DerivaCatalogError: If only part of a composite key is being deleted, or if a column
        is referenced by an incoming foreign key whose ``on_delete`` is not ``'CASCADE'``.
    """
    # Normalise the argument to a list of column names. The key checks below compare against
    # column names, so DerivaColumn instances must be converted wherever they appear.
    if isinstance(columns, (DerivaColumn, str)):
        columns = [columns]
    columns = [c.name if isinstance(c, DerivaColumn) else c for c in columns]

    self.logger.debug('%s', columns)

    # Don't delete just part of a key or foreign_key.
    self._check_composite_keys(columns)

    # If columns are being referenced by another table, then do not delete them.
    for fk in self.referenced_by:
        self.logger.debug('referenced_columns %s %s %s %s',
                          columns, fk.table.name, fk.referenced_table.name,
                          [i.name for i in fk.referenced_columns])
        if self._key_in_columns(columns, fk.referenced_columns) and fk.on_delete != 'CASCADE':
            raise DerivaCatalogError(self, msg='Key referenced by foreign key {}'.format(columns))

    with DerivaModel(self.catalog):
        # Capture the source specs before we start deleting columns....
        column_specs = [DerivaSourceSpec(self, c) for c in columns]

        # Remove keys...
        for k in self.keys:
            if self._key_in_columns(columns, k.columns):
                k.drop()

        for fk in self.foreign_keys:
            if self._key_in_columns(columns, fk.columns):
                fk.drop()

        # Now delete the actual columns. ``columns`` holds names at this point, so resolve them
        # to column objects first; a name string has no drop() method.
        doomed = [self.columns[c] for c in columns]
        for column in doomed:
            column.drop()

        # Now clean up all the annotations.
        self._delete_columns_from_annotations(columns, column_specs)
def copy_columns(self, column_map, dest_table=None):
    """
    Copy a set of columns, updating visible columns list and keys to mirror source columns.

    The columns to copy are specified by a column map, which can be a dictionary with entries
    SrcCol: DerivaColumnSpec or SrcCol: TargetCol.

    :param column_map: A column map that describes the columns to copy and their new names.
    :param dest_table: Destination table; a false value means this table.
    :raises ValueError: If a target column name already exists in the destination table.
    :return:
    """
    self.logger.debug('%s %s', column_map, dest_table.name if dest_table else "None")

    with DerivaModel(self.catalog):
        if not dest_table:
            dest_table = self
        column_map = self._column_map(column_map, dest_table)

        columns = column_map.get_columns()
        column_names = list(column_map.get_columns().keys())

        # TODO we need to figure out what to do about ACL binding

        # Make sure that we can rename the columns
        new_names = {target.name for target in columns.values()}
        overlap = new_names.intersection(set(dest_table._column_names()))
        if overlap:
            raise ValueError('Column {} already exists.'.format(overlap))

        self._check_composite_keys(column_names, rename=(dest_table == self))

        # Update visible column spec, putting copied column right next to the source column.
        if dest_table is self:
            positions = {col: [column_map[col].name] for col in column_map.get_columns()}
        else:
            positions = {}
        dest_table.create_columns(list(columns.values()), positions)

        # Copy over the old values
        from_path = self.datapath
        to_path = dest_table.datapath

        self.logger.debug('copying columns %s %s', [c.name for c in self.columns],
                          [val.name for col, val in column_map.get_columns().items()])

        # Get the values of the columns, and remap the old column names to the new names. Skip
        # over new columns that don't exist in the source table.
        projection = {}
        for col, val in column_map.get_columns().items():
            if col in self.columns:
                projection[val.name] = getattr(from_path, col)
        projection['RID'] = from_path.RID

        rows = from_path.attributes(**projection)
        to_path.update(rows)

        # Copy over the keys.
        self._copy_keys(column_map)
    return
def create_columns(self, columns, positions=None, visible=True):
    """
    Create one or more new columns in the table.

    :param columns: A DerivaColumn or a list/tuple of DerivaColumn.
    :param positions: Where the columns should be added into the visible columns spec.
        Defaults to a new empty dictionary.
    :param visible: Include the new columns in the visible columns spec.
    :return:
    """
    # Fresh dict per call: a ``{}`` default would be shared between calls and is handed to
    # insert_sources below.
    if positions is None:
        positions = {}

    self.logger.debug('columns %s positions: %s', columns, positions)

    # isinstance rather than an exact type test, so list subclasses are not wrapped again.
    if not isinstance(columns, (list, tuple)):
        columns = [columns]

    column_names = []
    with DerivaModel(self.catalog):
        for column in columns:
            column.update_table(self)
            column.create()
            column_names.append(column.name)

        # sources() treats an empty filter as "no filter", so with nothing created we must not
        # ask for sources at all or every existing column would be inserted.
        if visible and column_names:
            sources, _, _ = self.sources(filter=column_names)
            self.visible_columns.insert_sources(sources, positions)
def rename_column(self, from_column, to_column, default=None, nullok=None):
    """
    Rename a column by copying it and then deleting the original column. The type of the new
    column is the same as the old column.

    :param from_column: Name of the column being renamed, or a column object exposing ``type``.
    :param to_column: Name of the new column
    :param default: Passed as ``default`` to the new DerivaColumn.
    :param nullok: Passed as ``nullok`` to the new DerivaColumn.
    :return:
    """
    # The type has to come from a column object. When a name is given (the documented usage),
    # look the column up; objects that already carry a ``type`` are used directly as before.
    source = from_column if hasattr(from_column, 'type') else self.columns[from_column]

    column_map = {
        from_column: DerivaColumn(table=self, name=to_column, type=source.type,
                                  nullok=nullok, default=default)
    }
    self.rename_columns(column_map=column_map)
    return
def rename_columns(self, column_map, dest_table=None, delete=True):
    """
    Rename columns by copying them and then, optionally, deleting the original columns.

    :param column_map: Column map describing source columns and their new names.
    :param dest_table: Destination table; a false value means this table.
    :param delete: When true, the source columns and the keys and foreign keys that use them are
        dropped after the copy.
    :raises DerivaCatalogError: If a column to rename is referenced by an incoming foreign key.
    :return:
    """
    with DerivaModel(self.catalog):
        if not dest_table:
            dest_table = self
        column_map = self._column_map(column_map, dest_table)
        self.logger.debug('%s', column_map)

        for incoming in self.referenced_by:
            self.logger.debug('referenced_columns %s %s %s %s',
                              column_map.get_names(), incoming.table.name, incoming.referenced_table.name,
                              [i.name for i in incoming.referenced_columns])
            referenced = self._key_in_columns(column_map.get_names(), incoming.referenced_columns,
                                              rename=(self == dest_table))
            if referenced:
                raise DerivaCatalogError(self,
                                         msg='Key referenced by foreign key {}'.format(column_map.get_names()))

        self.copy_columns(column_map, dest_table)

        # Update column name in ACL bindings....
        self._rename_columns_in_acl_bindings(column_map)

        # Update annotations where the old spec was being used. We have already moved over
        # the visible columns, so skip the visible columns annotation.
        updated_annotations = self._rename_columns_in_annotations(
            column_map, skip_annotations=[chaise_tags.visible_columns])
        self.annotations.update(updated_annotations)

        if delete:
            columns = list(column_map.get_columns().keys())

            # Go through the keys and foreign_keys and delete any constraints that include the columns.
            for key in self.keys:
                if self._key_in_columns(columns, key.columns, rename=(self == dest_table)):
                    self.logger.debug('delete key %s', [k.name for k in key.columns])
                    key.drop()

            for fkey in self.foreign_keys:
                if self._key_in_columns(columns, fkey.columns, rename=(self == dest_table)):
                    self.logger.debug('delete key %s', [k.name for k in fkey.columns])
                    fkey.drop()

            self.delete_columns(columns)
    return
def drop(self):
    """
    Delete a table.

    The table's outgoing foreign keys are removed from the visible foreign keys of the tables
    they reference, the table is dropped, and its entry is removed from the catalog's model map.

    :raises DerivaCatalogError: If other tables still reference this table.
    :return:
    """
    if len(self.referenced_by) != 0:
        # The exception used to be constructed without ``raise``, so this guard never fired.
        raise DerivaCatalogError(self, 'Attept to delete table with incoming foreign keys')

    with DerivaModel(self.catalog):
        for fk in self.foreign_keys:
            fk.referenced_table.visible_foreign_keys.delete_visible_source(fk.name)

        # Now we can delete the table.
        self.table.drop()
        del self.catalog.model_map[self.table]
        self.deleted = True
def _relink_columns(self, dest_table, column_map):
    """
    Redirect incoming foreign keys from this table to dest_table.

    Every foreign key that currently references this table through a column found in the column map
    is dropped and recreated on the referring table so that it targets dest_table.  Referenced column
    names are translated through the column map, since columns may have been renamed.  Finally the
    catalog is asked to rename visible columns according to the map.

    :param dest_table: Table that replaces this one as the foreign key target.
    :param column_map: Column map used to translate referenced column names.
    :return: None
    """
    self.logger.debug('%s %s %s', self.name, dest_table.name, [i.name for i in self.referenced_by])

    # Walk a snapshot: foreign keys are dropped inside the loop.
    for incoming in list(self.referenced_by):
        source_names = [c.name for c in incoming.columns]
        target_names = [c.name for c in incoming.referenced_columns]
        name_map = column_map.get_names()
        referring_table = incoming.table
        self.logger.debug('relinking table: %s fkey: %s columns: %s %s',
                          referring_table.name, incoming.name, source_names, target_names)

        if not self._key_in_columns(name_map.keys(), incoming.referenced_columns,
                                    rename=(self == dest_table)):
            continue

        # Capture everything needed to recreate the constraint before it is dropped.
        preserved = {
            'comment': incoming.comment,
            'acls': incoming.acls,
            'acl_bindings': incoming.acl_bindings,
            'annotations': incoming.annotations,
        }
        self.logger.debug('before delete table: %s fkey: %s referenced_by: %s',
                          referring_table.name, incoming.name, [i.name for i in self.referenced_by])
        incoming.drop()
        self.logger.debug('after delete referenced_by: %s', [i.name for i in self.referenced_by])

        new_targets = [name_map.get(n, n) for n in target_names]
        referring_table.create_foreign_key(source_names, dest_table, new_targets, **preserved)

    self.catalog.rename_visible_columns(column_map)
def copy_table(self, schema_name, table_name, column_map={}, clone=False,
               key_defs=[],
               fkey_defs=[],
               comment=None,
               acls={},
               acl_bindings={},
               annotations={}
               ):
    """
    Copy the current table to the specified target schema and table.

    The new table is created from the column, key and foreign key definitions held by the column map,
    with annotations rewritten through ``_rename_columns_in_annotations``.  Columns can be renamed in
    the target table by providing a column mapping.  Key and foreign key definitions can be augmented
    by providing the appropriate arguments; ``acls``, ``acl_bindings`` and ``annotations`` are merged
    over the values taken from this table.  If ``clone`` is true, the rows are inserted with RID, RCT
    and RCB passed as nondefaults, so that the equivalent of a move operation can be obtained.

    None of the mutable default arguments is modified by this method.

    :param schema_name: Target schema name
    :param table_name: Target table name
    :param column_map: A mapping that is used to rename columns in the target table.
    :param clone: If True, insert rows with ``nondefaults={'RID', 'RCT', 'RCB'}``.
    :param key_defs: Extra key definitions appended to those from the column map.
    :param fkey_defs: Extra foreign key definitions appended to those from the column map.
    :param comment: Comment for the new table.  Defaults to this table's comment.
    :param acls: ACLs merged over this table's ACLs.
    :param acl_bindings: ACL bindings merged over this table's ACL bindings.
    :param annotations: Annotations merged over the (renamed) annotations of this table.
    :return: The new table
    """
    self.logger.debug('schema_name %s dest_table %s', schema_name, table_name)

    with DerivaModel(self.catalog):
        # Augment the column_map with entries for columns in the table, but not in the map.
        new_map = {i.name: column_map.get(i.name, i.name) for i in self.columns}
        new_map.update(column_map)

        # Add keys to column map.  We need to create a dummy destination table for this call.
        proto_table = namedtuple('ProtoTable', ['catalog', 'schema', 'schema_name', 'name'])
        dest_table = proto_table(self.catalog, self.catalog[schema_name], schema_name, table_name)
        column_map = self._column_map(new_map, dest_table)

        # TODO May want to preserve pseudo columns that start with outbound fk.
        copied_annotations = self._rename_columns_in_annotations(column_map)
        copied_annotations.pop(chaise_tags.visible_foreign_keys, None)
        # The caller's annotations used to be overwritten and therefore ignored; merge them on top
        # of the copied ones, the same way acls and acl_bindings are handled below.
        table_annotations = {**copied_annotations, **annotations}

        new_table = self.catalog[schema_name].create_table(
            table_name,
            # Use column_map to change the name of columns in the new table.
            column_defs=column_map.get_columns().values(),
            key_defs=[i for i in column_map.get_keys().values()] + key_defs,
            fkey_defs=[i for i in column_map.get_foreign_keys().values()] + fkey_defs,
            comment=comment if comment else self.comment,
            acls={**self.acls, **acls},
            acl_bindings={**self.acl_bindings, **acl_bindings},
            annotations=table_annotations
        )

        # Create new table
        new_table.table_model = table_name
        new_table.schema_model = schema_name

        # Copy over values from original to the new one, mapping column names where required.  Use the
        # fill value of a column-map entry to provide values for non-null columns.
        pb = self.catalog.getPathBuilder()
        from_path = pb.schemas[self.schema_name].tables[self.name]
        to_path = pb.schemas[schema_name].tables[table_name]

        # Target column name -> source column, built once and used for both the log and the query.
        projection = {column_map.get(i.name, i).name: getattr(from_path, i.name) for i in self.columns}
        self.logger.debug('copying columns: %s', projection)
        source_rows = from_path.attributes(**projection)

        # Entries with a falsy fill are skipped, exactly as before.
        fill_values = {k: c.fill for k, c in column_map.get_columns().items() if c.fill}
        rows = [{**row, **fill_values} for row in source_rows.fetch()]
        to_path.insert(rows, **({'nondefaults': {'RID', 'RCT', 'RCB'}} if clone else {}))
    return new_table
def move_table(self, schema_name, table_name,
               delete=True,
               column_map={},
               key_defs=[],
               fkey_defs=[],
               comment=None,
               acls={},
               acl_bindings={},
               annotations={}
               ):
    """
    Move a table, renaming and inserting new columns.

    The table is copied with ``copy_table(..., clone=True)``, incoming foreign keys are relinked to
    the copy, and the original table is then optionally dropped.

    :param schema_name: Schema for new table
    :param table_name: Name of new table
    :param delete: Delete the original table.  Defaults to True
    :param column_map: A column map that defines any column renaming or insertions.
    :param key_defs: New keys that should be defined in the target table
    :param fkey_defs: Passed through to copy_table.
    :param comment: Passed through to copy_table.
    :param acls: Passed through to copy_table.
    :param acl_bindings: Passed through to copy_table.
    :param annotations: Passed through to copy_table.
    :return: New table object
    """
    self.logger.debug('%s %s %s', schema_name, table_name, column_map)

    with DerivaModel(self.catalog):
        # Start from an identity mapping for every column, then let the caller's map override it.
        full_map = {col.name: column_map.get(col.name, col.name) for col in self.columns}
        full_map.update(column_map)

        # A stand-in for the destination table, which does not exist yet.
        placeholder_type = namedtuple('ProtoTable', ['catalog', 'schema', 'schema_name', 'name'])
        placeholder = placeholder_type(self.catalog, self.catalog[schema_name], schema_name, table_name)
        column_map = self._column_map(full_map, placeholder)

        copy_options = {
            'clone': True,
            'column_map': column_map,
            'key_defs': key_defs,
            'fkey_defs': fkey_defs,
            'comment': comment,
            'acls': acls,
            'acl_bindings': acl_bindings,
            'annotations': annotations,
        }
        moved_table = self.copy_table(schema_name, table_name, **copy_options)

        self._relink_columns(moved_table, column_map)
        if delete:
            self.drop()
    return moved_table
def create_asset_table(self, key_column,
                       extensions=[],
                       file_pattern='.*',
                       column_defs=[], key_defs=[], fkey_defs=[],
                       comment=None, acls={},
                       acl_bindings={},
                       annotations={},
                       set_policy=True):
    """
    Create a basic asset table and configure the bulk upload annotation to load the table along with
    a table of associated metadata.

    This routine assumes that the metadata table has already been defined, and there is a key column
    in the metadata table that can be used to associate the asset with a row in the table.  The default
    configuration assumes that the assets are in a directory named with the table name for the metadata
    and that they either are in a subdirectory named by the key value, or that they are in a file whose
    name starts with the key value.

    The asset table is named ``<table name>_Asset``.  The ``annotations`` argument is not modified.

    :param key_column: The column in the metadata table to be used to correlate assets with entries. Assets
                       will be named using the key column.
    :param extensions: List file extensions to be matched. Default is to match any extension.
    :param file_pattern: Regex that identifies the files to be considered for upload
    :param column_defs: a list of Column.define() results for extra or overridden column definitions
    :param key_defs: a list of Key.define() results for extra or overridden key constraint definitions
    :param fkey_defs: a list of ForeignKey.define() results for foreign key definitions
    :param comment: a comment string for the asset table
    :param acls: a dictionary of ACLs for specific access modes
    :param acl_bindings: a dictionary of dynamic ACL bindings
    :param annotations: a dictionary of annotations
    :param set_policy: If true, add ACLs for self serve policy to the asset table
    :return: The asset table returned by ``self.schema.create_asset``.
    :raises DerivaCatalogError: if set_policy is true and the catalog has no catalog-config annotation,
                                or if key_column is not a column of this table.
    """
    asset_table_name = '{}_Asset'.format(self.name)

    def create_asset_upload_spec():
        extension_pattern = '^.*[.](?P<file_ext>{})$'.format('|'.join(extensions if extensions else ['.*']))

        return [
            # Any metadata is in a file named /records/schema_name/tablename.[csv|json]
            {
                'default_columns': ['RID', 'RCB', 'RMB', 'RCT', 'RMT'],
                'ext_pattern': '^.*[.](?P<file_ext>json|csv)$',
                'asset_type': 'table',
                'file_pattern': '^((?!/assets/).)*/records/(?P<schema>%s?)/(?P<table>%s)[.]' %
                                (self.schema_name, self.name),
                'target_table': [self.schema_name, self.name],
            },
            # Assets are in format assets/schema_name/table_name/correlation_key/file.ext
            {
                'checksum_types': ['md5'],
                'column_map': {
                    'URL': '{URI}',
                    'Length': '{file_size}',
                    self.name: '{table_rid}',
                    'Filename': '{file_name}',
                    'MD5': '{md5}',
                },
                'dir_pattern': '^.*/(?P<schema>%s)/(?P<table>%s)/(?P<key_column>.*)/' %
                               (self.schema_name, self.name),
                'ext_pattern': extension_pattern,
                'file_pattern': file_pattern,
                'hatrac_templates': {'hatrac_uri': '/hatrac/{schema}/{table}/{md5}.{file_name}'},
                'target_table': [self.schema_name, asset_table_name],
                # Look for rows in the metadata table with matching key column values.
                'metadata_query_templates': [
                    '/attribute/D:={schema}:{table}/%s={key_column}/table_rid:=D:RID' % key_column],
                # Rows in the asset table should have a FK reference to the RID for the matching metadata row
                'record_query_template':
                    '/entity/{schema}:{table}_Asset/{table}={table_rid}/MD5={md5}/URL={URI_urlencoded}',
                'hatrac_options': {'versioned_uris': True},
            }
        ]

    if set_policy and chaise_tags.catalog_config not in self.catalog.annotations:
        raise DerivaCatalogError(self, msg='Attempting to configure table before catalog is configured')

    if key_column not in self.columns:
        raise DerivaCatalogError(self, msg='Key column not found in target table')

    column_defs = [
        DerivaColumn.define('{}'.format(self.name),
                            'text',
                            nullok=False,
                            comment="The {} entry to which this asset is attached".format(self.name)),
    ] + column_defs

    # Set up policy so that you can only add an asset to a record that you own.
    fkey_acls, fkey_acl_bindings = {}, {}
    if set_policy:
        groups = self.catalog.get_groups()

        fkey_acls = {
            "insert": [groups['curator']],
            "update": [groups['curator']],
        }
        fkey_acl_bindings = {
            "self_linkage_creator": {
                "types": ["insert", "update"],
                "projection": ["RCB"],
                "projection_type": "acl",
            },
            "self_linkage_owner": {
                "types": ["insert", "update"],
                "projection": ["Owner"],
                "projection_type": "acl",
            }
        }

    # Link asset table to metadata table with additional information about assets.
    # NOTE(review): asset_fkey_defs is built here but is not passed to create_asset() below, so
    # neither this foreign key nor the caller's fkey_defs reach the new table -- confirm whether
    # that is intended before wiring it in.
    asset_fkey_defs = [
        DerivaForeignKey.define([self.name],
                                self.schema_name, self.name, ['RID'],
                                acls=fkey_acls, acl_bindings=fkey_acl_bindings,
                                )
    ] + fkey_defs
    comment = comment if comment else 'Asset table for {}'.format(self.name)

    # Work on a copy: writing into the argument would modify the caller's dictionary and, when the
    # argument is omitted, the default dictionary that is shared between calls.
    annotations = dict(annotations)
    if chaise_tags.table_display not in annotations:
        annotations[chaise_tags.table_display] = {'row_name': {'row_markdown_pattern': '{{{Filename}}}'}}

    asset_table = self.schema.create_asset(
        asset_table_name,
        column_defs=column_defs,
        key_defs=key_defs,
        annotations=annotations,
        acls=acls, acl_bindings=acl_bindings,
        comment=comment)

    # The last thing we should do is update the upload spec to accommodate this new asset table.
    if chaise_tags.bulk_upload not in self.catalog.annotations:
        self.catalog.annotations.update({
            chaise_tags.bulk_upload: {
                'asset_mappings': [],
                'version_update_url': 'https://github.com/informatics-isi-edu/deriva-qt/releases',
                'version_compatibility': [['>=0.4.3', '<1.0.0']]
            }
        })

    # Clean out any old upload specs if there are any and add the new specs.
    upload_annotations = self.catalog.annotations[chaise_tags.bulk_upload]
    upload_annotations['asset_mappings'] = \
        [i for i in upload_annotations['asset_mappings'] if
         not (
                 i.get('target_table', []) == [self.schema_name, asset_table_name]
                 or
                 (
                         i.get('target_table', []) == [self.schema_name, self.name]
                         and
                         i.get('asset_type', '') == 'table'
                 )
         )
         ] + create_asset_upload_spec()

    return asset_table
def associate_vocabulary(self, term_table, table_column='RID'):
    """
    Associate this table with an existing vocabulary table.

    The work is delegated to ``associate_tables``, with the ``ID`` column of the term table used as
    the target column.

    :param term_table: The term table.  It must report itself as a vocabulary table.
    :param table_column: Name of the column in this table that is used for the association.
                         Defaults to RID.
    :return: None.
    :raises DerivaCatalogError: if term_table is not a vocabulary table.
    """
    if term_table.is_vocabulary_table():
        self.associate_tables(term_table, table_column=table_column, target_column='ID')
        return
    raise DerivaCatalogError(self, 'Attempt to link_vocabulary on a non-vocabulary table')
def disassociate_tables(self, target_table):
    """
    Placeholder for removing the association between this table and target_table.

    Not implemented: the method always raises.

    :param target_table: The table on the other side of the association.
    :raises DerivaCatalogError: always.
    """
    # Pass this table as the first argument, as every other DerivaCatalogError call here does;
    # previously the message was passed in that position.
    raise DerivaCatalogError(self, msg='Not implented')
def associate_tables(self, target_table, table_column='RID', target_column='RID', inline=True):
    """
    Create a pure binary association table that connects rows in the table to rows in the target table.

    The association table is created in this table's schema and is named
    ``<table name>_<target table name>``.  It has one non-null text column per associated table, a key
    over the pair, and a foreign key from each column to the corresponding table.  RIDs are used for
    linking by default; this can be overridden.  After creation, the incoming reference is added to
    the visible columns of each table referenced by the association table.

    :param target_table: The table that is to be associated with the current table
    :param table_column: Name of the column in the current table that is used for the foreign key,
                         defaults to RID
    :param target_column: Name of the column in the target table that is to be used for the foreign key,
                          defaults to RID
    :param inline: Currently unused.
    :return: Association table.
    """
    association_table_name = '{}_{}'.format(self.name, target_table.name)

    column_defs = [
        DerivaColumn.define('{}'.format(self.name), 'text', nullok=False),
        DerivaColumn.define('{}'.format(target_table.name), 'text', nullok=False)
    ]

    key_defs = [DerivaKey.define([self.name, target_table.name])]

    fkey_defs = [
        DerivaForeignKey.define([self.name], self, [table_column]),
        DerivaForeignKey.define([target_table.name], target_table, [target_column])
    ]

    table_def = self.schema.create_table(
        association_table_name,
        column_defs,
        key_defs=key_defs, fkey_defs=fkey_defs,
        comment='Association table for {}'.format(association_table_name))

    # Add reference to association table as an incoming reference to visible columns of table being associated.
    for fkey in table_def.foreign_keys:
        _, _, inbound_sources = fkey.referenced_table.sources(filter=[fkey.name])
        fkey.referenced_table.visible_columns.insert_sources(inbound_sources)

    # The docstring has always promised the association table, but nothing was returned.
    return table_def
def is_pure_binary(self):
    """
    Report whether the table has the properties of a pure binary association:

    1. It has exactly two foreign keys, each over a single column,
    2. There is a key constraint on the pair of foreign key columns,
    3. NULL values are not allowed in either foreign key column.

    :return: Boolean
    """
    fkeys = list(self.foreign_keys)

    # Exactly two foreign key constraints, each over exactly one column.
    if len(fkeys) != 2:
        return False
    if any(len(fk.columns) != 1 for fk in fkeys):
        return False

    first, second = (next(iter(fk.columns)) for fk in fkeys)

    # The lookup fails when there is no key constraint on the pair of columns.
    try:
        self.key[first.name, second.name]
    except DerivaKeyError:
        return False

    # Neither column may allow NULL.
    return not (first.nullok or second.nullok)
def associated_tables(self):
    """
    Assuming the table is a pure binary association table, return the two table endpoints.

    :return: list with the table referenced by each of this table's foreign keys, in foreign key order.
    :raises DerivaCatalogError: if the table is not a pure binary association table.
    """
    if not self.is_pure_binary():
        # The message used a '%s' placeholder with str.format, so the table name was never inserted.
        raise DerivaCatalogError(self, msg='Table not pure binary {}'.format(self.name))

    return [fk.referenced_table for fk in self.foreign_keys]
def is_vocabulary_table(self):
    """
    Test to see if a table is a deriva vocabulary table.

    The table qualifies when its column names include ID, URI, Description and Name plus at least
    one other column.

    :return: True or False.
    """
    required = {'ID', 'URI', 'Description', 'Name'}
    present = set(self._column_names())
    # Proper-subset test: all required names are present and there is something besides them.
    return required.issubset(present) and present != required
def describe(self):
    """
    Print the string form of this table to standard output.

    :return: None
    """
    description = str(self)
    print(description)