Skip to content

Commit

Permalink
test(codec): added sink_stream tests for codecs
Browse files Browse the repository at this point in the history
Signed-off-by: JadKHaddad <jadkhaddad@gmail.com>
  • Loading branch information
JadKHaddad committed Sep 9, 2024
1 parent 0cf7407 commit ef7f153
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 10 deletions.
75 changes: 69 additions & 6 deletions src/codec/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,14 @@ mod test {

use std::vec::Vec;

use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use tokio::io::AsyncWriteExt;

use super::*;
use crate::{decode::framed_read::FramedRead, test::init_tracing, tokio::Compat};
use crate::{
decode::framed_read::FramedRead, encode::framed_write::FramedWrite, test::init_tracing,
tokio::Compat,
};

async fn one_from_slice<const I: usize, const O: usize>() {
let read: &[u8] = b"1##";
Expand Down Expand Up @@ -247,11 +250,14 @@ mod test {
let buf = &mut [0_u8; I];

let framed_read = FramedRead::new(read, codec, buf);
let byte_chunks: Vec<_> = framed_read.collect().await;

let bytes: Vec<_> = byte_chunks.into_iter().flatten().collect::<Vec<_>>();
let items: Vec<_> = framed_read
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();

assert_eq!(bytes, result);
assert_eq!(items, result);
}

#[tokio::test]
Expand Down Expand Up @@ -289,4 +295,61 @@ mod test {

from_slow_reader::<1024, 24>().await;
}

#[tokio::test]
async fn sink_stream() {
const O: usize = 24;

init_tracing();

let items = std::vec![
heapless::Vec::<_, O>::from_slice(b"jh asjd").unwrap(),
heapless::Vec::<_, O>::from_slice(b"k hb").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jsjuwjal kadj").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jsadhjiu").unwrap(),
heapless::Vec::<_, O>::from_slice(b"w").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jal kadjjsadhjiuwqens ").unwrap(),
heapless::Vec::<_, O>::from_slice(b"nd yxxcjajsdi").unwrap(),
heapless::Vec::<_, O>::from_slice(b"askdn asjdasd").unwrap(),
heapless::Vec::<_, O>::from_slice(b"iouqw essd").unwrap(),
];

let items_clone = items.clone();

let (read, write) = tokio::io::duplex(1024);

let handle = tokio::spawn(async move {
let write_buf = &mut [0_u8; 1024];
let mut framed_write = FramedWrite::new(
Compat::new(write),
AnyDelimiterCodec::<O>::new(b"##"),
write_buf,
);

for item in items_clone {
framed_write.send(item).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

framed_write.close().await.unwrap();
});

let read_buf = &mut [0_u8; 1024];
let framed_read = FramedRead::new(
Compat::new(read),
AnyDelimiterCodec::<O>::new(b"##"),
read_buf,
);

let collected_items: Vec<_> = framed_read
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();

handle.await.unwrap();

assert_eq!(collected_items, items);
}
}
59 changes: 57 additions & 2 deletions src/codec/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ mod test {

use std::vec::Vec;

use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use tokio::io::AsyncWriteExt;

use super::*;
use crate::{decode::framed_read::FramedRead, test::init_tracing, tokio::Compat};
use crate::{
decode::framed_read::FramedRead, encode::framed_write::FramedWrite, test::init_tracing,
tokio::Compat,
};

async fn from_slice<const I: usize, const O: usize>() {
let read: &[u8] =
Expand Down Expand Up @@ -249,4 +252,56 @@ mod test {

from_slow_reader::<3, 5>().await;
}

#[tokio::test]
async fn sink_stream() {
const O: usize = 24;

init_tracing();

let chunks = std::vec![
heapless::Vec::<_, O>::from_slice(b"jh asjd").unwrap(),
heapless::Vec::<_, O>::from_slice(b"k hb").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jsjuwjal kadj").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jsadhjiu").unwrap(),
heapless::Vec::<_, O>::from_slice(b"w").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jal kadjjsadhjiuwqens ").unwrap(),
heapless::Vec::<_, O>::from_slice(b"nd yxxcjajsdi").unwrap(),
heapless::Vec::<_, O>::from_slice(b"askdn asjdasd").unwrap(),
heapless::Vec::<_, O>::from_slice(b"iouqw essd").unwrap(),
];

let chunks_clone = chunks.clone();

let (read, write) = tokio::io::duplex(1024);

let handle = tokio::spawn(async move {
let write_buf = &mut [0_u8; 1024];
let mut framed_write = FramedWrite::new(Compat::new(write), BytesCodec::<O>, write_buf);

for item in chunks_clone {
framed_write.send(item).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

framed_write.close().await.unwrap();
});

let read_buf = &mut [0_u8; 1024];
let framed_read = FramedRead::new(Compat::new(read), BytesCodec::<O>, read_buf);

let collected_bytes: Vec<_> = framed_read
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>();

let bytes: Vec<_> = chunks.into_iter().flatten().collect();

handle.await.unwrap();

assert_eq!(collected_bytes, bytes);
}
}
104 changes: 102 additions & 2 deletions src/codec/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,14 @@ mod test {
use core::str::FromStr;
use std::vec::Vec;

use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use tokio::io::AsyncWriteExt;

use super::*;
use crate::{decode::framed_read::FramedRead, test::init_tracing, tokio::Compat};
use crate::{
decode::framed_read::FramedRead, encode::framed_write::FramedWrite, test::init_tracing,
tokio::Compat,
};

macro_rules! collect_items {
($framed_read:expr) => {{
Expand Down Expand Up @@ -495,4 +498,101 @@ mod test {

from_slow_reader::<1024, 24>().await;
}

#[tokio::test]
async fn sink_stream() {
const O: usize = 24;

init_tracing();

// Test with `LineBytesCodec`

let items = std::vec![
heapless::Vec::<_, O>::from_slice(b"jh asjd").unwrap(),
heapless::Vec::<_, O>::from_slice(b"k hb").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jsjuwjal kadj").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jsadhjiu").unwrap(),
heapless::Vec::<_, O>::from_slice(b"w").unwrap(),
heapless::Vec::<_, O>::from_slice(b"jal kadjjsadhjiuwqens ").unwrap(),
heapless::Vec::<_, O>::from_slice(b"nd yxxcjajsdi").unwrap(),
heapless::Vec::<_, O>::from_slice(b"askdn asjdasd").unwrap(),
heapless::Vec::<_, O>::from_slice(b"iouqw essd").unwrap(),
];

let items_clone = items.clone();

let (read, write) = tokio::io::duplex(1024);

let handle = tokio::spawn(async move {
let write_buf = &mut [0_u8; 1024];
let mut framed_write =
FramedWrite::new(Compat::new(write), LineBytesCodec::<O>::new(), write_buf);

for item in items_clone {
framed_write.send(item).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

framed_write.close().await.unwrap();
});

let read_buf = &mut [0_u8; 1024];
let framed_read = FramedRead::new(Compat::new(read), LineBytesCodec::<O>::new(), read_buf);

let collected_items: Vec<_> = framed_read
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();

handle.await.unwrap();

assert_eq!(collected_items, items);

// Test with `LinesCodec`

let items = std::vec![
heapless::String::<O>::from_str("jh asjd").unwrap(),
heapless::String::<O>::from_str("k hb").unwrap(),
heapless::String::<O>::from_str("jsjuwjal kadj").unwrap(),
heapless::String::<O>::from_str("jsadhjiu").unwrap(),
heapless::String::<O>::from_str("w").unwrap(),
heapless::String::<O>::from_str("jal kadjjsadhjiuwqens ").unwrap(),
heapless::String::<O>::from_str("nd yxxcjajsdi").unwrap(),
heapless::String::<O>::from_str("askdn asjdasd").unwrap(),
heapless::String::<O>::from_str("iouqw essd").unwrap(),
];

let items_clone = items.clone();

let (read, write) = tokio::io::duplex(1024);

let handle = tokio::spawn(async move {
let write_buf = &mut [0_u8; 1024];
let mut framed_write =
FramedWrite::new(Compat::new(write), LinesCodec::<O>::new(), write_buf);

for item in items_clone {
framed_write.send(item).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

framed_write.close().await.unwrap();
});

let read_buf = &mut [0_u8; 1024];
let framed_read = FramedRead::new(Compat::new(read), LinesCodec::<O>::new(), read_buf);

let collected_items: Vec<_> = framed_read
.collect::<Vec<_>>()
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();

handle.await.unwrap();

assert_eq!(collected_items, items);
}
}

0 comments on commit ef7f153

Please sign in to comment.