Source code for deriva.core.deriva_binding


import uuid
import sys
import os
import json
import requests
from multiprocessing import Queue
from . import get_new_requests_session, urlquote_dcctx, ConcurrentUpdate, NotModified, DEFAULT_HEADERS, DEFAULT_SESSION_CONFIG


class DerivaClientContext (dict):
    """Represent Deriva-Client-Context header content.

    Well-known keys (originally defined for Chaise):

    - `cid`: client application ID i.e. program name
    - `wid`: window ID i.e. request stream ID
    - `pid`: page ID i.e. sub-stream ID
    - `action`: UX action embodied by request(s)
    - `table`: table upon which application is focused (if any)
    - `uinit`: True if request initiated by user action

    Default values to use process-wide:

    - `cid`:code:: os.path.basename(sys.argv[0]) if available
    - `wid`:code:: a random UUID

    The process-wide defaults MAY be customized by mutating
    `DerivaClientContext.defaults`:code: prior to constructing instances.

    """
    # Evaluated once at class-definition time, so every instance created in
    # this process shares the same 'wid' unless the mapping is mutated.
    defaults = {
        'cid': os.path.basename(sys.argv[0]) if len(sys.argv) > 0 and sys.argv[0] else None,
        'wid': uuid.uuid4().hex,
    }

    def __init__(self, *args, **kwargs):
        """Initialize DerivaClientContext from keywords or dict-like.

        Class-wide defaults are used for fields not set explicitly
        during construction. Keys whose value is None, and a false
        `uinit`, are dropped immediately (see `prune`).

        """
        super(DerivaClientContext, self).__init__(*args, **kwargs)
        self.set_defaults()
        self.prune()

    def set_defaults(self, defaults=None):
        """Set default key-values in self if key not already set.

        Arguments:
           defaults: mapping of fallback values; the class-wide
              `defaults` mapping is used when this is None.

        A default is applied only when its value is not None and the
        key is either absent from self or currently mapped to None.

        """
        if defaults is None:
            defaults = self.defaults
        for k, v in defaults.items():
            if v is not None and (k not in self or self[k] is None):
                self[k] = v

    def prune(self):
        """Prune redundant keys to shorten context representation.

        Keys with None value or uinit=False are unnecessary as these
        are implicitly assumed when absent.

        """
        # Snapshot the doomed keys first: deleting from a dict while iterating
        # over its items() view raises RuntimeError on Python 3.
        for k in [k for k, v in self.items() if v is None]:
            del self[k]
        if 'uinit' in self and not self['uinit']:
            del self['uinit']

    def encoded(self):
        """Encode self as string suitable for Deriva-Client-Context HTTP header.

        Defaults are re-applied and redundant keys pruned (mutating self)
        before the content is serialized as compact JSON and passed
        through `urlquote_dcctx`.

        """
        self.set_defaults()
        self.prune()
        x = urlquote_dcctx(json.dumps(self, indent=None, separators=(',', ':')))
        return x

    def merged(self, overrides):
        """Return a new context holding self's entries updated by overrides.

        Arguments:
           overrides: dict-like of key-values taking precedence over self.

        Self is left unmodified. The result is not pruned after the
        overrides are applied; `encoded` prunes it when it is serialized.

        """
        c = DerivaClientContext(self)
        c.update(overrides)
        return c
class DerivaPathError(ValueError):
    """Signal a request path that is missing or not rooted with "/"."""
def _response_raise_for_status(self):
    """Raises requests.HTTPError if status code indicates an error.

    This unbound method can be monkey-patched onto a requests.Response
    instance or manually invoked on one.

    Status codes outside the 400-599 range return None silently. For an
    error status, the exception message carries the status code, whether
    it is a client (4xx) or server (5xx) error, the reason phrase, the URL
    and, when the response has content, that content as a details suffix.

    """
    # Guard clause: anything that is not a 4xx/5xx status is not an error.
    if not (400 <= self.status_code < 600):
        return

    origin = 'Client' if self.status_code < 500 else 'Server'
    # Only append the details suffix when the response actually has content.
    suffix = " Details: %s" % self.content if self.content else ""
    message = u'%s %s Error: %s for url: [%s]%s' % (
        self.status_code,
        origin,
        self.reason,
        self.url,
        suffix,
    )
    raise requests.HTTPError(message, response=self)
class DerivaBinding (object):
    """This is a base-class for implementation purposes. Not useful for clients."""

    def __init__(self, scheme, server, credentials=None, caching=True, session_config=None):
        """Create HTTP(S) server binding.

        Arguments:
           scheme: 'http' or 'https'
           server: server FQDN string
           credentials: credential secrets, e.g. cookie
           caching: whether to retain a GET response cache
           session_config: session configuration passed to
              get_new_requests_session; DEFAULT_SESSION_CONFIG is used
              when this is not supplied (or is empty).

        Deriva Client Context: You MAY mutate self.dcctx to customize
        the context for this service endpoint prior to invoking web
        requests.  E.g.:

          self.dcctx['cid'] = 'my application name'

        You MAY also supply custom per-request context by passing a
        headers dict to web request methods, e.g.

          self.get(..., headers={'deriva-client-context': {'action': 'myapp/function1'}})

        This custom header will be merged as override values with the
        default context in self.dcctx in order to form the complete
        context for the request.

        """
        self._base_server_uri = "%s://%s" % (
            scheme,
            server
        )
        self._server_uri = self._base_server_uri
        self.session_config = DEFAULT_SESSION_CONFIG if not session_config else session_config
        # Must exist before _get_new_session(), which closes any prior session.
        self._session = None
        self._get_new_session(self.session_config)
        self._caching = caching
        self._cache = {}
        self._response_raise_for_status = _response_raise_for_status
        self.dcctx = DerivaClientContext()
        self.set_credentials(credentials, server)

    def get_server_uri(self):
        """Return the server URI that request paths are appended to."""
        return self._server_uri

    def _get_new_session(self, session_config=None):
        """Close any current session and open a fresh one.

        Arguments:
           session_config: configuration for the new session;
              self.session_config is used when this is not supplied.

        """
        self._close_session()
        self._session = get_new_requests_session(self._server_uri + '/',
                                                 session_config if session_config else self.session_config)
        # allow loopback requests to bypass SSL cert verification
        # NOTE(review): this is a substring test, so any URI containing
        # "https://localhost" (e.g. a host named "localhost.example.org")
        # also disables verification -- confirm that this is intended.
        if "https://localhost" in self._server_uri:
            self._session.verify = False

    def _pre_get(self, path, headers):
        """Prepare URL, headers and cached response for a HEAD/GET request.

        Arguments:
           path: the path within this bound server
           headers: headers to set in request (None means no headers)

        Returns a (url, headers, prev_response) triple. The returned
        headers are a copy of the input augmented with the encoded
        'deriva-client-context' and, when a cached response with an
        etag exists and the caller supplied neither 'if-none-match' nor
        'if-match', an 'if-none-match' precondition. prev_response is
        the cached response in that case and None otherwise.

        Raises DerivaPathError if path is empty or not rooted with "/".

        """
        self.check_path(path)
        url = self._server_uri + path
        # Copy so the caller's dict (possibly the shared DEFAULT_HEADERS) is
        # never mutated; tolerate None the same way for every request method.
        headers = headers.copy() if headers is not None else {}
        prev_response = self._cache.get(url)
        if prev_response and 'etag' in prev_response.headers \
           and not ('if-none-match' in headers or 'if-match' in headers):
            headers['if-none-match'] = prev_response.headers['etag']
        else:
            prev_response = None
        headers['deriva-client-context'] = self.dcctx.merged(headers.get('deriva-client-context', {})).encoded()
        return url, headers, prev_response

    def _pre_mutate(self, path, headers, guard_response=None):
        """Prepare URL and headers for a POST/PUT/DELETE request.

        Arguments:
           path: the path within this bound server
           headers: headers to set in request (None means no headers)
           guard_response: previously seen response whose etag, if any,
              becomes the 'If-Match' precondition.

        Returns a (url, headers) pair where headers is a copy of the
        input augmented with the encoded 'deriva-client-context'.

        Raises DerivaPathError if path is empty or not rooted with "/".

        """
        self.check_path(path)
        url = self._server_uri + path
        # Copy so the caller's dict (possibly the shared DEFAULT_HEADERS) is
        # never mutated; tolerate None the same way for every request method.
        headers = headers.copy() if headers is not None else {}
        if guard_response and 'etag' in guard_response.headers:
            headers['If-Match'] = guard_response.headers['etag']
        headers['deriva-client-context'] = self.dcctx.merged(headers.get('deriva-client-context', {})).encoded()
        return url, headers

    @staticmethod
    def check_path(path):
        """Validate a request path.

        Raises DerivaPathError if path is empty or does not start with "/".

        """
        if not path:
            raise DerivaPathError("Path not specified")
        if not path.startswith("/"):
            raise DerivaPathError("Malformed path error (not rooted with \"/\"): %s" % path)

    @staticmethod
    def _raise_for_status_304(r, p, raise_not_modified):
        """Post-process a HEAD/GET response.

        Arguments:
           r: the response just received
           p: the previously cached response for the same URL, or None
           raise_not_modified: raise NotModified for 304 status when true

        For a 304 status, returns (or raises NotModified with) p when it
        is truthy and r otherwise. For any other status, raises
        requests.HTTPError on a 4xx/5xx status; otherwise returns r with
        its raise_for_status method replaced by
        _response_raise_for_status.

        """
        if r.status_code == 304:
            if raise_not_modified:
                raise NotModified(p or r)
            else:
                return p or r
        _response_raise_for_status(r)
        setattr(r, 'raise_for_status', _response_raise_for_status.__get__(r))
        return r

    @staticmethod
    def _raise_for_status_412(r):
        """Post-process a POST/PUT/DELETE response.

        Raises ConcurrentUpdate for 412 status and requests.HTTPError
        for any other 4xx/5xx status; otherwise returns r with its
        raise_for_status method replaced by _response_raise_for_status.

        """
        if r.status_code == 412:
            raise ConcurrentUpdate(r)
        _response_raise_for_status(r)
        setattr(r, 'raise_for_status', _response_raise_for_status.__get__(r))
        return r

    def set_credentials(self, credentials, server):
        """Apply credentials to the current session.

        Arguments:
           credentials: dict-like of credential secrets; nothing is done
              when it is empty or None. The first matching form is used:
              'bearer-token' sets an Authorization header, 'cookie'
              (a "name=value" string) sets a session cookie for the
              server domain, and 'username' plus 'password' are posted
              to the authn session endpoint.
           server: server FQDN string used as the cookie domain

        """
        if not credentials:
            return
        assert self._session is not None
        if 'bearer-token' in credentials:
            self._session.headers.update({'Authorization': 'Bearer {token}'.format(token=credentials['bearer-token'])})
        elif 'cookie' in credentials:
            # Split on the first '=' only; the cookie value may itself contain '='.
            cname, cval = credentials['cookie'].split('=', 1)
            self._session.cookies.set(cname, cval, domain=server, path='/')
        elif 'username' in credentials and 'password' in credentials:
            self.post_authn_session(credentials)

    def get_authn_session(self):
        """Perform GET on the server's /authn/session resource.

        Returns the response object. Raises requests.HTTPError for a
        4xx/5xx status.

        """
        headers = {'deriva-client-context': self.dcctx.encoded()}
        r = self._session.get(self._base_server_uri + "/authn/session", headers=headers)
        _response_raise_for_status(r)
        return r

    def post_authn_session(self, credentials):
        """Perform POST of credentials to the server's /authn/session resource.

        Arguments:
           credentials: credential data sent as the request body

        Returns the response object. Raises requests.HTTPError for a
        4xx/5xx status.

        """
        headers = {'deriva-client-context': self.dcctx.encoded()}
        r = self._session.post(self._base_server_uri + "/authn/session", data=credentials, headers=headers)
        _response_raise_for_status(r)
        return r

    def head(self, path, headers=DEFAULT_HEADERS, raise_not_modified=False):
        """Perform HEAD request, returning response object.

        Arguments:
           path: the path within this bound server
           headers: headers to set in request
           raise_not_modified: raise NotModified for 304 response status when true.

        May consult built-in cache and apply 'if-none-match' request
        header unless input headers already include 'if-none-match' or
        'if-match'. On cache hit, returns cached response unless
        raise_not_modified=true. Cached response may include content
        retrieved by GET on the same resource.

        """
        url, headers, prev_response = self._pre_get(path, headers)
        return self._raise_for_status_304(
            self._session.head(url, headers=headers),
            prev_response,
            raise_not_modified
        )

    def get(self, path, headers=DEFAULT_HEADERS, raise_not_modified=False, stream=False):
        """Perform GET request, returning response object.

        Arguments:
           path: the path within this bound server
           headers: headers to set in request
           raise_not_modified: raise NotModified for 304 response status when true.
           stream: whether to defer content retrieval to
              streaming access mode on response object.

        May consult built-in cache and apply 'if-none-match' request
        header unless input headers already include 'if-none-match' or
        'if-match'. On cache hit, returns cached response unless
        raise_not_modified=true.

        Caching of new results is disabled when stream=True.

        """
        url, headers, prev_response = self._pre_get(path, headers)
        r = self._raise_for_status_304(
            # Forward stream so the body really is deferred when requested.
            self._session.get(url, headers=headers, stream=stream),
            prev_response,
            raise_not_modified
        )
        if self._caching and not stream:
            self._cache[url] = r
        return r

    def post(self, path, data=None, json=None, headers=DEFAULT_HEADERS):
        """Perform POST request, returning response object.

        Arguments:
           path: the path within this bound server
           data: a buffer or file-like content value
           json: data to serialize as JSON content
           headers: headers to set in request

        Raises ConcurrentUpdate for 412 status.

        """
        url, headers = self._pre_mutate(path, headers)
        r = self._session.post(url, data=data, json=json, headers=headers)
        return self._raise_for_status_412(r)

    def put(self, path, data=None, json=None, headers=DEFAULT_HEADERS, guard_response=None):
        """Perform PUT request, returning response object.

        Arguments:
           path: the path within this bound server
           data: a buffer or file-like content value
           json: data to serialize as JSON content
           headers: headers to set in request
           guard_response: expected current resource state
             as previously seen response object.

        Uses guard_response to build appropriate 'if-match' header
        to assure change is only applied to expected state.

        Raises ConcurrentUpdate for 412 status.

        """
        url, headers = self._pre_mutate(path, headers, guard_response)
        r = self._session.put(url, data=data, json=json, headers=headers)
        return self._raise_for_status_412(r)

    def delete(self, path, headers=DEFAULT_HEADERS, guard_response=None):
        """Perform DELETE request, returning response object.

        Arguments:
           path: the path within this bound server
           headers: headers to set in request
           guard_response: expected current resource state
             as previously seen response object.

        Uses guard_response to build appropriate 'if-match' header
        to assure change is only applied to expected state.

        Raises ConcurrentUpdate for 412 status.

        """
        url, headers = self._pre_mutate(path, headers, guard_response)
        r = self._session.delete(url, headers=headers)
        return self._raise_for_status_412(r)

    def _close_session(self):
        """Close the current session, if any, and forget it."""
        if self._session is not None:
            self._session.close()
            self._session = None

    def __del__(self):
        """Release the session when this binding is garbage-collected."""
        self._close_session()