diff --git a/src/client.rs b/src/client.rs index 2123640..47f20b3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -48,7 +48,7 @@ impl WorkerPool { let receiver = Arc::new(Mutex::new(receiver)); for id in 0..settings.workers { - debug!("Worker {} creating...", id); + info!("Worker {} creating...", id); let conn_settings = connect::ConnectionSettings { connect_retry_initial_delay: settings.connection_retry_initial_delay, connect_retry_max_delay: settings.connection_retry_max_delay, @@ -113,21 +113,21 @@ impl Client for WorkerPool { fn close(&mut self) { if self.closed.fetch_or(true, Ordering::SeqCst) { - debug!("Workers are already closed."); + info!("Workers are already closed."); return; } - debug!("Sending terminate message to all workers."); + info!("Sending terminate message to all workers."); for _ in &mut self.workers { let sender = self.sender.clone(); sender.send(Message::Terminate).unwrap(); } - debug!("Shutting down all workers."); + info!("Shutting down all workers."); for wkr in &mut self.workers { - debug!("Shutting down worker {}", wkr.id); + info!("Shutting down worker {}", wkr.id); if let Some(w) = wkr.handler.take() { w.join().unwrap(); diff --git a/src/connect.rs b/src/connect.rs index a3e4526..7992bc6 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,9 +1,11 @@ +use crate::buffer; +use crate::error::Error as MyError; use backoff::{Error, ExponentialBackoff, Operation}; use std::cell::RefCell; use std::fmt::Debug; -use std::io::{self, ErrorKind, Read, Write}; +use std::io::{self, Read, Write}; use std::net::{TcpStream, ToSocketAddrs}; -use std::time::{Duration, Instant}; +use std::time::Duration; pub trait Connect where @@ -25,20 +27,17 @@ pub trait Reconnect { } pub trait ConnectRetryDelay { - fn now(&self) -> Instant; fn connect_retry_initial_delay(&self) -> Duration; fn connect_retry_timeout(&self) -> Duration; fn connect_retry_max_delay(&self) -> Duration; } +pub trait WriteRead { + fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), Error>; +} + #[derive(Debug)] -pub struct Stream -where - A: ToSocketAddrs + Clone + Debug, - S: ReconnectableWrite, - S: Connect, - S: Write + Read + TcpConfig, -{ +pub struct Stream { pub addr: A, pub stream: RefCell, pub settings: ConnectionSettings, @@ -60,12 +59,10 @@ pub struct ConnectionSettings { impl Stream where A: ToSocketAddrs + Clone + Debug, - S: ReconnectableWrite, - S: Connect, - S: Write + Read + TcpConfig, + S: Connect + TcpConfig, { pub fn connect(addr: A, settings: ConnectionSettings) -> io::Result> { - let stream = S::connect_with_retry(addr.clone(), settings)?; + let stream = connect_with_retry(addr.clone(), settings)?; let stream = RefCell::new(stream); Ok(Stream { addr, @@ -73,17 +70,25 @@ where settings, }) } + + fn write_retry_initial_delay(&self) -> Duration { + self.settings.write_retry_initial_delay + } + fn write_retry_timeout(&self) -> Duration { + self.settings.write_retry_timeout + } + fn write_retry_max_delay(&self) -> Duration { + self.settings.write_retry_max_delay + } } impl Reconnect for Stream where A: ToSocketAddrs + Clone + Debug, - S: ReconnectableWrite, - S: Connect, - S: Write + Read + TcpConfig, + S: Connect + TcpConfig, { fn reconnect(&mut self) -> io::Result<()> { - let stream = S::connect_with_retry(self.addr.clone(), self.settings)?; + let stream = connect_with_retry(self.addr.clone(), self.settings)?; *self.stream.borrow_mut() = stream; Ok(()) } @@ -91,10 +96,7 @@ where impl Write for Stream where - A: ToSocketAddrs + Clone + Debug, - S: ReconnectableWrite, - S: Connect, - S: Write + Read + TcpConfig, + S: Write, { fn write(&mut self, buf: &[u8]) -> io::Result { self.stream.borrow_mut().write(buf) @@ -107,16 +109,80 @@ where impl Read for Stream where - A: ToSocketAddrs + Clone + Debug, - S: ReconnectableWrite, - S: Connect, - S: Write + Read + TcpConfig, + S: Read, { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.stream.borrow_mut().read(buf) } } +impl WriteRead for Stream +where + A: ToSocketAddrs + Clone + Debug, + S: Connect + TcpConfig + Read + Write, +{ + fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), Error> { + let mut backoff = ExponentialBackoff { + current_interval: self.write_retry_initial_delay(), + initial_interval: self.write_retry_initial_delay(), + max_interval: self.write_retry_max_delay(), + max_elapsed_time: Some(self.write_retry_timeout()), + ..Default::default() + }; + + let mut op = || { + if let Err(err) = self.write(buf) { + warn!( + "Failed to write message, chunk: {}. cause: {:?}.", + chunk, err + ); + let e = match err.kind() { + io::ErrorKind::BrokenPipe + | io::ErrorKind::NotConnected + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionAborted => self.reconnect(), + _ => Err(err), + }; + return e.map_err(|e| Error::Transient(MyError::NetworkError(e))); + } + + let mut resp_buf = [0u8; 64]; + let read_size = self.read(&mut resp_buf).map_err(|e| { + warn!("Failed to read response, chunk: {}, cause: {:?}.", chunk, e); + Error::Transient(MyError::NetworkError(e)) + })?; + + if read_size == 0 { + warn!("Received empty response, chunk: {}.", chunk); + if let Err(err) = self.reconnect() { + warn!("Failed to reconnect: {:?}.", err); + } + Err(Error::Transient(MyError::NoAckResponseError)) + } else { + let reply = + buffer::unpack_response(&resp_buf, read_size).map_err(Error::Transient)?; + if reply.ack == chunk { + Ok(()) + } else { + warn!( + "Did not match ack and chunk, ack: {}, chunk: {}.", + reply.ack, chunk + ); + if let Err(err) = self.reconnect() { + warn!("Failed to reconnect: {:?}.", err); + } + Err(Error::Transient(MyError::AckUmatchedError( + reply.ack, + chunk.to_string(), + ))) + } + } + }; + + op.retry(&mut backoff) + } +} + impl TcpConfig for TcpStream { fn set_nodelay(&self, v: bool) -> io::Result<()> { self.set_nodelay(v) @@ -140,135 +206,45 @@ impl Connect for TcpStream { } } -pub trait ReconnectableWrite -where - T: Connect, - T: Write + Read + TcpConfig, -{ - fn connect_with_retry(addr: A, settings: ConnectionSettings) -> io::Result - where - A: ToSocketAddrs + Clone + Debug; -} - -impl ReconnectableWrite for C -where - C: Connect, - C: Write + Read + TcpConfig, -{ - fn connect_with_retry(addr: A, settings: ConnectionSettings) -> io::Result - where - A: ToSocketAddrs + Clone + Debug, - { - let mut backoff = ExponentialBackoff { - current_interval: settings.connect_retry_initial_delay, - initial_interval: settings.connect_retry_initial_delay, - max_interval: settings.connect_retry_max_delay, - max_elapsed_time: Some(settings.connect_retry_timeout), - ..Default::default() - }; - - let mut op = || { - let addr = addr.clone(); - debug!("Start connect to {:?}.", addr); - C::connect(addr) - .map(|s| { - s.set_nodelay(true).unwrap(); - s.set_read_timeout(Some(settings.read_timeout)).unwrap(); - s.set_write_timeout(Some(settings.write_timeout)).unwrap(); - s - }) - .map_err(Error::Transient) - }; - - op.retry(&mut backoff).map_err(|err| match err { - Error::Transient(e) => e, - Error::Permanent(e) => e, - }) - } -} - -pub trait WriteRetryDelay { - fn now(&self) -> Instant; - fn write_retry_initial_delay(&self) -> Duration; - fn write_retry_timeout(&self) -> Duration; - fn write_retry_max_delay(&self) -> Duration; -} - -impl WriteRetryDelay for Stream +fn connect_with_retry(addr: A, settings: ConnectionSettings) -> io::Result where A: ToSocketAddrs + Clone + Debug, - S: ReconnectableWrite, - S: Connect, - S: Write + Read + TcpConfig, + C: Connect + TcpConfig, { - fn now(&self) -> Instant { - Instant::now() - } - fn write_retry_initial_delay(&self) -> Duration { - self.settings.write_retry_initial_delay - } - fn write_retry_timeout(&self) -> Duration { - self.settings.write_retry_timeout - } - fn write_retry_max_delay(&self) -> Duration { - self.settings.write_retry_max_delay - } -} - -pub trait ReconnectWrite { - fn write(&mut self, buf: Vec) -> io::Result<()>; -} - -impl ReconnectWrite for W { - fn write(&mut self, buf: Vec) -> io::Result<()> { - let mut backoff = ExponentialBackoff { - current_interval: self.write_retry_initial_delay(), - initial_interval: self.write_retry_initial_delay(), - max_interval: self.write_retry_max_delay(), - start_time: self.now(), - max_elapsed_time: Some(self.write_retry_timeout()), - ..Default::default() - }; - - let mut op = || -> Result<(), Error> { - trace!("Start write entries."); - self.write_all(&buf[..]).map_err(|e| { - warn!("Write error found {:?}.", e); - // TODO: Consider handling by error kind - match e.kind() { - ErrorKind::BrokenPipe - | ErrorKind::NotConnected - | ErrorKind::ConnectionRefused - | ErrorKind::ConnectionAborted => { - debug!("Try reconnect."); - if let Err(e) = self.reconnect() { - Error::Permanent(e) - } else { - Error::Transient(e) - } - } - _ => Error::Transient(e), - } + let mut backoff = ExponentialBackoff { + current_interval: settings.connect_retry_initial_delay, + initial_interval: settings.connect_retry_initial_delay, + max_interval: settings.connect_retry_max_delay, + max_elapsed_time: Some(settings.connect_retry_timeout), + ..Default::default() + }; + + let mut op = || { + let addr = addr.clone(); + debug!("Start connect to {:?}.", addr); + C::connect(&addr) + .map(|s| { + s.set_nodelay(true).unwrap(); + s.set_read_timeout(Some(settings.read_timeout)).unwrap(); + s.set_write_timeout(Some(settings.write_timeout)).unwrap(); + s }) - }; + .map_err(|err| { + warn!("Failed to connect to {:?}.", addr); + Error::Transient(err) + }) + }; - op.retry(&mut backoff).map_err(|e| match e { - Error::Transient(e) => { - warn!("Failed to write entries: {}", e); - e - } - Error::Permanent(e) => { - warn!("Failed to write entries: {}", e); - e - } - }) - } + op.retry(&mut backoff).map_err(|err| match err { + Error::Transient(e) => e, + Error::Permanent(e) => e, + }) } #[cfg(test)] mod tests { use super::{io, Duration, ToSocketAddrs}; - use super::{Connect, ConnectionSettings, Reconnect, ReconnectWrite, Stream, TcpConfig}; + use super::{Connect, ConnectionSettings, Reconnect, Stream, TcpConfig}; mod s { use super::*; @@ -365,66 +341,4 @@ mod tests { let mut ret = Stream::::connect(addr, settings).unwrap(); ret.reconnect().unwrap(); } - - #[test] - fn write() { - let addr = "127.0.0.1:80".to_string(); - let settings = ConnectionSettings { - connect_retry_initial_delay: Duration::new(0, 1), - connect_retry_max_delay: Duration::new(0, 1), - connect_retry_timeout: Duration::from_millis(100), - write_retry_initial_delay: Duration::new(0, 1), - write_retry_max_delay: Duration::new(0, 1), - write_retry_timeout: Duration::from_millis(10), - ..Default::default() - }; - let mut ret = Stream::::connect(addr, settings).unwrap(); - let mut msg = Vec::new(); - msg.push(0x00); - msg.push(0x01); - msg.push(0x02); - ret.write(msg).unwrap(); - } - - #[test] - fn write_giveup() { - let addr = "127.0.0.1:80".to_string(); - let settings = ConnectionSettings { - connect_retry_initial_delay: Duration::new(0, 1), - connect_retry_max_delay: Duration::new(0, 1), - connect_retry_timeout: Duration::from_millis(10), - write_retry_initial_delay: Duration::new(0, 1), - write_retry_max_delay: Duration::new(0, 1), - write_retry_timeout: Duration::new(0, 5), - ..Default::default() - }; - let mut ret = Stream::::connect(addr, settings).unwrap(); - let mut msg = Vec::new(); - msg.push(0x00); - msg.push(0x01); - msg.push(0x02); - let err = ret.write(msg).err().unwrap(); - assert_eq!(err.kind(), io::ErrorKind::TimedOut) - } - - // #[test] - // fn write_reconnect_fail() { - // let addr = "a".to_string(); - // let settings = ConnectionSettings { - // connect_retry_initial_delay: Duration::new(0, 1), - // connect_retry_max_delay: Duration::new(0, 1), - // connect_retry_timeout: Duration::new(0, 3), - // write_retry_initial_delay: Duration::new(0, 1), - // write_retry_max_delay: Duration::new(0, 1), - // write_retry_timeout: Duration::from_millis(5), - // ..Default::default() - // }; - // let mut ret = Stream::::connect(addr, settings).unwrap(); - // let mut msg = Vec::new(); - // msg.push(0x00); - // msg.push(0x01); - // msg.push(0x02); - // let err = ret.write(msg).err().unwrap(); - // assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused) - // } } diff --git a/src/emitter.rs b/src/emitter.rs index 1465075..f9a0ca0 100644 --- a/src/emitter.rs +++ b/src/emitter.rs @@ -1,11 +1,8 @@ use crate::buffer::{self, Take}; -use crate::connect::ReconnectWrite; -use crate::error::Error; -use backoff::{Error as RetryError, ExponentialBackoff, Operation}; +use crate::connect::*; use base64; use std::cell::RefCell; use std::collections::VecDeque; -use std::io::Read; use std::time::SystemTime; use uuid::Uuid; @@ -26,10 +23,7 @@ impl Emitter { q.push_back(elem) } - pub fn emit(&self, rw: &mut RW, size: Option) - where - RW: ReconnectWrite + Read, - { + pub fn emit(&self, rw: &mut RW, size: Option) { let chunk = base64::encode(&Uuid::new_v4().to_string()); let mut buf = Vec::new(); @@ -50,7 +44,7 @@ impl Emitter { queue.take(&mut entries); let _ = buffer::pack_record(&mut buf, self.tag.as_str(), entries, chunk.as_str()); - if let Err(err) = write_and_read(rw, &buf, &chunk) { + if let Err(err) = rw.write_and_read(&buf, &chunk) { error!( "Tag '{}', an unexpected error occurred during emitting message: '{:?}'.", self.tag, err @@ -59,42 +53,5 @@ impl Emitter { } } -fn write_and_read(rw: &mut RW, buf: &[u8], chunk: &str) -> Result<(), RetryError> -where - RW: ReconnectWrite + Read, -{ - let mut op = || { - rw.write(buf.to_owned()) - .map_err(Error::NetworkError) - .map_err(RetryError::Transient)?; - let mut resp_buf = [0u8; 64]; - let to_write = rw - .read(&mut resp_buf) - .map_err(Error::NetworkError) - .map_err(RetryError::Transient)?; - if to_write == 0 { - warn!("Failed to received ack response. chunk: {}.", chunk); - Err(RetryError::Transient(Error::NoAckResponseError)) - } else { - let reply = - buffer::unpack_response(&resp_buf, to_write).map_err(RetryError::Transient)?; - if reply.ack == chunk { - Ok(()) - } else { - warn!( - "Did not match ack and chunk. ack: {}, chunk: {}.", - reply.ack, chunk - ); - Err(RetryError::Transient(Error::AckUmatchedError( - reply.ack, - chunk.to_string(), - ))) - } - } - }; - let mut backoff = ExponentialBackoff::default(); // TODO: Should be configurable. - op.retry(&mut backoff) -} - #[cfg(test)] mod test {} diff --git a/src/worker.rs b/src/worker.rs index ec6b4c9..611f16e 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,9 +1,9 @@ -use crate::connect::{ConnectionSettings, ReconnectWrite, Stream}; +use crate::connect::*; use crate::emitter::Emitter; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; -use std::io::{self, Read}; +use std::io; use std::net::{Shutdown, TcpStream, ToSocketAddrs}; use std::sync::mpsc; use std::sync::{Arc, Mutex}; @@ -52,7 +52,7 @@ impl Worker { } } Message::Terminate => { - debug!("Worker {} received a terminate message.", id); + info!("Worker {} received a terminate message.", id); wh.flush(&mut stream, None); match stream.stream.borrow_mut().shutdown(Shutdown::Both) { Ok(_) => (), @@ -96,7 +96,7 @@ impl WorkerHandler { fn flush(&self, rw: &mut RW, size: Option) where - RW: ReconnectWrite + Read, + RW: Reconnect + WriteRead, { for emitter in self.emitters.borrow().values() { emitter.emit(rw, size)