import sys
import json
import re
from deriva.core import ErmrestCatalog, AttrDict, ermrest_model, get_credential
from deriva.config.base_config import BaseSpec, BaseSpecList, ConfigUtil, ConfigBaseCLI
if sys.version_info > (3,):
unicode = str
MY_VERSION = 0.99
[docs]class NoForeignKeyError(ValueError):
pass
[docs]class AttrSpecList(BaseSpecList):
SPEC_TYPES = ["catalog_annotations", "schema_annotations", "table_annotations", "column_annotations",
"foreign_key_annotations"]
def __init__(self, known_attrs, specdict, strict=False):
self.ignore_unmanaged = False
self.managed_annotations = self.annotation_list(known_attrs.get(u'managed'))
if self.managed_annotations is None:
raise ValueError("No 'managed' attribute list")
if known_attrs.get(u'ignore_all_unmanaged'):
self.ignore_unmanaged = True
self.ignored_annotations = self.annotation_list(known_attrs.get(u'ignored'))
if self.ignored_annotations is None:
self.ignored_annotations = []
# dictlist = dictlist + [{"managed_annotations": self.managed_annotations}, {"ignored_annotations": self.ignored_annotations}, {"ignore_all_unmanaged": self.ignore_unmanaged}]
BaseSpecList.__init__(self, AttrSpec, specdict, strict)
[docs] def annotation_list(self, orig_list):
if orig_list is None:
return None
new = []
for item in orig_list:
new.append(unicode(item))
return new
[docs] def add_list(self, dictlist):
for d in dictlist:
if len(d) > 0:
s = AttrSpec(d, self.managed_annotations, self.ignore_unmanaged, self.ignored_annotations)
self.add_spec(s)
[docs]class AttrSpec(BaseSpec):
def __init__(self, specdict, managed_annotations, ignore_unmanaged, ignored_annotations):
BaseSpec.__init__(self, specdict, ["uri", "value"], "attributes", ignore_unmanaged)
self.ignore_unmanaged = ignore_unmanaged
self.managed_annotations = managed_annotations
self.ignored_annotations = ignored_annotations
self.known_annotations = self.managed_annotations + self.ignored_annotations
self.validate_annotation()
[docs] def validate_annotation(self):
return self.specdict.get("uri") in self.managed_annotations
[docs]class AttrConfig:
def __init__(self, server, catalog_id, config_file, credentials, verbose=False, schema_name=None, table_name=None):
self.config = json.load(open(config_file))
self.ignored_schema_patterns = []
ip = self.config.get("ignored_schema_patterns")
if ip is not None:
for p in ip:
self.ignored_schema_patterns.append(re.compile(p))
self.known_attrs = self.config.get(u'known_attributes')
self.managed_annotations = self.known_attrs.get(u'managed')
self.known_annotations = self.managed_annotations
self.all_annotations = self.known_annotations
self.ignored_annotations = self.known_attrs.get(u'ignored')
if self.ignored_annotations is not None:
self.all_annotations = self.all_annotations + self.ignored_annotations
self.ignore_unmanaged = self.known_attrs.get(u'ignore_all_unmanaged')
self.annotation_specs = dict()
for key in AttrSpecList.SPEC_TYPES:
self.annotation_specs[key] = self.make_speclist(key)
self.server = server
self.catalog_id = catalog_id
self.verbose = verbose
old_catalog = ErmrestCatalog('https', self.server, self.catalog_id, credentials)
self.saved_toplevel_config = ConfigUtil.find_toplevel_node(old_catalog.getCatalogModel(), schema_name,
table_name)
self.catalog = ErmrestCatalog('https', self.server, self.catalog_id, credentials)
self.toplevel_config = ConfigUtil.find_toplevel_node(self.catalog.getCatalogModel(), schema_name, table_name)
[docs] def make_speclist(self, name):
d = self.config.get(unicode(name))
if d is None:
d = [dict()]
return AttrSpecList(self.known_attrs, d)
[docs] def find_best_schema_specs(self, schema_name):
specs = dict()
for key in self.managed_annotations:
specs[key] = self.annotation_specs["schema_annotations"].find_best_schema_spec(schema_name, key=key)
return specs
[docs] def find_best_table_specs(self, schema_name, table_name):
specs = dict()
for key in self.managed_annotations:
specs[key] = self.annotation_specs["table_annotations"].find_best_table_spec(schema_name, table_name,
key=key)
return specs
[docs] def find_best_fkey_specs(self, fkey):
specs = dict()
for key in self.managed_annotations:
specs[key] = self.annotation_specs["foreign_key_annotations"].find_best_foreign_key_spec(fkey.table.schema.name,
fkey.table.name,
fkey.names,
key=key)
return specs
[docs] def find_best_column_specs(self, schema_name, table_name, column_name):
specs = dict()
for key in self.managed_annotations:
specs[key] = self.annotation_specs["column_annotations"].find_best_column_spec(schema_name, table_name,
column_name, key=key)
return specs
[docs] def node_name(self, node):
if isinstance(node, ermrest_model.Schema):
return "schema {s}".format(s=str(node.name))
if isinstance(node, ermrest_model.Table):
return "table {s}.{t}".format(s=str(node.schema.name), t=str(node.name))
if isinstance(node, ermrest_model.Column):
return "column {s}.{t}.{c}".format(s=str(node.table.schema.name), t=str(node.table.name), c=str(node.name))
if isinstance(node, ermrest_model.ForeignKey):
return "foreign key {n}".format(n=str(node.names))
return str("unknown node type {t}".format(t=type(node)))
[docs] def set_node_annotations(self, node, specs, saved_node):
if specs is None:
if not self.ignore_unmanaged:
if self.verbose:
print("{n}: clearing annotations".format(n=self.node_name(node)))
node.annotations.clear()
return
for k in self.managed_annotations:
s = specs.get(k)
if s is not None and u'value' in s:
if self.verbose:
print("{n}: setting {k} to {v}".format(n=self.node_name(node), k=k, v=s[u'value']))
node.annotations[k] = s[u'value']
elif k in node.annotations:
if self.verbose:
print("{n}: clearing {k}".format(n=self.node_name(node), k=k))
node.annotations.pop(k)
if not self.ignore_unmanaged:
for k in node.annotations.keys():
if k not in self.all_annotations:
raise ValueError("annotation key {k} is neither managed nor ignored".format(k=k))
[docs] def set_table_annotations(self, table, saved_table):
self.set_node_annotations(table, self.find_best_table_specs(table.schema.name, table.name), saved_table)
for column in table.column_definitions:
self.set_column_annotations(column, self.find_named_column(saved_table, column.name))
for fkey in table.foreign_keys:
self.set_fkey_annotations(fkey, self.find_corresponding_fkey(saved_table, fkey))
[docs] def find_corresponding_fkey(self, table, base_fkey):
if table is None:
return None
if base_fkey.names is None or len(base_fkey.names) == 0:
return None
names = base_fkey.names[0]
if len(names) != 2:
return None
for fkey in table.foreign_keys:
if fkey is not None and fkey.names is not None and len(fkey.names) > 0:
for n in fkey.names:
if len(n) == 2 and n[0] == names[0] and n[1] == names[1]:
return fkey
return None
[docs] def find_named_column(self, table, column_name):
if table is None:
return None
for column in table.column_definitions:
if column.name == column_name:
return column
return None
[docs] def find_named_schema(self, catalog, schema_name):
if catalog is None or catalog.schemas is None:
return None
return catalog.schemas.get(schema_name)
[docs] def find_named_table(self, schema, table_name):
if schema is None:
return None
if schema.tables is None:
return None
return schema.tables.get(table_name)
[docs] def set_fkey_annotations(self, fkey, saved_fkey):
self.set_node_annotations(fkey, self.find_best_fkey_specs(fkey), saved_fkey)
[docs] def set_column_annotations(self, column, saved_column):
self.set_node_annotations(column, self.find_best_column_specs(column.table.schema.name, column.table.name, column.name),
saved_column)
[docs] def set_schema_annotations(self, schema, saved_schema):
for pat in self.ignored_schema_patterns:
if pat.match(schema.name) is not None:
print("ignoring schema {s}".format(s=schema.name))
return
specs = self.find_best_schema_specs(schema.name)
self.set_node_annotations(schema, specs, saved_schema)
for table in schema.tables.values():
self.set_table_annotations(table, self.find_named_table(saved_schema, table.name))
[docs] def set_catalog_annotations(self):
specs = dict()
for key in self.managed_annotations:
specs[key] = self.annotation_specs["catalog_annotations"].find_catalog_spec(key)
self.set_node_annotations(self.toplevel_config, specs, self.saved_toplevel_config)
for schema in self.toplevel_config.schemas.values():
self.set_schema_annotations(schema, self.find_named_schema(self.saved_toplevel_config, schema.name))
[docs] def set_attributes(self):
if isinstance(self.toplevel_config, ermrest_model.Model):
self.set_catalog_annotations()
elif isinstance(self.toplevel_config, ermrest_model.Schema):
self.set_schema_annotations(self.toplevel_config, self.saved_toplevel_config)
elif isinstance(self.toplevel_config, ermrest_model.Table):
self.set_table_annotations(self.toplevel_config, self.saved_toplevel_config)
else:
raise ValueError("toplevel config is a {t}".format(t=str(type(self.toplevel_config))))
[docs] def apply_annotations(self):
self.toplevel_config.apply(self.saved_toplevel_config)
[docs]def main():
cli = ConfigBaseCLI("annotation config tool", None, version=MY_VERSION)
args = cli.parse_cli()
table_name = cli.get_table_arg(args)
schema_names = cli.get_schema_arg_list(args)
credentials = get_credential(args.host, args.credential_file)
for schema in schema_names:
attr_config = AttrConfig(args.host, args.catalog, args.config_file, credentials, args.verbose or args.debug,
schema, table_name)
attr_config.set_attributes()
if not args.dryrun:
attr_config.apply_annotations()
if __name__ == '__main__':
sys.exit(main())