Skip to content

Commit

Permalink
feat(server): binding listener on fragment request
Browse files Browse the repository at this point in the history
  • Loading branch information
jabibamman committed Nov 24, 2023
1 parent 9e19a82 commit 7b729c2
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 17 deletions.
18 changes: 16 additions & 2 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use crate::image::open_image;
use cli::operation::parse_to_address;
use cli::parser::{CliArgs, CliClientArgs, Parser};
use server::services::{connect::connect, reader::get_response, write::write};
use server::messages::serialization::serialize_request;
use shared::types::filesystem::FileExtension;
use shared::types::fractal_descriptor::FractalType::IteratedSinZ;
use shared::types::fractal_descriptor::{FractalDescriptor, IteratedSinZDescriptor};
use shared::types::messages::FragmentTask;
use shared::types::messages::{FragmentTask, FragmentRequest};
use shared::types::point::Point;
use shared::types::range::Range;
use shared::types::u8data::U8Data;
Expand All @@ -23,9 +24,22 @@ fn main() -> io::Result<()> {
let cli_args: CliArgs = CliArgs::Client(CliClientArgs::parse());
let connection_result = connect(&parse_to_address(cli_args));

let fragment_request = FragmentRequest {
worker_name: "Worker 1".to_string(),
maximal_work_load: 1000,
};

let serialized_request = match serialize_request(&fragment_request) {
Ok(serialized_request) => serialized_request,
Err(e) => {
eprintln!("Erreur lors de la sérialisation de la requête : {}", e);
return Ok(());
}
};

if let Ok(mut stream) = connection_result {
println!("Connected to the server!");
match write(&mut stream, "Hello World !") {
match write(&mut stream, &serialized_request) {
Ok(_) => println!("Message sent!"),
Err(error) => println!("Failed to send message: {}", error),
}
Expand Down
2 changes: 2 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ edition = "2021"
complex = { path = "../complex" }
shared = { path = "../shared" }
cli = { path = "../cli" }
serde = { version = "1.0.193", features = ["derive"] }
serde_json ="1.0.108"

[lib]
path = "src/lib.rs"
4 changes: 3 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod handler;
pub mod messages;
pub mod image;
pub mod services;
pub use serde::{Deserialize, Serialize};

9 changes: 9 additions & 0 deletions server/src/messages/fragment_maker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use shared::types::messages::{FragmentRequest, FragmentTask, FragmentResult};

pub fn create_task_for_request(_request: FragmentRequest) -> FragmentTask {
todo!()
}

pub fn process_result(_result: FragmentResult) {
todo!()
}
43 changes: 33 additions & 10 deletions server/src/handler.rs → server/src/messages/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{io::Read, net::TcpStream};
use std::{io::{Read, self, BufReader}, net::TcpStream};

use shared::types::messages::Message;

use crate::services;

use super::{serialization::{deserialize_message, serialize_task}, fragment_maker::{create_task_for_request, process_result}};

/// Handles a client TCP stream.
///
Expand Down Expand Up @@ -26,18 +32,35 @@ use std::{io::Read, net::TcpStream};
/// }
/// }
/// ```
pub fn handle_client(mut stream: TcpStream) {
pub fn handle_client(mut stream: TcpStream) -> io::Result<()> {
match stream.local_addr() {
Ok(addr) => println!("[SERVER] Connection established {}", addr),
Err(e) => println!("[SERVER] Failed to get local address: {}", e),
}

let mut buffer = [0; 1024];
match stream.read(&mut buffer) {
Ok(_) => println!(
"[SERVER] Message received: {}",
String::from_utf8_lossy(&buffer[..])
),
Err(e) => println!("[SERVER] Error reading from stream: {}", e),
let mut stream_reader = BufReader::new(&mut stream);
let mut data = String::new();
stream_reader.read_to_string(&mut data)?;

let trimmed_data = data.trim();
let message_result = deserialize_message(trimmed_data);

match message_result {
Ok(Message::FragmentRequest(request)) => {
let task = create_task_for_request(request);
let serialized_task: String = serialize_task(&task)?;
services::write::write(&mut stream, &serialized_task)?;
}
Ok(Message::FragmentTask(_task)) => {
todo!()
}
Ok(Message::FragmentResult(result)) => {
process_result(result);
}
Err(e) => {
println!("[SERVER] Error deserializing request: {:?}", e);
}
}
}

Ok(())
}
3 changes: 3 additions & 0 deletions server/src/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod handler;
pub mod serialization;
pub mod fragment_maker;
51 changes: 51 additions & 0 deletions server/src/messages/serialization.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use serde_json::json;
use shared::types::messages::{FragmentRequest, Message, FragmentTask};
use serde::de::Error as SerdeError;

pub fn serialize_request(request: &FragmentRequest) -> Result<String, serde_json::Error> {
let request_details = json!({
"worker_name": &request.worker_name,
"maximal_work_load": request.maximal_work_load
});

let request = json!({
"FragmentRequest": request_details
});

serde_json::to_string(&request)
}

pub fn deserialize_request(response: &str) -> serde_json::Result<FragmentRequest> {
let response_value: serde_json::Value = serde_json::from_str(response)?;
let response_details = response_value.get("FragmentRequest").ok_or_else(|| SerdeError::custom("FragmentRequest not found"))?;

let worker_name = response_details.get("worker_name").and_then(|c| c.as_str()).ok_or_else(|| SerdeError::custom("Invalid worker name"))?;
let maximal_work_load = response_details.get("maximal_work_load").and_then(|c| c.as_u64()).ok_or_else(|| SerdeError::custom("Invalid maximal work load"))?;

let response = FragmentRequest {
worker_name: worker_name.to_string(),
maximal_work_load: maximal_work_load as u32,
};

Ok(response)
}

pub fn deserialize_message(response: &str) -> serde_json::Result<Message> {
let response_value: serde_json::Value = serde_json::from_str(response)?;
match response_value.as_object().and_then(|obj| obj.keys().next()) {
Some(key) if key == "FragmentRequest" => {
deserialize_request(response).map(Message::FragmentRequest)
}
Some(key) if key == "FragmentTask" => {
todo!()
}
Some(key) if key == "FragmentResult" => {
todo!()
}
_ => Err(serde_json::Error::custom("No recognizable message type found"))
}
}

pub fn serialize_task(_task: &FragmentTask) -> Result<String, serde_json::Error> {
todo!()
}
4 changes: 2 additions & 2 deletions server/src/services/server_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::handler::handle_client;
use crate::messages::handler::handle_client;
use std::net::TcpListener;

/// Starts a TCP server on the specified address.
Expand Down Expand Up @@ -38,7 +38,7 @@ pub fn run_server(address: &str) -> std::io::Result<()> {
};

for stream in listener.incoming() {
handle_client(stream?);
let _ = handle_client(stream?);
}

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ path = "src/lib.rs"

[dependencies]
rand = "0.8.5"
serde = { version = "1.0.193", features = ["derive"] }
serde_json ="1.0.108"

[dev-dependencies]
tempfile = "3.8.1"
12 changes: 10 additions & 2 deletions shared/src/types/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use crate::types::range::Range;
use crate::types::resolution::Resolution;
use crate::types::u8data::U8Data;


/// Represents a request for a fragment of work from a worker.
///
/// Attributes:
/// - `worker_name`: A `String` representing the name of the worker making the request.
/// - `maximal_work_load`: An `u32` indicating the maximum workload (in terms of pixels) the worker can handle.
#[derive(Debug, Clone, PartialEq)]
pub struct FragmentRequest {
worker_name: String,
maximal_work_load: u32,
pub worker_name: String,
pub maximal_work_load: u32,
}

/// Describes a task assigned to a worker for fractal computation by a Server.
Expand Down Expand Up @@ -46,3 +47,10 @@ pub struct FragmentResult {
range: Range,
pixels: PixelData,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Message {
FragmentRequest(FragmentRequest),
FragmentTask(FragmentTask),
FragmentResult(FragmentResult),
}

0 comments on commit 7b729c2

Please sign in to comment.