Skip to content

Commit

Permalink
Instantiate multiple load balanced senders
Browse files Browse the repository at this point in the history
  • Loading branch information
zbuc committed Nov 10, 2023
1 parent 26b0f50 commit 887e606
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 15 deletions.
11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ penumbra-keys = { path = "../penumbra/crates/core/keys" }
penumbra-custody = { path = "../penumbra/crates/custody" }
penumbra-wallet = { path = "../penumbra/crates/wallet" }
penumbra-view = { path = "../penumbra/crates/view" }
penumbra-transaction = { path = "../penumbra/crates/core/transaction", features = ["download-proving-keys"] }
penumbra-transaction = { path = "../penumbra/crates/core/transaction", features = [
"download-proving-keys",
] }

# External dependencies
tower = "0.4"
tower = { version = "0.4", features = ["balance"] }
tower-service = { version = "0.3.1" }
anyhow = "1"
futures-util = { version = "0.3", default-features = false, features = [
"alloc",
] }
camino = "1"
directories = "4.0.1"
regex = "1"
pin-project-lite = "0.2.7"
serenity = { version = "0.11", default-features = false, features = [
"client",
"cache",
Expand Down
34 changes: 29 additions & 5 deletions src/opt/serve.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::Context;
use clap::Parser;
use directories::ProjectDirs;
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use futures::stream::FuturesUnordered;
use futures_util::{stream::StreamExt, stream::TryStreamExt};
use num_traits::identities::Zero;
use penumbra_asset::Value;
use penumbra_custody::soft_kms::SoftKms;
Expand All @@ -18,13 +19,15 @@ use penumbra_proto::{
use penumbra_view::{ViewClient, ViewService};
use serenity::prelude::GatewayIntents;
use tokio::sync::oneshot;
use tower::limit::concurrency::ConcurrencyLimit;
// use serenity::utils::token;
use std::{env, path::PathBuf, time::Duration};
use tower::{balance as lb, load};
use url::Url;

use crate::{
opt::ChannelIdAndMessageId, responder::RequestQueue, Catchup, Handler, Responder, Sender,
Wallet,
opt::ChannelIdAndMessageId, responder::RequestQueue, sender::SenderSet, Catchup, Handler,
Responder, Sender, Wallet,
};

#[derive(Debug, Clone, Parser)]
Expand All @@ -41,6 +44,9 @@ pub struct Serve {
/// Maximum number of addresses per message to which to dispense tokens.
#[clap(long, default_value = "1")]
max_addresses: usize,
/// Number of accounts to send funds from. Funds will send from account indices [0, n-1].
#[clap(long, default_value = "4")]
account_count: u32,
/// Path to the directory to use to store data [default: platform appdata directory].
#[clap(long, short)]
data_dir: Option<PathBuf>,
Expand Down Expand Up @@ -130,10 +136,28 @@ impl Serve {
// From this point on, the view service is synchronized.
tracing::info!("initial sync complete");

let sender = Sender::new(0, fvk, view, custody);
// Instantiate a sender for each account index.
let mut senders = vec![];
for account_id in 0..self.account_count {
let sender = Sender::new(account_id, fvk.clone(), view.clone(), custody.clone());
senders.push(sender);
}

let d = SenderSet::new(
senders
.into_iter()
.enumerate()
.map(|(k, v)| (k as u32, v))
.collect(),
);
let lb = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
d,
load::CompleteOnResponse::default(),
));
let service = ConcurrencyLimit::new(lb, self.account_count.try_into().unwrap());

// Make a worker to handle the address queue
let (send_requests, responder) = Responder::new(sender, self.max_addresses, self.values);
let (send_requests, responder) = Responder::new(service, self.max_addresses, self.values);

let handler = Handler::new(self.rate_limit, self.reply_limit);

Expand Down
26 changes: 20 additions & 6 deletions src/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use penumbra_view::ViewClient;
use serenity::prelude::TypeMapKey;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tower::balance::p2c::Balance;
use tower::limit::ConcurrencyLimit;
use tower::load::PendingRequestsDiscover;
use tower::Service;
use tower::ServiceExt;
use tracing::Instrument;

use crate::sender::SenderSet;
use crate::Sender;

mod request;
Expand All @@ -33,8 +36,13 @@ where
actions: mpsc::Receiver<Request>,
/// Values to send each time.
values: Vec<Value>,
/// The transaction sender.
sender: ConcurrencyLimit<Sender<V, C>>,
/// The transaction senders.
senders: ConcurrencyLimit<
Balance<
PendingRequestsDiscover<SenderSet<ConcurrencyLimit<Sender<V, C>>>>,
(Address, Vec<Value>),
>,
>,
}

/// `TypeMap` key for the address queue (so that `serenity` worker can send to it).
Expand All @@ -52,15 +60,20 @@ where
{
/// Create a new responder.
pub fn new(
sender: ConcurrencyLimit<Sender<V, C>>,
senders: ConcurrencyLimit<
Balance<
PendingRequestsDiscover<SenderSet<ConcurrencyLimit<Sender<V, C>>>>,
(Address, Vec<Value>),
>,
>,
max_addresses: usize,
values: Vec<Value>,
) -> (mpsc::Sender<Request>, Self) {
let (tx, rx) = mpsc::channel(10);
(
tx,
Responder {
sender,
senders,
max_addresses,
actions: rx,
values,
Expand Down Expand Up @@ -113,9 +126,10 @@ where
tracing::info!("processing send request, waiting for readiness");
});
let rsp = self
.sender
.senders
.ready()
.await?
.await
.map_err(|e| anyhow::anyhow!("{e}"))?
.call((*addr, self.values.clone()))
.instrument(span.clone());
tracing::info!("submitted send request");
Expand Down
41 changes: 39 additions & 2 deletions src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use std::{pin::Pin, task::Poll};
use std::{
pin::Pin,
task::{Context as TaskContext, Poll},
};

use futures::{Future, FutureExt};
use futures_util::Stream;
use penumbra_asset::Value;
use penumbra_custody::{AuthorizeRequest, CustodyClient};
use penumbra_keys::{Address, FullViewingKey};
use penumbra_transaction::memo::MemoPlaintext;
use penumbra_view::ViewClient;
use penumbra_wallet::plan::Planner;
use pin_project_lite::pin_project;
use rand::rngs::OsRng;
use tower::limit::ConcurrencyLimit;
use tower::{discover::Change, limit::ConcurrencyLimit};
use tower_service::Service;

/// The `Sender` maps `(Address, Vec<Value>)` send requests to `[u8; 32]` transaction hashes of sent funds.
#[derive(Clone)]
Expand Down Expand Up @@ -108,3 +114,34 @@ where
Poll::Ready(Ok(()))
}
}

type Key = u32;

pin_project! {
pub struct SenderSet<S> {
services: Vec<(Key, S)>,
}
}

impl<S> SenderSet<S> {
pub fn new(services: Vec<(Key, S)>) -> Self {
Self { services }
}
}

impl<S> Stream for SenderSet<S>
where
S: Service<(Address, Vec<Value>), Response = penumbra_transaction::Id, Error = anyhow::Error>,
{
type Item = Result<Change<Key, S>, anyhow::Error>;

fn poll_next(self: Pin<&mut Self>, _: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
match self.project().services.pop() {
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
None => {
// there may be more later
Poll::Pending
}
}
}
}

0 comments on commit 887e606

Please sign in to comment.