From 346108b6baaddb9865432496d30473fb4948d327 Mon Sep 17 00:00:00 2001 From: Takeru Sato Date: Fri, 1 Mar 2019 23:19:03 +0900 Subject: [PATCH] Add Emitter test --- src/emitter.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 3 deletions(-) diff --git a/src/emitter.rs b/src/emitter.rs index f9a0ca0..502a530 100644 --- a/src/emitter.rs +++ b/src/emitter.rs @@ -24,7 +24,6 @@ impl Emitter { } pub fn emit(&self, rw: &mut RW, size: Option) { - let chunk = base64::encode(&Uuid::new_v4().to_string()); let mut buf = Vec::new(); let mut queue = self.queue.borrow_mut(); @@ -43,10 +42,12 @@ impl Emitter { let mut entries = Vec::with_capacity(size); queue.take(&mut entries); + let chunk = base64::encode(&Uuid::new_v4().to_string()); + let _ = buffer::pack_record(&mut buf, self.tag.as_str(), entries, chunk.as_str()); if let Err(err) = rw.write_and_read(&buf, &chunk) { error!( - "Tag '{}', an unexpected error occurred during emitting message: '{:?}'.", + "Tag '{}' unexpected error occurred during emitting message: '{:?}'.", self.tag, err ); } @@ -54,4 +55,75 @@ impl Emitter { } #[cfg(test)] -mod test {} +mod test { + use super::*; + use crate::error::Error as MyError; + use backoff::Error; + + struct TestStream; + + impl WriteRead for TestStream { + fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error> { + Ok(()) + } + } + + #[test] + fn emit_consume_queeu() { + let emitter = Emitter::new(1, "x".to_string()); + + emitter.push((SystemTime::now(), vec![0x00, 0x01])); + emitter.push((SystemTime::now(), vec![0x00, 0x02])); + emitter.push((SystemTime::now(), vec![0x00, 0x03])); + emitter.push((SystemTime::now(), vec![0x00, 0x04])); + emitter.push((SystemTime::now(), vec![0x00, 0x05])); + + { + emitter.emit(&mut TestStream, Some(3)); + let q = emitter.queue.borrow_mut(); + + assert_eq!(q.len(), 2); + } + + { + emitter.emit(&mut TestStream, Some(3)); + let q = emitter.queue.borrow_mut(); + + assert_eq!(q.len(), 0); + } + + { + emitter.emit(&mut TestStream, Some(3)); + let q = emitter.queue.borrow_mut(); + + assert_eq!(q.len(), 0); + } + } + + struct TestErrStream; + + impl WriteRead for TestErrStream { + fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error> { + Err(Error::Permanent(MyError::AckUmatchedError( + "a".to_string(), + "b".to_string(), + ))) + } + } + + #[test] + fn emit_consume_queue_with_error_stream() { + let emitter = Emitter::new(1, "x".to_string()); + + emitter.push((SystemTime::now(), vec![0x00, 0x01])); + emitter.push((SystemTime::now(), vec![0x00, 0x02])); + emitter.push((SystemTime::now(), vec![0x00, 0x03])); + emitter.push((SystemTime::now(), vec![0x00, 0x04])); + emitter.push((SystemTime::now(), vec![0x00, 0x05])); + + emitter.emit(&mut TestErrStream, Some(3)); + let q = emitter.queue.borrow_mut(); + + assert_eq!(q.len(), 2); + } +}