Skip to content

Commit

Permalink
1.10.* support (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
austbot authored Sep 9, 2022
1 parent 683a316 commit e6c84c1
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 84 deletions.
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions plerkle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle"
description = "Geyser plugin with dynamic config reloading, message bus agnostic abstractions and a whole lot of fun."
version = "0.0.5"
version = "0.0.7"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down Expand Up @@ -32,9 +32,9 @@ chrono = "0.4.19" # did anyone say leftpad ?
solana-runtime = "1.11.3"
tracing = "0.1.35"
hex = "0.4.3"
plerkle_messenger = { path="../plerkle_messenger", version = "0.0.5", features = ["redis"] }
plerkle_messenger = { path="../plerkle_messenger", version = "0.0.7", features = ["redis"] }
flatbuffers = "2.1.2"
plerkle_serialization = { path="../plerkle_serialization", version = "0.0.5" }
plerkle_serialization = { path="../plerkle_serialization", version = "0.0.7" }
tokio = { version = "1.17.0", features = ["full"] }
figment = { version = "0.10.6", features = ["env", "test"] }

Expand Down
162 changes: 90 additions & 72 deletions plerkle/src/geyser_plugin_nft.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use solana_geyser_plugin_interface::geyser_plugin_interface::{ReplicaAccountInfoV2, ReplicaTransactionInfo, ReplicaTransactionInfoV2};
use {
crate::{
accounts_selector::AccountsSelector,
Expand Down Expand Up @@ -258,42 +259,52 @@ impl GeyserPlugin for Plerkle<'static> {
slot: u64,
is_startup: bool,
) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> {
match account {
ReplicaAccountInfoVersions::V0_0_2(account) => {
// Check if account was selected in config.
if let Some(accounts_selector) = &self.accounts_selector {
if !accounts_selector.is_account_selected(account.pubkey, account.owner) {
return Ok(());
}
} else {
return Ok(());
}

// Get runtime and sender channel.
let runtime = self.get_runtime()?;
let sender = self.get_sender_clone()?;

// Serialize data.
let builder = FlatBufferBuilder::new();
let builder = serialize_account(builder, &account, slot, is_startup);
let owner = bs58::encode(account.owner).into_string();
// Send account info over channel.
runtime.spawn(async move {
let data = SerializedData {
stream: ACCOUNT_STREAM,
builder,
};
let _ = sender.send(data).await;
});
safe_metric(|| {
statsd_count!("account_seen_event", 1, "owner" => &owner);
});
let acct: ReplicaAccountInfoV2;
let account = match account {
ReplicaAccountInfoVersions::V0_0_2(a) => a,
ReplicaAccountInfoVersions::V0_0_1(a) => {
acct = ReplicaAccountInfoV2 {
pubkey: a.pubkey,
lamports: a.lamports,
owner: a.owner,
executable: a.executable,
rent_epoch: a.rent_epoch,
data: a.data,
write_version: a.write_version,
txn_signature: None,
};
&acct
}
_ => {
error!("Old Transaction Replica Object")
};

if let Some(accounts_selector) = &self.accounts_selector {
if !accounts_selector.is_account_selected(account.pubkey, account.owner) {
return Ok(());
}
} else {
return Ok(());
}

// Get runtime and sender channel.
let runtime = self.get_runtime()?;
let sender = self.get_sender_clone()?;

// Serialize data.
let builder = FlatBufferBuilder::new();
let builder = serialize_account(builder, &account, slot, is_startup);
let owner = bs58::encode(account.owner).into_string();
// Send account info over channel.
runtime.spawn(async move {
let data = SerializedData {
stream: ACCOUNT_STREAM,
builder,
};
let _ = sender.send(data).await;
});
safe_metric(|| {
statsd_count!("account_seen_event", 1, "owner" => &owner);
});

Ok(())
}

Expand Down Expand Up @@ -338,50 +349,57 @@ impl GeyserPlugin for Plerkle<'static> {
transaction_info: ReplicaTransactionInfoVersions,
slot: u64,
) -> solana_geyser_plugin_interface::geyser_plugin_interface::Result<()> {
match transaction_info {
ReplicaTransactionInfoVersions::V0_0_2(transaction_info) => {
// Don't log votes or transactions with error status.
if transaction_info.is_vote
|| transaction_info.transaction_status_meta.status.is_err()
{
return Ok(());
}
let rep: ReplicaTransactionInfoV2;
let transaction_info = match transaction_info {
ReplicaTransactionInfoVersions::V0_0_2(ti) => ti,
ReplicaTransactionInfoVersions::V0_0_1(ti) => {
rep = ReplicaTransactionInfoV2 {
signature: ti.signature,
is_vote: ti.is_vote,
transaction: ti.transaction,
transaction_status_meta: ti.transaction_status_meta,
index: 0,
};
&rep
}
};

// Check if transaction was selected in config.
if let Some(transaction_selector) = &self.transaction_selector {
if !transaction_selector.is_transaction_selected(
transaction_info.is_vote,
Box::new(transaction_info.transaction.message().account_keys().iter()),
) {
return Ok(());
}
} else {
return Ok(());
}
// Get runtime and sender channel.
let runtime = self.get_runtime()?;
let sender = self.get_sender_clone()?;
if transaction_info.is_vote
|| transaction_info.transaction_status_meta.status.is_err()
{
return Ok(());
}

// Serialize data.
let builder = FlatBufferBuilder::new();
let builder = serialize_transaction(builder, transaction_info, slot);
let slt_idx = format!("{}-{}", slot, transaction_info.index);
// Send transaction info over channel.
runtime.spawn(async move {
let data = SerializedData {
stream: TRANSACTION_STREAM,
builder,
};
let _ = sender.send(data).await;
});
safe_metric(|| {
statsd_count!("transaction_seen_event", 1, "slot-idx" => &slt_idx);
})
}
_ => {
error!("Old Transaction Replica Object")
// Check if transaction was selected in config.
if let Some(transaction_selector) = &self.transaction_selector {
if !transaction_selector.is_transaction_selected(
transaction_info.is_vote,
Box::new(transaction_info.transaction.message().account_keys().iter()),
) {
return Ok(());
}
} else {
return Ok(());
}
// Get runtime and sender channel.
let runtime = self.get_runtime()?;
let sender = self.get_sender_clone()?;

// Serialize data.
let builder = FlatBufferBuilder::new();
let builder = serialize_transaction(builder, transaction_info, slot);
let slt_idx = format!("{}-{}", slot, transaction_info.index);
// Send transaction info over channel.
runtime.spawn(async move {
let data = SerializedData {
stream: TRANSACTION_STREAM,
builder,
};
let _ = sender.send(data).await;
});
safe_metric(|| {
statsd_count!("transaction_seen_event", 1, "slot-idx" => &slt_idx);
});

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
[package]
name = "plerkle_messenger"
description = "Metaplex Messenger trait for Geyser plugin producer/consumer patterns."
version = "0.0.5"
version = "0.0.7"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
edition = "2021"
readme = "Readme.md"

[dependencies]
plerkle_serialization = { path = "../plerkle_serialization", version = "0.0.5" }
plerkle_serialization = { path = "../plerkle_serialization", version = "0.0.7" }
redis = { version = "0.21.5", features = ["aio", "tokio-comp", "streams"], optional = true }
metaplex-pulsar = { version = "4.1.1", optional = true }
log = "0.4.11"
Expand Down
1 change: 0 additions & 1 deletion plerkle_messenger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
extern crate core;
mod redis_messenger;
mod pulsar_messenger;

Expand Down
17 changes: 16 additions & 1 deletion plerkle_messenger/src/plerkle_messenger.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::BTreeMap;
use crate::error::MessengerError;
use async_trait::async_trait;
use figment::value::{Dict, Value};
use serde::Deserialize;

#[cfg(feature = "redis")]
use crate::RedisMessenger;
#[cfg(feature = "pulsar")]
Expand Down Expand Up @@ -42,7 +44,7 @@ pub async fn select_messenger(config: MessengerConfig) -> Result<Box<dyn Messeng
}
}

#[derive(Deserialize, Debug, PartialEq)]
#[derive(Deserialize, Debug, PartialEq, Clone)]
pub enum MessengerType {
Redis,
Pulsar,
Expand All @@ -61,6 +63,19 @@ pub struct MessengerConfig {
pub connection_config: Dict,
}

impl Clone for MessengerConfig {
fn clone(&self) -> Self {
let mut d: BTreeMap<String, Value> = BTreeMap::new();
for (k,i) in self.connection_config.iter() {
d.insert(k.clone(),i.clone());
}
MessengerConfig {
messenger_type: self.messenger_type.clone(),
connection_config: d
}
}
}

impl MessengerConfig {
pub fn get(&self, key: &str) -> Option<&Value> {
self.connection_config.get(key)
Expand Down
2 changes: 1 addition & 1 deletion plerkle_serialization/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "plerkle_serialization"
description = "Metaplex Flatbuffers Plerkle Serialization for Geyser plugin producer/consumer patterns."
version = "0.0.5"
version = "0.0.7"
authors = ["Metaplex Developers <dev@metaplex.com>"]
repository = "https://github.com/metaplex-foundation/digital-asset-validator-plugin"
license = "AGPL-3.0"
Expand Down
1 change: 1 addition & 0 deletions plerkle_serialization/src/transaction_info_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod transaction_info {

use core::mem;
use core::cmp::Ordering;
use flatbuffers::VerifierOptions;

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
Expand Down

0 comments on commit e6c84c1

Please sign in to comment.