Source code for datalad.customremotes.archives

# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
#   See COPYING file distributed along with the datalad package for the
#   copyright and license terms.
#
### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Custom remote to get the load from archives present under annex"""

__docformat__ = 'restructuredtext'

import logging
import os
import os.path as op
import shutil
from operator import itemgetter
from pathlib import Path
from urllib.parse import urlparse

from annexremote import UnsupportedRequest

from datalad.consts import ARCHIVES_SPECIAL_REMOTE
from datalad.customremotes import RemoteError
from datalad.customremotes.main import main as super_main
from datalad.distribution.dataset import Dataset
from datalad.support.annexrepo import AnnexRepo
from datalad.support.archives import ArchivesCache
from datalad.support.cache import DictCache
from datalad.support.exceptions import CapturedException
from datalad.support.locking import lock_if_check_fails
from datalad.support.network import URL
from datalad.utils import (
    ensure_bytes,
    get_dataset_root,
    getpwd,
    unique,
    unlink,
)

from .base import AnnexCustomRemote

lgr = logging.getLogger('datalad.customremotes.archive')


# ####
# Preserve from previous version
# TODO: document intention
# ####
# this one might get under Runner for better output/control




# TODO: RF functionality not specific to being a custom remote (loop etc)
#       into a separate class
class ArchiveAnnexCustomRemote(AnnexCustomRemote):
    """Special custom remote allowing to obtain files from archives

    Archives must themselves be annexed.
    """

    CUSTOM_REMOTE_NAME = "archive"
    SUPPORTED_SCHEMES = (
        AnnexCustomRemote._get_custom_scheme(CUSTOM_REMOTE_NAME),)
    # Since we support only 1 scheme here
    URL_SCHEME = SUPPORTED_SCHEMES[0]
    URL_PREFIX = URL_SCHEME + ":"

    COST = 500

    def __init__(self, annex, path=None, persistent_cache=True, **kwargs):
        super().__init__(annex)
        # MIH: figure out what the following is all about,
        # in particular path==None
        self.repo = Dataset(get_dataset_root(Path.cwd())).repo \
            if not path \
            else AnnexRepo(path, create=False, init=False)
        self.path = self.repo.path

        # annex requests the load by KEY, not by the URL it originally asked
        # about. So for a key we might get back multiple URLs and, as a
        # heuristic, let's use the most recently asked one
        self._last_url = None  # for heuristic to choose among multiple URLs
        self._cache = ArchivesCache(self.path, persistent=persistent_cache)
        self._contentlocations = DictCache(size_limit=100)  # TODO: config ?

    def stop(self, *args):
        """Stop communication with annex"""
        self._cache.clean()

    def get_file_url(self, archive_file=None, archive_key=None, file=None,
                     size=None):
        """Given archive (file or a key) and a file -- compose URL for access

        Examples
        --------
        dl+archive:SHA256E-s176--69...3e.tar.gz#path=1/d2/2d&size=123
            when size of file within archive was known to be 123
        dl+archive:SHA256E-s176--69...3e.tar.gz#path=1/d2/2d
            when size of file within archive was not provided

        Parameters
        ----------
        size: int, optional
          Size of the file. If not provided, will simply be empty
        """
        assert file is not None
        if archive_file is not None:
            if archive_key is not None:
                raise ValueError(
                    "Provide archive_file or archive_key - not both")
            archive_key = self.repo.get_file_annexinfo(archive_file)['key']
        assert archive_key is not None
        attrs = dict()  # looking forward for more
        if file:
            attrs['path'] = file.lstrip('/')
        if size is not None:
            attrs['size'] = size
        return str(URL(scheme=self.URL_SCHEME, path=archive_key,
                       fragment=attrs))

    @property
    def cache(self):
        return self._cache

    def _parse_url(self, url):
        """Parse url and return archive key, file within archive and
        additional attributes (such as size)"""
        url = URL(url)
        assert url.scheme == self.URL_SCHEME
        fdict = url.fragment_dict
        if 'path' not in fdict:
            # must be old-style key/path#size=
            assert '/' in url.path, "must be of key/path format"
            key, path = url.path.split('/', 1)
        else:
            key, path = url.path, fdict.pop('path')
        if 'size' in fdict:
            fdict['size'] = int(fdict['size'])
        return key, path, fdict

    def _gen_akey_afiles(self, key, sorted=False, unique_akeys=True):
        """Given a key, yield akey, afile pairs

        If `sorted`, then those which have an extracted version in the local
        cache are yielded first.

        The pairs are determined from the dl+archive urls registered for the
        key.

        Made "generators all the way" as an exercise, but also to delay any
        checks etc until really necessary.
        """
        # we will need all URLs anyways later on ATM, so let's list() them.
        # Anyways, here we have a single scheme (archive) so there is not
        # much optimization possible
        urls = list(self.gen_URLS(key))

        akey_afiles = [
            self._parse_url(url)[:2]  # skip size
            for url in urls
        ]

        if unique_akeys:
            akey_afiles = unique(akey_afiles, key=itemgetter(0))

        if not sorted:
            for pair in akey_afiles:
                yield pair
            return

        # Otherwise we will go through each one.
        # Multiple URLs are available, so we need to figure out which one
        # would be most efficient to "deal with"
        akey_afile_paths = (
            ((akey, afile), self.get_contentlocation(
                akey, absolute=True, verify_exists=False
            ))
            for akey, afile in akey_afiles
        )

        # By default get_contentlocation would return an empty result for a
        # key which is not available locally, but we could still have an
        # extracted archive in the cache.  So we pretty much need to get all
        # possible locations first and only then drop those which aren't
        # present locally -- this is why verify_exists was added.
        yielded = set()
        akey_afile_paths_ = []
        # utilize cache to check which archives might already be present in
        # the cache
        for akey_afile, akey_path in akey_afile_paths:
            if akey_path and self.cache[akey_path].is_extracted:
                yield akey_afile
                yielded.add(akey_afile)
            akey_afile_paths_.append((akey_afile, akey_path))
        # Replace the generator with the already collected list.  The idea is
        # that in many cases we don't even need to create a full list and the
        # initial single yield would be enough, thus we don't need to check
        # locations etc for every possible hit.
        akey_afile_paths = akey_afile_paths_

        # if not present in the cache -- check which are present
        # locally and choose that one to use, so it would get extracted
        for akey_afile, akey_path in akey_afile_paths:
            if akey_path and op.exists(akey_path):
                yielded.add(akey_afile)
                yield akey_afile

        # So no archive is present either in the cache or originally under
        # annex.  XXX some kind of a heuristic I guess is to use last_url ;-)
        if self._last_url and self._last_url in urls \
                and (len(urls) == len(akey_afiles)):
            akey_afile, _ = akey_afile_paths[urls.index(self._last_url)]
            yielded.add(akey_afile)
            yield akey_afile

        for akey_afile, _ in akey_afile_paths:
            if akey_afile not in yielded:
                yield akey_afile
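
    # Illustration only (not executed; the key below is a made-up example):
    # a dl+archive URL as composed by get_file_url() and consumed by
    # _parse_url() looks like
    #
    #   dl+archive:MD5E-s1000--0123456789abcdef.tar.gz#path=dir/file.dat&size=123
    #
    # for which _parse_url() returns
    #
    #   ('MD5E-s1000--0123456789abcdef.tar.gz', 'dir/file.dat', {'size': 123})
    #
    # while an old-style URL without a #path= fragment, e.g.
    #
    #   dl+archive:MD5E-s1000--0123456789abcdef.tar.gz/dir/file.dat
    #
    # is split into key and path on the first '/'.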

    def get_contentlocation(self, key, absolute=False, verify_exists=True):
        """Return (relative to top or absolute) path to the file containing the key

        This is a wrapper around AnnexRepo.get_contentlocation which provides
        caching of the result (we often ask for the location of the same
        archive key).
        """
        if key not in self._contentlocations:
            fpath = self.repo.get_contentlocation(key, batch=True)
            if fpath:  # shouldn't store empty ones
                self._contentlocations[key] = fpath
        else:
            fpath = self._contentlocations[key]
            # but verify that it exists
            if verify_exists and not op.lexists(op.join(self.path, fpath)):
                # prune from cache
                del self._contentlocations[key]
                fpath = ''

        if absolute and fpath:
            return op.join(self.path, fpath)
        else:
            return fpath

    # Protocol implementation

    def checkurl(self, url):
        # TODO: what about those MULTI and list to be returned?
        #       should we return all filenames or keys within archive?
        #       might be way too many?
        #       only if just archive portion of url is given or the one
        #       pointing to specific file?
        lgr.debug("Current directory: %s, url: %s", os.getcwd(), url)
        akey, afile, attrs = self._parse_url(url)
        size = attrs.get('size', None)

        # But reply that present only if archive is present
        # TODO: this would throw exception if not present, so this statement
        #       is kinda bogus
        akey_path = self.get_contentlocation(akey, absolute=True)
        if akey_path:
            # Extract via cache only if size is not yet known
            if size is None:
                # if for testing we want to force getting the archive
                # extracted
                efile = self.cache[akey_path].get_extracted_filename(afile)
                efile = ensure_bytes(efile)
                if op.exists(efile):
                    size = os.stat(efile).st_size

            # so it was a good successful one -- record
            self._last_url = url
            if size is None:
                return True
            else:
                # FIXME: providing filename causes annex to not even talk to
                # ask upon drop :-/
                return [dict(size=size)]  # , basename(afile))
        else:
            # TODO: theoretically we should first check if key is available
            # from any remote to know if file is available
            return False

    def checkpresent(self, key):
        # TODO: so we need to maintain mapping from urls to keys. Then
        # we could even store the filename within archive
        # Otherwise it is unrealistic to even require to recompute key if we
        # knew the backend etc
        # The same content could be available from multiple locations within
        # the same archive, so let's not ask it twice since here we don't
        # care about "afile"
        for akey, _ in self._gen_akey_afiles(key, unique_akeys=True):
            if self.get_contentlocation(akey) \
                    or self.repo.is_available(akey, batch=True, key=True):
                return True
        # it is unclear to MIH why this must be UNKNOWN rather than FALSE
        # but this is how I found it
        raise RemoteError('Key not present')

    def remove(self, key):
        raise UnsupportedRequest('This special remote cannot remove content')
        # # TODO: proxy query to the underlying tarball under annex that if
        # #       tarball was removed (not available at all) -- report
        # #       success, otherwise failure (current the only one)
        # akey, afile = self._get_akey_afile(key)
        # if False:
        #     # TODO: proxy, checking present of local tarball is not
        #     #       sufficient
        #     #  not exists(self.get_key_path(key)):
        #     self.send("REMOVE-SUCCESS", akey)
        # else:
        #     self.send("REMOVE-FAILURE", akey,
        #               "Removal from file archives is not supported")

    def whereis(self, key):
        return False
        # although more logical is to report back success, it leads to imho
        # more confusing duplication. See
        # http://git-annex.branchable.com/design/external_special_remote_protocol/#comment-3f9588f6a972ae566347b6f467b53b54
        # try:
        #     key, file = self._get_akey_afile(key)
        #     self.send("WHEREIS-SUCCESS",
        #               "file %s within archive %s" % (file, key))
        # except ValueError:
        #     self.send("WHEREIS-FAILURE")

    def transfer_retrieve(self, key, file):
        akeys_tried = []
        # The same content could come from multiple files within the same
        # archive.  So far it doesn't make sense to "try all" of them, since
        # if one fails the others would fail too, so it makes sense to
        # immediately prune the list and keep only the ones from unique
        # akeys.  Maybe whenever we support extraction directly from the
        # tarballs we should go through all and choose the one easiest to
        # get, or smth.
        for akey, afile in self._gen_akey_afiles(
                key, sorted=True, unique_akeys=True):
            if not akey:
                lgr.warning("Got an empty archive key %r for key %s. "
                            "Skipping", akey, key)
                continue
            akeys_tried.append(akey)
            try:
                with lock_if_check_fails(
                    check=(self.get_contentlocation, (akey,)),
                    lock_path=(
                        lambda k: op.join(self.repo.path, '.git',
                                          'datalad-archives-%s' % k),
                        (akey,)),
                    operation="annex-get"
                ) as (akey_fpath, lock):
                    if lock:
                        assert not akey_fpath
                        self._annex_get_archive_by_key(akey)
                        akey_fpath = self.get_contentlocation(akey)
                        if not akey_fpath:
                            raise RuntimeError(
                                "We were reported to fetch it alright but "
                                "now can't get its location. Check logic"
                            )

                akey_path = op.join(self.repo.path, akey_fpath)
                assert op.exists(akey_path), \
                    "Key file %s is not present" % akey_path

                # Extract that bloody file from the bloody archive
                # TODO: implement/use caching, for now a simple one
                # actually patool doesn't support extraction of a single file
                # https://github.com/wummel/patool/issues/20
                # so
                pwd = getpwd()
                lgr.debug(
                    "Getting file %s from %s while PWD=%s",
                    afile, akey_path, pwd)
                was_extracted = self.cache[akey_path].is_extracted
                apath = self.cache[akey_path].get_extracted_file(afile)
                link_file_load(apath, file)
                if not was_extracted and self.cache[akey_path].is_extracted:
                    self.message(
                        "%s special remote is using an extraction cache "
                        "under %s. Remove it with DataLad's 'clean' "
                        "command to save disk space."
                        % (ARCHIVES_SPECIAL_REMOTE,
                           self.cache[akey_path].path),
                        type='info',
                    )
                return
            except Exception as exc:
                ce = CapturedException(exc)
                self.message(
                    "Failed to fetch {akey} containing {key}: {msg}".format(
                        akey=akey,
                        key=key,
                        # we need to get rid of any newlines, or we might
                        # break the special remote protocol
                        msg=str(ce).replace('\n', '|')
                    ))
                continue

        raise RemoteError(
            "Failed to fetch any archive containing {key}. "
            "Tried: {akeys_tried}".format(**locals())
        )
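
    # For orientation (an illustrative sketch, not code from this module):
    # git-annex drives transfer_retrieve() through the external special
    # remote protocol, roughly
    #
    #   annex  -> TRANSFER RETRIEVE <key> <tmpfile>
    #   remote    extracts <afile> from the containing archive (fetching the
    #             archive itself via 'git annex get' if needed) and places it
    #             at <tmpfile> via link_file_load()
    #   remote -> TRANSFER-SUCCESS RETRIEVE <key>
    #
    # A RemoteError raised above is turned into the corresponding
    # TRANSFER-FAILURE reply by the annexremote machinery.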

    def claimurl(self, url):
        scheme = urlparse(url).scheme
        if scheme in self.SUPPORTED_SCHEMES:
            return True
        else:
            return False

    def _annex_get_archive_by_key(self, akey):
        # TODO: make it more stringent?
        # The command could have failed to run if the key was not present
        # locally yet, thus retrieve the key using annex
        # TODO: we need to report to the user somehow that this is happening,
        # and show progress of the download
        from humanize import naturalsize

        from datalad.support.annexrepo import AnnexJsonProtocol

        akey_size = self.repo.get_size_from_key(akey)
        self.message(
            "To obtain some keys we need to fetch an archive "
            "of size %s"
            % (naturalsize(akey_size) if akey_size else "unknown"),
            type='info',
        )
        try:
            self.repo._call_annex(
                ["get", "--json", "--json-progress", "--key", akey],
                protocol=AnnexJsonProtocol,
            )
        except Exception:
            self.message(f'Failed to fetch archive with key {akey}')
            raise


def main():
    """cmdline entry point"""
    super_main(
        cls=ArchiveAnnexCustomRemote,
        remote_name='datalad-archives',
        description=\
        "extract content from archives (.tar{,.gz}, .zip, etc) which are "
        "in turn managed by git-annex. See `datalad add-archive-content` "
        "command",
    )
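

# Typical use (an illustrative sketch; the exact command lines are an
# assumption, not part of this module): main() serves as the entry point for
# the helper executable that git-annex spawns for the 'datalad-archives'
# special remote.  Within a dataset the remote is normally set up and fed
# dl+archive: URLs by `datalad add-archive-content`; a manual setup could
# look roughly like
#
#   git annex initremote datalad-archives \
#       externaltype=datalad-archives encryption=none autoenable=true
#   git annex get <file-registered-from-an-archive>
#
# after which requesting such a file makes git-annex call back into this
# remote, which extracts the file from its containing (annexed) archive.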