Source code for deriva.transfer.download.processors.query.file_download_query_processor

import os
import json
import uuid
import datetime
import logging
import requests
import certifi
from bdbag import bdbag_ro as ro
from deriva.core import urlsplit, format_exception, get_transfer_summary, make_dirs, DEFAULT_CHUNK_SIZE
from deriva.core.utils.mime_utils import parse_content_disposition
from deriva.transfer.download.processors.query.base_query_processor import BaseQueryProcessor, \
    LOCAL_PATH_KEY, FILE_SIZE_KEY
from deriva.transfer.download import DerivaDownloadError, DerivaDownloadConfigurationError, \
    DerivaDownloadAuthenticationError


[docs]class FileDownloadQueryProcessor(BaseQueryProcessor): def __init__(self, envars=None, **kwargs): super(FileDownloadQueryProcessor, self).__init__(envars, **kwargs) self.content_type = "application/x-json-stream" filename = ''.join(['download-manifest_', str(uuid.uuid4()), ".json"]) self.output_relpath, self.output_abspath = self.create_paths(self.base_path, filename=filename) self.ro_file_provenance = False self.allow_anonymous = kwargs.get("allow_anonymous", True)
[docs] def process(self): if not self.identity and not self.allow_anonymous: raise DerivaDownloadAuthenticationError( "Unauthenticated (anonymous) users are not permitted to request direct file downloads.") super(FileDownloadQueryProcessor, self).process() self.outputs.update(self.downloadFiles(self.output_abspath)) return self.outputs
[docs] def getExternalFile(self, url, output_path, headers=None): host = urlsplit(url).netloc if output_path: if not headers: headers = self.HEADERS.copy() else: headers.update(self.HEADERS) session = self.getExternalSession(host) with session.get(url, headers=headers, stream=True, verify=certifi.where()) as r: if r.status_code != 200: file_error = "File [%s] transfer failed." % output_path url_error = 'HTTP GET Failed for url: %s' % url host_error = "Host %s responded:\n\n%s" % (urlsplit(url).netloc, r.text) raise DerivaDownloadError('%s\n\n%s\n%s' % (file_error, url_error, host_error)) else: total = 0 start = datetime.datetime.now() logging.debug("Transferring file %s to %s" % (url, output_path)) with open(output_path, 'wb') as data_file: for chunk in r.iter_content(chunk_size=DEFAULT_CHUNK_SIZE): data_file.write(chunk) total += len(chunk) elapsed = datetime.datetime.now() - start summary = get_transfer_summary(total, elapsed) logging.info("File [%s] transfer successful. %s" % (output_path, summary)) length = int(r.headers.get('Content-Length')) content_type = r.headers.get("Content-Type") return output_path, length, content_type
[docs] def downloadFiles(self, input_manifest): logging.info("Attempting to download file(s) based on the results of query: %s" % self.query) try: with open(input_manifest, "r", encoding='utf-8') as in_file: file_list = dict() for line in in_file: entry = json.loads(line) url = entry.get('url') if not url: logging.warning( "Skipping download due to missing required attribute \"url\" in download manifest entry %s" % json.dumps(entry)) continue store = self.getHatracStore(url) filename = entry.get('filename') if not self.output_filename else self.output_filename if not filename: if store: try: head = store.head(url, headers=self.HEADERS) except requests.HTTPError as e: raise DerivaDownloadError("HEAD request for [%s] failed: %s" % (url, e)) content_disposition = head.headers.get("Content-Disposition") if head.ok else None filename = os.path.basename(filename).split(":")[0] if not content_disposition else \ parse_content_disposition(content_disposition) else: filename = os.path.basename(url) env = self.envars.copy() env.update(entry) rel_path, file_path = self.create_paths(self.base_path, sub_path=self.sub_path, filename=filename, is_bag=self.is_bag, envars=env) output_dir = os.path.dirname(file_path) make_dirs(output_dir) if store: try: resp = store.get_obj(url, self.HEADERS, file_path) except requests.HTTPError as e: raise DerivaDownloadError("File [%s] transfer failed: %s" % (file_path, e)) length = int(resp.headers.get('Content-Length')) content_type = resp.headers.get("Content-Type") url = self.getExternalUrl(url) else: url = self.getExternalUrl(url) file_path, length, content_type = self.getExternalFile(url, file_path) file_bytes = os.path.getsize(file_path) if length != file_bytes: raise DerivaDownloadError( "File size of %s does not match expected size of %s for file %s" % (length, file_bytes, file_path)) if self.ro_manifest: ro.add_file_metadata(self.ro_manifest, source_url=url, local_path=rel_path, media_type=content_type, retrieved_on=ro.make_retrieved_on(), retrieved_by=ro.make_retrieved_by( self.ro_author_name, orcid=self.ro_author_orcid), bundled_as=ro.make_bundled_as()) file_list.update({rel_path: {LOCAL_PATH_KEY: file_path, FILE_SIZE_KEY: file_bytes}}) if self.callback: if not self.callback(progress="Downloaded [%s] to: %s" % (url, file_path)): break return file_list finally: os.remove(input_manifest)