Skip to content

Commit

Permalink
Fetch object batches in RPC and HTTP gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Jan 2, 2025
1 parent 0c0577d commit 6699fa6
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 89 deletions.
9 changes: 2 additions & 7 deletions crates/subspace-gateway-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,8 @@ where
return Err(Error::TooManyMappings { count });
}

let mut objects = Vec::with_capacity(count);
// TODO: fetch concurrently
for mapping in mappings.objects() {
let data = self.object_fetcher.fetch_object(*mapping).await?;

objects.push(data.into());
}
let objects = self.object_fetcher.fetch_objects(mappings).await?;
let objects = objects.into_iter().map(HexData::from).collect();

Ok(objects)
}
Expand Down
78 changes: 50 additions & 28 deletions crates/subspace-gateway/src/commands/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,79 +18,101 @@ where
pub(crate) http_endpoint: String,
}

/// Requests the object mapping with `hash` from the indexer service.
/// Requests the object mappings for `hashes` from the indexer service.
/// Multiple hashes are separated by `+`.
async fn request_object_mapping(
endpoint: &str,
hash: Blake3Hash,
hashes: &Vec<Blake3Hash>,
) -> anyhow::Result<ObjectMappingResponse> {
let client = reqwest::Client::new();
let object_mappings_url = format!("{}/objects/{}", endpoint, hex::encode(hash));
let hash_list = hashes.iter().map(hex::encode).collect::<Vec<_>>();
let object_mappings_url = format!("{}/objects/{}", endpoint, hash_list.join("+"));

debug!(?hash, ?object_mappings_url, "Requesting object mapping...");
debug!(
?hashes,
?object_mappings_url,
"Requesting object mappings..."
);

let response = client.get(&object_mappings_url).send().await?.json().await;

match &response {
Ok(json) => {
trace!(?hash, ?json, "Received object mapping");
trace!(?hashes, ?json, "Received object mappings");
}
Err(err) => {
error!(?hash, ?err, ?object_mappings_url, "Request failed");
error!(?hashes, ?err, ?object_mappings_url, "Request failed");
}
}

response.map_err(|err| err.into())
}

/// Fetches a DSN object with `hash`, using the mapping indexer service.
/// Fetches the DSN objects with `hashes`, using the mapping indexer service.
/// Multiple hashes are separated by `+`.
async fn serve_object<PG>(
hash: web::Path<Blake3Hash>,
hashes: web::Path<String>,
additional_data: web::Data<Arc<ServerParameters<PG>>>,
) -> impl Responder
where
PG: PieceGetter + Send + Sync + 'static,
{
let server_params = additional_data.into_inner();
let hash = hash.into_inner();
let hashes = hashes.into_inner();
let hashes = hashes
.split('+')
.map(|s| {
let mut hash = Blake3Hash::default();
hex::decode_to_slice(s, hash.as_mut()).map(|()| hash)
})
.try_collect::<Vec<_>>();

let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await
else {
let Ok(hashes) = hashes else {
return HttpResponse::BadRequest().finish();
};

// TODO: fetch multiple objects
let Some(&object_mapping) = object_mapping.objects.objects().first() else {
let Ok(object_mappings) =
request_object_mapping(&server_params.indexer_endpoint, &hashes).await
else {
return HttpResponse::BadRequest().finish();
};

if object_mapping.hash != hash {
error!(
?object_mapping,
?hash,
"Returned object mapping doesn't match requested hash"
);
return HttpResponse::ServiceUnavailable().finish();
for object_mapping in object_mappings.objects.objects() {
if !hashes.contains(&object_mapping.hash) {
error!(
?object_mapping,
?hashes,
"Returned object mapping wasn't in requested hashes"
);
return HttpResponse::ServiceUnavailable().finish();
}
}

let object_fetcher_result = server_params
.object_fetcher
.fetch_object(object_mapping)
.fetch_objects(object_mappings.objects)
.await;

let object = match object_fetcher_result {
Ok(object) => {
trace!(?hash, size = %object.len(), "Object fetched successfully");
object
let objects = match object_fetcher_result {
Ok(objects) => {
trace!(
?hashes,
count = %objects.len(),
sizes = ?objects.iter().map(|object| object.len()),
"Objects fetched successfully"
);
objects
}
Err(err) => {
error!(?hash, ?err, "Failed to fetch object");
error!(?hashes, ?err, "Failed to fetch objects");
return HttpResponse::ServiceUnavailable().finish();
}
};

// TODO: return a multi-part response, with one part per object
HttpResponse::Ok()
.content_type("application/octet-stream")
.body(object)
.body(objects.concat())
}

/// Starts the DSN object HTTP server.
Expand All @@ -103,7 +125,7 @@ where
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(server_params.clone()))
.route("/data/{hash}", web::get().to(serve_object::<PG>))
.route("/data/{hashes}", web::get().to(serve_object::<PG>))
})
.bind(http_endpoint)?
.run()
Expand Down
2 changes: 2 additions & 0 deletions crates/subspace-gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Subspace gateway implementation.
#![feature(iterator_try_collect)]

mod commands;
mod node_client;
mod piece_getter;
Expand Down
118 changes: 64 additions & 54 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use std::sync::Arc;
use subspace_archiving::archiver::{Segment, SegmentItem};
use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash};
use subspace_core_primitives::objects::GlobalObject;
use subspace_core_primitives::objects::{GlobalObject, GlobalObjectMapping};
use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord};
use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex};
use subspace_erasure_coding::ErasureCoding;
Expand Down Expand Up @@ -193,74 +193,84 @@ where
}
}

/// Assemble the object in `mapping` by fetching necessary pieces using the piece getter, and
/// putting the object's bytes together.
/// Assemble the objects in `mappings` by fetching necessary pieces using the piece getter, and
/// putting the objects' bytes together.
///
/// Checks the object's hash to make sure the correct bytes are returned.
pub async fn fetch_object(&self, mapping: GlobalObject) -> Result<Vec<u8>, Error> {
let GlobalObject {
hash,
piece_index,
offset,
} = mapping;
/// Checks the objects' hashes to make sure the correct bytes are returned.
pub async fn fetch_objects(
&self,
mappings: GlobalObjectMapping,
) -> Result<Vec<Vec<u8>>, Error> {
let mut objects = Vec::with_capacity(mappings.objects().len());

// TODO: sort mappings in piece index order, and keep pieces until they're no longer needed
for &mapping in mappings.objects() {
let GlobalObject {
hash,
piece_index,
offset,
} = mapping;

// Validate parameters
if !piece_index.is_source() {
debug!(
?mapping,
"Invalid piece index for object: must be a source piece",
);

// Validate parameters
if !piece_index.is_source() {
debug!(
?mapping,
"Invalid piece index for object: must be a source piece",
);
// Parity pieces contain effectively random data, and can't be used to fetch objects
return Err(Error::NotSourcePiece { mapping });
}

// Parity pieces contain effectively random data, and can't be used to fetch objects
return Err(Error::NotSourcePiece { mapping });
}
if offset >= RawRecord::SIZE as u32 {
debug!(
?mapping,
RawRecord_SIZE = RawRecord::SIZE,
"Invalid piece offset for object: must be less than the size of a raw record",
);

if offset >= RawRecord::SIZE as u32 {
debug!(
?mapping,
RawRecord_SIZE = RawRecord::SIZE,
"Invalid piece offset for object: must be less than the size of a raw record",
);
return Err(Error::PieceOffsetTooLarge { mapping });
}

return Err(Error::PieceOffsetTooLarge { mapping });
}
// Try fast object assembling from individual pieces,
// then regular object assembling from segments
let data = match self.fetch_object_fast(mapping).await? {
Some(data) => data,
None => {
let data = self.fetch_object_regular(mapping).await?;

debug!(
?mapping,
len = %data.len(),
"Fetched object using regular object assembling",

// Try fast object assembling from individual pieces,
// then regular object assembling from segments
let data = match self.fetch_object_fast(mapping).await? {
Some(data) => data,
None => {
let data = self.fetch_object_regular(mapping).await?;
);

data
}
};

let data_hash = blake3_hash(&data);
if data_hash != hash {
debug!(
?data_hash,
data_size = %data.len(),
?mapping,
len = %data.len(),
"Fetched object using regular object assembling",

"Retrieved data doesn't match requested mapping hash"
);
trace!(data = %hex::encode(&data), "Retrieved data");

data
return Err(Error::InvalidDataHash {
data_hash,
data_size: data.len(),
mapping,
});
}
};

let data_hash = blake3_hash(&data);
if data_hash != hash {
debug!(
?data_hash,
data_size = %data.len(),
?mapping,
"Retrieved data doesn't match requested mapping hash"
);
trace!(data = %hex::encode(&data), "Retrieved data");

return Err(Error::InvalidDataHash {
data_hash,
data_size: data.len(),
mapping,
});
objects.push(data);
}

Ok(data)
Ok(objects)
}

/// Fast object fetching and assembling where the object doesn't cross piece (super fast) or
Expand Down

0 comments on commit 6699fa6

Please sign in to comment.