Skip to content

Commit

Permalink
feat: obtain node rpc addresses from the node manager inventory
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Jan 19, 2024
1 parent cbcf638 commit d666210
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 13 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tar = "0.4"
tempfile = "3.8.0"
tokio = { version = "1.26", features = ["full"] }
tokio-stream = "0.1.14"
walkdir = "2.4.0"

[dev-dependencies]
httpmock = "0.6"
11 changes: 11 additions & 0 deletions resources/ansible/node_manager_inventory.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
- name: fetch node manager inventory from remote machines
hosts: all
ignore_unreachable: yes
max_fail_percentage: 10
tasks:
- name: fetch inventory file
fetch:
src: "/var/safenode-manager/node_registry.json"
dest: "{{dest}}"
flat: no
104 changes: 91 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ use crate::{
};
use flate2::read::GzDecoder;
use indicatif::{ProgressBar, ProgressStyle};
use log::debug;
use log::{debug, error, trace};
use rand::Rng;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rpc_client::parse_output;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader, BufWriter, Write},
net::{IpAddr, SocketAddr},
Expand All @@ -44,6 +45,7 @@ use std::{
time::Duration,
};
use tar::Archive;
use walkdir::WalkDir;

/// How or where to build the binaries from.
#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -493,15 +495,15 @@ impl TestnetDeploy {

let genesis_inventory = self
.ansible_runner
.inventory_list(genesis_inventory_path, false)
.inventory_list(genesis_inventory_path.clone(), false)
.await?;
let build_inventory = self
.ansible_runner
.inventory_list(build_inventory_path, false)
.await?;
let remaining_nodes_inventory = self
.ansible_runner
.inventory_list(remaining_nodes_inventory_path, false)
.inventory_list(remaining_nodes_inventory_path.clone(), false)
.await?;

// It also seems to be possible for a workspace and inventory files to still exist, but
Expand All @@ -521,16 +523,73 @@ impl TestnetDeploy {
}
let (genesis_multiaddr, genesis_ip) = self.get_genesis_multiaddr(name).await?;

// To generate the rpc end points of each of the node.
// Genesis contains one node, and the rpc port is 12001
// For the other vms, the rpc ports starts from 12001 to (12000 + node_instance_count)
let mut rpc_endpoints = Vec::new();
rpc_endpoints.push(SocketAddr::new(genesis_inventory[0].1, 12001));
for (_, entry_ip) in remaining_nodes_inventory.iter() {
(1..node_instance_count.unwrap_or(0) + 1).for_each(|port_suffix| {
rpc_endpoints.push(SocketAddr::new(*entry_ip, 12000 + port_suffix));
})
}
// Get the rpc end points from the node-manager inventory file
let rpc_endpoints = {
debug!("Fetching node manager inventory");
let temp_dir_path = tempfile::tempdir()?.into_path();
let temp_dir_json = serde_json::to_string(&temp_dir_path)?;

self.ansible_runner.run_playbook(
PathBuf::from("node_manager_inventory.yml"),
remaining_nodes_inventory_path,
self.cloud_provider.get_ssh_user(),
Some(format!("{{ \"dest\": {temp_dir_json} }}")),
)?;
self.ansible_runner.run_playbook(
PathBuf::from("node_manager_inventory.yml"),
genesis_inventory_path,
self.cloud_provider.get_ssh_user(),
Some(format!("{{ \"dest\": {temp_dir_json} }}")),
)?;

let all_node_inventory = genesis_inventory
.iter()
.chain(remaining_nodes_inventory.iter())
.cloned()
.collect::<HashMap<_, _>>();

// collect the manager inventory file paths along with their respective ip addr
let manager_inventory_files = WalkDir::new(temp_dir_path)
.into_iter()
.flatten()
.filter_map(|entry| {
if entry.file_type().is_file()
&& entry.path().extension().is_some_and(|ext| ext == "json")
{
// tempdir/<testnet_name>-node/var/safenode-manager/node_registry.json
let mut vm_name = entry.path().to_path_buf();
trace!("Found file with json extension: {vm_name:?}");
vm_name.pop();
vm_name.pop();
vm_name.pop();
// Extract the <testnet_name>-node string
trace!("Extracting the vm name from the path");
let vm_name = vm_name.file_name()?.to_str()?;
trace!("Extracted vm name from path: {vm_name}");
if let Some(ip_addr) = all_node_inventory.get(vm_name) {
Some((entry.path().to_path_buf(), *ip_addr))
} else {
error!("Could not obtain ip addr for the provided vm name {vm_name:?}");
None
}
} else {
None
}
})
.collect::<Vec<(PathBuf, IpAddr)>>();

manager_inventory_files
.par_iter()
.flat_map(|(file_path, ip_addr)| {
match get_rpc_ports_from_manager_inventory(file_path) {
Ok(rpc_ports) => rpc_ports
.map(|port| Ok(SocketAddr::new(*ip_addr, port)))
.collect::<Vec<_>>(),
Err(e) => vec![Err(e)],
}
})
.collect::<Result<Vec<SocketAddr>>>()?
};

// The scripts are relative to the `resources` directory, so we need to change the current
// working directory back to that location first.
Expand Down Expand Up @@ -830,3 +889,22 @@ pub fn get_progress_bar(length: u64) -> Result<ProgressBar> {
progress_bar.enable_steady_tick(Duration::from_millis(100));
Ok(progress_bar)
}

fn get_rpc_ports_from_manager_inventory(
inventory_file_path: &PathBuf,
) -> Result<impl Iterator<Item = u16>> {
#[derive(Deserialize)]
struct NodeRegistry {
nodes: Vec<Node>,
}
#[derive(Deserialize)]
struct Node {
rpc_port: u16,
}

let file = File::open(inventory_file_path)?;
let reader = BufReader::new(file);
let node_registry: Result<NodeRegistry, _> = serde_json::from_reader(reader);
let rpc_ports = node_registry?.nodes.into_iter().map(|node| node.rpc_port);
Ok(rpc_ports)
}

0 comments on commit d666210

Please sign in to comment.