Merge pull request #26 from WEHI-ResearchComputing/aiostream
Limited concurrency, progress bars, retry
multimeric authored Aug 14, 2024
2 parents 32f03d7 + c0dceb0 commit d46bf0a
Showing 9 changed files with 214 additions and 177 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/build.yml
@@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v4
@@ -20,4 +20,6 @@ jobs:
run: pip install .
- uses: jakebailey/pyright-action@v2
with:
version: 1.1.367
version: 1.1.376
- name: Test CLI help
run: filesender --help
163 changes: 70 additions & 93 deletions docs/benchmark.ipynb

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion docs/changelog.md
@@ -1,10 +1,20 @@
# Changelog

## Version 1.4.0
## Version 2.0.0

### Added

* Directory tree uploads. You can now pass directory paths anywhere you could previously pass only file paths, including the CLI and the Python API. Note that the directory hierarchy is currently not preserved: if you upload `dir_a/file_a.txt` and `dir_b/file_b.txt`, they will be downloaded as `file_a.txt` and `file_b.txt` with their directories stripped out. This is a limitation of the current FileSender API; see https://github.com/filesender/filesender/issues/1555 for context.
* A progress bar for uploads

### Fixed

* Memory usage in 1.3.0 was extremely high because the entire file was kept in memory due to a bug. This is now resolved.

### Changed

* The `concurrent_requests` and `concurrent_reads` client arguments, as well as their corresponding CLI arguments, have been replaced by `concurrent_chunks` and `concurrent_files`. Read the docs to understand how these work.
* Automatic retries for transient failures.

## Version 1.3.0

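For illustration, a minimal sketch of how the new client arguments might be used from the Python API, based on the constructor and `upload_workflow` signatures in this diff. The instance URL, credentials, recipient and paths are placeholders, and the `transfer_args` keys are assumed rather than taken from the diff:

```python
import asyncio
from pathlib import Path

from filesender import FileSenderClient, UserAuth


async def main() -> None:
    client = FileSenderClient(
        base_url="https://filesender.example.org",  # placeholder instance URL
        auth=UserAuth(username="me@example.org", api_key="..."),
        concurrent_files=2,   # files uploaded at once
        concurrent_chunks=2,  # chunks in flight per file, so up to 4 chunks held in memory
    )
    await client.prepare()  # required before uploading (sets the chunk size)
    await client.upload_workflow(
        files=[Path("dir_a"), Path("file_b.txt")],            # directories are now accepted
        transfer_args={"recipients": ["them@example.org"]},   # assumed PartialTransfer fields
    )


asyncio.run(main())
```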
8 changes: 8 additions & 0 deletions docs/cli.md
@@ -0,0 +1,8 @@
# Command Line Interface

::: mkdocs-typer
:module: filesender.main
:depth: 1
:command: app
:prog_name: filesender

137 changes: 87 additions & 50 deletions filesender/api.py
@@ -1,14 +1,34 @@
from typing import Any, Coroutine, Iterable, List, Optional, Tuple, AsyncIterator, Set
from typing import Any, Iterable, List, Optional, Tuple, AsyncIterator, Set
from bs4 import BeautifulSoup
import filesender.response_types as response
import filesender.request_types as request
from urllib.parse import urlparse, urlunparse, unquote
from filesender.auth import Auth
from pathlib import Path
from httpx import Request, AsyncClient, HTTPStatusError, RequestError
from asyncio import Semaphore, gather
from httpx import Request, AsyncClient, HTTPStatusError, RequestError, ReadError
import math
import aiofiles
from aiostream import stream
from contextlib import contextmanager
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception
import logging
from tqdm.asyncio import tqdm

logger = logging.getLogger(__name__)

def should_retry(e: BaseException) -> bool:
"""
Returns True if the exception is a transient error from the FileSender server that can be retried.
"""
if isinstance(e, ReadError):
# Seems to be just a bug in the backend
# https://github.com/encode/httpx/discussions/2941
return True
elif isinstance(e, HTTPStatusError) and e.response.status_code == 500 and e.response.json()["message"] == "auth_remote_too_late":
# These errors are caused by lag between creating the response and it being received
return True
return False


def url_without_scheme(url: str) -> str:
@@ -34,6 +54,7 @@ def raise_status():
f"Request failed with content {e.response.text} for request {e.request.method} {e.request.url}"
) from e
except RequestError as e:
# TODO: check for SSL read error
raise Exception(
f"Request failed for request {e.request.method} {e.request.url}"
) from e
@@ -88,18 +109,16 @@ class FileSenderClient:
auth: Auth
# Session to use for all HTTP requests
http_client: AsyncClient
#: Limits concurrent reads
_read_sem: Semaphore
#: Limits concurrent requests
_req_sem: Semaphore
concurrent_files: Optional[int]
concurrent_chunks: Optional[int]

def __init__(
self,
base_url: str,
chunk_size: Optional[int] = None,
auth: Auth = Auth(),
concurrent_reads: Optional[int] = None,
concurrent_requests: Optional[int] = None,
concurrent_files: Optional[int] = 1,
concurrent_chunks: Optional[int] = 2,
):
"""
Args:
@@ -112,22 +131,20 @@ def __init__(
auth: The authentication method.
This is optional, but you almost always want to provide it.
Generally you will want to use [`UserAuth`][filesender.UserAuth] or [`GuestAuth`][filesender.GuestAuth].
concurrent_reads: The maximum number of file chunks that can be processed at a time. Reducing this number will decrease the memory
usage of the application. None, the default value, sets no limit.
See <https://wehi-researchcomputing.github.io/FileSenderCli/benchmark> for a detailed explanation of this parameter.
concurrent_requests: The maximum number of API requests the client can be waiting for at a time. Reducing this number will decrease the memory
usage of the application. None, the default value, sets no limit.
See <https://wehi-researchcomputing.github.io/FileSenderCli/benchmark> for a detailed explanation of this parameter.
concurrent_files: The number of files that will be uploaded concurrently.
This works multiplicatively with `concurrent_chunks`, so `concurrent_files=2, concurrent_chunks=2` means 4 total
chunks of data will be stored in memory and sent concurrently.
concurrent_chunks: The number of chunks that will be read from each file concurrently. Increase this number to
speed up transfers, or reduce this number to reduce memory usage and network errors.
This can be set to `None` to enable unlimited concurrency, but use at your own risk.
"""
self.base_url = base_url
self.auth = auth
# FileSender seems to sometimes use redirects
self.http_client = AsyncClient(timeout=None, follow_redirects=True)
self.chunk_size = chunk_size
# If we don't want a concurrency limit, we just use an infinitely large semaphore
# See: https://github.com/python/typeshed/issues/12147
self._read_sem = Semaphore(concurrent_reads or float("inf")) # type: ignore
self._req_sem = Semaphore(concurrent_requests or float("inf")) # type: ignore
self.concurrent_chunks = concurrent_chunks
self.concurrent_files = concurrent_files

async def prepare(self) -> None:
"""
@@ -146,11 +163,20 @@ async def _sign_send(self, request: Request) -> Any:
"""
Signs a request and sends it, returning the JSON result
"""
with raise_status():
return await self._sign_send_inner(request)

@retry(
retry=retry_if_exception(should_retry),
wait=wait_fixed(0.1),
stop=stop_after_attempt(5),
before_sleep=lambda x: logger.warn(f"Attempt {x.attempt_number}.{x.outcome}")
)
async def _sign_send_inner(self, request: Request) -> Any:
# Needs to be a separate function to handle retry policy correctly
self.auth.sign(request, self.http_client)
async with self._req_sem:
with raise_status():
res = await self.http_client.send(request)
res.raise_for_status()
res = await self.http_client.send(request)
res.raise_for_status()
return res.json()

async def create_transfer(
@@ -229,20 +255,25 @@ async def upload_file(self, file_info: response.File, path: Path) -> None:
Params:
file_info: Identifier obtained from the result of [`create_transfer`][filesender.FileSenderClient.create_transfer]
path: File path to the file to be uploaded
Returns: An async generator. You can force the uploads to occur using `list()`.
"""
if self.chunk_size is None:
raise Exception(".prepare() has not been called!")

tasks: List[Coroutine[None, None, None]] = []
# Each chunk is read synchronously since `async for` is effectively synchronous
async for chunk, offset in yield_chunks(path, self.chunk_size):
async with self._read_sem:
# However, the upload is not awaited, which allows them to run in parallel
tasks.append(
self._upload_chunk(chunk=chunk, offset=offset, file_info=file_info)
)
# Pause until all running tasks are finished
await gather(*tasks)
async def _task_generator(chunk_size: int) -> AsyncIterator[Tuple[response.File, int, bytes]]:
async for chunk, offset in yield_chunks(path, chunk_size):
yield file_info, offset, chunk

async with (
stream.starmap(
_task_generator(self.chunk_size),
self._upload_chunk, # type: ignore
task_limit=self.concurrent_chunks
)
).stream() as streamer:
async for _ in tqdm(streamer, total=math.ceil(file_info["size"] / self.chunk_size), desc=file_info["name"]): # type: ignore
pass

async def _upload_chunk(
self,
@@ -310,12 +341,17 @@ async def download_files(
token: Obtained from the transfer email. The same as [`GuestAuth`][filesender.GuestAuth]'s `guest_token`.
out_dir: The path to write the downloaded files.
"""

file_ids = await self._files_from_token(token)

async def _download_args() -> AsyncIterator[Tuple[str, Any, Path]]:
"Yields tuples of arguments to pass to download_file"
for file_id in file_ids:
yield token, file_id, out_dir

# Each file is downloaded in parallel
tasks = [
self.download_file(token=token, file_id=file, out_dir=out_dir)
for file in await self._files_from_token(token)
]
await gather(*tasks)
# Pyright messes this up
await stream.starmap(_download_args(), self.download_file, task_limit=self.concurrent_files) # type: ignore

async def download_file(
self,
@@ -368,16 +404,14 @@ async def upload_workflow(
Args:
files: A list of files and/or directories to upload.
transfer_args: Additional options to include when creating the transfer, for example a subject or message. See [`PartialTransfer`][filesender.request_types.PartialTransfer].
Returns:
: See [`Transfer`][filesender.response_types.Transfer]
"""
files_by_name = {key: value for key, value in iter_files(files)}
file_info: List[request.File] = [{"name": name, "size": file.stat().st_size} for name, file in files_by_name.items()]
transfer = await self.create_transfer(
{
"files": [
{"name": name, "size": file.stat().st_size} for name, file in files_by_name.items()
],
"files": file_info,
"options": {
"email_download_complete": True,
},
@@ -387,16 +421,19 @@
self.http_client.params = self.http_client.params.set(
"roundtriptoken", transfer["roundtriptoken"]
)
# Upload each file in parallel
# Note: update to TaskGroup once Python 3.10 is unsupported
tasks = [
self.upload_complete(file_info=file, path=files_by_name[file["name"]])
for file in transfer["files"]
if file["name"] in files_by_name
]
await gather(*tasks)

async def _upload_args() -> AsyncIterator[Tuple[response.File, Path]]:
for file in transfer["files"]:
# Skip folders, which aren't real
if file["name"] in files_by_name:
# Pyright seems to not understand that some fields are optional
yield file, files_by_name[file["name"]]

# Upload each file in parallel
# Pyright doesn't map the type signatures correctly here
await stream.starmap(_upload_args(), self.upload_complete, ordered=False, task_limit=self.concurrent_files) # type: ignore

# Mark the transfer as complete
transfer = await self.update_transfer(
transfer_id=transfer["id"], body={"complete": True}
)
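The semaphore-based throttling is replaced in this file by `aiostream`'s `stream.starmap` with a `task_limit`, which both bounds concurrency and yields results as they complete (so `tqdm` can track progress). A self-contained sketch of that pattern, with a toy coroutine standing in for `_upload_chunk`:

```python
import asyncio
from typing import AsyncIterator, Tuple

from aiostream import stream


async def upload_chunk(offset: int, chunk: bytes) -> int:
    """Toy stand-in for FileSenderClient._upload_chunk."""
    await asyncio.sleep(0.1)
    return offset


async def chunk_args() -> AsyncIterator[Tuple[int, bytes]]:
    """Yields argument tuples, like _task_generator in upload_file."""
    for offset in range(10):
        yield offset, b"x" * 1024


async def main() -> None:
    # task_limit=2 means at most two upload_chunk calls run concurrently,
    # mirroring the default concurrent_chunks=2
    async with stream.starmap(chunk_args(), upload_chunk, task_limit=2).stream() as streamer:
        async for offset in streamer:
            print(f"finished chunk at offset {offset}")


asyncio.run(main())
```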
2 changes: 1 addition & 1 deletion filesender/auth.py
@@ -69,7 +69,7 @@ def sign(self, request: SignType, client: AsyncClient) -> SignType:
else:
raise Exception("?")

request.url = request.url.copy_add_param("signature", signature.hexdigest())
request.url = request.url.copy_remove_param("signature").copy_add_param("signature", signature.hexdigest())
return request

@dataclass(unsafe_hash=True)
18 changes: 7 additions & 11 deletions filesender/benchmark.py
@@ -8,7 +8,7 @@
from secrets import token_bytes
import tempfile
from pathlib import Path
from typing import Any, Dict, Generator, Iterable, List, Tuple, Union
from typing import Any, Dict, Generator, List, Tuple
from dataclasses import dataclass
import multiprocessing as mp

@@ -24,8 +24,7 @@ class BenchResult:
"Memory in fractional sections"
memory: int
"Memory in bytes"
read_limit: int
req_limit: int
concurrent_chunks: int

@contextmanager
def make_tempfile(size: int, **kwargs: Any) -> Generator[Path, Any, None]:
@@ -71,21 +70,19 @@ async def upload_capture_mem(client_args: Dict[str, Any], upload_args: Dict[str,
return BenchResult(
memory=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss,
time = end - start,
read_limit=client_args["concurrent_reads"],
req_limit=client_args["concurrent_requests"],
concurrent_chunks=client_args["concurrent_chunks"]
)

def upload_capture_mem_sync(*args: Any) -> BenchResult:
return asyncio.run(upload_capture_mem(*args))

def benchmark(paths: List[Path], read_limit: Iterable[Union[int, float]], req_limit: Iterable[Union[int, float]], base_url: str, username: str, apikey: str, recipient: str) -> List[BenchResult]:
def benchmark(paths: List[Path], limit: int, base_url: str, username: str, apikey: str, recipient: str) -> List[BenchResult]:
"""
Runs a test upload using a variety of concurrency settings, and returns one result for each.
Args:
paths: A list of files to upload. Typically generated by [`make_tempfiles`][filesender.benchmark.make_tempfiles]
read_limit: The value for `concurrent_reads` in the [`FileSenderClient`][filesender.FileSenderClient] constructor
req_limit: The value for `concurrent_requests` in the [`FileSenderClient`][filesender.FileSenderClient] constructor
limit: The exclusive upper bound on the `concurrent_chunks` values passed to the [`FileSenderClient`][filesender.FileSenderClient] constructor
base_url: The FileSender instance URL
username: Your username for accessing the FileSender instance
apikey: Your API key for accessing the FileSender instance
@@ -95,12 +92,11 @@ def benchmark(paths: List[Path], read_limit: Iterable[Union[int, float]], req_li
# The spawn context ensures that no memory is shared with the controlling process
with mp.get_context("spawn").Pool(processes=1) as pool:
args: List[Tuple[Any, ...]] = []
for concurrent_reads, concurrent_requests in zip(read_limit, req_limit):
for concurrent_chunks in range(1, limit):
args.append(({
"base_url": base_url,
"auth": UserAuth(api_key=apikey, username=username),
"concurrent_reads": concurrent_reads,
"concurrent_requests": concurrent_requests
"concurrent_chunks": concurrent_chunks,
},
{
"files": paths,
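A minimal sketch of driving the reworked benchmark helper with its new `limit` argument, based on the `benchmark` and `make_tempfile` signatures in this diff. The instance URL, credentials, recipient and file size are placeholders, and the `__main__` guard is there because the helper uses a spawn-based multiprocessing pool:

```python
from filesender.benchmark import benchmark, make_tempfile

if __name__ == "__main__":
    # Benchmarks concurrent_chunks = 1, 2 and 3 (range(1, limit)) against a test instance
    with make_tempfile(size=50_000_000) as path:
        results = benchmark(
            paths=[path],
            limit=4,
            base_url="https://filesender.example.org",  # placeholder
            username="me@example.org",
            apikey="...",
            recipient="them@example.org",
        )
    for result in results:
        print(result.concurrent_chunks, result.time, result.memory)
```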