Skip to content

Commit

Permalink
refactor: fixing clippy issues and minor code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Dec 30, 2024
1 parent f8babc8 commit 302d82d
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 206 deletions.
8 changes: 2 additions & 6 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,13 @@ pub struct BatchInProgress {
// List of nodes which status is temporarily immutable/locked,
// along with expiration information for when it should be unlocked.
#[cfg(feature = "ssr")]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct ImmutableNodeStatus(
Arc<Mutex<HashMap<super::node_instance::ContainerId, (Instant, Duration)>>>,
);

#[cfg(feature = "ssr")]
impl ImmutableNodeStatus {
pub fn new() -> Self {
Self(Arc::new(Mutex::new(HashMap::new())))
}

pub async fn insert(&self, container_id: ContainerId, expiration: Duration) {
self.0
.lock()
Expand Down Expand Up @@ -216,7 +212,7 @@ pub fn App() -> impl IntoView {
alerts: RwSignal::new(vec![]),
batch_in_progress: RwSignal::new(None),
selecting_nodes: RwSignal::new((false, false, HashSet::new())),
nodes_sort_strategy: RwSignal::new(NodesSortStrategy::ByCreationDate(true)),
nodes_sort_strategy: RwSignal::new(NodesSortStrategy::CreationDate(true)),
});

// spawn poller task only on client side
Expand Down
6 changes: 3 additions & 3 deletions src/bg_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ async fn retrieve_current_rewards_balances<T: Transport + Clone, P: Provider<T,
}
}

let balance: U256 = updated_balances.iter().map(|(_, b)| b).sum();
let balance: U256 = updated_balances.values().sum();
update_lcd_stats(&lcd_stats, &[(LCD_LABEL_BALANCE, balance.to_string())]).await;
}

Expand All @@ -542,7 +542,7 @@ async fn update_lcd_stats(
) {
let mut s = lcd_stats.lock().await;
labels_vals
.into_iter()
.iter()
.filter(|(l, v)| !l.is_empty() && !v.is_empty())
.for_each(|(label, value)| {
let _ = s.insert(label.to_string(), value.clone());
Expand All @@ -552,7 +552,7 @@ async fn update_lcd_stats(
// Helper to remove stats being displayed on external LCD device
async fn remove_lcd_stats(lcd_stats: &Arc<Mutex<HashMap<String, String>>>, labels: &[&str]) {
let mut s = lcd_stats.lock().await;
labels.into_iter().for_each(|label| {
labels.iter().for_each(|label| {
let _ = s.remove(*label);
});
}
23 changes: 11 additions & 12 deletions src/db_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ impl CachedNodeMetadata {
}
if !self.rewards.is_empty() {
if let Ok(v) = U256::from_str(&self.rewards) {
info.rewards = Some(v.into());
info.rewards = Some(v);
}
}
if !self.balance.is_empty() {
if let Ok(v) = U256::from_str(&self.balance) {
info.balance = Some(v.into());
info.balance = Some(v);
}
}
if let Ok(v) = self.records.parse::<usize>() {
Expand Down Expand Up @@ -129,7 +129,7 @@ impl DbClient {
Ok(()) => logging::log!("Created database successfully!"),
Err(err) => {
logging::log!("Failed to create database: {err}");
return Err(err.into());
return Err(err);
}
}
}
Expand Down Expand Up @@ -213,18 +213,17 @@ impl DbClient {

// Insert node metadata onto local cache DB
pub async fn insert_node_metadata(&self, info: &NodeInstanceInfo) {
let query_str = format!(
"INSERT OR REPLACE INTO nodes (\
let query_str = "INSERT OR REPLACE INTO nodes (\
container_id, status, port, \
records, connected_peers, kbuckets_peers \
) VALUES (?, ?, ?, ?, ?, ?)"
);
.to_string();

let db_lock = self.db.lock().await;
match sqlx::query(&query_str)
.bind(info.container_id.clone())
.bind(json!(info.status).to_string())
.bind(info.port.clone())
.bind(info.port)
.bind(info.records.map_or("".to_string(), |v| v.to_string()))
.bind(
info.connected_peers
Expand Down Expand Up @@ -462,7 +461,7 @@ impl DbClient {
match sqlx::query_as::<_, CachedSettings>("SELECT * FROM settings")
.fetch_all(&*db_lock)
.await
.map(|s| s.get(0).cloned())
.map(|s| s.first().cloned())
{
Ok(Some(s)) => AppSettings {
nodes_auto_upgrade: s.nodes_auto_upgrade,
Expand All @@ -474,11 +473,11 @@ impl DbClient {
rewards_balances_retrieval_freq: Duration::from_secs(
s.rewards_balances_retrieval_freq_secs,
),
l2_network_rpc_url: s.l2_network_rpc_url,
token_contract_address: s.token_contract_address,
l2_network_rpc_url: s.l2_network_rpc_url.clone(),
token_contract_address: s.token_contract_address.clone(),
lcd_display_enabled: s.lcd_display_enabled,
lcd_device: s.lcd_device,
lcd_addr: s.lcd_addr,
lcd_device: s.lcd_device.clone(),
lcd_addr: s.lcd_addr.clone(),
},
Ok(None) => {
logging::log!("No settings found in DB, we'll be using defaults.");
Expand Down
184 changes: 89 additions & 95 deletions src/docker_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,84 @@ impl ReqMethod {
fn post_empty_body() -> Self {
Self::Post("".to_string())
}

// Send request to Docker server
async fn try_send_request(
&self,
base_url: &str,
query_params: &[(&str, &str)],
socket_path: &Path,
) -> Result<Response<Incoming>, DockerClientError> {
let unix_stream = UnixStream::connect(socket_path).await.map_err(|err| {
DockerClientError::ClientError(format!(
"Failed to connect to Docker socket at {socket_path:?}: {err}"
))
})?;
let io = TokioIo::new(unix_stream);
let (mut docker_reqs_sender, connection) = conn::http1::handshake(io).await?;
tokio::spawn(async move {
if let Err(err) = connection.await {
logging::log!("Error when connecting to Docker: {err}");
}
});

// Construct the query string using url::form_urlencoded
let query_string = form_urlencoded::Serializer::new(String::new())
.extend_pairs(query_params)
.finish();

// Construct the full URL with query parameters
let full_url = format!("{base_url}?{query_string}");

let req_builder = Request::builder()
.uri(full_url)
// Host added just because http1 requires it
.header("Host", "localhost");

let req = match self {
ReqMethod::Post(body_str) => req_builder
.method(Method::POST)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(body_str.clone()))?,
ReqMethod::Put(bytes) => req_builder
.header(CONTENT_TYPE, "application/octet-stream")
.header(CONTENT_LENGTH, bytes.len())
.method(Method::PUT)
.body(Body::from(bytes.clone()))?,
ReqMethod::Get => req_builder.method(Method::GET).body(Body::from(()))?,
ReqMethod::Delete => req_builder.method(Method::DELETE).body(Body::from(()))?,
};

let resp = docker_reqs_sender.send_request(req).await?;

match resp.status() {
StatusCode::NO_CONTENT | StatusCode::CREATED | StatusCode::OK => Ok(resp),
StatusCode::NOT_FOUND => {
let resp_bytes = get_response_bytes(resp).await?;
let msg: ServerErrorMessage = serde_json::from_slice(&resp_bytes)?;
logging::log!("404 ERROR: {}", msg.message);
// TODO: unfortunatelly the API returns different error
// msgs instead of different error codes to properly handle them
if msg.message.starts_with("No such image") {
Err(DockerClientError::ImageNotFound)
} else {
Err(DockerClientError::DockerServerError((
StatusCode::NOT_FOUND.into(),
msg.message,
)))
}
}
other => {
let resp_bytes = get_response_bytes(resp).await?;
let msg = match serde_json::from_slice::<ServerErrorMessage>(&resp_bytes) {
Ok(msg) => msg.message,
Err(_) => String::from_utf8_lossy(&resp_bytes).to_string(),
};
logging::log!("ERROR: {other:?} - {msg}");
Err(DockerClientError::DockerServerError((other.into(), msg)))
}
}
}
}

// Client to send requests to a Docker server's API
Expand Down Expand Up @@ -226,7 +304,7 @@ impl DockerClient {
let url = format!("{DOCKER_CONTAINERS_API}/create");
// we don't expose/map the metrics_port from here since we had to expose it
// with nginx from within the dockerfile.
let mapped_ports = vec![port];
let mapped_ports = [port];

let mut labels = vec![
(LABEL_KEY_VERSION.to_string(), self.node_image_tag.clone()),
Expand Down Expand Up @@ -268,13 +346,12 @@ impl DockerClient {
}],
)
})
.into_iter()
.collect::<PortBindings>(),
),
}),
};

let random_name = hex::encode(rand::random::<[u8; 10]>().to_vec());
let random_name = hex::encode(rand::random::<[u8; 10]>());
logging::log!(
"[CREATE] Sending Docker request to CREATE a new container (named: {random_name}): {url} ..."
);
Expand Down Expand Up @@ -379,11 +456,9 @@ impl DockerClient {
.exec_in_container(id, cmd, "get node bin version")
.await?;

let version = if let Some(v) = resp_str.strip_prefix("Autonomi Node v") {
Some(v.replace('\n', "").replace('\r', ""))
} else {
None
};
let version = resp_str
.strip_prefix("Autonomi Node v")
.map(|v| v.replace(['\n', '\r'], ""));
logging::log!("Node bin version in container {id}: {version:?}");

let cmd = "cat node_data/secret-key | od -A n -t x1 | tr -d ' \n'".to_string();
Expand Down Expand Up @@ -484,15 +559,15 @@ impl DockerClient {
url: &str,
query: &[(&str, &str)],
) -> Result<Vec<u8>, DockerClientError> {
let resp = match self.try_send_request(&method, url, query).await {
let resp = match method.try_send_request(url, query, &self.socket_path).await {
Err(DockerClientError::ImageNotFound) => {
logging::log!(
"We need to pull the formica image: {}.",
self.node_image_name
);
// let's pull the image before retrying
self.pull_formica_image().await?;
self.try_send_request(&method, url, query).await
method.try_send_request(url, query, &self.socket_path).await
}
other => other,
}?;
Expand All @@ -508,15 +583,15 @@ impl DockerClient {
url: &str,
query: &[(&str, &str)],
) -> Result<impl Stream<Item = Result<Bytes, DockerClientError>>, DockerClientError> {
let resp = match self.try_send_request(&method, url, query).await {
let resp = match method.try_send_request(url, query, &self.socket_path).await {
Err(DockerClientError::ImageNotFound) => {
logging::log!(
"We need to pull the formica image: {} ...",
self.node_image_name
);
// let's pull the image before retrying
self.pull_formica_image().await?;
self.try_send_request(&method, url, query).await
method.try_send_request(url, query, &self.socket_path).await
}
other => other,
}?;
Expand All @@ -536,8 +611,8 @@ impl DockerClient {
("fromImage", self.node_image_name.as_str()),
("tag", self.node_image_tag.as_str()),
];
let resp = self
.try_send_request(&ReqMethod::post_empty_body(), &url, query)
let resp = ReqMethod::post_empty_body()
.try_send_request(&url, query, &self.socket_path)
.await?;

// consume and await end of response stream, discarding the bytes
Expand All @@ -547,87 +622,6 @@ impl DockerClient {
//logging::log!("Formica image {NODE_CONTAINER_IMAGE_NAME} was successfully pulled!");
Ok(())
}

// Send request to Docker server
async fn try_send_request(
&self,
method: &ReqMethod,
base_url: &str,
query_params: &[(&str, &str)],
) -> Result<Response<Incoming>, DockerClientError> {
let unix_stream = UnixStream::connect(&self.socket_path)
.await
.map_err(|err| {
DockerClientError::ClientError(format!(
"Failed to connect to Docker socket at {:?}: {err}",
self.socket_path
))
})?;
let io = TokioIo::new(unix_stream);
let (mut docker_reqs_sender, connection) = conn::http1::handshake(io).await?;
tokio::spawn(async move {
if let Err(err) = connection.await {
logging::log!("Error when connecting to Docker: {err}");
}
});

// Construct the query string using url::form_urlencoded
let query_string = form_urlencoded::Serializer::new(String::new())
.extend_pairs(query_params)
.finish();

// Construct the full URL with query parameters
let full_url = format!("{base_url}?{query_string}");

let req_builder = Request::builder()
.uri(full_url)
// Host added just because http1 requires it
.header("Host", "localhost");

let req = match method {
ReqMethod::Post(body_str) => req_builder
.method(Method::POST)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(body_str.clone()))?,
ReqMethod::Put(bytes) => req_builder
.header(CONTENT_TYPE, "application/octet-stream")
.header(CONTENT_LENGTH, bytes.len())
.method(Method::PUT)
.body(Body::from(bytes.clone()))?,
ReqMethod::Get => req_builder.method(Method::GET).body(Body::from(()))?,
ReqMethod::Delete => req_builder.method(Method::DELETE).body(Body::from(()))?,
};

let resp = docker_reqs_sender.send_request(req).await?;

match resp.status() {
StatusCode::NO_CONTENT | StatusCode::CREATED | StatusCode::OK => Ok(resp),
StatusCode::NOT_FOUND => {
let resp_bytes = get_response_bytes(resp).await?;
let msg: ServerErrorMessage = serde_json::from_slice(&resp_bytes)?;
logging::log!("404 ERROR: {}", msg.message);
// TODO: unfortunatelly the API returns different error
// msgs instead of different error codes to properly handle them
if msg.message.starts_with("No such image") {
Err(DockerClientError::ImageNotFound)
} else {
Err(DockerClientError::DockerServerError((
StatusCode::NOT_FOUND.into(),
msg.message,
)))
}
}
other => {
let resp_bytes = get_response_bytes(resp).await?;
let msg = match serde_json::from_slice::<ServerErrorMessage>(&resp_bytes) {
Ok(msg) => msg.message,
Err(_) => String::from_utf8_lossy(&resp_bytes).to_string(),
};
logging::log!("ERROR: {other:?} - {msg}");
Err(DockerClientError::DockerServerError((other.into(), msg)))
}
}
}
}

// Convert a Response into a Stream of its body bytes.
Expand Down
Loading

0 comments on commit 302d82d

Please sign in to comment.