Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Jan 19, 2024
1 parent ac9e2e3 commit 69a8e2d
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 27 deletions.
15 changes: 13 additions & 2 deletions bottomless/src/bottomless_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::sync::{Arc, Mutex};

use libsql_sys::ffi::{SQLITE_BUSY, SQLITE_IOERR_WRITE};
use libsql_sys::wal::wrapper::{WalWrapper, WrapWal};
use libsql_sys::wal::{CheckpointMode, Error, Result, Wal, Sqlite3Db, BusyHandler, CheckpointCallback};
use libsql_sys::wal::{
BusyHandler, CheckpointCallback, CheckpointMode, Error, Result, Sqlite3Db, Wal,
};

use crate::replicator::Replicator;

Expand Down Expand Up @@ -144,7 +146,16 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
})??;
}

wrapped.checkpoint(db, mode, busy_handler, sync_flags, buf, checkpoint_cb, in_wal, backfilled)?;
wrapped.checkpoint(
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
)?;

#[allow(clippy::await_holding_lock)]
// uncontended -> only gets called under a libSQL write lock
Expand Down
16 changes: 12 additions & 4 deletions libsql-replication/src/injector/injector_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::num::NonZeroU32;

use libsql_sys::ffi::{Pager, PgHdr};
use libsql_sys::wal::{
BusyHandler, CheckpointMode, PageHeaders, Result, Sqlite3Db, Sqlite3File, Sqlite3Wal,
Sqlite3WalManager, UndoHandler, Vfs, Wal, WalManager, CheckpointCallback,
BusyHandler, CheckpointCallback, CheckpointMode, PageHeaders, Result, Sqlite3Db, Sqlite3File,
Sqlite3Wal, Sqlite3WalManager, UndoHandler, Vfs, Wal, WalManager,
};

use crate::frame::FrameBorrowed;
Expand Down Expand Up @@ -191,8 +191,16 @@ impl Wal for InjectorWal {
in_wal: Option<&mut i32>,
backfilled: Option<&mut i32>,
) -> Result<()> {
self.inner
.checkpoint(db, mode, busy_handler, sync_flags, buf, checkpoint_cb, in_wal, backfilled)
self.inner.checkpoint(
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
)
}

fn exclusive_mode(&mut self, op: c_int) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use libsql_sys::wal::wrapper::{WalWrapper, WrapWal, WrappedWal};
use libsql_sys::wal::{Wal, WalManager, CheckpointCallback, BusyHandler};
use libsql_sys::wal::{BusyHandler, CheckpointCallback, Wal, WalManager};
use metrics::{histogram, increment_counter};
use once_cell::sync::Lazy;
use parking_lot::{Mutex, RwLock};
Expand Down
32 changes: 27 additions & 5 deletions libsql-server/src/replication/primary/replication_logger_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::sync::Arc;

use bytes::Bytes;
use libsql_sys::ffi::Sqlite3DbHeader;
use libsql_sys::wal::{BusyHandler, Result, Sqlite3Wal, Sqlite3WalManager, WalManager, CheckpointCallback};
use libsql_sys::wal::{
BusyHandler, CheckpointCallback, Result, Sqlite3Wal, Sqlite3WalManager, WalManager,
};
use libsql_sys::wal::{PageHeaders, Sqlite3Db, Sqlite3File, UndoHandler};
use libsql_sys::wal::{Vfs, Wal};
use rusqlite::ffi::{libsql_pghdr, SQLITE_IOERR, SQLITE_SYNC_NORMAL};
Expand Down Expand Up @@ -234,8 +236,16 @@ impl Wal for ReplicationLoggerWal {
backfilled: Option<&mut i32>,
) -> Result<()> {
self.inject_replication_index()?;
self.inner
.checkpoint(db, mode, busy_handler, sync_flags, buf, checkpoint_cb, in_wal, backfilled)
self.inner.checkpoint(
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
)
}

fn exclusive_mode(&mut self, op: c_int) -> Result<()> {
Expand Down Expand Up @@ -318,7 +328,10 @@ impl ReplicationLoggerWal {

#[cfg(test)]
mod test {
use libsql_sys::wal::{wrapper::{WalWrapper, WrapWal}, CheckpointMode};
use libsql_sys::wal::{
wrapper::{WalWrapper, WrapWal},
CheckpointMode,
};
use metrics::atomics::AtomicU64;
use rusqlite::ffi::{sqlite3_wal_checkpoint_v2, SQLITE_CHECKPOINT_FULL};
use tempfile::tempdir;
Expand Down Expand Up @@ -348,7 +361,16 @@ mod test {
in_wal: Option<&mut i32>,
backfilled: Option<&mut i32>,
) -> libsql_sys::wal::Result<()> {
wrapped.checkpoint(db, mode, busy_handler, sync_flags, buf, checkpoint_cb, in_wal, backfilled)?;
wrapped.checkpoint(
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
)?;
let buf = &mut [0; LIBSQL_PAGE_SIZE as _];
wrapped.inner.db_file().read_at(buf, 0).unwrap();
let header = Sqlite3DbHeader::mut_from_prefix(buf).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions libsql-sys/src/wal/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use std::ptr::null;

use libsql_ffi::{
libsql_wal, libsql_wal_manager, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs,
wal_impl, wal_manager_impl, PgHdr, SQLITE_CHECKPOINT_FULL, SQLITE_CHECKPOINT_PASSIVE,
SQLITE_CHECKPOINT_RESTART, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_OK, WAL_SAVEPOINT_NDATA, Error,
wal_impl, wal_manager_impl, Error, PgHdr, SQLITE_CHECKPOINT_FULL, SQLITE_CHECKPOINT_PASSIVE,
SQLITE_CHECKPOINT_RESTART, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_OK, WAL_SAVEPOINT_NDATA,
};

use crate::wal::{BusyHandler, CheckpointMode, UndoHandler, CheckpointCallback};
use crate::wal::{BusyHandler, CheckpointCallback, CheckpointMode, UndoHandler};

use super::{PageHeaders, Sqlite3Db, Sqlite3File, Vfs, Wal, WalManager};

Expand Down
8 changes: 7 additions & 1 deletion libsql-sys/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ pub enum CheckpointMode {
}

pub trait CheckpointCallback {
fn frame(&mut self, max_safe_frame_no: u32, frame: &[u8], page_no: NonZeroU32, frame_no: NonZeroU32) -> Result<()>;
fn frame(
&mut self,
max_safe_frame_no: u32,
frame: &[u8],
page_no: NonZeroU32,
frame_no: NonZeroU32,
) -> Result<()>;
fn finish(&mut self) -> Result<()>;
}

Expand Down
29 changes: 23 additions & 6 deletions libsql-sys/src/wal/sqlite3_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use libsql_ffi::{
};

use super::{
BusyHandler, CheckpointMode, PageHeaders, Result, Sqlite3Db, Sqlite3File, UndoHandler, Vfs,
Wal, WalManager, CheckpointCallback,
BusyHandler, CheckpointCallback, CheckpointMode, PageHeaders, Result, Sqlite3Db, Sqlite3File,
UndoHandler, Vfs, Wal, WalManager,
};

/// SQLite3 default wal_manager implementation.
Expand Down Expand Up @@ -323,12 +323,24 @@ impl Wal for Sqlite3Wal {
this.handle_busy() as _
}

unsafe extern "C" fn call_cb(data: *mut c_void, max_safe_frame_no: c_int, page: *const u8, page_len: c_int, page_no: c_int, frame_no: c_int) -> c_int {
unsafe extern "C" fn call_cb(
data: *mut c_void,
max_safe_frame_no: c_int,
page: *const u8,
page_len: c_int,
page_no: c_int,
frame_no: c_int,
) -> c_int {
let this = &mut *(data as *mut &mut dyn CheckpointCallback);
let ret = if page.is_null() {
this.finish()
} else {
this.frame(max_safe_frame_no as _, std::slice::from_raw_parts(page, page_len as _), NonZeroU32::new(page_no as _).unwrap(), NonZeroU32::new(frame_no as _).unwrap())
this.frame(
max_safe_frame_no as _,
std::slice::from_raw_parts(page, page_len as _),
NonZeroU32::new(page_no as _).unwrap(),
NonZeroU32::new(frame_no as _).unwrap(),
)
};

match ret {
Expand All @@ -346,10 +358,15 @@ impl Wal for Sqlite3Wal {
.unwrap_or(std::ptr::null_mut());

let checkpoint_cb_fn = checkpoint_cb.is_some().then_some(call_cb as _);
let checkpoint_cb_data = checkpoint_cb.as_mut().map(|d| d as *mut &mut dyn CheckpointCallback as *mut _).unwrap_or(std::ptr::null_mut());
let checkpoint_cb_data = checkpoint_cb
.as_mut()
.map(|d| d as *mut &mut dyn CheckpointCallback as *mut _)
.unwrap_or(std::ptr::null_mut());

let out_log_num_frames = in_wal.map(|ptr| ptr as _).unwrap_or(std::ptr::null_mut());
let out_backfilled = backfilled.map(|ptr| ptr as _).unwrap_or(std::ptr::null_mut());
let out_backfilled = backfilled
.map(|ptr| ptr as _)
.unwrap_or(std::ptr::null_mut());

let rc = unsafe {
(self.inner.methods.xCheckpoint.unwrap())(
Expand Down
38 changes: 33 additions & 5 deletions libsql-sys/src/wal/wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ffi::c_int;
use std::num::NonZeroU32;

use super::{Wal, WalManager, BusyHandler, CheckpointCallback};
use super::{BusyHandler, CheckpointCallback, Wal, WalManager};

/// A convenient wrapper struct that implement WAL with a `wrapper` where the wrapper needs to
/// implement `WrapWal` instead of `Wal`, where all methods delegate to wrapped by default.
Expand Down Expand Up @@ -178,7 +178,7 @@ where
buf,
checkpoint_cb,
in_wal,
backfilled
backfilled,
)
}

Expand Down Expand Up @@ -288,7 +288,16 @@ pub trait WrapWal<W: Wal> {
in_wal: Option<&mut i32>,
backfilled: Option<&mut i32>,
) -> super::Result<()> {
wrapped.checkpoint(db, mode, busy_handler, sync_flags, buf, checkpoint_cb, in_wal, backfilled)
wrapped.checkpoint(
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
)
}

fn exclusive_mode(&mut self, wrapped: &mut W, op: std::ffi::c_int) -> super::Result<()> {
Expand Down Expand Up @@ -452,8 +461,27 @@ impl<T: WrapWal<W>, W: Wal> WrapWal<W> for Option<T> {
backfilled: Option<&mut i32>,
) -> super::Result<()> {
match self {
Some(t) => t.checkpoint(wrapped, db, mode, busy_handler, sync_flags, buf, checkpoint_cb, in_wal, backfilled),
None => wrapped.checkpoint(db, mode, busy_handler, sync_flags, buf, checkpoint_cb, in_wal, backfilled),
Some(t) => t.checkpoint(
wrapped,
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
),
None => wrapped.checkpoint(
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
),
}
}

Expand Down

0 comments on commit 69a8e2d

Please sign in to comment.