Skip to content

Commit

Permalink
Refactor/nx prefix and move to thread (#2)
Browse files Browse the repository at this point in the history
* refactor: remove nx_ prefix from public API imports

This commit removes the nx_ prefix from public API imports to maintain consistency with decorator usage in examples and tests. The change was prompted by an inconsistency in the README where nx_signal was imported but used as @signal.

Changes include:
- Update imports in tests to use unprefixed versions (signal, with_signals, etc.)
- Keep nx_ prefix in internal implementation files (core.py, etc.)
- Update README examples to use consistent import style
- Add move_to_thread documentation to api.md
- Add emit documentation to signal decorator in api.md
- Add start/stop/queue_task documentation to worker decorator in api.md

The change makes the API more intuitive while maintaining the internal naming convention for better code organization.

* Bump version to 1.0.2 for release
---------

Co-authored-by: San <san.tekart@gmail.com>
  • Loading branch information
nexconnectio and san-tekart authored Jan 4, 2025
1 parent a0fc040 commit 485bf41
Show file tree
Hide file tree
Showing 21 changed files with 362 additions and 84 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.2] - 2025-01-04

### Changed
- Removed nx_ prefix from public API imports for better consistency:
- Updated imports in examples and tests to use unprefixed versions (signal, with_signals, etc.)
- Maintained nx_ prefix in internal implementation files
- Improved API documentation with detailed method descriptions

## [1.0.1] - 2025-01-02

### Changed
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ For background work, PynneX provides a `@nx_with_worker` decorator that:

**Worker Example**
```python
from pynnex import nx_with_worker, nx_signal
from pynnex import nx_with_worker, signal

@with_worker
class DataProcessor:
Expand Down
67 changes: 67 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,45 @@ Enables signal-slot functionality on a class. Classes decorated with `@nx_with_s

**Important**: `@nx_with_signals` expects that you already have an `asyncio` event loop running (e.g., via `asyncio.run(...)`) unless you only rely on synchronous slots in a single-thread scenario. When in doubt, wrap your main logic in an async function and call `asyncio.run(main())`.

**Key Methods**:

`move_to_thread(target_worker)`
Moves the instance to another thread by copying thread affinity from a worker. This allows dynamic thread reassignment of signal-slot objects.

- **Parameters:**
- **target_worker:** A worker instance decorated with `@nx_with_worker`. The instance will adopt this worker's thread affinity.
- **Raises:**
- **RuntimeError:** If the target worker's thread is not started.
- **TypeError:** If the target is not compatible (not decorated with `@nx_with_worker`).

**Example:**
```python
@nx_with_worker
class Worker:
async def run(self):
await self.wait_for_stop()

@nx_with_signals
class SignalEmitter:
@nx_signal
def value_changed(self):
pass

worker = Worker()
worker.start()

emitter = SignalEmitter()
emitter.move_to_thread(worker) # Now emitter runs in worker's thread
```

**Usage:**
```python
@nx_with_signals
class MyClass:
@nx_signal
def my_signal(self):
pass

**Usage:**
```python
@nx_with_signals
Expand All @@ -35,6 +74,15 @@ class MyClass:
### `@nx_signal`
Defines a signal within a class that has `@nx_with_signals`. Signals are callable attributes that, when emitted, notify all connected slots.

**Key Methods**:

`emit(*args, **kwargs)`
Emits the signal, invoking all connected slots with the provided arguments.

- **Parameters:**
- ***args:** Positional arguments to pass to the connected slots.
- ****kwargs:** Keyword arguments to pass to the connected slots.

**Usage:**

```python
Expand Down Expand Up @@ -72,6 +120,25 @@ Decorates a class to run inside a dedicated worker thread with its own event loo
- The `run(*args, **kwargs)` coroutine as the main entry point.
- A built-in async task queue via `queue_task`.

**Key Methods**:

`start(*args, **kwargs)`
Starts the worker thread and its event loop.
- **Parameters:**
- ***args:** Positional arguments passed to the worker's `run()` method.
- ****kwargs:** Keyword arguments passed to the worker's `run()` method.

`stop()`
Stops the worker thread and its event loop gracefully.
- Cancels any running tasks and waits for the thread to finish.

`queue_task(coro)`
Schedules a coroutine to run on the worker's event loop.
- **Parameters:**
- **coro:** A coroutine object to be executed in the worker thread.
- **Raises:**
- **RuntimeError:** If the worker is not started.

**Key Points:**

`run(*args, **kwargs)` is an async method that you can define to perform long-running operations or await a stopping event.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "pynnex"
version = "1.0.1"
version = "1.0.2"
description = "A Python Signal-Slot library"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
10 changes: 8 additions & 2 deletions src/pynnex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
nx_slot,
nx_graceful_shutdown,
NxConnectionType,
NxSignalConstants
NxConnection,
NxSignalConstants,
NxSignal,
_determine_connection_type
)
from .utils import nx_log_and_raise_error
from .contrib.patterns.worker.decorators import nx_with_worker
Expand All @@ -29,5 +32,8 @@
'nx_log_and_raise_error',
'nx_graceful_shutdown',
'NxConnectionType',
'NxSignalConstants'
'NxConnection',
'NxSignalConstants',
'NxSignal',
'_determine_connection_type'
]
81 changes: 71 additions & 10 deletions src/pynnex/contrib/patterns/worker/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class has the required asynchronous `initialize` and `finalize` methods,
import inspect
import logging
import threading
from pynnex.core import nx_signal
from pynnex.core import nx_signal, NxSignalConstants

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -185,7 +185,27 @@ async def start_queue(self):
)

def queue_task(self, coro):
"""Method to add a task to the queue"""
"""
Schedules a coroutine to run on the worker's event loop in a thread-safe manner.
Parameters
----------
coro : coroutine
A coroutine object to be executed in the worker thread.
Raises
------
RuntimeError
If the worker is not started.
ValueError
If the provided argument is not a coroutine object.
Notes
-----
- Thread-safe: Can be called from any thread.
- Tasks are processed in FIFO order.
- Failed tasks are logged but don't stop queue processing.
"""

if not asyncio.iscoroutine(coro):
logger.error(
Expand All @@ -200,7 +220,23 @@ def queue_task(self, coro):
loop.call_soon_threadsafe(lambda: self._nx_task_queue.put_nowait(coro))

def start(self, *args, **kwargs):
"""Start the worker thread."""
"""
Starts the worker thread and its event loop, initializing the worker's processing environment.
Parameters
----------
*args : Any
Positional arguments passed to the worker's `run()` method.
**kwargs : Any
Keyword arguments passed to the worker's `run()` method.
- run_coro: Optional coroutine to run instead of the default `run()` method.
Notes
-----
- Creates a new thread with its own event loop.
- Automatically starts task queue processing if no `run()` method is defined.
- Emits `started` signal when initialization is complete.
"""

run_coro = kwargs.pop(_WorkerConstants.RUN_CORO, None)

Expand Down Expand Up @@ -277,7 +313,16 @@ async def runner():
self._nx_thread.start()

def stop(self):
"""Stop the worker thread."""
"""
Gracefully stops the worker thread and its event loop.
Notes
-----
- Cancels any running tasks including the main `run()` coroutine.
- Waits for task queue to finish processing.
- Emits `stopped` signal before final cleanup.
- Thread is joined with a 2-second timeout.
"""

logger.debug("[WorkerClass][stop] Starting worker shutdown")

Expand All @@ -297,12 +342,28 @@ def stop(self):
self._nx_loop = None
self._nx_thread = None

def move_to_thread(self, target):
def _copy_affinity(self, target):
"""
Move target object to this worker's thread and loop.
target must be an object created by nx_with_signals or nx_with_worker,
and the worker must be started with start() method.
Copy this worker's thread affinity (thread, loop, and affinity object) to the target.
This is an internal method used by move_to_thread() and should not be called directly.
The method copies thread, event loop, and affinity object references from this worker
to the target object, effectively moving the target to this worker's thread context.
Parameters
----------
target : object
Target object that will receive this worker's thread affinity.
Must be decorated with @nx_with_signals or @nx_with_worker.
Raises
------
RuntimeError
If the worker thread is not started.
TypeError
If the target is not compatible (not decorated with @nx_with_signals or @nx_with_worker).
"""

with self._nx_lifecycle_lock:
if not self._nx_thread or not self._nx_loop:
raise RuntimeError(
Expand All @@ -312,8 +373,8 @@ def move_to_thread(self, target):

# Assume target is initialized with nx_with_signals
# Reset target's _nx_thread, _nx_loop, _nx_affinity
if not hasattr(target, "_nx_thread") or not hasattr(
target, "_nx_loop"
if not hasattr(target, NxSignalConstants.THREAD) or not hasattr(
target, NxSignalConstants.LOOP
):
raise TypeError(
"[WorkerClass][move_to_thread] Target is not compatible. "
Expand Down
6 changes: 6 additions & 0 deletions src/pynnex/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,13 @@ def __init__(self, *args, **kwargs):
# Call the original __init__
original_init(self, *args, **kwargs)

def move_to_thread(self, target_thread):
"""Change thread affinity of the instance to targetThread"""

target_thread._copy_affinity(self)

cls.__init__ = __init__
cls.move_to_thread = move_to_thread

return cls

Expand Down
12 changes: 6 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
import logging
import pytest
import pytest_asyncio
from pynnex import nx_with_signals, nx_signal, nx_slot
from pynnex import with_signals, signal, slot

# Only creating the logger without configuration
logger = logging.getLogger(__name__)


@nx_with_signals
@with_signals
class Sender:
"""Sender class"""

@nx_signal
@signal
def value_changed(self, value):
"""Signal for value changes"""

Expand All @@ -36,7 +36,7 @@ def emit_value(self, value):
self.value_changed.emit(value)


@nx_with_signals
@with_signals
class Receiver:
"""Receiver class"""

Expand All @@ -50,7 +50,7 @@ def __init__(self):
self.id = id(self)
logger.info("Created Receiver[%d]", self.id)

@nx_slot
@slot
async def on_value_changed(self, value: int):
"""Slot for value changes"""
logger.info(
Expand All @@ -67,7 +67,7 @@ async def on_value_changed(self, value: int):
self.received_count,
)

@nx_slot
@slot
def on_value_changed_sync(self, value: int):
"""Sync slot for value changes"""
logger.info(
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_thread_safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
import unittest
import threading
import gc
from pynnex.core import nx_with_signals, nx_signal
from pynnex import with_signals, signal


@nx_with_signals
@with_signals
class SafeSender:
"""
A class that sends events.
"""

@nx_signal
@signal
def event(self, value):
"""
Event signal.
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import time
import logging
import pytest
from pynnex.core import nx_with_signals, nx_signal, nx_slot
from pynnex import with_signals, signal, slot

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -77,23 +77,23 @@ async def call_slot():
async def test_connection_type_with_different_threads():
"""Test connection type is determined correctly for different thread scenarios"""

@nx_with_signals
@with_signals
class Sender:
"""Sender class"""

@nx_signal
@signal
def value_changed(self):
"""Signal emitted when value changes"""

@nx_with_signals
@with_signals
class Receiver:
"""Receiver class"""

def __init__(self):
super().__init__()
self.received = False

@nx_slot
@slot
def on_value_changed(self, value):
"""Slot called when value changes"""

Expand Down
Loading

0 comments on commit 485bf41

Please sign in to comment.