from __future__ import print_function
from urllib.parse import urlparse
import ast
import logging
import os
import re
import sys
import traceback
import requests
from requests.exceptions import HTTPError
from deriva.core import format_exception
from deriva.core.utils import eprint
from deriva.core.base_cli import BaseCLI
from yapf.yapflib.yapf_api import FormatCode
from deriva.core import get_credential, AttrDict, ErmrestCatalog
from deriva.core import tag as chaise_tags
from deriva.utils.catalog.manage.deriva_file_templates import table_file_template, schema_file_template, \
catalog_file_template
from deriva.utils.catalog.version import __version__ as VERSION
from deriva.utils.catalog.manage.graph_catalog import DerivaCatalogToGraph
IS_PY2 = (sys.version_info[0] == 2)
IS_PY3 = (sys.version_info[0] == 3)
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
# Style options handed to yapf's FormatCode (as ``style_config``) whenever this module formats the
# Python source text it generates for a catalog, schema or table.
yapf_style = {
    'based_on_style': 'pep8',
    'allow_split_before_dict_value': False,
    'split_before_first_argument': False,
    'disable_ending_comma_heuristic': True,
    'DEDENT_CLOSING_BRACKETS': True,
    'column_limit': 100
}
class DerivaDumpCatalogException(Exception):
    """Base exception class for DerivaDumpCatalog.
    """

    def __init__(self, message):
        """Initializes the exception.

        :param message: Text describing the failure; becomes the exception's string value.
        """
        super(DerivaDumpCatalogException, self).__init__(message)
class UsageException(DerivaDumpCatalogException):
    """Usage exception.
    """

    def __init__(self, message):
        """Initializes the exception.

        :param message: Text describing the usage error; becomes the exception's string value.
        """
        super(UsageException, self).__init__(message)
class DerivaCatalogToString:
    """
    Render the model of an ERMrest catalog as Python source text.

    ``catalog_to_str``, ``schema_to_str`` and ``table_to_str`` fill in the corresponding file template and
    run the result through yapf; the remaining ``*_to_str`` methods produce the fragments those templates
    need. Quoted literals that match a known group ID or chaise tag value are replaced by a symbolic
    reference (``groups['<name>']`` or ``chaise_tags.<name>``).
    """

    def __init__(self, catalog, provide_system_columns=True, groups=None):
        """
        Capture the catalog model and build the value-to-name substitution table.

        :param catalog: Catalog object; must provide ``getCatalogModel()`` and ``get_server_uri()``.
        :param provide_system_columns: If true, system columns are left out of generated column
                                       definitions and ``provide_system`` is emitted as True.
        :param groups: Optional mapping of group display name to group ID. When None, the mapping is
                       read from the catalog's ``public.ERMrest_Group`` table.
        """
        self._model = catalog.getCatalogModel()
        self.host = urlparse(catalog.get_server_uri()).hostname
        self.catalog_id = self._model.catalog.catalog_id
        self._provide_system_columns = provide_system_columns

        # Get the currently known groups for this catalog.
        self._groups = groups
        if groups is None:
            try:
                self._groups = AttrDict(
                    {e['Display_Name']: e['ID']
                     for e in self._model.catalog.getPathBuilder().public.ERMrest_Group.entities()}
                )
            except AttributeError:
                logger.warning('Cannot access ERMrest_Group table. Check ACLs')
                self._groups = AttrDict()

        # Groups whose ID was actually seen in generated code; filled in by substitute_variables().
        self._referenced_groups = {}

        # name -> literal value for everything that may be substituted. Tags are merged last, so a tag
        # name wins over a group with the same display name.
        self._variables = self._groups.copy()
        self._variables.update(chaise_tags)

    def substitute_variables(self, code):
        """
        Factor out code and replace with a variable name.

        Every quoted occurrence of a known tag value or group ID in ``code`` is replaced by
        ``chaise_tags.<name>`` or ``groups['<name>']`` respectively. Groups whose ID occurs in ``code``
        are recorded in ``self._referenced_groups``.

        :param code: Source text to rewrite.
        :return: new code
        """
        for name, value in self._variables.items():
            if name in chaise_tags:
                repl = 'chaise_tags.{}'.format(name)
            elif name in self._groups:
                repl = 'groups[{!r}]'.format(name)
                if value in code:
                    self._referenced_groups[name] = value
            else:
                repl = name

            # The value is matched literally: tag URIs and group IDs contain regex metacharacters
            # such as '.', so it has to be escaped before being embedded in the pattern.
            pattern = r"(['\"])+{}\1".format(re.escape(str(value)))
            # A callable replacement is inserted verbatim; a plain string would have its backslashes
            # interpreted as a re.sub template (repr() of a group name can contain them).
            code = re.sub(pattern, lambda _match, _text=repl: _text, code)
        return code

    def variable_to_str(self, name, value, substitute=True):
        """
        Build the one-line assignment ``name = repr(value)`` followed by a newline.

        :param name: Left hand side of assigment
        :param value: Right hand side of assignment
        :param substitute: If true, replace the group and tag values with their corresponding names
        :return: The assignment statement as a string.
        """
        s = '{} = {!r}\n'.format(name, value)
        if substitute:
            s = self.substitute_variables(s)
        return s

    def tag_variables_to_str(self, annotations):
        """
        For each tag name in chaise_tags whose tag value is a key of ``annotations``, emit a variable
        declaration of the form ``name = value`` where value is the annotation stored under that tag.
        Tags that are not present in ``annotations`` produce nothing.

        :param annotations: Mapping of tag value to annotation value.
        :return: The concatenated declarations, each followed by a blank line.
        """
        s = []
        for tag_name, tag_value in chaise_tags.items():
            if tag_value in annotations:
                s.append(self.variable_to_str(tag_name, annotations[tag_value]))
                s.append('\n')
        return ''.join(s)

    def annotations_to_str(self, annotations, var_name='annotations'):
        """
        Emit the assignment ``var_name = {...}`` for an annotation dictionary. Keys that match a known
        variable value refer to that variable instead of repeating the annotation value inline.

        :param annotations: Mapping of tag value to annotation value.
        :param var_name: Name of the variable being assigned.
        :return: The assignment statement as a string.
        """
        var_map = {v: k for k, v in self._variables.items()}
        if annotations == {}:
            return '{} = {{}}\n'.format(var_name)

        parts = ['{} = {{'.format(var_name)]
        for tag, value in annotations.items():
            if tag in var_map:
                # Use variable value rather then inline annotation value.
                parts.append(self.substitute_variables('{!r}:{},'.format(tag, var_map[tag])))
            else:
                # repr() keeps the key a valid literal even if it contains a quote character.
                parts.append("{!r} : {!r},".format(tag, value))
        parts.append('}\n')
        return ''.join(parts)

    def schema_to_str(self, schema_name):
        """
        Generate the formatted source text describing one schema.

        :param schema_name: Key of the schema in the catalog model.
        :return: yapf-formatted text produced from ``schema_file_template``.
        """
        schema = self._model.schemas[schema_name]
        annotations = self.variable_to_str('annotations', schema.annotations)
        acls = self.variable_to_str('acls', schema.acls)
        comments = self.variable_to_str('comment', schema.comment)
        # Must come after the calls above: they are what populate self._referenced_groups.
        groups = self.variable_to_str('groups', self._referenced_groups, substitute=False)
        table_names = 'table_names = [\n{}]\n'.format(
            ''.join('{!r},\n'.format(i) for i in schema.tables))

        s = schema_file_template.format(host=self.host, catalog_id=self.catalog_id, schema_name=schema_name,
                                        annotations=annotations, acls=acls, comments=comments, groups=groups,
                                        table_names=table_names)
        return FormatCode(s, style_config=yapf_style)[0]

    def catalog_to_str(self):
        """
        Generate the formatted source text describing the catalog-level annotations and ACLs.

        :return: yapf-formatted text produced from ``catalog_file_template``.
        """
        tag_variables = self.tag_variables_to_str(self._model.annotations)
        annotations = self.annotations_to_str(self._model.annotations)
        acls = self.variable_to_str('acls', self._model.acls)
        # Must come after the calls above: they are what populate self._referenced_groups.
        groups = self.variable_to_str('groups', self._referenced_groups, substitute=False)

        s = catalog_file_template.format(host=self.host, catalog_id=self.catalog_id, groups=groups,
                                         tag_variables=tag_variables,
                                         annotations=annotations,
                                         acls=acls)
        return FormatCode(s, style_config=yapf_style)[0]

    def table_annotations_to_str(self, table):
        """
        Emit the tag variables plus the ``table_annotations``, ``table_comment``, ``table_acls`` and
        ``table_acl_bindings`` assignments for a table.

        :param table: Table object providing annotations, comment, acls and acl_bindings.
        :return: The assignments, separated by blank lines.
        """
        return ''.join([self.tag_variables_to_str(table.annotations), '\n',
                        self.annotations_to_str(table.annotations, var_name='table_annotations'), '\n',
                        self.variable_to_str('table_comment', table.comment), '\n',
                        self.variable_to_str('table_acls', table.acls), '\n',
                        self.variable_to_str('table_acl_bindings', table.acl_bindings)])

    def column_annotations_to_str(self, table):
        """
        Emit the ``column_annotations``, ``column_comment``, ``column_acls`` and ``column_acl_bindings``
        dictionaries for a table. Each is keyed by column name and only holds columns that have a
        non-empty value for that property.

        :param table: Table object providing ``column_definitions``.
        :return: The four assignments, each followed by a blank line.
        """
        column_annotations = {}
        column_acls = {}
        column_acl_bindings = {}
        column_comment = {}

        for col in table.column_definitions:
            if col.comment:
                column_comment[col.name] = col.comment
            # Only non-empty annotations are recorded. (An earlier, mis-copied condition also stored
            # empty annotation dicts for every column that merely had a comment.)
            if col.annotations != {}:
                column_annotations[col.name] = col.annotations
            if col.acls != {}:
                column_acls[col.name] = col.acls
            if col.acl_bindings != {}:
                column_acl_bindings[col.name] = col.acl_bindings

        s = self.variable_to_str('column_annotations', column_annotations) + '\n'
        s += self.variable_to_str('column_comment', column_comment) + '\n'
        s += self.variable_to_str('column_acls', column_acls) + '\n'
        s += self.variable_to_str('column_acl_bindings', column_acl_bindings) + '\n'
        return s

    def foreign_key_defs_to_str(self, table):
        """
        Emit the ``fkey_defs`` list with one ``em.ForeignKey.define(...)`` call per foreign key.

        Empty, None and 'NO ACTION' properties are omitted from the generated call.

        :param table: Table object providing ``foreign_keys``.
        :return: Source text for the list, after variable substitution.
        """
        string_valued = ('comment', 'on_update', 'on_delete')
        parts = ['fkey_defs = [\n']
        for fkey in table.foreign_keys:
            parts.append(
                "    em.ForeignKey.define({},\n"
                "            {!r}, {!r}, {},\n"
                "            constraint_names={},\n".format(
                    [c.name for c in fkey.foreign_key_columns],
                    fkey.pk_table.schema.name,
                    fkey.pk_table.name,
                    [c.name for c in fkey.referenced_columns],
                    fkey.names))

            for prop in ['annotations', 'acls', 'acl_bindings', 'on_update', 'on_delete', 'comment']:
                a = getattr(fkey, prop)
                if a == {} or a is None or a == 'NO ACTION' or a == '':
                    continue
                # repr() rather than hand-added quotes, so text containing quotes or newlines still
                # yields a valid string literal.
                v = repr(a) if prop in string_valued else a
                parts.append("        {}={},\n".format(prop, v))
            parts.append('    ),\n')
        parts.append(']')
        return self.substitute_variables(''.join(parts))

    def key_defs_to_str(self, table):
        """
        Emit the ``key_defs`` list with one ``em.Key.define(...)`` call per key.

        :param table: Table object providing ``keys``.
        :return: Source text for the list, after variable substitution.
        """
        parts = ['key_defs = [\n']
        for key in table.keys:
            # NOTE(review): the guard reads key.name while the value emitted is key.names; kept as
            # written -- confirm both attributes exist on the key objects in use.
            parts.append(
                "    em.Key.define({},\n"
                "                 constraint_names={},\n".format(
                    [c.name for c in key.unique_columns],
                    key.names if key.name else []))

            for prop in ['annotations', 'comment']:
                a = getattr(key, prop)
                if a == {} or a is None or a == '':
                    continue
                # repr() keeps comments containing quotes or newlines valid.
                v = repr(a) if prop == 'comment' else a
                parts.append("       {} = {},\n".format(prop, v))
            parts.append('),\n')
        parts.append(']')
        return self.substitute_variables(''.join(parts))

    def column_defs_to_str(self, table):
        """
        Emit the ``column_defs`` list with one ``em.Column.define(...)`` call per column.

        System columns are skipped when ``provide_system_columns`` is set. Annotations, ACLs, ACL
        bindings and comments are not inlined; the generated call indexes the ``column_<property>``
        dictionaries produced by ``column_annotations_to_str``.

        :param table: Table object providing ``column_definitions``.
        :return: Source text for the list.
        """
        system_columns = ['RID', 'RCB', 'RMB', 'RCT', 'RMT']
        s = ['column_defs = [']
        for col in table.column_definitions:
            if col.name in system_columns and self._provide_system_columns:
                continue

            # The type name is emitted unchanged. The previous expression tested the string literal
            # 'is_array' against True with `is`, which is always False, so this is what was always
            # produced. NOTE(review): presumably array typenames already end in '[]' -- confirm
            # before appending a suffix here.
            s.append("    em.Column.define({!r}, em.builtin_types[{!r}],".format(col.name, col.type.typename))
            if col.nullok is False:
                s.append("nullok=False,")
            if col.default and col.name not in system_columns:
                s.append("default={!r},".format(col.default))
            for prop in ['annotations', 'acls', 'acl_bindings', 'comment']:
                if getattr(col, prop):  # if we have a value for this field....
                    s.append("{}=column_{}[{!r}],".format(prop, prop, col.name))
            s.append('),\n')
        s.append(']')
        return ''.join(s)

    def table_def_to_str(self):
        """
        Emit the ``table_def = em.Table.define(...)`` statement that ties together the variables
        produced by the other table-level methods.

        :return: Source text for the statement.
        """
        lines = [
            'table_def = em.Table.define(table_name,',
            '    column_defs=column_defs,',
            '    key_defs=key_defs,',
            '    fkey_defs=fkey_defs,',
            '    annotations=table_annotations,',
            '    acls=table_acls,',
            '    acl_bindings=table_acl_bindings,',
            '    comment=table_comment,',
            '    provide_system = {}',
            ')',
        ]
        return '\n'.join(lines).format(self._provide_system_columns)

    def table_to_str(self, schema_name, table_name):
        """
        Generate the formatted source text describing one table.

        :param schema_name: Key of the schema in the catalog model.
        :param table_name: Key of the table within that schema.
        :return: yapf-formatted text produced from ``table_file_template``.
        """
        logger.debug('%s %s %s', schema_name, table_name, [i for i in self._model.schemas])
        table = self._model.schemas[schema_name].tables[table_name]

        column_annotations = self.column_annotations_to_str(table)
        column_defs = self.column_defs_to_str(table)
        table_annotations = self.table_annotations_to_str(table)
        key_defs = self.key_defs_to_str(table)
        fkey_defs = self.foreign_key_defs_to_str(table)
        table_def = self.table_def_to_str()
        # Must come after the calls above: they are what populate self._referenced_groups.
        groups = self.variable_to_str('groups', self._referenced_groups, substitute=False)

        s = table_file_template.format(host=self.host, catalog_id=self.catalog_id,
                                       table_name=table_name, schema_name=schema_name, groups=groups,
                                       column_annotations=column_annotations,
                                       column_defs=column_defs,
                                       table_annotations=table_annotations,
                                       key_defs=key_defs,
                                       fkey_defs=fkey_defs,
                                       table_def=table_def)
        return FormatCode(s, style_config=yapf_style)[0]
class DerivaDumpCatalogCLI(BaseCLI):
    """
    Command-line front end that dumps a catalog's configuration as Python source files, or renders a
    graph of the catalog when ``--graph`` is given.
    """

    def __init__(self, description, epilog):
        """
        Set up the argument parser.

        :param description: Program description passed to the base CLI.
        :param epilog: Epilog text passed to the base CLI.
        """
        super(DerivaDumpCatalogCLI, self).__init__(description, epilog, VERSION, hostname_required=True)

        self.dumpdir = ''
        self.host = None
        self.catalog_id = 1
        self.graph_format = None
        self.catalog = None
        # Populated by main() once the catalog has been contacted.
        self.model = None
        self.schemas = []

        # parent arg parser
        parser = self.parser
        parser.add_argument('--catalog', '--catalog-id', metavar='CATALOG-NUMBER', default=1,
                            help='ID number of desired catalog')
        parser.add_argument('--dir', default="catalog-configs", help='output directory name')
        # --table and --graph select alternative modes and cannot be combined.
        group = parser.add_mutually_exclusive_group()
        group.add_argument('--table', default=None, help='Only dump out the spec for the specified table.  Format is '
                                                         'schema_name:table_name')
        parser.add_argument('--schemas', nargs='*', default=[],
                            help='Only dump out the spec for the specified schemas.')
        parser.add_argument('--skip-schemas', nargs='*', default=[], help='List of schema so skip over')
        group.add_argument('--graph', action='store_true', help='Dump graph of catalog')
        parser.add_argument('--graph-format', choices=['pdf', 'dot', 'png', 'svg'],
                            default='pdf', help='Format to use for graph dump')

    @staticmethod
    def _get_credential(host_name, token=None):
        """
        Return the credential to use for ``host_name``.

        :param host_name: Host to look up stored credentials for.
        :param token: Optional token; when given, a ``webauthn`` cookie credential is built from it
                      instead of calling ``get_credential``.
        :return: Credential dictionary.
        """
        if token:
            return {"cookie": "webauthn={t}".format(t=token)}
        return get_credential(host_name)

    def _dump_table(self, schema_name, table_name, stringer=None, dumpdir='.'):
        """
        Write the definition of one table to ``<dumpdir>/<table_name>.py``.

        :param schema_name: Schema containing the table.
        :param table_name: Table to dump.
        :param stringer: DerivaCatalogToString to reuse; a new one is created when omitted.
        :param dumpdir: Directory to write into; created if it does not exist.
        """
        logger.info("Dumping out table def: %s:%s", schema_name, table_name)
        if not stringer:
            stringer = DerivaCatalogToString(self.catalog)

        table_string = stringer.table_to_str(schema_name, table_name)
        filename = dumpdir + '/' + table_name + '.py'
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'wb') as f:
            f.write(table_string.encode("utf-8"))

    def _dump_catalog(self):
        """
        Write the catalog file, one ``<schema>.schema.py`` file per selected schema, and one file per
        table under ``<dumpdir>/<schema>/``.
        """
        # One stringer is shared so the groups referenced so far carry over between files.
        stringer = DerivaCatalogToString(self.catalog)

        catalog_string = stringer.catalog_to_str()
        with open('{}/{}_{}.py'.format(self.dumpdir, self.host, self.catalog_id), 'wb') as f:
            f.write(catalog_string.encode("utf-8"))

        for schema_name in self.schemas:
            logger.info("Dumping schema def for %s....", schema_name)
            schema_string = stringer.schema_to_str(schema_name)
            with open('{}/{}.schema.py'.format(self.dumpdir, schema_name), 'wb') as f:
                f.write(schema_string.encode("utf-8"))

        for schema_name, schema in self.model.schemas.items():
            if schema_name not in self.schemas:
                continue
            for table_name in schema.tables:
                self._dump_table(schema_name, table_name, stringer=stringer,
                                 dumpdir='{}/{}'.format(self.dumpdir, schema_name))

    def _graph_catalog(self):
        """
        Render a graph of the selected schemas (excluding ``_acl_admin``, ``public`` and ``WWW``) and
        save it as ``<host>_<catalog_id>`` in the configured graph format.
        """
        graph = DerivaCatalogToGraph(self.catalog)
        graphfile = '{}_{}'.format(self.host, self.catalog_id)
        graph.catalog_to_graph(schemas=[s for s in self.schemas if s not in ['_acl_admin', 'public', 'WWW']],
                               skip_terms=True,
                               skip_association_tables=True)
        graph.save(filename=graphfile, format=self.graph_format)

    def main(self):
        """
        Parse the command line and run the requested dump.

        :return: 0 on success, 1 when any error was reported.
        """
        args = self.parse_cli()
        self.dumpdir = args.dir
        self.host = args.host
        self.catalog_id = args.catalog
        self.graph_format = args.graph_format

        if self.host is None:
            eprint('Host name must be provided')
            return 1

        try:
            os.makedirs(self.dumpdir, exist_ok=True)
        except OSError as e:
            sys.stderr.write(str(e))
            return 1

        try:
            # Connecting happens inside the try block so that HTTP failures while fetching the model
            # are reported by the HTTPError handler below.
            self.catalog = ErmrestCatalog('https', self.host, self.catalog_id,
                                          credentials=self._get_credential(self.host))
            self.model = self.catalog.getCatalogModel()
            selected = args.schemas if args.schemas else self.model.schemas
            self.schemas = [s for s in selected if s not in args.skip_schemas]

            logger.info('Catalog has %s schema and %s tables',
                        len(self.model.schemas),
                        sum(len(v.tables) for v in self.model.schemas.values()))
            logger.info('\n'.join(['    {} has {} tables'.format(k, len(s.tables))
                                   for k, s in self.model.schemas.items()]))

            if args.table:
                if ':' not in args.table:
                    raise DerivaDumpCatalogException('Table name must be in form of schema:table')
                [schema_name, table_name] = args.table.split(":")
                # NOTE(review): this writes to the current directory, not to --dir; kept as is.
                self._dump_table(schema_name, table_name)
            elif args.graph:
                self._graph_catalog()
            else:
                self._dump_catalog()
        except DerivaDumpCatalogException as e:
            eprint(e)
            return 1
        except HTTPError as e:
            if e.response.status_code == requests.codes.unauthorized:
                # The parsed arguments carry the host as ``host``; there is no ``server`` attribute.
                msg = 'Authentication required for {}'.format(self.host)
            elif e.response.status_code == requests.codes.forbidden:
                msg = 'Permission denied'
            else:
                msg = e
            logger.debug(format_exception(e))
            eprint(msg)
            return 1
        except RuntimeError as e:
            sys.stderr.write(str(e))
            return 1
        except Exception:
            # Last-resort boundary: show the traceback but let KeyboardInterrupt/SystemExit through.
            traceback.print_exc()
            return 1
        finally:
            sys.stderr.write("\n\n")
        return 0
def main():
    """
    Console entry point: build the dump-catalog CLI and run it.

    :return: Whatever ``DerivaDumpCatalogCLI.main()`` returns, for use as the process exit status.
    """
    DESC = "DERIVA Dump Catalog Command-Line Interface"
    INFO = "For more information see: https://github.com/informatics-isi-edu/deriva-catalog-manage"
    return DerivaDumpCatalogCLI(DESC, INFO).main()


if __name__ == '__main__':
    sys.exit(main())