Source code for deriva.core.hatrac_store

import os
import datetime
import requests
import logging
from . import format_exception, NotModified, DEFAULT_HEADERS, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_CHUNK_LIMIT, \
    DEFAULT_MAX_REQUEST_SIZE, urlquote, Megabyte, get_transfer_summary, calculate_optimal_transfer_shape
from .deriva_binding import DerivaBinding
from .utils import hash_utils as hu, mime_utils as mu


class HatracHashMismatch (ValueError): pass
class HatracJobAborted (Exception): pass
class HatracJobPaused (Exception): pass
class HatracJobTimeout (Exception): pass

class HatracStore(DerivaBinding):
    def __init__(self, scheme, server, credentials=None, session_config=None):
        """Create Hatrac server binding.

           Arguments:
             scheme: 'http' or 'https'
             server: server FQDN string
             credentials: credential secrets, e.g. cookie

           Deriva Client Context: You MAY mutate self.dcctx to customize the
           context for this service endpoint prior to invoking web requests, e.g.:

             self.dcctx['cid'] = 'my application name'

           You MAY also supply custom per-request context by passing a headers
           dict to web request methods, e.g.:

             self.get(..., headers={'deriva-client-context': {'action': 'myapp/function1'}})

           This custom header will be merged as override values with the default
           context in self.dcctx in order to form the complete context for the
           request.
        """
        DerivaBinding.__init__(self, scheme, server, credentials, caching=False, session_config=session_config)

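    # A minimal usage sketch (not part of the module): binding a client to a
    # hatrac service. The host name is hypothetical, and get_credential assumes
    # a previously stored login for that host.
    #
    #   from deriva.core import HatracStore, get_credential
    #   host = 'example.derivacloud.org'
    #   store = HatracStore('https', host, credentials=get_credential(host))
    #   store.dcctx['cid'] = 'my application name'
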
    def content_equals(self, path, filename=None, md5=None, sha256=None):
        """Check if a remote object's content matches at least one of the
           specified inputs: a local file, an MD5 digest, or a SHA256 digest.

        :return: True IFF the object exists and its Content-MD5 or
            Content-SHA256 header matches the corresponding hash of the input
            file or one of the passed md5/sha256 parameters.
        """
        self.check_path(path)
        assert filename or md5 or sha256
        if filename:
            hashes = hu.compute_file_hashes(filename, hashes=['md5', 'sha256'])
            md5 = hashes['md5'][1]
            sha256 = hashes['sha256'][1]
        r = self.head(path)
        if r.status_code == 200 and \
                (md5 and r.headers.get('Content-MD5') == md5 or
                 sha256 and r.headers.get('Content-SHA256') == sha256):
            return True
        else:
            return False

    def get_obj(self, path, headers=DEFAULT_HEADERS, destfilename=None, callback=None,
                chunk_size=DEFAULT_CHUNK_SIZE):
        """Retrieve a resource, optionally streamed to a destination file.

           If destfilename is provided, download content to a file with that
           name. The caller is responsible for cleaning up the file even on
           error, when the file may or may not exist.

           If hatrac provides a Content-SHA256 or Content-MD5 response header,
           the downloaded file will be hash-verified on success, raising
           HatracHashMismatch on verification failure. This is not verified
           when destfilename is None, as the client must instead consume and
           validate content directly from the response object.
        """
        self.check_path(path)
        headers = headers.copy()
        if destfilename is not None:
            destfile = open(destfilename, 'w+b')
            stream = True
        else:
            destfile = None
            stream = False
        headers['deriva-client-context'] = self.dcctx.merged(headers.get('deriva-client-context', {})).encoded()
        try:
            r = self._session.get(self._server_uri + path, headers=headers, stream=stream)
            self._response_raise_for_status(r)
            if destfilename is not None:
                total = 0
                current_chunk = 0
                start = datetime.datetime.now()
                logging.debug("Transferring file %s to %s" % (self._server_uri + path, destfilename))
                for buf in r.iter_content(chunk_size=chunk_size):
                    destfile.write(buf)
                    total += len(buf)
                    current_chunk += 1
                    if callback:
                        if not callback(progress="Downloading: %.2f MB transferred" % (total / Megabyte),
                                        total_bytes=total, current_chunk=current_chunk):
                            destfile.close()
                            r.close()
                            os.remove(destfilename)
                            return None
                elapsed = datetime.datetime.now() - start
                summary = get_transfer_summary(total, elapsed)
                destfile.flush()
                logging.info("File [%s] transfer successful. %s" % (destfilename, summary))
                if callback:
                    callback(summary=summary, file_path=destfilename)
                if 'Content-SHA256' in r.headers:
                    destfile.seek(0, 0)
                    logging.info("Verifying SHA256 checksum for downloaded file [%s]" % destfilename)
                    fsha256 = hu.compute_hashes(destfile, hashes=['sha256'])['sha256'][1]
                    rsha256 = r.headers.get('Content-SHA256', r.headers.get('content-sha256', None))
                    if fsha256 != rsha256:
                        raise HatracHashMismatch('Content-SHA256 %s != computed sha256 %s' % (rsha256, fsha256))
                elif 'Content-MD5' in r.headers:
                    destfile.seek(0, 0)
                    logging.info("Verifying MD5 checksum for downloaded file [%s]" % destfilename)
                    fmd5 = hu.compute_hashes(destfile, hashes=['md5'])['md5'][1]
                    rmd5 = r.headers.get('Content-MD5', r.headers.get('content-md5', None))
                    if fmd5 != rmd5:
                        raise HatracHashMismatch('Content-MD5 %s != computed MD5 %s' % (rmd5, fmd5))
            r.close()
            return r
        finally:
            if destfile is not None:
                destfile.close()

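    # Download sketch, reusing the hypothetical 'store' binding from above; the
    # object path and local file name are illustrative only:
    #
    #   r = store.get_obj('/hatrac/ns1/object1.bin', destfilename='object1.bin')
    #   # r is None if a callback aborted the transfer, else the response object
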
    def put_obj(self, path, data, headers=DEFAULT_HEADERS, md5=None, sha256=None, parents=True,
                content_type=None, content_disposition=None, allow_versioning=True):
        """Idempotent upload of object, returning object location URI.

           Arguments:
             path: name of object
             data: filename or seekable file-like object
             headers: additional headers
             md5: a base64 encoded md5 digest may be provided in order to skip the automatic hash computation
             sha256: a base64 encoded sha256 digest may be provided in order to skip the automatic hash computation
             parents: automatically create parent namespace(s) if missing
             content_type: the content-type of the object (optional)
             content_disposition: the preferred content-disposition of the object (optional)
             allow_versioning: if False, reject with NotModified when different content
               already exists for this object (optional)

           Automatically computes and sends Content-MD5 if no digests are provided.

           If an object-version already exists under the same name with the
           same Content-MD5, that location is returned instead of creating a
           new one.
        """
        self.check_path(path)
        headers = headers.copy()
        if hasattr(data, 'read') and hasattr(data, 'seek'):
            data.seek(0, os.SEEK_END)
            file_size = data.tell()
            data.seek(0, 0)
            f = data
        else:
            file_size = os.path.getsize(data)
            f = open(data, 'rb')
        if not (md5 or sha256):
            md5 = hu.compute_hashes(f, hashes=['md5'])['md5'][1]
            f.seek(0, 0)
        max_request_size = self.session_config.get("max_request_size", DEFAULT_MAX_REQUEST_SIZE)
        if file_size > max_request_size:
            raise ValueError("The PUT request payload size of %d bytes is larger than the currently allowed maximum "
                             "payload size of %d bytes for single request PUT operations. Use the 'put_loc' function "
                             "to perform chunked uploads of large data objects." % (file_size, max_request_size))
        try:
            r = self.head(path)
            if r.status_code == 200:
                if (md5 and r.headers.get('Content-MD5') == md5 or
                        sha256 and r.headers.get('Content-SHA256') == sha256):
                    # object already has same content so skip upload
                    f.close()
                    return r.headers.get('Content-Location')
                elif not allow_versioning:
                    raise NotModified("The data cannot be uploaded because content already exists for this object "
                                      "and multiple versions are not allowed.")
        except requests.HTTPError as e:
            if e.response.status_code != 404:
                logging.debug("HEAD request failed: %s" % format_exception(e))

        # TODO: verify incoming hashes if supplied?
        headers['Content-MD5'] = md5
        headers['Content-SHA256'] = sha256
        headers['deriva-client-context'] = self.dcctx.merged(headers.get('deriva-client-context', {})).encoded()
        if content_type:
            headers['Content-Type'] = content_type
        if content_disposition:
            headers['Content-Disposition'] = content_disposition
        url = self._server_uri + path
        url = '%s%s' % (url.rstrip("/") if url.endswith("/") else url,
                        "" if not parents else "?parents=%s" % str(parents).lower())
        r = self._session.put(url, data=f, headers=headers)
        self._response_raise_for_status(r)
        loc = r.text.strip() or r.url
        if loc.startswith(self._server_uri):
            loc = loc[len(self._server_uri):]
        return loc

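    # Single-request upload sketch (paths hypothetical). Content-MD5 is
    # computed automatically, and re-uploading identical content returns the
    # existing location instead of creating a new version:
    #
    #   loc = store.put_obj('/hatrac/ns1/object1.bin', '/tmp/object1.bin',
    #                       content_type='application/octet-stream')
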
    def del_obj(self, path):
        """Delete an object.
        """
        self.check_path(path)
        self.delete(path)
        logging.debug('Deleted object "%s%s".' % (self._server_uri, path))

    def put_loc(self, path, file_path, headers=DEFAULT_HEADERS, md5=None, sha256=None,
                content_type=None, content_disposition=None, chunked=False,
                chunk_size=DEFAULT_CHUNK_SIZE, create_parents=True, allow_versioning=True,
                callback=None, cancel_job_on_error=True):
        """Upload a file to an object location, optionally via a chunked upload job.

        :param path: name of object
        :param file_path: path of the local file to upload
        :param headers: additional headers
        :param md5: a base64 encoded md5 digest may be provided in order to skip the automatic hash computation
        :param sha256: a base64 encoded sha256 digest may be provided in order to skip the automatic hash computation
        :param content_type: the content-type of the object (optional)
        :param content_disposition: the preferred content-disposition of the object (optional)
        :param chunked: if True, upload the file in chunks via a hatrac upload job
        :param chunk_size: the size in bytes of each chunk
        :param create_parents: automatically create parent namespace(s) if missing
        :param allow_versioning: if False, reject with NotModified when different content already exists
        :param callback: optional progress/cancellation callback
        :param cancel_job_on_error: cancel a partially completed upload job if an error occurs
        :return: the object location URI
        """
        self.check_path(path)
        if not chunked:
            return self.put_obj(path, file_path, headers, md5, sha256,
                                content_type=content_type,
                                content_disposition=content_disposition,
                                parents=create_parents,
                                allow_versioning=allow_versioning)
        if not (md5 or sha256):
            md5 = hu.compute_file_hashes(file_path, hashes=['md5'])['md5'][1]
        try:
            r = self.head(path)
            if r.status_code == 200:
                if (md5 and r.headers.get('Content-MD5') == md5 or
                        sha256 and r.headers.get('Content-SHA256') == sha256):
                    # object already has same content so skip upload
                    return r.headers.get('Content-Location')
                elif not allow_versioning:
                    raise NotModified("The file [%s] cannot be uploaded because content already exists for this "
                                      "object and multiple versions are not allowed." % file_path)
        except requests.HTTPError as e:
            if e.response.status_code != 404:
                logging.debug("HEAD request failed: %s" % format_exception(e))
        job_id = self.create_upload_job(path, file_path, md5, sha256, content_type=content_type,
                                        content_disposition=content_disposition,
                                        create_parents=create_parents, chunk_size=chunk_size)
        try:
            self.put_obj_chunked(path, file_path, job_id, chunk_size=chunk_size, callback=callback,
                                 cancel_job_on_error=cancel_job_on_error)
            return self.finalize_upload_job(path, job_id)
        except (requests.Timeout, requests.ConnectionError, requests.exceptions.RetryError) as e:
            raise HatracJobTimeout(e)

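    # Chunked upload sketch (paths hypothetical). With chunked=True, put_loc
    # creates an upload job, sends the chunks, and finalizes the job in one call:
    #
    #   loc = store.put_loc('/hatrac/ns1/big.dat', '/tmp/big.dat', chunked=True)
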
    def put_obj_chunked(self, path, file_path, job_id, chunk_size=DEFAULT_CHUNK_SIZE,
                        callback=None, start_chunk=0, cancel_job_on_error=True):
        self.check_path(path)
        job_info = self.get_upload_job(path, job_id).json()
        chunk_size = job_info.get("chunk-length", chunk_size)
        logging.debug("Current chunk size: %d bytes. " % chunk_size)
        try:
            file_size = os.path.getsize(file_path)
            chunks = file_size // chunk_size
            if file_size % chunk_size:
                chunks += 1
            with open(file_path, 'rb') as f:
                total = 0
                chunk = start_chunk
                if chunk > 0:
                    total = chunk * chunk_size
                    f.seek(total)
                start = datetime.datetime.now()
                logging.debug("Transferring file %s to %s%s" % (file_path, self._server_uri, path))
                while True:
                    data = f.read(chunk_size)
                    if not data:
                        break
                    url = '%s;upload/%s/%d' % (path, job_id, chunk)
                    headers = {'Content-Type': 'application/octet-stream',
                               'Content-Length': '%d' % len(data)}
                    r = self.put(url, data=data, headers=headers)
                    self._response_raise_for_status(r)
                    total += len(data)
                    chunk += 1
                    if callback:
                        ret = callback(job_info=job_info, completed=chunk, total=chunks,
                                       file_path=file_path, host=self._server_uri)
                        if ret == 0:
                            self.cancel_upload_job(path, job_id)
                            raise HatracJobAborted("Upload in-progress cancelled by user.")
                        elif ret == -1:
                            raise HatracJobPaused("Upload in-progress paused by user.")
                elapsed = datetime.datetime.now() - start
                summary = get_transfer_summary(total, elapsed)
                logging.info("File [%s] upload successful. %s" % (file_path, summary))
                if callback:
                    callback(summary=summary, file_path=file_path)
        except:
            if cancel_job_on_error:
                try:
                    self.cancel_upload_job(path, job_id)
                except:
                    pass
            raise

    def create_upload_job(self, path, file_path, md5, sha256, create_parents=True,
                          chunk_size=DEFAULT_CHUNK_SIZE, content_type=None, content_disposition=None):
        self.check_path(path)
        max_chunk_size, chunk_count, remainder = \
            calculate_optimal_transfer_shape(os.path.getsize(file_path),
                                             self.session_config.get("max_chunk_limit",
                                                                     DEFAULT_MAX_CHUNK_LIMIT),
                                             requested_chunk_size=chunk_size)
        if chunk_size > max_chunk_size:
            logging.warning("Requested chunk size of %d bytes is larger than the hard limit of %d bytes for this "
                            "application. The chunk size will be reset to this maximum." %
                            (chunk_size, max_chunk_size))
            chunk_size = max_chunk_size
        url = '%s;upload%s' % (path, "" if not create_parents else "?parents=%s" % str(create_parents).lower())
        obj = {"chunk-length": chunk_size,
               "content-length": os.path.getsize(file_path)}
        if md5:
            obj["content-md5"] = md5
        if sha256:
            obj["content-sha256"] = sha256
        if content_disposition:
            obj['content-disposition'] = content_disposition
        obj['content-type'] = content_type if content_type else mu.guess_content_type(file_path)
        r = self.post(url, json=obj, headers={'Content-Type': 'application/json'})
        job_id = r.text.split('/')[-1][:-1]
        logging.debug('Created job_id "%s" for url "%s".' % (job_id, url))
        return job_id

    def get_upload_job(self, path, job_id):
        self.check_path(path)
        url = '%s;upload/%s' % (path, job_id)
        headers = {}
        r = self.get(url, headers=headers)
        return r

    def finalize_upload_job(self, path, job_id):
        self.check_path(path)
        url = '%s;upload/%s' % (path, job_id)
        headers = {}
        r = self.post(url, headers=headers)
        return r.text.strip()

    def cancel_upload_job(self, path, job_id):
        self.check_path(path)
        url = '%s;upload/%s' % (path, job_id)
        headers = {}
        self.delete(url, headers=headers)

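    # The job functions above compose into the manual lifecycle that put_loc
    # automates (a sketch; paths are hypothetical):
    #
    #   md5 = hu.compute_file_hashes('/tmp/big.dat', hashes=['md5'])['md5'][1]
    #   job_id = store.create_upload_job('/hatrac/ns1/big.dat', '/tmp/big.dat', md5, None)
    #   store.put_obj_chunked('/hatrac/ns1/big.dat', '/tmp/big.dat', job_id)
    #   loc = store.finalize_upload_job('/hatrac/ns1/big.dat', job_id)
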
    def is_valid_namespace(self, namespace_path):
        """Check if a namespace already exists.
        """
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        try:
            self.head(namespace_path, headers)
            return True
        except requests.HTTPError as e:
            if e.response.status_code == requests.codes.not_found:
                return False
            raise

    def retrieve_namespace(self, namespace_path):
        """Retrieve a namespace.
        """
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        try:
            resp = self.get(namespace_path, headers)
            namespaces = resp.json()
            return namespaces
        except requests.HTTPError as e:
            if e.response.status_code == requests.codes.not_found:
                return None
            raise

    def create_namespace(self, namespace_path, parents=True):
        """Create a namespace.
        """
        self.check_path(namespace_path)
        url = "?".join([namespace_path, "parents=%s" % str(parents).lower()])
        headers = {'Content-Type': 'application/x-hatrac-namespace', 'Accept': 'application/json'}
        self.put(url, headers=headers)
        logging.debug('Created namespace "%s%s".' % (self._server_uri, namespace_path))

    def delete_namespace(self, namespace_path):
        """Delete a namespace.
        """
        self.check_path(namespace_path)
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        self.delete(namespace_path, headers=headers)
        logging.debug('Deleted namespace "%s%s".' % (self._server_uri, namespace_path))

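    # Namespace management sketch (the namespace path is illustrative):
    #
    #   if not store.is_valid_namespace('/hatrac/ns1'):
    #       store.create_namespace('/hatrac/ns1', parents=True)
    #   children = store.retrieve_namespace('/hatrac/ns1')  # child paths, or None
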
    def get_acl(self, resource_name, access=None, role=None):
        """Get the object or namespace ACL resource.
        """
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        url = resource_name + ';acl'
        if access:
            url += '/' + urlquote(access)
            if role:
                url += '/' + urlquote(role)
        elif role:
            raise ValueError('Do not specify "role" if "access" mode is not specified.')
        try:
            resp = self.get(url, headers)
            if role:
                return {access: [role]}
            elif access:
                return {access: resp.json()}
            else:
                return resp.json()
        except requests.HTTPError as e:
            if e.response.status_code == requests.codes.not_found:
                return None
            raise

    def set_acl(self, resource_name, access, roles, add_role=False):
        """Set the object or namespace ACL resource.

           If 'add_role' is True, the operation will add a single role to the
           ACL; otherwise it will attempt to replace all of the ACL's roles.
           This option is only valid when a list of exactly one role is given.
        """
        if add_role and len(roles) > 1:
            raise ValueError("Cannot add more than one role at a time.")
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        url = "%(resource_name)s;acl/%(access)s" % {'resource_name': resource_name, 'access': urlquote(access)}
        roles_obj = None
        if add_role:
            url += '/' + urlquote(roles[0])
        else:
            roles_obj = roles
        self.put(url, json=roles_obj, headers=headers)
        return None

    def del_acl(self, resource_name, access, role=None):
        """Delete the object or namespace ACL resource.
        """
        headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
        url = "%(resource_name)s;acl/%(access)s" % {'resource_name': resource_name, 'access': urlquote(access)}
        if role:
            url += '/' + urlquote(role)
        self.delete(url, headers)
        return None

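    # ACL management sketch (the resource path and role URI are hypothetical;
    # valid access modes are defined by the hatrac service):
    #
    #   store.set_acl('/hatrac/ns1/object1.bin', 'read', ['https://auth.example.org/some-group'])
    #   acl = store.get_acl('/hatrac/ns1/object1.bin', access='read')
    #   store.del_acl('/hatrac/ns1/object1.bin', 'read', role='https://auth.example.org/some-group')
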