Skip to content

Commit

Permalink
Migrate to use either async streams or std streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ttocsneb committed Mar 26, 2024
1 parent 392075f commit 2cf70c6
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 403 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@ keywords = ["binary", "serde"]

[dependencies]
byteorder = "1"
enum-as-inner = "0.6"
maybe-async = { version = "0.2" }
anyhow = "1"
thiserror = "1"
enum-as-inner = "0.6"
strum = { version = "0.26", features = ["derive"] }
maybe-async = "0.2"
async-generic = "1.0"

# Async Dependencies
async-generic = { version = "1.0.0", optional = true }
async-channel = { version = "2", optional = true }
tokio = { version = "1", features = ["rt-multi-thread", "io-util"], optional = true }
tokio = { version = "1", features = ["rt-multi-thread", "io-util", "fs"], optional = true }
async-trait = { version = "0.1", optional = true }
pin-project = "1.1.4"
strum = { version = "0.26.1", features = ["derive"] }
pin-project = { version = "1.1", optional = true }
async-recursion = "1.0.5"

[dev-dependencies]
futures = "0.3.30"
Expand All @@ -43,7 +44,7 @@ tokio = { version = "1", features = ["full"] }

sync = ["maybe-async/is_sync"]

async = ["dep:async-channel", "dep:async-generic", "dep:async-trait"]
async = ["dep:async-channel", "dep:async-trait", "dep:pin-project"]
async-tokio = ["async", "dep:tokio"]


Expand Down
45 changes: 17 additions & 28 deletions src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Contains [FileBuffer] and [FileBufferAsync], which is a wrapper for files.
//! A wrapper for files.
//!
//! [FileBuffer] and [FileBufferAsync] wraps another Reader/Writer and is able
//! [FileBuffer] wraps another Reader/Writer and is able
//! to hold a large buffer of the file and allows for seeks without clearing the
//! buffer. The buffer has a limited capacity which can be set with
//! [FileBufferOptions::with_max_cache()]/[FileBufferOptions::with_max_blocks()].
Expand All @@ -13,50 +13,36 @@
//! This wrapper is most useful for applications where the file is seeked often
//! and many reads/writes happen close together.
//!
//! In order to create a [FileBuffer] or [FileBufferAsync], the
//! In order to create a [FileBuffer], the
//! [FileBufferOptions] must be used.
//!
//! [FileBufferAsync] is only available when the feature `async-tokio` is
//! enabled.
//!
//! ```no_run
//! use mbon::buffer::FileBufferOptions;
//! use std::fs::File;
//!
//! let file = File::options()
//! .read(true)
//! .write(true)
//! .open("my_file.mbon").unwrap();
//!
//! let fb = FileBufferOptions::new()
//! .with_block_size(4096)
//! .with_max_cache(1_000_000)
//! .build(file);
//! ```
use std::{
collections::{BTreeSet, BinaryHeap, HashMap},
mem,
};

pub mod async_buf;
pub mod sync_buf;
#[cfg(feature = "async-tokio")]
mod async_buf;
#[cfg(feature = "sync")]
mod sync_buf;

#[cfg(feature = "sync")]
use std::io::{Read, Seek};

#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, AsyncSeek};

#[cfg(feature = "sync")]
pub use self::sync_buf::FileBuffer;

#[cfg(feature = "async-tokio")]
pub use self::async_buf::FileBufferAsync;
pub use self::async_buf::FileBufferAsync as FileBuffer;

struct Block {
data: Vec<u8>,
access: u64,
}

/// The internal buffer used by [FileBuffer] and [FileBufferAsync].
/// The internal buffer used by [FileBuffer].
struct Buffer {
blocks: HashMap<u64, Block>,
modified: BTreeSet<u64>,
Expand Down Expand Up @@ -172,13 +158,15 @@ impl Buffer {

/// reset all blocks to be unmodified and return all that were previously
/// marked as modified
#[allow(unused)]
fn take_modified(&mut self) -> Vec<u64> {
let mut modified: Vec<_> = mem::take(&mut self.modified).into_iter().collect();
modified.sort_unstable();
modified
}

/// Get the data from a block id.
#[allow(unused)]
fn get_block_mut(&mut self, block: u64) -> Option<&mut Vec<u8>> {
get_block!(mut self, block)
}
Expand Down Expand Up @@ -338,22 +326,23 @@ impl FileBufferOptions {
/// Build a [FileBuffer] with a given stream.
///
/// The stream must be at least a [Read] + [Seek]
#[cfg(feature = "sync")]
pub fn build<F: Read + Seek>(&self, f: F) -> FileBuffer<F> {
let buffer = self.internal_build();

FileBuffer::new(buffer, f)
}

/// Build a [FileBufferAsync] with a given async stream.
/// Build a [FileBuffer] with a given async stream.
///
/// The stream must be at least a [AsyncRead] + [AsyncSeek]
///
/// This function is only available with the feature `async-tokio` enabled.
#[cfg(feature = "async-tokio")]
pub fn build_async<F: AsyncRead + AsyncSeek>(&self, f: F) -> FileBufferAsync<F> {
pub fn build<F: AsyncRead + AsyncSeek>(&self, f: F) -> FileBuffer<F> {
let buffer = self.internal_build();

FileBufferAsync::new(buffer, f)
FileBuffer::new(buffer, f)
}
}

Expand Down
22 changes: 9 additions & 13 deletions src/buffer/async_buf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![cfg(feature = "async-tokio")]

use super::Buffer;

use std::io::{self, SeekFrom};
Expand Down Expand Up @@ -70,7 +68,7 @@ impl Default for AsyncFileBufState {
/// let fb = FileBufferOptions::new()
/// .with_block_size(4096)
/// .with_max_cache(1_000_000)
/// .build_async(file);
/// .build(file);
/// # };
/// ```
#[pin_project]
Expand Down Expand Up @@ -633,9 +631,7 @@ mod test {
let file = lorem_ipsom();
let file = File::open(file).await.unwrap();

let mut f = FileBufferOptions::new()
.with_block_size(13)
.build_async(file);
let mut f = FileBufferOptions::new().with_block_size(13).build(file);

let mut buf = [0u8; 100];
for i in 0..(lic.len() / 100) {
Expand All @@ -662,7 +658,7 @@ mod test {

let mut f = FileBufferOptions::new()
.with_block_size(13)
.build_async(&mut file);
.build(&mut file);

f.write_all(SHORT).await.unwrap();
f.flush().await.unwrap();
Expand All @@ -687,7 +683,7 @@ mod test {
.unwrap();
let mut f = FileBufferOptions::new()
.with_block_size(13)
.build_async(&mut file);
.build(&mut file);

f.write_all(b"Hello World").await.unwrap();
f.flush().await.unwrap();
Expand Down Expand Up @@ -718,7 +714,7 @@ mod test {

let mut f = FileBufferOptions::new()
.with_block_size(13)
.build_async(&mut file);
.build(&mut file);

f.write_all(lic.as_slice()).await.unwrap();
f.flush().await.unwrap();
Expand All @@ -744,7 +740,7 @@ mod test {

let mut f = FileBufferOptions::new()
.with_block_size(13)
.build_async(&mut file);
.build(&mut file);

f.seek(SeekFrom::Start(9)).await.unwrap();
f.write_all(b"Hello World").await.unwrap();
Expand All @@ -769,7 +765,7 @@ mod test {
let mut f = FileBufferOptions::new()
.with_block_size(13)
.with_max_blocks(13)
.build_async(&mut file);
.build(&mut file);

let mut buf = [0u8; 100];
for i in 0..(lic.len() / 100) {
Expand All @@ -790,7 +786,7 @@ mod test {
let mut file = File::open(file).await.unwrap();
let mut f = FileBufferOptions::new()
.with_block_size(13)
.build_async(&mut file);
.build(&mut file);

let mut buf = [0u8; 100];
f.seek(SeekFrom::End(100)).await.unwrap();
Expand All @@ -813,7 +809,7 @@ mod test {
let mut f = FileBufferOptions::new()
.with_block_size(13)
.with_max_blocks(13)
.build_async(&mut file);
.build(&mut file);

let mut rng = StdRng::from_seed(*b"Hiya World This is a random seed");
// let mut rng = StdRng::from_entropy();
Expand Down
40 changes: 0 additions & 40 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,6 @@ impl<T> ChanSend<T> {
pub async fn send(&self, data: T) -> Result<(), SendError<T>> {
self.0.send(data).await
}

/// Send a message to the receiver
///
/// This is the same as [Self::send] when feature `sync` is set
#[cfg(feature = "async")]
#[inline]
pub fn send_blocking(&self, data: T) -> Result<(), SendError<T>> {
self.0.send_blocking(data)
}

/// Send a message to the receiver
///
/// This is the same as [Self::send] when feature `sync` is set
#[cfg(feature = "sync")]
#[inline]
pub fn send_blocking(&self, data: T) -> Result<(), SendError<T>> {
self.0.send(data)
}
}
#[maybe_async]
impl<T> ChanRecv<T> {
Expand All @@ -63,26 +45,4 @@ impl<T> ChanRecv<T> {
pub async fn recv(&self) -> Result<T, RecvError> {
self.0.recv().await
}

/// Receive a message from a sender.
///
/// This will wait until a message is ready
///
/// This is the same as [Self::recv] when feature `sync` is set
#[cfg(feature = "async")]
#[inline]
pub fn recv_blocking(&self) -> Result<T, RecvError> {
self.0.recv_blocking()
}

/// Receive a message from a sender.
///
/// This will wait until a message is ready
///
/// This is the same as [Self::recv] when feature `sync` is set
#[cfg(feature = "sync")]
#[inline]
pub fn recv_blocking(&self) -> Result<T, RecvError> {
self.0.recv()
}
}
Loading

0 comments on commit 2cf70c6

Please sign in to comment.