Skip to content

Commit

Permalink
🎨 add some debug handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cl0ete committed Dec 10, 2024
1 parent 91444c2 commit a29b1c8
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions waypoint/services/nats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ async def event_generator(
start_time: str,
):
try:
current_subscription = subscription
end_time = time.time() + duration
while not stop_event.is_set():
remaining_time = end_time - time.time()
Expand All @@ -148,38 +147,47 @@ async def event_generator(
break

try:
messages = await current_subscription.fetch(
messages = await subscription.fetch(
batch=5, timeout=0.2, heartbeat=0.02
)
for message in messages:
event = orjson.loads(message.data)
logger.trace("Received event: {}", event)
yield CloudApiWebhookEventGeneric(**event)
await message.ack()

except FetchTimeoutError:
# Fetch timeout, continue
logger.trace("Timeout fetching messages continuing...")
await asyncio.sleep(0.1)

except TimeoutError:
# Timeout error, resubscribe
logger.warning(
"Subscription lost connection, attempting to resubscribe..."
)
await current_subscription.unsubscribe()
try:
current_subscription = await self._subscribe(
group_id=group_id,
wallet_id=wallet_id,
topic=topic,
state=state,
start_time=start_time,
await subscription.unsubscribe()
except BadSubscriptionError as e:
# If we can't unsubscribe, log the error and continue
logger.warning(
"BadSubscriptionError unsubscribing from NATS: {}", e
)
logger.info("Successfully resubscribed to NATS.")
except Exception as e: # pylint: disable=W0718
logger.error("Failed to resubscribe to NATS: {}", e)
await asyncio.sleep(1)

subscription = await self._subscribe(
group_id=group_id,
wallet_id=wallet_id,
topic=topic,
state=state,
start_time=start_time,
)
logger.info("Successfully resubscribed to NATS.")

except Exception as e: # pylint: disable=W0718
logger.exception("Unexpected error in event generator: {}", e)
stop_event.set()
break

except asyncio.CancelledError:
logger.debug("Event generator cancelled")
stop_event.set()
Expand Down

0 comments on commit a29b1c8

Please sign in to comment.