Skip to content

Commit

Permalink
tests: fixing a few things in the sharded nros case
Browse files Browse the repository at this point in the history
Signed-off-by: Reto Achermann <achreto@gmail.com>
  • Loading branch information
achreto committed Nov 1, 2023
1 parent 0015958 commit aec1c7a
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 53 deletions.
86 changes: 47 additions & 39 deletions kernel/tests/s11_rackscale_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ fn rackscale_fxmark_benchmark(transport: RackscaleTransport) {
test.file_name = file_name.clone();
test.arg = Some(config);

fn cmd_fn(num_cores: usize, arg: Option<FxmarkConfig>) -> String {
fn cmd_fn(num_cores: usize, _num_clients: usize, arg: Option<FxmarkConfig>) -> String {
// TODO: add in arg with formatting.
//1XmixX0 is - mix benchmark for 0% writes with 1 open file
let config = arg.expect("Missing fxmark config");
Expand Down Expand Up @@ -296,7 +296,7 @@ fn rackscale_vmops_benchmark(transport: RackscaleTransport, benchtype: VMOpsBenc
test.file_name = file_name.clone();
test.arg = Some(benchtype);

fn cmd_fn(num_cores: usize, _arg: Option<VMOpsBench>) -> String {
fn cmd_fn(num_cores: usize, _num_clients: usize, _arg: Option<VMOpsBench>) -> String {
format!("initargs={}", num_cores)
}
fn baseline_timeout_fn(num_cores: usize) -> u64 {
Expand Down Expand Up @@ -427,7 +427,7 @@ fn s11_rackscale_shmem_leveldb_benchmark() {
test.arg = Some(config);
test.run_dhcpd_for_baseline = true;

fn cmd_fn(num_cores: usize, arg: Option<LevelDBConfig>) -> String {
fn cmd_fn(num_cores: usize, _num_clients: usize, arg: Option<LevelDBConfig>) -> String {
let config = arg.expect("missing leveldb config");
format!(
r#"init=dbbench.bin initargs={} appcmd='--threads={} --benchmarks=fillseq,readrandom --reads={} --num={} --value_size={}'"#,
Expand Down Expand Up @@ -630,7 +630,11 @@ fn rackscale_memcached_benchmark(transport: RackscaleTransport) {
);
}

fn cmd_fn(num_cores: usize, arg: Option<MemcachedInternalConfig>) -> String {
fn cmd_fn(
num_cores: usize,
_num_clients: usize,
arg: Option<MemcachedInternalConfig>,
) -> String {
let config = arg.expect("missing leveldb config");
format!(
r#"init=memcachedbench.bin initargs={} appcmd='--x-benchmark-mem={} --x-benchmark-queries={}'"#,
Expand Down Expand Up @@ -1018,9 +1022,6 @@ fn s11_rackscale_memcached_benchmark_sharded_linux() {

let r = pty.process.kill(SIGKILL);


println!("{:?}", res);

// single node
for protocol in &["tcp", "unix"] {
config.protocol = protocol;
Expand Down Expand Up @@ -1114,6 +1115,8 @@ fn s11_rackscale_memcached_benchmark_sharded_linux() {
#[test]
#[cfg(not(feature = "baremetal"))]
fn s11_rackscale_memcached_benchmark_sharded_nros() {
use rexpect::process::signal::Signal::SIGKILL;

let out_dir_path = PathBuf::from(env!("CARGO_TARGET_TMPDIR")).join("sharded-memcached");
let is_smoke = cfg!(feature = "smoke");

Expand Down Expand Up @@ -1148,7 +1151,7 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
command.args(&["--binary"]);
command.arg(format!("--num-queries={}", config.num_queries).as_str());
command.arg(format!("--num-threads={}", config.num_threads).as_str());
command.arg(format!("--max-memory={}", config.mem_size).as_str());
command.arg(format!("--max-memory={}", config.mem_size / 8).as_str());
let mut servers = String::from("--servers=");
for i in 0..config.num_servers {
if i > 0 {
Expand Down Expand Up @@ -1206,14 +1209,16 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
fn controller_run_fun(
config: Option<&MemcachedShardedConfig>,
num_servers: usize,
num_threads: usize,
timeout_ms: u64,
) -> Result<PtySession> {
// here we should wait
std::thread::sleep(Duration::from_secs(15));
std::thread::sleep(Duration::from_secs(15 + 2 * num_servers as u64));

let mut config = config.unwrap().clone();

config.num_servers = num_servers;
config.num_threads = num_servers * num_threads;
spawn_loadbalancer(&config, timeout_ms)
}

Expand Down Expand Up @@ -1245,7 +1250,7 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
return Err(Error(Timeout(expected, got, timeout), st));
}
Err(err) => {
println!("Failed: {:?}", err);
// println!("Failed: {:?}", err);
return Err(err);
}
};
Expand All @@ -1259,7 +1264,6 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
let r = csv_file.write(out.as_bytes());
assert!(r.is_ok());

println!("{:?}", res);
Ok(())
}

Expand All @@ -1272,22 +1276,6 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
_is_baseline: bool,
_arg: Option<MemcachedShardedConfig>,
) -> Result<()> {
match proc.exp_regex(r#"\[ INFO\]: bootloader/src/kernel.rs"#) {
Ok(_) => (),
Err(rexpect::errors::Error(
rexpect::errors::ErrorKind::EOF(_expected, _s, _),
_state,
)) => {
// for l in s.lines() {
// println!("MEMCACHED-OUTPUT: {}", l);
// }
}
Err(e) => {
println!("{e:?}");
panic!("error")
}
}

match proc.exp_regex(r#"dhcp: vioif0: adding IP address (\d+).(\d+).(\d+).(\d+)/(\d+)"#) {
Ok((_prev, matched)) => {
println!(" > Networking setup succeeded. {matched}");
Expand All @@ -1311,7 +1299,7 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
}

let (prev, matched) = proc.exp_regex(r#"x_benchmark_mem = (\d+) MB"#).unwrap();
println!("C> {}", matched);
println!("> {}", matched);
// let b_mem = matched.replace("x_benchmark_mem = ", "").replace(" MB", "");

*output += prev.as_str();
Expand All @@ -1334,16 +1322,21 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {

if !is_smoke {
test.shmem_size = std::cmp::max(
MEMCACHED_MEM_SIZE_MB * 8,
testutils::helpers::SHMEM_SIZE * 4,
MEMCACHED_MEM_SIZE_MB * 2,
testutils::helpers::SHMEM_SIZE * 2,
);
}

fn cmd_fn(num_cores: usize, arg: Option<MemcachedShardedConfig>) -> String {
fn cmd_fn(num_cores: usize, num_clients: usize, arg: Option<MemcachedShardedConfig>) -> String {
let config = arg.expect("missing configuration");
let num_threads = num_cores / num_clients;

format!(
r#"init=memcachedbench.bin initargs={} appcmd='--x-benchmark-no-run --disable-evictions --conn-limit=1024 --threads={} --x-benchmark-mem={} --memory-limit={}'"#,
num_cores, num_cores, config.mem_size, config.mem_size
num_threads,
num_threads,
config.mem_size,
config.mem_size * 2
)
}

Expand All @@ -1355,22 +1348,34 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
1200_000 + 60_000 * num_cores as u64
}

fn mem_fn(num_cores: usize, _num_clients: usize, is_smoke: bool) -> usize {
fn mem_fn(_num_cores: usize, num_clients: usize, is_smoke: bool) -> usize {
if is_smoke {
8192
} else {
// Memory must also be divisible by number of nodes, which could be 1, 2, 3, or 4
(8192
+ std::cmp::max(
MEMCACHED_MEM_SIZE_MB * 8,
testutils::helpers::SHMEM_SIZE * 4,
// mem = result of this function / num_clients - shmem_size
(8092
+ 2 * std::cmp::max(
MEMCACHED_MEM_SIZE_MB * 2,
testutils::helpers::SHMEM_SIZE * 2,
))
* (((((num_cores + 1) / 2) + 3 - 1) / 3) * 3)
* num_clients
}
}

println!("----------------------------------------------------------");

let machine = Machine::determine();

let mut pings = Vec::new();
for i in 0..machine.max_numa_nodes() {
let mut command = Command::new("ping");
command.arg(&format!("172.31.0.{}", 10 + i + 1));

let proc = spawn_command(command, None).unwrap();
pings.push(proc);
}

// construct bench and run it!
let bench = RackscaleBench {
test,
Expand All @@ -1380,6 +1385,9 @@ fn s11_rackscale_memcached_benchmark_sharded_nros() {
mem_fn,
};
bench.run_bench(false, is_smoke);
for mut ping in pings.into_iter() {
ping.process.kill(SIGKILL);
}
}

#[test]
Expand Down Expand Up @@ -1428,7 +1436,7 @@ fn rackscale_monetdb_benchmark(transport: RackscaleTransport) {
test.arg = None;
test.run_dhcpd_for_baseline = true;

fn cmd_fn(num_cores: usize, _arg: Option<()>) -> String {
fn cmd_fn(num_cores: usize, _num_clients: usize, _arg: Option<()>) -> String {
format!(
r#"init=monetdbd.bin initargs={} appcmd='create dbfarm'"#,
num_cores
Expand Down
36 changes: 22 additions & 14 deletions kernel/testutils/src/rackscale_runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::sync::{mpsc::channel, Arc, Mutex};
use std::thread;
use std::thread::{self, sleep};
use std::time::Duration;

use rexpect::errors::*;
Expand Down Expand Up @@ -47,8 +48,12 @@ type RackscaleMatchFn<T> = fn(
arg: Option<T>,
) -> Result<()>;

type ControllerRunFn<T> =
fn(config: Option<&T>, num_clients: usize, timeout_ms: u64) -> Result<PtySession>;
type ControllerRunFn<T> = fn(
config: Option<&T>,
num_clients: usize,
num_threas: usize,
timeout_ms: u64,
) -> Result<PtySession>;

#[derive(Clone)]
pub struct RackscaleRun<T>
Expand Down Expand Up @@ -457,6 +462,8 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
let (tx_build_timer, _rx_build_timer) = channel();
let tx_build_timer_mut = Arc::new(Mutex::new(tx_build_timer));

let boot_counter = Arc::new(AtomicUsize::new(0));

// Run client in separate thead. Wait a bit to make sure controller started
let mut client_procs = Vec::new();
for i in 0..self.num_clients {
Expand All @@ -467,6 +474,7 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
let client_file_name = self.file_name.clone();
let client_cmd = self.cmd.clone();
let client_placement_cores = placement_cores.clone();
let client_boot_counter = boot_counter.clone();
let state = self.clone();
let client_tx_build_timer = tx_build_timer_mut.clone();
let use_large_pages = self.use_qemu_huge_pages;
Expand Down Expand Up @@ -495,14 +503,8 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
let mut output = String::new();
let qemu_run = || -> Result<WaitStatus> {
let mut p = spawn_nrk(&cmdline_client)?;

// output += p.exp_string("CLIENT READY")?.as_str();
// {
// let tx = client_tx_build_timer
// .lock()
// .expect("Failed to get build timer lock");
// send_signal(&tx);
// }
output += p.exp_string("NRK booting on")?.as_str();
client_boot_counter.fetch_add(1, Ordering::SeqCst);

// User-supplied function to check output
(state.client_match_fn)(
Expand Down Expand Up @@ -552,6 +554,11 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
);
})
.expect("Client thread failed to spawn");

while i == boot_counter.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(500));
}

client_procs.push(client);
}

Expand All @@ -576,6 +583,7 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
let mut p = run_fn(
controller_arg.as_ref(),
state.num_clients,
state.cores_per_client,
state.controller_timeout,
)?;

Expand Down Expand Up @@ -779,8 +787,8 @@ impl<T: Clone + Send + 'static> RackscaleRun<T> {
pub struct RackscaleBench<T: Clone + Send + 'static> {
// Test to run
pub test: RackscaleRun<T>,
// Function to calculate the command. Takes as argument number of application cores
pub cmd_fn: fn(usize, Option<T>) -> String,
// Function to calculate the command. Takes as argument number of application cores and the number of clients
pub cmd_fn: fn(usize, usize, Option<T>) -> String,
// Function to calculate the timeout. Takes as argument number of application cores
pub rackscale_timeout_fn: fn(usize) -> u64,
// Function to calculate the timeout. Takes as argument number of application cores
Expand Down Expand Up @@ -873,7 +881,7 @@ impl<T: Clone + Send + 'static> RackscaleBench<T> {
test_run.num_clients = num_clients;

// Calculate command based on the number of cores
test_run.cmd = (self.cmd_fn)(total_cores, test_run.arg.clone());
test_run.cmd = (self.cmd_fn)(total_cores, num_clients, test_run.arg.clone());

// Caclulate memory for each component
if !is_baseline {
Expand Down

0 comments on commit aec1c7a

Please sign in to comment.