From 7b729c2c1daf13ba2f56d500316317b2ed471c16 Mon Sep 17 00:00:00 2001 From: James ABIB Date: Fri, 24 Nov 2023 23:41:35 +0100 Subject: [PATCH] feat(server): binding listener on fragment request --- client/src/main.rs | 18 ++++++++-- server/Cargo.toml | 2 ++ server/src/lib.rs | 4 ++- server/src/messages/fragment_maker.rs | 9 +++++ server/src/{ => messages}/handler.rs | 43 ++++++++++++++++------ server/src/messages/mod.rs | 3 ++ server/src/messages/serialization.rs | 51 +++++++++++++++++++++++++++ server/src/services/server_runner.rs | 4 +-- shared/Cargo.toml | 2 ++ shared/src/types/messages.rs | 12 +++++-- 10 files changed, 131 insertions(+), 17 deletions(-) create mode 100644 server/src/messages/fragment_maker.rs rename server/src/{ => messages}/handler.rs (53%) create mode 100644 server/src/messages/mod.rs create mode 100644 server/src/messages/serialization.rs diff --git a/client/src/main.rs b/client/src/main.rs index d5725e0..3be2330 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -9,10 +9,11 @@ use crate::image::open_image; use cli::operation::parse_to_address; use cli::parser::{CliArgs, CliClientArgs, Parser}; use server::services::{connect::connect, reader::get_response, write::write}; +use server::messages::serialization::serialize_request; use shared::types::filesystem::FileExtension; use shared::types::fractal_descriptor::FractalType::IteratedSinZ; use shared::types::fractal_descriptor::{FractalDescriptor, IteratedSinZDescriptor}; -use shared::types::messages::FragmentTask; +use shared::types::messages::{FragmentTask, FragmentRequest}; use shared::types::point::Point; use shared::types::range::Range; use shared::types::u8data::U8Data; @@ -23,9 +24,22 @@ fn main() -> io::Result<()> { let cli_args: CliArgs = CliArgs::Client(CliClientArgs::parse()); let connection_result = connect(&parse_to_address(cli_args)); + let fragment_request = FragmentRequest { + worker_name: "Worker 1".to_string(), + maximal_work_load: 1000, + }; + + let serialized_request = match serialize_request(&fragment_request) { + Ok(serialized_request) => serialized_request, + Err(e) => { + eprintln!("Erreur lors de la sĂ©rialisation de la requĂȘte : {}", e); + return Ok(()); + } + }; + if let Ok(mut stream) = connection_result { println!("Connected to the server!"); - match write(&mut stream, "Hello World !") { + match write(&mut stream, &serialized_request) { Ok(_) => println!("Message sent!"), Err(error) => println!("Failed to send message: {}", error), } diff --git a/server/Cargo.toml b/server/Cargo.toml index daaa073..09ac827 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,6 +10,8 @@ edition = "2021" complex = { path = "../complex" } shared = { path = "../shared" } cli = { path = "../cli" } +serde = { version = "1.0.193", features = ["derive"] } +serde_json ="1.0.108" [lib] path = "src/lib.rs" diff --git a/server/src/lib.rs b/server/src/lib.rs index c6b23f6..f08f60f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,3 +1,5 @@ -pub mod handler; +pub mod messages; pub mod image; pub mod services; +pub use serde::{Deserialize, Serialize}; + diff --git a/server/src/messages/fragment_maker.rs b/server/src/messages/fragment_maker.rs new file mode 100644 index 0000000..befc2ec --- /dev/null +++ b/server/src/messages/fragment_maker.rs @@ -0,0 +1,9 @@ +use shared::types::messages::{FragmentRequest, FragmentTask, FragmentResult}; + +pub fn create_task_for_request(_request: FragmentRequest) -> FragmentTask { + todo!() +} + +pub fn process_result(_result: FragmentResult) { + todo!() +} \ No newline at end of file diff --git a/server/src/handler.rs b/server/src/messages/handler.rs similarity index 53% rename from server/src/handler.rs rename to server/src/messages/handler.rs index 8061021..bf1038f 100644 --- a/server/src/handler.rs +++ b/server/src/messages/handler.rs @@ -1,4 +1,10 @@ -use std::{io::Read, net::TcpStream}; +use std::{io::{Read, self, BufReader}, net::TcpStream}; + +use shared::types::messages::Message; + +use crate::services; + +use super::{serialization::{deserialize_message, serialize_task}, fragment_maker::{create_task_for_request, process_result}}; /// Handles a client TCP stream. /// @@ -26,18 +32,35 @@ use std::{io::Read, net::TcpStream}; /// } /// } /// ``` -pub fn handle_client(mut stream: TcpStream) { +pub fn handle_client(mut stream: TcpStream) -> io::Result<()> { match stream.local_addr() { Ok(addr) => println!("[SERVER] Connection established {}", addr), Err(e) => println!("[SERVER] Failed to get local address: {}", e), } - let mut buffer = [0; 1024]; - match stream.read(&mut buffer) { - Ok(_) => println!( - "[SERVER] Message received: {}", - String::from_utf8_lossy(&buffer[..]) - ), - Err(e) => println!("[SERVER] Error reading from stream: {}", e), + let mut stream_reader = BufReader::new(&mut stream); + let mut data = String::new(); + stream_reader.read_to_string(&mut data)?; + + let trimmed_data = data.trim(); + let message_result = deserialize_message(trimmed_data); + + match message_result { + Ok(Message::FragmentRequest(request)) => { + let task = create_task_for_request(request); + let serialized_task: String = serialize_task(&task)?; + services::write::write(&mut stream, &serialized_task)?; + } + Ok(Message::FragmentTask(_task)) => { + todo!() + } + Ok(Message::FragmentResult(result)) => { + process_result(result); + } + Err(e) => { + println!("[SERVER] Error deserializing request: {:?}", e); + } } -} + + Ok(()) +} \ No newline at end of file diff --git a/server/src/messages/mod.rs b/server/src/messages/mod.rs new file mode 100644 index 0000000..4e4bc5f --- /dev/null +++ b/server/src/messages/mod.rs @@ -0,0 +1,3 @@ +pub mod handler; +pub mod serialization; +pub mod fragment_maker; \ No newline at end of file diff --git a/server/src/messages/serialization.rs b/server/src/messages/serialization.rs new file mode 100644 index 0000000..5a8fd89 --- /dev/null +++ b/server/src/messages/serialization.rs @@ -0,0 +1,51 @@ +use serde_json::json; +use shared::types::messages::{FragmentRequest, Message, FragmentTask}; +use serde::de::Error as SerdeError; + +pub fn serialize_request(request: &FragmentRequest) -> Result { + let request_details = json!({ + "worker_name": &request.worker_name, + "maximal_work_load": request.maximal_work_load + }); + + let request = json!({ + "FragmentRequest": request_details + }); + + serde_json::to_string(&request) +} + +pub fn deserialize_request(response: &str) -> serde_json::Result { + let response_value: serde_json::Value = serde_json::from_str(response)?; + let response_details = response_value.get("FragmentRequest").ok_or_else(|| SerdeError::custom("FragmentRequest not found"))?; + + let worker_name = response_details.get("worker_name").and_then(|c| c.as_str()).ok_or_else(|| SerdeError::custom("Invalid worker name"))?; + let maximal_work_load = response_details.get("maximal_work_load").and_then(|c| c.as_u64()).ok_or_else(|| SerdeError::custom("Invalid maximal work load"))?; + + let response = FragmentRequest { + worker_name: worker_name.to_string(), + maximal_work_load: maximal_work_load as u32, + }; + + Ok(response) +} + +pub fn deserialize_message(response: &str) -> serde_json::Result { + let response_value: serde_json::Value = serde_json::from_str(response)?; + match response_value.as_object().and_then(|obj| obj.keys().next()) { + Some(key) if key == "FragmentRequest" => { + deserialize_request(response).map(Message::FragmentRequest) + } + Some(key) if key == "FragmentTask" => { + todo!() + } + Some(key) if key == "FragmentResult" => { + todo!() + } + _ => Err(serde_json::Error::custom("No recognizable message type found")) + } +} + +pub fn serialize_task(_task: &FragmentTask) -> Result { + todo!() +} \ No newline at end of file diff --git a/server/src/services/server_runner.rs b/server/src/services/server_runner.rs index 0ea1cb6..9a7d8be 100644 --- a/server/src/services/server_runner.rs +++ b/server/src/services/server_runner.rs @@ -1,4 +1,4 @@ -use crate::handler::handle_client; +use crate::messages::handler::handle_client; use std::net::TcpListener; /// Starts a TCP server on the specified address. @@ -38,7 +38,7 @@ pub fn run_server(address: &str) -> std::io::Result<()> { }; for stream in listener.incoming() { - handle_client(stream?); + let _ = handle_client(stream?); } Ok(()) diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 9fd1d3b..3b42f19 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -10,6 +10,8 @@ path = "src/lib.rs" [dependencies] rand = "0.8.5" +serde = { version = "1.0.193", features = ["derive"] } +serde_json ="1.0.108" [dev-dependencies] tempfile = "3.8.1" diff --git a/shared/src/types/messages.rs b/shared/src/types/messages.rs index ac7c8d2..e1255bd 100644 --- a/shared/src/types/messages.rs +++ b/shared/src/types/messages.rs @@ -4,6 +4,7 @@ use crate::types::range::Range; use crate::types::resolution::Resolution; use crate::types::u8data::U8Data; + /// Represents a request for a fragment of work from a worker. /// /// Attributes: @@ -11,8 +12,8 @@ use crate::types::u8data::U8Data; /// - `maximal_work_load`: An `u32` indicating the maximum workload (in terms of pixels) the worker can handle. #[derive(Debug, Clone, PartialEq)] pub struct FragmentRequest { - worker_name: String, - maximal_work_load: u32, + pub worker_name: String, + pub maximal_work_load: u32, } /// Describes a task assigned to a worker for fractal computation by a Server. @@ -46,3 +47,10 @@ pub struct FragmentResult { range: Range, pixels: PixelData, } + +#[derive(Debug, Clone, PartialEq)] +pub enum Message { + FragmentRequest(FragmentRequest), + FragmentTask(FragmentTask), + FragmentResult(FragmentResult), +}