Source code for deriva.config.dump_catalog_annotations

import sys
import json
import re
from deriva.core import ErmrestCatalog, AttrDict, get_credential
from deriva.config.base_config import BaseSpec, BaseSpecList, ConfigUtil, ConfigBaseCLI
from deriva.config.annotation_config import AttrSpec, AttrSpecList

MY_VERSION = 0.99


[docs]class Annotations: def __init__(self, server, catalog, credentials, config): self.annotations = {} self.ignored_schema_patterns = [] ip = config.get("ignored_schema_patterns") if ip is not None: for p in ip: self.ignored_schema_patterns.append(re.compile(p)) self.types = set() self.managed_attributes = [] self.ignore_unmanaged = True self.ignored_attributes = [] self.consolidated_annotations = {} self.annotations_to_delete = [] if config is not None: known_attributes = config.get("known_attributes") if known_attributes is not None: self.managed_attributes = known_attributes.get("managed", []) self.ignored_attributes = known_attributes.get("ignored", []) self.annotations_to_delete = known_attributes.get("to_delete", []) self.ignore_unmanaged = known_attributes.get("ignore_all_unmanaged", True) self.annotations["known_attributes"] = known_attributes else: self.annotations["known_attributes"] = {'managed': [], 'ignore_all_unmanaged': True} self.consolidated_annotations = config.get("consolidated_annotations", {}) for k in AttrSpecList.SPEC_TYPES: d = self.consolidated_annotations.get(k) if d is None: d = [dict()] self.consolidated_annotations[k] = AttrSpecList(known_attributes, d) self.annotations[k] = self.munge_specs(self.consolidated_annotations[k]) for k in self.managed_attributes: if k in self.annotations_to_delete: raise ValueError("{k} is both 'managed' and 'to_delete'".format(k=k)) self.catalog = ErmrestCatalog('https', server, catalog, credentials) self.catalog_config = self.catalog.getCatalogModel() if self.catalog_config.annotations is not None: self.add_catalog_annotations(self.catalog_config) if self.catalog_config.schemas is not None: for s in self.catalog_config.schemas.values(): self.add_schema_annotations(s)
[docs] def munge_specs(self, annotation_list): speclist = [] if annotation_list is not None: if isinstance(annotation_list, AttrSpecList): annotation_list = annotation_list.get_specs() for spec in annotation_list: speclist.append(spec.config_format()) return speclist
[docs] def consolidated_schema_annotation(self, annotation): matches = [] for c in self.consolidated_annotations["schema_annotations"].get_specs(): if c.schema_entry_matches(annotation.get("schema"), key=annotation.get("uri")): matches.append(c) return self.check_consolidation(matches, annotation.get("value"))
[docs] def consolidated_table_annotation(self, annotation): matches = [] for c in self.consolidated_annotations["table_annotations"].get_specs(): if c.table_entry_matches(annotation.get("schema"), annotation.get("table"), key=annotation.get("uri")): matches.append(c) return self.check_consolidation(matches, annotation.get("value"))
[docs] def consolidated_column_annotation(self, annotation): matches = [] for c in self.consolidated_annotations["column_annotations"].get_specs(): if c.column_entry_matches(annotation.get("schema"), annotation.get("table"), annotation.get("column"), key=annotation.get("uri")): matches.append(c) return self.check_consolidation(matches, annotation.get("value"))
[docs] def consolidated_foreign_key_annotation(self, annotation): matches = [] for c in self.consolidated_annotations["foreign_key_annotations"].get_specs(): if c.foreign_key_entry_matches(annotation.get("schema"), annotation.get("table"), annotation.get("foreign_key_schema"), annotation.get("foreign_key"), key=annotation.get("uri")): matches.append(c) return self.check_consolidation(matches, annotation.get("value"))
[docs] def check_consolidation(self, matches, value): if len(matches) != 1: # Zero or more than one matching pattern, so we need the exact spec to disambiguate return False match = matches[0] # if match.get("override") == True: # # We don't care what the original version was. We want to go with the pattern match # return True return match.get("value") == value
[docs] def add_catalog_annotations(self, catalog): annotations = self.find_relevant_annotations(catalog.annotations) if annotations is not None: for v in annotations: self.annotations["catalog_annotations"].append(v)
[docs] def add_schema_annotations(self, schema): annotations = self.find_relevant_annotations(schema.annotations) if annotations is not None: for v in annotations: v["schema"] = schema.name if not self.consolidated_schema_annotation(v): self.annotations["schema_annotations"].append(v) for table in schema.tables.values(): self.add_table_annotations(table)
[docs] def add_table_annotations(self, table): annotations = self.find_relevant_annotations(table.annotations) if annotations is not None: for v in annotations: v["schema"] = table.schema.name v["table"] = table.name if not self.consolidated_table_annotation(v): self.annotations["table_annotations"].append(v) for column in table.column_definitions: self.add_column_annotations(table, column) for fkey in table.foreign_keys: self.add_foreign_key_annotations(fkey)
[docs] def add_column_annotations(self, table, column): annotations = self.find_relevant_annotations(column.annotations) if annotations is not None: for v in annotations: v["schema"] = table.schema.name v["table"] = table.name v["column"] = column.name if not self.consolidated_column_annotation(v): self.annotations["column_annotations"].append(v)
[docs] def add_foreign_key_annotations(self, fkey): annotations = self.find_relevant_annotations(fkey.annotations) if annotations is not None: if len(fkey.names) < 1: raise ValueError("foreign key without a name") for v in annotations: v["schema"] = fkey.table.schema.name v["table"] = fkey.table.name v["foreign_key_schema"] = fkey.names[0][0] v["foreign_key"] = fkey.names[0][1] if not self.consolidated_foreign_key_annotation(v): self.annotations["foreign_key_annotations"].append(v)
[docs] def find_relevant_annotations(self, annotations): if annotations is None or len(annotations) == 0: return None new = [] if self.managed_attributes is None: for k in annotations.keys(): if k not in self.annotations_to_delete: new.append({"uri": k, "value": annotations[k]}) self.types.add(k) else: for k in annotations.keys(): if k in self.managed_attributes: new.append({"uri": k, "value": annotations[k]}) self.types.add(k) if len(new) == 0: return None return new
[docs] def dumps(self): return json.dumps(self.annotations, indent=4, sort_keys=True)
[docs] def types_list(self): types = list(self.types) types.sort() return types
[docs]def main(): cli = ConfigBaseCLI("annotation dump tool", None, version=MY_VERSION) cli.parser.add_argument('-l', help="list tags encountered", action="store_true") args = cli.parse_cli() managed_attrs = None if args.config_file is None: print("No config file specified") return 1 if args.host is None: print("No host specified") return 1 config = json.load(open(args.config_file)) credentials = get_credential(args.host, args.credential_file) annotations = Annotations(args.host, args.catalog, credentials, config) if args.l: for t in annotations.types_list(): print(t) if not args.l: print(annotations.dumps()) return 0
if __name__ == '__main__': sys.exit(main())