Skip to content

Commit

Permalink
refactor(event-listeners): flatten crate structures
Browse files Browse the repository at this point in the history
  • Loading branch information
Serial-ATA committed Jan 3, 2025
1 parent 4652366 commit 8a62871
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 175 deletions.
121 changes: 0 additions & 121 deletions crates/event-listeners/evm/src/contracts.rs

This file was deleted.

9 changes: 9 additions & 0 deletions crates/event-listeners/evm/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Client error: {0}")]
Client(String),
#[error("Transport error: {0}")]
TransportError(#[from] alloy_transport::RpcError<alloy_transport::TransportErrorKind>),
}

pub type Result<T> = gadget_std::result::Result<T, Error>;
129 changes: 120 additions & 9 deletions crates/event-listeners/evm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,124 @@
use thiserror::Error;
pub mod error;
use error::Error;

pub mod contracts;
use alloy_contract::ContractInstance;
use alloy_contract::Event;
use alloy_network::Ethereum;
use alloy_provider::Provider;
use alloy_provider::RootProvider;
use alloy_rpc_types::{BlockNumberOrTag, Filter};
use alloy_sol_types::SolEvent;
use alloy_transport::BoxTransport;
use gadget_event_listeners_core::{Error as CoreError, EventListener};
use gadget_std::collections::VecDeque;
use gadget_std::time::Duration;
use gadget_stores::local_database::LocalDatabase;
use uuid::Uuid;

#[derive(Debug, Error)]
pub enum EvmEventListenerError {
#[error("Client error: {0}")]
Client(String),
#[error("Transport error: {0}")]
TransportError(#[from] alloy_transport::RpcError<alloy_transport::TransportErrorKind>),
pub type AlloyRootProvider = RootProvider<BoxTransport>;
pub type AlloyContractInstance = ContractInstance<BoxTransport, AlloyRootProvider, Ethereum>;

pub struct EvmContractEventListener<E: SolEvent + Send + 'static> {
instance: AlloyContractInstance,
chain_id: u64,
local_db: LocalDatabase<u64>,
should_cooldown: bool,
enqueued_events: VecDeque<(E, alloy_rpc_types::Log)>,
}

pub type Result<T> = gadget_std::result::Result<T, EvmEventListenerError>;
#[async_trait::async_trait]
impl<E: SolEvent + Send + Sync + 'static>
EventListener<(E, alloy_rpc_types::Log), AlloyContractInstance>
for EvmContractEventListener<E>
{
type ProcessorError = Error;

async fn new(context: &AlloyContractInstance) -> Result<Self, CoreError<Self::ProcessorError>>
where
Self: Sized,
{
let provider = context.provider().root();
// Add more detailed error handling and logging
let chain_id = provider
.get_chain_id()
.await
.map_err(Self::ProcessorError::from)?;

let local_db = LocalDatabase::open(format!("./db/{}", Uuid::new_v4()));
Ok(Self {
chain_id,
should_cooldown: false,
enqueued_events: VecDeque::new(),
local_db,
instance: context.clone(),
})
}

async fn next_event(&mut self) -> Option<(E, alloy_rpc_types::Log)> {
if let Some(event) = self.enqueued_events.pop_front() {
return Some(event);
}

if self.should_cooldown {
tokio::time::sleep(Duration::from_millis(5000)).await;
self.should_cooldown = false;
}

let contract = &self.instance;
let step = 100;
let target_block_number: u64 = contract
.provider()
.get_block_number()
.await
.unwrap_or_default();

let block = self
.local_db
.get(&format!("LAST_BLOCK_NUMBER_{}", contract.address()))
.unwrap_or(0);

let should_cooldown = block >= target_block_number;
if should_cooldown {
self.should_cooldown = true;
return self.next_event().await;
}

let dest_block = core::cmp::min(block + step, target_block_number);

// Query events
let events_filter = Event::new(contract.provider(), Filter::new())
.address(*contract.address())
.from_block(BlockNumberOrTag::Number(block + 1))
.to_block(BlockNumberOrTag::Number(dest_block))
.event_signature(E::SIGNATURE_HASH);

gadget_logging::info!("Querying events for filter, address: {}, from_block: {}, to_block: {}, event_signature: {}", contract.address(), block + 1, dest_block, E::SIGNATURE_HASH);
match events_filter.query().await {
Ok(events) => {
let events = events.into_iter().collect::<VecDeque<_>>();
self.local_db.set(
&format!("LAST_BLOCK_NUMBER_{}", contract.address()),
dest_block,
);

self.local_db.set(
&format!("TARGET_BLOCK_{}", contract.address()),
target_block_number,
);

if events.is_empty() {
self.should_cooldown = true;
return self.next_event().await;
}

self.enqueued_events = events;

self.next_event().await
}
Err(e) => {
gadget_logging::error!(?e, %self.chain_id, "Error while querying events");
None
}
}
}
}
7 changes: 3 additions & 4 deletions crates/event-listeners/periodic/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use gadget_std::string::String;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum PeriodicEventListenerError {
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Inner listener error: {0}")]
InnerListener(String),
}

pub type Result<T> = gadget_std::result::Result<T, PeriodicEventListenerError>;
pub type Result<T> = gadget_std::result::Result<T, Error>;
42 changes: 41 additions & 1 deletion crates/event-listeners/periodic/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,42 @@
pub mod error;
pub mod periodic;
use error::Error;

use async_trait::async_trait;
use gadget_event_listeners_core::{Error as CoreError, EventListener};
use gadget_std::time::Duration;

#[derive(Default)]
pub struct PeriodicEventListener<const MSEC: usize, T, Event, Ctx = ()> {
listener: T,
_pd: gadget_std::marker::PhantomData<(Event, Ctx)>,
}

#[async_trait]
impl<
const MSEC: usize,
T: EventListener<Event, Ctx>,
Ctx: Send + Sync + 'static,
Event: Send + Sync + 'static,
> EventListener<Event, Ctx> for PeriodicEventListener<MSEC, T, Event, Ctx>
{
type ProcessorError = Error;

async fn new(context: &Ctx) -> Result<Self, CoreError<Self::ProcessorError>>
where
Self: Sized,
{
let listener = T::new(context)
.await
.map_err(|e| Error::InnerListener(e.to_string()))?;
Ok(Self {
listener,
_pd: gadget_std::marker::PhantomData,
})
}

async fn next_event(&mut self) -> Option<Event> {
let interval = Duration::from_millis(MSEC as u64);
tokio::time::sleep(interval).await;
self.listener.next_event().await
}
}
40 changes: 0 additions & 40 deletions crates/event-listeners/periodic/src/periodic.rs

This file was deleted.

0 comments on commit 8a62871

Please sign in to comment.