From 436504fe72ab1e85737a47a0d8d2eb3cfc563d37 Mon Sep 17 00:00:00 2001 From: Georg Grab Date: Wed, 23 Feb 2022 12:14:25 +0100 Subject: [PATCH] Add onlyMatching argument to syncFromSynapse to filter downloads --- synapseutils/sync.py | 51 ++++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/synapseutils/sync.py b/synapseutils/sync.py index 769b1954a..a39a3f12a 100644 --- a/synapseutils/sync.py +++ b/synapseutils/sync.py @@ -46,7 +46,7 @@ def _sync_executor(syn): def syncFromSynapse(syn, entity, path=None, ifcollision='overwrite.local', allFiles=None, followLink=False, - manifest="all", downloadFile=True): + manifest="all", downloadFile=True, onlyMatching=None): """Synchronizes all the files in a folder (including subfolders) from Synapse and adds a readme manifest with file metadata. @@ -69,6 +69,11 @@ def syncFromSynapse(syn, entity, path=None, ifcollision='overwrite.local', allFi :param downloadFile Determines whether downloading the files. Defaults to True + :param onlyMatching Determines list of regexes to be matched against files. + Only if at least one file matches the regex, it will + be downloaded. + Defaults to None + :returns: list of entities (files, tables, links) This function will crawl all subfolders of the project/folder specified by `entity` and download all files that have @@ -104,7 +109,7 @@ def syncFromSynapse(syn, entity, path=None, ifcollision='overwrite.local', allFi # 2 threads always, if those aren't available then we'll run single threaded to avoid a deadlock with _sync_executor(syn) as executor: sync_from_synapse = _SyncDownloader(syn, executor) - files = sync_from_synapse.sync(entity, path, ifcollision, followLink, downloadFile, manifest) + files = sync_from_synapse.sync(entity, path, ifcollision, followLink, downloadFile, manifest, onlyMatching) # the allFiles parameter used to be passed in as part of the recursive implementation of this function # with the public signature invoking itself. now that this isn't a recursive any longer we don't need @@ -225,7 +230,7 @@ def __init__(self, syn, executor: concurrent.futures.Executor, max_concurrent_fi max_concurrent_file_downloads = max(int(max_concurrent_file_downloads or self._syn.max_threads / 2), 1) self._file_semaphore = threading.BoundedSemaphore(max_concurrent_file_downloads) - def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifest="all"): + def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifest="all", onlyMatching=None): progress = CumulativeTransferProgress('Downloaded') if is_synapse_id(entity): @@ -238,7 +243,7 @@ def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifes ) if is_container(entity): - root_folder_sync = self._sync_root(entity, path, ifcollision, followLink, progress, downloadFile, manifest) + root_folder_sync = self._sync_root(entity, path, ifcollision, followLink, progress, downloadFile, manifest, onlyMatching) # once the whole folder hierarchy has been traversed this entrant thread waits for # all file downloads to complete before returning @@ -256,7 +261,7 @@ def sync(self, entity, path, ifcollision, followLink, downloadFile=True, manifes files.sort(key=lambda f: f.get('path') or '') return files - def _sync_file(self, entity_id, parent_folder_sync, path, ifcollision, followLink, progress, downloadFile): + def _sync_file(self, entity_id, parent_folder_sync, path, ifcollision, followLink, progress, downloadFile, onlyMatching): try: # we use syn.get to download the File. # these context managers ensure that we are using some shared state @@ -264,14 +269,31 @@ def _sync_file(self, entity_id, parent_folder_sync, path, ifcollision, followLin # by all multi threaded downloads in this sync) with progress.accumulate_progress(), \ download_shared_executor(self._executor): - - entity = self._syn.get( - entity_id, - downloadLocation=path, - ifcollision=ifcollision, - followLink=followLink, - downloadFile=downloadFile, - ) + + file_matches = True + if onlyMatching is not None: + file_matches = False + entity_meta = self._syn.get( + entity_id, + downloadFile=False, + ) + for regex in onlyMatching: + if re.match(regex, entity_meta.name) is not None: + file_matches = True + + if file_matches: + entity = self._syn.get( + entity_id, + downloadLocation=path, + ifcollision=ifcollision, + followLink=followLink, + downloadFile=downloadFile, + ) + else: + parent_folder_sync.update( + finished_id=entity_id, + ) + return files = [] provenance = None @@ -302,7 +324,7 @@ def _sync_file(self, entity_id, parent_folder_sync, path, ifcollision, followLin finally: self._file_semaphore.release() - def _sync_root(self, root, root_path, ifcollision, followLink, progress, downloadFile, manifest="all"): + def _sync_root(self, root, root_path, ifcollision, followLink, progress, downloadFile, manifest="all", onlyMatching=None): # stack elements are a 3-tuple of: # 1. the folder entity/dict # 2. the local path to the folder to download to @@ -372,6 +394,7 @@ def _sync_root(self, root, root_path, ifcollision, followLink, progress, downloa followLink, progress, downloadFile, + onlyMatching, ) for child_folder in child_folders: