Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(notifications): add load test pseudo-code #4372

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions bats/core/notifications/notifications.bats
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ setup_file() {
create_user 'alice'
}

# Endpoint of the notifications gRPC service under test.
NOTIFICATIONS_GRPC_ENDPOINT="localhost:6685"
# Proto import path and definition file used by grpcurl to encode requests.
IMPORT_PATH="${REPO_ROOT}/core/notifications/proto"
NOTIFICATIONS_PROTO_FILE="${REPO_ROOT}/core/notifications/proto/notifications.proto"

@test "notifications: list stateful transactions" {
btc_wallet_name="alice.btc_wallet_id"
amount="0.01"
Expand Down Expand Up @@ -79,3 +83,41 @@ setup_file() {
acknowledged_at=$(graphql_output '.data.statefulNotificationAcknowledge.notification.acknowledgedAt')
[[ "$acknowledged_at" != "null" ]] || exit 1
}

# Load test (draft / pseudo-code): seeds load on the notifications service by
# updating the locale of 10k users over gRPC. The commented-out section below
# sketches the follow-up step: triggering a marketing notification for all
# seeded users in one HandleNotificationEvent call.
@test "notifications: load test" {
  update_user_locale_method="services.notifications.v1.NotificationsService/UpdateUserLocale"

  for i in $(seq 1 10000); do
    # Build {"userId": "<i>", "locale": "es"} via jq so values are JSON-escaped.
    request_data=$(jq -n --arg userId "$i" --arg locale "es" '{
      "userId": $userId,
      "locale": $locale
    }')
    # Quote all arguments so paths/endpoints never word-split or glob (SC2086).
    grpcurl_request "$IMPORT_PATH" "$NOTIFICATIONS_PROTO_FILE" "$NOTIFICATIONS_GRPC_ENDPOINT" "$update_user_locale_method" "$request_data"
  done

  # localized_content_en='{"title": "Hello", "body": "World"}'
  # localized_content_es='{"title": "Hola", "body": "World"}'
  # user_ids=$(printf '%s\n' "${user_ids[@]}" | jq -R . | jq -s .)
  # request_data=$(jq -n \
  # --argjson user_ids "$user_ids" \
  # --argjson localized_content_en "$localized_content_en" \
  # --argjson localized_content_es "$localized_content_es" \
  # '{
  #   "event": {
  #     "marketingNotificationTriggered": {
  #       "user_ids": $user_ids,
  #       "localized_push_content": {
  #         "en": $localized_content_en,
  #         "es": $localized_content_es
  #       }
  #     }
  #   }
  # }')

  # handle_notification_event_method="services.notifications.v1.NotificationsService/HandleNotificationEvent"
  # grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$handle_notification_event_method" "$request_data"

}
4 changes: 3 additions & 1 deletion core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,12 @@ impl NotificationsApp {
marketing_notification: MarketingNotificationTriggered,
) -> Result<(), ApplicationError> {
let mut tx = self.pool.begin().await?;
let mut user_ids: Vec<GaloyUserId> = user_ids.into_iter().collect();
user_ids.sort();
job::spawn_multi_user_event_dispatch(
&mut tx,
(
user_ids.into_iter().collect(),
user_ids,
NotificationEventPayload::from(marketing_notification),
),
)
Expand Down
56 changes: 8 additions & 48 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod config;
mod multi_user_event_dispatch;
mod send_email_notification;
mod send_push_notification;

Expand All @@ -20,6 +21,7 @@ use crate::{

pub use config::*;
use error::JobError;
use multi_user_event_dispatch::MultiUserEventDispatchData;
use send_email_notification::SendEmailNotificationData;
use send_push_notification::SendPushNotificationData;

Expand Down Expand Up @@ -187,41 +189,13 @@ async fn multi_user_event_dispatch(
) -> Result<(), JobError> {
let pool = current_job.pool().clone();
JobExecutor::builder(&mut current_job)
.initial_retry_delay(std::time::Duration::from_secs(20))
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: MultiUserEventDispatchData =
data.expect("no MultiUserEventDispatchData available");
let batch_limit = 1000;
let (ids, next_user_ids) = data
.user_ids
.split_at(std::cmp::min(data.user_ids.len(), batch_limit));

let mut tx = pool.begin().await?;
if !next_user_ids.is_empty() {
let data = MultiUserEventDispatchData {
user_ids: next_user_ids.to_vec(),
payload: data.payload.clone(),
tracing_data: HashMap::default(),
};
spawn_multi_user_event_dispatch(&mut tx, data).await?;
}

let payload = data.payload.clone();

history.add_events(&mut tx, ids, payload.clone()).await?;

for user_id in ids {
if payload.should_send_email() {
spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone()))
.await?;
}
if payload.should_send_push() {
spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone()))
.await?;
}
}
Ok::<_, JobError>(JobResult::CompleteWithTx(tx))
multi_user_event_dispatch::execute(data, history, pool).await
})
.await?;
Ok(())
Expand Down Expand Up @@ -252,9 +226,12 @@ pub async fn spawn_send_push_notification(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
data: impl Into<SendPushNotificationData>,
) -> Result<(), JobError> {
println!("spawn_send_push_notification");
let data = data.into();
if let Err(e) = send_push_notification
.builder()
.set_retry_backoff(std::time::Duration::from_secs(20))
.set_ordered(true)
.set_json(&data)
.expect("Couldn't set json")
.spawn(&mut **tx)
Expand Down Expand Up @@ -290,6 +267,7 @@ pub async fn spawn_multi_user_event_dispatch(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
data: impl Into<MultiUserEventDispatchData>,
) -> Result<(), JobError> {
println!("spawn_multi_user_event_dispatch");
let data = data.into();
if let Err(e) = multi_user_event_dispatch
.builder()
Expand Down Expand Up @@ -418,21 +396,3 @@ impl From<()> for LinkEmailReminderData {
}
}
}

/// Job payload for dispatching one notification event to many users.
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct MultiUserEventDispatchData {
    // Users still awaiting dispatch; the job processes them in batches.
    user_ids: Vec<GaloyUserId>,
    // The event delivered to every user in `user_ids`.
    payload: NotificationEventPayload,
    // Tracing context carried through the job queue; flattened so its keys
    // serialize alongside the other fields rather than under a sub-object.
    #[serde(flatten)]
    pub(super) tracing_data: HashMap<String, serde_json::Value>,
}

/// Builds dispatch data from a `(user_ids, payload)` pair with empty tracing data.
impl From<(Vec<GaloyUserId>, NotificationEventPayload)> for MultiUserEventDispatchData {
    fn from((user_ids, payload): (Vec<GaloyUserId>, NotificationEventPayload)) -> Self {
        Self {
            user_ids,
            payload,
            // Fresh (empty) tracing context; populated later by the job executor.
            tracing_data: HashMap::default(),
        }
    }
}
87 changes: 87 additions & 0 deletions core/notifications/src/job/multi_user_event_dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use serde::{Deserialize, Serialize};
use tracing::instrument;

use std::collections::HashMap;

use crate::{
history::NotificationHistory, notification_event::NotificationEventPayload,
primitives::GaloyUserId,
};
use job_executor::JobResult;

use super::error::JobError;

/// Job payload for dispatching one notification event to many users in batches.
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct MultiUserEventDispatchData {
    // Users still awaiting dispatch; `execute` drains them batch by batch.
    user_ids: Vec<GaloyUserId>,
    // The event delivered to every user in `user_ids`.
    payload: NotificationEventPayload,
    // Tracing context carried through the job queue; flattened so its keys
    // serialize alongside the other fields rather than under a sub-object.
    #[serde(flatten)]
    pub(super) tracing_data: HashMap<String, serde_json::Value>,
}

impl From<(Vec<GaloyUserId>, NotificationEventPayload)> for MultiUserEventDispatchData {
fn from((user_ids, payload): (Vec<GaloyUserId>, NotificationEventPayload)) -> Self {
Self {
user_ids,
payload,
tracing_data: HashMap::default(),
}
}
}

/// Processes one batch of a multi-user event dispatch job.
///
/// Takes up to `batch_limit` ids from the front of `data.user_ids`, records
/// the event in the notification history and spawns per-user email/push jobs
/// for them. Any remaining ids are re-enqueued as a follow-up dispatch job in
/// the same transaction, so the full id list is processed in bounded batches.
#[instrument(
    name = "job.multi_user_event_dispatch",
    skip(history, pool),
    fields(first_id, last_id, ids_len, next_ids_len),
    err
)]
pub async fn execute(
    data: MultiUserEventDispatchData,
    history: NotificationHistory,
    pool: sqlx::PgPool,
) -> Result<JobResult, JobError> {
    // NOTE(review): batch size of 10 looks like a load-test override — the
    // previous inline implementation in job/mod.rs used 1000; confirm before merge.
    let batch_limit = 10;
    let (ids, next_user_ids) = data
        .user_ids
        .split_at(std::cmp::min(data.user_ids.len(), batch_limit));
    let span = tracing::Span::current();
    // Guarded: `ids` is empty when the job is spawned with no user ids, so all
    // indexing stays inside this branch (the old debug println indexed ids[0]
    // unconditionally and would panic on an empty batch).
    if !ids.is_empty() {
        span.record("first_id", &tracing::field::display(&ids[0]));
        span.record("last_id", &tracing::field::display(&ids[ids.len() - 1]));
        span.record("ids_len", &tracing::field::display(ids.len()));
        span.record(
            "next_ids_len",
            &tracing::field::display(next_user_ids.len()),
        );
    }
    let mut tx = pool.begin().await?;
    // Re-enqueue the tail before processing this batch so the follow-up job is
    // committed atomically with this batch's work.
    if !next_user_ids.is_empty() {
        let data = MultiUserEventDispatchData {
            user_ids: next_user_ids.to_vec(),
            payload: data.payload.clone(),
            tracing_data: HashMap::default(),
        };
        super::spawn_multi_user_event_dispatch(&mut tx, data).await?;
    }

    // `data` is consumed here; no clone needed (the follow-up job above
    // already took its own copy of the payload).
    let payload = data.payload;

    history.add_events(&mut tx, ids, payload.clone()).await?;

    for user_id in ids {
        if payload.should_send_email() {
            super::spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone()))
                .await?;
        }
        if payload.should_send_push() {
            super::spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone()))
                .await?;
        }
    }
    Ok(JobResult::CompleteWithTx(tx))
}
2 changes: 1 addition & 1 deletion core/notifications/src/primitives.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

#[derive(Hash, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
#[derive(Hash, Ord, PartialOrd, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct GaloyUserId(String);
impl GaloyUserId {
pub fn search_begin() -> Self {
Expand Down
8 changes: 8 additions & 0 deletions dev/config/notifications/notifications-alt.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Alternate dev config for running a second notifications instance locally
# (non-default subgraph/grpc ports so it can coexist with the primary one).
app:
  push_executor:
    fcm:
      # Fake credentials for local dev; path is relative to the repo root.
      google_application_credentials_path: "dev/config/notifications/fake_service_account.json"
  subgraph_server:
    port: 1234
  grpc_server:
    port: 2345
2 changes: 1 addition & 1 deletion dev/config/notifications/notifications.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
app:
push_executor:
fcm:
google_application_credentials_path: "./config/notifications/fake_service_account.json"
google_application_credentials_path: "dev/config/notifications/fake_service_account.json"
4 changes: 2 additions & 2 deletions lib/job-executor-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ impl<'a> JobExecutor<'a> {
let max_interval = self.max_retry_delay;
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(interval / 2).await;
interval = max_interval.min(interval * 2);
if let Err(e) = sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)")
.bind(id)
.bind(interval)
Expand All @@ -110,6 +108,8 @@ impl<'a> JobExecutor<'a> {
tracing::error!("Failed to keep job {id} alive: {e}");
break;
}
tokio::time::sleep(interval / 2).await;
interval = max_interval.min(interval * 2);
}
});
KeepAliveHandle::new(handle)
Expand Down
54 changes: 54 additions & 0 deletions load.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/bin/bash
# Ad-hoc load script: triggers one marketing notification event covering 8000
# users against the local notifications gRPC service.

set -xe

source "bats/helpers/_common.bash"
source "bats/helpers/user.bash"
source "bats/helpers/onchain.bash"

# gRPC connection details for the notifications service.
NOTIFICATIONS_GRPC_ENDPOINT="localhost:6685"
IMPORT_PATH="${REPO_ROOT}/core/notifications/proto"
NOTIFICATIONS_PROTO_FILE="${REPO_ROOT}/core/notifications/proto/notifications.proto"

localized_content_en='{"title": "Hello", "body": "World"}'
localized_content_es='{"title": "Hola", "body": "World"}'

# Generate zero-padded user ids "0001" through "8000".
user_ids=($(seq -f "%04g" 1 8000))

# Convert the bash array into a JSON array of strings.
user_ids=$(printf '%s\n' "${user_ids[@]}" | jq -R . | jq -s .)

# Build the marketingNotificationTriggered event payload using jq so all
# values are properly JSON-encoded.
request_data=$(jq -n \
  --argjson user_ids "$user_ids" \
  --argjson localized_content_en "$localized_content_en" \
  --argjson localized_content_es "$localized_content_es" \
  '{
    "event": {
      "marketingNotificationTriggered": {
        "user_ids": $user_ids,
        "localized_push_content": {
          "en": $localized_content_en,
          "es": $localized_content_es
        }
      }
    }
  }')

# Quote the expansion so the JSON reaches jq intact (unquoted it would be
# word-split and glob-expanded — SC2086).
echo "$request_data" | jq

# The gRPC method to call.
handle_notification_event_method="services.notifications.v1.NotificationsService/HandleNotificationEvent"

# Make the gRPC request using grpcurl; quote every argument.
grpcurl_request "$IMPORT_PATH" "$NOTIFICATIONS_PROTO_FILE" "$NOTIFICATIONS_GRPC_ENDPOINT" "$handle_notification_event_method" "$request_data"
Loading