Skip to content

Commit

Permalink
Draft: Global process registry - resolves lunatic-solutions#127
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoric committed Feb 28, 2023
1 parent d428e93 commit e7b5da3
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
52 changes: 50 additions & 2 deletions crates/lunatic-control-axum/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use axum::{
body::Bytes,
extract::DefaultBodyLimit,
routing::{get, post},
routing::{get, post, delete},
Extension, Json, Router,
};
use lunatic_distributed::{
Expand All @@ -15,7 +15,7 @@ use tower_http::limit::RequestBodyLimitLayer;

use crate::{
api::{ok, ApiError, ApiResponse, HostExtractor, JsonExtractor, NodeAuth, PathExtractor},
server::ControlServer,
server::{ControlServer, ProcessId},
};

pub async fn register(
Expand Down Expand Up @@ -50,6 +50,9 @@ pub async fn register(
get_module: format!("http://{host}/module/{{id}}"),
add_module: format!("http://{host}/module"),
get_nodes: format!("http://{host}/nodes"),
get_process: format!("http://{host}/process/get/{{id}}"),
add_process: format!("http://{host}/process/add"),
remove_process: format!("http://{host}/process/remove/{{id}}"),
},
})
}
Expand Down Expand Up @@ -141,6 +144,48 @@ pub async fn get_module(
ok(ModuleBytes { bytes })
}

pub async fn get_process(
node_auth: NodeAuth,
PathExtractor(id): PathExtractor<ProcessId>,
control: Extension<Arc<ControlServer>>,
) -> ApiResponse</* FIXME: What should we return here?*/String> {
log::info!("Node {} get_process {}", node_auth.node_name, id);

let process = control
.processes
.get(&id)
.ok_or_else(|| ApiError::custom_code("error_reading_process_name"))?;

ok(process.value().to_string())
}

pub async fn remove_process(
node_auth: NodeAuth,
PathExtractor(id): PathExtractor<ProcessId>,
control: Extension<Arc<ControlServer>>,
) -> ApiResponse</* FIXME: What should we return here?*/bool> {
log::info!("Node {} remove_process {}", node_auth.node_name, id);

let was_removed = control.processes
.remove(&id)
.is_some();

ok(was_removed)
}

pub async fn add_process(
node_auth: NodeAuth,
control: Extension<Arc<ControlServer>>,
PathExtractor(id): PathExtractor<ProcessId>,
JsonExtractor(name): JsonExtractor<String>,
) -> ApiResponse</* FIXME: What should we return here?*/bool> {
log::info!("Node {} add_process {}", node_auth.node_name, id);

let was_replaced = control.processes.insert(id, name).is_some();

ok(was_replaced)
}

pub fn init_routes() -> Router {
Router::new()
.route("/", post(register))
Expand All @@ -149,6 +194,9 @@ pub fn init_routes() -> Router {
.route("/nodes", get(list_nodes))
.route("/module", post(add_module))
.route("/module/:id", get(get_module))
.route("/process/:id", get(get_process))
.route("/process/:id", post(add_process))
.route("/process/:id", delete(remove_process))
.layer(DefaultBodyLimit::disable())
.layer(RequestBodyLimitLayer::new(50 * 1024 * 1024)) // 50 mb
}
14 changes: 14 additions & 0 deletions crates/lunatic-control-axum/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use chrono::{DateTime, Utc};
use dashmap::DashMap;
use lunatic_distributed::control::api::{NodeStart, Register};
use rcgen::Certificate;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::routes;
Expand All @@ -33,12 +34,24 @@ pub struct NodeDetails {
pub attributes: serde_json::Value,
}

/// The id of a process.
///
/// FIXME: Is this a global id or a local id?
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Deserialize, Serialize)]
pub struct ProcessId(u64);
impl std::fmt::Display for ProcessId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

pub struct ControlServer {
pub ca_cert: Certificate,
pub quic_client: lunatic_distributed::quic::Client,
pub registrations: DashMap<u64, Registered>,
pub nodes: DashMap<u64, NodeDetails>,
pub modules: DashMap<u64, Vec<u8>>,
pub(crate) processes: DashMap<ProcessId, /* name */ String>,
next_registration_id: AtomicU64,
next_node_id: AtomicU64,
next_module_id: AtomicU64,
Expand All @@ -52,6 +65,7 @@ impl ControlServer {
registrations: DashMap::new(),
nodes: DashMap::new(),
modules: DashMap::new(),
processes: DashMap::new(),
next_registration_id: AtomicU64::new(1),
next_node_id: AtomicU64::new(1),
next_module_id: AtomicU64::new(1),
Expand Down
5 changes: 5 additions & 0 deletions crates/lunatic-distributed/src/control/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ pub struct ControlUrls {
pub get_module: String,
pub add_module: String,
pub get_nodes: String,

/// Get a process
pub get_process: String,
pub add_process: String,
pub remove_process: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down

0 comments on commit e7b5da3

Please sign in to comment.