Source code for deriva.transfer.download.deriva_download

import os
import time
import datetime
import uuid
import logging
import platform
import requests
from requests.exceptions import HTTPError
from bdbag import bdbag_api as bdb, bdbag_ro as ro, BAG_PROFILE_TAG, BDBAG_RO_PROFILE_ID
from deriva.core import ErmrestCatalog, HatracStore, format_exception, get_credential, format_credential, read_config, \
    stob, Megabyte, __version__ as VERSION
from deriva.core.utils.version_utils import get_installed_version
from deriva.transfer.download.processors import find_query_processor, find_transform_processor, find_post_processor
from deriva.transfer.download.processors.base_processor import LOCAL_PATH_KEY, REMOTE_PATHS_KEY, SERVICE_URL_KEY, \
    FILE_SIZE_KEY
from deriva.transfer.download import DerivaDownloadError, DerivaDownloadConfigurationError, \
    DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError


[docs]class DerivaDownload(object): """ """ def __init__(self, server, **kwargs): self.server = server self.hostname = None self.catalog = None self.store = None self.cancelled = False self.output_dir = os.path.abspath(kwargs.get("output_dir", ".")) self.envars = kwargs.get("envars", dict()) self.config = kwargs.get("config") self.credentials = kwargs.get("credentials", dict()) config_file = kwargs.get("config_file") credential_file = kwargs.get("credential_file") self.metadata = dict() self.sessions = dict() self.allow_anonymous = kwargs.get("allow_anonymous", True) self.max_payload_size_mb = int(kwargs.get("max_payload_size_mb", 0) or 0) self.timeout_secs = int(kwargs.get("timeout", 0) or 0) self.timeout = None info = "%s v%s [Python %s, %s]" % ( self.__class__.__name__, get_installed_version(VERSION), platform.python_version(), platform.platform(aliased=True)) logging.info("Initializing downloader: %s" % info) if not self.server: raise DerivaDownloadConfigurationError("Server not specified!") # server variable initialization self.hostname = self.server.get('host', '') if not self.hostname: raise DerivaDownloadConfigurationError("Host not specified!") protocol = self.server.get('protocol', 'https') self.server_url = protocol + "://" + self.hostname catalog_id = self.server.get("catalog_id", "1") session_config = self.server.get('session') # credential initialization token = kwargs.get("token") oauth2_token = kwargs.get("oauth2_token") username = kwargs.get("username") password = kwargs.get("password") if credential_file: self.credentials = get_credential(self.hostname, credential_file) elif token or oauth2_token or (username and password): self.credentials = format_credential(token=token, oauth2_token=oauth2_token, username=username, password=password) # catalog and file store initialization if self.catalog: del self.catalog self.catalog = ErmrestCatalog( protocol, self.hostname, catalog_id, self.credentials, session_config=session_config) if self.store: del self.store self.store = HatracStore( protocol, self.hostname, self.credentials, session_config=session_config) # init dcctx cid self.set_dcctx_cid(kwargs.get("dcctx_cid", "api/" + self.__class__.__name__)) # process config file if config_file: try: self.config = read_config(config_file) except Exception as e: raise DerivaDownloadConfigurationError(e)
[docs] def set_dcctx_cid(self, cid): assert cid, "A dcctx cid is required" if self.catalog: self.catalog.dcctx['cid'] = cid if self.store: self.store.dcctx['cid'] = cid
[docs] def set_config(self, config): self.config = config
[docs] def set_credentials(self, credentials): self.catalog.set_credentials(credentials, self.hostname) self.store.set_credentials(credentials, self.hostname) self.credentials = credentials
[docs] def check_payload_size(self, outputs): if self.max_payload_size_mb < 1: return # This is not very well optimized for the way it is invoked but until it becomes an issue, it is good enough. max_bytes = self.max_payload_size_mb * Megabyte total_bytes = 0 for v in outputs.values(): total_bytes += v.get(FILE_SIZE_KEY, 0) if total_bytes >= max_bytes: raise DerivaDownloadError("Maximum payload size of %d megabytes exceeded." % self.max_payload_size_mb)
[docs] def download(self, **kwargs): if not self.config: raise DerivaDownloadConfigurationError("No configuration specified!") if self.config.get("catalog") is None: raise DerivaDownloadConfigurationError("Catalog configuration error!") if self.timeout_secs > 0: self.timeout = datetime.datetime.now() + datetime.timedelta(0, self.timeout_secs) ro_manifest = None ro_author_name = None ro_author_orcid = None remote_file_manifest = os.path.abspath( ''.join([os.path.join(self.output_dir, 'remote-file-manifest_'), str(uuid.uuid4()), ".json"])) catalog_config = self.config['catalog'] self.envars.update(self.config.get('env', dict())) self.envars.update({"hostname": self.hostname}) # 1. If we don't have a client identity, we need to authenticate identity = kwargs.get("identity") if not identity: try: if not self.credentials: self.set_credentials(get_credential(self.hostname)) logging.info("Validating credentials for host: %s" % self.hostname) attributes = self.catalog.get_authn_session().json() identity = attributes["client"] except HTTPError as he: if he.response.status_code == 404: logging.info("No existing login session found for host: %s" % self.hostname) except Exception as e: raise DerivaDownloadAuthenticationError("Unable to validate credentials: %s" % format_exception(e)) wallet = kwargs.get("wallet", {}) # 2. Check for bagging config and initialize bag related variables bag_path = None bag_archiver = None bag_algorithms = None bag_idempotent = False bag_config = self.config.get('bag') create_bag = True if bag_config else False if create_bag: bag_name = bag_config.get( 'bag_name', ''.join(["deriva_bag", '_', time.strftime("%Y-%m-%d_%H.%M.%S")])).format(**self.envars) bag_path = os.path.abspath(os.path.join(self.output_dir, bag_name)) bag_archiver = bag_config.get('bag_archiver') bag_algorithms = bag_config.get('bag_algorithms', ['sha256']) bag_idempotent = stob(bag_config.get('bag_idempotent', False)) bag_metadata = bag_config.get('bag_metadata', {"Internal-Sender-Identifier": "deriva@%s" % self.server_url}) bag_ro = create_bag and not bag_idempotent and stob(bag_config.get('bag_ro', "True")) if create_bag: bdb.ensure_bag_path_exists(bag_path) bag = bdb.make_bag(bag_path, algs=bag_algorithms, metadata=bag_metadata, idempotent=bag_idempotent) if bag_ro: ro_author_name = bag.info.get("Contact-Name", None if not identity else identity.get('full_name', identity.get('display_name', identity.get('id', None)))) ro_author_orcid = bag.info.get("Contact-Orcid") ro_manifest = ro.init_ro_manifest(author_name=ro_author_name, author_orcid=ro_author_orcid) bag_metadata.update({BAG_PROFILE_TAG: BDBAG_RO_PROFILE_ID}) # 3. Process the set of queries by locating, instantiating, and invoking the specified processor(s) outputs = dict() base_path = bag_path if bag_path else self.output_dir for processor in catalog_config['query_processors']: processor_name = processor["processor"] processor_type = processor.get('processor_type') processor_params = processor.get('processor_params') try: query_processor = find_query_processor(processor_name, processor_type) processor = query_processor(self.envars, inputs=outputs, bag=create_bag, catalog=self.catalog, store=self.store, base_path=base_path, processor_params=processor_params, remote_file_manifest=remote_file_manifest, ro_manifest=ro_manifest, ro_author_name=ro_author_name, ro_author_orcid=ro_author_orcid, identity=identity, wallet=wallet, allow_anonymous=self.allow_anonymous, timeout=self.timeout) outputs = processor.process() if processor.should_abort(): raise DerivaDownloadTimeoutError("Timeout (%s seconds) waiting for processor [%s] to complete." % (self.timeout_secs, processor_name)) self.check_payload_size(outputs) except Exception as e: logging.error(format_exception(e)) if create_bag: bdb.cleanup_bag(bag_path) if remote_file_manifest and os.path.isfile(remote_file_manifest): os.remove(remote_file_manifest) raise # 4. Execute anything in the transform processing pipeline, if configured transform_processors = self.config.get('transform_processors', []) if transform_processors: for processor in transform_processors: processor_name = processor["processor"] processor_type = processor.get('processor_type') processor_params = processor.get('processor_params') try: transform_processor = find_transform_processor(processor_name, processor_type) processor = transform_processor( self.envars, inputs=outputs, processor_params=processor_params, base_path=base_path, bag=create_bag, ro_manifest=ro_manifest, ro_author_name=ro_author_name, ro_author_orcid=ro_author_orcid, identity=identity, wallet=wallet, allow_anonymous=self.allow_anonymous, timeout=self.timeout) outputs = processor.process() if processor.should_abort(): raise DerivaDownloadTimeoutError( "Timeout (%s seconds) waiting for processor [%s] to complete." % (self.timeout_secs, processor_name)) self.check_payload_size(outputs) except Exception as e: if create_bag: bdb.cleanup_bag(bag_path) if remote_file_manifest and os.path.isfile(remote_file_manifest): os.remove(remote_file_manifest) raise # 5. Create the bag, and archive (serialize) if necessary if create_bag: try: if ro_manifest: ro.write_bag_ro_metadata(ro_manifest, bag_path) if not os.path.isfile(remote_file_manifest): remote_file_manifest = None bdb.make_bag(bag_path, algs=bag_algorithms, remote_file_manifest=remote_file_manifest if (remote_file_manifest and os.path.getsize(remote_file_manifest) > 0) else None, update=True, idempotent=bag_idempotent) except Exception as e: logging.fatal("Exception while updating bag manifests: %s" % format_exception(e)) bdb.cleanup_bag(bag_path) raise finally: if remote_file_manifest and os.path.isfile(remote_file_manifest): os.remove(remote_file_manifest) logging.info('Created bag: %s' % bag_path) if bag_archiver is not None: try: archive = bdb.archive_bag(bag_path, bag_archiver.lower(), idempotent=bag_idempotent) bdb.cleanup_bag(bag_path) outputs = {os.path.basename(archive): {LOCAL_PATH_KEY: archive}} except Exception as e: logging.error("Exception while creating data bag archive: %s" % format_exception(e)) raise else: outputs = {os.path.basename(bag_path): {LOCAL_PATH_KEY: bag_path}} # 6. Execute anything in the post processing pipeline, if configured post_processors = self.config.get('post_processors', []) if post_processors: for processor in post_processors: processor_name = processor["processor"] processor_type = processor.get('processor_type') processor_params = processor.get('processor_params') try: post_processor = find_post_processor(processor_name, processor_type) processor = post_processor( self.envars, inputs=outputs, processor_params=processor_params, identity=identity, wallet=wallet, allow_anonymous=self.allow_anonymous, timeout=self.timeout) outputs = processor.process() if processor.should_abort(): raise DerivaDownloadTimeoutError( "Timeout (%s seconds) waiting for processor [%s] to complete." % (self.timeout_secs, processor_name)) self.check_payload_size(outputs) except Exception as e: logging.error(format_exception(e)) raise return outputs
def __del__(self): for session in self.sessions.values(): session.close()
[docs]class GenericDownloader(DerivaDownload): LOCAL_PATH_KEY = LOCAL_PATH_KEY REMOTE_PATHS_KEY = REMOTE_PATHS_KEY SERVICE_URL_KEY = SERVICE_URL_KEY def __init__(self, *args, **kwargs): DerivaDownload.__init__(self, *args, **kwargs)