From aa070fa02b6d9b6abcfd96d96d12a931c67ca006 Mon Sep 17 00:00:00 2001 From: adel Date: Wed, 28 Aug 2024 10:47:59 +0200 Subject: [PATCH] feat(theoros): Shared state for events (#9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(theoros_remove_db): Remove db stuff * feat(theoros_remove_db): Shared state for events * feat(theoros_remove_db): Clean services, MadaraTM * feat(theoros_remove_db): * feat(theoros_remove_db): lintttt * feat(theoros_remove_db): meh * feat(theoros_remove_db): nit * feat(theoros_remove_db): rustdoc * feat(theoros_remove_db): Indexer log * feat(theoros_remove_db): Fixed double clone * feat(theoros_remove_db): metrics page * feat(theoros_remove_db): readme * feat(theoros_remove_db): utils -> pragma_utils * feat(theoros_remove_db): Updated workspace * feat(theoros_remove_db): Last nit πŸ’€ * feat(theoros_remove_db): Unwraps killed + unstable features * feat(theoros_remove_db): Removed parking_lot and used tokio - better for async context --- .prettierignore | 2 + README.md | 12 +- monorepo.code-workspace | 4 +- rust/Cargo.lock | 211 ++---------------- rust/Cargo.toml | 2 +- rust/pragma-cli/Cargo.toml | 3 +- rust/pragma-cli/src/main.rs | 2 +- rust/{utils => pragma-utils}/Cargo.toml | 4 +- .../src/conversions/apibara.rs | 0 .../src/conversions/mod.rs | 0 rust/{utils => pragma-utils}/src/lib.rs | 2 +- rust/pragma-utils/src/services.rs | 84 +++++++ rust/{utils => pragma-utils}/src/tracing.rs | 0 rust/rustfmt.toml | 14 -- rust/theoros/Cargo.toml | 12 +- .../down.sql | 6 - .../up.sql | 36 --- rust/theoros/openapi.json | 63 +----- rust/theoros/scripts/init_db.sh | 51 ----- rust/theoros/src/config.rs | 8 +- rust/theoros/src/errors/app_error.rs | 2 +- rust/theoros/src/infra/db/migrations.rs | 15 -- rust/theoros/src/infra/db/mod.rs | 1 - rust/theoros/src/infra/mod.rs | 1 - rust/theoros/src/main.rs | 69 +++--- rust/theoros/src/servers/api/mod.rs | 39 ---- rust/theoros/src/servers/mod.rs | 2 - .../src/{servers => services}/api/docs.rs | 0 rust/theoros/src/services/api/mod.rs | 56 +++++ .../src/{servers => services}/api/router.rs | 0 .../services/{indexer.rs => indexer/mod.rs} | 90 +++----- .../metrics.rs => services/metrics/mod.rs} | 33 ++- rust/theoros/src/services/mod.rs | 6 + rust/theoros/src/types/dispatch_event.rs | 2 +- rust/theoros/src/types/event_storage.rs | 34 +++ rust/theoros/src/types/mod.rs | 4 + rust/utils/src/db.rs | 14 -- 37 files changed, 322 insertions(+), 562 deletions(-) create mode 100644 .prettierignore rename rust/{utils => pragma-utils}/Cargo.toml (83%) rename rust/{utils => pragma-utils}/src/conversions/apibara.rs (100%) rename rust/{utils => pragma-utils}/src/conversions/mod.rs (100%) rename rust/{utils => pragma-utils}/src/lib.rs (67%) create mode 100644 rust/pragma-utils/src/services.rs rename rust/{utils => pragma-utils}/src/tracing.rs (100%) delete mode 100644 rust/theoros/migrations/00000000000000_diesel_initial_setup/down.sql delete mode 100644 rust/theoros/migrations/00000000000000_diesel_initial_setup/up.sql delete mode 100644 rust/theoros/scripts/init_db.sh delete mode 100644 rust/theoros/src/infra/db/migrations.rs delete mode 100644 rust/theoros/src/infra/db/mod.rs delete mode 100644 rust/theoros/src/infra/mod.rs delete mode 100644 rust/theoros/src/servers/api/mod.rs delete mode 100644 rust/theoros/src/servers/mod.rs rename rust/theoros/src/{servers => services}/api/docs.rs (100%) create mode 100644 rust/theoros/src/services/api/mod.rs rename rust/theoros/src/{servers => services}/api/router.rs (100%) rename rust/theoros/src/services/{indexer.rs => indexer/mod.rs} (52%) rename rust/theoros/src/{servers/metrics.rs => services/metrics/mod.rs} (81%) create mode 100644 rust/theoros/src/types/event_storage.rs delete mode 100644 rust/utils/src/db.rs diff --git a/.prettierignore b/.prettierignore new file mode 100644 index 00000000..0a0061b1 --- /dev/null +++ b/.prettierignore @@ -0,0 +1,2 @@ +target +openapi.json diff --git a/README.md b/README.md index 9ae87a88..e7ab2e8d 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,9 @@ [license]: https://www.apache.org/licenses/LICENSE-2.0 [license-badge]: https://img.shields.io/badge/License-Apache-blue.svg -## CLI +## Rust -Pragma CLI +Pragma CLI CLI used to interact with the Pragma protocol. @@ -20,6 +20,14 @@ CLI used to interact with the Pragma protocol. - Schedule new data feeds - Connect pragma to your protocol +Theoros + +Request the API to construct the calldata necessary for cross-chain updates. + +- Listens for live data feeds update +- Retrieves the signatures of the Hyperlane Validators +- Constructs the calldata for data feeds requested through HTTP/WebSocket + ## Solidity Solidity SDK diff --git a/monorepo.code-workspace b/monorepo.code-workspace index a14e0882..dddb5161 100644 --- a/monorepo.code-workspace +++ b/monorepo.code-workspace @@ -17,8 +17,8 @@ "name": "pragma-cli", }, { - "path": "rust/utils", - "name": "utils", + "path": "rust/pragma-utils", + "name": "pragma-utils", }, { "path": "solidity", diff --git a/rust/Cargo.lock b/rust/Cargo.lock index f3abb58e..f4ab5eae 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1375,46 +1375,6 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" -[[package]] -name = "deadpool" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" -dependencies = [ - "deadpool-runtime", - "num_cpus", - "tokio", -] - -[[package]] -name = "deadpool-diesel" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "590573e9e29c5190a5ff782136f871e6e652e35d598a349888e028693601adf1" -dependencies = [ - "deadpool", - "deadpool-sync", - "diesel", -] - -[[package]] -name = "deadpool-runtime" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" -dependencies = [ - "tokio", -] - -[[package]] -name = "deadpool-sync" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524bc3df0d57e98ecd022e21ba31166c2625e7d3e5bcc4510efaeeab4abcab04" -dependencies = [ - "deadpool-runtime", -] - [[package]] name = "der" version = "0.7.9" @@ -1470,63 +1430,6 @@ dependencies = [ "syn 2.0.75", ] -[[package]] -name = "diesel" -version = "2.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65e13bab2796f412722112327f3e575601a3e9cdcbe426f0d30dbf43f3f5dc71" -dependencies = [ - "bigdecimal 0.4.5", - "bitflags 2.6.0", - "byteorder", - "chrono", - "diesel_derives", - "ipnetwork", - "itoa", - "libc", - "num-bigint", - "num-integer", - "num-traits", - "pq-sys", - "r2d2", - "serde_json", - "time", - "uuid 1.10.0", -] - -[[package]] -name = "diesel_derives" -version = "2.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f2c3de51e2ba6bf2a648285696137aaf0f5f487bcbea93972fe8a364e131a4" -dependencies = [ - "diesel_table_macro_syntax", - "dsl_auto_type", - "proc-macro2", - "quote", - "syn 2.0.75", -] - -[[package]] -name = "diesel_migrations" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a73ce704bad4231f001bff3314d91dce4aba0770cee8b233991859abc15c1f6" -dependencies = [ - "diesel", - "migrations_internals", - "migrations_macros", -] - -[[package]] -name = "diesel_table_macro_syntax" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209c735641a413bc68c4923a9d6ad4bcb3ca306b794edaa7eb0b3228a99ffb25" -dependencies = [ - "syn 2.0.75", -] - [[package]] name = "digest" version = "0.9.0" @@ -1586,20 +1489,6 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "dsl_auto_type" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5d9abe6314103864cc2d8901b7ae224e0ab1a103a0a416661b4097b0779b607" -dependencies = [ - "darling", - "either", - "heck 0.5.0", - "proc-macro2", - "quote", - "syn 2.0.75", -] - [[package]] name = "dunce" version = "1.0.5" @@ -2520,15 +2409,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "ipnetwork" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" -dependencies = [ - "serde", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -2760,27 +2640,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "migrations_internals" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd01039851e82f8799046eabbb354056283fb265c8ec0996af940f4e85a380ff" -dependencies = [ - "serde", - "toml", -] - -[[package]] -name = "migrations_macros" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb161cc72176cb37aa47f1fc520d3ef02263d67d661f44f05d05a079e1237fd" -dependencies = [ - "migrations_internals", - "proc-macro2", - "quote", -] - [[package]] name = "mime" version = "0.3.17" @@ -3327,15 +3186,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "pq-sys" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a24ff9e4cf6945c988f0db7005d87747bf72864965c3529d259ad155ac41d584" -dependencies = [ - "vcpkg", -] - [[package]] name = "pragma-cli" version = "0.1.0" @@ -3352,6 +3202,7 @@ dependencies = [ "hex", "inquire", "parity-scale-codec", + "pragma-utils", "rand", "reqwest 0.12.7", "scale-info", @@ -3365,7 +3216,22 @@ dependencies = [ "toml", "tracing", "url", - "utils", +] + +[[package]] +name = "pragma-utils" +version = "0.1.0" +dependencies = [ + "anyhow", + "apibara-core", + "apibara-sdk", + "async-trait", + "futures", + "starknet 0.11.0", + "tokio", + "tracing", + "tracing-axiom", + "tracing-subscriber", ] [[package]] @@ -3614,17 +3480,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "r2d2" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" -dependencies = [ - "log", - "parking_lot", - "scheduled-thread-pool", -] - [[package]] name = "radium" version = "0.7.0" @@ -4119,15 +3974,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -4961,29 +4807,28 @@ dependencies = [ "anyhow", "apibara-core", "apibara-sdk", + "async-trait", "axum 0.7.5", "axum-macros", "bigdecimal 0.4.5", "chrono", - "deadpool-diesel", - "diesel", - "diesel_migrations", "dotenvy", "envy", "futures-util", "hyper 0.14.30", "opentelemetry 0.24.0", + "pragma-utils", "prometheus", "serde", "serde_json", "starknet 0.11.0", + "strum", "thiserror", "tokio", "tower-http", "tracing", "tracing-subscriber", "url", - "utils", "utoipa", "utoipa-swagger-ui", "utoipauto", @@ -5606,22 +5451,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" -[[package]] -name = "utils" -version = "0.1.0" -dependencies = [ - "anyhow", - "apibara-core", - "apibara-sdk", - "deadpool-diesel", - "futures", - "starknet 0.11.0", - "tokio", - "tracing", - "tracing-axiom", - "tracing-subscriber", -] - [[package]] name = "utoipa" version = "4.2.3" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index e575cb85..d056e953 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["pragma-cli", "theoros"] +members = ["pragma-cli", "theoros", "pragma-utils"] [workspace.package] version = "0.1.0" diff --git a/rust/pragma-cli/Cargo.toml b/rust/pragma-cli/Cargo.toml index 3e79d513..62e5221a 100644 --- a/rust/pragma-cli/Cargo.toml +++ b/rust/pragma-cli/Cargo.toml @@ -13,7 +13,6 @@ bollard = { workspace = true } clap = { workspace = true } dirs = { workspace = true } env_logger = { workspace = true } -#Β TODO: alloy alloy = { workspace = true } eyre = { workspace = true } futures-util = { workspace = true } @@ -35,4 +34,4 @@ toml = { workspace = true } tracing = { workspace = true } url = { workspace = true } -utils = { path = "../utils" } +pragma-utils = { path = "../pragma-utils" } diff --git a/rust/pragma-cli/src/main.rs b/rust/pragma-cli/src/main.rs index aa0e194d..fe084389 100644 --- a/rust/pragma-cli/src/main.rs +++ b/rust/pragma-cli/src/main.rs @@ -1,7 +1,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; +use pragma_utils::tracing::init_tracing; use tracing::Level; -use utils::tracing::init_tracing; use pragma_cli::{ cli::{ diff --git a/rust/utils/Cargo.toml b/rust/pragma-utils/Cargo.toml similarity index 83% rename from rust/utils/Cargo.toml rename to rust/pragma-utils/Cargo.toml index 4501833c..9b0cc811 100644 --- a/rust/utils/Cargo.toml +++ b/rust/pragma-utils/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "utils" +name = "pragma-utils" version = "0.1.0" edition = "2021" @@ -7,7 +7,7 @@ edition = "2021" anyhow = { workspace = true } apibara-core = { workspace = true } apibara-sdk = { workspace = true } -deadpool-diesel = { workspace = true, features = ["postgres"] } +async-trait = { workspace = true } futures = { workspace = true } starknet = { workspace = true } tokio = { workspace = true, features = ["signal"] } diff --git a/rust/utils/src/conversions/apibara.rs b/rust/pragma-utils/src/conversions/apibara.rs similarity index 100% rename from rust/utils/src/conversions/apibara.rs rename to rust/pragma-utils/src/conversions/apibara.rs diff --git a/rust/utils/src/conversions/mod.rs b/rust/pragma-utils/src/conversions/mod.rs similarity index 100% rename from rust/utils/src/conversions/mod.rs rename to rust/pragma-utils/src/conversions/mod.rs diff --git a/rust/utils/src/lib.rs b/rust/pragma-utils/src/lib.rs similarity index 67% rename from rust/utils/src/lib.rs rename to rust/pragma-utils/src/lib.rs index 648904ae..c9d7eba5 100644 --- a/rust/utils/src/lib.rs +++ b/rust/pragma-utils/src/lib.rs @@ -1,3 +1,3 @@ pub mod conversions; -pub mod db; +pub mod services; pub mod tracing; diff --git a/rust/pragma-utils/src/services.rs b/rust/pragma-utils/src/services.rs new file mode 100644 index 00000000..3071e640 --- /dev/null +++ b/rust/pragma-utils/src/services.rs @@ -0,0 +1,84 @@ +use anyhow::Context; +use std::panic; +use tokio::task::JoinSet; + +/// Source: +/// https://github.com/madara-alliance/madara/blob/main/crates/primitives/utils/src/service.rs +/// - +/// The app is divided into services, with each service having a different responsability within the app. +/// +/// This trait enables launching nested services and groups. +#[async_trait::async_trait] +pub trait Service: 'static + Send + Sync { + async fn start(&mut self, _join_set: &mut JoinSet>) -> anyhow::Result<()> { + Ok(()) + } + + async fn start_and_drive_to_end(mut self) -> anyhow::Result<()> + where + Self: Sized, + { + let mut join_set = JoinSet::new(); + self.start(&mut join_set).await.context("Starting service")?; + drive_joinset(join_set).await + } +} + +pub struct ServiceGroup { + services: Vec>, + join_set: Option>>, +} + +impl Default for ServiceGroup { + fn default() -> Self { + Self { services: vec![], join_set: Some(Default::default()) } + } +} + +impl ServiceGroup { + pub fn new(services: Vec>) -> Self { + Self { services, join_set: Some(Default::default()) } + } + + /// Add a new service to the service group. + pub fn push(&mut self, value: impl Service) { + if self.join_set.is_none() { + panic!("Cannot add services to a group that has been started.") + } + self.services.push(Box::new(value)); + } + + pub fn with(mut self, value: impl Service) -> Self { + self.push(value); + self + } +} + +#[async_trait::async_trait] +impl Service for ServiceGroup { + async fn start(&mut self, join_set: &mut JoinSet>) -> anyhow::Result<()> { + // drive the join set as a nested task + let mut own_join_set = self.join_set.take().expect("Service has already been started."); + for svc in self.services.iter_mut() { + svc.start(&mut own_join_set).await.context("Starting service")?; + } + + join_set.spawn(drive_joinset(own_join_set)); + Ok(()) + } +} + +async fn drive_joinset(mut join_set: JoinSet>) -> anyhow::Result<()> { + while let Some(result) = join_set.join_next().await { + match result { + Ok(result) => result?, + Err(panic_error) if panic_error.is_panic() => { + // bubble up panics too + panic::resume_unwind(panic_error.into_panic()); + } + Err(_task_cancelled_error) => {} + } + } + + Ok(()) +} diff --git a/rust/utils/src/tracing.rs b/rust/pragma-utils/src/tracing.rs similarity index 100% rename from rust/utils/src/tracing.rs rename to rust/pragma-utils/src/tracing.rs diff --git a/rust/rustfmt.toml b/rust/rustfmt.toml index af44f685..1c0bc12b 100644 --- a/rust/rustfmt.toml +++ b/rust/rustfmt.toml @@ -4,17 +4,3 @@ use_field_init_shorthand = true use_small_heuristics = "Max" use_try_shorthand = true max_width = 120 - -# Unstable features below -unstable_features = true -version = "Two" -comment_width = 100 -format_code_in_doc_comments = true -format_macro_bodies = true -format_macro_matchers = true -format_strings = true -imports_granularity = "Module" -group_imports = "StdExternalCrate" -normalize_comments = true -normalize_doc_attributes = true -wrap_comments = true diff --git a/rust/theoros/Cargo.toml b/rust/theoros/Cargo.toml index a57baf3a..34b93522 100644 --- a/rust/theoros/Cargo.toml +++ b/rust/theoros/Cargo.toml @@ -7,18 +7,11 @@ edition = "2021" anyhow = { workspace = true } apibara-core = { workspace = true } apibara-sdk = { workspace = true } +async-trait = { workspace = true } axum = { workspace = true, features = ["macros", "ws", "tokio"] } axum-macros = { workspace = true } bigdecimal = { workspace = true } chrono = { workspace = true, features = ["serde"] } -deadpool-diesel = { workspace = true, features = ["postgres"] } -diesel = { workspace = true, features = [ - "postgres", - "extras", - "postgres_backend", - "serde_json", -] } -diesel_migrations = { workspace = true } dotenvy = { workspace = true } envy = { workspace = true } futures-util = { workspace = true } @@ -28,6 +21,7 @@ prometheus = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } starknet = { workspace = true } +strum = { workspace = true, features = ["derive"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] } tower-http = { workspace = true, features = ["fs", "trace", "cors"] } @@ -38,4 +32,4 @@ utoipa = { workspace = true, features = ["axum_extras", "chrono", "uuid"] } utoipa-swagger-ui = { workspace = true, features = ["axum"] } utoipauto = { workspace = true } -utils = { path = "../utils" } +pragma-utils = { path = "../pragma-utils" } diff --git a/rust/theoros/migrations/00000000000000_diesel_initial_setup/down.sql b/rust/theoros/migrations/00000000000000_diesel_initial_setup/down.sql deleted file mode 100644 index a9f52609..00000000 --- a/rust/theoros/migrations/00000000000000_diesel_initial_setup/down.sql +++ /dev/null @@ -1,6 +0,0 @@ --- This file was automatically created by Diesel to setup helper functions --- and other internal bookkeeping. This file is safe to edit, any future --- changes will be added to existing projects as new migrations. - -DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); -DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/rust/theoros/migrations/00000000000000_diesel_initial_setup/up.sql b/rust/theoros/migrations/00000000000000_diesel_initial_setup/up.sql deleted file mode 100644 index 2cad9831..00000000 --- a/rust/theoros/migrations/00000000000000_diesel_initial_setup/up.sql +++ /dev/null @@ -1,36 +0,0 @@ --- This file was automatically created by Diesel to setup helper functions --- and other internal bookkeeping. This file is safe to edit, any future --- changes will be added to existing projects as new migrations. - - --- Sets up a trigger for the given table to automatically set a column called --- `updated_at` whenever the row is modified (unless `updated_at` was included --- in the modified columns) --- --- # Example --- --- ```sql --- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); --- --- SELECT diesel_manage_updated_at('users'); --- ``` -CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS -$$ -BEGIN - EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s - FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); -END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS -$$ -BEGIN - IF ( - NEW IS DISTINCT FROM OLD AND - NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at - ) THEN - NEW.updated_at := current_timestamp; - END IF; - RETURN NEW; -END; -$$ LANGUAGE plpgsql; diff --git a/rust/theoros/openapi.json b/rust/theoros/openapi.json index ae627cdb..03411612 100644 --- a/rust/theoros/openapi.json +++ b/rust/theoros/openapi.json @@ -1,62 +1 @@ -{ - "openapi": "3.0.3", - "info": { - "title": "theoros", - "description": "", - "license": { "name": "" }, - "version": "0.1.0" - }, - "paths": { - "/v1/calldata": { - "get": { - "tags": ["crate::handlers::get_calldata"], - "operationId": "get_calldata", - "parameters": [], - "responses": { - "200": { - "description": "Get the calldata", - "content": { - "application/json": { - "schema": { - "type": "array", - "items": { - "$ref": "#/components/schemas/GetCalldataResponse" - } - } - } - } - } - } - } - } - }, - "components": { - "schemas": { - "GetCalldataError": { - "type": "string", - "enum": ["InternalServerError", "DatabaseConnection"] - }, - "GetCalldataQuery": { "type": "object" }, - "GetCalldataResponse": { - "type": "object", - "required": ["hash"], - "properties": { "hash": { "type": "string" } } - } - }, - "responses": { - "GetCalldataResponse": { - "description": "", - "content": { - "application/json": { - "schema": { - "type": "object", - "required": ["hash"], - "properties": { "hash": { "type": "string" } } - } - } - } - } - } - }, - "tags": [{ "name": "theoros", "description": "Theoros Pragma Consultant" }] -} +{"openapi":"3.0.3","info":{"title":"theoros","description":"","license":{"name":""},"version":"0.1.0"},"paths":{"/v1/calldata":{"get":{"tags":["crate::handlers::get_calldata"],"operationId":"get_calldata","parameters":[],"responses":{"200":{"description":"Get the calldata","content":{"application/json":{"schema":{"type":"array","items":{"$ref":"#/components/schemas/GetCalldataResponse"}}}}}}}}},"components":{"schemas":{"GetCalldataError":{"type":"string","enum":["InternalServerError","DatabaseConnection"]},"GetCalldataQuery":{"type":"object"},"GetCalldataResponse":{"type":"object","required":["hash"],"properties":{"hash":{"type":"string"}}}},"responses":{"GetCalldataResponse":{"description":"","content":{"application/json":{"schema":{"type":"object","required":["hash"],"properties":{"hash":{"type":"string"}}}}}}}},"tags":[{"name":"theoros","description":"Theoros - the Pragma Consultant"}]} \ No newline at end of file diff --git a/rust/theoros/scripts/init_db.sh b/rust/theoros/scripts/init_db.sh deleted file mode 100644 index 8b13d43d..00000000 --- a/rust/theoros/scripts/init_db.sh +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env bash -if ! [ -x "$(command -v psql)" ]; then - echo >&2 "Error: psql is not installed." - exit 1 -fi - -if ! [ -x "$(command -v sqlx)" ]; then - echo >&2 "Error: sqlx is not installed." - exit 1 -fi - -# Check if a custom user exists - otherwise defaults to "postgres" -DB_USER="${POSTGRES_USER:=postgres}" - -# Check if a custom password exists - otherwise default to "password" -DB_PASSWORD="${POSTGRES_PASSWORD:=password}" - -# Check if a custom database name exists - otherwise default to "indexing" -DB_NAME="${POSTGRES_DB:=indexing}" - -# Check if a custom port exists - otherwise default to "5432" -DB_PORT="${POSTGRES_PORT:=5432}" - -# Check if a custom host exists - otherwise default to "localhost" -DB_HOST="${POSTGRES_HOST:=localhost}" - -CONTAINER_NAME="pragma-x-indexer-db" - -# Skip if the postgres container is already running -if [[ -z "${SKIP_DOCKER}" ]]; then - docker run \ - -e POSTGRES_USER=${DB_USER} \ - -e POSTGRES_PASSWORD=${DB_PASSWORD} \ - -e POSTGRES_DB=${DB_NAME} \ - -p "${DB_PORT}":5432 \ - --name ${CONTAINER_NAME} \ - -d postgres \ - postgres -N 1000 # Maximum number of connections -fi - -# Wait for the postgres container to be healthy -until docker exec ${CONTAINER_NAME} pg_isready; do - echo >&2 "🐘 Waiting for Postgres to be ready..." - sleep 2 -done - -DATABASE_URL=postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME} -echo >&2 "Database URL: $DATABASE_URL" -export DATABASE_URL - -echo >&2 "πŸ₯³ Postgres is up and running at ${DB_HOST}:${DB_PORT}" diff --git a/rust/theoros/src/config.rs b/rust/theoros/src/config.rs index ac7535e6..a269b33d 100644 --- a/rust/theoros/src/config.rs +++ b/rust/theoros/src/config.rs @@ -1,7 +1,7 @@ use serde::Deserialize; use tokio::sync::OnceCell; -#[derive(Default, Debug, Deserialize, PartialEq)] +#[derive(Default, Debug, Deserialize, PartialEq, Clone)] #[serde(rename_all = "lowercase")] pub enum Mode { #[default] @@ -9,12 +9,12 @@ pub enum Mode { Production, } -#[derive(Default, Debug, Deserialize)] +#[derive(Default, Debug, Deserialize, Clone)] pub struct ModeConfig { mode: Mode, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct ServerConfig { host: String, port: u16, @@ -26,7 +26,7 @@ impl Default for ServerConfig { } } -#[derive(Default, Debug, Deserialize)] +#[derive(Default, Debug, Deserialize, Clone)] pub struct Config { mode: ModeConfig, server: ServerConfig, diff --git a/rust/theoros/src/errors/app_error.rs b/rust/theoros/src/errors/app_error.rs index 7aad8d59..1e385e6b 100644 --- a/rust/theoros/src/errors/app_error.rs +++ b/rust/theoros/src/errors/app_error.rs @@ -4,8 +4,8 @@ use axum::Json; use serde_json::json; #[derive(Debug)] -#[allow(unused)] pub enum AppError { + #[allow(unused)] InternalServerError, BodyParsingError(String), } diff --git a/rust/theoros/src/infra/db/migrations.rs b/rust/theoros/src/infra/db/migrations.rs deleted file mode 100644 index 8a4e185b..00000000 --- a/rust/theoros/src/infra/db/migrations.rs +++ /dev/null @@ -1,15 +0,0 @@ -use anyhow::{anyhow, Result}; -use deadpool_diesel::postgres::Pool; -use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; - -pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations/"); - -pub async fn run_migrations(pool: &Pool) -> Result<()> { - let conn = pool.get().await?; - conn.interact(|conn| conn.run_pending_migrations(MIGRATIONS).map(|_| ())) - .await - .map_err(|e| anyhow!("Could not run migrations: {e}"))? - .map_err(|e| anyhow!("Could not run migrations: {e}"))?; - - Ok(()) -} diff --git a/rust/theoros/src/infra/db/mod.rs b/rust/theoros/src/infra/db/mod.rs deleted file mode 100644 index 3e37da35..00000000 --- a/rust/theoros/src/infra/db/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod migrations; diff --git a/rust/theoros/src/infra/mod.rs b/rust/theoros/src/infra/mod.rs deleted file mode 100644 index dec10232..00000000 --- a/rust/theoros/src/infra/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod db; diff --git a/rust/theoros/src/main.rs b/rust/theoros/src/main.rs index 0c41646d..7d31e780 100644 --- a/rust/theoros/src/main.rs +++ b/rust/theoros/src/main.rs @@ -2,34 +2,36 @@ mod config; mod errors; mod extractors; mod handlers; -mod infra; -mod servers; mod services; mod types; +use std::sync::Arc; + use anyhow::Result; -use deadpool_diesel::postgres::Pool; use prometheus::Registry; use tracing::Level; -use utils::{db::init_db_pool, tracing::init_tracing}; + +use pragma_utils::{ + services::{Service, ServiceGroup}, + tracing::init_tracing, +}; use crate::{ - config::{config, Config}, - servers::{api::start_api_server, metrics::MetricsServer}, - services::indexer::start_indexer_service, + config::config, + services::{ApiService, IndexerService, MetricsService}, + types::EventStorage, }; // TODO: Config those const APP_NAME: &str = "theoros"; const LOG_LEVEL: Level = Level::INFO; -const ENV_DATABASE_URL: &str = "INDEXER_DB_URL"; +const EVENTS_MEM_SIZE: usize = 10; const METRICS_PORT: u16 = 8080; #[derive(Clone)] -#[allow(unused)] pub struct AppState { - indexer_pool: Pool, - metrics_registry: Registry, + pub event_storage: Arc, + pub metrics_registry: Registry, } #[tokio::main] @@ -37,41 +39,30 @@ pub struct AppState { async fn main() -> Result<()> { dotenvy::dotenv()?; let config = config().await; - init_tracing(APP_NAME, LOG_LEVEL)?; - // TODO: indexer_db_url should be handled in config() - let database_url = std::env::var(ENV_DATABASE_URL)?; - let indexer_pool = init_db_pool(APP_NAME, &database_url)?; - infra::db::migrations::run_migrations(&indexer_pool).await?; - - start_theorus(config, indexer_pool).await?; + init_tracing(APP_NAME, LOG_LEVEL)?; - // Ensure that the tracing provider is shutdown correctly - opentelemetry::global::shutdown_tracer_provider(); + let metrics_service = MetricsService::new(false, METRICS_PORT)?; - Ok(()) -} + // TODO: state should contains the rpc_client to interact with a Madara node + let state = AppState { + event_storage: Arc::new(EventStorage::new(EVENTS_MEM_SIZE)), + metrics_registry: metrics_service.registry(), + }; -/// Starts all the Theoros services, i.e: -/// - API server -/// - Indexer service -/// - Metrics server -async fn start_theorus(config: &Config, indexer_pool: Pool) -> Result<()> { - let metrics = MetricsServer::new(false, METRICS_PORT)?; + // TODO: Should be Pragma X DNA url - see with Apibara team + should be in config + let apibara_uri = "https://mainnet.starknet.a5a.ch"; + // TODO: key in config / .env (...) + let apibara_api_key = std::env::var("APIBARA_API_KEY")?; + let indexer_service = IndexerService::new(state.clone(), apibara_uri, apibara_api_key)?; - // TODO: state should contains the rpc_client to interact with a Madara node - let state = AppState { indexer_pool, metrics_registry: metrics.registry() }; + let api_service = ApiService::new(state.clone(), config.server_host(), config.server_port()); - // TODO: spawn one indexer for mainnet & one for testnet - let indexer_handle = start_indexer_service(config, state.clone())?; - let api_handle = start_api_server(config, state.clone())?; - let metrics_handle = metrics.start()?; + let theoros = ServiceGroup::default().with(metrics_service).with(indexer_service).with(api_service); + theoros.start_and_drive_to_end().await?; - // TODO: Better struct that groups handles, bubble errors etc... - let (indexer_result, api_result, metrics_result) = tokio::join!(indexer_handle, api_handle, metrics_handle); - indexer_result??; - api_result??; - metrics_result??; + // Ensure that the tracing provider is shutdown correctly + opentelemetry::global::shutdown_tracer_provider(); Ok(()) } diff --git a/rust/theoros/src/servers/api/mod.rs b/rust/theoros/src/servers/api/mod.rs deleted file mode 100644 index b6ac4485..00000000 --- a/rust/theoros/src/servers/api/mod.rs +++ /dev/null @@ -1,39 +0,0 @@ -pub mod docs; -pub mod router; - -use std::net::SocketAddr; - -use anyhow::{Context, Result}; -use router::api_router; -use tokio::{net::TcpListener, task::JoinHandle}; -use tower_http::{ - cors::CorsLayer, - trace::{DefaultMakeSpan, TraceLayer}, -}; - -use crate::{config::Config, servers::api::docs::ApiDoc, AppState}; - -#[tracing::instrument(skip(config, state))] -pub fn start_api_server(config: &Config, state: AppState) -> Result>> { - let host = config.server_host().to_owned(); - let port = config.server_port(); - - // Uncomment this line below in order to generate the OpenAPI specs in the theoros folder - // ApiDoc::generate_openapi_json("./theoros".into())?; - - let handle = tokio::spawn(async move { - let address = format!("{}:{}", host, port); - let socket_addr: SocketAddr = address.parse()?; - let listener = TcpListener::bind(socket_addr).await?; - - let router = api_router::(state.clone()) - .with_state(state.clone()) - .layer(TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::default().include_headers(true))) - .layer(CorsLayer::permissive()); - - tracing::info!("🧩 API server started at http://{}", socket_addr); - axum::serve(listener, router).await.context("😱 API server stopped!") - }); - - Ok(handle) -} diff --git a/rust/theoros/src/servers/mod.rs b/rust/theoros/src/servers/mod.rs deleted file mode 100644 index 47e9eb41..00000000 --- a/rust/theoros/src/servers/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod api; -pub mod metrics; diff --git a/rust/theoros/src/servers/api/docs.rs b/rust/theoros/src/services/api/docs.rs similarity index 100% rename from rust/theoros/src/servers/api/docs.rs rename to rust/theoros/src/services/api/docs.rs diff --git a/rust/theoros/src/services/api/mod.rs b/rust/theoros/src/services/api/mod.rs new file mode 100644 index 00000000..63cd528e --- /dev/null +++ b/rust/theoros/src/services/api/mod.rs @@ -0,0 +1,56 @@ +pub mod docs; +pub mod router; + +use std::net::SocketAddr; + +use anyhow::{Context, Result}; +use docs::ApiDoc; +use router::api_router; +use tokio::{net::TcpListener, task::JoinSet}; +use tower_http::{ + cors::CorsLayer, + trace::{DefaultMakeSpan, TraceLayer}, +}; + +use pragma_utils::services::Service; + +use crate::AppState; + +pub struct ApiService { + state: AppState, + host: String, + port: u16, +} + +impl ApiService { + pub fn new(state: AppState, host: &str, port: u16) -> Self { + Self { state, host: host.to_owned(), port } + } +} + +#[async_trait::async_trait] +impl Service for ApiService { + async fn start(&mut self, join_set: &mut JoinSet>) -> anyhow::Result<()> { + // Uncomment this line below in order to generate the OpenAPI specs in the theoros folder + // ApiDoc::generate_openapi_json("./theoros".into())?; + + let host = self.host.to_owned(); + let port = self.port; + let state = self.state.clone(); + + join_set.spawn(async move { + let address = format!("{}:{}", host, port); + let socket_addr: SocketAddr = address.parse()?; + let listener = TcpListener::bind(socket_addr).await?; + + let router = api_router::(state.clone()) + .with_state(state) + .layer(TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::default().include_headers(true))) + .layer(CorsLayer::permissive()); + + tracing::info!("🧩 API server started at http://{}", socket_addr); + axum::serve(listener, router).await.context("😱 API server stopped!") + }); + Ok(()) + } +} diff --git a/rust/theoros/src/servers/api/router.rs b/rust/theoros/src/services/api/router.rs similarity index 100% rename from rust/theoros/src/servers/api/router.rs rename to rust/theoros/src/services/api/router.rs diff --git a/rust/theoros/src/services/indexer.rs b/rust/theoros/src/services/indexer/mod.rs similarity index 52% rename from rust/theoros/src/services/indexer.rs rename to rust/theoros/src/services/indexer/mod.rs index ea595f4e..ecdb7dbd 100644 --- a/rust/theoros/src/services/indexer.rs +++ b/rust/theoros/src/services/indexer/mod.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use anyhow::{anyhow, Context, Result}; use apibara_core::{ node::v1alpha2::DataFinality, @@ -6,52 +8,45 @@ use apibara_core::{ use apibara_sdk::{configuration, ClientBuilder, Configuration, DataMessage, Uri}; use futures_util::TryStreamExt; use starknet::core::types::Felt; -use tokio::task::JoinHandle; +use tokio::task::JoinSet; -use utils::conversions::apibara::felt_as_apibara_field; +use pragma_utils::{conversions::apibara::felt_as_apibara_field, services::Service}; -use crate::{config::Config, types::dispatch_event::DispatchEvent, AppState}; +use crate::{types::DispatchEvent, AppState}; // TODO: depends on the host machine - should be configurable const INDEXING_STREAM_CHUNK_SIZE: usize = 256; -/// Creates & run the indexer service. -#[tracing::instrument(skip(_config, _state))] -pub fn start_indexer_service(_config: &Config, _state: AppState) -> Result>> { - // TODO: retrieve API key from config - let apibara_api_key = std::env::var("APIBARA_API_KEY")?; - - let handle = tokio::spawn(async move { - let indexer_service = IndexerService::new(apibara_api_key); - // TODO: network should be in the config - tracing::info!("🧩 Indexer service running for mainnet!"); - indexer_service.start().await.context("😱 Indexer service failed!") - }); - Ok(handle) -} - -#[allow(unused)] +#[derive(Clone)] pub struct IndexerService { + state: AppState, uri: Uri, apibara_api_key: String, stream_config: Configuration, - reached_pending_block: bool, +} + +#[async_trait::async_trait] +impl Service for IndexerService { + async fn start(&mut self, join_set: &mut JoinSet>) -> anyhow::Result<()> { + let service = self.clone(); + join_set.spawn(async move { + tracing::info!("🧩 Indexer service started"); + service.run_forever().await?; + Ok(()) + }); + Ok(()) + } } impl IndexerService { - pub fn new(apibara_api_key: String) -> IndexerService { - // TODO: Should be Pragma X DNA url - see with Apibara team + should be in config - let uri = Uri::from_static("https://mainnet.starknet.a5a.ch"); - // TODO: this should not be a parameter & retrieve from the latest block indexed from the database - // for the selected network - let from_block = 10; + pub fn new(state: AppState, apibara_uri: &str, apibara_api_key: String) -> Result { + let uri = Uri::from_str(apibara_uri)?; // TODO: should be a config let pragma_oracle_contract = felt_as_apibara_field(&Felt::ZERO); // TODO: should be a config let dispatch_event_selector = felt_as_apibara_field(&Felt::ZERO); let stream_config = Configuration::::default() - .with_starting_block(from_block) .with_finality(DataFinality::DataStatusPending) .with_filter(|mut filter| { filter @@ -64,22 +59,23 @@ impl IndexerService { .build() }); - IndexerService { uri, apibara_api_key, stream_config, reached_pending_block: false } + let indexer_service = Self { state, uri, apibara_api_key, stream_config }; + Ok(indexer_service) } - pub async fn start(mut self) -> Result<()> { + pub async fn run_forever(mut self) -> Result<()> { let (config_client, config_stream) = configuration::channel(INDEXING_STREAM_CHUNK_SIZE); - config_client.send(self.stream_config.clone()).await.unwrap(); + config_client.send(self.stream_config.clone()).await.context("Sending indexing stream configuration")?; let mut stream = ClientBuilder::default() .with_bearer_token(Some(self.apibara_api_key.clone())) .connect(self.uri.clone()) .await - .unwrap() + .map_err(|e| anyhow!("Error while connecting to Apibara DNA: {}", e))? .start_stream::(config_stream) .await - .unwrap(); + .map_err(|e| anyhow!("Error while starting indexing stream: {}", e))?; loop { match stream.try_next().await { @@ -93,14 +89,10 @@ impl IndexerService { /// Process a batch of blocks indexed by Apibara DNA async fn process_batch(&mut self, batch: DataMessage) -> Result<()> { match batch { - DataMessage::Data { cursor: _, end_cursor: _, finality, batch } => { - if finality == DataFinality::DataStatusPending && !self.reached_pending_block { - self.log_pending_block_reached(batch.last()); - self.reached_pending_block = true; - } + DataMessage::Data { cursor: _, end_cursor: _, finality: _, batch } => { for block in batch { for event in block.events.into_iter().filter_map(|e| e.event) { - self.process_dispatch_event(event).await?; + self.decode_and_store_event(event).await?; } } } @@ -117,28 +109,12 @@ impl IndexerService { Ok(()) } - async fn process_dispatch_event(&self, event: Event) -> Result<()> { + async fn decode_and_store_event(&self, event: Event) -> Result<()> { if event.from_address.is_none() { return Ok(()); } - - let _dispatch = DispatchEvent::from_event_data(event.data); - + let dispatch_event = DispatchEvent::from_event_data(event.data)?; + self.state.event_storage.add(dispatch_event).await; Ok(()) } - - /// Logs that we successfully reached current pending block - fn log_pending_block_reached(&self, last_block_in_batch: Option<&Block>) { - let maybe_pending_block_number = if let Some(last_block) = last_block_in_batch { - last_block.header.as_ref().map(|header| header.block_number) - } else { - None - }; - - if let Some(pending_block_number) = maybe_pending_block_number { - tracing::info!("[πŸ” Indexer] πŸ₯³πŸŽ‰ Reached pending block #{}!", pending_block_number); - } else { - tracing::info!("[πŸ” Indexer] πŸ₯³πŸŽ‰ Reached pending block!"); - } - } } diff --git a/rust/theoros/src/servers/metrics.rs b/rust/theoros/src/services/metrics/mod.rs similarity index 81% rename from rust/theoros/src/servers/metrics.rs rename to rust/theoros/src/services/metrics/mod.rs index 1d2b4247..fbe95e41 100644 --- a/rust/theoros/src/servers/metrics.rs +++ b/rust/theoros/src/services/metrics/mod.rs @@ -8,9 +8,6 @@ use hyper::{ service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, }; -use prometheus::{Encoder, TextEncoder}; -use tokio::{net::TcpListener, task::JoinHandle}; - #[allow(unused)] pub use prometheus::{ self, @@ -18,9 +15,16 @@ pub use prometheus::{ AtomicF64 as F64, AtomicI64 as I64, AtomicU64 as U64, GenericCounter as Counter, GenericCounterVec as CounterVec, GenericGauge as Gauge, GenericGaugeVec as GaugeVec, }, - exponential_buckets, Error as PrometheusError, Histogram, HistogramOpts, HistogramVec, IntGaugeVec, Opts, Registry, + exponential_buckets, Encoder, Error as PrometheusError, Histogram, HistogramOpts, HistogramVec, IntGaugeVec, Opts, + Registry, TextEncoder, +}; +use tokio::{ + net::TcpListener, + task::{JoinHandle, JoinSet}, }; +use pragma_utils::services::Service; + #[derive(thiserror::Error, Debug)] #[error("error while handling request in prometheus endpoint: {0}")] enum MetricsError { @@ -29,13 +33,26 @@ enum MetricsError { HyperHttp(#[from] hyper::http::Error), } -pub struct MetricsServer { +#[derive(Clone)] +pub struct MetricsService { prometheus_external: bool, prometheus_port: u16, registry: Registry, } -impl MetricsServer { +#[async_trait::async_trait] +impl Service for MetricsService { + async fn start(&mut self, join_set: &mut JoinSet>) -> anyhow::Result<()> { + let service = self.clone(); + join_set.spawn(async move { + service.run_forever()?; + Ok(()) + }); + Ok(()) + } +} + +impl MetricsService { pub fn new(prometheus_external: bool, prometheus_port: u16) -> Result { let service = Self { prometheus_external, prometheus_port, registry: Default::default() }; Ok(service) @@ -45,7 +62,7 @@ impl MetricsServer { self.registry.clone() } - pub fn start(&self) -> Result>> { + pub fn run_forever(&self) -> Result>> { let listen_addr = if self.prometheus_external { Ipv4Addr::UNSPECIFIED // listen on 0.0.0.0 } else { @@ -101,6 +118,6 @@ async fn endpoint(req: Request, registry: Registry) -> Result
πŸ‘‰ See Metrics"))?) + .body(Body::from("See Metrics"))?) } } diff --git a/rust/theoros/src/services/mod.rs b/rust/theoros/src/services/mod.rs index d99eb481..61b19ff1 100644 --- a/rust/theoros/src/services/mod.rs +++ b/rust/theoros/src/services/mod.rs @@ -1 +1,7 @@ +pub mod api; pub mod indexer; +pub mod metrics; + +pub use api::ApiService; +pub use indexer::IndexerService; +pub use metrics::MetricsService; diff --git a/rust/theoros/src/types/dispatch_event.rs b/rust/theoros/src/types/dispatch_event.rs index 4fa982ba..c908b092 100644 --- a/rust/theoros/src/types/dispatch_event.rs +++ b/rust/theoros/src/types/dispatch_event.rs @@ -3,7 +3,7 @@ use apibara_core::starknet::v1alpha2::FieldElement; use bigdecimal::BigDecimal; use starknet::core::types::U256; -use utils::conversions::apibara::FromFieldBytes; +use pragma_utils::conversions::apibara::FromFieldBytes; #[derive(Debug, Clone)] #[allow(unused)] diff --git a/rust/theoros/src/types/event_storage.rs b/rust/theoros/src/types/event_storage.rs new file mode 100644 index 00000000..f53234d5 --- /dev/null +++ b/rust/theoros/src/types/event_storage.rs @@ -0,0 +1,34 @@ +use std::collections::VecDeque; + +use tokio::sync::RwLock; + +use crate::types::dispatch_event::DispatchEvent; + +/// FIFO Buffer of fixed size used to store DispatchEvent from our +/// oracle contract ; the first element being the latest. +pub struct EventStorage { + dispatches: RwLock>, + max_size: usize, +} + +impl EventStorage { + pub fn new(max_size: usize) -> Self { + Self { dispatches: RwLock::new(VecDeque::with_capacity(max_size)), max_size } + } + + pub async fn add(&self, dispatch: DispatchEvent) { + let mut dispatches = self.dispatches.write().await; + dispatches.push_front(dispatch); + if dispatches.len() > self.max_size { + dispatches.pop_back(); + } + } + + pub async fn latest(&self) -> Option { + self.dispatches.read().await.front().cloned() + } + + pub async fn all(&self) -> Vec { + self.dispatches.read().await.iter().cloned().collect() + } +} diff --git a/rust/theoros/src/types/mod.rs b/rust/theoros/src/types/mod.rs index 61c3fda3..bfa0694a 100644 --- a/rust/theoros/src/types/mod.rs +++ b/rust/theoros/src/types/mod.rs @@ -1 +1,5 @@ pub mod dispatch_event; +pub mod event_storage; + +pub use dispatch_event::*; +pub use event_storage::*; diff --git a/rust/utils/src/db.rs b/rust/utils/src/db.rs deleted file mode 100644 index 8f9ac9d4..00000000 --- a/rust/utils/src/db.rs +++ /dev/null @@ -1,14 +0,0 @@ -use anyhow::{anyhow, Result}; - -use deadpool_diesel::postgres::{Manager, Pool}; - -pub fn init_db_pool(app_name: &str, database_url: &str) -> Result { - let manager_url = format!("{}?application_name={}", database_url, app_name); - let manager = Manager::new(manager_url, deadpool_diesel::Runtime::Tokio1); - - Pool::builder(manager) - // TODO: configurable db max conn - .max_size(25) - .build() - .map_err(|e| anyhow!("Could not build database Pool: {e}")) -}