diff --git a/Cargo.lock b/Cargo.lock index b277a02..6a181f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,6 +53,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59d2a3357dde987206219e78ecfbbb6e8dad06cbb65292758d3270e6254f7355" + [[package]] name = "arc-swap" version = "1.6.0" @@ -165,6 +171,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "async-task" version = "4.4.1" @@ -1845,6 +1873,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.57" @@ -1911,6 +1951,7 @@ dependencies = [ "hyper", "mime", "mpart-async", + "prost", "rstest", "rustls 0.20.9", "rustls-native-certs", @@ -1924,6 +1965,7 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-postgres-rustls", + "tonic", "tower", "tower-http", "tracing", @@ -1983,6 +2025,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2421,6 +2472,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "quote" version = "1.0.33" @@ -3097,6 +3171,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -3242,6 +3326,33 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3250,9 +3361,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util 0.7.8", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 5a558cc..1a185dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,8 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] } value-bag = "1.4.1" +tonic = "0.10.2" +prost = "0.12.3" [dev-dependencies] tower = { version = "0.4", features = ["util"] } diff --git a/src/domain/models/indexer.rs b/src/domain/models/indexer.rs index 71a5945..d165c0e 100644 --- a/src/domain/models/indexer.rs +++ b/src/domain/models/indexer.rs @@ -12,6 +12,7 @@ use strum_macros::{Display, EnumString}; use uuid::Uuid; use crate::domain::models::types::AxumErrorResponse; +use crate::grpc::apibara_sink_v1::GetStatusResponse; use crate::infra::errors::InfraError; #[derive(Clone, Default, Debug, PartialEq, EnumString, Serialize, Deserialize, Display, Copy)] @@ -42,6 +43,28 @@ pub struct IndexerModel { pub status_server_port: Option, } +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct IndexerServerStatus { + pub status: i32, + pub starting_block: Option, + pub current_block: Option, + pub head_block: Option, + #[serde(rename = "reason")] + pub reason_: Option, +} + +impl From for IndexerServerStatus { + fn from(value: GetStatusResponse) -> Self { + Self { + status: value.status, + starting_block: value.starting_block, + current_block: value.current_block, + head_block: value.head_block, + reason_: value.reason, + } + } +} + #[derive(Debug, thiserror::Error)] pub enum IndexerError { #[error("internal server error: {0}")] @@ -78,6 +101,12 @@ pub enum IndexerError { InvalidIndexerType(String), #[error("failed to serialize {0}")] FailedToSerialize(String), + #[error("indexer status server port not found")] + IndexerStatusServerPortNotFound, + #[error("failed to connect to gRPC server")] + FailedToConnectGRPC(tonic::transport::Error), + #[error("gRPC request failed")] + GRPCRequestFailed(tonic::Status), } impl From for IndexerError { diff --git a/src/grpc/apibara_sink_v1.rs b/src/grpc/apibara_sink_v1.rs new file mode 100644 index 0000000..f613b51 --- /dev/null +++ b/src/grpc/apibara_sink_v1.rs @@ -0,0 +1,302 @@ +/// Request for the `GetStatus` method. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetStatusRequest {} +/// Response for the `GetStatus` method. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetStatusResponse { + /// The status of the sink. + #[prost(enumeration = "SinkStatus", tag = "1")] + pub status: i32, + /// The starting block. + #[prost(uint64, optional, tag = "2")] + pub starting_block: ::core::option::Option, + /// The current block. + #[prost(uint64, optional, tag = "3")] + pub current_block: ::core::option::Option, + /// The current head of the chain. + #[prost(uint64, optional, tag = "4")] + pub head_block: ::core::option::Option, + /// The reason why the sink is not running. + #[prost(string, optional, tag = "5")] + pub reason: ::core::option::Option<::prost::alloc::string::String>, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum SinkStatus { + Unknown = 0, + /// The sink is running. + Running = 1, + /// The sink has errored. + Errored = 2, +} +impl SinkStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + SinkStatus::Unknown => "SINK_STATUS_UNKNOWN", + SinkStatus::Running => "SINK_STATUS_RUNNING", + SinkStatus::Errored => "SINK_STATUS_ERRORED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SINK_STATUS_UNKNOWN" => Some(Self::Unknown), + "SINK_STATUS_RUNNING" => Some(Self::Running), + "SINK_STATUS_ERRORED" => Some(Self::Errored), + _ => None, + } + } +} +/// Generated client implementations. +pub mod status_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; + use tonic::codegen::*; + #[derive(Debug, Clone)] + pub struct StatusClient { + inner: tonic::client::Grpc, + } + impl StatusClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl StatusClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor(inner: T, interceptor: F) -> StatusClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response<>::ResponseBody>, + >, + >>::Error: Into + Send + Sync, + { + StatusClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// Get Sink status. + pub async fn get_status( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/apibara.sink.v1.Status/GetStatus"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("apibara.sink.v1.Status", "GetStatus")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod status_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with + /// StatusServer. + #[async_trait] + pub trait Status: Send + Sync + 'static { + /// Get Sink status. + async fn get_status( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct StatusServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl StatusServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for StatusServer + where + T: Status, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/apibara.sink.v1.Status/GetStatus" => { + #[allow(non_camel_case_types)] + struct GetStatusSvc(pub Arc); + impl tonic::server::UnaryService for GetStatusSvc { + type Response = super::GetStatusResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).get_status(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetStatusSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), + } + } + } + impl Clone for StatusServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for StatusServer { + const NAME: &'static str = "apibara.sink.v1.Status"; + } +} diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs new file mode 100644 index 0000000..c2219d1 --- /dev/null +++ b/src/grpc/mod.rs @@ -0,0 +1 @@ +pub mod apibara_sink_v1; diff --git a/src/handlers/indexers/create_indexer.rs b/src/handlers/indexers/create_indexer.rs index 9ffbbd3..ab793a5 100644 --- a/src/handlers/indexers/create_indexer.rs +++ b/src/handlers/indexers/create_indexer.rs @@ -49,7 +49,7 @@ impl CreateIndexerRequest { true } - /// Set a random available port for the GRPC status server + /// Set a random available port for the gRPC status server fn set_random_port(&mut self) { // Bind to a random port if let Ok(listener) = TcpListener::bind("localhost:0") { diff --git a/src/handlers/indexers/get_indexer.rs b/src/handlers/indexers/get_indexer.rs index f3734f0..232459f 100644 --- a/src/handlers/indexers/get_indexer.rs +++ b/src/handlers/indexers/get_indexer.rs @@ -2,7 +2,9 @@ use axum::extract::State; use axum::Json; use uuid::Uuid; -use crate::domain::models::indexer::{IndexerError, IndexerModel}; +use crate::domain::models::indexer::{IndexerError, IndexerModel, IndexerServerStatus}; +use crate::grpc::apibara_sink_v1::status_client::StatusClient; +use crate::grpc::apibara_sink_v1::GetStatusRequest; use crate::infra::repositories::indexer_repository::{IndexerRepository, Repository}; use crate::utils::PathExtractor; use crate::AppState; @@ -20,9 +22,24 @@ pub async fn get_indexer( pub async fn get_indexer_status( State(state): State, PathExtractor(id): PathExtractor, -) -> Result, IndexerError> { +) -> Result, IndexerError> { let repository = IndexerRepository::new(&state.pool); let indexer_model = repository.get(id).await.map_err(IndexerError::InfraError)?; - Ok(Json(indexer_model)) + let server_port = indexer_model.status_server_port.ok_or(IndexerError::IndexerStatusServerPortNotFound)?; + + // Create a gRPC client + let endpoint = format!("http://localhost:{}", server_port); + let mut client = StatusClient::connect(endpoint).await.map_err(IndexerError::FailedToConnectGRPC)?; + + // Create a GetStatusRequest + let request = tonic::Request::new(GetStatusRequest {}); + + // Fetch the status + let response = client.get_status(request).await.map_err(IndexerError::GRPCRequestFailed)?; + + // Process the response + let status_response = response.into_inner(); + + Ok(Json(status_response.into())) } diff --git a/src/main.rs b/src/main.rs index 480a3c4..da94b9c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,9 @@ use crate::errors::internal_error; use crate::handlers::indexers::start_indexer::start_all_indexers; use crate::routes::app_router; +/// gRPC clients +mod grpc; + /// Configuration of the service (AWS, DB, etc) mod config; /// Constants used accross the service