Skip to content

Commit

Permalink
Merge pull request tursodatabase#874 from tursodatabase/checkpoint-ca…
Browse files Browse the repository at this point in the history
…llback

Introduce checkpoint callback
  • Loading branch information
MarinPostma authored Jan 19, 2024
2 parents e0319b0 + 62f61db commit 2c0d47d
Show file tree
Hide file tree
Showing 18 changed files with 489 additions and 136 deletions.
30 changes: 22 additions & 8 deletions bottomless/src/bottomless_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::sync::{Arc, Mutex};

use libsql_sys::ffi::{SQLITE_BUSY, SQLITE_IOERR_WRITE};
use libsql_sys::wal::wrapper::{WalWrapper, WrapWal};
use libsql_sys::wal::{CheckpointMode, Error, Result, Wal};
use libsql_sys::wal::{
BusyHandler, CheckpointCallback, CheckpointMode, Error, Result, Sqlite3Db, Wal,
};

use crate::replicator::Replicator;

Expand Down Expand Up @@ -80,16 +82,19 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
Ok(())
}

fn checkpoint<B: libsql_sys::wal::BusyHandler>(
fn checkpoint(
&mut self,
wrapped: &mut T,
db: &mut libsql_sys::wal::Sqlite3Db,
mode: libsql_sys::wal::CheckpointMode,
busy_handler: Option<&mut B>,
db: &mut Sqlite3Db,
mode: CheckpointMode,
busy_handler: Option<&mut dyn BusyHandler>,
sync_flags: u32,
// temporary scratch buffer
buf: &mut [u8],
) -> libsql_sys::wal::Result<(u32, u32)> {
checkpoint_cb: Option<&mut dyn CheckpointCallback>,
in_wal: Option<&mut i32>,
backfilled: Option<&mut i32>,
) -> Result<()> {
{
tracing::trace!("bottomless checkpoint");

Expand Down Expand Up @@ -141,7 +146,16 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
})??;
}

let ret = wrapped.checkpoint(db, mode, busy_handler, sync_flags, buf)?;
wrapped.checkpoint(
db,
mode,
busy_handler,
sync_flags,
buf,
checkpoint_cb,
in_wal,
backfilled,
)?;

#[allow(clippy::await_holding_lock)]
// uncontended -> only gets called under a libSQL write lock
Expand All @@ -159,6 +173,6 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
})??;
}

Ok(ret)
Ok(())
}
}
43 changes: 33 additions & 10 deletions libsql-ffi/bundled/SQLite3MultipleCiphers/src/sqlite3.c
Original file line number Diff line number Diff line change
Expand Up @@ -13890,7 +13890,12 @@ typedef struct libsql_wal_methods {
int nBuf, /* Size of buffer nBuf */
unsigned char *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
/*
* Called for each page being inserted in the wal, and once if the whole checkpoint operation was successfull with pPage == NULL
*/
int (*xCb)(void* pCbData, int mxSafeFrame, const unsigned char* pPage, int nPage, int page_no, int frame_no), /* called */
void* pCbData /* user data passed to xCb */
);

/* Return the value to pass to a sqlite3_wal_hook callback, the
Expand Down Expand Up @@ -56933,7 +56938,12 @@ typedef struct libsql_wal_methods {
int nBuf, /* Size of buffer nBuf */
unsigned char *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
/*
* Called for each page being inserted in the wal, and once if the whole checkpoint operation was successfull with pPage == NULL
*/
int (*xCb)(void* pCbData, int mxSafeFrame, const unsigned char* pPage, int nPage, int page_no, int frame_no), /* called */
void* pCbData /* user data passed to xCb */
);

/* Return the value to pass to a sqlite3_wal_hook callback, the
Expand Down Expand Up @@ -64648,7 +64658,7 @@ SQLITE_PRIVATE int sqlite3PagerCheckpoint(
(eMode==SQLITE_CHECKPOINT_PASSIVE ? 0 : pPager->xBusyHandler),
pPager->pBusyHandlerArg,
pPager->walSyncFlags, pPager->pageSize, (u8 *)pPager->pTmpSpace,
pnLog, pnCkpt
pnLog, pnCkpt, NULL, NULL
);
}
return rc;
Expand Down Expand Up @@ -65228,7 +65238,9 @@ static int sqlite3WalCheckpoint(
int nBuf, /* Size of temporary buffer */
u8 *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
int (*xCb)(void*, int, const unsigned char*, int, int, int),
void *pCbData
);
static void sqlite3WalEndReadTransaction(Wal *pWal);
static int sqlite3WalEndWriteTransaction(Wal *pWal);
Expand Down Expand Up @@ -66960,7 +66972,9 @@ static int walCheckpoint(
int (*xBusy)(void*), /* Function to call when busy */
void *pBusyArg, /* Context argument for xBusyHandler */
int sync_flags, /* Flags for OsSync() (or 0) */
u8 *zBuf /* Temporary buffer to use */
u8 *zBuf, /* Temporary buffer to use */
void *pCbData, /* User data passed to xCb */
int (*xCb)(void* pCbData, int mxSafeFrame, const unsigned char* pPage, int nPage, int pageNo, int frameNo) /* Checkpoint callback */
){
int rc = SQLITE_OK; /* Return code */
int szPage; /* Database page-size */
Expand Down Expand Up @@ -67009,7 +67023,7 @@ static int walCheckpoint(

/* Allocate the iterator */
if( pInfo->nBackfill<mxSafeFrame ){
rc = walIteratorRevInit(pWal, pInfo->nBackfill, &pIter, mxSafeFrame, 1);
rc = walIteratorRevInit(pWal, pInfo->nBackfill, &pIter, mxSafeFrame, xCb == NULL);
assert(rc == SQLITE_OK || pIter.frames == NULL);
}

Expand Down Expand Up @@ -67062,12 +67076,19 @@ static int walCheckpoint(
testcase( IS_BIG_INT(iOffset) );
rc = sqlite3OsWrite(pWal->pDbFd, zBuf, szPage, iOffset);
if( rc!=SQLITE_OK ) break;
if (xCb) {
rc = (xCb)(pCbData, mxSafeFrame, zBuf, szPage, iDbpage, iFrame);
}
if( rc!=SQLITE_OK ) break;
}
sqlite3OsFileControl(pWal->pDbFd, SQLITE_FCNTL_CKPT_DONE, 0);

/* If work was actually accomplished... */
if( rc==SQLITE_OK ){
if( mxSafeFrame==walIndexHdr(pWal)->mxFrame ){
if (xCb) {
rc = (xCb)(pCbData, mxSafeFrame, NULL, 0, 0, 0);
}
i64 szDb = pWal->hdr.nPage*(i64)szPage;
testcase( IS_BIG_INT(szDb) );
rc = sqlite3OsTruncate(pWal->pDbFd, szDb);
Expand Down Expand Up @@ -67191,7 +67212,7 @@ static int sqlite3WalClose(
pWal->exclusiveMode = WAL_EXCLUSIVE_MODE;
}
rc = sqlite3WalCheckpoint(pWal, db,
SQLITE_CHECKPOINT_PASSIVE, 0, 0, sync_flags, nBuf, zBuf, 0, 0
SQLITE_CHECKPOINT_PASSIVE, 0, 0, sync_flags, nBuf, zBuf, 0, 0, NULL, NULL
);
if( rc==SQLITE_OK ){
int bPersist = -1;
Expand Down Expand Up @@ -68874,7 +68895,9 @@ static int sqlite3WalCheckpoint(
int nBuf, /* Size of temporary buffer */
u8 *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
int (*xCb)(void*, int, const unsigned char*, int, int, int), /* page, page_no, frame_no */
void *pCbData
){
int rc; /* Return code */
int isChanged = 0; /* True if a new wal-index header is loaded */
Expand Down Expand Up @@ -68948,7 +68971,7 @@ static int sqlite3WalCheckpoint(
if( pWal->hdr.mxFrame && walPagesize(pWal)!=nBuf ){
rc = SQLITE_CORRUPT_BKPT;
}else{
rc = walCheckpoint(pWal, db, eMode2, xBusy2, pBusyArg, sync_flags,zBuf);
rc = walCheckpoint(pWal, db, eMode2, xBusy2, pBusyArg, sync_flags,zBuf, pCbData, xCb);
}

/* If no error occurred, set the output variables. */
Expand Down Expand Up @@ -69330,7 +69353,7 @@ static int sqlite3WalOpen(
out->methods.xSavepoint = (void (*)(wal_impl *, unsigned int *))sqlite3WalSavepoint;
out->methods.xSavepointUndo = (int (*)(wal_impl *, unsigned int *))sqlite3WalSavepointUndo;
out->methods.xFrames = (int (*)(wal_impl *, int, libsql_pghdr *, unsigned int, int, int))sqlite3WalFrames;
out->methods.xCheckpoint = (int (*)(wal_impl *, sqlite3 *, int, int (*)(void *), void *, int, int, unsigned char *, int *, int *))sqlite3WalCheckpoint;
out->methods.xCheckpoint = (int (*)(wal_impl *, sqlite3 *, int, int (*)(void *), void *, int, int, unsigned char *, int *, int *, int (*)(void*, int, const unsigned char*, int, int, int), void*))sqlite3WalCheckpoint;
out->methods.xCallback = (int (*)(wal_impl *))sqlite3WalCallback;
out->methods.xExclusiveMode = (int (*)(wal_impl *, int))sqlite3WalExclusiveMode;
out->methods.xHeapMemory = (int (*)(wal_impl *))sqlite3WalHeapMemory;
Expand Down
11 changes: 11 additions & 0 deletions libsql-ffi/bundled/bindings/bindgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3325,6 +3325,17 @@ pub struct libsql_wal_methods {
zBuf: *mut ::std::os::raw::c_uchar,
pnLog: *mut ::std::os::raw::c_int,
pnCkpt: *mut ::std::os::raw::c_int,
xCb: ::std::option::Option<
unsafe extern "C" fn(
pCbData: *mut ::std::os::raw::c_void,
mxSafeFrame: ::std::os::raw::c_int,
pPage: *const ::std::os::raw::c_uchar,
nPage: ::std::os::raw::c_int,
page_no: ::std::os::raw::c_int,
frame_no: ::std::os::raw::c_int,
) -> ::std::os::raw::c_int,
>,
pCbData: *mut ::std::os::raw::c_void,
) -> ::std::os::raw::c_int,
>,
pub xCallback:
Expand Down
11 changes: 11 additions & 0 deletions libsql-ffi/bundled/bindings/session_bindgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3837,6 +3837,17 @@ pub struct libsql_wal_methods {
zBuf: *mut ::std::os::raw::c_uchar,
pnLog: *mut ::std::os::raw::c_int,
pnCkpt: *mut ::std::os::raw::c_int,
xCb: ::std::option::Option<
unsafe extern "C" fn(
pCbData: *mut ::std::os::raw::c_void,
mxSafeFrame: ::std::os::raw::c_int,
pPage: *const ::std::os::raw::c_uchar,
nPage: ::std::os::raw::c_int,
page_no: ::std::os::raw::c_int,
frame_no: ::std::os::raw::c_int,
) -> ::std::os::raw::c_int,
>,
pCbData: *mut ::std::os::raw::c_void,
) -> ::std::os::raw::c_int,
>,
pub xCallback:
Expand Down
43 changes: 33 additions & 10 deletions libsql-ffi/bundled/src/sqlite3.c
Original file line number Diff line number Diff line change
Expand Up @@ -13890,7 +13890,12 @@ typedef struct libsql_wal_methods {
int nBuf, /* Size of buffer nBuf */
unsigned char *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
/*
* Called for each page being inserted in the wal, and once if the whole checkpoint operation was successfull with pPage == NULL
*/
int (*xCb)(void* pCbData, int mxSafeFrame, const unsigned char* pPage, int nPage, int page_no, int frame_no), /* called */
void* pCbData /* user data passed to xCb */
);

/* Return the value to pass to a sqlite3_wal_hook callback, the
Expand Down Expand Up @@ -56933,7 +56938,12 @@ typedef struct libsql_wal_methods {
int nBuf, /* Size of buffer nBuf */
unsigned char *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
/*
* Called for each page being inserted in the wal, and once if the whole checkpoint operation was successfull with pPage == NULL
*/
int (*xCb)(void* pCbData, int mxSafeFrame, const unsigned char* pPage, int nPage, int page_no, int frame_no), /* called */
void* pCbData /* user data passed to xCb */
);

/* Return the value to pass to a sqlite3_wal_hook callback, the
Expand Down Expand Up @@ -64648,7 +64658,7 @@ SQLITE_PRIVATE int sqlite3PagerCheckpoint(
(eMode==SQLITE_CHECKPOINT_PASSIVE ? 0 : pPager->xBusyHandler),
pPager->pBusyHandlerArg,
pPager->walSyncFlags, pPager->pageSize, (u8 *)pPager->pTmpSpace,
pnLog, pnCkpt
pnLog, pnCkpt, NULL, NULL
);
}
return rc;
Expand Down Expand Up @@ -65228,7 +65238,9 @@ static int sqlite3WalCheckpoint(
int nBuf, /* Size of temporary buffer */
u8 *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
int (*xCb)(void*, int, const unsigned char*, int, int, int),
void *pCbData
);
static void sqlite3WalEndReadTransaction(Wal *pWal);
static int sqlite3WalEndWriteTransaction(Wal *pWal);
Expand Down Expand Up @@ -66960,7 +66972,9 @@ static int walCheckpoint(
int (*xBusy)(void*), /* Function to call when busy */
void *pBusyArg, /* Context argument for xBusyHandler */
int sync_flags, /* Flags for OsSync() (or 0) */
u8 *zBuf /* Temporary buffer to use */
u8 *zBuf, /* Temporary buffer to use */
void *pCbData, /* User data passed to xCb */
int (*xCb)(void* pCbData, int mxSafeFrame, const unsigned char* pPage, int nPage, int pageNo, int frameNo) /* Checkpoint callback */
){
int rc = SQLITE_OK; /* Return code */
int szPage; /* Database page-size */
Expand Down Expand Up @@ -67009,7 +67023,7 @@ static int walCheckpoint(

/* Allocate the iterator */
if( pInfo->nBackfill<mxSafeFrame ){
rc = walIteratorRevInit(pWal, pInfo->nBackfill, &pIter, mxSafeFrame, 1);
rc = walIteratorRevInit(pWal, pInfo->nBackfill, &pIter, mxSafeFrame, xCb == NULL);
assert(rc == SQLITE_OK || pIter.frames == NULL);
}

Expand Down Expand Up @@ -67062,12 +67076,19 @@ static int walCheckpoint(
testcase( IS_BIG_INT(iOffset) );
rc = sqlite3OsWrite(pWal->pDbFd, zBuf, szPage, iOffset);
if( rc!=SQLITE_OK ) break;
if (xCb) {
rc = (xCb)(pCbData, mxSafeFrame, zBuf, szPage, iDbpage, iFrame);
}
if( rc!=SQLITE_OK ) break;
}
sqlite3OsFileControl(pWal->pDbFd, SQLITE_FCNTL_CKPT_DONE, 0);

/* If work was actually accomplished... */
if( rc==SQLITE_OK ){
if( mxSafeFrame==walIndexHdr(pWal)->mxFrame ){
if (xCb) {
rc = (xCb)(pCbData, mxSafeFrame, NULL, 0, 0, 0);
}
i64 szDb = pWal->hdr.nPage*(i64)szPage;
testcase( IS_BIG_INT(szDb) );
rc = sqlite3OsTruncate(pWal->pDbFd, szDb);
Expand Down Expand Up @@ -67191,7 +67212,7 @@ static int sqlite3WalClose(
pWal->exclusiveMode = WAL_EXCLUSIVE_MODE;
}
rc = sqlite3WalCheckpoint(pWal, db,
SQLITE_CHECKPOINT_PASSIVE, 0, 0, sync_flags, nBuf, zBuf, 0, 0
SQLITE_CHECKPOINT_PASSIVE, 0, 0, sync_flags, nBuf, zBuf, 0, 0, NULL, NULL
);
if( rc==SQLITE_OK ){
int bPersist = -1;
Expand Down Expand Up @@ -68874,7 +68895,9 @@ static int sqlite3WalCheckpoint(
int nBuf, /* Size of temporary buffer */
u8 *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
int (*xCb)(void*, int, const unsigned char*, int, int, int), /* page, page_no, frame_no */
void *pCbData
){
int rc; /* Return code */
int isChanged = 0; /* True if a new wal-index header is loaded */
Expand Down Expand Up @@ -68948,7 +68971,7 @@ static int sqlite3WalCheckpoint(
if( pWal->hdr.mxFrame && walPagesize(pWal)!=nBuf ){
rc = SQLITE_CORRUPT_BKPT;
}else{
rc = walCheckpoint(pWal, db, eMode2, xBusy2, pBusyArg, sync_flags,zBuf);
rc = walCheckpoint(pWal, db, eMode2, xBusy2, pBusyArg, sync_flags,zBuf, pCbData, xCb);
}

/* If no error occurred, set the output variables. */
Expand Down Expand Up @@ -69330,7 +69353,7 @@ static int sqlite3WalOpen(
out->methods.xSavepoint = (void (*)(wal_impl *, unsigned int *))sqlite3WalSavepoint;
out->methods.xSavepointUndo = (int (*)(wal_impl *, unsigned int *))sqlite3WalSavepointUndo;
out->methods.xFrames = (int (*)(wal_impl *, int, libsql_pghdr *, unsigned int, int, int))sqlite3WalFrames;
out->methods.xCheckpoint = (int (*)(wal_impl *, sqlite3 *, int, int (*)(void *), void *, int, int, unsigned char *, int *, int *))sqlite3WalCheckpoint;
out->methods.xCheckpoint = (int (*)(wal_impl *, sqlite3 *, int, int (*)(void *), void *, int, int, unsigned char *, int *, int *, int (*)(void*, int, const unsigned char*, int, int, int), void*))sqlite3WalCheckpoint;
out->methods.xCallback = (int (*)(wal_impl *))sqlite3WalCallback;
out->methods.xExclusiveMode = (int (*)(wal_impl *, int))sqlite3WalExclusiveMode;
out->methods.xHeapMemory = (int (*)(wal_impl *))sqlite3WalHeapMemory;
Expand Down
11 changes: 10 additions & 1 deletion libsql-ffi/bundled/src/sqlite3.h
Original file line number Diff line number Diff line change
Expand Up @@ -8695,6 +8695,10 @@ SQLITE_API int sqlite3_status64(
*/
SQLITE_API int sqlite3_db_status(sqlite3*, int op, int *pCur, int *pHiwtr, int resetFlg);

#ifdef LIBSQL_CUSTOM_PAGER_CODEC
SQLITE_API void *libsql_leak_pager(sqlite3*);
#endif

/*
** CAPI3REF: Status Parameters for database connections
** KEYWORDS: {SQLITE_DBSTATUS options}
Expand Down Expand Up @@ -13512,7 +13516,12 @@ typedef struct libsql_wal_methods {
int nBuf, /* Size of buffer nBuf */
unsigned char *zBuf, /* Temporary buffer to use */
int *pnLog, /* OUT: Number of frames in WAL */
int *pnCkpt /* OUT: Number of backfilled frames in WAL */
int *pnCkpt, /* OUT: Number of backfilled frames in WAL */
/*
* Called for each page being inserted in the wal, and once if the whole checkpoint operation was successfull with pPage == NULL
*/
int (*xCb)(void* pCbData, int mxSafeFrame, const unsigned char* pPage, int nPage, int page_no, int frame_no), /* called */
void* pCbData /* user data passed to xCb */
);

/* Return the value to pass to a sqlite3_wal_hook callback, the
Expand Down
Loading

0 comments on commit 2c0d47d

Please sign in to comment.