diff --git a/waypoint/services/nats_service.py b/waypoint/services/nats_service.py index fc55b2a77..530d5ea28 100644 --- a/waypoint/services/nats_service.py +++ b/waypoint/services/nats_service.py @@ -137,7 +137,6 @@ async def event_generator( start_time: str, ): try: - current_subscription = subscription end_time = time.time() + duration while not stop_event.is_set(): remaining_time = end_time - time.time() @@ -148,7 +147,7 @@ async def event_generator( break try: - messages = await current_subscription.fetch( + messages = await subscription.fetch( batch=5, timeout=0.2, heartbeat=0.02 ) for message in messages: @@ -156,30 +155,39 @@ async def event_generator( logger.trace("Received event: {}", event) yield CloudApiWebhookEventGeneric(**event) await message.ack() + except FetchTimeoutError: + # Fetch timeout, continue logger.trace("Timeout fetching messages continuing...") await asyncio.sleep(0.1) + except TimeoutError: + # Timeout error, resubscribe logger.warning( "Subscription lost connection, attempting to resubscribe..." ) - await current_subscription.unsubscribe() try: - current_subscription = await self._subscribe( - group_id=group_id, - wallet_id=wallet_id, - topic=topic, - state=state, - start_time=start_time, + await subscription.unsubscribe() + except BadSubscriptionError as e: + # If we can't unsubscribe, log the error and continue + logger.warning( + "BadSubscriptionError unsubscribing from NATS: {}", e ) - logger.info("Successfully resubscribed to NATS.") - except Exception as e: # pylint: disable=W0718 - logger.error("Failed to resubscribe to NATS: {}", e) - await asyncio.sleep(1) + + subscription = await self._subscribe( + group_id=group_id, + wallet_id=wallet_id, + topic=topic, + state=state, + start_time=start_time, + ) + logger.info("Successfully resubscribed to NATS.") + except Exception as e: # pylint: disable=W0718 logger.exception("Unexpected error in event generator: {}", e) stop_event.set() break + except asyncio.CancelledError: logger.debug("Event generator cancelled") stop_event.set()