Skip to content

Commit

Permalink
fix: normalized how exceptions are handled
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Kryukov committed Aug 8, 2023
1 parent 8d8e3b7 commit a7415b5
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 12 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ Tasks will only be submited when there are available workers (specified by
tasks than amount of available workers (so workers won't have to wait for
task creation). Refer to `tests/test_queue.py` for details.

Exceptions inside of the tasks craeted with executors are ignored. Refer
to `tests/test_exceptions.py` for details.

## Example / Showcase

```py
Expand Down
11 changes: 4 additions & 7 deletions bounded_pool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from abc import ABC
from asyncio import Semaphore, Task, create_task
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import List, Optional
from typing import Optional


class BoundedExecutor(ABC):
Expand All @@ -31,27 +31,24 @@ def _get_default_max_workers():
return min(32, (os.cpu_count() or 1) + 4)

async def __aenter__(self):
self._tasks: List[Task] = []
self._semaphore = Semaphore(self._semaphore_size)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
while self._tasks:
await self._tasks.pop()
# make sure no tasks being executed
for _ in range(self._semaphore_size):
await self._semaphore.acquire()

async def _acquire(self):
await self._semaphore.acquire()

def _release(self, fut):
if fut in self._tasks:
self._tasks.remove(fut)
self._semaphore.release()

async def submit(self, coro, *args, **kwargs) -> Task:
await self._acquire()
task = create_task(coro(*args, **kwargs))
task.add_done_callback(self._release)
self._tasks.append(task)
return task


Expand Down
4 changes: 0 additions & 4 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,12 @@ async def test_asyncio_executor_sleeps():
for _ in range(2):
await pool.submit(asyncio.sleep, 1)

assert not pool._tasks

async with aassert_takes(more=3.5):
async with BoundedAsyncioPoolExecutor(1) as pool:
async with aassert_takes(more=3):
for _ in range(4):
await pool.submit(asyncio.sleep, 1)

assert not pool._tasks


@pytest.mark.parametrize('cls', [BoundedThreadPoolExecutor, BoundedProcessPoolExecutor])
def test_sync_executor_sleeps(cls):
Expand Down
43 changes: 43 additions & 0 deletions tests/test_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest

from bounded_pool import (
BoundedAsyncioPoolExecutor,
BoundedProcessPoolExecutor,
BoundedThreadPoolExecutor,
)


def raiser():
raise ValueError()


async def araiser():
raise ValueError()


@pytest.mark.anyio
async def test_asyncio_executor_ignores_exceptions():
async with BoundedAsyncioPoolExecutor() as pool:
task = await pool.submit(araiser)

with pytest.raises(ValueError):
await task

task = await pool.submit(araiser)

with pytest.raises(ValueError):
await task


@pytest.mark.parametrize('cls', [BoundedThreadPoolExecutor, BoundedProcessPoolExecutor])
def test_sync_executor_ignores_exceptions(cls):
with cls() as pool:
task = pool.submit(raiser)

with pytest.raises(ValueError):
task.result()

task = pool.submit(raiser)

with pytest.raises(ValueError):
task.result()
2 changes: 1 addition & 1 deletion tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def test_asyncio_executor_queues():


@pytest.mark.parametrize('cls', [BoundedThreadPoolExecutor, BoundedProcessPoolExecutor])
def test_sync_executor_sleeps(cls):
def test_sync_executor_queues(cls):
with cls(1, 1) as pool:
with assert_takes(more=1, less=2):
with assert_takes(less=0.5):
Expand Down

0 comments on commit a7415b5

Please sign in to comment.