Source code for deriva.config.acl_config

import sys
import json
import re
from deriva.core import ErmrestCatalog, AttrDict, ermrest_model, get_credential, __version__ as VERSION, \
    format_exception, urlquote
from deriva.config.base_config import BaseSpec, BaseSpecList, ConfigUtil, ConfigBaseCLI
from requests.exceptions import HTTPError
from uuid import UUID
import warnings


class NoForeignKeyError(ValueError):
    """Raised when no single-column foreign key can be found for a column.

    Used while expanding an ``outbound_col`` projection entry into the
    foreign key it refers to.
    """
class ACLSpecList(BaseSpecList):
    """A ``BaseSpecList`` whose entries are built as ``ACLSpec`` objects."""

    def __init__(self, dictlist=None):
        """Initialise the list from ``dictlist``.

        :param dictlist: raw spec entries taken from the config file (may be
            ``None``); passed straight through to ``BaseSpecList``.
        """
        # The base class is told which spec class to use for each entry.
        BaseSpecList.__init__(self, ACLSpec, dictlist)
class ACLSpec(BaseSpec):
    """One ACL specification entry read from the configuration file."""

    def __init__(self, specdict):
        """Build the spec from ``specdict``.

        :param specdict: raw dictionary for this entry from the config file.
        """
        # NOTE(review): the list and the trailing "acl" are handed to BaseSpec
        # unchanged; presumably the recognised spec keys and the spec kind --
        # confirm against BaseSpec.
        recognised_keys = ["acl", "no_acl", "acl_bindings", "invalidate_bindings"]
        BaseSpec.__init__(self, specdict, recognised_keys, "acl")

    def validate(self):
        """Check that ``acl`` and ``no_acl`` are used consistently.

        Runs the base-class validation first.

        :raises ValueError: if ``no_acl`` is not True/False/absent, if both an
            ``acl`` and ``no_acl=True`` are given, or if ``no_acl=False`` is
            given without an ``acl``.
        """
        BaseSpec.validate(self)
        no_acl = self.get("no_acl")
        if no_acl not in [True, False, None]:
            raise ValueError("no_acl must be True or False (or not present)")
        acl_given = self.get("acl") is not None
        if acl_given and no_acl:
            raise ValueError("can't specify an acl and no_acl=True in the same spec")
        if not acl_given and no_acl == False:
            raise ValueError("if no_acl=False, an acl must be specified")
class AclConfig:
    """Apply ACLs and ACL bindings from a JSON config file to a catalog model.

    The config file may contain ``groups`` (named group lists, which may refer
    to each other), ``acl_definitions`` (named ACLs), ``acl_bindings`` (named
    dynamic ACL bindings), per-level spec lists (``schema_acls``,
    ``table_acls``, ``column_acls``, ``foreign_key_acls``), a ``catalog_acl``
    spec, ``ignored_schema_patterns`` and ``group_list_table``.
    """

    # Column names used in the optional group-list table.
    NC_NAME = 'name'
    GC_NAME = 'groups'
    # Config keys that hold ACL specs.
    ACL_TYPES = ["catalog_acl", "schema_acls", "table_acls", "column_acls", "foreign_key_acls"]
    # Prefixes used to recognise the format of a group identifier.
    GLOBUS_PREFIX = 'https://auth.globus.org/'
    ROBOT_PREFIX_FORMAT = 'https://{server}/webauthn_robot/'

    def __init__(self, server, catalog_id, config_file, credentials, schema_name=None, table_name=None,
                 verbose=False):
        """Load the configuration and fetch the catalog model.

        :param server: host name of the catalog server
        :param catalog_id: catalog identifier
        :param config_file: path of the JSON configuration file
        :param credentials: credentials passed to ``ErmrestCatalog``
        :param schema_name: optional schema to restrict the work to
        :param table_name: optional table to restrict the work to
        :param verbose: print progress messages when true
        """
        # Use a context manager so the config file handle is always closed.
        with open(config_file) as config_fp:
            self.config = json.load(config_fp)
        self.ignored_schema_patterns = []
        self.verbose = verbose
        self.server = server
        self.catalog_id = catalog_id
        patterns = self.config.get("ignored_schema_patterns")
        if patterns is not None:
            for pattern in patterns:
                self.ignored_schema_patterns.append(re.compile(pattern))
        # The catalog ACL is a single raw spec; every other level is a spec list.
        self.acl_specs = {"catalog_acl": self.config.get("catalog_acl")}
        for key in self.ACL_TYPES:
            if key != "catalog_acl":
                self.acl_specs[key] = self.make_speclist(key)
        self.groups = self.config.get("groups")
        self.expand_groups()
        self.acl_definitions = self.config.get("acl_definitions")
        self.expand_acl_definitions()
        self.acl_bindings = self.config.get("acl_bindings")
        self.invalidate_bindings = self.config.get("invalidate_bindings")
        # Two independent copies of the model are fetched: one is modified by
        # set_acls(), the other is kept untouched and handed to apply().
        old_catalog = ErmrestCatalog('https', self.server, self.catalog_id, credentials)
        self.saved_toplevel_config = ConfigUtil.find_toplevel_node(old_catalog.getCatalogModel(), schema_name,
                                                                   table_name)
        self.catalog = ErmrestCatalog('https', self.server, self.catalog_id, credentials)
        self.toplevel_config = ConfigUtil.find_toplevel_node(self.catalog.getCatalogModel(), schema_name,
                                                             table_name)

    def make_speclist(self, name):
        """Return an ``ACLSpecList`` for config section ``name`` (empty if the section is absent)."""
        section = self.config.get(name)
        if section is None:
            section = dict()
        return ACLSpecList(section)

    def add_node_acl(self, node, acl_name):
        """Copy every entry of the named ACL definition into ``node.acls``.

        :raises ValueError: if no ACL definition called ``acl_name`` exists.
        """
        acl = None
        if self.acl_definitions is not None:
            acl = self.acl_definitions.get(acl_name)
        if acl is None:
            raise ValueError("no acl set called '{name}'".format(name=acl_name))
        for op_type in acl.keys():
            node.acls[op_type] = acl[op_type]

    def add_node_acl_binding(self, node, table_node, binding_name):
        """Expand the named ACL binding and store it in ``node.acl_bindings``.

        :param node: model node (table, column or foreign key) receiving the binding
        :param table_node: table used to resolve ``outbound_col`` projections
        :param binding_name: key into the config's ``acl_bindings`` section
        :raises ValueError: if no binding called ``binding_name`` exists.
        :raises NoForeignKeyError: re-raised (after printing context) when an
            ``outbound_col`` projection cannot be resolved.
        """
        if self.acl_bindings is None or binding_name not in self.acl_bindings:
            raise ValueError("no acl binding called '{name}'".format(name=binding_name))
        binding = self.acl_bindings.get(binding_name)
        try:
            node.acl_bindings[binding_name] = self.expand_acl_binding(binding, table_node)
        except NoForeignKeyError:
            if isinstance(node, ermrest_model.Column):
                detail = 'on column {n}'.format(n=node.name)
            elif isinstance(node, ermrest_model.ForeignKey):
                detail = 'on foreign key {s}.{n}'.format(s=node.names[0][0], n=node.names[0][1])
            else:
                detail = ' {t}'.format(t=type(node))
            print(
                "couldn't expand acl binding {b} {d} table {s}.{t}".format(b=binding_name, d=detail,
                                                                           s=table_node.schema.name,
                                                                           t=table_node.name))
            raise

    def expand_acl_binding(self, binding, table_node):
        """Return a copy of ``binding`` with projections and ``scope_acl`` expanded.

        Non-dict bindings are returned unchanged.
        """
        if not isinstance(binding, dict):
            return binding
        expanded = dict()
        for key in binding.keys():
            if key == "projection":
                expanded[key] = [self.expand_projection(proj, table_node) for proj in binding.get(key)]
            elif key == "scope_acl":
                expanded[key] = self.get_group(binding.get(key))
            else:
                expanded[key] = binding[key]
        return expanded

    def expand_projection(self, proj, table_node):
        """Expand one projection element.

        A dict element containing ``outbound_col`` has that key replaced by an
        ``outbound`` entry naming the matching foreign key of ``table_node``.
        Any other element is returned as is (dicts are shallow-copied).

        :raises NotImplementedError: if ``outbound_col`` is not the first key
            of the element, or if ``table_node`` is ``None``.
        """
        if not isinstance(proj, dict):
            return proj
        new_proj = dict()
        is_first_outbound = True
        for key in proj.keys():
            if key == "outbound_col":
                if is_first_outbound:
                    is_first_outbound = False
                else:
                    raise NotImplementedError(
                        "don't know how to expand 'outbound_col' on anything but the first entry in a projection; "
                        "use 'outbound' instead")
                if table_node is None:
                    raise NotImplementedError(
                        "don't know how to expand 'outbound_col' in a foreign key acl/annotation; use 'outbound' "
                        "instead")
                new_proj["outbound"] = self.expand_projection_column(proj[key], table_node)
                if new_proj["outbound"] is None:
                    return None
            else:
                new_proj[key] = proj[key]
                is_first_outbound = False
        return new_proj

    def expand_projection_column(self, col_name, table_node):
        """Return the name of the single-column foreign key on ``col_name``.

        :raises NoForeignKeyError: if ``table_node`` has no single-column
            foreign key whose column is ``col_name``.
        """
        for fkey in table_node.foreign_keys:
            if len(fkey.foreign_key_columns) == 1:
                col = fkey.foreign_key_columns[0]
                if (col.table.name == table_node.name
                        and col.table.schema.name == table_node.schema.name
                        and col.name == col_name):
                    return fkey.names[0]
        # The message is formatted here; the previous '%I' placeholders were
        # never substituted.
        raise NoForeignKeyError(
            "can't find foreign key for column {s}.{t}({c})".format(s=table_node.schema.name,
                                                                    t=table_node.name, c=col_name))

    def set_node_acl_bindings(self, node, table_node, binding_list, invalidate_list):
        """Replace ``node.acl_bindings`` with the listed bindings.

        Names in ``invalidate_list`` are stored with the value ``False``.

        :raises ValueError: if a name appears in both lists.
        """
        node.acl_bindings.clear()
        if binding_list is not None:
            for binding_name in binding_list:
                self.add_node_acl_binding(node, table_node, binding_name)
        if invalidate_list is not None:
            for binding_name in invalidate_list:
                if binding_list and binding_name in binding_list:
                    raise ValueError(
                        "Binding {b} appears in both acl_bindings and invalidate_bindings for table {s}.{t} node {n}".format(
                            b=binding_name, s=table_node.schema.name, t=table_node.name, n=node.name))
                node.acl_bindings[binding_name] = False

    def save_groups(self):
        """Write the expanded group lists to the group-list table, if one is configured."""
        glt = self.create_or_validate_group_table()
        if glt is None or self.groups is None:
            return
        rows = []
        for name in self.groups.keys():
            row = {self.NC_NAME: name, self.GC_NAME: self.groups.get(name)}
            for col_name in ['RCB', 'RMB']:
                if glt.getColumn(col_name) is not None:
                    row[col_name] = None
            rows.append(row)
        glt.upsertRows(self.catalog, rows)

    def create_or_validate_schema(self, schema_name):
        """Return the schema document for ``schema_name``, creating the schema if it is absent."""
        schema = self.catalog.getCatalogSchema()['schemas'].get(schema_name)
        if schema is None:
            self.catalog.post("/schema/{s}".format(s=schema_name))
        return self.catalog.getCatalogSchema()['schemas'].get(schema_name)

    def create_table(self, schema_name, table_name, table_spec, comment=None):
        """Create a table from ``table_spec`` and return its document from the refreshed schema.

        ``table_spec`` is modified in place (schema/table names and, when
        missing, ``comment`` and ``kind`` are filled in). Returns ``None``
        without doing anything when ``schema_name`` is ``None``.
        """
        if table_spec is None:
            table_spec = dict()
        if schema_name is None:
            return None
        table_spec["schema_name"] = schema_name
        table_spec["table_name"] = table_name
        if table_spec.get('comment') is None and comment is not None:
            table_spec['comment'] = comment
        if table_spec.get('kind') is None:
            table_spec['kind'] = 'table'
        self.catalog.post("/schema/{s}/table".format(s=schema_name), json=table_spec)
        schema = self.catalog.getCatalogSchema()['schemas'].get(schema_name)
        return schema['tables'].get(table_name)

    def create_or_validate_group_table(self):
        """Create the configured group-list table, or validate an existing one.

        :returns: a ``Table`` wrapper, or ``None`` when no ``group_list_table``
            is configured or the table document is empty.
        :raises ValueError: if the config entry lacks ``schema``/``table``, or
            an existing table lacks the name/groups columns, allows nulls in
            the name column, or has no single-column key on the name column.
        """
        glt_spec = self.config.get('group_list_table')
        if glt_spec is None:
            return None
        sname = glt_spec.get('schema')
        tname = glt_spec.get('table')
        if sname is None or tname is None:
            raise ValueError("group_list_table missing schema or table")
        schema = self.create_or_validate_schema(sname)
        assert schema is not None
        # Table(None) yields an empty mapping, which marks "table does not exist yet".
        glt = Table(schema['tables'].get(tname))
        if glt == {}:
            glt_spec = ermrest_model.Table.define(
                tname,
                column_defs=[
                    ermrest_model.Column.define(
                        self.NC_NAME,
                        ermrest_model.builtin_types.text,
                        nullok=False,
                        comment='Name of grouplist, used in foreign keys. This table is maintained by the acl-config '
                                'program and should not be updated by hand.'
                    ),
                    ermrest_model.Column.define(
                        self.GC_NAME,
                        ermrest_model.builtin_types['text[]'],
                        nullok=True,
                        comment='List of groups. This table is maintained by the acl-config program and should not be '
                                'updated by hand.'
                    )
                ],
                key_defs=[
                    ermrest_model.Key.define(
                        [self.NC_NAME],
                        constraint_names=[[sname, "{t}_{c}_u".format(t=tname, c=self.NC_NAME)]]
                    )
                ],
                comment="Named lists of groups used in ACLs. Maintained by the acl-config program. Do not update this "
                        "table manually.",
                annotations={'tag:isrd.isi.edu,2016:generated': None}
            )
            glt = Table(self.create_table(sname, tname, glt_spec))
        else:
            name_col = glt.getColumn(self.NC_NAME)
            if name_col is None:
                raise ValueError(
                    'table specified for group lists ({s}.{t}) lacks a "{n}" column'.format(s=sname, t=tname,
                                                                                             n=self.NC_NAME))
            if name_col.get('nullok'):
                raise ValueError(
                    "{n} column in group list table ({s}.{t}) allows nulls".format(n=self.NC_NAME, s=sname,
                                                                                   t=tname))
            nc_uniq = False
            for key in glt.get('keys'):
                cols = key.get('unique_columns')
                if len(cols) == 1 and cols[0] == self.NC_NAME:
                    nc_uniq = True
                    break
            if not nc_uniq:
                raise ValueError(
                    "{n} column in group list table ({s}.{t}) is not a key".format(n=self.NC_NAME, s=sname,
                                                                                   t=tname))
            val_col = glt.getColumn(self.GC_NAME)
            if val_col is None:
                raise ValueError(
                    'table specified for group lists ({s}.{t}) lacks a "{n}" column'.format(s=sname, t=tname,
                                                                                             n=self.GC_NAME))
        if glt == {}:
            return None
        return glt

    def set_node_acl(self, node, spec):
        """Clear ``node.acls`` and, if ``spec`` names an ACL, install that ACL."""
        node.acls.clear()
        acl_name = spec.get("acl")
        if acl_name is not None:
            self.add_node_acl(node, acl_name)

    def expand_groups(self):
        """Expand every configured group list in place (no-op when no groups are configured)."""
        if self.groups is None:
            return
        # Iterate over a snapshot of the names because expand_group() reassigns entries.
        for group_name in list(self.groups):
            self.expand_group(group_name)

    def get_group(self, group_name):
        """Return the member list of a named group, or ``[group_name]`` if it is not a named group."""
        group = None
        if self.groups is not None:
            group = self.groups.get(group_name)
        if group is None:
            group = [group_name]
        return group

    def validate_group(self, group):
        """Check the format of a literal group identifier.

        ``'*'`` is accepted as is; Globus and webauthn-robot identifiers are
        validated by prefix; anything else only triggers a warning.
        """
        if group == '*':
            return
        if group.startswith(self.GLOBUS_PREFIX):
            self.validate_globus_group(group)
        elif group.startswith(self.ROBOT_PREFIX_FORMAT.format(server=self.server)):
            self.validate_webauthn_robot(group)
        else:
            warnings.warn("Can't determine format of group '{g}'".format(g=group))

    def validate_globus_group(self, group):
        """Check that the part after the Globus prefix parses as a UUID.

        :raises ValueError: if it does not.
        """
        guid = group[len(self.GLOBUS_PREFIX):]
        try:
            UUID(guid)
        except ValueError:
            raise ValueError("Group '{g}' appears to be a malformed Globus group".format(g=group))
        if self.verbose:
            print("group '{g}' appears to be a syntactically-correct Globus group".format(g=group))

    def validate_webauthn_robot(self, group):
        """Check that a robot name follows the webauthn-robot prefix.

        :raises ValueError: if the name is empty.
        """
        robot_name = group[len(self.ROBOT_PREFIX_FORMAT.format(server=self.server)):]
        if not robot_name:
            raise ValueError("Group '{g}' appears to be a malformed webauthn robot identity".format(g=group))
        if self.verbose:
            print("group '{g}' appears to be a syntactically-correct webauthn robot identity".format(g=group))

    def expand_group(self, group_name, _in_progress=None):
        """Replace a named group's member list by its fully expanded, de-duplicated form.

        Members that name other groups are expanded recursively; all other
        members are validated as literal group identifiers.

        :param group_name: name of the group to expand
        :param _in_progress: internal; names currently being expanded, used to
            detect circular definitions
        :raises ValueError: if the group definitions refer to each other in a cycle.
        """
        if _in_progress is None:
            _in_progress = []
        if group_name in _in_progress:
            raise ValueError("circular group definition: {path}".format(
                path=" -> ".join(_in_progress + [group_name])))
        _in_progress.append(group_name)
        members = []
        for child_name in self.groups.get(group_name):
            child = self.groups.get(child_name)
            if child is None:
                self.validate_group(child_name)
                members.append(child_name)
            else:
                self.expand_group(child_name, _in_progress)
                members = members + self.groups[child_name]
        _in_progress.pop()
        self.groups[group_name] = list(set(members))

    def expand_acl_definitions(self):
        """Expand group names in every ACL definition (no-op when none are configured)."""
        if self.acl_definitions is None:
            return
        for acl_name in self.acl_definitions.keys():
            self.expand_acl_definition(acl_name)

    def expand_acl_definition(self, acl_name):
        """Replace group names in one ACL definition by their member lists, in place."""
        spec = self.acl_definitions.get(acl_name)
        for op_type in spec.keys():
            raw_groups = spec[op_type]
            if isinstance(raw_groups, list):
                groups = []
                for group_name in raw_groups:
                    groups = groups + self.get_group(group_name)
            else:
                # Copy so the ACL does not share a list object with self.groups.
                groups = list(self.get_group(raw_groups))
            spec[op_type] = groups

    def set_table_acls(self, table):
        """Reset and set ACLs/bindings on ``table``, then on its columns and foreign keys."""
        spec = self.acl_specs["table_acls"].find_best_table_spec(table.schema.name, table.name)
        table.acls.clear()
        table.acl_bindings.clear()
        if spec is not None:
            self.set_node_acl(table, spec)
            self.set_node_acl_bindings(table, table, spec.get("acl_bindings"), spec.get("invalidate_bindings"))
        if self.verbose:
            print(
                "set table {s}.{t} acls to {a}, bindings to {b}".format(s=table.schema.name, t=table.name,
                                                                        a=str(table.acls),
                                                                        b=str(table.acl_bindings)))
        for column in table.column_definitions:
            self.set_column_acls(column, table)
        for fkey in table.foreign_keys:
            self.set_fkey_acls(fkey, table)

    def set_column_acls(self, column, table):
        """Reset and set ACLs/bindings on ``column`` from the best matching column spec."""
        spec = self.acl_specs["column_acls"].find_best_column_spec(column.table.schema.name, column.table.name,
                                                                   column.name)
        column.acls.clear()
        column.acl_bindings.clear()
        if spec is not None:
            self.set_node_acl(column, spec)
            self.set_node_acl_bindings(column, table, spec.get("acl_bindings"), spec.get("invalidate_bindings"))
        if self.verbose:
            print("set column {s}.{t}.{c} acls to {a}, bindings to {b}".format(s=column.table.schema.name,
                                                                               t=column.table.name,
                                                                               c=column.name,
                                                                               a=str(column.acls),
                                                                               b=str(column.acl_bindings)))

    def set_fkey_acls(self, fkey, table):
        """Reset and set ACLs/bindings on ``fkey`` from the best matching foreign-key spec."""
        spec = self.acl_specs["foreign_key_acls"].find_best_foreign_key_spec(fkey.table.schema.name,
                                                                             fkey.table.name, fkey.names)
        fkey.acls.clear()
        fkey.acl_bindings.clear()
        if spec is not None:
            self.set_node_acl(fkey, spec)
            self.set_node_acl_bindings(fkey, table, spec.get("acl_bindings"), spec.get("invalidate_bindings"))
        if self.verbose:
            print("set fkey {f} acls to {a}, bindings to {b}".format(f=str(fkey.names), a=str(fkey.acls),
                                                                     b=str(fkey.acl_bindings)))

    def set_catalog_acls(self, catalog):
        """Set the catalog-level ACL (if configured), then process every schema."""
        spec = self.acl_specs["catalog_acl"]
        if spec is not None:
            catalog.acls.clear()
            self.set_node_acl(catalog, spec)
        if self.verbose:
            print("set catalog acls to {a}".format(a=str(catalog.acls)))
        for schema in self.toplevel_config.schemas.values():
            self.set_schema_acls(schema)

    def set_schema_acls(self, schema):
        """Reset and set ACLs on ``schema`` and its tables, unless the schema name matches an ignore pattern."""
        for pattern in self.ignored_schema_patterns:
            if pattern.match(schema.name) is not None:
                print("ignoring schema {s}".format(s=schema.name))
                return
        spec = self.acl_specs["schema_acls"].find_best_schema_spec(schema.name)
        schema.acls.clear()
        if spec is not None:
            self.set_node_acl(schema, spec)
        if self.verbose:
            print("set schema {s} acls to {a}".format(s=schema.name, a=str(schema.acls)))
        for table in schema.tables.values():
            self.set_table_acls(table)

    def set_acls(self):
        """Set ACLs starting from the top-level node (model, schema or table).

        :raises ValueError: if the top-level node is of any other type.
        """
        if isinstance(self.toplevel_config, ermrest_model.Model):
            self.set_catalog_acls(self.toplevel_config)
        elif isinstance(self.toplevel_config, ermrest_model.Schema):
            self.set_schema_acls(self.toplevel_config)
        elif isinstance(self.toplevel_config, ermrest_model.Table):
            self.set_table_acls(self.toplevel_config)
        else:
            raise ValueError("toplevel config is a {t}".format(t=str(type(self.toplevel_config))))

    def apply_acls(self):
        """Apply the modified model, passing the untouched saved copy to ``apply()``."""
        self.toplevel_config.apply(self.saved_toplevel_config)

    def dumps(self):
        """Dump a serialized (string) representation of the config."""
        return json.dumps(self.toplevel_config.prejson(), indent=2)
class Table(AttrDict):
    """Dictionary wrapper around a table document, with row insert/update helpers."""

    # System columns always treated as server-defaulted when updating a row.
    ERMREST_DEFAULT_COLS = ["RID", "RCB", "RMB", "RCT", "RMT"]

    def __init__(self, d):
        """Wrap table document ``d``.

        When ``d`` is ``None`` nothing is initialised, so the object compares
        equal to ``{}``; callers rely on that to detect a missing table.
        """
        if d is None:
            return
        self.base_entity_url = "/entity/{s}:{t}".format(s=d['schema_name'], t=d['table_name'])
        AttrDict.__init__(self, d)

    def getColumn(self, name):
        """Return the column definition called ``name``, or ``None`` if there is none."""
        if self.get('column_definitions') is None:
            return None
        for col in self['column_definitions']:
            if col.get('name') == name:
                return col
        return None

    def getBaseEntityURL(self):
        """Return the ``/entity/<schema>:<table>`` path built at construction time."""
        return self.base_entity_url

    def upsertRows(self, catalog, rows):
        """Insert ``rows``; on an HTTP 409 response fall back to upserting them one by one.

        :raises HTTPError: for any failure other than a 409 on the bulk insert.
        """
        try:
            self.insertRows(catalog, rows)
        except HTTPError as err:
            if err.response.status_code != 409:
                # Only a 409 triggers the row-by-row fallback; do not hide other failures.
                raise
            for row in rows:
                self.upsertRow(catalog, row)

    def find_keys(self):
        """Return the keys whose columns all exist and disallow nulls.

        Returns ``None`` when the table document has no ``keys`` entry. The
        table document itself is not modified.
        """
        keys = self.get('keys')
        if keys is None:
            return None
        usable = []
        for key in keys:
            for col_name in key.get('unique_columns'):
                col = self.getColumn(col_name)
                if col is None or col.get('nullok'):
                    break
            else:
                # No nullable or unknown column was found in this key.
                usable.append(key)
        return usable

    def row_has_key(self, row, key):
        """Return True if ``row`` has a non-None value for every column of ``key``."""
        for col_name in key.get('unique_columns'):
            if row.get(col_name) is None:
                return False
        return True

    def getRowFilter(self, row):
        """Return URL filter terms identifying ``row`` by the first usable key it fully populates.

        :raises ValueError: if no such key exists.
        """
        chosen = None
        for key in self.find_keys() or []:
            if self.row_has_key(row, key):
                chosen = key
                break
        if chosen is None:
            raise ValueError("can't find appropriate key")
        return ["{k}={v}".format(k=urlquote(col_name), v=urlquote(row[col_name]))
                for col_name in chosen.get('unique_columns')]

    def getRow(self, catalog, row, filters):
        """Fetch the first row matching ``filters``, or ``None`` if nothing matches.

        ``row`` is accepted for interface compatibility but not used.
        """
        url = "{u}/{f}".format(u=self.getBaseEntityURL(), f="&".join(filters))
        vals = catalog.get(url, headers={'Content-Type': 'application/json'}).json()
        if vals is None or len(vals) == 0:
            return None
        return vals[0]

    def getDefaultCols(self, add_ermrest_defaults=True):
        """Return names of columns that declare a default.

        :param add_ermrest_defaults: also include ``ERMREST_DEFAULT_COLS``
            (the result is then de-duplicated and unordered).
        """
        default_cols = [col.get("name") for col in self.column_definitions if col.get("default") is not None]
        if add_ermrest_defaults:
            default_cols = list(set(default_cols + self.ERMREST_DEFAULT_COLS))
        return default_cols

    def upsertRow(self, catalog, row):
        """Insert ``row``; on an HTTP 409 response update the existing row instead.

        :raises HTTPError: for any failure other than a 409 on the insert.
        """
        try:
            return self.insertRows(catalog, [row])
        except HTTPError as err:
            if err.response.status_code != 409:
                raise
            return self.updateRow(catalog, row)

    def updateRow(self, catalog, row):
        """PUT ``row``, first copying defaulted column values from the stored row where ``row`` lacks them."""
        filters = self.getRowFilter(row)
        old_row = self.getRow(catalog, row, filters)
        if old_row is not None:
            for col_name in self.getDefaultCols():
                if row.get(col_name) is None and old_row.get(col_name) is not None:
                    row[col_name] = old_row[col_name]
        return catalog.put(self.getBaseEntityURL(), json=[row], headers={'Content-Type': 'application/json'})

    def insertRows(self, catalog, rows):
        """POST ``rows``, listing columns that declare a default in the ``defaults`` query parameter."""
        default_cols = self.getDefaultCols(False)
        if default_cols:
            url = "{u}?defaults={d}".format(u=self.getBaseEntityURL(), d=",".join(default_cols))
        else:
            url = self.getBaseEntityURL()
        return catalog.post(url, json=rows, headers={'Content-Type': 'application/json'})

    def __str__(self):
        """Render as a plain dictionary string."""
        return dict.__str__(self)
class AclCLI(ConfigBaseCLI):
    """Command-line interface for the ACL configuration tool."""

    def __init__(self):
        """Set up the base CLI and add the two mutually exclusive group-table options."""
        ConfigBaseCLI.__init__(self, "ACL configuration tool", None, version=VERSION)
        # -g and -o cannot be combined: one limits the run to the group table,
        # the other leaves the group table out.
        exclusive = self.parser.add_mutually_exclusive_group()
        exclusive.add_argument('-g', '--groups-only', action="store_true", help="create group table only")
        exclusive.add_argument('-o', '--omit-groups', action="store_true", help="do not create group table")
def main():
    """Command-line entry point: configure ACLs for each requested schema.

    Group lists are saved at most once (on the first schema), and never on a
    dry run or when ``--omit-groups`` is given. On a dry run the resulting
    configuration is printed instead of being applied.
    """
    cli = AclCLI()
    args = cli.parse_cli()
    table_name = cli.get_table_arg(args)
    schema_names = cli.get_schema_arg_list(args)
    credentials = get_credential(args.host, args.credential_file)
    verbose = args.verbose or args.debug
    groups_pending = not (args.dryrun or args.omit_groups)
    for schema in schema_names:
        acl_config = AclConfig(args.host, args.catalog, args.config_file, credentials,
                               schema_name=schema, table_name=table_name, verbose=verbose)
        try:
            if groups_pending:
                acl_config.save_groups()
                groups_pending = False
            if not args.groups_only:
                acl_config.set_acls()
                if not args.dryrun:
                    acl_config.apply_acls()
        except HTTPError as e:
            # Show the formatted server error, then let the exception propagate.
            print(format_exception(e))
            raise
        if args.dryrun:
            print(acl_config.dumps())
# Run the command-line tool when the module is executed as a script; main()'s
# return value is passed to sys.exit().
if __name__ == '__main__':
    sys.exit(main())