Skip to content

Commit

Permalink
generalizes send_job_and_wait_async
Browse files Browse the repository at this point in the history
  • Loading branch information
BWMac committed Jan 14, 2025
1 parent fd457a2 commit 2fba65e
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions synapseclient/models/mixins/asynchronous_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@ class AsynchronousCommunicator:
"""Mixin to handle communication with the Synapse Asynchronous Job service."""

def to_synapse_request(self) -> None:
"""Converts the request to a request expected of the Synapse REST API."""
"""Converts the request to a request expected of the Synapse REST API.
This is a placeholder for any additional logic that needs to be run before the exchange with Synapse.
It must be overridden by subclasses if needed.
"""
raise NotImplementedError("to_synapse_request must be implemented.")

def fill_from_dict(self, synapse_response: Dict[str, str]) -> Self:
"""
Converts a response from the REST API into this dataclass.
This is a placeholder for any additional logic that needs to be run after the exchange with Synapse.
It must be overridden by subclasses if needed.
Arguments:
synapse_response: The response from the REST API.
Expand All @@ -36,7 +43,15 @@ def fill_from_dict(self, synapse_response: Dict[str, str]) -> Self:
async def _post_exchange_async(
self, synapse_client: Optional[Synapse] = None, **kwargs
) -> None:
"""Any additional logic to run after the exchange with Synapse."""
"""Any additional logic to run after the exchange with Synapse.
This is a placeholder for any additional logic that needs to be run after the exchange with Synapse.
It must be overridden by subclasses if needed.
Arguments:
synapse_client: The Synapse client to use for the request.
**kwargs: Additional arguments to pass to the request.
"""
pass

async def send_job_and_wait_async(
Expand All @@ -45,9 +60,22 @@ async def send_job_and_wait_async(
*,
synapse_client: Optional[Synapse] = None,
) -> Self:
"""Send the job to the Asynchronous Job service and wait for it to complete."""
"""Send the job to the Asynchronous Job service and wait for it to complete.
This is a placeholder for any additional logic that needs to be run after the exchange with Synapse.
It must be overridden by subclasses if needed.
Arguments:
post_exchange_args: Additional arguments to pass to the request.
synapse_client: The Synapse client to use for the request.
Returns:
An instance of this class.
"""
result = await send_job_and_wait_async(
request=self.to_synapse_request(), synapse_client=synapse_client
request=self.to_synapse_request(),
request_type=self.concrete_type,
synapse_client=synapse_client,
)
self.fill_from_dict(synapse_response=result)
await self._post_exchange_async(
Expand Down Expand Up @@ -197,6 +225,7 @@ def fill_from_dict(self, async_job_status: dict) -> "AsynchronousJobStatus":

async def send_job_and_wait_async(
request: Dict[str, Any],
request_type: str,
endpoint: str = None,
*,
synapse_client: Optional["Synapse"] = None,
Expand Down Expand Up @@ -225,7 +254,7 @@ async def send_job_and_wait_async(
"jobId": job_id,
**await get_job_async(
job_id=job_id,
request_type=AGENT_CHAT_REQUEST,
request_type=request_type,
synapse_client=synapse_client,
endpoint=endpoint,
),
Expand Down

0 comments on commit 2fba65e

Please sign in to comment.