import io
import os
import re
import sys
import datetime
import json
import shutil
import tempfile
import pathlib
import logging
import platform
import signal
from collections import OrderedDict
from deriva.core import ErmrestCatalog, HatracStore, HatracJobAborted, HatracJobPaused, \
HatracJobTimeout, urlquote, urlparse, stob, format_exception, get_credential, read_config, write_config, \
copy_config, resource_path, make_dirs, lock_file, DEFAULT_CHUNK_SIZE, IS_PY2, __version__ as VERSION
from deriva.core import DEFAULT_SESSION_CONFIG, DEFAULT_CREDENTIAL_FILE
from deriva.core.utils import hash_utils as hu, mime_utils as mu, version_utils as vu
from deriva.transfer.upload import *
from deriva.transfer.upload.processors import find_processor
from deriva.transfer.upload.processors.base_processor import *
from deriva.transfer.upload.processors.archive_processor import ArchiveProcessor
try:
from os import scandir, walk
except ImportError:
from scandir import scandir, walk
logger = logging.getLogger(__name__)
class Enum(tuple):
__getattr__ = tuple.index
UploadState = Enum(["Success", "Failed", "Pending", "Running", "Paused", "Aborted", "Cancelled", "Timeout"])
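# Illustrative note: because Enum aliases __getattr__ to tuple.index, each state name
# resolves to its position in the tuple, e.g.:
#
#   UploadState.Success    # -> 0
#   UploadState.Pending    # -> 2
#   UploadState.Cancelled  # -> 6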
UploadMetadataReservedKeyNames = [
"URI", "file_name", "file_ext", "file_size", "base_path", "base_name", "content-disposition", "md5", "sha256",
"md5_base64", "sha256_base64", "schema", "table", "target_table", "_upload_year_", "_upload_month_", "_upload_day_",
"_upload_time_", "_identity_id", "_identity_display_name", "_identity_full_name", "_identity_email"]
DefaultConfig = {
"version_compatibility": [[">=%s" % VERSION]],
"version_update_url": "https://github.com/informatics-isi-edu/deriva-py/releases",
"asset_mappings": [
{
"asset_type": "table",
"default_columns": ["RID", "RCB", "RMB", "RCT", "RMT"],
"file_pattern": "^((?!/assets/).)*/records/(?P<schema>.+?)/(?P<table>.+?)[.]",
"ext_pattern": "^.*[.](?P<file_ext>json|csv)$"
}
]
}
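# A minimal sketch of a file-type asset mapping that would complement the table-type
# mapping above. All schema, table, column, and pattern values here are assumed
# placeholders for illustration, not part of the shipped defaults:
#
#   {
#       "column_map": {"URL": "{URI}", "Filename": "{file_name}", "MD5": "{md5}", "Length": "{file_size}"},
#       "dir_pattern": "^.*/(?P<collection>[^/]+)/assets/(?P<name>[^/]+)$",
#       "file_pattern": ".*$",
#       "target_table": ["isa", "asset"],
#       "checksum_types": ["md5", "sha256"],
#       "hatrac_templates": {"hatrac_uri": "/hatrac/{collection}/{file_name}"},
#       "hatrac_options": {"versioned_uris": True},
#       "record_query_template": "/entity/{target_table}/MD5={md5}"
#   }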
class FileUploadState:
def __init__(self, state=UploadState.Pending, status="Pending", result=None):
self.state = state
self.status = status
self.result = result
def asdict(self):
return OrderedDict({
"State": self.state,
"Status": self.status,
"Result": self.result
})
class UploadEntry(object):
def __init__(self, asset_group, asset_mapping, groupdict, path):
self.asset_group = asset_group
self.asset_mapping = asset_mapping
self.groupdict = groupdict
self.path = path
class DerivaUpload(object):
"""
Base class for upload tasks. Encapsulates a catalog instance and a hatrac store instance and provides some common
and reusable functions.
This class is not intended to be instantiated directly, but rather extended by a specific implementation.
"""
DefaultConfigFileName = "config.json"
DefaultServerListFileName = "servers.json"
DefaultTransferStateBaseName = ".deriva-upload-state"
DefaultTransferStateFileName = "%s-%s.json"
def __init__(self, config_file=None, credential_file=None, server=None, dcctx_cid=None):
self.server_url = None
self.catalog = None
self.catalog_model = None
self.store = None
self.config = None
self.credentials = None
self.asset_mappings = None
self.transfer_state = dict()
self.transfer_state_fh = None
self.transfer_state_locks = dict()
self.cancelled = False
self.metadata = dict()
self.catalog_metadata = {"table_metadata": {}}
self.processor_output = dict()
self.identity = dict()
self.file_list = OrderedDict()
self.file_status = OrderedDict()
self.skipped_files = set()
self.override_config_file = config_file
self.override_credential_file = credential_file
self.server = self.getDefaultServer() if not server else server
self.dcctx_cid = dcctx_cid if dcctx_cid else self.__class__.__name__
signal.signal(signal.SIGINT, self.interrupt_handler)
self.initialize()
def __del__(self):
self.cleanupTransferState()
def interrupt_handler(self, signum, frame):
logger.info("Caught interrupt signal.")
self.cancel()
def initialize(self, cleanup=False):
info = "%s v%s [Python %s, %s]" % (
self.__class__.__name__, vu.get_installed_version(VERSION),
platform.python_version(), platform.platform(aliased=True))
logger.info("Initializing uploader: %s" % info)
# cleanup invalidates the current configuration and credentials in addition to clearing internal state
if cleanup:
self.cleanup()
# reset just clears the internal state
else:
self.reset()
if not self.server:
logger.warning("A server was not specified and an internal default has not been set.")
return
# server variable initialization
protocol = self.server.get('protocol', 'https')
host = self.server.get('host', '')
self.server_url = protocol + "://" + host
catalog_id = self.server.get("catalog_id", "1")
session_config = self.server.get('session', DEFAULT_SESSION_CONFIG.copy())
# default credential initialization
self.credentials = get_credential(host, self.override_credential_file or DEFAULT_CREDENTIAL_FILE)
# catalog and file store initialization
if self.catalog:
del self.catalog
self.catalog = ErmrestCatalog(protocol, host, catalog_id, self.credentials, session_config=session_config)
if self.store:
del self.store
self.store = HatracStore(protocol, host, self.credentials, session_config=session_config)
# determine identity
if self.credentials:
attributes = self.catalog.get_authn_session().json()
self.identity = attributes.get("client", self.identity)
# init dcctx cid to a default
self.set_dcctx_cid(self.dcctx_cid)
"""
Configuration initialization - this is a bit complex because we allow for:
1. Run-time overriding of the config file location.
2. Sub-classes of this class to bundle their own default configuration files in an arbitrary location.
3. The updating of already deployed configuration files if bundled internal defaults are newer.
"""
if self.override_config_file and not os.path.isfile(self.override_config_file):
raise DerivaUploadConfigurationError(
"The configuration file %s could not be found." % self.override_config_file)
config_file = self.override_config_file if self.override_config_file else None
# 1. If we don't already have a valid (i.e., overridden) path to a config file...
if not (config_file and os.path.isfile(config_file)):
# 2. Get the currently deployed config file path, which could possibly be overridden by subclass
config_file = self.getDeployedConfigFilePath()
# 3. If the deployed default path is not valid, OR, it is valid AND is older than the bundled default
if (not (config_file and os.path.isfile(config_file))
or self.isFileNewer(self.getDefaultConfigFilePath(), self.getDeployedConfigFilePath())):
# 4. If we can locate a bundled default config file,
if os.path.isfile(self.getDefaultConfigFilePath()):
# 4.1 Copy the bundled default config file to the deployment-specific config path
copy_config(self.getDefaultConfigFilePath(), config_file)
else:
# 4.2 Otherwise, fallback to writing a failsafe default based on internal hardcoded settings
write_config(config_file, DefaultConfig)
# 5. Finally, read the resolved configuration file into a config object
self._update_internal_config(read_config(config_file))
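# Worked example of the resolution order above (paths are hypothetical and assume the
# GenericUploader config path): with no runtime override and server host "example.org",
# the deployed config path is ~/.deriva/upload/example.org/config.json. If that file is
# missing, or the bundled conf/config.json is newer, the bundled copy (or, failing that,
# the hardcoded DefaultConfig) is written to the deployed path before being read into
# self.config.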
def set_dcctx_cid(self, cid):
assert cid, "A dcctx cid is required"
if self.catalog:
self.catalog.dcctx['cid'] = cid
if self.store:
self.store.dcctx['cid'] = cid
def _update_internal_config(self, config):
"""This updates the internal state of the uploader based on the config.
"""
self.config = config
# uploader initialization from configuration
self.asset_mappings = self.config.get('asset_mappings', [])
mu.add_types(self.config.get('mime_overrides'))
def cancel(self):
self.cancelled = True
def reset(self):
self.metadata.clear()
self.file_list.clear()
self.file_status.clear()
self.skipped_files.clear()
self.cleanupTransferState()
self.cancelled = False
def cleanup(self):
self.reset()
self.config = None
self.credentials = None
self.catalog_model = None
def setServer(self, server):
cleanup = self.server != server
self.server = server
self.initialize(cleanup)
def setCredentials(self, credentials):
host = self.server['host']
self.credentials = credentials
self.catalog.set_credentials(self.credentials, host)
self.store.set_credentials(self.credentials, host)
attributes = self.catalog.get_authn_session().json()
self.identity = attributes.get("client", self.identity)
def setConfig(self, config_file):
if not config_file:
config = self.getUpdatedConfig()
if config:
write_config(self.getDeployedConfigFilePath(), config)
else:
self._update_internal_config(read_config(config_file))
if not self.isVersionCompatible():
raise RuntimeError("Upload version incompatibility detected",
"Current version: [%s], required version(s): %s." %
(self.getVersion(), self.getVersionCompatibility()))
@classmethod
def getDefaultServer(cls):
servers = cls.getServers()
for server in servers:
lower = {k.lower(): v for k, v in server.items()}
if lower.get("default", False):
return server
return servers[0] if len(servers) else {}
@classmethod
def getServers(cls):
"""
This method must be implemented by subclasses.
"""
raise NotImplementedError("This method must be implemented by a subclass.")
@classmethod
def getVersion(cls):
"""
This method must be implemented by subclasses.
"""
raise NotImplementedError("This method must be implemented by a subclass.")
@classmethod
def getConfigPath(cls):
"""
This method must be implemented by subclasses.
"""
raise NotImplementedError("This method must be implemented by a subclass.")
@classmethod
def getDeployedConfigPath(cls):
return os.path.expanduser(os.path.normpath(cls.getConfigPath()))
def getVersionCompatibility(self):
return self.config.get("version_compatibility", list())
def isVersionCompatible(self):
compatibility = self.getVersionCompatibility()
if len(compatibility) > 0:
return vu.is_compatible(self.getVersion(), compatibility)
else:
return True
@classmethod
def getFileDisplayName(cls, file_path, asset_mapping=None):
return os.path.basename(file_path)
@staticmethod
def isFileNewer(src, dst):
if not (os.path.isfile(src) and os.path.isfile(dst)):
return False
# This comparison won't work with PyInstaller single-file bundles because the bundle is extracted to a temp dir
# and every timestamp for every file in the bundle is reset to the bundle extraction/creation time.
if getattr(sys, 'frozen', False):
prefix = os.path.sep + "_MEI"
if prefix in src:
return False
src_mtime = os.path.getmtime(os.path.abspath(src))
dst_mtime = os.path.getmtime(os.path.abspath(dst))
return src_mtime > dst_mtime
@staticmethod
def getFileSize(file_path):
return os.path.getsize(file_path)
@staticmethod
def guessContentType(file_path):
return mu.guess_content_type(file_path)
@staticmethod
def getFileHashes(file_path, hashes=frozenset(['md5'])):
return hu.compute_file_hashes(file_path, hashes)
@staticmethod
def getCatalogTable(asset_mapping, metadata_dict=None):
schema_name, table_name = asset_mapping.get('target_table', [None, None])
if not (schema_name and table_name):
metadata_dict_lower = {k.lower(): v for k, v in metadata_dict.items()}
schema_name = metadata_dict_lower.get("schema")
table_name = metadata_dict_lower.get("table")
if not (schema_name and table_name):
raise ValueError("Unable to determine target catalog table for asset type.")
return '%s:%s' % (urlquote(schema_name), urlquote(table_name))
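# Illustrative example (assumed names): with asset_mapping["target_table"] == ["isa", "asset"],
# getCatalogTable() returns "isa:asset". When no target_table is configured, the schema and
# table are taken from "schema"/"table" values captured by the matching patterns, as in the
# default table-type mapping above.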
@staticmethod
def interpolateDict(src, dst, allowNone=False):
if not (isinstance(src, dict) and isinstance(dst, dict)):
raise ValueError("Invalid input parameter type(s): (src = %s, dst = %s), expected (dict, dict)" % (
type(src).__name__, type(dst).__name__))
dst = dst.copy()
# prune None values from the src, we don't want those to be replaced with the string 'None' in the dest
empty = [k for k, v in src.items() if v is None]
for k in empty:
del src[k]
# perform the string replacement for the values in the destination dict
for k, v in dst.items():
try:
value = v.format(**src)
except KeyError:
value = v
if value:
if value.startswith('{') and value.endswith('}'):
value = None
dst.update({k: value})
# remove all None valued entries in the dest, if disallowed
if not allowNone:
empty = [k for k, v in dst.items() if v is None]
for k in empty:
del dst[k]
return dst
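# Illustrative example (assumed values): interpolateDict() substitutes metadata into a
# column_map-style template, dropping any value that remains an unresolved "{...}" token
# unless allowNone is set.
#
#   src = {"URI": "/hatrac/x/f.txt", "md5": "abc123", "missing": None}
#   dst = {"URL": "{URI}", "MD5": "{md5}", "Note": "{unknown_key}"}
#   DerivaUpload.interpolateDict(src, dst)
#   # -> {"URL": "/hatrac/x/f.txt", "MD5": "abc123"}   ("Note" is pruned)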
@staticmethod
def pruneDict(src, dst, stringify=True):
dst = dst.copy()
for k in dst.keys():
value = src.get(k)
dst[k] = str(value) if (stringify and value is not None) else value
return dst
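# Illustrative example: pruneDict() keeps only the destination keys, filling them from the
# source and stringifying non-None values.
#
#   DerivaUpload.pruneDict({"RID": "1-2345", "MD5": None}, {"RID": None, "URL": None})
#   # -> {"RID": "1-2345", "URL": None}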
def getCurrentConfigFilePath(self):
return self.override_config_file if self.override_config_file else self.getDeployedConfigFilePath()
def getDefaultConfigFilePath(self):
return os.path.normpath(resource_path(os.path.join("conf", self.DefaultConfigFileName)))
def getDeployedConfigFilePath(self):
return os.path.join(
self.getDeployedConfigPath(), self.server.get('host', ''), self.DefaultConfigFileName)
def getTransferStateFileName(self):
return self.DefaultTransferStateFileName % \
(self.DefaultTransferStateBaseName, self.server.get('host', 'localhost'))
def getRemoteConfig(self):
catalog_config = self.catalog.getCatalogModel()
return catalog_config.bulk_upload
def getUpdatedConfig(self):
# if we are using an overridden config file, skip the update check
if self.override_config_file:
return
logger.info("Checking for updated configuration...")
remote_config = self.getRemoteConfig()
if not remote_config:
logger.info("Remote configuration not present, using default local configuration file.")
return
deployed_config_file_path = self.getDeployedConfigFilePath()
if os.path.isfile(deployed_config_file_path):
current_md5 = hu.compute_file_hashes(deployed_config_file_path, hashes=['md5'])['md5'][0]
else:
logger.info("Local config not found.")
current_md5 = None
tempdir = tempfile.mkdtemp(prefix="deriva_upload_")
if os.path.exists(tempdir):
updated_config_path = os.path.abspath(os.path.join(tempdir, DerivaUpload.DefaultConfigFileName))
with io.open(updated_config_path, 'w', newline='\n', encoding='utf-8') as config:
remote_config_data = json.dumps(
remote_config, ensure_ascii=False, sort_keys=True, separators=(',', ': '), indent=2)
if IS_PY2 and isinstance(remote_config_data, str):
remote_config_data = unicode(remote_config_data, 'utf-8')
config.write(remote_config_data)
new_md5 = hu.compute_file_hashes(updated_config_path, hashes=['md5'])['md5'][0]
if current_md5 != new_md5:
logger.info("Updated configuration found.")
config = read_config(updated_config_path)
self._update_internal_config(config)
else:
logger.info("Configuration is up-to-date.")
config = None
shutil.rmtree(tempdir, ignore_errors=True)
return config
def getFileStatusAsArray(self):
result = list()
for key in self.file_status.keys():
item = {"File": key}
item.update(self.file_status[key])
result.append(item)
return result
@staticmethod
def archive_preprocessing_enabled(asset_mapping):
if not asset_mapping:
return False
if asset_mapping.get("archive_preprocessing_enabled", False):
return True
pre_processors = asset_mapping.get("pre_processors", [])
for processor_config in pre_processors:
processor_name = processor_config[PROCESSOR_NAME_KEY]
processor_type = processor_config.get(PROCESSOR_TYPE_KEY)
processor_impl = find_processor(processor_name, processor_type, bypass_whitelist=True)
if issubclass(processor_impl, ArchiveProcessor):
asset_mapping["archive_preprocessing_enabled"] = True
return True
return False
def validateFile(self, root, path, name):
if self.config.get("relative_path_validation", False):
file_path = os.path.normpath(os.path.join(os.path.relpath(path, root), name))
else:
file_path = os.path.normpath(os.path.join(path, name))
asset_group, asset_mapping, groupdict = self.getAssetMapping(file_path)
if not asset_mapping:
return None
if self.archive_preprocessing_enabled(asset_mapping):
final_path = os.path.abspath(os.path.normpath(groupdict.get("archive_path", path)))
else:
final_path = os.path.abspath(os.path.normpath(os.path.join(path, name)))
return UploadEntry(asset_group, asset_mapping, groupdict, final_path)
def scanDirectory(self, root, abort_on_invalid_input=False, purge_state=False):
"""
Scan a directory tree for files that match the configured asset mappings.
:param root: base directory to scan
:param abort_on_invalid_input: if True, raise DerivaUploadError as soon as a file fails to match any asset mapping
:param purge_state: if True, delete any existing transfer (resume) state files before scanning
:return: None; matched entries are accumulated in self.file_list and self.file_status
"""
root = os.path.abspath(root)
if not os.path.isdir(root):
raise FileNotFoundError("Invalid directory specified: [%s]" % root)
self.loadTransferState(root, purge=purge_state)
logger.info("Scanning files in directory [%s]..." % root)
file_list = OrderedDict()
for path, dirs, files in walk(root):
for file_name in files:
if file_name.startswith(self.DefaultTransferStateBaseName):
continue
file_path = os.path.normpath(os.path.join(path, file_name))
upload_entry = self.validateFile(root, path, file_name)
if not upload_entry:
logger.info("Skipping file: [%s] -- Invalid file type or directory location." % file_path)
self.skipped_files.add(file_path)
if abort_on_invalid_input:
raise DerivaUploadError("Invalid input detected, aborting.")
else:
asset_group = upload_entry.asset_group
group_list = file_list.get(asset_group, {})
group_list.update({upload_entry.path: upload_entry})
file_list[asset_group] = group_list
# make sure that file entries in both self.file_list and self.file_status are ordered by the declared order of
# the asset_mapping for the file
for group in sorted(file_list.keys()):
self.file_list[group] = file_list[group]
for upload_entry in file_list[group].values():
file_path = upload_entry.path
logger.info("Including %s: [%s]." %
("directory (for archive)" if self.archive_preprocessing_enabled(
upload_entry.asset_mapping) else "file", file_path))
status = self.getTransferStateStatus(file_path)
if status:
self.file_status[file_path] = FileUploadState(UploadState.Paused, status).asdict()
else:
self.file_status[file_path] = FileUploadState(UploadState.Pending, "Pending").asdict()
def getAssetMapping(self, file_path):
"""
Locate the first configured asset mapping whose patterns match the given file path.
:param file_path: path of the candidate file
:return: a tuple of (asset_group, asset_mapping, groupdict) on a match, otherwise (None, None, None)
"""
asset_group = -1
for asset_type in self.asset_mappings:
asset_group += 1
groupdict = dict()
dir_pattern = asset_type.get('dir_pattern', '')
ext_pattern = asset_type.get('ext_pattern', '')
file_pattern = asset_type.get('file_pattern', '')
path = file_path.replace("\\", "/")
if dir_pattern:
match = re.search(dir_pattern, path)
if not match:
logger.debug("The dir_pattern \"%s\" failed to match the input path [%s]" % (dir_pattern, path))
continue
groupdict.update(match.groupdict())
if ext_pattern:
if self.archive_preprocessing_enabled(asset_type):
logger.warning("The 'ext_pattern' parameter is not compatible when archive preprocessing "
"is enabled. Only input directories matching 'dir_pattern' are supported.")
continue
match = re.search(ext_pattern, path, re.IGNORECASE)
if not match:
logger.debug("The ext_pattern \"%s\" failed to match the input path [%s]" % (ext_pattern, path))
continue
groupdict.update(match.groupdict())
if file_pattern:
if self.archive_preprocessing_enabled(asset_type):
logger.warning("The 'file_pattern' parameter is not compatible when archive preprocessing "
"is enabled. Only input directories matching 'dir_pattern' are supported.")
continue
match = re.search(file_pattern, path)
if not match:
logger.debug("The file_pattern \"%s\" failed to match the input path [%s]" % (file_pattern, path))
continue
groupdict.update(match.groupdict())
return asset_group, asset_type, groupdict
return None, None, None
def uploadFiles(self, status_callback=None, file_callback=None):
completed = 0
for group, assets in self.file_list.items():
if self.cancelled:
break
for entry in assets.values():
if self.cancelled:
self.file_status[entry.path] = FileUploadState(UploadState.Cancelled, "Cancelled by user").asdict()
break
try:
self.file_status[entry.path] = FileUploadState(UploadState.Running, "In-progress").asdict()
if status_callback:
status_callback()
result = self.uploadFile(entry.path,
entry.asset_mapping,
entry.groupdict,
file_callback or self.defaultFileCallback)
if self.cancelled:
self.file_status[entry.path] = FileUploadState(UploadState.Cancelled,
"Cancelled by user").asdict()
break
else:
self.file_status[entry.path] = FileUploadState(UploadState.Success, "Complete", result).asdict()
completed += 1
except HatracJobPaused:
status = self.getTransferStateStatus(entry.path)
if status:
self.file_status[entry.path] = FileUploadState(
UploadState.Paused, "Paused: %s" % status).asdict()
continue
except HatracJobTimeout:
status = self.getTransferStateStatus(entry.path)
if status:
self.file_status[entry.path] = FileUploadState(UploadState.Timeout, "Timeout").asdict()
continue
except HatracJobAborted:
self.file_status[entry.path] = FileUploadState(UploadState.Aborted, "Aborted by user").asdict()
except:
logger.debug("Unexpected exception", exc_info=sys.exc_info())
(etype, value, traceback) = sys.exc_info()
self.file_status[entry.path] = FileUploadState(UploadState.Failed, format_exception(value)).asdict()
self.delTransferState(entry.path)
if status_callback:
status_callback()
failed_uploads = dict()
try:
for key, value in self.file_status.items():
if (value["State"] == UploadState.Failed) or (value["State"] == UploadState.Timeout):
failed_uploads[key] = value["Status"]
if self.skipped_files:
logger.warning("The following %d file(s) were skipped because they did not satisfy the matching "
"criteria of the configuration:\n\n%s\n" %
(len(self.skipped_files), '\n'.join(sorted(self.skipped_files))))
if failed_uploads:
logger.warning("The following %d file(s) failed to upload due to errors:\n\n%s\n" %
(len(failed_uploads), '\n'.join(["%s -- %s" % (key, failed_uploads[key])
for key in sorted(failed_uploads.keys())])))
raise RuntimeError("%s file(s) failed to upload due to errors." % len(failed_uploads))
finally:
logger.info("File upload processing completed: %s files were uploaded successfully, "
"%s files failed to upload due to errors, "
"%s files were skipped because they did not satisfy the matching criteria of the configuration."
% (completed, len(failed_uploads), len(self.skipped_files)))
return self.file_status
def uploadFile(self, file_path, asset_mapping, match_groupdict, callback=None):
"""
Primary API subclass function: dispatch a single upload based on the matched asset mapping type.
:param file_path: path of the file to upload
:param asset_mapping: the asset mapping configuration entry matched for this file
:param match_groupdict: named groups captured by the asset mapping pattern match
:param callback: optional progress callback passed through to the object store transfer
:return: the upload result (catalog response for table uploads, file record result for asset uploads)
"""
logger.info("Processing: [%s]" % file_path)
if asset_mapping.get("asset_type", "file") == "table":
return self._uploadTable(file_path, asset_mapping, match_groupdict)
else:
return self._uploadAsset(file_path, asset_mapping, match_groupdict, callback)
def _uploadAsset(self, file_path, asset_mapping, match_groupdict, callback=None):
# 1. Populate initial file metadata from directory scan pattern matches
self._initFileMetadata(file_path, asset_mapping, match_groupdict)
# 2. Execute any configured preprocessors
self._execute_processors(file_path, asset_mapping, match_groupdict, processor_list=PRE_PROCESSORS_KEY)
if PROCESSOR_MODIFIED_FILE_PATH_KEY in self.processor_output:
file_path = self.processor_output[PROCESSOR_MODIFIED_FILE_PATH_KEY]
self._urlEncodeMetadata(asset_mapping.get("url_encoding_safe_overrides"))
logger.info("Computed metadata for: [%s]." % file_path)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Current metadata: %s" % self.metadata)
# 3. Compute checksum(s) for current file and add to metadata
logger.info("Computing checksums for file: [%s]. Please wait..." % file_path)
hashes = self.getFileHashes(file_path, asset_mapping.get('checksum_types', ['md5', 'sha256']))
for alg, checksum in hashes.items():
alg = alg.lower()
self.metadata[alg] = checksum[0]
self.metadata[alg + "_base64"] = checksum[1]
if self.cancelled:
return
# 4. Populate additional metadata by querying the catalog
self._queryFileMetadata(asset_mapping)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Updated metadata: %s" % self.metadata)
# 5. If "create_record_before_upload" specified in asset_mapping, check for an existing record, creating a new
# one if necessary. Otherwise, delay this logic until after the file upload.
result = record = None
if stob(asset_mapping.get("create_record_before_upload", False)):
record = self._getFileRecord(asset_mapping)
# 6. Perform the Hatrac upload
self._getFileHatracMetadata(asset_mapping)
hatrac_options = asset_mapping.get("hatrac_options", {})
file_size = self.metadata["file_size"]
versioned_uri = \
self._hatracUpload(self.metadata["URI"],
file_path,
md5=self.metadata.get("md5_base64"),
sha256=self.metadata.get("sha256_base64"),
content_type=self.guessContentType(file_path),
content_disposition=self.metadata.get("content-disposition"),
chunked=(file_size > DEFAULT_CHUNK_SIZE or file_size == 0),
create_parents=stob(hatrac_options.get("create_parents", True)),
allow_versioning=stob(hatrac_options.get("allow_versioning", True)),
callback=callback)
logger.debug("Hatrac upload successful. Result object URI: %s" % versioned_uri)
versioned_uris = True
if "versioned_uris" in hatrac_options:
versioned_uris = stob(hatrac_options.get("versioned_uris", True))
if "versioned_urls" in hatrac_options:
versioned_uris = stob(hatrac_options.get("versioned_urls", True))
if versioned_uris:
self.metadata["URI"] = versioned_uri
else:
self.metadata["URI"] = versioned_uri.rsplit(":")[0]
self.metadata["URI_urlencoded"] = urlquote(self.metadata["URI"])
# 7. Check for an existing record and create a new one if necessary
if not record:
record, result = self._getFileRecord(asset_mapping)
# 8. Update an existing record, if necessary
column_map = asset_mapping.get("column_map", {})
updated_record = self.interpolateDict(self.metadata, column_map)
if updated_record != record:
record_update_template = asset_mapping.get("record_update_template")
require_record_update_template = stob(asset_mapping.get("require_record_update_template", False))
if require_record_update_template and not record_update_template:
raise DerivaUploadCatalogUpdateError(
"A required 'record_update_template' parameter for this asset mapping could not be found in the "
"configuration. The record will not be updated.")
logger.info("Updating catalog for file [%s]" % self.getFileDisplayName(file_path))
result = self._catalogRecordUpdate(self.metadata['target_table'],
record,
updated_record,
record_update_template)[0]
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Updated catalog for file [%s]: %s" % (self.getFileDisplayName(file_path), result))
record, result = self._getFileRecord(asset_mapping)
# 9. Execute any configured post_processors
self._execute_processors(file_path, asset_mapping, match_groupdict, processor_list=POST_PROCESSORS_KEY)
return result
def _uploadTable(self, file_path, asset_mapping, match_groupdict, callback=None):
if self.cancelled:
return None
self._initFileMetadata(file_path, asset_mapping, match_groupdict)
self._execute_processors(file_path, asset_mapping, match_groupdict, processor_list=PRE_PROCESSORS_KEY)
try:
default_columns = asset_mapping.get("default_columns")
if not default_columns:
default_columns = self.catalog.getDefaultColumns({}, self.metadata['target_table'])
default_param = ('?defaults=%s' % ','.join(default_columns)) if len(default_columns) > 0 else ''
file_ext = self.metadata['file_ext']
file_ext = file_ext.lower()
if file_ext == 'csv':
headers = {'content-type': 'text/csv'}
elif file_ext == 'json':
headers = {'content-type': 'application/json'}
else:
raise DerivaUploadCatalogCreateError("Unsupported file type for catalog bulk upload: %s" % file_ext)
with open(file_path, "rb") as fp:
result = self.catalog.post(
'/entity/%s%s' % (self.metadata['target_table'], default_param), fp, headers=headers)
return result
except:
(etype, value, traceback) = sys.exc_info()
raise DerivaUploadCatalogCreateError(format_exception(value))
finally:
self._execute_processors(file_path, asset_mapping, match_groupdict, processor_list=POST_PROCESSORS_KEY)
def _getFileRecord(self, asset_mapping):
"""
Helper function that queries the catalog to get a record linked to the asset, or create it if it doesn't exist.
:return: the file record
"""
record = None
column_map = asset_mapping.get("column_map", {})
rqt = asset_mapping['record_query_template']
try:
path = rqt.format(**self.metadata)
except KeyError as e:
raise DerivaUploadConfigurationError("Record query template substitution error: %s" % format_exception(e))
result = self.catalog.get(path).json()
if result:
record = result[0]
self._updateFileMetadata(record, no_overwrite=True)
return self.pruneDict(record, column_map), record
else:
row = self.interpolateDict(self.metadata, column_map)
result = self._catalogRecordCreate(self.metadata['target_table'], row)
if result:
record = result[0]
self._updateFileMetadata(record)
return self.interpolateDict(self.metadata, column_map, allowNone=True), record
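# Configuration sketch (the template and column name are assumed, for illustration only):
# a typical record_query_template keys the lookup on a checksum already present in the
# computed metadata, e.g.:
#
#   "record_query_template": "/entity/{target_table}/MD5={md5}"
#
# If the query returns a row, that record is reused and merged into the metadata without
# overwriting existing keys; otherwise a new row interpolated from column_map is created.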
def _urlEncodeMetadata(self, safe_overrides=None):
urlencoded = dict()
if not safe_overrides:
safe_overrides = dict()
for k, v in self.metadata.items():
if k.endswith("_urlencoded"):
continue
urlencoded[k + "_urlencoded"] = urlquote(str(v), safe_overrides.get(k, ""))
self._updateFileMetadata(urlencoded)
def _initFileMetadata(self, file_path, asset_mapping, match_groupdict):
self.metadata.clear()
self._updateFileMetadata(match_groupdict)
self.metadata['target_table'] = self.getCatalogTable(asset_mapping, match_groupdict)
self.metadata["file_name"] = self.getFileDisplayName(file_path)
self.metadata["file_size"] = self.getFileSize(file_path)
if "file_ext" not in self.metadata:
self.metadata["file_ext"] = "".join(pathlib.PurePath(file_path).suffixes)
self.metadata["base_path"] = os.path.dirname(file_path)
self.metadata["base_name"] = self.metadata["file_name"].rsplit(
self.metadata["file_ext"])[0] if self.metadata["file_ext"] else self.metadata["file_name"]
time = datetime.datetime.now()
self.metadata["_upload_year_"] = time.year
self.metadata["_upload_month_"] = time.month
self.metadata["_upload_day_"] = time.day
self.metadata["_upload_time_"] = time.timestamp()
self.metadata["_identity_id"] = self.identity.get("id", "anonymous")
self.metadata["_identity_display_name"] = self.identity.get("display_name")
self.metadata["_identity_full_name"] = self.identity.get("full_name")
self.metadata["_identity_email"] = self.identity.get("email")
self._urlEncodeMetadata(asset_mapping.get("url_encoding_safe_overrides"))
def _updateFileMetadata(self, src, strict=False, no_overwrite=False):
if not (isinstance(src, dict)):
raise ValueError("Invalid input parameter type(s): (src = %s), expected (dict)" % type(src).__name__)
dst = src.copy()
for k in src.keys():
if strict:
if k in UploadMetadataReservedKeyNames:
logger.warning("Context metadata update specified reserved key name [%s], "
"ignoring value: %s " % (k, src[k]))
del dst[k]
continue
# don't overwrite any existing metadata field
if no_overwrite:
if k in self.metadata:
del dst[k]
self.metadata.update(dst)
def _queryFileMetadata(self, asset_mapping):
"""
Helper function that queries the catalog to get required metadata for a given file/asset
"""
metadata_queries = asset_mapping.get("metadata_query_templates", [])
if logger.isEnabledFor(logging.DEBUG) and metadata_queries:
logger.debug("Querying catalog for additional metadata...")
for uri in metadata_queries:
try:
path = uri.format(**self.metadata)
except KeyError as e:
raise RuntimeError("Metadata query template substitution error: %s" % format_exception(e))
result = self.catalog.get(path).json()
if result:
self._updateFileMetadata(result[0], True)
else:
raise RuntimeError("Metadata query did not return any results: %s" % path)
self._getFileExtensionMetadata(self.metadata.get("file_ext"))
for k, v in asset_mapping.get("column_value_templates", {}).items():
try:
self.metadata[k] = v.format(**self.metadata)
except KeyError as e:
logger.warning("Column value template substitution error: %s" % format_exception(e))
continue
self._urlEncodeMetadata(asset_mapping.get("url_encoding_safe_overrides"))
def _getFileExtensionMetadata(self, ext):
ext_map = self.config.get("file_ext_mappings", {})
entry = ext_map.get(ext)
if entry:
self._updateFileMetadata(entry)
def _getFileHatracMetadata(self, asset_mapping):
try:
hatrac_templates = asset_mapping["hatrac_templates"]
# URI is required
self.metadata["URI"] = hatrac_templates["hatrac_uri"].format(**self.metadata)
# overridden content-disposition is optional
content_disposition = hatrac_templates.get("content-disposition")
if content_disposition:
filename = content_disposition.format(**self.metadata)
else:
filename = urlparse(self.metadata["URI"]).path.rsplit("/", 1)[-1]
sanitized_filename, sanitized_content_disp = \
self._validateHatracFilename(filename, asset_mapping.get("hatrac_options", {}))
if content_disposition:
self.metadata["content-disposition"] = sanitized_content_disp
else:
self.metadata["URI"] = self.metadata["URI"].replace(filename, sanitized_filename)
self._urlEncodeMetadata(asset_mapping.get("url_encoding_safe_overrides"))
except KeyError as e:
raise DerivaUploadConfigurationError("Hatrac template substitution error: %s" % format_exception(e))
def _validateHatracFilename(self, filename, hatrac_options):
if not filename:
# return a tuple so callers can safely unpack even when no filename is present
return None, None
sanitize = hatrac_options.get("sanitize_filenames", True)
pattern = hatrac_options.get("sanitize_filenames_pattern")
is_content_disp = re.match(r"filename\*?=['\"]?(?:UTF-\d['\"]*)?([^;\r\n\"']*)['\"]?;?", filename)
if is_content_disp:
filename = is_content_disp.group(1)
pattern = pattern if pattern else "[^a-zA-Z0-9_.-]"
sanitized_filename = urlquote(re.sub(pattern, "_", filename)) if sanitize else filename
if is_content_disp:
content_disp = is_content_disp.string.replace(filename, sanitized_filename)
else:
content_disp = "filename*=UTF-8''" + sanitized_filename
return sanitized_filename, content_disp
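# Illustrative example (hypothetical filename): with the default pattern "[^a-zA-Z0-9_.-]",
# a raw filename such as "my report (v2).txt" is sanitized to "my_report__v2_.txt" and the
# returned content-disposition becomes "filename*=UTF-8''my_report__v2_.txt".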
def _hatracUpload(self,
uri,
file_path,
md5=None,
sha256=None,
content_type=None,
content_disposition=None,
chunked=True,
create_parents=True,
allow_versioning=True,
callback=None):
# check if there is already an in-progress transfer for this file,
# and if so, that the local file has not been modified since the original upload job was created
can_resume = False
transfer_state = self.getTransferState(file_path)
if transfer_state:
content_md5 = transfer_state.get("content-md5")
content_sha256 = transfer_state.get("content-sha256")
if content_md5 or content_sha256:
if (md5 == content_md5) or (sha256 == content_sha256):
can_resume = True
if transfer_state and can_resume:
logger.info("Resuming upload (%s) of file: [%s] to host %s. Please wait..." % (
self.getTransferStateStatus(file_path), file_path, transfer_state.get("host")))
path = transfer_state["target"]
job_id = transfer_state['url'].rsplit("/", 1)[1]
if transfer_state["total"] != transfer_state["completed"]:
self.store.put_obj_chunked(path,
file_path,
job_id,
callback=callback,
start_chunk=transfer_state["completed"],
cancel_job_on_error=False)
return self.store.finalize_upload_job(path, job_id)
else:
logger.info("Uploading file: [%s] to host %s. Please wait..." % (file_path, self.server_url))
return self.store.put_loc(uri,
file_path,
md5=md5,
sha256=sha256,
content_type=content_type,
content_disposition=content_disposition,
chunked=chunked,
create_parents=create_parents,
allow_versioning=allow_versioning,
callback=callback,
cancel_job_on_error=False)
def _get_catalog_table_columns(self, table):
table_columns = set()
catalog_table_metadata = self.catalog_metadata["table_metadata"]
table_metadata = catalog_table_metadata.get(table)
if table_metadata:
table_columns = table_metadata.get("table_columns")
if not table_columns:
table_columns = self.catalog.getTableColumns(table)
catalog_table_metadata.update({table: {"table_columns": table_columns}})
return table_columns
def _validate_catalog_row_columns(self, row, table):
return set(row.keys()) - self._get_catalog_table_columns(table)
def _validate_row_key_constraints(self, catalog_table, row):
logger.debug("Validating row key constraints for %s: %s" % (catalog_table, row))
if not self.catalog_model:
logger.debug("Fetching catalog model...")
self.catalog_model = self.catalog.getCatalogModel()
schema_name, table_name = self.catalog.splitQualifiedCatalogName(catalog_table)
schema = self.catalog_model.schemas.get(schema_name)
table = schema.tables.get(table_name)
non_null_correlations = {cname for cname, cval in row.items() if cval is not None}
for key in table.keys:
if set(key.unique_columns.elements).issubset(non_null_correlations):
logger.debug("%s is a subset of non-null correlations %s" %
(set(key.unique_columns.elements), non_null_correlations))
return True # it is safe
else:
logger.debug("%s is not a subset of non-null correlations %s" %
(set(key.unique_columns.elements), non_null_correlations))
return False # it is not safe
def _get_catalog_default_columns(self, row, table, exclude=None, quote_url=True):
# copy the cached column set so exclusions do not mutate the shared table metadata cache
columns = set(self._get_catalog_table_columns(table))
if isinstance(exclude, list):
for col in exclude:
columns.discard(col)
defaults = []
supplied_columns = row.keys()
for col in columns:
if col not in supplied_columns:
defaults.append(urlquote(col, safe='') if quote_url else col)
return defaults
def _catalogRecordCreate(self, catalog_table, row, default_columns=None):
"""
:param catalog_table:
:param row:
:param default_columns:
:return:
"""
if self.cancelled:
return None
try:
missing = self._validate_catalog_row_columns(row, catalog_table)
if missing:
raise ValueError(
"Unable to update catalog entry because one or more specified columns do not exist in the "
"target table: [%s]" % ','.join(missing))
if not default_columns:
default_columns = self._get_catalog_default_columns(row, catalog_table)
default_param = ('?defaults=%s' % ','.join(default_columns)) if len(default_columns) > 0 else ''
# for default in default_columns:
# row[default] = None
create_uri = '/entity/%s%s' % (catalog_table, default_param)
logger.debug(
"Attempting catalog record create [%s] with data: %s" % (create_uri, json.dumps(row)))
return self.catalog.post(create_uri, json=[row]).json()
except:
(etype, value, traceback) = sys.exc_info()
raise DerivaUploadCatalogCreateError(format_exception(value))
def _catalogRecordUpdate(self, catalog_table, old_row, new_row, record_update_template=None):
"""
:param catalog_table:
:param new_row:
:param old_row:
:return:
"""
if self.cancelled:
return None
try:
keys = sorted(list(new_row.keys()))
old_keys = sorted(list(old_row.keys()))
if keys != old_keys:
raise RuntimeError("Cannot update catalog - "
"new row column list and old row column list do not match: New: %s != Old: %s" %
(keys, old_keys))
o_keys = ','.join(["o%d:=%s" % (i, urlquote(keys[i])) for i in range(len(keys))])
n_keys = ','.join(["n%d:=%s" % (i, urlquote(keys[i])) for i in range(len(keys))])
update_row = {
'o%d' % i: old_row[keys[i]]
for i in range(len(keys))
}
update_row.update({
'n%d' % i: new_row[keys[i]]
for i in range(len(keys))
})
if record_update_template:
update_uri = record_update_template.format(**self.metadata)
update_row = new_row
else:
update_uri = '/attributegroup/%s/%s;%s' % (catalog_table, o_keys, n_keys)
if self.config.get("strict_update_check", True) and not \
self._validate_row_key_constraints(catalog_table, old_row):
raise ValueError(
"Potential unsafe attributegroup update [%s]: at least one pre-existing, non-null correlation "
"key is required. Old values: %s, New values: %s" %
(update_uri, json.dumps(old_row), json.dumps(new_row)))
logger.debug(
"Attempting catalog record update [%s] with data: %s" % (update_uri, json.dumps(update_row)))
return self.catalog.put(update_uri, json=[update_row]).json()
except:
(etype, value, traceback) = sys.exc_info()
raise DerivaUploadCatalogUpdateError(format_exception(value))
def _execute_processors(self, file_path, asset_mapping, match_groupdict,
processor_list=PRE_PROCESSORS_KEY, **kwargs):
processors = asset_mapping.get(processor_list, [])
if processors:
for processor_config in processors:
processor_name = processor_config[PROCESSOR_NAME_KEY]
processor_type = processor_config.get(PROCESSOR_TYPE_KEY)
processor_params = processor_config.get(PROCESSOR_PARAMS_KEY)
try:
processor_impl = find_processor(processor_name, processor_type, bypass_whitelist=True)
processor = processor_impl(
processor_params=processor_params,
file_path=file_path,
asset_mapping=asset_mapping,
match_groupdict=match_groupdict,
metadata=self.metadata,
processor_output=self.processor_output,
**kwargs)
proc_class = processor.__class__.__module__
proc_name = processor.__class__.__name__
if processor_params is not None and processor_params.get(
PROCESSOR_REQUIRES_METADATA_QUERY_KEY, False):
self._queryFileMetadata(asset_mapping)
logger.debug("Attempting to execute upload processor class %s from module: %s" %
(proc_name, proc_class))
output = processor.process()
if isinstance(output, dict):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("%s output context: %s" % (proc_name, output))
except:
(etype, value, traceback) = sys.exc_info()
raise DerivaUploadError(format_exception(value))
def defaultFileCallback(self, **kwargs):
completed = kwargs.get("completed")
total = kwargs.get("total")
file_path = kwargs.get("file_path")
file_name = os.path.basename(file_path) if file_path else ""
job_info = kwargs.get("job_info", {})
job_info.update()
if completed and total:
file_name = " [%s]" % file_name
job_info.update({"completed": completed, "total": total, "host": kwargs.get("host")})
status = "Uploading file%s: %d%% complete" % (
file_name, round(((float(completed) / float(total)) % 100) * 100))
self.setTransferState(file_path, job_info)
else:
summary = kwargs.get("summary", "")
file_name = "Uploaded file: [%s] " % file_name
status = file_name # + summary
if status:
# logger.debug(status)
pass
if self.cancelled:
return -1
return True
@staticmethod
def find_file_in_dir_hierarchy(filename, path):
""" Find all instances of a filename in the directory hierarchy around path: every
subdirectory below the base path and every parent directory above it.
"""
file_paths = set()
# First, descend from the base path looking for filename in all sub dirs
for root, dirs, files in walk(path, followlinks=True):
if filename in files:
# found a file
found_path = os.path.normcase(os.path.join(root, filename))
file_paths.add(os.path.normcase(os.path.realpath(found_path)))
continue
# Next, ascend from the base path looking for the same filename in all parent dirs
current = path
while True:
parent = os.path.dirname(current)
if parent == current:
break
for entry in scandir(parent):
if (entry.name == filename) and entry.is_file():
file_paths.add(os.path.normcase(os.path.realpath(os.path.normcase(entry.path))))
current = parent
return file_paths
def delete_dependent_locks(self, directory):
for path in self.find_file_in_dir_hierarchy(self.getTransferStateFileName(), directory):
logger.info("Attempting to delete an existing transfer state file (dependent lock) at: [%s]" % path)
try:
os.remove(path)
except OSError as e:
logger.warning("Unable to delete transfer state file [%s]: %s" % (path, format_exception(e)))
def acquire_dependent_locks(self, directory):
for path in self.find_file_in_dir_hierarchy(self.getTransferStateFileName(), directory):
logger.info("Attempting to acquire a dependent lock in [%s]" % os.path.dirname(path))
try:
transfer_state_lock = lock_file(path, 'r+')
transfer_state_fh = transfer_state_lock.acquire(timeout=0, fail_when_locked=True)
self.transfer_state_locks.update({path: {"lock": transfer_state_lock, "handle": transfer_state_fh}})
except Exception as e:
raise DerivaUploadError("Unable to acquire resource lock for directory [%s]. "
"Multiple upload processes cannot operate within the same directory hierarchy. "
"%s" % (os.path.dirname(path), format_exception(e)))
def loadTransferState(self, directory, purge=False):
transfer_state_file_path = os.path.normcase(os.path.join(directory, self.getTransferStateFileName()))
if purge:
self.delete_dependent_locks(directory)
self.acquire_dependent_locks(directory)
try:
if not os.path.isfile(transfer_state_file_path):
with lock_file(transfer_state_file_path, mode="wb" if IS_PY2 else "w") as tsfp:
json.dump(self.transfer_state, tsfp)
transfer_state_lock = self.transfer_state_locks.get(transfer_state_file_path)
if transfer_state_lock:
self.transfer_state_fh = transfer_state_lock["handle"]
else:
transfer_state_lock = lock_file(transfer_state_file_path, 'r+')
self.transfer_state_fh = transfer_state_lock.acquire(timeout=0, fail_when_locked=True)
self.transfer_state_locks.update(
{directory: {"lock": transfer_state_lock, "handle": self.transfer_state_fh}})
self.transfer_state = json.load(self.transfer_state_fh, object_pairs_hook=OrderedDict)
except Exception as e:
raise DerivaUploadError("Unable to acquire resource lock for directory [%s]. "
"Multiple upload processes cannot operate within the same directory hierarchy. %s"
% (directory, format_exception(e)))
def getTransferState(self, file_path):
return self.transfer_state.get(file_path)
def setTransferState(self, file_path, transfer_state):
self.transfer_state[file_path] = transfer_state
self.writeTransferState()
def delTransferState(self, file_path):
transfer_state = self.getTransferState(file_path)
if transfer_state:
del self.transfer_state[file_path]
self.writeTransferState()
def writeTransferState(self):
if not self.transfer_state_fh:
return
try:
self.transfer_state_fh.seek(0, 0)
self.transfer_state_fh.truncate()
json.dump(self.transfer_state, self.transfer_state_fh, indent=2)
self.transfer_state_fh.flush()
os.fsync(self.transfer_state_fh.fileno())
except Exception as e:
logger.warning("Unable to write transfer state file: %s" % format_exception(e))
def cleanupTransferState(self):
if self.transfer_state_fh and not self.transfer_state_fh.closed:
try:
self.transfer_state_fh.flush()
os.fsync(self.transfer_state_fh.fileno())
except Exception as e:
logger.warning("Unable to flush/close transfer state file: %s" % format_exception(e))
finally:
for entry in self.transfer_state_locks.values():
lock = entry.get("lock")
if lock and not lock.fh.closed:
lock.release()
self.transfer_state_locks.clear()
self.transfer_state_fh = None
def getTransferStateStatus(self, file_path):
transfer_state = self.getTransferState(file_path)
if transfer_state:
return "%d%% complete" % (
round(((float(transfer_state["completed"]) / float(transfer_state["total"])) % 100) * 100))
return None
class GenericUploader(DerivaUpload):
def __init__(self, config_file=None, credential_file=None, server=None, dcctx_cid=None):
DerivaUpload.__init__(self,
config_file=config_file,
credential_file=credential_file,
server=server,
dcctx_cid=dcctx_cid)
@classmethod
def getVersion(cls):
return VERSION
@classmethod
def getConfigPath(cls):
return "~/.deriva/upload/"
@classmethod
def getServers(cls):
return read_config(os.path.join(
cls.getDeployedConfigPath(), cls.DefaultServerListFileName), create_default=True, default=[])
@classmethod
def setServers(cls, servers):
return write_config(os.path.join(cls.getDeployedConfigPath(), cls.DefaultServerListFileName), servers)
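# End-to-end usage sketch (host, catalog_id, and directory are placeholders; assumes valid
# credentials and a deployed or catalog-provided upload configuration):
#
#   uploader = GenericUploader(server={"host": "example.org", "protocol": "https", "catalog_id": "1"})
#   uploader.getUpdatedConfig()                 # optionally refresh the config from the catalog
#   uploader.scanDirectory("/path/to/uploads")
#   results = uploader.uploadFiles()
#   uploader.cleanup()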