Skip to content

Commit

Permalink
process and receive next batch in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
philsippl committed Aug 18, 2024
1 parent 4338e99 commit 0255b49
Showing 1 changed file with 37 additions and 30 deletions.
67 changes: 37 additions & 30 deletions iris-mpc/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn receive_batch(
client: &Client,
queue_url: &String,
store: &Store,
skip_request_ids: Vec<String>,
skip_request_ids: &[String],
) -> eyre::Result<BatchQuery> {
let mut batch_query = BatchQuery::default();

Expand Down Expand Up @@ -574,36 +574,35 @@ async fn server_main(config: Config) -> eyre::Result<()> {

// Main loop
let res: eyre::Result<()> = async {
loop {
// **Tensor format of queries**
//
// The functions `receive_batch` and `prepare_query_shares` will prepare the
// _query_ variables as `Vec<Vec<u8>>` formatted as follows:
//
// - The inner Vec is a flattening of these dimensions (inner to outer):
// - One u8 limb of one iris bit.
// - One code: 12800 coefficients.
// - One query: all rotated variants of a code.
// - One batch: many queries.
// - The outer Vec is the dimension of the Galois Ring (2):
// - A decomposition of each iris bit into two u8 limbs.
// **Tensor format of queries**
//
// The functions `receive_batch` and `prepare_query_shares` will prepare the
// _query_ variables as `Vec<Vec<u8>>` formatted as follows:
//
// - The inner Vec is a flattening of these dimensions (inner to outer):
// - One u8 limb of one iris bit.
// - One code: 12800 coefficients.
// - One query: all rotated variants of a code.
// - One batch: many queries.
// - The outer Vec is the dimension of the Galois Ring (2):
// - A decomposition of each iris bit into two u8 limbs.

// This batch can consist of N sets of iris_share + mask
// It also includes a vector of request ids, mapping to the sets above
// Skip requests based on the startup sync, only in the first iteration.
let skip_request_ids = mem::take(&mut skip_request_ids);
let mut next_batch = receive_batch(
party_id,
&sqs_client,
&config.requests_queue_url,
&store,
&skip_request_ids,
);

loop {
let now = Instant::now();

// Skip requests based on the startup sync, only in the first iteration.
let skip_request_ids = mem::take(&mut skip_request_ids);

// This batch can consist of N sets of iris_share + mask
// It also includes a vector of request ids, mapping to the sets above
let batch = receive_batch(
party_id,
&sqs_client,
&config.requests_queue_url,
&store,
skip_request_ids,
)
.await
.context("while receiving batches from SQS")?;
let batch = next_batch.await?;

// Iterate over a list of tracing payloads, and create logs with mappings to
// payloads Log at least a "start" event using a log with trace.id and
Expand All @@ -621,10 +620,18 @@ async fn server_main(config: Config) -> eyre::Result<()> {
tracing::info!("Received batch in {:?}", now.elapsed());
background_tasks.check_tasks();

let result_future = handle.submit_batch_query(batch).await;
let result_future = handle.submit_batch_query(batch);

next_batch = receive_batch(
party_id,
&sqs_client,
&config.requests_queue_url,
&store,
&skip_request_ids,
);

// await the result
let result = timeout(processing_timeout, result_future)
let result = timeout(processing_timeout, result_future.await)
.await
.map_err(|e| eyre!("ServerActor processing timeout: {:?}", e))?;

Expand Down

0 comments on commit 0255b49

Please sign in to comment.