Source code for deriva.utils.catalog.manage.update_catalog

import argparse
import logging
from requests.exceptions import HTTPError

from deriva.core import get_credential, AttrDict, ErmrestCatalog

logger = logging.getLogger(__name__)


[docs]def parse_args(server, catalog_id, is_table=False, is_catalog=False): parser = argparse.ArgumentParser(description='Update catalog configuration') parser.add_argument('--host', default=server, help='Catalog host name') parser.add_argument('--catalog', default=catalog_id, help='ID of desired catalog') parser.add_argument('--replace', action='store_true', help='Replace existing values with new ones. Otherwise, attempt to merge in values provided.') if is_table: modes = ['table', 'annotations', 'acls', 'comment', 'keys', 'fkeys', 'columns'] elif is_catalog: modes = ['annotations', 'acls'] parser.add_argument('--recurse', action='store_true', help='Update all schema and tables in the catalog.') else: modes = ['schema', 'annotations', 'acls', 'comment'] parser.add_argument('--recurse', action='store_true', help='Update all tables in the schema.') parser.add_argument('mode', choices=modes, help='Model element to be updated.') args = parser.parse_args() return args.mode, args.replace, args.host, args.catalog
[docs]class CatalogUpdaterException(Exception): def __init__(self, msg='Catalog Update Exception'): self.msg = msg
[docs]class CatalogUpdater: def __init__(self, catalog): self._catalog = catalog self._model = self._catalog.getCatalogModel()
[docs] @staticmethod def update_annotations(o, annotations, merge=False): logger.debug('%s %s %s', o, annotations, merge) if not merge: o.annotations.clear() o.annotations.update(annotations)
[docs] @staticmethod def update_acls(o, acls, merge=False): if not merge: o.acls.clear() o.acls.update(acls)
[docs] @staticmethod def update_acl_bindings(o, acl_bindings, merge=False): if not merge: o.acl_bindings.clear() o.acl_bindings.update(acl_bindings)
[docs] def update_catalog(self, mode, annotations, acls, replace=False, merge=False): if mode not in ['annotations', 'acls']: raise CatalogUpdaterException(msg="Unknown mode {}".format(mode)) if mode == 'annotations': self.update_annotations(self._model, annotations, merge=merge) elif mode == 'acls': self.update_acls(self._model, acls, merge=merge) self._model.apply()
[docs] def update_schema(self, mode, schema_def, replace=False, merge=False, really=False): schema_name = schema_def['schema_name'] annotations = schema_def['annotations'] acls = schema_def['acls'] comment = schema_def.get('comment', None) if mode not in ['schema', 'annotations', 'comment', 'acls']: raise CatalogUpdaterException(msg="Unknown mode {}".format(mode)) if mode == 'schema': if replace: schema = self._model.schema_exists(schema_name) logger.info('Deleting schema %s', schema.name) ok = 'YES' if really else input('Type YES to confirm:') if ok == 'YES': schema.drop() schema = self._model.create_schema(schema_def) else: schema = self._model.schemas[schema_name] if mode == 'annotations': self.update_annotations(schema, annotations, merge=merge) elif mode == 'acls': self.update_acls(schema, acls, merge=merge) elif mode == 'comment': schema.comment = comment self._model.apply() return schema
[docs] def update_table(self, mode, schema_name, table_def, replace=False, merge=False, really=False): schema = self._model.schemas[schema_name] table_name = table_def['table_name'] column_defs = table_def['column_definitions'] table_acls = table_def['acls'] table_acl_bindings = table_def['acl_bindings'] table_annotations = table_def['annotations'] # TODO: changed this to get attribute to work around some test failures column_annotations = table_def.get('column_annotations') table_comment = table_def.get('comment', None) column_comment = table_def.get('column_comment', None) key_defs = table_def['keys'] fkey_defs = table_def['foreign_keys'] logger.info('Updating {}:{}'.format(schema_name, table_name)) if mode not in ['table', 'columns', 'fkeys', 'keys', 'annotations', 'comment', 'acls']: raise CatalogUpdaterException(msg="Unknown mode {}".format(mode)) skip_fkeys = False if mode == 'table': if replace: table = schema.tables[table_name] logger.info('Deleting table %s', table.name) ok = 'YES' if really else input('Type YES to confirm:') if ok == 'YES': table.drop() schema = self._model.schemas[schema_name] if skip_fkeys: table_def.fkey_defs = [] logger.info('Creating table...%s', table_name) table = schema.create_table(table_def) return table table = self._model.schemas[schema_name].tables[table_name] if mode == 'columns': if replace: table = schema.tables[table_name] logger.info('Deleting columns ', table.name) ok = 'YES' if really else input('Type YES to confirm:') if ok == 'YES': for k in [c for c in table.column_definitions]: if k.name in ['RID', 'RMB', 'RCB', 'RCT', 'RMT']: continue k.drop() # Go through the column definitions and add a new column if it doesn't already exist. for i in column_defs: try: logger.info('Creating column {}'.format(i['name'])) table.create_column(i) except HTTPError as e: if 'already exists' in e.args: print("Skipping existing column {}".format(i['names'])) else: print("Skipping: column key {} {}: \n{}".format(i['names'], i, e.args)) if mode == 'fkeys': if replace: logger.info('deleting foreign_keys') for k in [fk for fk in table.foreign_keys]: print('dropping', k.name) k.drop() for i in fkey_defs: try: table.create_fkey(i) print('Created foreign key {} {}'.format(i['names'], i)) except HTTPError as e: if 'already exists' in e.args: print("Skipping existing foreign key {}".format(i['names'])) else: print("Skipping: foreign key {} {}: \n{}".format(i['names'], i, e.args)) if mode == 'keys': if replace: logger.info('Deleting keys') for k in table.keys: k.drop() for i in key_defs: try: table.create_key(i) print('Created key {}'.format(i['names'])) except HTTPError as err: if 'already exists' in err.response.text: print("Skipping: key {} already exists".format(i['names'])) else: print(err.response.text) if mode == 'annotations': self.update_annotations(table, table_annotations, merge=merge) for c in table.column_definitions: if c.name in column_annotations: self.update_annotations(c, column_annotations[c.name], merge=merge) if mode == 'comment': table.comment = table_comment for c in table.column_definitions: if c.name in column_comment: c.comment = column_comment[c.name] if mode == 'acls': self.update_acls(table, table_acls) self.update_acl_bindings(table, table_acl_bindings, merge=merge) column_acls = {i['name']: i['acls'] for i in column_defs if 'acls' in i} column_acl_bindings = {i['name']: i['acl_bindings'] for i in column_defs if 'acl_bindings in i'} for c in table.column_definitions: if c.name in column_acls: self.update_acls(c, column_acls[c.name], merge=merge) if c.name in column_acl_bindings: self.update_acl_bindings(c, column_acl_bindings[c.name], merge=merge) self._model.apply()