Skip to content

Commit

Permalink
cmdr messages, relay client updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Quinn committed Dec 27, 2024
1 parent ec8e240 commit 4447231
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 196 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# IDE stuff
.idea/
.vscode/

# Allow developers to use python pre-commit locally
/.pre-commit-config.yaml
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ rev = "787ab78581b705af0946bcfe3a0453b64af2193f"
git = "https://github.com/worldcoin/orb-relay-messages.git"
rev = "6de97229c26d46eb600b1dd288b5325830ed816b"
features = ["client"]
# Uncomment for local development
[patch."https://github.com/worldcoin/orb-relay-messages.git"]
orb-relay-messages = { path = "../orb-relay-messages/rust" }

[workspace.dependencies.nusb]
git = "https://github.com/kevinmehall/nusb"
Expand Down
7 changes: 2 additions & 5 deletions endpoints/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ impl Endpoints {
orb_id,
"",
),
relay: concat_urls(
&format!("https://relay.{subdomain}.worldcoin.org/"),
orb_id,
"",
),
relay: Url::parse(&format!("https://relay.{subdomain}.worldcoin.org/"))
.expect("urls with validated orb ids should always parse"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion fleet-cmdr/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Orb Fleet Commander

This is the Orb Fleet Commander. It is a process the connects to the Fleet Backend to allow for fleet management.
This is the Orb Fleet Commander. It is a process the connects to the Orb Relay Service and processes messages for fleet management.
4 changes: 2 additions & 2 deletions fleet-cmdr/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ pub struct Args {
#[arg(long)]
pub config: Option<String>,
/// The URL of the orb relay.
#[arg(long, default_value = "https://relay.worldcoin.org")]
#[arg(long)]
pub orb_relay_url: Option<String>,
/// The path to the orb name file.
#[arg(long, default_value = "/etc/worldcoin/orb_name")]
#[arg(long, default_value = "/usr/persistent/orb_name")]
pub orb_name_path: Option<PathBuf>,
}

Expand Down
66 changes: 65 additions & 1 deletion fleet-cmdr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,71 @@
pub mod args;
pub mod settings;
pub mod orb_info;
pub mod settings;

use color_eyre::eyre::{eyre, Result};
use orb_build_info::{make_build_info, BuildInfo};
use orb_endpoints::{Endpoints, OrbId};
use orb_relay_client::client::Client;
use orb_relay_messages::common;
use std::time::Duration;
use tracing::error;

pub const BUILD_INFO: BuildInfo = make_build_info!();

const ORB_FLEET_CMDR_NAMESPACE: &str = "orb-fleet-cmdr";
const ORB_RELAY_DEST_ID: &str = "orb-fleet-cmdr";

pub async fn relay_connect(
orb_id: &OrbId,
orb_token: String,
endpoints: &Endpoints,
reties: u32,
timeout: Duration,
) -> Result<Client> {
let mut relay = Client::new_as_orb(
endpoints.relay.to_string(),
orb_token,
orb_id.to_string(),
ORB_RELAY_DEST_ID.to_string(),
ORB_FLEET_CMDR_NAMESPACE.to_string(),
);
if let Err(e) = relay.connect().await {
return Err(eyre!("Relay: Failed to connect: {e}"));
}
for _ in 0..reties {
if let Ok(()) = relay
.send_blocking(
common::v1::AnnounceOrbId {
orb_id: orb_id.to_string(),
mode_type: common::v1::announce_orb_id::ModeType::SelfServe.into(),
hardware_type: common::v1::announce_orb_id::HardwareType::Pearl
.into(),
},
timeout,
)
.await
{
// Happy path. We have successfully announced and acknowledged the OrbId.
return Ok(relay);
}
error!("Relay: Failed to AnnounceOrbId. Retrying...");
relay.reconnect().await?;
if relay.has_pending_messages().await? > 0 {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
Err(eyre!(
"Relay: Failed to send AnnounceOrbId after a reconnect"
))
}

pub async fn relay_disconnect(
relay: &mut Client,
wait_for_pending_messages: Duration,
wait_for_shutdown: Duration,
) -> Result<()> {
relay
.graceful_shutdown(wait_for_pending_messages, wait_for_shutdown)
.await;
Ok(())
}
100 changes: 19 additions & 81 deletions fleet-cmdr/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use orb_endpoints::{backend::Backend, endpoints::Endpoints, OrbId};
use color_eyre::eyre::Result;
use orb_endpoints::{backend::Backend, endpoints::Endpoints};
use orb_fleet_cmdr::{
args::Args,
orb_info::{get_orb_id, get_orb_token},
relay_connect, relay_disconnect,
settings::Settings,
};
use orb_relay_client::client::Client;
use orb_relay_messages::common;
use std::{borrow::Cow, path::Path, time::Duration};
use orb_relay_messages::orb_commands::v1::OrbCommand;
use std::time::Duration;
use tracing::{debug, error, info};

const CFG_DEFAULT_PATH: &str = "/etc/orb_fleet_cmdr.conf";
const ENV_VAR_PREFIX: &str = "ORB_FLEET_CMDR_";
const CFG_ENV_VAR: &str = const_format::concatcp!(ENV_VAR_PREFIX, "CONFIG");
const SYSLOG_IDENTIFIER: &str = "worldcoin-fleet-cmdr";
const ORB_FLEET_CMDR_NAMESPACE: &str = "orb-fleet-cmdr";
const ORB_RELAY_DEST_ID: &str = "orb-fleet-cmdr";

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -26,27 +21,12 @@ async fn main() -> Result<()> {
.init();

let args = Args::parse();
let config_path = get_config_source(&args);

let settings = Settings::get(&args, config_path, ENV_VAR_PREFIX)?;
let settings = Settings::get(&args)?;

debug!(?settings, "starting fleet commander with settings");
run(&settings).await
}

fn get_config_source(args: &Args) -> Cow<'_, Path> {
if let Some(config) = &args.config {
info!("using config provided by command line argument: `{config}`");
Cow::Borrowed(config.as_ref())
} else if let Some(config) = figment::providers::Env::var(CFG_ENV_VAR) {
info!("using config set in environment variable `{CFG_ENV_VAR}={config}`");
Cow::Owned(std::path::PathBuf::from(config))
} else {
info!("using default config at `{CFG_DEFAULT_PATH}`");
Cow::Borrowed(CFG_DEFAULT_PATH.as_ref())
}
}

async fn run(settings: &Settings) -> Result<()> {
info!("running fleet commander: {:?}", settings);

Expand All @@ -58,65 +38,23 @@ async fn run(settings: &Settings) -> Result<()> {
relay_connect(&orb_id, orb_token, &endpoints, 3, Duration::from_secs(10))
.await?;

// TODO: Implement the main loop

relay_disconnect(&mut relay, Duration::from_secs(10), Duration::from_secs(10))
.await?;

Ok(())
}

async fn relay_connect(
orb_id: &OrbId,
orb_token: String,
endpoints: &Endpoints,
reties: u32,
timeout: Duration,
) -> Result<Client> {
let mut relay = Client::new_as_orb(
endpoints.relay.to_string(),
orb_token,
orb_id.to_string(),
ORB_RELAY_DEST_ID.to_string(),
ORB_FLEET_CMDR_NAMESPACE.to_string(),
);
if let Err(e) = relay.connect().await {
return Err(eyre!("Relay: Failed to connect: {e}"));
}
for _ in 0..reties {
if let Ok(()) = relay
.send_blocking(
common::v1::AnnounceOrbId {
orb_id: orb_id.to_string(),
mode_type: common::v1::announce_orb_id::ModeType::SelfServe.into(),
hardware_type: common::v1::announce_orb_id::HardwareType::Pearl
.into(),
},
timeout,
)
loop {
match relay
.wait_for_msg::<OrbCommand>(Duration::from_secs(10))
.await
{
// Happy path. We have successfully announced and acknowledged the OrbId.
return Ok(relay);
}
error!("Relay: Failed to AnnounceOrbId. Retrying...");
relay.reconnect().await?;
if relay.has_pending_messages().await? > 0 {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(OrbCommand { commands: command }) => {
println!("Received command: {:?}", command);
}
Err(e) => {
error!("Error receiving message: {:?}", e);
break;
}
}
}
Err(eyre!(
"Relay: Failed to send AnnounceOrbId after a reconnect"
))
}

async fn relay_disconnect(
relay: &mut Client,
wait_for_pending_messages: Duration,
wait_for_shutdown: Duration,
) -> Result<()> {
relay
.graceful_shutdown(wait_for_pending_messages, wait_for_shutdown)
.await;
relay_disconnect(&mut relay, Duration::from_secs(10), Duration::from_secs(10))
.await?;

Ok(())
}
35 changes: 27 additions & 8 deletions fleet-cmdr/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
use std::path::{Path, PathBuf};
use std::{
borrow::Cow,
path::{Path, PathBuf},
};

use figment::providers::Format;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tracing::info;

use crate::args::Args;

const CFG_DEFAULT_PATH: &str = "/etc/orb_fleet_cmdr.conf";
const ENV_VAR_PREFIX: &str = "ORB_FLEET_CMDR_";
const CFG_ENV_VAR: &str = const_format::concatcp!(ENV_VAR_PREFIX, "CONFIG");

#[serde_as]
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct Settings {
Expand All @@ -17,15 +25,26 @@ impl Settings {
/// Constructs `Settings` from a config file, environment variables, and command line
/// arguments. Command line arguments always take precedence over environment variables, which
/// in turn take precedence over the config file.
pub fn get<P: AsRef<Path>>(
args: &Args,
config: P,
env_prefix: &str,
) -> figment::error::Result<Settings> {
pub fn get(args: &Args) -> figment::error::Result<Settings> {
let config_path = Self::get_config_source(args);

figment::Figment::new()
.merge(figment::providers::Toml::file(config))
.merge(figment::providers::Env::prefixed(env_prefix))
.merge(figment::providers::Toml::file(config_path))
.merge(figment::providers::Env::prefixed(ENV_VAR_PREFIX))
.merge(figment::providers::Serialized::defaults(args))
.extract()
}

fn get_config_source(args: &Args) -> Cow<'_, Path> {
if let Some(config) = &args.config {
info!("using config provided by command line argument: `{config}`");
Cow::Borrowed(config.as_ref())
} else if let Some(config) = figment::providers::Env::var(CFG_ENV_VAR) {
info!("using config set in environment variable `{CFG_ENV_VAR}={config}`");
Cow::Owned(std::path::PathBuf::from(config))
} else {
info!("using default config at `{CFG_DEFAULT_PATH}`");
Cow::Borrowed(CFG_DEFAULT_PATH.as_ref())
}
}
}
Loading

0 comments on commit 4447231

Please sign in to comment.