Skip to content

Commit

Permalink
Add more testcases
Browse files Browse the repository at this point in the history
  • Loading branch information
diqiu50 committed Jan 15, 2025
1 parent 07b38b2 commit 96c69e8
Show file tree
Hide file tree
Showing 5 changed files with 494 additions and 199 deletions.
2 changes: 1 addition & 1 deletion clients/filesystem-fuse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::signal;

#[tokio::main]
async fn main() -> fuse3::Result<()> {
tracing_subscriber::fmt().init();
tracing_subscriber::fmt::init();

// todo need inmprove the args parsing
let args: Vec<String> = std::env::args().collect();
Expand Down
1 change: 0 additions & 1 deletion clients/filesystem-fuse/src/memory_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl PathFileSystem for MemoryFileSystem {
.data
.clone();
if flags.is_read() {

opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() }));
}
if flags.is_write() || flags.is_append() {
Expand Down
104 changes: 91 additions & 13 deletions clients/filesystem-fuse/src/open_dal_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use bytes::Bytes;
use fuse3::FileType::{Directory, RegularFile};
use fuse3::{Errno, FileType, Timestamp};
use log::error;
use opendal::{EntryMode, ErrorKind, Metadata, Operator};
use opendal::{Buffer, EntryMode, ErrorKind, Metadata, Operator};
use std::path::{Path, PathBuf};
use std::time::SystemTime;

Expand All @@ -37,6 +37,8 @@ pub(crate) struct OpenDalFileSystem {
impl OpenDalFileSystem {}

impl OpenDalFileSystem {
const WRITE_BUFFER_SIZE: usize = 5 * 1024 * 1024;

pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self {
Self { op: op }
}
Expand Down Expand Up @@ -121,17 +123,29 @@ impl PathFileSystem for OpenDalFileSystem {
file.reader = Some(Box::new(FileReaderImpl { reader }));
}
if !flags.is_create() && flags.is_append() {
error!("The file system does not support open a exists file with the append mode ");
return Err(Errno::from(libc::EBADF));
}

if flags.is_write() || flags.is_truncate() {
if flags.is_truncate() {
self.op
.write(&file_name, Buffer::new())
.await
.map_err(opendal_error_to_errno)?;
}

if flags.is_write() || flags.is_append() || flags.is_truncate() {
let writer = self
.op
.writer_with(&file_name)
.await
.map_err(opendal_error_to_errno)?;
file.writer = Some(Box::new(FileWriterImpl { writer }));
file.writer = Some(Box::new(FileWriterImpl {
writer,
buffer: Vec::with_capacity(OpenDalFileSystem::WRITE_BUFFER_SIZE),
}));
}

Ok(file)
}

Expand All @@ -146,14 +160,10 @@ impl PathFileSystem for OpenDalFileSystem {
async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_name = path.to_string_lossy().to_string();

let mut writer = self
.op
.writer_with(&file_name)
self.op
.write(&file_name, Buffer::new())
.await
.map_err(opendal_error_to_errno)?;

writer.close().await.map_err(opendal_error_to_errno)?;

let file = self.open_file(path, flags).await?;
Ok(file)
}
Expand Down Expand Up @@ -214,19 +224,34 @@ impl FileReader for FileReaderImpl {

struct FileWriterImpl {
writer: opendal::Writer,
buffer: Vec<u8>,
}

#[async_trait]
impl FileWriter for FileWriterImpl {
async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
self.writer
.write(data.to_vec())
.await
.map_err(opendal_error_to_errno)?;
if self.buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
let mut new_buffer: Vec<u8> = Vec::with_capacity(OpenDalFileSystem::WRITE_BUFFER_SIZE);
new_buffer.append(&mut self.buffer);

self.writer
.write(new_buffer)
.await
.map_err(opendal_error_to_errno)?;
}
self.buffer.extend(data);
Ok(data.len() as u32)
}

async fn close(&mut self) -> Result<()> {
if !self.buffer.is_empty() {
let mut new_buffer: Vec<u8> = vec![];
new_buffer.append(&mut self.buffer);
self.writer
.write(new_buffer)
.await
.map_err(opendal_error_to_errno)?;
}
self.writer.close().await.map_err(opendal_error_to_errno)?;
Ok(())
}
Expand Down Expand Up @@ -268,6 +293,7 @@ mod test {
use crate::s3_filesystem::tests::s3_test_config;
use crate::test_enable_with;
use crate::RUN_TEST_WITH_S3;
use bytes::Buf;
use opendal::layers::LoggingLayer;
use opendal::{services, Builder, Operator};

Expand Down Expand Up @@ -331,4 +357,56 @@ mod test {
}
}
}

#[tokio::test]
async fn s3_ut_test_s3_write() {
test_enable_with!(RUN_TEST_WITH_S3);
let config = s3_test_config();

let op = create_opendal(&config);
let path = "/s1/fileset1/gvfs_test/test_dir/test_file";
let mut writer = op.writer_with(path).await.unwrap();

let mut buffer: Vec<u8> = vec![];
for i in 0..10 * 1024 {
let data = vec![i as u8; 1024];
buffer.extend(&data);

if buffer.len() > 5 * 1024 * 1024 {
writer.write(buffer).await.unwrap();
buffer = vec![];
};
}

if !buffer.is_empty() {
writer.write(buffer).await.unwrap();
}
writer.close().await.unwrap();
}

#[tokio::test]
async fn s3_ut_test_s3_read() {
test_enable_with!(RUN_TEST_WITH_S3);
let config = s3_test_config();

let op = create_opendal(&config);
let path = "/s1/fileset1/test_dir/test_big_file";
let reader = op.reader(path).await.unwrap();

let mut buffer = Vec::new();

let mut start = 0;
let mut end = 1024;
loop {
let buf = reader.read(start..end).await.unwrap();
if buf.is_empty() {
break;
}
buffer.extend_from_slice(buf.chunk());
start = end;
end += 1024;
}

println!("Read {} bytes.", buffer.len());
}
}
9 changes: 8 additions & 1 deletion clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,20 @@ if [ "$1" == "test" ]; then
echo "Running tests..."
cd $CLIENT_FUSE_DIR
export RUN_TEST_WITH_FUSE=1
cargo test --test fuse_test fuse_it_
cargo test --test fuse_test fuse_it_ -- weak_consistency

elif [ "$1" == "start" ]; then
# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "restart" ]; then
echo "Stopping servers..."
stop_servers
# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "stop" ]; then
# Stop the servers
echo "Stopping servers..."
Expand Down
Loading

0 comments on commit 96c69e8

Please sign in to comment.