# Source code for deriva.transfer.download.processors.query.base_query_processor

import os
import errno
import certifi
import requests
from deriva.core import urlsplit, get_new_requests_session, stob, make_dirs, DEFAULT_SESSION_CONFIG
from deriva.transfer.download import DerivaDownloadError, DerivaDownloadConfigurationError, \
    DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError
from deriva.transfer.download.processors.base_processor import BaseProcessor, \
    LOCAL_PATH_KEY, FILE_SIZE_KEY, SOURCE_URL_KEY
from bdbag import bdbag_ro as ro


class BaseQueryProcessor(BaseProcessor):
    """
    Base class for QueryProcessor classes.

    Runs the configured ``query_path`` against ``catalog`` and, by default, writes the response to an
    output file. Also provides helpers for issuing HEAD requests against either the configured Hatrac
    store or an external host, with one cached session per external host.

    Subclasses are expected to set ``self.ext`` and ``self.content_type`` and then call
    ``create_default_paths()`` so that ``output_relpath`` / ``output_abspath`` are populated before
    ``process()`` is invoked.
    """
    # Headers forced onto every request issued by this processor.
    HEADERS = {'Connection': 'keep-alive'}

    def __init__(self, envars=None, **kwargs):
        """
        :param envars: mapping substituted into the query path via ``str.format``; forwarded to the base class
        :param kwargs: must contain ``catalog``, ``store`` and ``base_path``; may contain ``store_base``,
                       ``bag``, ``sessions``, ``ro_manifest``, ``ro_author_name``, ``ro_author_orcid``,
                       ``auth_params`` and ``session_config``
        :raises KeyError: if a required keyword argument or the ``query_path`` parameter is missing
        """
        super(BaseQueryProcessor, self).__init__(envars, **kwargs)
        self.catalog = kwargs["catalog"]
        self.store = kwargs["store"]
        self.base_path = kwargs["base_path"]
        self.query = self.parameters["query_path"]
        if self.envars:
            self.query = self.query.format(**self.envars)
        self.sub_path = self.parameters.get("output_path")
        self.output_filename = self.parameters.get("output_filename")
        self.store_base = kwargs.get("store_base", "/hatrac/")
        self.is_bag = kwargs.get("bag", False)
        # Cache of external host name -> session; shared with the caller when passed in via kwargs.
        self.sessions = kwargs.get("sessions", dict())
        self.content_type = "application/octet-stream"
        self.url = ''.join([self.catalog.get_server_uri(), self.query])
        # File provenance defaults to enabled exactly when the output is a bag.
        self.ro_file_provenance = stob(self.parameters.get("ro_file_provenance", bool(self.is_bag)))
        self.ro_manifest = self.kwargs.get("ro_manifest")
        self.ro_author_name = self.kwargs.get("ro_author_name")
        self.ro_author_orcid = self.kwargs.get("ro_author_orcid")
        # Populated by create_default_paths().
        self.output_relpath = None
        self.output_abspath = None
        self.paged_query = self.parameters.get("paged_query", False)
        self.paged_query_size = self.parameters.get("paged_query_size", 100000)
        self.paged_query_sort_columns = self.parameters.get("paged_query_sort_columns", ["RID"])

    def process(self):
        """
        Execute the catalog query to the output file and register the file in ``self.outputs``.

        When the output file exists after the query, an entry keyed by the relative output path is added
        to ``self.outputs``; if an RO manifest is configured and file provenance is enabled, file metadata
        is also added to the manifest.

        :return: ``self.outputs``
        """
        self.catalogQuery(headers={'accept': self.content_type})

        if os.path.isfile(self.output_abspath):
            if self.ro_manifest and self.ro_file_provenance:
                ro.add_file_metadata(self.ro_manifest,
                                     source_url=self.url,
                                     local_path=self.output_relpath,
                                     media_type=self.content_type,
                                     retrieved_on=ro.make_retrieved_on(),
                                     retrieved_by=ro.make_retrieved_by(self.ro_author_name,
                                                                       orcid=self.ro_author_orcid),
                                     bundled_as=ro.make_bundled_as())
            self.outputs.update({self.output_relpath: {LOCAL_PATH_KEY: self.output_abspath,
                                                       FILE_SIZE_KEY: os.path.getsize(self.output_abspath),
                                                       SOURCE_URL_KEY: self.url}})
        return self.outputs

    def catalogQuery(self, headers=None, as_file=True):
        """
        Execute ``self.query`` against the catalog.

        :param headers: optional request headers; ``HEADERS`` is overlaid on top of them. The dict passed
                        in is not modified.
        :param as_file: when true, write the response to ``self.output_abspath`` and return the result of
                        ``catalog.getAsFile``; otherwise return the decoded JSON response body
        :raises DerivaDownloadAuthenticationError: on an HTTP 401 response
        :raises DerivaDownloadAuthorizationError: on an HTTP 403 response
        :raises DerivaDownloadError: on any other HTTP error
        """
        # Merge into a fresh dict so neither the caller's dict nor the class-level HEADERS is mutated.
        request_headers = dict(headers) if headers else dict()
        request_headers.update(self.HEADERS)

        if as_file:
            make_dirs(os.path.dirname(self.output_abspath))
        try:
            if as_file:
                return self.catalog.getAsFile(self.query,
                                              self.output_abspath,
                                              headers=request_headers,
                                              callback=self.callback,
                                              delete_if_empty=True,
                                              paged=self.paged_query,
                                              page_size=self.paged_query_size,
                                              page_sort_columns=self.paged_query_sort_columns)
            return self.catalog.get(self.query, headers=request_headers).json()
        except requests.HTTPError as e:
            # Clean up before classifying the error so that no failure path leaves a partial file behind.
            if as_file:
                self._remove_output_file()
            # Compare against None explicitly: a requests.Response is falsy for error status codes.
            status_code = e.response.status_code if e.response is not None else None
            if status_code == 401:
                raise DerivaDownloadAuthenticationError(e)
            if status_code == 403:
                raise DerivaDownloadAuthorizationError(e)
            raise DerivaDownloadError("Error executing catalog query: %s" % e)
        except Exception:
            if as_file:
                self._remove_output_file()
            raise

    def _remove_output_file(self):
        """
        Delete the output file, tolerating the case where it was never created.

        A failed request may abort before anything is written; an unguarded ``os.remove`` would then
        raise and hide the error that actually caused the failure.
        """
        try:
            os.remove(self.output_abspath)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def headForHeaders(self, url, raise_for_status=False):
        """
        Issue a HEAD request for ``url`` and return the response headers.

        The request goes through the Hatrac store when ``getHatracStore`` resolves one for the URL,
        otherwise through a per-host external session.

        :param url: the URL (or Hatrac path) to query
        :param raise_for_status: when true, call ``raise_for_status()`` on the response
        :return: the headers of the HEAD response
        """
        store = self.getHatracStore(url)
        if store:
            r = store.head(url, headers=self.HEADERS)
        else:
            url = self.getExternalUrl(url)
            session = self.getExternalSession(urlsplit(url).hostname)
            r = session.head(url, headers=self.HEADERS)

        if raise_for_status:
            r.raise_for_status()
        return r.headers

    def getHatracStore(self, url):
        """
        Return the configured Hatrac store if ``url`` refers to it.

        :param url: a path beginning with ``store_base`` or a fully qualified URL
        :return: ``self.store``, or ``None`` when the URL path does not start with ``store_base``
        :raises DerivaDownloadConfigurationError: if the URL is a Hatrac path on a server other than
                                                  the configured store's server
        """
        urlparts = urlsplit(url)
        if not urlparts.path.startswith(self.store_base):
            return None
        # A bare path (no scheme/host) is implicitly relative to the configured store.
        if url.startswith(self.store_base):
            return self.store

        server_uri = urlparts.scheme + "://" + urlparts.netloc
        expected_uri = self.store.get_server_uri()
        if server_uri == expected_uri:
            return self.store

        # do we need to deal with the possibility of a fully qualified URL referencing a different hatrac host?
        raise DerivaDownloadConfigurationError(
            "Got a reference to a Hatrac server [%s] that is different from the expected Hatrac server: %s" % (
                server_uri, expected_uri))

    def getExternalUrl(self, url):
        """
        Expand ``url`` into a fully qualified URL where possible.

        A bare Hatrac path is prefixed with the store's server URI. A non-Hatrac URL lacking a scheme
        or network location is prefixed with the scheme and host of the catalog's server URI. Anything
        else is returned unchanged.

        :param url: the URL or path to expand
        :return: the expanded URL
        """
        urlparts = urlsplit(url)
        if urlparts.path.startswith(self.store_base):
            # Only a bare Hatrac path needs rewriting; a fully qualified Hatrac URL is already usable.
            if url.startswith(self.store_base):
                url = ''.join([self.store.get_server_uri(), url])
        elif not (urlparts.scheme and urlparts.netloc):
            catalog_parts = urlsplit(self.catalog.get_server_uri())
            url = ''.join([catalog_parts.scheme + "://" + catalog_parts.netloc, url])

        return url

    def getExternalSession(self, host):
        """
        Return the session to use for ``host``, creating and caching one on first use.

        A new session is seeded with the cookies from ``auth_params`` (if any) and, when both
        ``login_params`` and ``auth_url`` are present, is logged in by POSTing the login parameters
        to the auth URL.

        :param host: key under which the session is cached
        :return: the cached or newly created session
        :raises DerivaDownloadError: if the login request returns a status code above 203
        """
        session = self.sessions.get(host)
        if session is not None:
            return session

        auth_params = self.kwargs.get("auth_params", dict())
        cookies = auth_params.get("cookies")
        auth_url = auth_params.get("auth_url")
        login_params = auth_params.get("login_params")
        session_config = self.kwargs.get("session_config") or DEFAULT_SESSION_CONFIG

        session = get_new_requests_session(session_config=session_config)
        if cookies:
            session.cookies.update(cookies)
        if login_params and auth_url:
            r = session.post(auth_url, data=login_params, verify=certifi.where())
            if r.status_code > 203:
                raise DerivaDownloadError(
                    'GetExternalSession Failed with Status Code: %s\n%s\n' % (r.status_code, r.text))

        self.sessions[host] = session
        return session

    def create_default_paths(self):
        """
        Compute ``output_relpath`` and ``output_abspath`` via ``create_paths``.

        Reads ``self.ext``, which this class does not set itself; the caller must assign it first.
        """
        self.output_relpath, self.output_abspath = self.create_paths(self.base_path,
                                                                     sub_path=self.sub_path,
                                                                     filename=self.output_filename,
                                                                     ext=self.ext,
                                                                     is_bag=self.is_bag,
                                                                     envars=self.envars)

    def __del__(self):
        """Close every cached external session."""
        # __del__ also runs on instances whose __init__ raised before `sessions` was assigned.
        sessions = getattr(self, "sessions", None) or dict()
        for session in sessions.values():
            session.close()
class CSVQueryProcessor(BaseQueryProcessor):
    """
    Query processor that requests "text/csv" content and writes it to a ".csv" output file.
    """

    def __init__(self, envars=None, **kwargs):
        """
        :param envars: forwarded unchanged to ``BaseQueryProcessor``
        :param kwargs: forwarded unchanged to ``BaseQueryProcessor``
        """
        super(CSVQueryProcessor, self).__init__(envars, **kwargs)
        self.content_type = "text/csv"
        # The extension must be assigned before the output paths are derived from it.
        self.ext = ".csv"
        self.create_default_paths()
class JSONQueryProcessor(BaseQueryProcessor):
    """
    Query processor that requests "application/json" content and writes it to a ".json" output file.
    """

    def __init__(self, envars=None, **kwargs):
        """
        :param envars: forwarded unchanged to ``BaseQueryProcessor``
        :param kwargs: forwarded unchanged to ``BaseQueryProcessor``
        """
        super(JSONQueryProcessor, self).__init__(envars, **kwargs)
        self.content_type = "application/json"
        # The extension must be assigned before the output paths are derived from it.
        self.ext = ".json"
        self.create_default_paths()
class JSONStreamQueryProcessor(BaseQueryProcessor):
    """
    Query processor that requests "application/x-json-stream" content and writes it to a ".json"
    output file.
    """

    def __init__(self, envars=None, **kwargs):
        """
        :param envars: forwarded unchanged to ``BaseQueryProcessor``
        :param kwargs: forwarded unchanged to ``BaseQueryProcessor``
        """
        super(JSONStreamQueryProcessor, self).__init__(envars, **kwargs)
        self.content_type = "application/x-json-stream"
        # The extension must be assigned before the output paths are derived from it.
        self.ext = ".json"
        self.create_default_paths()
class JSONEnvUpdateProcessor(BaseQueryProcessor):
    """
    Query processor that produces no output file; it merges the JSON query result into ``envars``.
    """

    def __init__(self, envars=None, **kwargs):
        """
        :param envars: forwarded unchanged to ``BaseQueryProcessor``
        :param kwargs: forwarded unchanged to ``BaseQueryProcessor``
        """
        super(JSONEnvUpdateProcessor, self).__init__(envars, **kwargs)
        # Optional list of keys to copy from the result; None means the whole result is merged.
        self.query_keys = self.parameters.get("query_keys")

    def process(self):
        """
        Run the query as JSON and merge the result into ``self.envars``.

        When the response is a list only its first element is used. A falsy response leaves
        ``envars`` untouched.

        :return: always an empty dict
        :raises KeyError: if a key listed in ``query_keys`` is absent from the result
        """
        row = self.catalogQuery(headers={'accept': "application/json"}, as_file=False)
        if not row:
            return {}

        if isinstance(row, list):
            row = row[0]

        if self.query_keys is None:
            updates = row
        else:
            updates = {}
            for key in self.query_keys:
                updates[key] = row[key]

        self.envars.update(updates)
        # NOTE(review): presumably re-encodes the merged values for use in URLs; defined in the base class.
        self._urlencode_envars()
        return {}
class CreateDirProcessor(JSONEnvUpdateProcessor):
    """
    Processor that updates ``envars`` from the query result and then creates the output directory.
    """

    def __init__(self, envars=None, **kwargs):
        """
        :param envars: forwarded unchanged to ``JSONEnvUpdateProcessor``
        :param kwargs: forwarded unchanged to ``JSONEnvUpdateProcessor``
        """
        super(CreateDirProcessor, self).__init__(envars, **kwargs)
        # The target is a directory, so no file extension is applied to the computed path.
        self.ext = ""

    def process(self):
        """
        Merge the query result into ``envars``, compute the output path and create it as a directory.

        The paths are computed only after the parent ``process()`` has run, so they are built from the
        updated ``envars``.

        :return: the (empty) dict returned by the parent ``process()``, so that every ``process()``
                 implementation in this module returns a dict rather than ``None``
        """
        outputs = super(CreateDirProcessor, self).process()
        self.create_default_paths()
        make_dirs(self.output_abspath)
        return outputs