Skip to content

Commit

Permalink
Add Emitter test
Browse files Browse the repository at this point in the history
  • Loading branch information
tkrs committed Mar 1, 2019
1 parent aa38537 commit 346108b
Showing 1 changed file with 75 additions and 3 deletions.
78 changes: 75 additions & 3 deletions src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ impl Emitter {
}

pub fn emit<RW: WriteRead>(&self, rw: &mut RW, size: Option<usize>) {
let chunk = base64::encode(&Uuid::new_v4().to_string());
let mut buf = Vec::new();

let mut queue = self.queue.borrow_mut();
Expand All @@ -43,15 +42,88 @@ impl Emitter {
let mut entries = Vec::with_capacity(size);
queue.take(&mut entries);

let chunk = base64::encode(&Uuid::new_v4().to_string());

let _ = buffer::pack_record(&mut buf, self.tag.as_str(), entries, chunk.as_str());
if let Err(err) = rw.write_and_read(&buf, &chunk) {
error!(
"Tag '{}', an unexpected error occurred during emitting message: '{:?}'.",
"Tag '{}' unexpected error occurred during emitting message: '{:?}'.",
self.tag, err
);
}
}
}

#[cfg(test)]
mod test {}
mod test {
use super::*;
use crate::error::Error as MyError;
use backoff::Error;

struct TestStream;

impl WriteRead for TestStream {
fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error<MyError>> {
Ok(())
}
}

#[test]
fn emit_consume_queeu() {
let emitter = Emitter::new(1, "x".to_string());

emitter.push((SystemTime::now(), vec![0x00, 0x01]));
emitter.push((SystemTime::now(), vec![0x00, 0x02]));
emitter.push((SystemTime::now(), vec![0x00, 0x03]));
emitter.push((SystemTime::now(), vec![0x00, 0x04]));
emitter.push((SystemTime::now(), vec![0x00, 0x05]));

{
emitter.emit(&mut TestStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 2);
}

{
emitter.emit(&mut TestStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 0);
}

{
emitter.emit(&mut TestStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 0);
}
}

struct TestErrStream;

impl WriteRead for TestErrStream {
fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error<MyError>> {
Err(Error::Permanent(MyError::AckUmatchedError(
"a".to_string(),
"b".to_string(),
)))
}
}

#[test]
fn emit_consume_queue_with_error_stream() {
let emitter = Emitter::new(1, "x".to_string());

emitter.push((SystemTime::now(), vec![0x00, 0x01]));
emitter.push((SystemTime::now(), vec![0x00, 0x02]));
emitter.push((SystemTime::now(), vec![0x00, 0x03]));
emitter.push((SystemTime::now(), vec![0x00, 0x04]));
emitter.push((SystemTime::now(), vec![0x00, 0x05]));

emitter.emit(&mut TestErrStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 2);
}
}

0 comments on commit 346108b

Please sign in to comment.