From 9bf4ad27db958a2174821d6d79a4116a30d7b901 Mon Sep 17 00:00:00 2001 From: 0xevolve Date: Sun, 24 Nov 2024 19:27:03 +0000 Subject: [PATCH] feat: add connection_string column --- .../down.sql | 2 + .../up.sql | 4 ++ src/domain/models/indexer.rs | 1 + src/handlers/indexers/create_indexer.rs | 40 ++++++++++++++----- src/handlers/indexers/indexer_types/mod.rs | 1 + .../indexers/indexer_types/postgres.rs | 5 ++- src/infra/db/schema.rs | 1 + src/infra/repositories/indexer_repository.rs | 6 +++ src/tests/common/mod.rs | 5 +++ src/tests/repository.rs | 6 +++ 10 files changed, 59 insertions(+), 12 deletions(-) create mode 100644 migrations/2024-11-24-191420_add_indexer_custom_connection_string/down.sql create mode 100644 migrations/2024-11-24-191420_add_indexer_custom_connection_string/up.sql diff --git a/migrations/2024-11-24-191420_add_indexer_custom_connection_string/down.sql b/migrations/2024-11-24-191420_add_indexer_custom_connection_string/down.sql new file mode 100644 index 0000000..43af77d --- /dev/null +++ b/migrations/2024-11-24-191420_add_indexer_custom_connection_string/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE indexers DROP COLUMN custom_connection_string; diff --git a/migrations/2024-11-24-191420_add_indexer_custom_connection_string/up.sql b/migrations/2024-11-24-191420_add_indexer_custom_connection_string/up.sql new file mode 100644 index 0000000..ad84af7 --- /dev/null +++ b/migrations/2024-11-24-191420_add_indexer_custom_connection_string/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here +-- Add custom connection string column +ALTER TABLE indexers +ADD COLUMN custom_connection_string VARCHAR; \ No newline at end of file diff --git a/src/domain/models/indexer.rs b/src/domain/models/indexer.rs index 9a428a5..f3079a2 100644 --- a/src/domain/models/indexer.rs +++ b/src/domain/models/indexer.rs @@ -41,6 +41,7 @@ pub struct IndexerModel { pub target_url: Option, pub table_name: Option, pub status_server_port: Option, + pub custom_connection_string: Option, } #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/handlers/indexers/create_indexer.rs b/src/handlers/indexers/create_indexer.rs index 274dd35..058d101 100644 --- a/src/handlers/indexers/create_indexer.rs +++ b/src/handlers/indexers/create_indexer.rs @@ -7,6 +7,7 @@ use axum::Json; use diesel::SelectableHelper; use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::{AsyncConnection, RunQueryDsl}; +use serde::Deserialize; use uuid::Uuid; use crate::config::config; @@ -19,13 +20,29 @@ use crate::publishers::indexers::publish_start_indexer; use crate::utils::env::get_environment_variable; use crate::AppState; -#[derive(Default)] -struct CreateIndexerRequest { - target_url: Option, - data: Bytes, - table_name: Option, - indexer_type: IndexerType, - status_server_port: i32, +#[derive(Debug, Deserialize)] +pub struct CreateIndexerRequest { + pub indexer_type: IndexerType, + pub target_url: Option, + pub table_name: Option, + pub custom_connection_string: Option, + #[serde(skip)] + pub data: Bytes, + #[serde(skip)] + pub status_server_port: i32, +} + +impl Default for CreateIndexerRequest { + fn default() -> Self { + Self { + indexer_type: IndexerType::default(), + target_url: None, + table_name: None, + custom_connection_string: None, + data: Bytes::new(), + status_server_port: 1234, + } + } } impl CreateIndexerRequest { @@ -103,12 +120,13 @@ pub async fn create_indexer( let id = Uuid::new_v4(); let create_indexer_request = build_create_indexer_request(&mut request).await?; let new_indexer_db = indexer_repository::NewIndexerDb { + id, status: IndexerStatus::Created.to_string(), type_: create_indexer_request.indexer_type.to_string(), - id, - target_url: create_indexer_request.target_url, - table_name: create_indexer_request.table_name, - status_server_port: Some(create_indexer_request.status_server_port), + target_url: create_indexer_request.target_url.clone(), + table_name: create_indexer_request.table_name.clone(), + status_server_port: None, + custom_connection_string: create_indexer_request.custom_connection_string.clone(), }; let config = config().await; diff --git a/src/handlers/indexers/indexer_types/mod.rs b/src/handlers/indexers/indexer_types/mod.rs index 9acfa3b..2c3c6ed 100644 --- a/src/handlers/indexers/indexer_types/mod.rs +++ b/src/handlers/indexers/indexer_types/mod.rs @@ -212,6 +212,7 @@ mod tests { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: Some(1234), + custom_connection_string: None, }; // clear the sqs queue diff --git a/src/handlers/indexers/indexer_types/postgres.rs b/src/handlers/indexers/indexer_types/postgres.rs index be5cfa3..34c2ce9 100644 --- a/src/handlers/indexers/indexer_types/postgres.rs +++ b/src/handlers/indexers/indexer_types/postgres.rs @@ -10,7 +10,10 @@ pub struct PostgresIndexer; impl Indexer for PostgresIndexer { async fn start(&self, indexer: &IndexerModel, attempt: u32) -> Result { let binary_file = format!("{}/{}", get_environment_variable("BINARY_BASE_PATH"), "sink-postgres"); - let postgres_connection_string = get_environment_variable("APIBARA_POSTGRES_CONNECTION_STRING"); + let postgres_connection_string = indexer + .custom_connection_string + .clone() + .unwrap_or_else(|| get_environment_variable("APIBARA_POSTGRES_CONNECTION_STRING")); let table_name = indexer.table_name.as_ref().expect("`table_name` not set for postgres indexer"); let id = self.start_common( binary_file, diff --git a/src/infra/db/schema.rs b/src/infra/db/schema.rs index 1124b29..e537476 100644 --- a/src/infra/db/schema.rs +++ b/src/infra/db/schema.rs @@ -10,6 +10,7 @@ diesel::table! { target_url -> Nullable, table_name -> Nullable, status_server_port -> Nullable, + custom_connection_string -> Nullable, } } diff --git a/src/infra/repositories/indexer_repository.rs b/src/infra/repositories/indexer_repository.rs index ec06aae..924e6a3 100644 --- a/src/infra/repositories/indexer_repository.rs +++ b/src/infra/repositories/indexer_repository.rs @@ -23,6 +23,7 @@ pub struct IndexerDb { pub target_url: Option, pub table_name: Option, pub status_server_port: Option, + pub custom_connection_string: Option, } #[derive(Deserialize)] @@ -39,6 +40,7 @@ pub struct NewIndexerDb { pub target_url: Option, pub table_name: Option, pub status_server_port: Option, + pub custom_connection_string: Option, } #[derive(Deserialize, Insertable)] @@ -220,6 +222,7 @@ impl TryFrom for IndexerModel { process_id: None, table_name: value.table_name, status_server_port: value.status_server_port, + custom_connection_string: value.custom_connection_string, } .try_into()?; Ok(model) @@ -237,6 +240,7 @@ impl TryFrom for IndexerModel { target_url: value.target_url, table_name: value.table_name, status_server_port: value.status_server_port, + custom_connection_string: value.custom_connection_string, }; Ok(model) } @@ -273,6 +277,7 @@ mod tests { target_url: Some(target_url.to_string()), table_name: Some(table_name.into()), status_server_port: Some(1234), + custom_connection_string: None, }; let indexer_model: Result = indexer_db.try_into(); @@ -313,6 +318,7 @@ mod tests { target_url: Some(target_url.to_string()), table_name: Some(table_name.into()), status_server_port: Some(1234), + custom_connection_string: None, }; let indexer_model: Result = indexer_db.try_into(); diff --git a/src/tests/common/mod.rs b/src/tests/common/mod.rs index 4ae8646..473fea3 100644 --- a/src/tests/common/mod.rs +++ b/src/tests/common/mod.rs @@ -30,6 +30,7 @@ impl MockRepository { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }, IndexerModel { id: uuid::Uuid::new_v4(), @@ -39,6 +40,7 @@ impl MockRepository { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }, IndexerModel { id: uuid::Uuid::new_v4(), @@ -48,6 +50,7 @@ impl MockRepository { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }, IndexerModel { id: uuid::Uuid::new_v4(), @@ -57,6 +60,7 @@ impl MockRepository { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }, IndexerModel { id: uuid::Uuid::new_v4(), @@ -66,6 +70,7 @@ impl MockRepository { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }, ], } diff --git a/src/tests/repository.rs b/src/tests/repository.rs index 4238311..b7a2ad0 100644 --- a/src/tests/repository.rs +++ b/src/tests/repository.rs @@ -23,6 +23,7 @@ async fn test_get_indexer() { target_url: Some("https://example.com".to_string()), // TODO: Mock webhook and test its behavior table_name: None, status_server_port: None, + custom_connection_string: None, }) .await .unwrap(); @@ -54,6 +55,7 @@ async fn test_insert_indexer() { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }) .await .unwrap(); @@ -82,6 +84,7 @@ async fn test_update_status() { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }) .await .unwrap(); @@ -109,6 +112,7 @@ async fn test_update_status_and_process_id() { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }) .await .unwrap(); @@ -144,6 +148,7 @@ async fn test_get_all_indexers() { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }) .await .unwrap(); @@ -158,6 +163,7 @@ async fn test_get_all_indexers() { target_url: Some("https://example.com".to_string()), table_name: None, status_server_port: None, + custom_connection_string: None, }) .await .unwrap();