Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify gateway HTTP server code using typed parameters #3293

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/subspace-gateway/src/commands/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Gateway http command.
//! This command start an HTTP server to serve object requests.
//! This command starts an HTTP server to serve object requests.

pub(crate) mod server;

Expand All @@ -22,7 +22,7 @@ pub(crate) struct HttpCommandOptions {
http_listen_on: String,
}

/// Runs an HTTP server
/// Runs an HTTP server which fetches DSN objects based on object hashes.
pub async fn run(run_options: HttpCommandOptions) -> anyhow::Result<()> {
let signal = shutdown_signal();

Expand Down
64 changes: 28 additions & 36 deletions crates/subspace-gateway/src/commands/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! HTTP server which fetches objects from the DSN based on a hash, using a mapping indexer service.

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use serde::{Deserialize, Deserializer, Serialize};
use std::default::Default;
Expand All @@ -9,6 +11,7 @@ use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_data_retrieval::piece_getter::PieceGetter;
use tracing::{debug, error, trace};

/// Parameters for the DSN object HTTP server.
pub(crate) struct ServerParameters<PG>
where
PG: PieceGetter + Send + Sync + 'static,
Expand All @@ -18,6 +21,7 @@ where
pub(crate) http_endpoint: String,
}

/// Object mapping format from the indexer service.
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
struct ObjectMapping {
Expand All @@ -28,6 +32,7 @@ struct ObjectMapping {
block_number: BlockNumber,
}

/// Utility function to deserialize a JSON string into a u32.
fn string_to_u32<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: Deserializer<'de>,
Expand All @@ -36,68 +41,52 @@ where
s.parse::<u32>().map_err(serde::de::Error::custom)
}

async fn request_object_mappings(endpoint: String, key: String) -> anyhow::Result<ObjectMapping> {
/// Requests an object mapping with `hash` from the indexer service.
async fn request_object_mapping(endpoint: &str, hash: Blake3Hash) -> anyhow::Result<ObjectMapping> {
let client = reqwest::Client::new();
let object_mappings_url = format!("http://{}/objects/{}", endpoint, key,);
let object_mappings_url = format!("http://{}/objects/{}", endpoint, hex::encode(hash));

debug!(?key, ?object_mappings_url, "Requesting object mapping...");
debug!(?hash, ?object_mappings_url, "Requesting object mapping...");

let response = client
.get(object_mappings_url.clone())
.get(&object_mappings_url)
.send()
.await?
.json::<ObjectMapping>()
.await;
match &response {
Ok(json) => {
trace!(?key, ?json, "Requested object mapping.");
trace!(?hash, ?json, "Received object mapping");
}
Err(err) => {
error!(?key, ?err, ?object_mappings_url, "Request failed");
error!(?hash, ?err, ?object_mappings_url, "Request failed");
}
}

response.map_err(|err| err.into())
}

/// Fetches a DSN object with `hash`, using the mapping indexer service.
async fn serve_object<PG>(
key: web::Path<String>,
hash: web::Path<Blake3Hash>,
additional_data: web::Data<Arc<ServerParameters<PG>>>,
) -> impl Responder
where
PG: PieceGetter + Send + Sync + 'static,
{
let server_params = additional_data.into_inner();
let key = key.into_inner();

// Validate object hash
let decode_result = hex::decode(key.clone());
let object_hash = match decode_result {
Ok(hash) => {
if hash.len() != Blake3Hash::SIZE {
error!(?key, ?hash, "Invalid hash provided.");
return HttpResponse::BadRequest().finish();
}

Blake3Hash::try_from(hash.as_slice()).expect("Hash size was confirmed.")
}
Err(err) => {
error!(?key, ?err, "Invalid hash provided.");
return HttpResponse::BadRequest().finish();
}
};
let hash = hash.into_inner();

let Ok(object_mapping) =
request_object_mappings(server_params.indexer_endpoint.clone(), key.clone()).await
let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await
else {
return HttpResponse::BadRequest().finish();
};

if object_mapping.hash != object_hash {
if object_mapping.hash != hash {
error!(
?key,
object_mapping_hash=?object_mapping.hash,
"Requested hash doesn't match object mapping."
?object_mapping,
?hash,
"Returned object mapping doesn't match requested hash"
);
return HttpResponse::ServiceUnavailable().finish();
}
Expand All @@ -109,22 +98,24 @@ where

let object = match object_fetcher_result {
Ok(object) => {
trace!(?key, size=%object.len(), "Object fetched successfully");
trace!(?hash, size = %object.len(), "Object fetched successfully");

let data_hash = blake3_hash(&object);
if data_hash != object_hash {
if data_hash != hash {
error!(
?data_hash,
?object_hash,
"Retrieved data did not match mapping hash"
data_size = %object.len(),
?hash,
"Retrieved data doesn't match requested mapping hash"
);
trace!(data = %hex::encode(object), "Retrieved data");
return HttpResponse::ServiceUnavailable().finish();
}

object
}
Err(err) => {
error!(?key, ?err, "Failed to fetch object.");
error!(?hash, ?err, "Failed to fetch object");
return HttpResponse::ServiceUnavailable().finish();
}
};
Expand All @@ -134,6 +125,7 @@ where
.body(object)
}

/// Starts the DSN object HTTP server.
pub async fn start_server<PG>(server_params: ServerParameters<PG>) -> std::io::Result<()>
where
PG: PieceGetter + Send + Sync + 'static,
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-gateway/src/commands/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Gateway rpc command.
//! This command start an RPC server to serve object requests.
//! This command starts an RPC server to serve object requests from the DSN.
pub(crate) mod server;

use crate::commands::rpc::server::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT};
Expand All @@ -21,7 +21,7 @@ pub(crate) struct RpcCommandOptions {
rpc_options: RpcOptions<RPC_DEFAULT_PORT>,
}

/// Runs an RPC server
/// Runs an RPC server which fetches DSN objects based on mappings.
pub async fn run(run_options: RpcCommandOptions) -> anyhow::Result<()> {
let signal = shutdown_signal();

Expand Down
Loading