diff --git a/app/pyproject.toml b/app/pyproject.toml index ad0f7d67d..702bdb80b 100644 --- a/app/pyproject.toml +++ b/app/pyproject.toml @@ -51,4 +51,3 @@ profile = "black" [tool.pytest.ini_options] addopts = "--junitxml=junit.xml -p no:cacheprovider --cov-report=xml --cov-report=term" junit_family = "xunit2" -asyncio_default_fixture_loop_scope = "function" diff --git a/app/routes/revocation.py b/app/routes/revocation.py index 73e73f9c1..b3398bb41 100644 --- a/app/routes/revocation.py +++ b/app/routes/revocation.py @@ -17,6 +17,7 @@ ) from app.services import revocation_registry from app.util.retry_method import coroutine_with_retry_until_value +from shared import PUBLISH_REVOCATIONS_TIMEOUT from shared.log_config import get_logger logger = get_logger(__name__) @@ -209,7 +210,7 @@ async def publish_revocations( field_name="state", expected_value="transaction_acked", logger=bound_logger, - max_attempts=30, + max_attempts=PUBLISH_REVOCATIONS_TIMEOUT, retry_delay=1, ) except asyncio.TimeoutError as e: diff --git a/app/services/definitions/credential_definitions.py b/app/services/definitions/credential_definitions.py index 6944bdaa7..9c7d0acf5 100644 --- a/app/services/definitions/credential_definitions.py +++ b/app/services/definitions/credential_definitions.py @@ -12,7 +12,7 @@ from app.util.assert_public_did import assert_public_did from app.util.definitions import credential_definition_from_acapy from app.util.transaction_acked import wait_for_transaction_ack -from shared import REGISTRY_SIZE +from shared import CRED_DEF_ACK_TIMEOUT, REGISTRY_SIZE from shared.log_config import get_logger logger = get_logger(__name__) @@ -58,7 +58,10 @@ async def create_credential_definition( if result.txn and result.txn.transaction_id: await wait_for_transaction_ack( - aries_controller=aries_controller, transaction_id=result.txn.transaction_id + aries_controller=aries_controller, + transaction_id=result.txn.transaction_id, + max_attempts=CRED_DEF_ACK_TIMEOUT, + retry_delay=1, ) if support_revocation: diff --git a/app/services/onboarding/issuer.py b/app/services/onboarding/issuer.py index 17f941b44..022ac8fb0 100644 --- a/app/services/onboarding/issuer.py +++ b/app/services/onboarding/issuer.py @@ -6,7 +6,6 @@ from app.services.onboarding.util.register_issuer_did import ( create_connection_with_endorser, register_issuer_did, - wait_issuer_did_transaction_endorsed, ) from app.util.did import qualified_did_sov from shared.log_config import get_logger @@ -115,17 +114,11 @@ async def onboard_issuer_no_public_did( ) issuer_did = await register_issuer_did( - endorser_controller=endorser_controller, issuer_controller=issuer_controller, issuer_label=issuer_label, + issuer_endorser_connection_id=issuer_connection_id, logger=bound_logger, ) - - await wait_issuer_did_transaction_endorsed( - issuer_controller=issuer_controller, - issuer_connection_id=issuer_connection_id, - logger=logger, - ) except Exception as e: bound_logger.exception("Could not create connection with endorser.") raise CloudApiException( diff --git a/app/services/onboarding/util/register_issuer_did.py b/app/services/onboarding/util/register_issuer_did.py index 9d1fd63ba..06c5aad5a 100644 --- a/app/services/onboarding/util/register_issuer_did.py +++ b/app/services/onboarding/util/register_issuer_did.py @@ -1,5 +1,4 @@ import asyncio -import os from logging import Logger from aries_cloudcontroller import ( @@ -17,9 +16,7 @@ set_endorser_info, set_endorser_role, ) -from shared import ACAPY_ENDORSER_ALIAS - -MAX_ATTEMPTS = int(os.getenv("WAIT_ISSUER_DID_MAX_ATTEMPTS", "15")) +from shared import ACAPY_ENDORSER_ALIAS, ISSUER_DID_ENDORSE_TIMEOUT async def create_connection_with_endorser( @@ -162,25 +159,32 @@ async def configure_endorsement( async def register_issuer_did( *, - endorser_controller: AcaPyClient, issuer_controller: AcaPyClient, issuer_label: str, + issuer_endorser_connection_id: str, logger: Logger, ) -> DID: + logger.debug("Accepting TAA on behalf of issuer") + await acapy_ledger.accept_taa_if_required(issuer_controller) + logger.info("Creating DID for issuer") issuer_did = await acapy_wallet.create_did(issuer_controller) await acapy_ledger.register_nym_on_ledger( - endorser_controller, + issuer_controller, did=issuer_did.did, verkey=issuer_did.verkey, alias=issuer_label, + create_transaction_for_endorser=True, + ) + + logger.debug("Waiting for issuer DID transaction to be endorsed") + await wait_transactions_endorsed( # Needs to be endorsed before setting public DID + issuer_controller=issuer_controller, + issuer_connection_id=issuer_endorser_connection_id, + logger=logger, ) - logger.debug("Accepting TAA on behalf of issuer") - await acapy_ledger.accept_taa_if_required(issuer_controller) - # NOTE: Still needs endorsement in 0.7.5 release - # Otherwise did has no associated services. logger.debug("Setting public DID for issuer") await acapy_wallet.set_public_did( issuer_controller, @@ -188,6 +192,13 @@ async def register_issuer_did( create_transaction_for_endorser=True, ) + logger.debug("Waiting for ATTRIB transaction to be endorsed") + await wait_transactions_endorsed( # Needs to be endorsed before continuing + issuer_controller=issuer_controller, + issuer_connection_id=issuer_endorser_connection_id, + logger=logger, + ) + logger.debug("Issuer DID registered.") return issuer_did @@ -239,12 +250,12 @@ async def wait_endorser_connection_completed( raise asyncio.TimeoutError -async def wait_issuer_did_transaction_endorsed( +async def wait_transactions_endorsed( *, issuer_controller: AcaPyClient, issuer_connection_id: str, logger: Logger, - max_attempts: int = MAX_ATTEMPTS, + max_attempts: int = ISSUER_DID_ENDORSE_TIMEOUT, retry_delay: float = 1.0, ) -> None: attempt = 0 @@ -255,19 +266,38 @@ async def wait_issuer_did_transaction_endorsed( await issuer_controller.endorse_transaction.get_records() ) - for transaction in transactions_response.results: - if ( - transaction.connection_id == issuer_connection_id - and transaction.state == "transaction_acked" - ): - return + transactions = [ + transaction + for transaction in transactions_response.results + if transaction.connection_id == issuer_connection_id + ] + + if not transactions: + logger.error( + "No transactions found for connection {}. Found {} transactions.", + issuer_connection_id, + transactions_response, + ) + raise CloudApiException("No transactions found for connection", 404) + + all_acked = all( + transaction.state == "transaction_acked" for transaction in transactions + ) + + if all_acked: + return + else: + logger.debug( + "Waiting for transaction acknowledgements. Current states: %s", + ", ".join(f"{t.transaction_id}: {t.state}" for t in transactions), + ) except Exception as e: # pylint: disable=W0718 if attempt + 1 == max_attempts: logger.error( "Maximum number of retries exceeded with exception. Failing." ) - raise asyncio.TimeoutError from e # Raise TimeoutError if max attempts exceeded + raise asyncio.TimeoutError("Timeout waiting for endorsement") from e logger.warning( ( @@ -284,4 +314,4 @@ async def wait_issuer_did_transaction_endorsed( attempt += 1 logger.error("Maximum number of retries exceeded while waiting for transaction ack") - raise asyncio.TimeoutError + raise asyncio.TimeoutError("Timeout waiting for endorsement") diff --git a/app/services/onboarding/util/set_endorser_metadata.py b/app/services/onboarding/util/set_endorser_metadata.py index ae92acc6c..2f0b5c001 100644 --- a/app/services/onboarding/util/set_endorser_metadata.py +++ b/app/services/onboarding/util/set_endorser_metadata.py @@ -13,7 +13,11 @@ # todo: Migrate to endorser service async def set_endorser_role( - *, endorser_controller: AcaPyClient, endorser_connection_id: str, logger: Logger + *, + endorser_controller: AcaPyClient, + endorser_connection_id: str, + logger: Logger, + delay: float = DEFAULT_DELAY, ): try: logger.debug("Setting roles for endorser on endorser-issuer connection.") @@ -24,7 +28,7 @@ async def set_endorser_role( transaction_my_job="TRANSACTION_ENDORSER", ) logger.debug("Successfully set endorser role.") - await asyncio.sleep(DEFAULT_DELAY) # Allow ACA-Py records to update + await asyncio.sleep(delay) # Allow ACA-Py records to update except CloudApiException as e: logger.error("Failed to set endorser role: {}.", e) raise CloudApiException( @@ -35,7 +39,11 @@ async def set_endorser_role( async def set_author_role( - *, issuer_controller: AcaPyClient, issuer_connection_id: str, logger: Logger + *, + issuer_controller: AcaPyClient, + issuer_connection_id: str, + logger: Logger, + delay: float = DEFAULT_DELAY, ): try: logger.debug("Setting roles for author on issuer-endorser connection") @@ -46,7 +54,7 @@ async def set_author_role( transaction_my_job="TRANSACTION_AUTHOR", ) logger.debug("Successfully set author role.") - await asyncio.sleep(DEFAULT_DELAY) # Allow ACA-Py records to update + await asyncio.sleep(delay) # Allow ACA-Py records to update except CloudApiException as e: logger.error("Failed to set author role: {}.", e) raise CloudApiException( @@ -62,6 +70,7 @@ async def set_endorser_info( issuer_connection_id: str, endorser_did: str, logger: Logger, + delay: float = DEFAULT_DELAY, ): try: logger.debug("Setting endorser info on issuer-endorser connection") @@ -72,7 +81,7 @@ async def set_endorser_info( endorser_did=endorser_did, ) logger.debug("Successfully set endorser info.") - await asyncio.sleep(DEFAULT_DELAY) # Allow ACA-Py records to update + await asyncio.sleep(delay) # Allow ACA-Py records to update except CloudApiException as e: logger.error("Failed to set endorser info: {}.", e) raise CloudApiException( diff --git a/app/tests/e2e/verifier/test_proof_revoked_credential.py b/app/tests/e2e/verifier/test_proof_revoked_credential.py index b672e1cc2..a40cc677c 100644 --- a/app/tests/e2e/verifier/test_proof_revoked_credential.py +++ b/app/tests/e2e/verifier/test_proof_revoked_credential.py @@ -1,3 +1,4 @@ +import asyncio import time from typing import List @@ -34,7 +35,7 @@ async def test_proof_revoked_credential( alice_member_client: RichAsyncClient, acme_and_alice_connection: AcmeAliceConnect, ): - time.sleep(10) # moment for revocation registry to update + await asyncio.sleep(14) # moment for revocation registry to update # todo: remove sleep when issue resolved: https://github.com/openwallet-foundation/acapy/issues/3018 # Do proof request @@ -124,7 +125,7 @@ async def test_regression_proof_revoked_credential( alice_member_client: RichAsyncClient, acme_and_alice_connection: AcmeAliceConnect, ): - time.sleep(10) # moment for revocation registry to update + await asyncio.sleep(14) # moment for revocation registry to update # todo: remove sleep when issue resolved: https://github.com/openwallet-foundation/acapy/issues/3018 referent = get_or_issue_regression_cred_revoked.referent diff --git a/app/tests/services/onboarding/test_onboarding.py b/app/tests/services/onboarding/test_onboarding.py index e16469148..67253a215 100644 --- a/app/tests/services/onboarding/test_onboarding.py +++ b/app/tests/services/onboarding/test_onboarding.py @@ -1,3 +1,5 @@ +from unittest.mock import AsyncMock, patch + import pytest from aries_cloudcontroller import ( DID, @@ -88,22 +90,19 @@ async def test_onboard_issuer_no_public_did( endorser_controller = get_mock_agent_controller() + # Mock the necessary functions and methods when(acapy_wallet).get_public_did(controller=mock_agent_controller).thenRaise( CloudApiException(detail="Error") ) when(acapy_wallet).get_public_did(controller=endorser_controller).thenReturn( to_async(did_object) ) - when(endorser_controller.out_of_band).create_invitation(...).thenReturn( to_async(InvitationRecord(invitation=InvitationMessage())) ) - - # Mock responses when(mock_agent_controller.out_of_band).receive_invitation(...).thenReturn( to_async(ConnRecord(connection_id=issuer_connection_id)) ) - when(endorser_controller.connection).get_connections(...).thenReturn( to_async( ConnectionList( @@ -115,20 +114,17 @@ async def test_onboard_issuer_no_public_did( ) ) ) - when(mock_agent_controller.endorse_transaction).set_endorser_role(...).thenReturn( to_async() ) - when(endorser_controller.endorse_transaction).set_endorser_role(...).thenReturn( to_async() ) when(mock_agent_controller.endorse_transaction).set_endorser_info(...).thenAnswer( lambda conn_id, endorser_did: to_async() ) - - when(mock_agent_controller.endorse_transaction).get_records(...).thenReturn( - to_async( + when(mock_agent_controller.endorse_transaction).get_records(...).thenAnswer( + lambda: to_async( # lambda to avoid "cannot reuse already awaited coroutine" TransactionList( results=[ TransactionRecord( @@ -138,7 +134,6 @@ async def test_onboard_issuer_no_public_did( ) ) ) - when(acapy_wallet).create_did(mock_agent_controller).thenReturn( to_async(did_object) ) @@ -156,27 +151,32 @@ async def test_onboard_issuer_no_public_did( ) ) - onboard_result = await issuer.onboard_issuer( - issuer_label="issuer_name", - endorser_controller=endorser_controller, - issuer_controller=mock_agent_controller, - issuer_wallet_id="issuer_wallet_id", - ) + # Patch asyncio.sleep to return immediately + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + onboard_result = await issuer.onboard_issuer( + issuer_label="issuer_name", + endorser_controller=endorser_controller, + issuer_controller=mock_agent_controller, + issuer_wallet_id="issuer_wallet_id", + ) + # Assertions assert_that(onboard_result).has_did("did:sov:WgWxqztrNooG92RXvxSTWv") + verify(acapy_ledger).accept_taa_if_required(mock_agent_controller) verify(acapy_wallet).create_did(mock_agent_controller) verify(acapy_ledger).register_nym_on_ledger( - endorser_controller, + mock_agent_controller, did="WgWxqztrNooG92RXvxSTWv", verkey="WgWxqztrNooG92RXvxSTWvWgWxqztrNooG92RXvxSTWv", alias="issuer_name", + create_transaction_for_endorser=True, ) - verify(acapy_ledger).accept_taa_if_required(mock_agent_controller) verify(acapy_wallet).set_public_did( mock_agent_controller, did="WgWxqztrNooG92RXvxSTWv", create_transaction_for_endorser=True, ) + mock_sleep.assert_awaited() # Ensure that sleep was called and patched @pytest.mark.anyio diff --git a/app/tests/services/onboarding/test_register_issuer_did.py b/app/tests/services/onboarding/test_register_issuer_did.py index 8bce00a04..b3d1c851e 100644 --- a/app/tests/services/onboarding/test_register_issuer_did.py +++ b/app/tests/services/onboarding/test_register_issuer_did.py @@ -6,7 +6,7 @@ from app.services.onboarding.util.register_issuer_did import ( wait_endorser_connection_completed, - wait_issuer_did_transaction_endorsed, + wait_transactions_endorsed, ) @@ -127,7 +127,7 @@ async def test_wait_issuer_did_transaction_endorsed_happy_path(): ) # Invocation - await wait_issuer_did_transaction_endorsed( + await wait_transactions_endorsed( issuer_controller=issuer_controller, issuer_connection_id="test_id", logger=logger, @@ -153,7 +153,7 @@ async def test_wait_issuer_did_transaction_endorsed_retry_logic(): ] ) - await wait_issuer_did_transaction_endorsed( + await wait_transactions_endorsed( issuer_controller=issuer_controller, issuer_connection_id="test_id", logger=logger, @@ -175,7 +175,7 @@ async def test_wait_issuer_did_transaction_endorsed_max_retries_with_exception() ) with pytest.raises(asyncio.TimeoutError): - await wait_issuer_did_transaction_endorsed( + await wait_transactions_endorsed( issuer_controller=issuer_controller, issuer_connection_id="test_id", logger=logger, @@ -202,7 +202,7 @@ async def test_wait_issuer_did_transaction_endorsed_max_retries_no_ack(): ) with pytest.raises(asyncio.TimeoutError): - await wait_issuer_did_transaction_endorsed( + await wait_transactions_endorsed( issuer_controller=issuer_controller, issuer_connection_id="test_id", logger=logger, diff --git a/app/tests/util/connections.py b/app/tests/util/connections.py index d02bca464..ea8996558 100644 --- a/app/tests/util/connections.py +++ b/app/tests/util/connections.py @@ -2,7 +2,6 @@ from typing import Optional from app.models.tenants import CreateTenantResponse -from app.routes.connections import CreateInvitation from app.routes.connections import router as conn_router from app.routes.oob import router as oob_router from app.routes.wallet.dids import router as did_router @@ -13,6 +12,7 @@ assert_fail_on_recreating_fixtures, ) from app.tests.util.webhooks import assert_both_webhooks_received, check_webhook_state +from app.util.did import qualified_did_sov from app.util.string import base64_to_json from shared import RichAsyncClient from shared.models.connection_record import Connection @@ -314,7 +314,7 @@ async def create_did_exchange( # Get Bob's public DID. Bob is the issuer in this case i.e. should have public DID did_response = (await bob_member_client.get(f"{DID_BASE_PATH}/public")).json() - bob_public_did = did_response["did"] + bob_public_did = qualified_did_sov(did_response["did"]) # Alice create invitation alice_connection = ( diff --git a/app/tests/util/webhooks.py b/app/tests/util/webhooks.py index 679ee184b..3f3878835 100644 --- a/app/tests/util/webhooks.py +++ b/app/tests/util/webhooks.py @@ -28,7 +28,7 @@ async def check_webhook_state( topic: CloudApiTopics, state: str, filter_map: Optional[Dict[str, str]] = None, - max_duration: int = 30, + max_duration: int = 45, max_tries: int = 2, delay: float = 0.5, ) -> Dict[str, Any]: diff --git a/app/util/transaction_acked.py b/app/util/transaction_acked.py index 823fbe69d..c5721f31c 100644 --- a/app/util/transaction_acked.py +++ b/app/util/transaction_acked.py @@ -10,7 +10,10 @@ async def wait_for_transaction_ack( - aries_controller: AcaPyClient, transaction_id: str + aries_controller: AcaPyClient, + transaction_id: str, + max_attempts: int = 15, + retry_delay: int = 1, ) -> None: """ Wait for the transaction to be acknowledged by the endorser. @@ -25,8 +28,8 @@ async def wait_for_transaction_ack( field_name="state", expected_value="transaction_acked", logger=bound_logger, - max_attempts=10, - retry_delay=2, + max_attempts=max_attempts, + retry_delay=retry_delay, ) except asyncio.TimeoutError as e: raise CloudApiException( diff --git a/endorser/pyproject.toml b/endorser/pyproject.toml index 0be8750f3..1607ab15b 100644 --- a/endorser/pyproject.toml +++ b/endorser/pyproject.toml @@ -43,4 +43,3 @@ profile = "black" [tool.pytest.ini_options] addopts = "--junitxml=junit.xml -p no:cacheprovider --cov-report=xml --cov-report=term" junit_family = "xunit2" -asyncio_default_fixture_loop_scope = "function" diff --git a/endorser/services/endorsement_processor.py b/endorser/services/endorsement_processor.py index 3c0feb98c..c117d2162 100644 --- a/endorser/services/endorsement_processor.py +++ b/endorser/services/endorsement_processor.py @@ -86,12 +86,12 @@ async def _process_endorsement_requests(self) -> NoReturn: subscription = await self._subscribe() while True: try: - messages = await subscription.fetch(batch=1, timeout=5, heartbeat=1) + messages = await subscription.fetch(batch=1, timeout=0.5, heartbeat=0.2) for message in messages: message_subject = message.subject message_data = message.data.decode() logger.trace( - "received message: {} with subject {}", + "Received message: {}, with subject {}", message_data, message_subject, ) @@ -105,10 +105,10 @@ async def _process_endorsement_requests(self) -> NoReturn: finally: await message.ack() except FetchTimeoutError: - logger.trace("FetchTimeoutError continuing...") + logger.trace("Encountered FetchTimeoutError. Continuing ...") await asyncio.sleep(0.1) except TimeoutError as e: - logger.warning("Timeout error fetching messages re-subscribing: {}", e) + logger.warning("Timeout fetching messages: {}. Re-subscribing.", e) await subscription.unsubscribe() subscription = await self._subscribe() except Exception: # pylint: disable=W0718 @@ -133,23 +133,25 @@ async def _process_endorsement_event(self, event_json: str) -> None: return endorsement = Endorsement(**event.payload) + transaction_id = endorsement.transaction_id async with AcaPyClient( base_url=GOVERNANCE_AGENT_URL, api_key=GOVERNANCE_AGENT_API_KEY ) as client: # Check if endorsement request is indeed applicable - if not await should_accept_endorsement(client, endorsement): - logger.info( # check already logged the reason as warning + transaction = await should_accept_endorsement(client, transaction_id) + if not transaction: + logger.info( # The check has already logged the reason as warning "Endorsement request with transaction id `{}` is not applicable for endorsement.", - endorsement.transaction_id, + transaction_id, ) return logger.info( - "Endorsement request with transaction id `{}` is applicable for endorsement, accepting request.", - endorsement.transaction_id, + "Endorsement request is applicable for endorsement, accepting transaction: {}", + transaction.model_dump(exclude={"messages_attach"}), ) - await accept_endorsement(client, endorsement) + await accept_endorsement(client, transaction_id) async def _handle_unprocessable_endorse_event( self, key: str, event_json: str, error: Exception diff --git a/endorser/tests/test_endorser_processor.py b/endorser/tests/test_endorser_processor.py index d56dbd9f3..b0631b148 100644 --- a/endorser/tests/test_endorser_processor.py +++ b/endorser/tests/test_endorser_processor.py @@ -230,6 +230,29 @@ async def test_process_endorsement_requests_loop_exit( assert mock_subscription.fetch.call_count == 3 +@pytest.mark.anyio +async def test_process_endorsement_requests_unexpected_error( + endorsement_processor_mock, mock_nats_client +): + # Setup + mock_subscription = AsyncMock() + mock_nats_client.pull_subscribe.return_value = mock_subscription + + # Simulate an unexpected exception during message fetching + mock_subscription.fetch.side_effect = [ + Exception("Unexpected error"), + asyncio.CancelledError, + ] + + # Test + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + with pytest.raises(asyncio.CancelledError): + await endorsement_processor_mock._process_endorsement_requests() + + # Assertions + mock_sleep.assert_awaited_once_with(2) + + @pytest.mark.anyio async def test_process_endorsement_event_governance(endorsement_processor_mock): governance = GOVERNANCE_LABEL @@ -247,7 +270,7 @@ async def test_process_endorsement_event_governance(endorsement_processor_mock): ) as mock_should_accept_endorsement, patch( "endorser.services.endorsement_processor.accept_endorsement" ) as mock_accept_endorsement: - mock_should_accept_endorsement.return_value = True + mock_should_accept_endorsement.return_value = MagicMock() mock_accept_endorsement.return_value = AsyncMock() await endorsement_processor_mock._process_endorsement_event(event_json) @@ -299,7 +322,6 @@ async def test_process_endorsement_event_not_governance(endorsement_processor_mo ) as mock_should_accept_endorsement, patch( "endorser.services.endorsement_processor.accept_endorsement" ) as mock_accept_endorsement: - mock_should_accept_endorsement.return_value = True mock_accept_endorsement.return_value = AsyncMock() await endorsement_processor_mock._process_endorsement_event(event_json) @@ -336,3 +358,62 @@ async def test_endorsement_processor_subscribe_error( with pytest.raises(exception): await processor._subscribe() + + +@pytest.mark.anyio +async def test_check_jetstream_success(endorsement_processor_mock): + # Setup + mock_account_info = AsyncMock() + mock_account_info.streams = 5 + mock_account_info.consumers = 10 + endorsement_processor_mock.jetstream.account_info.return_value = mock_account_info + + # Test + result = await endorsement_processor_mock.check_jetstream() + + # Assertions + assert result == { + "is_working": True, + "streams_count": 5, + "consumers_count": 10, + } + endorsement_processor_mock.jetstream.account_info.assert_awaited_once() + + +@pytest.mark.anyio +async def test_check_jetstream_no_streams(endorsement_processor_mock): + # Setup + mock_account_info = AsyncMock() + mock_account_info.streams = 0 + mock_account_info.consumers = 0 + endorsement_processor_mock.jetstream.account_info.return_value = mock_account_info + + # Test + result = await endorsement_processor_mock.check_jetstream() + + # Assertions + assert result == { + "is_working": False, + "streams_count": 0, + "consumers_count": 0, + } + endorsement_processor_mock.jetstream.account_info.assert_awaited_once() + + +@pytest.mark.anyio +async def test_check_jetstream_exception(endorsement_processor_mock): + # Setup + endorsement_processor_mock.jetstream.account_info.side_effect = Exception( + "JetStream error" + ) + + # Test + with patch("endorser.services.endorsement_processor.logger") as mock_logger: + result = await endorsement_processor_mock.check_jetstream() + + # Assertions + assert result == {"is_working": False} + endorsement_processor_mock.jetstream.account_info.assert_awaited_once() + mock_logger.exception.assert_called_once_with( + "Caught exception while checking jetstream status" + ) diff --git a/endorser/tests/test_main.py b/endorser/tests/test_main.py index 003d16325..174683bf7 100644 --- a/endorser/tests/test_main.py +++ b/endorser/tests/test_main.py @@ -3,6 +3,7 @@ import pytest from fastapi import FastAPI, HTTPException +from fastapi.testclient import TestClient from endorser.main import app, app_lifespan, health_check, health_ready from endorser.services.endorsement_processor import EndorsementProcessor @@ -152,3 +153,10 @@ async def test_health_ready_with_timeout(): "status": "not ready", "error": "JetStream health check timed out", } + + +def test_scalar_html(): + client = TestClient(app) + # Simulate a request to the /docs endpoint + response = client.get("/docs") + assert response.status_code == 200 diff --git a/endorser/tests/test_util_endorsement.py b/endorser/tests/test_util_endorsement.py index 4ad51d378..76ec6c8e6 100644 --- a/endorser/tests/test_util_endorsement.py +++ b/endorser/tests/test_util_endorsement.py @@ -1,10 +1,14 @@ import asyncio -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import pytest from fastapi import HTTPException -from endorser.util.endorsement import accept_endorsement, should_accept_endorsement +from endorser.util.endorsement import ( + accept_endorsement, + retry_is_valid_issuer, + should_accept_endorsement, +) from shared.models.endorsement import Endorsement valid_endorsement = Endorsement( @@ -37,7 +41,7 @@ async def test_should_accept_endorsement_success_claim_def(mock_acapy_client, mo result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is True + assert result is transaction_mock @pytest.mark.anyio @@ -59,7 +63,7 @@ async def test_should_accept_endorsement_fail_not_claim_def(mock_acapy_client, m result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio @@ -79,7 +83,7 @@ async def test_should_accept_endorsement_fail_no_attach(mock_acapy_client, mocke result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio @@ -99,7 +103,7 @@ async def test_should_accept_endorsement_fail_no_operation(mock_acapy_client, mo result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio @@ -119,7 +123,7 @@ async def test_should_accept_endorsement_fail_no_type(mock_acapy_client, mocker) result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio @@ -141,7 +145,7 @@ async def test_should_accept_endorsement_is_cred_def(mock_acapy_client, mocker): result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is True + assert result is transaction_mock @pytest.mark.anyio @@ -163,7 +167,7 @@ async def test_should_accept_endorsement_is_attrib(mock_acapy_client, mocker): result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is True + assert result is transaction_mock @pytest.mark.anyio @@ -185,7 +189,7 @@ async def test_should_accept_endorsement_fail_not_cred_def(mock_acapy_client, mo result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio @@ -209,7 +213,7 @@ async def test_should_accept_endorsement_fail_not_correct_attach( result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio @@ -237,7 +241,7 @@ async def test_should_accept_endorsement_fail_not_valid_issuer( result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio @@ -365,13 +369,61 @@ async def mock_retry_is_valid_issuer( # Assertions assert ( - result is False + result is None ), "The endorsement should not be accepted due to is_valid_issuer failing repeatedly" assert ( mock_is_valid_issuer.call_count == 5 ), "is_valid_issuer should have been called exactly `max_retries` times." +@pytest.mark.anyio +async def test_retry_is_valid_issuer_success_after_retries(mocker): + # Setup + did = "did:sov:test-did" + schema_id = "test-schema-id" + transaction_id = "test-transaction" + + # Mock is_valid_issuer to raise HTTPException first, then return True + mock_is_valid_issuer = mocker.patch( + "endorser.util.endorsement.is_valid_issuer", + side_effect=[HTTPException(status_code=500), True], + ) + + # Test + result = await retry_is_valid_issuer( + did, schema_id, transaction_id, max_retries=2, retry_delay=0.01 + ) + + # Assertions + assert result is True + assert mock_is_valid_issuer.call_count == 2 + + +@pytest.mark.anyio +async def test_retry_is_valid_issuer_fails_after_max_retries(mocker): + # Setup + did = "did:sov:test-did" + schema_id = "test-schema-id" + transaction_id = "test-transaction" + + # Mock is_valid_issuer to always raise HTTPException + mock_is_valid_issuer = mocker.patch( + "endorser.util.endorsement.is_valid_issuer", + side_effect=HTTPException(status_code=500), + ) + + # Test + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + result = await retry_is_valid_issuer( + did, schema_id, transaction_id, max_retries=3, retry_delay=0.01 + ) + + # Assertions + assert result is False + assert mock_is_valid_issuer.call_count == 3 + assert mock_sleep.await_count == 2 # Retries should happen max_retries - 1 times + + @pytest.mark.anyio async def test_should_accept_endorsement_fail_bad_state(mock_acapy_client): invalid_endorsement = Endorsement( @@ -380,13 +432,71 @@ async def test_should_accept_endorsement_fail_bad_state(mock_acapy_client): result = await should_accept_endorsement(mock_acapy_client, invalid_endorsement) - assert result is False + assert result is None @pytest.mark.anyio async def test_accept_endorsement(mock_acapy_client): - await accept_endorsement(mock_acapy_client, valid_endorsement) + await accept_endorsement(mock_acapy_client, valid_endorsement.transaction_id) mock_acapy_client.endorse_transaction.endorse_transaction.assert_awaited_once_with( tran_id=valid_endorsement.transaction_id ) + + +@pytest.mark.anyio +async def test_should_accept_endorsement_signature_request_applicable( + mock_acapy_client, mocker +): + # Mock transaction response with signature_request + transaction_mock = MagicMock() + transaction_mock.state = "request_received" + transaction_mock.signature_request = [ + { + "context": "did:sov", + "method": "add-signature", + "signature_type": "default", + "signer_goal_code": "aries.transaction.ledger.write_did", + "author_goal_code": "aries.transaction.register_public_did", + } + ] + mock_acapy_client.endorse_transaction.get_transaction.return_value = ( + transaction_mock + ) + + # Assume the transaction has no operation type + mocker.patch( + "endorser.util.endorsement.get_endorsement_request_attachment", + return_value={"operation": {}}, + ) + + result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) + + assert ( + result is transaction_mock + ), "The endorsement should be accepted based on signature_request." + + +@pytest.mark.anyio +async def test_should_accept_endorsement_signature_request_not_applicable( + mock_acapy_client, mocker +): + # Mock transaction response with signature_request + transaction_mock = MagicMock() + transaction_mock.state = "request_received" + transaction_mock.signature_request = [{"author_goal_code": "wrong"}] + mock_acapy_client.endorse_transaction.get_transaction.return_value = ( + transaction_mock + ) + + # Assume the transaction has no operation type + mocker.patch( + "endorser.util.endorsement.get_endorsement_request_attachment", + return_value={"operation": {}}, + ) + + result = await should_accept_endorsement(mock_acapy_client, valid_endorsement) + + assert ( + result is None + ), "The endorsement should not be accepted based on signature_request." diff --git a/endorser/util/endorsement.py b/endorser/util/endorsement.py index 04fd7cf3c..6a32ea7d8 100644 --- a/endorser/util/endorsement.py +++ b/endorser/util/endorsement.py @@ -1,7 +1,7 @@ import asyncio from typing import Any, Dict, Optional -from aries_cloudcontroller import AcaPyClient +from aries_cloudcontroller import AcaPyClient, TransactionRecord from fastapi import HTTPException from endorser.util.transaction_record import ( @@ -13,14 +13,14 @@ ) from endorser.util.trust_registry import is_valid_issuer from shared.log_config import get_logger -from shared.models.endorsement import Endorsement, applicable_transaction_state +from shared.models.endorsement import applicable_transaction_state logger = get_logger(__name__) async def should_accept_endorsement( - client: AcaPyClient, endorsement: Endorsement -) -> bool: + client: AcaPyClient, transaction_id: str +) -> Optional[TransactionRecord]: """Check whether a transaction endorsement request should be endorsed. Whether the request should be accepted is based on the follow criteria: @@ -29,15 +29,14 @@ async def should_accept_endorsement( 3. The schema_id is registered in the trust registry. Args: - endorsement (Endorsement): The endorsement event model + transaction_id (str): The transaction id for this endorsement request Returns: - bool: Whether the endorsement request should be accepted + Optional[TransactionRecord]: The transaction record if it should be endorsed, None otherwise """ - bound_logger = logger.bind(body=endorsement) + bound_logger = logger.bind(body={"transaction_id": transaction_id}) bound_logger.debug("Validating if endorsement transaction should be endorsed") - transaction_id = endorsement.transaction_id bound_logger.debug("Fetching transaction with id: `{}`", transaction_id) transaction = await client.endorse_transaction.get_transaction( tran_id=transaction_id @@ -51,23 +50,28 @@ async def should_accept_endorsement( applicable_transaction_state, transaction.state, ) - return False + return None attachment = get_endorsement_request_attachment(transaction) if not attachment: bound_logger.warning("Could not extract attachment from transaction.") - return False + return None operation_type = await extract_operation_type(attachment) if not operation_type: - return False + # The request to register a DID on ledger has no operation type, but has signature request + if await is_signature_request_applicable(transaction): + return transaction + return None - return await check_applicable_operation_type( - client, endorsement, operation_type, attachment - ) + if await check_applicable_operation_type( + client, transaction_id, operation_type, attachment + ): + return transaction + return None -async def extract_operation_type(attachment) -> Optional[str]: +async def extract_operation_type(attachment: Dict[str, Any]) -> Optional[str]: operation = attachment.get("operation") if not operation: logger.debug("Key `operation` not in attachment: `{}`.", attachment) @@ -81,13 +85,38 @@ async def extract_operation_type(attachment) -> Optional[str]: return operation_type +async def is_signature_request_applicable(transaction: TransactionRecord) -> bool: + """ + Check if the signature_request in the transaction has the required author_goal_code. + + Args: + transaction: The transaction object to check. + + Returns: + bool: True if the signature_request is applicable, False otherwise. + """ + signature_request = transaction.signature_request + if not signature_request or not isinstance(signature_request, list): + logger.debug("No valid signature_request found in transaction.") + return False + + first_request = signature_request[0] + author_goal_code = first_request.get("author_goal_code") + if author_goal_code == "aries.transaction.register_public_did": + logger.debug("Transaction is applicable based on signature_request.") + return True + + logger.debug("Transaction is not applicable based on signature_request.") + return False + + async def check_applicable_operation_type( client: AcaPyClient, - endorsement: Endorsement, + transaction_id: str, operation_type: str, attachment: Dict[str, Any], ) -> bool: - bound_logger = logger.bind(body=endorsement) + bound_logger = logger.bind(body={"transaction_id": transaction_id}) if is_revocation_def_or_entry(operation_type): bound_logger.debug("Endorsement request is for revocation definition or entry.") @@ -106,17 +135,17 @@ async def check_applicable_operation_type( client, attachment ) - return await retry_is_valid_issuer(did, schema_id, endorsement) + return await retry_is_valid_issuer(did, schema_id, transaction_id) async def retry_is_valid_issuer( did: str, schema_id: str, - endorsement: Endorsement, + transaction_id: str, max_retries: int = 5, retry_delay: float = 1.0, ) -> bool: - bound_logger = logger.bind(body=endorsement) + bound_logger = logger.bind(body={"transaction_id": transaction_id}) for attempt in range(max_retries): try: valid_issuer = await is_valid_issuer(did, schema_id) @@ -125,7 +154,7 @@ async def retry_is_valid_issuer( bound_logger.warning( "Endorsement request with transaction id `{}` is not for did " "and schema registered in the trust registry.", - endorsement.transaction_id, + transaction_id, ) return False @@ -146,8 +175,6 @@ async def retry_is_valid_issuer( return False -async def accept_endorsement(client: AcaPyClient, endorsement: Endorsement) -> None: - logger.debug("Endorsing transaction with id: `{}`", endorsement.transaction_id) - await client.endorse_transaction.endorse_transaction( - tran_id=endorsement.transaction_id - ) +async def accept_endorsement(client: AcaPyClient, transaction_id: str) -> None: + logger.debug("Endorsing transaction with id: `{}`", transaction_id) + await client.endorse_transaction.endorse_transaction(tran_id=transaction_id) diff --git a/pyproject.toml b/pyproject.toml index 3237176a7..4d07cd3ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,4 +19,3 @@ profile = "black" [tool.pytest.ini_options] addopts = "--junitxml=junit.xml -p no:cacheprovider --cov-report=xml --cov-report=term" junit_family = "xunit2" -asyncio_default_fixture_loop_scope = "function" diff --git a/shared/constants.py b/shared/constants.py index 6977ab1e4..d78f1f49d 100644 --- a/shared/constants.py +++ b/shared/constants.py @@ -59,11 +59,13 @@ # client.py TEST_CLIENT_TIMEOUT = int(os.getenv("TEST_CLIENT_TIMEOUT", "300")) -MAX_NUM_RETRIES = int(os.getenv("MAX_NUM_RETRIES", "3")) -# timeout for waiting for registries to be created +# timeout for endorsement events and registry creation +CRED_DEF_ACK_TIMEOUT = int(os.getenv("CRED_DEF_ACK_TIMEOUT", "60")) +PUBLISH_REVOCATIONS_TIMEOUT = int(os.getenv("PUBLISH_REVOCATIONS_TIMEOUT", "60")) REGISTRY_CREATION_TIMEOUT = int(os.getenv("REGISTRY_CREATION_TIMEOUT", "60")) REGISTRY_SIZE = int(os.getenv("REGISTRY_SIZE", "32767")) +ISSUER_DID_ENDORSE_TIMEOUT = int(os.getenv("ISSUER_DID_ENDORSE_TIMEOUT", "60")) # NATS NATS_SERVER = os.getenv("NATS_SERVER", "nats://nats:4222") diff --git a/trustregistry/pyproject.toml b/trustregistry/pyproject.toml index 69faa4fb4..da1e84919 100644 --- a/trustregistry/pyproject.toml +++ b/trustregistry/pyproject.toml @@ -43,4 +43,3 @@ profile = "black" [tool.pytest.ini_options] addopts = "--junitxml=junit.xml -p no:cacheprovider --cov-report=xml --cov-report=term" junit_family = "xunit2" -asyncio_default_fixture_loop_scope = "function" diff --git a/waypoint/pyproject.toml b/waypoint/pyproject.toml index 6b55488d9..4967543ba 100644 --- a/waypoint/pyproject.toml +++ b/waypoint/pyproject.toml @@ -51,4 +51,3 @@ profile = "black" [tool.pytest.ini_options] addopts = "--junitxml=junit.xml -p no:cacheprovider --cov-report=xml --cov-report=term" junit_family = "xunit2" -asyncio_default_fixture_loop_scope = "function" diff --git a/waypoint/services/nats_service.py b/waypoint/services/nats_service.py index 53e041486..26736dd83 100644 --- a/waypoint/services/nats_service.py +++ b/waypoint/services/nats_service.py @@ -41,13 +41,18 @@ async def _subscribe( state: str, start_time: str = None, ) -> JetStreamContext.PullSubscription: - - logger.debug( - "Subscribing to JetStream for wallet_id: {}, group_id: {}", - wallet_id, - group_id, + bound_logger = logger.bind( + body={ + "wallet_id": wallet_id, + "group_id": group_id, + "topic": topic, + "state": state, + "start_time": start_time, + } ) + bound_logger.debug("Subscribing to JetStream") + group_id = group_id or "*" subscribe_kwargs = { "subject": f"{NATS_STATE_SUBJECT}.{group_id}.{wallet_id}.{topic}.{state}", @@ -59,54 +64,46 @@ async def _subscribe( opt_start_time=start_time, ) + def _retry_log(retry_state: RetryCallState): + """Custom logging for retry attempts.""" + if retry_state.outcome.failed: + exception = retry_state.outcome.exception() + bound_logger.warning( + "Retry attempt {} failed due to {}: {}", + retry_state.attempt_number, + type(exception).__name__, + exception, + ) + # This is a custom retry decorator that will retry on TimeoutError # and wait exponentially up to a max of 16 seconds between retries indefinitely @retry( retry=retry_if_exception_type(TimeoutError), wait=wait_exponential(multiplier=1, max=16), - after=self._retry_log, + after=_retry_log, stop=stop_never, ) async def pull_subscribe(config, **kwargs): try: - logger.trace( - "Attempting to subscribe to JetStream for wallet_id: {}, group_id: {}", - wallet_id, - group_id, - ) + bound_logger.trace("Attempting to subscribe to JetStream") subscription = await self.js_context.pull_subscribe( config=config, **kwargs ) - logger.debug( - "Successfully subscribed to JetStream for wallet_id: {}, group_id: {}", - wallet_id, - group_id, - ) + bound_logger.debug("Successfully subscribed to JetStream") return subscription except BadSubscriptionError as e: - logger.error("BadSubscriptionError subscribing to NATS: {}", e) + bound_logger.error("BadSubscriptionError subscribing to NATS: {}", e) raise except Error as e: - logger.error("Error subscribing to NATS: {}", e) + bound_logger.error("Error subscribing to NATS: {}", e) raise try: return await pull_subscribe(config, **subscribe_kwargs) except Exception: - logger.exception("An exception occurred subscribing to NATS") + bound_logger.exception("An exception occurred subscribing to NATS") raise - def _retry_log(self, retry_state: RetryCallState): - """Custom logging for retry attempts.""" - if retry_state.outcome.failed: - exception = retry_state.outcome.exception() - logger.warning( - "Retry attempt {} failed due to {}: {}", - retry_state.attempt_number, - type(exception).__name__, - exception, - ) - @asynccontextmanager async def process_events( self, @@ -119,13 +116,17 @@ async def process_events( duration: int = 10, look_back: int = 60, ): - logger.debug( - "Processing events for group {} and wallet {} on topic {} with state {}", - group_id, - wallet_id, - topic, - state, + bound_logger = logger.bind( + body={ + "wallet_id": wallet_id, + "group_id": group_id, + "topic": topic, + "state": state, + "duration": duration, + "look_back": look_back, + } ) + bound_logger.debug("Processing events") # Get the current time in UTC current_time = datetime.now(timezone.utc) @@ -148,9 +149,9 @@ async def event_generator( end_time = time.time() + duration while not stop_event.is_set(): remaining_time = end_time - time.time() - logger.trace("remaining_time: {}", remaining_time) + bound_logger.trace("Remaining time: {}", remaining_time) if remaining_time <= 0: - logger.debug("Timeout reached") + bound_logger.debug("Timeout reached") stop_event.set() break @@ -160,25 +161,25 @@ async def event_generator( ) for message in messages: event = orjson.loads(message.data) - logger.trace("Received event: {}", event) + bound_logger.trace("Received event: {}", event) yield CloudApiWebhookEventGeneric(**event) await message.ack() except FetchTimeoutError: # Fetch timeout, continue - logger.trace("Timeout fetching messages continuing...") + bound_logger.trace("Timeout fetching messages continuing...") await asyncio.sleep(0.1) except TimeoutError: # Timeout error, resubscribe - logger.warning( + bound_logger.warning( "Subscription lost connection, attempting to resubscribe..." ) try: await subscription.unsubscribe() except BadSubscriptionError as e: # If we can't unsubscribe, log the error and continue - logger.warning( + bound_logger.warning( "BadSubscriptionError unsubscribing from NATS: {}", e ) @@ -189,15 +190,15 @@ async def event_generator( state=state, start_time=start_time, ) - logger.debug("Successfully resubscribed to NATS.") + bound_logger.debug("Successfully resubscribed to NATS.") except Exception: # pylint: disable=W0718 - logger.exception("Unexpected error in event generator") + bound_logger.exception("Unexpected error in event generator") stop_event.set() raise except asyncio.CancelledError: - logger.debug("Event generator cancelled") + bound_logger.debug("Event generator cancelled") stop_event.set() try: @@ -217,17 +218,17 @@ async def event_generator( state=state, ) except Exception as e: # pylint: disable=W0718 - logger.exception("Unexpected error processing events: {}") + bound_logger.exception("Unexpected error processing events: {}") raise e finally: if subscription: try: - logger.trace("Closing subscription...") + bound_logger.trace("Closing subscription...") await subscription.unsubscribe() - logger.debug("Subscription closed") + bound_logger.debug("Subscription closed") except BadSubscriptionError as e: - logger.warning( + bound_logger.warning( "BadSubscriptionError unsubscribing from NATS: {}", e ) diff --git a/waypoint/tests/services/test_nats_service.py b/waypoint/tests/services/test_nats_service.py index e7872e6fe..8c17f51ed 100644 --- a/waypoint/tests/services/test_nats_service.py +++ b/waypoint/tests/services/test_nats_service.py @@ -1,6 +1,6 @@ import asyncio import json -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, patch import pytest from nats.aio.client import Client as NATS @@ -9,7 +9,6 @@ from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import JetStreamContext from nats.js.errors import FetchTimeoutError -from tenacity import RetryCallState from shared.constants import NATS_STATE_STREAM, NATS_STATE_SUBJECT from shared.models.webhook_events import CloudApiWebhookEventGeneric @@ -44,7 +43,7 @@ async def test_init_nats_client(nats_creds_file): async def test_init_nats_client_error(exception): with patch("nats.connect", side_effect=exception): with pytest.raises(exception): - async for jetstream in init_nats_client(): + async for _ in init_nats_client(): pass @@ -123,7 +122,7 @@ async def test_process_events( topic="test_topic", state="state", stop_event=stop_event, - duration=1, + duration=0.5, ) as event_generator: events = [] async for event in event_generator: @@ -155,7 +154,7 @@ async def test_process_events_cancelled_error( topic="test_topic", state="state", stop_event=stop_event, - duration=1, + duration=0.5, ) as event_generator: events = [] async for event in event_generator: @@ -182,7 +181,7 @@ async def test_process_events_fetch_timeout_error( topic="test_topic", state="state", stop_event=stop_event, - duration=1, + duration=0.5, ) as event_generator: events = [] async for event in event_generator: @@ -215,7 +214,7 @@ async def test_process_events_timeout_error( topic="test_topic", state="state", stop_event=stop_event, - duration=2, + duration=0.5, ) as event_generator: events = [] async for event in event_generator: @@ -257,7 +256,7 @@ async def test_process_events_bad_subscription_error_on_unsubscribe( topic="test_topic", state="state", stop_event=stop_event, - duration=2, + duration=0.5, ) as event_generator: events = [] async for event in event_generator: @@ -297,7 +296,7 @@ async def test_process_events_base_exception( topic="test_topic", state="state", stop_event=stop_event, - duration=2, + duration=0.5, ) as event_generator: events = [] async for event in event_generator: @@ -372,27 +371,3 @@ def failed(self): def exception(self): return self._exception - - -def test_retry_log(mock_nats_client): # pylint: disable=redefined-outer-name - processor = NatsEventsProcessor(mock_nats_client) - # Mock a retry state - mock_retry_state = MagicMock(spec=RetryCallState) - - # Mock the outcome attribute with a Future-like object - mock_retry_state.outcome = MockFuture(exception=ValueError("Test retry exception")) - mock_retry_state.attempt_number = 3 # Retry attempt number - - # Patch the logger to capture log calls - with patch("waypoint.services.nats_service.logger") as mock_logger: - processor._retry_log( # pylint: disable=protected-access - retry_state=mock_retry_state - ) - - # Assert that logger.warning was called with the expected message - mock_logger.warning.assert_called_once_with( - "Retry attempt {} failed due to {}: {}", - 3, - "ValueError", - mock_retry_state.outcome.exception(), - )