Skip to content

Commit

Permalink
adds fix to manifest issue
Browse files Browse the repository at this point in the history
  • Loading branch information
austbot committed Nov 23, 2022
1 parent 410220a commit edab696
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 36 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions plerkle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle"
description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun."
version = "0.5.3"
version = "0.5.4"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down Expand Up @@ -33,9 +33,9 @@ cadence-macros = "0.29.0"
chrono = "0.4.19"
tracing = "0.1.35"
hex = "0.4.3"
plerkle_messenger = { path = "../plerkle_messenger", version = "0.5.3", features = ["redis"] }
plerkle_messenger = { path = "../plerkle_messenger", version = "0.5.4", features = ["redis"] }
flatbuffers = "2.1.2"
plerkle_serialization = { path = "../plerkle_serialization", version = "0.5.3" }
plerkle_serialization = { path = "../plerkle_serialization", version = "0.5.4" }
tokio = { version = "1.17.0", features = ["full"] }
figment = { version = "0.10.6", features = ["env", "test"] }

Expand Down
2 changes: 1 addition & 1 deletion plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_messenger"
description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns."
version = "0.5.3"
version = "0.5.4"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
60 changes: 32 additions & 28 deletions plerkle_messenger/src/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ pub struct RedisMessengerStream {
const REDIS_CON_STR: &str = "redis_connection_str";

impl RedisMessenger {
async fn xautoclaim(&mut self, stream_key: &'static str) -> Result<StreamRangeReply, MessengerError> {
async fn xautoclaim(
&mut self,
stream_key: &'static str,
) -> Result<StreamRangeReply, MessengerError> {
let mut id = "0-0".to_owned();
// We need to call `XAUTOCLAIM` repeatedly because it will (according to the docs)
// only look at up to 10 * `count` PEL entries each time, and `id` is used to
Expand All @@ -60,7 +63,7 @@ impl RedisMessenger {
.arg(id.as_str())
// For now, we're only looking for one message.
.arg("COUNT")
.arg(self.batch_size);
.arg(self.batch_size / 10);

// Before Redis 7 (we're using 6.2.x presently), `XAUTOCLAIM` returns an array of
// two items: an id to be used for the next call to continue scanning the PEL,
Expand Down Expand Up @@ -145,11 +148,13 @@ impl Messenger for RedisMessenger {
// specify any particular consumer_id.
.unwrap_or(String::from("ingester"));

let retries = config.get("retries")
let retries = config
.get("retries")
.and_then(|r| r.clone().to_u128().map(|n| n as usize))
.unwrap_or(DEFAULT_RETRIES);

let batch_size = config.get("batch_size")
let batch_size = config
.get("batch_size")
.and_then(|r| r.clone().to_u128().map(|n| n as usize))
.unwrap_or(DEFAULT_MSG_BATCH_SIZE);

Expand All @@ -159,7 +164,7 @@ impl Messenger for RedisMessenger {
stream_read_reply: StreamReadReply::default(),
consumer_id,
retries,
batch_size
batch_size,
})
}

Expand Down Expand Up @@ -244,30 +249,29 @@ impl Messenger for RedisMessenger {
ids: xauto_reply.ids,
}],
};
} else {
let opts = StreamReadOptions::default()
// Wait for up to 2 sec for a message. We're no longer blocking indefinitely
// here to avoid situations where we might be blocked on `XREAD` while pending
// messages accumulate that can be claimed.
.block(IDLE_TIMEOUT)
.count(self.batch_size) // Get one item.
.group(GROUP_NAME, self.consumer_id.as_str());

// Read on stream key and save the reply. Log but do not return errors.
self.stream_read_reply = match self
.connection
.as_mut()
.unwrap()
.xread_options(&[stream_key], &[">"], &opts)
.await
{
Ok(reply) => reply,
Err(e) => {
error!("Redis receive error: {e}");
return Err(MessengerError::ReceiveError { msg: e.to_string() });
}
};
}
let opts = StreamReadOptions::default()
// Wait for up to 2 sec for a message. We're no longer blocking indefinitely
// here to avoid situations where we might be blocked on `XREAD` while pending
// messages accumulate that can be claimed.
.block(IDLE_TIMEOUT)
.count(self.batch_size) // Get one item.
.group(GROUP_NAME, self.consumer_id.as_str());

// Read on stream key and save the reply. Log but do not return errors.
self.stream_read_reply = match self
.connection
.as_mut()
.unwrap()
.xread_options(&[stream_key], &[">"], &opts)
.await
{
Ok(reply) => reply,
Err(e) => {
error!("Redis receive error: {e}");
return Err(MessengerError::ReceiveError { msg: e.to_string() });
}
};

// Data vec that will be returned with parsed data from stream read reply. Since
// we're only waiting for up to 2 seconds for `XREAD` to return, we may end up
Expand Down
2 changes: 1 addition & 1 deletion plerkle_serialization/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_serialization"
description = "Metaplex Flatbuffers Plerkle Serialization for Geyser plugin producer/consumer patterns."
version = "0.5.3"
version = "0.5.4"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down

0 comments on commit edab696

Please sign in to comment.