Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYNPY-1548] Swap to a FIFO queue #1147

Merged
merged 14 commits into from
Dec 17, 2024
76 changes: 24 additions & 52 deletions synapseclient/core/download/download_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

import httpx

from synapseclient.api.file_services import (
get_file_handle_for_download,
get_file_handle_for_download_async,
)
from synapseclient.api.file_services import get_file_handle_for_download
from synapseclient.core.exceptions import (
SynapseDownloadAbortedException,
_raise_for_status_httpx,
Expand All @@ -29,6 +26,7 @@
RETRYABLE_CONNECTION_ERRORS,
RETRYABLE_CONNECTION_EXCEPTIONS,
with_retry_time_based,
with_retry_time_based_async,
)
from synapseclient.core.transfer_bar import get_or_create_download_progress_bar

Expand Down Expand Up @@ -110,24 +108,6 @@ class PresignedUrlProvider:
# offset parameter used to buffer url expiration checks, time in seconds
_TIME_BUFFER: datetime.timedelta = datetime.timedelta(seconds=5)

async def get_info_async(self) -> PresignedUrlInfo:
"""
Using async, returns the cached info if it's not expired, otherwise
retrieves a new pre-signed url and returns that.

Returns:
Information about a retrieved presigned-url from either the cache or a
new request
"""
if not self._cached_info or (
datetime.datetime.now(tz=datetime.timezone.utc)
+ PresignedUrlProvider._TIME_BUFFER
>= self._cached_info.expiration_utc
):
self._cached_info = await self._get_pre_signed_info_async()

return self._cached_info

def get_info(self) -> PresignedUrlInfo:
"""
Using a thread lock, returns the cached info if it's not expired, otherwise
Expand Down Expand Up @@ -168,27 +148,6 @@ def _get_pre_signed_info(self) -> PresignedUrlInfo:
expiration_utc=_pre_signed_url_expiration_time(pre_signed_url),
)

async def _get_pre_signed_info_async(self) -> PresignedUrlInfo:
"""
Make an HTTP request to get a pre-signed url to download a file.

Returns:
Information about a retrieved presigned-url from a new request.
"""
response = await get_file_handle_for_download_async(
file_handle_id=self.request.file_handle_id,
synapse_id=self.request.object_id,
entity_type=self.request.object_type,
synapse_client=self.client,
)
file_name = response["fileHandle"]["fileName"]
pre_signed_url = response["preSignedURL"]
return PresignedUrlInfo(
file_name=file_name,
url=pre_signed_url,
expiration_utc=_pre_signed_url_expiration_time(pre_signed_url),
)


def _generate_chunk_ranges(
file_size: int,
Expand Down Expand Up @@ -232,40 +191,47 @@ def _pre_signed_url_expiration_time(url: str) -> datetime:
return return_data


async def _get_file_size_wrapper(syn: "Synapse", url: str, debug: bool) -> int:
async def _get_file_size_wrapper(
syn: "Synapse", url_provider: PresignedUrlProvider, debug: bool
) -> int:
"""
Gets the size of the file located at url

Arguments:
syn: The synapseclient
url: The pre-signed url of the file
url_provider: A URL provider for the presigned urls
debug: A boolean to specify if debug mode is on

Returns:
The size of the file in bytes
"""

loop = asyncio.get_running_loop()
return await loop.run_in_executor(
syn._get_thread_pool_executor(asyncio_event_loop=loop),
_get_file_size,
syn,
url,
url_provider,
debug,
)


def _get_file_size(syn: "Synapse", url: str, debug: bool) -> int:
def _get_file_size(
syn: "Synapse", presigned_url_provider: PresignedUrlProvider, debug: bool
) -> int:
"""
Gets the size of the file located at url

Arguments:
url: The pre-signed url of the file
url_provider: A URL provider for the presigned urls
debug: A boolean to specify if debug mode is on

Returns:
The size of the file in bytes
"""
with syn._requests_session_storage.stream("GET", url) as response:
with syn._requests_session_storage.stream(
method="GET", url=presigned_url_provider.get_info().url
) as response:
Comment on lines +232 to +234
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the logic to here aligns with how the logic works when streaming each individual part of the files. This is executing in a different thread so the presigned URL should be instantly used.

I also wrapped this in a retry block as well to ensure it'll remain functional.

_raise_for_status_httpx(
response=response,
logger=syn.logger,
Expand Down Expand Up @@ -306,9 +272,15 @@ async def download_file(self) -> None:
"""
url_provider = PresignedUrlProvider(self._syn, request=self._download_request)

url_info = await url_provider.get_info_async()
file_size = await _get_file_size_wrapper(
syn=self._syn, url=url_info.url, debug=self._download_request.debug
file_size = await with_retry_time_based_async(
function=lambda: _get_file_size_wrapper(
syn=self._syn,
url_provider=url_provider,
debug=self._download_request.debug,
),
retry_status_codes=[403],
retry_max_wait_before_failure=30,
read_response_content=False,
)
self._progress_bar = get_or_create_download_progress_bar(
file_size=file_size,
Expand Down
38 changes: 29 additions & 9 deletions synapseclient/core/download/download_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async def download_file_entity(
if_collision=if_collision,
synapse_cache_location=synapse_cache_location,
cached_file_path=cached_file_path,
entity_id=getattr(entity, "id", None),
synapse_client=client,
)
if download_path is None:
Expand All @@ -157,9 +158,15 @@ async def download_file_entity(
if not os.path.exists(download_location):
os.makedirs(download_location)
client.logger.info(
f"Copying existing file from {cached_file_path} to {download_path}"
f"[{getattr(entity, 'id', None)}:{file_name}]: Copying existing "
f"file from {cached_file_path} to {download_path}"
)
shutil.copy(cached_file_path, download_path)
else:
client.logger.info(
f"[{getattr(entity, 'id', None)}:{file_name}]: Found existing file "
f"at {download_path}, skipping download."
)

else: # download the file from URL (could be a local file)
object_type = "FileEntity" if submission is None else "SubmissionAttachment"
Expand Down Expand Up @@ -257,6 +264,7 @@ async def download_file_entity_model(
if_collision=if_collision,
synapse_cache_location=synapse_cache_location,
cached_file_path=cached_file_path,
entity_id=file.id,
synapse_client=client,
)
if download_path is None:
Expand All @@ -268,9 +276,13 @@ async def download_file_entity_model(
if not os.path.exists(download_location):
os.makedirs(download_location)
client.logger.info(
f"Copying existing file from {cached_file_path} to {download_path}"
f"[{file.id}:{file_name}]: Copying existing file from {cached_file_path} to {download_path}"
)
shutil.copy(cached_file_path, download_path)
else:
client.logger.info(
f"[{file.id}:{file_name}]: Found existing file at {download_path}, skipping download."
)
Comment on lines +282 to +285
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In lieu of structured logging setup this is at least a start to get to the point where logging is clearer within the client. I updated a bunch of the messages around the download process so that it's clear what Synapse ID/file/entity produced the message.


else: # download the file from URL (could be a local file)
object_type = "FileEntity" if submission is None else "SubmissionAttachment"
Expand Down Expand Up @@ -526,7 +538,7 @@ def download_fn(
),
)

syn.logger.info(f"Downloaded {synapse_id} to {downloaded_path}")
syn.logger.info(f"[{synapse_id}]: Downloaded to {downloaded_path}")
syn.cache.add(
file_handle["id"], downloaded_path, file_handle.get("contentMd5", None)
)
Expand All @@ -541,7 +553,8 @@ def download_fn(
exc_info = sys.exc_info()
ex.progress = 0 if not hasattr(ex, "progress") else ex.progress
syn.logger.debug(
f"\nRetrying download on error: [{exc_info[0]}] after progressing {ex.progress} bytes",
f"\n[{synapse_id}]: Retrying "
f"download on error: [{exc_info[0]}] after progressing {ex.progress} bytes",
exc_info=True,
) # this will include stack trace
if ex.progress == 0: # No progress was made reduce remaining retries.
Expand Down Expand Up @@ -669,7 +682,7 @@ def download_from_url(
actual_md5 = None
redirect_count = 0
delete_on_md5_mismatch = True
client.logger.debug(f"Downloading from {url} to {destination}")
client.logger.debug(f"[{entity_id}]: Downloading from {url} to {destination}")
while redirect_count < REDIRECT_LIMIT:
redirect_count += 1
scheme = urllib_urlparse.urlparse(url).scheme
Expand Down Expand Up @@ -854,7 +867,8 @@ def _ftp_report_hook(
)
increment_progress_bar(n=transferred, progress_bar=progress_bar)
client.logger.debug(
f"Resuming partial download to {temp_destination}. "
f"[{entity_id}]: Resuming "
f"partial download to {temp_destination}. "
f"{previously_transferred}/{to_be_transferred} bytes already "
"transferred."
)
Expand Down Expand Up @@ -894,7 +908,8 @@ def _ftp_report_hook(
# verify that the file was completely downloaded and retry if it is not complete
if to_be_transferred > 0 and transferred < to_be_transferred:
client.logger.warning(
"\nRetrying download because the connection ended early.\n"
f"\n[{entity_id}]: "
"Retrying download because the connection ended early.\n"
)
continue

Expand All @@ -903,7 +918,9 @@ def _ftp_report_hook(
shutil.move(temp_destination, destination)
break
else:
client.logger.error(f"Unable to download URLs of type {scheme}")
client.logger.error(
f"[{entity_id}]: " f"Unable to download URLs of type {scheme}"
BryanFauble marked this conversation as resolved.
Show resolved Hide resolved
)
return None

else: # didn't break out of loop
Expand Down Expand Up @@ -949,6 +966,7 @@ def resolve_download_path_collisions(
if_collision: str,
synapse_cache_location: str,
cached_file_path: str,
entity_id: str,
*,
synapse_client: Optional["Synapse"] = None,
) -> Union[str, None]:
Expand All @@ -964,6 +982,7 @@ def resolve_download_path_collisions(
May be "overwrite.local", "keep.local", or "keep.both".
synapse_cache_location: The location in .synapseCache where the file would be
corresponding to its FileHandleId.
entity_id: The entity id
cached_file_path: The file path of the cached copy

Raises:
Expand Down Expand Up @@ -1000,7 +1019,8 @@ def resolve_download_path_collisions(
pass # Let the download proceed and overwrite the local file.
elif if_collision == COLLISION_KEEP_LOCAL:
client.logger.info(
f"Found existing file at {download_path}, skipping download."
f"[{entity_id}]: Found existing "
f"file at {download_path}, skipping download."
)

# Don't want to overwrite the local file.
Expand Down
42 changes: 30 additions & 12 deletions synapseclient/core/transfer_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@ def increment_progress_bar(n: int, progress_bar: Union[tqdm, None]) -> None:

@contextmanager
def shared_download_progress_bar(
file_size: int, *, synapse_client: Optional["Synapse"] = None
file_size: int,
custom_message: str = None,
*,
synapse_client: Optional["Synapse"] = None,
):
"""An outside process that will eventually trigger a download through this module
can configure a shared Progress Bar by running its code within this context manager.

Arguments:
file_size: The size of the file being downloaded.
custom_message: A custom message to display on the progress bar instead of default.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.
Expand All @@ -86,22 +90,28 @@ def shared_download_progress_bar(

syn = Synapse.get_client(synapse_client=synapse_client)
with logging_redirect_tqdm(loggers=[syn.logger]):
get_or_create_download_progress_bar(file_size=file_size, synapse_client=syn)
get_or_create_download_progress_bar(
file_size=file_size, custom_message=custom_message, synapse_client=syn
)
try:
yield
finally:
_thread_local.progress_bar_download_context_managed = False
if _thread_local.progress_bar_download:
_thread_local.progress_bar_download.close()
_thread_local.progress_bar_download.refresh()
del _thread_local.progress_bar_download
close_download_progress_bar()


def close_download_progress_bar() -> None:
"""Handle closing the download progress bar if it is not context managed."""
if not _is_context_managed_download_bar():
def close_download_progress_bar(force_close: bool = False) -> None:
"""Handle closing the download progress bar if it is not context managed. This will
also only close the progress bar if there are no other downloads sharing it."""
if force_close or not _is_context_managed_download_bar():
progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None)
if progress_bar is not None:
transfer_count: int = getattr(_thread_local, "transfer_count", 0)
transfer_count -= 1
if transfer_count < 0:
transfer_count = 0

_thread_local.transfer_count = transfer_count
if progress_bar is not None and not transfer_count:
Comment on lines +108 to +114
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://sagebionetworks.jira.com/browse/SYNPY-1507 - There was an issue where using asyncio.gather to execute a bunch of download tasks in parallel would cause the progress bar to get closed, and no longer show. By tracking the files being transferred and increment when we open a new bar/decrement when closing a bar we can close the bar when the last transfer occurs. It allows the bar to remain open for the duration of the transfer and maintain it's progress/context throughout.

progress_bar.close()
progress_bar.refresh()
del _thread_local.progress_bar_download
Expand All @@ -113,7 +123,11 @@ def _is_context_managed_download_bar() -> bool:


def get_or_create_download_progress_bar(
file_size: int, postfix: str = None, *, synapse_client: Optional["Synapse"] = None
file_size: int,
postfix: str = None,
custom_message: str = None,
*,
synapse_client: Optional["Synapse"] = None,
) -> Union[tqdm, None]:
"""Return the existing progress bar if it exists, otherwise create a new one.

Expand All @@ -132,11 +146,15 @@ def get_or_create_download_progress_bar(
if syn.silent:
return None

transfer_count: int = getattr(_thread_local, "transfer_count", 0)
transfer_count += 1
_thread_local.transfer_count = transfer_count

progress_bar: tqdm = getattr(_thread_local, "progress_bar_download", None)
if progress_bar is None:
progress_bar = tqdm(
total=file_size,
desc="Downloading files",
desc=custom_message or "Downloading files",
unit="B",
unit_scale=True,
smoothing=0,
Expand Down
Loading
Loading