# Source code for deriva.transfer.download.processors.base_processor

import os
import datetime
import logging
from deriva.core import urlquote
from deriva.core.utils import mime_utils as mu, hash_utils as hu

# String keys shared by the download processors.

# Looked up in the constructor kwargs of BaseProcessor to obtain the
# processor's parameter dict.
PROCESSOR_PARAMS_KEY = "processor_params"
# NOTE(review): the next four keys are not used by the code visible in this
# module; presumably they are consumed by concrete processors -- confirm.
SERVICE_URL_KEY = "deriva_service_url"
SOURCE_URL_KEY = "source_url"
LOCAL_PATH_KEY = "local_path"
REMOTE_PATHS_KEY = "remote_paths"
# Per-file metadata keys read and written by
# BaseProcessor.make_file_output_values().
FILE_SIZE_KEY = "size"
MD5_KEY = "md5"
SHA256_KEY = "sha256"
CONTENT_TYPE_KEY = "content_type"
# NOTE(review): identifier keys are likewise not used in the visible code;
# presumably they are for processors that register identifiers -- confirm.
IDENTIFIER_KEY = "identifier"
IDENTIFIER_LANDING_PAGE = "identifier_landing_page"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class BaseProcessor(object):
    """
    Base class for Processor classes.

    Holds the state shared by processors (environment variables, keyword
    options, timeout and progress callback) and provides static helpers for
    building output paths and for filling in per-file metadata.
    """

    def __init__(self, envars=None, **kwargs):
        """
        Store the processor configuration.

        :param envars: dict of environment variables. The dict is used as-is
            (not copied) and is extended in place with a ``<name>_urlencoded``
            entry for every variable. Defaults to a new empty dict.
        :param kwargs: processor options. The keys read here are ``inputs``,
            ``processor_params`` (PROCESSOR_PARAMS_KEY), ``identity``,
            ``wallet`` and ``timeout``; the complete mapping is kept on
            ``self.kwargs``.
        """
        self.envars = envars if envars is not None else dict()
        self._urlencode_envars()
        self.kwargs = kwargs
        # ``outputs`` starts out as the very dict passed as ``inputs`` (identity
        # is preserved, even for an empty dict). A missing or None ``inputs``
        # yields a fresh dict instead of raising KeyError, matching the tolerant
        # handling of every other option below.
        # NOTE(review): presumably outputs is seeded from inputs so results
        # accumulate on top of what was handed in -- confirm with callers.
        inputs = kwargs.get("inputs")
        self.outputs = inputs if inputs is not None else dict()
        self.parameters = kwargs.get(PROCESSOR_PARAMS_KEY, dict()) or dict()
        self.identity = kwargs.get("identity", dict()) or dict()
        self.wallet = kwargs.get("wallet", dict()) or dict()
        self.timeout = kwargs.get("timeout", None)
        # A "callback" entry in the processor parameters replaces the default
        # progress callback.
        self.callback = self.parameters.get("callback", self.default_callback)
        self.last_timestamp = datetime.datetime.now()

    def _urlencode_envars(self, safe_overrides=None):
        """
        Add a URL-quoted ``<name>_urlencoded`` twin for every entry in
        ``self.envars``.

        Entries whose name already ends in ``_urlencoded`` are skipped so that
        repeated calls do not produce ``..._urlencoded_urlencoded`` keys.

        :param safe_overrides: optional dict mapping a variable name to the
            ``safe`` argument passed to ``urlquote`` for that variable; the
            default is ``""``.
        """
        overrides = safe_overrides or dict()
        encoded = {
            name + "_urlencoded": urlquote(str(value), overrides.get(name, ""))
            for name, value in self.envars.items()
            if not name.endswith("_urlencoded")
        }
        self.envars.update(encoded)

    @classmethod
    def process(cls):
        """
        Run the processor. Must be implemented by subclasses.

        NOTE(review): declared as a classmethod here; subclasses presumably
        override it as an instance method -- confirm.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Must be implemented by subclass")

    @staticmethod
    def create_paths(base_path, sub_path=None, filename=None, ext=None, is_bag=False, envars=None):
        """
        Build the relative and absolute output paths for a file.

        :param base_path: directory the absolute path is rooted at.
        :param sub_path: optional relative directory placed before ``filename``.
        :param filename: optional file name, joined to ``sub_path`` with "/".
        :param ext: optional extension, with or without a leading "."; it is
            appended unless the path already ends with exactly that extension.
        :param is_bag: when true, a ``data`` directory is inserted between
            ``base_path`` and the relative path.
        :param envars: when a dict, the relative path is treated as a
            ``str.format`` template and expanded with these values (a missing
            placeholder raises KeyError).
        :return: tuple ``(relpath, abspath)``.
        """
        relpath = sub_path if sub_path else ''
        if filename:
            relpath = "/".join([relpath, filename]) if relpath else filename
        if isinstance(envars, dict):
            relpath = relpath.format(**envars)
        if ext:
            if not ext.startswith("."):
                ext = "." + ext
            if os.path.splitext(relpath)[1] != ext:
                relpath += ext
        abspath = os.path.abspath(
            os.path.join(base_path, 'data' if is_bag else '', relpath))
        return relpath, abspath

    @staticmethod
    def make_file_output_values(file_path, input_dict, make_file_hashes=True):
        """
        Fill in missing size, content type and hash entries of ``input_dict``
        for the file at ``file_path``. The dict is updated in place.

        Size and content type are only computed when their key is absent, so
        the file is not touched for values the caller already supplied. Hashes
        are (re)computed for both MD5 and SHA256 unless both are already
        present with non-None values.

        :param file_path: path of the file to describe.
        :param input_dict: dict to update in place.
        :param make_file_hashes: when false, hashes are never computed.
        :return: None
        """
        if FILE_SIZE_KEY not in input_dict:
            input_dict[FILE_SIZE_KEY] = os.path.getsize(file_path)
        if CONTENT_TYPE_KEY not in input_dict:
            input_dict[CONTENT_TYPE_KEY] = mu.guess_content_type(file_path)
        has_file_hashes = (input_dict.get(MD5_KEY) is not None
                           and input_dict.get(SHA256_KEY) is not None)
        if make_file_hashes and not has_file_hashes:
            input_dict.update(hu.compute_file_hashes(file_path, [MD5_KEY, SHA256_KEY]))

    def should_abort(self):
        """
        Report whether the configured timeout has passed.

        ``self.timeout`` is only honoured when it is a ``datetime.datetime``
        deadline; it is compared against the naive ``datetime.datetime.now()``.
        ``self.last_timestamp`` is refreshed on every call, whatever the
        outcome.

        :return: True when the current time is past the deadline, else False.
        """
        try:
            if not isinstance(self.timeout, datetime.datetime):
                return False
            now = datetime.datetime.now()
            elapsed = now - self.last_timestamp
            if elapsed > datetime.timedelta(milliseconds=0):
                # Use the module logger (not the root logger) with lazy
                # formatting, consistent with default_callback().
                logger.debug("Elapsed time since last timestamp: %s.", elapsed)
            return now > self.timeout
        finally:
            self.last_timestamp = datetime.datetime.now()

    def default_callback(self, **kwargs):
        """
        Default progress callback.

        Logs the optional ``progress`` keyword argument at DEBUG level.

        :return: False when ``should_abort()`` reports that the timeout has
            passed, True otherwise.
        """
        progress = kwargs.get("progress", None)
        if progress and logger.isEnabledFor(logging.DEBUG):
            logger.debug(progress)
        return not self.should_abort()