From fb4e1e98818ab4b5d2c4a23dcfc1f7ac90d1d37a Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Sat, 16 Mar 2024 14:00:22 +0100 Subject: [PATCH 1/3] Generalize metric assertions --- crates/orchestrator/src/network/node.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/network/node.rs b/crates/orchestrator/src/network/node.rs index 5c4a4cd1c..ca8418644 100644 --- a/crates/orchestrator/src/network/node.rs +++ b/crates/orchestrator/src/network/node.rs @@ -148,17 +148,27 @@ impl NetworkNode { &self, metric_name: impl Into, value: impl Into, + ) -> Result { + let value: f64 = value.into(); + self.assert_with(metric_name, |v| v == value).await + } + + /// Assert on a metric value using a given predicate. + /// See [`assert`] description for details. + pub async fn assert_with( + &self, + metric_name: impl Into, + predicate: impl Fn(f64) -> bool, ) -> Result { let metric_name = metric_name.into(); - let value = value.into(); let val = self.metric(&metric_name).await?; - if val == value { + if predicate(val) { Ok(true) } else { // reload metrics self.fetch_metrics().await?; let val = self.metric(&metric_name).await?; - Ok(val == value) + Ok(predicate(val)) } } From d76d649e4978694f19369e4d27bdd8f110bd69bd Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Sun, 17 Mar 2024 14:26:26 +0100 Subject: [PATCH 2/3] Initial implementation --- crates/orchestrator/src/lib.rs | 4 +- crates/orchestrator/src/network.rs | 91 +++++++++++--- crates/orchestrator/src/network/node.rs | 5 + crates/orchestrator/src/network_spec/node.rs | 4 +- crates/orchestrator/src/spawner.rs | 122 +++++++++++++++++++ crates/provider/src/kubernetes/namespace.rs | 8 ++ crates/provider/src/kubernetes/node.rs | 12 +- crates/provider/src/lib.rs | 8 +- crates/provider/src/native/namespace.rs | 13 ++ crates/provider/src/native/node.rs | 27 +++- 10 files changed, 264 insertions(+), 30 deletions(-) diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 633257deb..729468cd7 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -2,7 +2,7 @@ #![allow(dead_code, clippy::expect_fun_call)] pub mod errors; -mod generators; +pub mod generators; pub mod network; mod network_helper; mod network_spec; @@ -252,7 +252,7 @@ where } else { node.spec.p2p_port.0 }, - node.inner.args().as_ref(), + node.inner.args().await.as_ref(), &node.spec.p2p_cert_hash, )?, ); diff --git a/crates/orchestrator/src/network.rs b/crates/orchestrator/src/network.rs index 9e3a7d4dd..c5ad059ce 100644 --- a/crates/orchestrator/src/network.rs +++ b/crates/orchestrator/src/network.rs @@ -16,7 +16,7 @@ use support::fs::FileSystem; use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain}; use crate::{ generators::chain_spec::ChainSpec, - network_spec::{self, NetworkSpec}, + network_spec::{self, node::NodeSpec, NetworkSpec}, shared::{ macros, types::{ChainDefaultContext, RegisterParachainOptions}, @@ -31,7 +31,6 @@ pub struct Network { relay: Relaychain, initial_spec: NetworkSpec, parachains: HashMap, - nodes_by_name: HashMap, } impl std::fmt::Debug for Network { @@ -41,7 +40,6 @@ impl std::fmt::Debug for Network { .field("relay", &self.relay) .field("initial_spec", &self.initial_spec) .field("parachains", &self.parachains) - .field("nodes_by_name", &self.nodes_by_name) .finish() } } @@ -68,7 +66,6 @@ impl Network { relay, initial_spec, parachains: Default::default(), - nodes_by_name: Default::default(), } } @@ -122,7 +119,7 @@ impl Network { let name = name.into(); let relaychain = self.relaychain(); - if self.nodes_by_name.contains_key(&name) { + if self.nodes_iter().any(|n| n.name == name) { return Err(anyhow::anyhow!("Name: {} is already used.", name)); } @@ -178,10 +175,42 @@ impl Network { // // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?; // } - // Add node to the global hash + // Add node to relay self.add_running_node(node.clone(), None); - // add node to relay - self.relay.nodes.push(node); + + Ok(()) + } + + /// Replace an already existing but now dead node spawning a new node with a given spec + /// (which can be obtained from the old node and modified if needed). + /// The spec should contain a node name already existing in the network, otherwise the call + /// fails. + /// + /// TODO: It doesn't currently check if the node in question is actually dead. Trying to + /// replace a running node is a possible UD. + pub async fn replace_node(&mut self, spec: NodeSpec) -> Result<(), anyhow::Error> { + if self.nodes_iter().all(|n| n.name != spec.name) { + return Err(anyhow::anyhow!("Name: {} is not found.", spec.name)); + } + + let relaychain = self.relaychain(); + let base_dir = self.ns.base_dir().to_string_lossy(); + let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir); + + let ctx = SpawnNodeCtx { + chain_id: &relaychain.chain_id, + parachain_id: None, + chain: &relaychain.chain, + role: ZombieRole::Node, + ns: &self.ns, + scoped_fs: &scoped_fs, + parachain: None, + bootnodes_addr: &vec![], + wait_ready: true, + }; + + let node = spawner::respawn_node(&spec, &ctx).await?; + self.replace_running_node(node)?; Ok(()) } @@ -294,8 +323,6 @@ impl Network { network_spec::node::NodeSpec::from_ad_hoc(name.into(), options.into(), &chain_context)?; let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?; - let para = self.parachains.get_mut(¶_id).unwrap(); - para.collators.push(node.clone()); self.add_running_node(node, None); Ok(()) @@ -487,14 +514,13 @@ impl Network { // remove_parachain() pub fn get_node(&self, name: impl Into) -> Result<&NetworkNode, anyhow::Error> { - let name = &name.into(); - if let Some(node) = self.nodes_by_name.get(name) { + let name = name.into(); + if let Some(node) = self.nodes_iter().find(|&n| n.name == name) { return Ok(node); } let list = self - .nodes_by_name - .keys() + .nodes_iter().map(|n| &n.name) .cloned() .collect::>() .join(", "); @@ -504,8 +530,18 @@ impl Network { )) } + pub fn get_node_mut( + &mut self, + name: impl Into, + ) -> Result<&mut NetworkNode, anyhow::Error> { + let name = name.into(); + self.nodes_iter_mut() + .find(|n| n.name == name) + .ok_or(anyhow::anyhow!("can't find node with name: {name:?}")) + } + pub fn nodes(&self) -> Vec<&NetworkNode> { - self.nodes_by_name.values().collect::>() + self.nodes_iter().collect() } pub async fn detach(&self) { @@ -524,9 +560,12 @@ impl Network { } else { self.relay.nodes.push(node.clone()); } - // TODO: we should hold a ref to the node in the vec in the future. - let node_name = node.name.clone(); - self.nodes_by_name.insert(node_name, node); + } + + pub(crate) fn replace_running_node(&mut self, node: NetworkNode) -> Result<(), anyhow::Error> { + let old_node = self.get_node_mut(&node.name)?; + *old_node = node; + Ok(()) } pub(crate) fn add_para(&mut self, para: Parachain) { @@ -544,4 +583,20 @@ impl Network { pub(crate) fn parachains(&self) -> Vec<&Parachain> { self.parachains.values().collect() } + + pub(crate) fn nodes_iter(&self) -> impl Iterator { + self.relay + .nodes + .iter() + .chain(self.parachains.iter().map(|(_, p)| &p.collators).flatten()) + } + + pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator { + self.relay.nodes.iter_mut().chain( + self.parachains + .iter_mut() + .map(|(_, p)| &mut p.collators) + .flatten(), + ) + } } diff --git a/crates/orchestrator/src/network/node.rs b/crates/orchestrator/src/network/node.rs index ca8418644..a46973c55 100644 --- a/crates/orchestrator/src/network/node.rs +++ b/crates/orchestrator/src/network/node.rs @@ -118,6 +118,11 @@ impl NetworkNode { Ok(()) } + pub async fn kill(&self) -> Result<(), anyhow::Error> { + self.inner.kill().await?; + Ok(()) + } + /// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir) pub async fn restart(&self, after: Option) -> Result<(), anyhow::Error> { self.inner.restart(after).await?; diff --git a/crates/orchestrator/src/network_spec/node.rs b/crates/orchestrator/src/network_spec/node.rs index f44c531d5..74ea8dac6 100644 --- a/crates/orchestrator/src/network_spec/node.rs +++ b/crates/orchestrator/src/network_spec/node.rs @@ -50,10 +50,10 @@ pub struct NodeSpec { pub(crate) name: String, /// Node key, used for compute the p2p identity. - pub(crate) key: String, + pub key: String, // libp2p local identity - pub(crate) peer_id: String, + pub peer_id: String, /// Accounts to be injected in the keystore. pub(crate) accounts: NodeAccounts, diff --git a/crates/orchestrator/src/spawner.rs b/crates/orchestrator/src/spawner.rs index c5e9dd782..b33dd4eaf 100644 --- a/crates/orchestrator/src/spawner.rs +++ b/crates/orchestrator/src/spawner.rs @@ -227,3 +227,125 @@ where running_node, )) } + +// TODO: Effectively it's a cut down copy-pasted version of `spawn_node`. Code duplication +// should be addressed. +pub async fn respawn_node<'a, T>( + node: &NodeSpec, + ctx: &SpawnNodeCtx<'a, T>, +) -> Result +where + T: FileSystem, +{ + let base_dir = format!("{}/{}", ctx.ns.base_dir().to_string_lossy(), &node.name); + + let (cfg_path, data_path, relay_data_path) = if !ctx.ns.capabilities().prefix_with_full_path { + ( + NODE_CONFIG_DIR.into(), + NODE_DATA_DIR.into(), + NODE_RELAY_DATA_DIR.into(), + ) + } else { + let cfg_path = format!("{}{NODE_CONFIG_DIR}", &base_dir); + let data_path = format!("{}{NODE_DATA_DIR}", &base_dir); + let relay_data_path = format!("{}{NODE_RELAY_DATA_DIR}", &base_dir); + (cfg_path, data_path, relay_data_path) + }; + + let gen_opts = generators::GenCmdOptions { + relay_chain_name: ctx.chain, + cfg_path: &cfg_path, // TODO: get from provider/ns + data_path: &data_path, // TODO: get from provider + relay_data_path: &relay_data_path, // TODO: get from provider + use_wrapper: false, // TODO: get from provider + bootnode_addr: ctx.bootnodes_addr.clone(), + // IFF the provider require an image (e.g k8s) we should use the default ports in the cmd. + use_default_ports_in_cmd: ctx.ns.capabilities().use_default_ports_in_cmd, + }; + + let (program, args) = match ctx.role { + // Collator should be `non-cumulus` one (e.g adder/undying) + ZombieRole::Node | ZombieRole::Collator => { + let maybe_para_id = ctx.parachain.map(|para| para.id); + + generators::generate_node_command(node, gen_opts, maybe_para_id) + }, + ZombieRole::CumulusCollator => { + let para = ctx.parachain.expect(&format!( + "parachain must be part of the context {THIS_IS_A_BUG}" + )); + let full_p2p = generators::generate_node_port(None)?; + generators::generate_node_command_cumulus(node, gen_opts, para.id, full_p2p.0) + }, + _ => unreachable!(), /* TODO: do we need those? + * ZombieRole::Bootnode => todo!(), + * ZombieRole::Companion => todo!(), */ + }; + + info!( + "🚀 {}, respawning.... with command: {} {}", + node.name, + program, + args.join(" ") + ); + + // Drops the port parking listeners before spawn + node.ws_port.drop_listener(); + node.p2p_port.drop_listener(); + node.rpc_port.drop_listener(); + node.prometheus_port.drop_listener(); + + let running_node = ctx + .ns + .respawn_node(&node.name, args) + .await + .with_context(|| format!("Failed to respawn node: {}", node.name))?; + + let mut ip_to_use = LOCALHOST; + + let (rpc_port_external, prometheus_port_external); + + // Create port-forward iff we are not in CI + if !running_in_ci() { + let ports = futures::future::try_join_all(vec![ + running_node.create_port_forward(node.rpc_port.0, RPC_PORT), + running_node.create_port_forward(node.prometheus_port.0, PROMETHEUS_PORT), + ]) + .await?; + + (rpc_port_external, prometheus_port_external) = ( + ports[0].unwrap_or(node.rpc_port.0), + ports[1].unwrap_or(node.prometheus_port.0), + ); + } else { + // running in ci requrire to use ip and default port + (rpc_port_external, prometheus_port_external) = (RPC_PORT, PROMETHEUS_PORT); + ip_to_use = running_node.ip().await?; + } + + let ws_uri = format!("ws://{}:{}", ip_to_use, rpc_port_external); + let prometheus_uri = format!("http://{}:{}/metrics", ip_to_use, prometheus_port_external); + info!("🚀 {}, should be running now", node.name); + info!( + "🚀 {}: direct link https://polkadot.js.org/apps/?rpc={ws_uri}#/explorer", + node.name + ); + info!("🚀 {}: metrics link {prometheus_uri}", node.name); + // TODO: the cmd for the logs should live on the node or ns. + if ctx.ns.capabilities().requires_image { + info!( + "📓 logs cmd: kubectl -n {} logs {}", + ctx.ns.name(), + node.name + ); + } else { + info!("📓 logs cmd: tail -f {}/{}.log", base_dir, node.name); + } + Ok(NetworkNode::new( + node.name.clone(), + ws_uri, + prometheus_uri, + node.clone(), + running_node, + )) +} diff --git a/crates/provider/src/kubernetes/namespace.rs b/crates/provider/src/kubernetes/namespace.rs index 0a5888f5a..063fea044 100644 --- a/crates/provider/src/kubernetes/namespace.rs +++ b/crates/provider/src/kubernetes/namespace.rs @@ -418,6 +418,14 @@ where Ok(node) } + async fn respawn_node( + &self, + _name: &str, + _args: Vec, + ) -> Result { + todo!() + } + async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> { debug!("options {:#?}", options); diff --git a/crates/provider/src/kubernetes/node.rs b/crates/provider/src/kubernetes/node.rs index 8a158d83e..f7feaaeaf 100644 --- a/crates/provider/src/kubernetes/node.rs +++ b/crates/provider/src/kubernetes/node.rs @@ -401,8 +401,8 @@ where &self.name } - fn args(&self) -> Vec<&str> { - self.args.iter().map(|arg| arg.as_str()).collect() + async fn args(&self) -> Vec { + self.args.clone() } fn base_dir(&self) -> &PathBuf { @@ -656,6 +656,14 @@ where Ok(()) } + async fn kill(&self) -> Result<(), ProviderError> { + todo!() + } + + async fn respawn(&self) -> Result<(), ProviderError> { + todo!() + } + async fn restart(&self, after: Option) -> Result<(), ProviderError> { if let Some(duration) = after { sleep(duration).await; diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 0e72d2ff6..439da5430 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -147,6 +147,8 @@ pub trait ProviderNamespace { async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result; + async fn respawn_node(&self, name: &str, args: Vec) -> Result; + async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError>; async fn destroy(&self) -> Result<(), ProviderError>; @@ -160,7 +162,7 @@ pub type DynNamespace = Arc; pub trait ProviderNode { fn name(&self) -> &str; - fn args(&self) -> Vec<&str>; + async fn args(&self) -> Vec; fn base_dir(&self) -> &PathBuf; @@ -221,6 +223,10 @@ pub trait ProviderNode { async fn resume(&self) -> Result<(), ProviderError>; + async fn kill(&self) -> Result<(), ProviderError>; + + async fn respawn(&self) -> Result<(), ProviderError>; + async fn restart(&self, after: Option) -> Result<(), ProviderError>; async fn destroy(&self) -> Result<(), ProviderError>; diff --git a/crates/provider/src/native/namespace.rs b/crates/provider/src/native/namespace.rs index 33886cbc6..d0c70c9dd 100644 --- a/crates/provider/src/native/namespace.rs +++ b/crates/provider/src/native/namespace.rs @@ -134,6 +134,19 @@ where Ok(node) } + async fn respawn_node(&self, name: &str, args: Vec) -> Result { + let nodes = self.nodes.read().await; + + let node = nodes + .get(&name.to_owned()) + .ok_or(ProviderError::MissingNode(name.to_owned()))? + .clone(); + node.set_args(&args).await; + node.respawn().await?; + + Ok(node) + } + async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> { let node_name = if let Some(name) = options.temp_name { name diff --git a/crates/provider/src/native/node.rs b/crates/provider/src/native/node.rs index 37e73438f..695a555e3 100644 --- a/crates/provider/src/native/node.rs +++ b/crates/provider/src/native/node.rs @@ -44,7 +44,7 @@ where namespace: Weak>, name: String, program: String, - args: Vec, + args: RwLock>, env: Vec<(String, String)>, base_dir: PathBuf, config_dir: PathBuf, @@ -101,7 +101,7 @@ where namespace: namespace.clone(), name: name.to_string(), program: program.to_string(), - args: args.to_vec(), + args: RwLock::new(args.to_vec()), env: env.to_vec(), base_dir, config_dir, @@ -130,6 +130,10 @@ where Ok(node) } + pub(super) async fn set_args(&self, args: &[String]) { + *(self.args.write().await) = args.to_vec().into(); + } + async fn initialize_startup_paths(&self, paths: &[PathBuf]) -> Result<(), ProviderError> { trace!("creating paths {:?}", paths); let base_dir_raw = self.base_dir.to_string_lossy(); @@ -217,7 +221,7 @@ where async fn initialize_process(&self) -> Result<(ChildStdout, ChildStderr), ProviderError> { let mut process = Command::new(&self.program) - .args(&self.args) + .args(self.args.read().await.iter()) .envs(self.env.to_vec()) .stdin(Stdio::null()) .stdout(Stdio::piped()) @@ -359,8 +363,8 @@ where &self.name } - fn args(&self) -> Vec<&str> { - self.args.iter().map(|arg| arg.as_str()).collect() + async fn args(&self) -> Vec { + self.args.read().await.clone() } fn base_dir(&self) -> &PathBuf { @@ -533,6 +537,19 @@ where Ok(()) } + async fn kill(&self) -> Result<(), ProviderError> { + self.abort() + .await + .map_err(|err| ProviderError::KillNodeFailed(self.name.clone(), err)) + } + + async fn respawn(&self) -> Result<(), ProviderError> { + let (stdout, stderr) = self.initialize_process().await?; + self.initialize_log_writing(stdout, stderr).await; + + Ok(()) + } + async fn restart(&self, after: Option) -> Result<(), ProviderError> { if let Some(duration) = after { sleep(duration).await; From 43994db54eae384f821ff5680d7e05977c81957b Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Sun, 17 Mar 2024 15:30:37 +0100 Subject: [PATCH 3/3] `cargo fmt` --- crates/orchestrator/src/network.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/orchestrator/src/network.rs b/crates/orchestrator/src/network.rs index c5ad059ce..2001eaaf1 100644 --- a/crates/orchestrator/src/network.rs +++ b/crates/orchestrator/src/network.rs @@ -520,7 +520,8 @@ impl Network { } let list = self - .nodes_iter().map(|n| &n.name) + .nodes_iter() + .map(|n| &n.name) .cloned() .collect::>() .join(", ");