Skip to content

Commit

Permalink
add process_list_in_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
superstes committed Nov 3, 2024
1 parent 721fad7 commit 2b81684
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 3 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Versions

## 0.3

* PS
* `process_list_in_threads`

----

## 0.2

* Crypto
Expand All @@ -12,7 +19,7 @@
* `valid_port`
* `get_ipv`

* SubPS
* PS
* `process` - Stdin handling
* `wait_for_threads`

Expand Down Expand Up @@ -42,5 +49,8 @@
* `valid_public_ip`
* `valid_asn`

* PS
* `process`

* Django
* `datetime_from_db`
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ from oxl_utils.ps import process_cache

# thread handling
from oxl_utils.ps import wait_for_threads
from oxl_utils.ps import process_list_in_threads
```

----
Expand Down
26 changes: 26 additions & 0 deletions src/oxl_utils/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from os import environ, getcwd
from functools import cache
from threading import Thread
from typing import Callable

from .log import log

Expand Down Expand Up @@ -100,3 +101,28 @@ def wait_for_threads(threads: list[Thread], timeout: int = 600):
while not threads_done and time() < break_time:
threads_done = all(not t.is_alive() for t in threads)
sleep(0.05)


def process_list_in_threads(callback: Callable, to_process: list, key: str = 'item', parallel: int = 10):
next_idx = 0
last_idx = len(to_process) - 1
threads = []

while next_idx <= last_idx:
finished = [t for t in threads if not t.is_alive()]
for t in finished:
threads.remove(t)

if len(threads) >= parallel:
sleep(0.05)
continue

t = Thread(
target=callback,
kwargs={key: to_process[next_idx]},
)
threads.append(t)
t.start()
next_idx += 1

wait_for_threads(threads, timeout=3600)
23 changes: 21 additions & 2 deletions src/oxl_utils/ps_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from time import sleep
from threading import Thread
from time import sleep, time
from threading import Thread, Lock

import pytest

Expand Down Expand Up @@ -107,3 +107,22 @@ def _dummy_workload(s: int):

for t in threads:
assert not t.is_alive()


def test_process_list_in_threads():
from .ps import process_list_in_threads
lock = Lock()
results = []

def _dummy_workload(s: int):
sleep(s)
with lock:
results.append(s)

start_time = time()
workload = [1, 1, 1, 1]
process_list_in_threads(callback=_dummy_workload, to_process=workload, key='s', parallel=4)
elapsed = time() - start_time

assert 1 < elapsed < 1.2
assert len(results) == 4

0 comments on commit 2b81684

Please sign in to comment.