# Source code for deriva.core.ermrest_catalog

import os
import io
import re
import logging
import datetime
import codecs
import csv
import json
import requests
from typing import NamedTuple

from . import urlquote, urlsplit, urlunsplit, datapath, DEFAULT_HEADERS, DEFAULT_CHUNK_SIZE, DEFAULT_SESSION_CONFIG, \
    Megabyte, Kilobyte, get_transfer_summary, IS_PY2
from .deriva_binding import DerivaBinding, DerivaPathError
from . import ermrest_model
from .ermrest_model import nochange

class DerivaServer (DerivaBinding):
    """Persistent handle for a Deriva server."""

    def __init__(self, scheme, server, credentials=None, caching=True, session_config=None):
        """Create a Deriva server binding.

           Arguments:
             scheme: 'http' or 'https'
             server: server FQDN string
             credentials: credential secrets, e.g. cookie
             caching: whether to retain a GET response cache
        """
        super(DerivaServer, self).__init__(scheme, server, credentials, caching, session_config)
        self.scheme = scheme
        self.server = server
        self.credentials = credentials
        self.caching = caching
        self.session_config = session_config
    def connect_ermrest(self, catalog_id, snaptime=None):
        """Connect to an ERMrest catalog and return the catalog binding.

        :param catalog_id: The id (or alias) of the existing catalog
        :param snaptime: The id for a desired catalog snapshot (default None)

        The catalog_id is normally a bare id (str), and the optional snaptime is a bare
        snapshot id (str). If the snaptime is None, the catalog_id may be a concatenated
        <id>@<snaptime> string, and it will be split to determine the snaptime.

        If no snaptime is passed separately or compounded with catalog_id, an
        ErmrestCatalog binding will be returned. Conversely, if a snaptime is
        determined, an ErmrestSnapshot (immutable) binding will be returned.
        """
        return ErmrestCatalog.connect(self, catalog_id, snaptime)
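    # A minimal usage sketch (not part of this module's API surface). The host name
    # 'demo.example.org' and the catalog/snapshot ids are placeholder values:
    #
    #   server = DerivaServer('https', 'demo.example.org')
    #   catalog = server.connect_ermrest('1')                   # ErmrestCatalog
    #   snapshot = server.connect_ermrest('1@2PM-DGYP-56Z4')    # ErmrestSnapshot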
    def create_ermrest_catalog(self, id=None, owner=None):
        """Create an ERMrest catalog.

        :param id: The (str) id desired by the client (default None)
        :param owner: The initial (list of str) ACL desired by the client (default None)

        The new catalog id will be returned in the response, and used in future catalog
        access. The use of the id parameter may yield errors if the supplied value is
        not available for use by the client. The value None will result in a
        server-assigned catalog id.

        The initial "owner" ACL on the new catalog will be the client-supplied owner if
        provided. The use of the owner parameter may yield errors if the supplied ACL
        does not match the client, i.e. the client cannot lock themselves out of the
        catalog. The value None will result in a server-assigned ACL with the
        requesting client's identity.

        Certain failure modes (or message loss) may leave the id reserved in the
        system. In this case, the effective owner ACL influences which client(s) are
        allowed to retry creation with the same id.
        """
        return ErmrestCatalog.create(self, id, owner)
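    # Hedged sketch of catalog creation. The id and the Globus group URI in the owner
    # ACL are placeholders; per the docstring, the ACL must still permit the
    # requesting client:
    #
    #   catalog = server.create_ermrest_catalog()   # server-assigned id and owner ACL
    #   catalog = server.create_ermrest_catalog(
    #       id='myproject',
    #       owner=['https://auth.globus.org/<group-uuid>'],   # hypothetical ACL entry
    #   )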
    def connect_ermrest_alias(self, id):
        """Connect to an ERMrest alias and return the alias binding.

        :param id: The id of the existing alias
        """
        return ErmrestAlias.connect(self, id)
    def create_ermrest_alias(self, id=None, owner=None, alias_target=None):
        """Create an ERMrest catalog alias.

        :param id: The (str) id desired by the client (default None)
        :param owner: The initial (list of str) ACL desired by the client (default None)
        :param alias_target: The initial target catalog id binding desired by the client (default None)

        The new alias id will be returned in the response, and used in future alias
        access. The use of the id parameter may yield errors if the supplied value is
        not available for use by the client. The value None will result in a
        server-assigned alias id.

        The initial "owner" ACL on the new alias will be the client-supplied owner.
        The use of the owner parameter may yield errors if the supplied ACL does not
        match the client, i.e. the client cannot lock themselves out of the alias. The
        value None will result in a server-assigned ACL with the requesting client's
        identity.

        The alias is bound to the client-supplied alias_target, if supplied. The use of
        alias_target may yield errors if the supplied value is not a valid target
        catalog id. The value None will reserve the alias in an unbound state.

        Certain failure modes (or message loss) may leave the id reserved in the
        system. In this case, the effective owner ACL influences which client(s) are
        allowed to retry creation with the same id.
        """
        return ErmrestAlias.create(self, id, owner, alias_target)
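    # Sketch of alias creation under assumed placeholder ids: reserve an unbound
    # alias, or bind one to an existing catalog id at creation time:
    #
    #   alias = server.create_ermrest_alias()   # server-assigned id, unbound state
    #   alias = server.create_ermrest_alias(id='myproject', alias_target='42')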
class ErmrestCatalogMutationError(Exception):
    pass
_clone_state_url = "tag:isrd.isi.edu,2018:clone-state"
DEFAULT_PAGE_SIZE = 100000
class ResolveRidResult (NamedTuple):
    datapath: datapath.DataPath
    table: ermrest_model.Table
    rid: str
class ErmrestCatalog(DerivaBinding):
    """Persistent handle for an ERMrest catalog.

       Provides basic REST client for HTTP methods on arbitrary paths. Caller has to
       understand ERMrest APIs and compose appropriate paths, headers, and/or content.

       Additional utility methods provided for accessing catalog metadata.
    """
    table_schemas = dict()

    @property
    def deriva_server(self):
        """Return DerivaServer binding for the same server this catalog belongs to."""
        return DerivaServer(
            self._scheme,
            self._server,
            self._credentials,
            self._caching,
            self._session_config,
        )
    @classmethod
    def connect(cls, deriva_server, catalog_id, snaptime=None):
        """Connect to an ERMrest catalog and return the catalog binding.

        :param deriva_server: The DerivaServer binding which hosts ermrest
        :param catalog_id: The id (or alias) of the existing catalog
        :param snaptime: The id for a desired catalog snapshot (default None)

        The catalog_id is normally a bare id (str), and the optional snaptime is a bare
        snapshot id (str). If the snaptime is None, the catalog_id may be a concatenated
        <id>@<snaptime> string, and it will be split to determine the snaptime.

        If no snaptime is passed separately or compounded with catalog_id, an
        ErmrestCatalog binding will be returned. Conversely, if a snaptime is
        determined, an ErmrestSnapshot (immutable) binding will be returned.
        """
        if not snaptime:
            splits = str(catalog_id).split('@')
            if len(splits) > 2:
                raise Exception('Malformed catalog identifier: multiple "@" characters found.')
            catalog_id = splits[0]
            snaptime = splits[1] if len(splits) == 2 else None
        if snaptime:
            return ErmrestSnapshot(
                deriva_server.scheme,
                deriva_server.server,
                catalog_id,
                snaptime,
                deriva_server.credentials,
                deriva_server.caching,
                deriva_server.session_config
            )
        return cls(
            deriva_server.scheme,
            deriva_server.server,
            catalog_id,
            deriva_server.credentials,
            deriva_server.caching,
            deriva_server.session_config
        )
    @classmethod
    def _digest_catalog_args(cls, id, owner):
        rep = dict()

        if isinstance(id, str):
            rep['id'] = id
        elif isinstance(id, (type(nochange), type(None))):
            pass
        else:
            raise TypeError('id must be of type str or None or nochange, not %s' % type(id))

        if isinstance(owner, list):
            for e in owner:
                if not isinstance(e, str):
                    raise TypeError('owner members must be of type str, not %s' % type(e))
            rep['owner'] = owner
        elif isinstance(owner, (type(nochange), type(None))):
            pass
        else:
            raise TypeError('owner must be of type list or None or nochange, not %s' % type(owner))

        return rep
    @classmethod
    def create(cls, deriva_server, id=None, owner=None):
        """Create an ERMrest catalog and return the ERMrest catalog binding.

        :param deriva_server: The DerivaServer binding which hosts ermrest.
        :param id: The (str) id desired by the client (default None)
        :param owner: The initial (list of str) ACL desired by the client (default None)

        The new catalog id will be returned in the response, and used in future catalog
        access. The use of the id parameter may yield errors if the supplied value is
        not available for use by the client. The value None will result in a
        server-assigned catalog id.

        The initial "owner" ACL on the new catalog will be the client-supplied owner
        ACL. The use of the owner parameter may yield errors if the supplied ACL does
        not match the client, i.e. the client cannot lock themselves out of the
        catalog. The value None will result in a server-assigned ACL with the
        requesting client's identity.

        Certain failure modes (or message loss) may leave the id reserved in the
        system. In this case, the effective owner ACL influences which client(s) are
        allowed to retry creation with the same id.
        """
        path = '/ermrest/catalog'
        r = deriva_server.post(path, json=cls._digest_catalog_args(id, owner))
        r.raise_for_status()
        return cls.connect(deriva_server, r.json()['id'])
    def __init__(self, scheme, server, catalog_id, credentials=None, caching=True, session_config=None):
        """Create ERMrest catalog binding.

           Arguments:
             scheme: 'http' or 'https'
             server: server FQDN string
             catalog_id: e.g. '1'
             credentials: credential secrets, e.g. cookie
             caching: whether to retain a GET response cache

           Deriva Client Context: You MAY mutate self.dcctx to customize the context
           for this service endpoint prior to invoking web requests. E.g.:

             self.dcctx['cid'] = 'my application name'

           You MAY also supply custom per-request context by passing a headers dict to
           web request methods, e.g.:

             self.get(..., headers={'deriva-client-context': {'action': 'myapp/function1'}})

           This custom header will be merged as override values with the default
           context in self.dcctx in order to form the complete context for the request.
        """
        super(ErmrestCatalog, self).__init__(scheme, server, credentials, caching, session_config)
        if isinstance(catalog_id, int):
            catalog_id = str(catalog_id)
        self._server_uri = "%s/ermrest/catalog/%s" % (
            self._server_uri,
            urlquote(catalog_id),
        )
        self._scheme, self._server, self._catalog_id, self._credentials, self._caching, self._session_config = \
            scheme, server, catalog_id, credentials, caching, session_config

    @property
    def catalog_id(self):
        return self._catalog_id

    @property
    def alias_target(self):
        r = self.get('/')
        r.raise_for_status()
        rep = r.json()
        return rep.get('alias_target')
    def exists(self):
        """Simple boolean test for catalog existence.

        :return: True if exists, False if not (404), otherwise raises exception
        """
        try:
            self.get('/')
            return True
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                return False
            else:
                raise
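    # Sketch: opportunistic connect-or-create, assuming a DerivaServer binding
    # 'server' and a placeholder catalog id '1':
    #
    #   catalog = server.connect_ermrest('1')
    #   if not catalog.exists():
    #       catalog = server.create_ermrest_catalog()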
    def latest_snapshot(self):
        """Gets a handle to this catalog's latest snapshot."""
        r = self.get('/')
        r.raise_for_status()
        return ErmrestSnapshot(self._scheme, self._server, self._catalog_id,
                               r.json()['snaptime'], self._credentials, self._caching,
                               self._session_config)
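    # Sketch: pin a fixed snapshot so repeated reads see a consistent catalog state:
    #
    #   snap = catalog.latest_snapshot()
    #   model = snap.getCatalogModel()   # model as of snap.snaptime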
    def getCatalogModel(self):
        return ermrest_model.Model.fromcatalog(self)
    def getCatalogSchema(self):
        path = '/schema'
        r = self.get(path)
        r.raise_for_status()
        return r.json()
    def getPathBuilder(self):
        """Returns the 'path builder' interface for this catalog."""
        return datapath.from_catalog(self)
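    # Sketch of the datapath idiom this enables; the schema/table names
    # ('isa', 'dataset') are hypothetical:
    #
    #   pb = catalog.getPathBuilder()
    #   dataset = pb.schemas['isa'].tables['dataset']
    #   rows = dataset.path.entities().fetch()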
    def getTableSchema(self, fq_table_name):
        # first try to get from cache(s)
        s, t = self.splitQualifiedCatalogName(fq_table_name)
        cat = self.getCatalogSchema()
        schema = cat['schemas'][s]['tables'][t] if cat else None
        if schema:
            return schema
        schema = self.table_schemas.get(fq_table_name)
        if schema:
            return schema

        path = '/schema/%s/table/%s' % (s, t)
        r = self.get(path)
        resp = r.json()
        self.table_schemas[fq_table_name] = resp
        r.raise_for_status()

        return resp
    def getTableColumns(self, fq_table_name):
        columns = set()
        schema = self.getTableSchema(fq_table_name)
        for column in schema['column_definitions']:
            columns.add(column['name'])
        return columns
    def validateRowColumns(self, row, fq_tableName):
        columns = self.getTableColumns(fq_tableName)
        return set(row.keys()) - columns
    def getDefaultColumns(self, row, table, exclude=None, quote_url=True):
        columns = self.getTableColumns(table)
        if isinstance(exclude, list):
            for col in exclude:
                columns.remove(col)

        defaults = []
        supplied_columns = row.keys()
        for col in columns:
            if col not in supplied_columns:
                defaults.append(urlquote(col, safe='') if quote_url else col)

        return defaults
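    # Illustrative sketch of the two row-oriented helpers above; the table name and
    # row content are hypothetical. validateRowColumns returns the row keys unknown
    # to the table; getDefaultColumns returns table columns absent from the row:
    #
    #   row = {'name': 'sample-1', 'bogus': 1}
    #   unknown = catalog.validateRowColumns(row, 'isa:dataset')    # -> {'bogus'}
    #   defaults = catalog.getDefaultColumns(row, 'isa:dataset')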
    @staticmethod
    def splitQualifiedCatalogName(name):
        entity = name.split(':')
        if len(entity) != 2:
            logging.debug("Unable to tokenize %s into a fully qualified <schema:table> name." % name)
            return None
        return entity[0], entity[1]
    def resolve_rid(self,
                    rid: str,
                    model: ermrest_model.Model = None,
                    builder: datapath._CatalogWrapper = None) -> ResolveRidResult:
        """Resolve a RID value to return a ResolveRidResult (a named tuple).

        :param rid: The RID (str) to resolve
        :param model: A result from self.getCatalogModel() to reuse
        :param builder: A result from self.getPathBuilder() to reuse

        Raises KeyError if RID is not found in the catalog.

        The elements of the ResolveRidResult namedtuple provide more information about
        the entity identified by the supplied RID in this catalog:

        - datapath: datapath instance for querying the resolved entity
        - table: ermrest_model.Table instance containing the entity
        - rid: normalized version of the input RID value

        Example to simply retrieve entity content:

           path, _, _ = catalog.resolve_rid('1-0000')
           data = path.entities().fetch()[0]
        """
        if model is None:
            model = self.getCatalogModel()
        if builder is None:
            builder = self.getPathBuilder()
        try:
            r = self.get('/entity_rid/%s' % urlquote(rid))
            info = r.json()
            sname = info['schema_name']
            tname = info['table_name']
            rid = info['RID']
            ptable = builder.schemas[sname].tables[tname]
            return ResolveRidResult(
                ptable.path.filter(ptable.RID == rid),
                model.schemas[sname].tables[tname],
                rid
            )
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == requests.codes.not_found:
                raise KeyError(rid)
            raise
    def getAsFile(self,
                  path,
                  destfilename,
                  headers=DEFAULT_HEADERS,
                  callback=None,
                  delete_if_empty=False,
                  paged=False,
                  page_size=DEFAULT_PAGE_SIZE,
                  page_sort_columns=frozenset(["RID"])):
        """
        Retrieve catalog data streamed to destination file. Caller is responsible to
        clean up file even on error, when the file may or may not exist.

        If "delete_if_empty" is True, the file will be inspected for "empty" content.
        In the case of json/json-stream content, the presence of a single empty JSON
        object will be tested for. In the case of CSV content, the file will be parsed
        with CSV reader to determine that only a single header line and no row data is
        present.
        """
        self.check_path(path)

        # Only entity API supported with paged mode at this time, otherwise fallback. We fallback rather than raise an
        # exception in the case that the caller might be trying to perform an opportunistic paged request without
        # knowing a priori if paged support for the given query is available.
        page_size = page_size if page_size > 0 else DEFAULT_PAGE_SIZE
        if not (path.startswith("/entity") or path.startswith("/attribute")) and paged:
            logging.warning("Paged data retrieval only supported for entity or attribute API queries.")
            paged = False
        # Only "application/x-json-stream" or "text/csv" supported with paged mode at this time, otherwise fallback.
        accept = headers.get("accept")
        if accept not in ("application/x-json-stream", "text/csv"):
            logging.debug("Paged data retrieval not supported for content type: %s" % accept)
            paged = False
        headers = headers.copy()

        destfile = open(destfilename, 'w+b')
        try:
            total = 0
            start = datetime.datetime.now()
            if not paged:
                with self._session.get(self._server_uri + path, headers=headers, stream=True) as r:
                    self._response_raise_for_status(r)
                    content_type = r.headers.get("Content-Type")
                    logging.debug("Transferring file %s to %s" % (self._server_uri + path, destfilename))
                    for buf in r.iter_content(chunk_size=DEFAULT_CHUNK_SIZE):
                        destfile.write(buf)
                        total += len(buf)
                        if callback:
                            if not callback(progress="Downloading: %.2f MB transferred" %
                                                     (float(total) / float(Megabyte))):
                                destfile.close()
                                return
                    destfile.flush()
            else:
                first_page = True
                first_line = None
                last_record = None
                usr = urlsplit(self._server_uri + path)
                path = str(usr.path.split('@sort')[0])
                while True:
                    sort = "@sort(%s)%s" % (
                        ",".join(page_sort_columns or ["RID"]),
                        ("@after(%s)" % ",".join(last_record)) if last_record is not None else "")
                    limit = "limit=%s" % int(page_size) if page_size > 0 else "none"
                    query = re.sub(r"([^.]*)(limit=.*?)($|[&;])([^.]*)$", r"\1%s\3\4" % limit, usr.query, flags=re.I)
                    url = urlunsplit((usr.scheme, usr.netloc, path + sort, query if query else limit, usr.fragment))

                    # 1. Try to get a page worth of data, back-off page size if query run time errors are encountered
                    with self._session.get(url, headers=headers) as r:
                        if r.status_code == 400 and "Query run time limit exceeded" in r.text:
                            if page_size == 1:
                                self._response_raise_for_status(r)
                            r.close()
                            page_size //= 2
                            page_size = 1 if page_size < 1 else page_size
                            logging.warning("Query runtime exceeded while attempting to transfer rows from %s to file "
                                            "[%s]. The page size is being reduced to %s and the query will be retried."
                                            % (url, destfilename, page_size))
                            if callback:
                                if not callback(progress="Retrying query: %s" % url):
                                    destfile.close()
                                    return
                            continue
                        else:
                            self._response_raise_for_status(r)

                        # 2. Write the page to disk and check the last record processed in order to get the next page
                        last_line = {}
                        content_type = r.headers.get("Content-Type")
                        logging.debug("Transferring data from [%s] to %s" % (url, destfilename))
                        # CSV processing iterates over lines in the response, skipping the header line(s) in all but
                        # the first page, and captures the last line of each page to determine the last record
                        # processed
                        if content_type == "text/csv":
                            skip = 1
                            line_num = 0
                            if first_page:
                                lines = r.iter_lines(decode_unicode=True)
                                reader = csv.reader(lines)
                                first_line = next(reader)
                                skip = reader.line_num
                            for line in r.iter_lines():
                                if not first_page:
                                    line_num += 1
                                    if line_num <= skip:
                                        continue
                                tline = line + b"\n"
                                destfile.write(tline)
                                total += len(tline)
                                last_line = tline
                            if last_line and last_line != first_line:
                                reader = csv.DictReader([last_line.decode('utf-8')], first_line)
                                last_line = next(reader)
                            first_page = False
                        # JSON-Stream processing writes the entire buffer to the destination file. The last line is
                        # captured by reverse seeking in the buffer from right before the last b'\n' newline to the
                        # next newline or buf[0], then calling readline from the current position
                        elif content_type == "application/x-json-stream":
                            buf = r.content
                            if not buf:
                                break
                            destfile.write(buf)
                            total += len(buf)
                            b = io.BytesIO(buf)
                            b.seek(-2, os.SEEK_END)
                            while b.read(1) != b'\n':
                                b.seek(-2, os.SEEK_CUR)
                                if b.tell() == os.SEEK_SET:
                                    break
                            last_line = json.loads(b.readline().decode('utf-8'))

                    # 3. Save the last record key and flush the destination file buffers to disk.
                    if not last_line:
                        break
                    destfile.flush()
                    last_record = [urlquote(str(last_line.get(key))) for key in page_sort_columns]
                    if callback:
                        if not callback(progress="Downloading: %.2f MB transferred" %
                                                 (float(total) / float(Megabyte))):
                            destfile.close()
                            return

            elapsed = datetime.datetime.now() - start
            summary = get_transfer_summary(total, elapsed)

            # perform automatic file deletion on detected "empty" content, if requested
            delete_file = True if total == 0 else False
            if delete_if_empty and total > 0:
                destfile.seek(0)
                if content_type == "application/json" or content_type == "application/x-json-stream":
                    buf = destfile.read(16)
                    if buf == b"[]\n" or buf == b"{}\n":
                        delete_file = True
                elif content_type == "text/csv":
                    reader = csv.reader(codecs.iterdecode(destfile, 'utf-8') if not IS_PY2 else destfile)
                    rowcount = 0
                    for row in reader:
                        rowcount += 1
                        if rowcount > 1:
                            break
                    if rowcount <= 1:
                        delete_file = True

            # automatically delete zero-length files or detected "empty" content
            if delete_file:
                destfile.close()
                os.remove(destfilename)
                destfile = None

            log_msg = "File [%s] transfer successful. %s %s" % \
                      (destfilename, summary,
                       "File was automatically deleted due to empty content." if delete_file else "")
            logging.info(log_msg)
            if callback:
                callback(summary=log_msg, file_path=destfilename)
        finally:
            if destfile:
                destfile.close()
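    # Hedged usage sketch for getAsFile; the query path and filename are placeholders.
    # Per the logic above, paged retrieval only engages for entity/attribute paths with
    # a CSV or JSON-stream accept header, otherwise it falls back to one streamed GET:
    #
    #   catalog.getAsFile(
    #       '/entity/isa:dataset',
    #       'dataset.csv',
    #       headers={'accept': 'text/csv'},
    #       paged=True,               # opportunistic paging; falls back when unsupported
    #       delete_if_empty=True,     # remove the file if only a CSV header comes back
    #   )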
    def delete(self, path, headers=DEFAULT_HEADERS, guard_response=None):
        """Perform DELETE request, returning response object.

           Arguments:
             path: the path within this bound catalog
             headers: headers to set in request
             guard_response: expected current resource state
               as previously seen response object.

           Uses guard_response to build appropriate 'if-match' header
           to assure change is only applied to expected state.

           Raises ConcurrentUpdate for 412 status.
        """
        if path == "/":
            raise DerivaPathError('See self.delete_ermrest_catalog() if you really want to destroy this catalog.')
        return DerivaBinding.delete(self, path, headers=headers, guard_response=guard_response)
    def delete_ermrest_catalog(self, really=False):
        """Perform DELETE request, destroying catalog on server.

           Arguments:
             really: delete when True, abort when False (default)
        """
        if really is True:
            return DerivaBinding.delete(self, '/')
        else:
            raise ValueError('Catalog deletion refused when really is %s.' % really)
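    # Catalog destruction is deliberately two-keyed: self.delete('/') is blocked by
    # the path guard above, and this method refuses without an explicit really=True:
    #
    #   catalog.delete_ermrest_catalog(really=True)   # irreversible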
    def clone_catalog(self,
                      dst_catalog=None,
                      copy_data=True,
                      copy_annotations=True,
                      copy_policy=True,
                      truncate_after=True,
                      exclude_schemas=None):
        """Clone this catalog's content into dst_catalog, creating a new catalog if needed.

        :param dst_catalog: Destination catalog or None to request creation of new destination (default).
        :param copy_data: Copy table contents when True (default).
        :param copy_annotations: Copy annotations when True (default).
        :param copy_policy: Copy access-control policies when True (default).
        :param truncate_after: Truncate destination history after cloning when True (default).
        :param exclude_schemas: A list of schema names to exclude from the cloning process.

        When dst_catalog is provided, attempt an idempotent clone, assuming content MAY
        be partially cloned already using the same parameters. This routine uses a
        table-level annotation "tag:isrd.isi.edu,2018:clone-state" to save progress
        markers which help it restart efficiently if interrupted.

        Cloning preserves source row RID values for application tables so that any
        RID-based foreign keys are still valid. It is not generally advisable to try to
        merge more than one source into the same clone, nor to clone on top of rows
        generated locally in the destination, since this could cause duplicate RID
        conflicts.

        Cloning does not preserve all RID values for special ERMrest tables in the
        public schema (e.g. ERMrest_Client, ERMrest_Group) but normal applications
        should only consider the ID key of these tables.

        Truncation after cloning avoids retaining incremental snapshots which contain
        partial clones.
        """
        src_model = self.getCatalogModel()

        session_config = self._session_config.copy() if self._session_config else DEFAULT_SESSION_CONFIG.copy()
        session_config["allow_retry_on_all_methods"] = True
        if dst_catalog is None:
            # TODO: refactor with DerivaServer someday
            server = DerivaBinding(self._scheme, self._server, self._credentials, self._caching, session_config)
            dst_id = server.post("/ermrest/catalog").json()["id"]
            dst_catalog = ErmrestCatalog(self._scheme, self._server, dst_id, self._credentials, self._caching,
                                         session_config)

        # set top-level config right away and find fatal usage errors...
        if copy_policy:
            if not src_model.acls:
                raise ValueError("Use of copy_policy=True not possible when caller does not own source catalog.")
            dst_catalog.put('/acl', json=src_model.acls)
        if copy_annotations:
            dst_catalog.put('/annotation', json=src_model.annotations)

        # build up the model content we will copy to destination
        dst_model = dst_catalog.getCatalogModel()

        new_model = []
        new_columns = []  # ERMrest does not currently allow bulk column creation
        new_keys = []  # ERMrest does not currently allow bulk key creation
        clone_states = {}
        fkeys_deferred = {}
        exclude_schemas = [] if exclude_schemas is None else exclude_schemas

        def prune_parts(d, *extra_victims):
            victims = set(extra_victims)
            # we will apply config as a second pass after extending dest model
            # but loading bulk first may speed that up
            if not copy_annotations:
                victims |= {'annotations',}
            if not copy_policy:
                victims |= {'acls', 'acl_bindings'}
            for k in victims:
                d.pop(k, None)
            return d

        def copy_sdef(s):
            """Copy schema definition structure with conditional parts for cloning."""
            d = prune_parts(s.prejson(), 'tables')
            return d

        def copy_tdef_core(t):
            """Copy table definition structure with conditional parts excluding fkeys."""
            d = prune_parts(t.prejson(), 'foreign_keys')
            d['column_definitions'] = [prune_parts(c) for c in d['column_definitions']]
            d['keys'] = [prune_parts(c) for c in d.get('keys', [])]
            d.setdefault('annotations', {})[_clone_state_url] = 1 if copy_data else None
            return d

        def copy_tdef_fkeys(t):
            """Copy table fkeys structure."""
            def check(fkdef):
                for fkc in fkdef['referenced_columns']:
                    if fkc['schema_name'] == 'public' \
                       and fkc['table_name'] in {'ERMrest_Client', 'ERMrest_Group', 'ERMrest_RID_Lease'} \
                       and fkc['column_name'] == 'RID':
                        raise ValueError("Cannot clone catalog with foreign key reference to "
                                         "%(schema_name)s:%(table_name)s:%(column_name)s" % fkc)
                return fkdef
            return [prune_parts(check(d)) for d in t.prejson().get('foreign_keys', [])]

        def copy_cdef(c):
            """Copy column definition with conditional parts."""
            return (sname, tname, prune_parts(c.prejson()))

        def check_column_compatibility(src, dst):
            """Check compatibility of source and destination column definitions."""
            def error(fieldname, sv, dv):
                return ValueError("Source/dest column %s mismatch %s != %s for %s:%s:%s" % (
                    fieldname, sv, dv, src.sname, src.tname, src.name
                ))
            if src.type.typename != dst.type.typename:
                raise error("type", src.type.typename, dst.type.typename)
            if src.nullok != dst.nullok:
                raise error("nullok", src.nullok, dst.nullok)
            if src.default != dst.default:
                raise error("default", src.default, dst.default)

        def copy_kdef(k):
            return (sname, tname, prune_parts(k.prejson()))

        for sname, schema in src_model.schemas.items():
            if sname in exclude_schemas:
                continue
            if sname not in dst_model.schemas:
                new_model.append(copy_sdef(schema))

            for tname, table in schema.tables.items():
                if table.kind != 'table':
                    logging.warning('Skipping cloning of %s %s:%s' % (table.kind, sname, tname))
                    continue

                if 'RID' not in table.column_definitions.elements:
                    raise ValueError("Source table %s.%s lacks system-columns and cannot be cloned."
                                     % (sname, tname))

                if sname not in dst_model.schemas or tname not in dst_model.schemas[sname].tables:
                    new_model.append(copy_tdef_core(table))
                    clone_states[(sname, tname)] = 1 if copy_data else None
                    fkeys_deferred[(sname, tname)] = copy_tdef_fkeys(table)
                else:
                    if dst_model.schemas[sname].tables[tname].foreign_keys:
                        # assume that presence of any destination foreign keys means we already loaded deferred_fkeys
                        copy_data = False
                    else:
                        fkeys_deferred[(sname, tname)] = copy_tdef_fkeys(table)

                    src_columns = {c.name: c for c in table.column_definitions}
                    dst_columns = {c.name: c for c in dst_model.schemas[sname].tables[tname].column_definitions}

                    for cname in src_columns:
                        if cname not in dst_columns:
                            new_columns.append(copy_cdef(src_columns[cname]))
                        else:
                            check_column_compatibility(src_columns[cname], dst_columns[cname])

                    for cname in dst_columns:
                        if cname not in src_columns:
                            raise ValueError("Destination column %s.%s.%s does not exist in source catalog."
                                             % (sname, tname, cname))

                    src_keys = {tuple(sorted(c.name for c in key.unique_columns)): key for key in table.keys}
                    dst_keys = {tuple(sorted(c.name for c in key.unique_columns)): key
                                for key in dst_model.schemas[sname].tables[tname].keys}

                    for utuple in src_keys:
                        if utuple not in dst_keys:
                            new_keys.append(copy_kdef(src_keys[utuple]))

                    for utuple in dst_keys:
                        if utuple not in src_keys:
                            raise ValueError("Destination key %s.%s(%s) does not exist in source catalog."
                                             % (sname, tname, ', '.join(utuple)))

                    clone_states[(sname, tname)] = \
                        dst_model.schemas[sname].tables[tname].annotations.get(_clone_state_url)

        clone_states[('public', 'ERMrest_RID_Lease')] = None  # never try to sync leases

        # apply the stage 1 model to the destination in bulk
        if new_model:
            dst_catalog.post("/schema", json=new_model).raise_for_status()

        for sname, tname, cdef in new_columns:
            dst_catalog.post("/schema/%s/table/%s/column" % (urlquote(sname), urlquote(tname)),
                             json=cdef).raise_for_status()

        for sname, tname, kdef in new_keys:
            dst_catalog.post("/schema/%s/table/%s/key" % (urlquote(sname), urlquote(tname)),
                             json=kdef).raise_for_status()

        # copy data in stage 2
        if copy_data:
            page_size = 10000
            for sname, tname in clone_states.keys():
                tname_uri = "%s:%s" % (urlquote(sname), urlquote(tname))
                if clone_states[(sname, tname)] == 1:
                    # determine current position in (partial?) copy
                    r = dst_catalog.get("/entity/%s@sort(RID::desc::)?limit=1" % tname_uri).json()
                    if r:
                        last = r[0]['RID']
                    else:
                        last = None
                    while True:
                        page = self.get(
                            "/entity/%s@sort(RID)%s?limit=%d" % (
                                tname_uri,
                                ("@after(%s)" % urlquote(last)) if last is not None else "",
                                page_size
                            )
                        ).json()
                        if page:
                            dst_catalog.post("/entity/%s?nondefaults=RID,RCT,RCB" % tname_uri, json=page)
                            last = page[-1]['RID']
                        else:
                            break
                    # record our progress on catalog in case we fail part way through
                    dst_catalog.put(
                        "/schema/%s/table/%s/annotation/%s" % (
                            urlquote(sname),
                            urlquote(tname),
                            urlquote(_clone_state_url),
                        ),
                        json=2
                    )
                elif clone_states[(sname, tname)] is None and (sname, tname) in {
                        ('public', 'ERMrest_Client'),
                        ('public', 'ERMrest_Group'),
                }:
                    # special sync behavior for magic ermrest tables
                    # HACK: these are assumed small enough to join via local merge of arrays
                    page = self.get("/entity/%s?limit=none" % tname_uri).json()
                    dst_catalog.post("/entity/%s?onconflict=skip" % tname_uri, json=page)
                    # record our progress on catalog in case we fail part way through
                    dst_catalog.put(
                        "/schema/%s/table/%s/annotation/%s" % (
                            urlquote(sname),
                            urlquote(tname),
                            urlquote(_clone_state_url),
                        ),
                        json=2
                    )

        # apply stage 2 model in bulk only... we won't get here unless preceding succeeded
        new_fkeys = []
        for fkeys in fkeys_deferred.values():
            new_fkeys.extend(fkeys)
        if new_fkeys:
            dst_catalog.post("/schema", json=new_fkeys)

        # copy over configuration in stage 3
        # we need to do this after deferred_fkeys to handle acl_bindings projections with joins
        dst_model = dst_catalog.getCatalogModel()
        for sname, src_schema in src_model.schemas.items():
            if sname in exclude_schemas:
                continue
            dst_schema = dst_model.schemas[sname]
            if copy_annotations:
                dst_schema.annotations.clear()
                dst_schema.annotations.update(src_schema.annotations)
            if copy_policy:
                dst_schema.acls.clear()
                dst_schema.acls.update(src_schema.acls)
            for tname, src_table in src_schema.tables.items():
                dst_table = dst_schema.tables[tname]
                if copy_annotations:
                    merged = dict(src_table.annotations)
                    if _clone_state_url in dst_table.annotations:
                        merged[_clone_state_url] = dst_table.annotations[_clone_state_url]
                    dst_table.annotations.clear()
                    dst_table.annotations.update(merged)
                if copy_policy:
                    dst_table.acls.clear()
                    dst_table.acls.update(src_table.acls)
                    dst_table.acl_bindings.clear()
                    dst_table.acl_bindings.update(src_table.acl_bindings)
                for cname, src_col in src_table.columns.elements.items():
                    dst_col = dst_table.columns[cname]
                    if copy_annotations:
                        dst_col.annotations.clear()
                        dst_col.annotations.update(src_col.annotations)
                    if copy_policy:
                        dst_col.acls.clear()
                        dst_col.acls.update(src_col.acls)
                        dst_col.acl_bindings.clear()
                        dst_col.acl_bindings.update(src_col.acl_bindings)
                for src_key in src_table.keys:
                    dst_key = dst_table.key_by_columns([col.name for col in src_key.unique_columns])
                    if copy_annotations:
                        dst_key.annotations.clear()
                        dst_key.annotations.update(src_key.annotations)

                def xlate_column_map(fkey):
                    dst_from_table = dst_table
                    dst_to_schema = dst_model.schemas[fkey.pk_table.schema.name]
                    dst_to_table = dst_to_schema.tables[fkey.pk_table.name]
                    return {
                        dst_from_table._own_column(from_col.name): dst_to_table._own_column(to_col.name)
                        for from_col, to_col in fkey.column_map.items()
                    }

                for src_fkey in src_table.foreign_keys:
                    dst_fkey = dst_table.fkey_by_column_map(xlate_column_map(src_fkey))
                    if copy_annotations:
                        dst_fkey.annotations.clear()
                        dst_fkey.annotations.update(src_fkey.annotations)
                    if copy_policy:
                        dst_fkey.acls.clear()
                        dst_fkey.acls.update(src_fkey.acls)
                        dst_fkey.acl_bindings.clear()
                        dst_fkey.acl_bindings.update(src_fkey.acl_bindings)

        # send all the config changes to the server
        dst_model.apply()

        # truncate cloning history
        if truncate_after:
            snaptime = dst_catalog.get("/").json()["snaptime"]
            dst_catalog.delete("/history/,%s" % urlquote(snaptime))

        return dst_catalog
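    # Sketch of a resumable clone. Re-invoking with the same destination resumes an
    # interrupted copy via the "tag:isrd.isi.edu,2018:clone-state" progress markers:
    #
    #   dst = src_catalog.clone_catalog()                  # create a fresh destination
    #   dst = src_catalog.clone_catalog(dst_catalog=dst)   # idempotent retry/resume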
class ErmrestSnapshot(ErmrestCatalog):
    """Persistent handle for an ERMrest catalog snapshot.

    Inherits from ErmrestCatalog and provides the same interfaces, except that the
    interfaces are now bound to a fixed snapshot of the catalog.
    """
    def __init__(self, scheme, server, catalog_id, snaptime, credentials=None, caching=True, session_config=None):
        """Create ERMrest catalog snapshot binding.

           Arguments:
             scheme: 'http' or 'https'
             server: server FQDN string
             catalog_id: e.g., '1'
             snaptime: e.g., '2PM-DGYP-56Z4'
             credentials: credential secrets, e.g. cookie
             caching: whether to retain a GET response cache
        """
        super(ErmrestSnapshot, self).__init__(scheme, server, catalog_id, credentials, caching, session_config)
        self._server_uri = "%s@%s" % (
            self._server_uri,
            snaptime
        )
        self._snaptime = snaptime

    @property
    def snaptime(self):
        """The snaptime for this catalog snapshot instance."""
        return self._snaptime

    def _pre_mutate(self, path, headers, guard_response=None):
        """Override and disable mutation operations.

        When called by the super-class, this method raises an exception.
        """
        raise ErmrestCatalogMutationError('Catalog snapshot is immutable')
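# Sketch contrasting snapshot reads with blocked mutation. Since _pre_mutate raises,
# a write attempt on a snapshot is expected to fail before any request is sent:
#
#   snap = catalog.latest_snapshot()
#   schema = snap.getCatalogSchema()    # reads work normally
#   try:
#       snap.post('/schema', json=[])   # assumed to raise via _pre_mutate
#   except ErmrestCatalogMutationError:
#       pass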
class ErmrestAlias(DerivaBinding):
    """Persistent handle for an ERMrest alias.

       Provides basic REST client for HTTP methods on arbitrary paths. Caller has to
       understand ERMrest APIs and compose appropriate paths, headers, and/or content.

       Additional utility methods provided for accessing alias metadata.
    """
    @classmethod
    def connect(cls, deriva_server, alias_id):
        """Connect to an ERMrest alias and return the alias binding.

        :param deriva_server: The DerivaServer binding which hosts ermrest
        :param alias_id: The id of the existing alias

        The alias_id is a bare id (str).
        """
        return cls(
            deriva_server.scheme,
            deriva_server.server,
            alias_id,
            deriva_server.credentials,
            deriva_server.caching,
            deriva_server.session_config
        )
    @classmethod
    def _digest_alias_args(cls, id, owner, alias_target):
        rep = ErmrestCatalog._digest_catalog_args(id, owner)

        if isinstance(alias_target, (str, type(None))):
            rep['alias_target'] = alias_target
        elif isinstance(alias_target, type(nochange)):
            pass
        else:
            raise TypeError('alias_target must be of type str or None or nochange, not %s' % type(alias_target))

        return rep
    @classmethod
    def create(cls, deriva_server, id=None, owner=None, alias_target=None):
        """Create an ERMrest catalog alias.

        :param deriva_server: The DerivaServer binding which hosts ermrest
        :param id: The (str) id desired by the client (default None)
        :param owner: The initial (list of str) ACL desired by the client (default None)
        :param alias_target: The initial target catalog id desired by the client (default None)

        The new alias id will be returned in the response, and used in future alias
        access. The use of the id parameter may yield errors if the supplied value is
        not available for use by the client. The value None will result in a
        server-assigned alias id.

        The initial "owner" ACL on the new alias will be the client-supplied owner
        parameter. The use of owner may yield errors if the supplied ACL does not match
        the client, i.e. the client cannot lock themselves out of the alias. The value
        None will result in a server-assigned ACL with the requesting client's
        identity.

        The alias is bound to the client-supplied alias_target, if supplied. The use of
        alias_target may yield errors if the supplied value is not a valid target
        catalog id. The value None will reserve the alias in an unbound state.

        Certain failure modes (or message loss) may leave the id reserved in the
        system. In this case, the effective owner ACL influences which client(s) are
        allowed to retry creation with the same id.
        """
        path = '/ermrest/alias'
        r = deriva_server.post(path, json=cls._digest_alias_args(id, owner, alias_target))
        r.raise_for_status()
        return cls.connect(deriva_server, r.json()['id'])
    def __init__(self, scheme, server, alias_id, credentials=None, caching=True, session_config=None):
        """Create ERMrest alias binding.

        :param scheme: 'http' or 'https'
        :param server: server FQDN string
        :param alias_id: e.g. '1'
        :param credentials: credential secrets, e.g. cookie
        :param caching: whether to retain a GET response cache
        """
        super(ErmrestAlias, self).__init__(scheme, server, credentials, caching, session_config)
        self._server_uri = "%s/ermrest/alias/%s" % (
            self._server_uri,
            alias_id
        )
        self._scheme, self._server, self._alias_id, self._credentials, self._caching, self._session_config = \
            scheme, server, alias_id, credentials, caching, session_config

    @property
    def alias_id(self):
        return self._alias_id
    def check_path(self, path):
        if path != '':
            raise ValueError('ErmrestAlias requires "" relative path')
    def retrieve(self):
        """Retrieve current alias binding state as a dict.

        The returned dictionary is suitable for local revision and being passed back
        into self.update:

           state = self.retrieve()
           state.update({"owner": ..., "alias_target": ...})
           self.update(**state)
        """
        return self.get('').json()
    def update(self, owner=nochange, alias_target=nochange, id=None):
        """Update alias binding state in server, returning the response message dict.

        :param owner: Revised owner ACL for binding or nochange (default nochange)
        :param alias_target: Revised target for binding or nochange (default nochange)
        :param id: Current self.alias_id or None (default None)

        The optional id parameter must be None or self.alias_id and does not affect
        state changes to the server. It is only accepted in order to allow an idiom
        like:

           state = self.retrieve()
           state.update(...)
           self.update(**state)

        where the original "id" field of self.retrieve() is harmlessly passed through
        as a keyword.
        """
        rep = self._digest_alias_args(id, owner, alias_target)
        if id is not None and id != self.alias_id:
            raise ValueError('parameter id must be None or %r, not %r' % (self.alias_id, id))
        return self.put('', json=rep).json()
    def delete_ermrest_alias(self, really=False):
        """Perform DELETE request, destroying alias on server.

        :param really: delete when True, abort when False (default)
        """
        if really is True:
            return DerivaBinding.delete(self, '')
        else:
            raise ValueError('Alias deletion refused when really is %s.' % really)
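# End-to-end alias lifecycle sketch; the ids ('myproject', '42') are placeholders:
#
#   alias = server.create_ermrest_alias(id='myproject')   # reserved, unbound
#   alias.update(alias_target='42')                       # bind to catalog id '42'
#   state = alias.retrieve()                              # current binding state dict
#   alias.delete_ermrest_alias(really=True)               # destroy the alias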