Copy-on-Write Trunk

This changeset implements copy-on-write for trunk nodes, which includes
several high-level changes. This PR still needs to be rebased onto main,
but the purpose is to discuss high- and low-level design decisions.

Changes in this PR:

Trunk root lock. A distributed RW lock is used to access/change the
current root.

Flush from root. Flushes proceed from the root and cascade immediately
rather than being triggered at the beginning of trunk_compact_bundle.

Copy-on-write. Trunk nodes cannot be modified directly; instead, they
are changed via a copy-on-write of the root-to-node path together with
a change of the root node.
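
As an illustration, a minimal sketch of the path copy-on-write described
here; every type and helper in it is hypothetical and stands in for trunk
code that is not part of the diff below:

```c
// Hypothetical sketch only: none of these types or helpers are in this
// PR; they illustrate the root-to-node path copy-on-write described above.
typedef struct trunk_node trunk_node;
struct trunk_node {
   trunk_node *child; // single child pointer, to keep the sketch small
   // ... pivots, branches, filters ...
};

trunk_node *trunk_node_clone(trunk_node *node);   // assumed helper
void        trunk_install_root(trunk_node *root); // swap under write lock

// Copy the path path[0] (root) .. path[depth-1] (target node) bottom-up,
// rewiring each clone to the clone below it, then publish the new root.
// Readers see either the old path or the new one, never a mix.
static void
trunk_cow_path(trunk_node *path[], unsigned depth)
{
   trunk_node *new_child = NULL; // clone of the level below, if any
   for (int i = (int)depth - 1; i >= 0; i--) {
      trunk_node *copy = trunk_node_clone(path[i]);
      if (new_child != NULL) {
         copy->child = new_child;
      }
      new_child = copy;
   }
   trunk_install_root(new_child); // new_child now clones the old root
}
```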

Garbage Collection for unlinked branches and filters. After a
copy-on-write, the nodes on the old path will be unreferenced. This PR
does not GC the trunk nodes themselves, but it includes a GC path to
dereference the replaced branches and filters.
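
A companion sketch of the dereference step, again with hypothetical names
(it reuses the trunk_node type from the sketch above):

```c
// Hypothetical sketch only: drop one reference on every branch and filter
// of a node replaced by a copy-on-write. The trunk node itself is left
// alone, matching the "no trunk-node GC yet" caveat above. These helper
// declarations stand in for trunk code not shown in this diff.
typedef struct trunk_branch trunk_branch;
typedef struct trunk_filter trunk_filter;

unsigned      trunk_node_num_branches(const trunk_node *node);
trunk_branch *trunk_node_get_branch(trunk_node *node, unsigned i);
trunk_filter *trunk_node_get_filter(trunk_node *node, unsigned i);
void          trunk_branch_dec_ref(trunk_branch *branch);
void          trunk_filter_dec_ref(trunk_filter *filter);

static void
trunk_gc_replaced_node(trunk_node *node)
{
   for (unsigned i = 0; i < trunk_node_num_branches(node); i++) {
      trunk_branch_dec_ref(trunk_node_get_branch(node, i));
      trunk_filter_dec_ref(trunk_node_get_filter(node, i));
   }
}
```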

platform_batch_rwlock. Replaces the distributed locks built on dummy
cache pages with a batched distributed RW lock implementation in
platform.[ch].
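
For context, a usage sketch of the new lock ladder, using only the
platform_batch_rwlock calls that appear in this changeset; the slot index,
the wrapper function, and its heap-id argument are illustrative, and the
release order mirrors the new memtable wrappers in the diff below:

```c
#include "platform.h" // assumed home of platform_batch_rwlock, per this PR

#define EXAMPLE_LOCK_IDX 0 // illustrative slot in the lock batch

static void
example_batch_rwlock_usage(platform_heap_id hid)
{
   platform_batch_rwlock *rwlock;
   rwlock = TYPED_MALLOC(hid, rwlock);
   platform_batch_rwlock_init(rwlock);

   // Read side: shared reference on one slot of the batch.
   platform_batch_rwlock_get(rwlock, EXAMPLE_LOCK_IDX);
   /* ... read state guarded by this slot ... */
   platform_batch_rwlock_unget(rwlock, EXAMPLE_LOCK_IDX);

   // Write side: claim (or try_claim, which may fail) to win the writer
   // race, then take the exclusive lock.
   platform_batch_rwlock_claim(rwlock, EXAMPLE_LOCK_IDX);
   platform_batch_rwlock_lock(rwlock, EXAMPLE_LOCK_IDX);
   /* ... modify state guarded by this slot ... */
   // Release in the order used by memtable_unlock_insert_lock below.
   platform_batch_rwlock_unclaim(rwlock, EXAMPLE_LOCK_IDX);
   platform_batch_rwlock_unlock(rwlock, EXAMPLE_LOCK_IDX);

   platform_free(hid, rwlock);
}
```
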
ajhconway committed Apr 23, 2023
1 parent b5a283b commit fa990cf
Showing 12 changed files with 1,393 additions and 897 deletions.
4 changes: 1 addition & 3 deletions src/allocator.h
@@ -54,7 +54,6 @@ typedef enum page_type {
PAGE_TYPE_LOG,
PAGE_TYPE_SUPERBLOCK,
PAGE_TYPE_MISC, // Used mainly as a testing hook, for cache access testing.
PAGE_TYPE_LOCK_NO_DATA,
NUM_PAGE_TYPES,
} page_type;

@@ -65,8 +64,7 @@ static const char *const page_type_str[] = {"invalid",
"filter",
"log",
"superblock",
"misc",
"lock"};
"misc"};

// Ensure that the page-type lookup array is adequately sized.
_Static_assert(
3 changes: 1 addition & 2 deletions src/clockcache.c
@@ -2263,8 +2263,7 @@ clockcache_get(clockcache *cc, uint64 addr, bool blocking, page_type type)
page_handle *handle;

debug_assert(cc->per_thread[platform_get_tid()].enable_sync_get
|| type == PAGE_TYPE_MEMTABLE
|| type == PAGE_TYPE_LOCK_NO_DATA);
|| type == PAGE_TYPE_MEMTABLE);
while (1) {
retry = clockcache_get_internal(cc, addr, blocking, type, &handle);
if (!retry) {
1 change: 0 additions & 1 deletion src/data_internal.c
@@ -1,4 +1,3 @@

#include "data_internal.h"

message_type
193 changes: 86 additions & 107 deletions src/memtable.c
@@ -16,6 +16,9 @@

#define MEMTABLE_COUNT_GRANULARITY 128

#define MEMTABLE_INSERT_LOCK_IDX 0
#define MEMTABLE_LOOKUP_LOCK_IDX 1

bool
memtable_is_full(const memtable_config *cfg, memtable *mt)
{
@@ -40,46 +43,106 @@ memtable_process(memtable_context *ctxt, uint64 generation)
ctxt->process(ctxt->process_ctxt, generation);
}

void
memtable_get_insert_lock(memtable_context *ctxt)
{
platform_batch_rwlock_get(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

void
memtable_unget_insert_lock(memtable_context *ctxt)
{
platform_batch_rwlock_unget(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

bool
memtable_try_lock_insert_lock(memtable_context *ctxt)
{
if (!platform_batch_rwlock_try_claim(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX))
{
return FALSE;
}
platform_batch_rwlock_lock(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
return TRUE;
}

void
memtable_lock_insert_lock(memtable_context *ctxt)
{
platform_batch_rwlock_claim(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_lock(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

void
memtable_unlock_insert_lock(memtable_context *ctxt)
{
platform_batch_rwlock_unclaim(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
platform_batch_rwlock_unlock(ctxt->rwlock, MEMTABLE_INSERT_LOCK_IDX);
}

void
memtable_get_lookup_lock(memtable_context *ctxt)
{
platform_batch_rwlock_get(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}

void
memtable_unget_lookup_lock(memtable_context *ctxt)
{
platform_batch_rwlock_unget(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}

void
memtable_lock_lookup_lock(memtable_context *ctxt)
{
platform_batch_rwlock_claim(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
platform_batch_rwlock_lock(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}

void
memtable_unlock_lookup_lock(memtable_context *ctxt)
{
platform_batch_rwlock_unclaim(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
platform_batch_rwlock_unlock(ctxt->rwlock, MEMTABLE_LOOKUP_LOCK_IDX);
}


platform_status
memtable_maybe_rotate_and_get_insert_lock(memtable_context *ctxt,
uint64 *generation,
page_handle **lock_page)
uint64 *generation)
{
cache *cc = ctxt->cc;
uint64 lock_addr = ctxt->insert_lock_addr;
uint64 wait = 100;
uint64 wait = 100;
while (TRUE) {
*lock_page = cache_get(cc, lock_addr, TRUE, PAGE_TYPE_LOCK_NO_DATA);
memtable_get_insert_lock(ctxt);
*generation = ctxt->generation;
uint64 mt_no = *generation % ctxt->cfg.max_memtables;
memtable *mt = &ctxt->mt[mt_no];
if (mt->state != MEMTABLE_STATE_READY) {
// The next memtable is not ready yet, back off and wait.
cache_unget(cc, *lock_page);
memtable_unget_insert_lock(ctxt);
platform_sleep_ns(wait);
wait = wait > 2048 ? wait : 2 * wait;
continue;
}
wait = 100;

if (memtable_is_full(&ctxt->cfg, &ctxt->mt[mt_no])) {
// If the current memtable is full, try to retire it.
if (cache_try_claim(cc, *lock_page)) {
// We successfully got the claim, so we do the finalization
cache_lock(cc, *lock_page);
memtable_unget_insert_lock(ctxt);
if (memtable_try_lock_insert_lock(ctxt)) {
// We successfully got the lock, so we do the finalization
memtable_transition(
mt, MEMTABLE_STATE_READY, MEMTABLE_STATE_FINALIZED);

// Safe to increment non-atomically because we have a lock on
// the insert lock
uint64 process_generation = ctxt->generation++;
memtable_mark_empty(ctxt);
cache_unlock(cc, *lock_page);
cache_unclaim(cc, *lock_page);
cache_unget(cc, *lock_page);
memtable_unlock_insert_lock(ctxt);
memtable_process(ctxt, process_generation);
} else {
cache_unget(cc, *lock_page);
platform_sleep_ns(wait);
wait *= 2;
wait = wait > 2048 ? wait : 2 * wait;
}
continue;
}
@@ -144,46 +207,6 @@ memtable_insert(memtable_context *ctxt,
return rc;
}

void
memtable_unget_insert_lock(memtable_context *ctxt, page_handle *lock_page)
{
cache_unget(ctxt->cc, lock_page);
}

page_handle *
memtable_get_lookup_lock(memtable_context *ctxt)
{
return cache_get(
ctxt->cc, ctxt->lookup_lock_addr, TRUE, PAGE_TYPE_LOCK_NO_DATA);
}

void
memtable_unget_lookup_lock(memtable_context *ctxt, page_handle *lock_page)
{
cache_unget(ctxt->cc, lock_page);
}

page_handle *
memtable_uncontended_get_claim_lock_lookup_lock(memtable_context *ctxt)
{
page_handle *lock_page = memtable_get_lookup_lock(ctxt);
cache *cc = ctxt->cc;
bool claimed = cache_try_claim(cc, lock_page);
platform_assert(claimed);
cache_lock(cc, lock_page);
return lock_page;
}

void
memtable_unlock_unclaim_unget_lookup_lock(memtable_context *ctxt,
page_handle *lock_page)
{
cache *cc = ctxt->cc;
cache_unlock(cc, lock_page);
cache_unclaim(cc, lock_page);
cache_unget(cc, lock_page);
}

/*
* if there are no outstanding refs, then destroy and reinit memtable and
* transition to READY
@@ -209,18 +232,7 @@ memtable_dec_ref_maybe_recycle(memtable_context *ctxt, memtable *mt)
uint64
memtable_force_finalize(memtable_context *ctxt)
{
uint64 lock_addr = ctxt->insert_lock_addr;
cache *cc = ctxt->cc;
page_handle *lock_page =
cache_get(cc, lock_addr, TRUE, PAGE_TYPE_LOCK_NO_DATA);
uint64 wait = 100;
while (!cache_try_claim(cc, lock_page)) {
cache_unget(cc, lock_page);
platform_sleep_ns(wait);
wait *= 2;
lock_page = cache_get(cc, lock_addr, TRUE, PAGE_TYPE_LOCK_NO_DATA);
}
cache_lock(cc, lock_page);
memtable_lock_insert_lock(ctxt);

uint64 generation = ctxt->generation;
uint64 mt_no = generation % ctxt->cfg.max_memtables;
@@ -229,10 +241,7 @@ memtable_force_finalize(memtable_context *ctxt)
uint64 process_generation = ctxt->generation++;
memtable_mark_empty(ctxt);

cache_unlock(cc, lock_page);
cache_unclaim(cc, lock_page);
cache_unget(cc, lock_page);

memtable_unlock_insert_lock(ctxt);
return process_generation;
}

@@ -268,29 +277,10 @@ memtable_context_create(platform_heap_id hid,
ctxt->cc = cc;
memmove(&ctxt->cfg, cfg, sizeof(ctxt->cfg));

uint64 base_addr;
allocator *al = cache_get_allocator(cc);
platform_status rc = allocator_alloc(al, &base_addr, PAGE_TYPE_LOCK_NO_DATA);
platform_assert_status_ok(rc);

ctxt->insert_lock_addr = base_addr;
ctxt->lookup_lock_addr = base_addr + cache_page_size(cc);

page_handle *lock_page =
cache_alloc(cc, ctxt->insert_lock_addr, PAGE_TYPE_LOCK_NO_DATA);
cache_pin(cc, lock_page);
cache_unlock(cc, lock_page);
cache_unclaim(cc, lock_page);
cache_unget(cc, lock_page);

lock_page = cache_alloc(cc, ctxt->lookup_lock_addr, PAGE_TYPE_LOCK_NO_DATA);
cache_pin(cc, lock_page);
cache_unlock(cc, lock_page);
cache_unclaim(cc, lock_page);
cache_unget(cc, lock_page);

platform_spinlock_init(
&ctxt->incorporation_lock, platform_get_module_id(), hid);
platform_mutex_init(
&ctxt->incorporation_mutex, platform_get_module_id(), hid);
ctxt->rwlock = TYPED_MALLOC(hid, ctxt->rwlock);
platform_batch_rwlock_init(ctxt->rwlock);

for (uint64 mt_no = 0; mt_no < cfg->max_memtables; mt_no++) {
uint64 generation = mt_no;
@@ -317,19 +307,8 @@ memtable_context_destroy(platform_heap_id hid, memtable_context *ctxt)
memtable_deinit(cc, &ctxt->mt[mt_no]);
}

platform_spinlock_destroy(&ctxt->incorporation_lock);

/*
* lookup lock and insert lock share extents but not pages.
* this deallocs both.
*/
allocator *al = cache_get_allocator(cc);
uint8 ref =
allocator_dec_ref(al, ctxt->insert_lock_addr, PAGE_TYPE_LOCK_NO_DATA);
platform_assert(ref == AL_NO_REFS);
cache_extent_discard(cc, ctxt->insert_lock_addr, PAGE_TYPE_LOCK_NO_DATA);
ref = allocator_dec_ref(al, ctxt->insert_lock_addr, PAGE_TYPE_LOCK_NO_DATA);
platform_assert(ref == AL_FREE);
platform_mutex_destroy(&ctxt->incorporation_mutex);
platform_free(hid, ctxt->rwlock);

platform_free(hid, ctxt);
}
54 changes: 27 additions & 27 deletions src/memtable.h
@@ -115,18 +115,20 @@ typedef struct memtable_context {
process_fn process;
void *process_ctxt;

// Protected by insert_lock. Can read without lock. Must get read lock to
// freeze and write lock to modify.
uint64 insert_lock_addr;
// batch distributed read/write locks protect the generation and
// generation_retired counters
platform_batch_rwlock *rwlock;

// Protected by the MEMTABLE_INSERT_LOCK_IDX'th lock of rwlock. Can read
// without lock. Must get read lock to freeze and write lock to modify.
volatile uint64 generation;

// Protected by incorporation_lock. Must hold to read or modify.
platform_spinlock incorporation_lock;
volatile uint64 generation_to_incorporate;
// Protected by incorporation_mutex. Must hold to read or modify.
platform_mutex incorporation_mutex;
volatile uint64 generation_to_incorporate;

// Protected by the lookup lock. Must hold read lock to read and write lock
// to modify.
uint64 lookup_lock_addr;
// Protected by the MEMTABLE_LOOKUP_LOCK_IDX'th lock of rwlock. Must hold
// read lock to read and write lock to modify.
volatile uint64 generation_retired;

bool is_empty;
Expand All @@ -139,11 +141,22 @@ typedef struct memtable_context {

platform_status
memtable_maybe_rotate_and_get_insert_lock(memtable_context *ctxt,
uint64 *generation,
page_handle **lock_page);
uint64 *generation);

void
memtable_unget_insert_lock(memtable_context *ctxt, page_handle *lock_page);
memtable_unget_insert_lock(memtable_context *ctxt);

void
memtable_get_lookup_lock(memtable_context *ctxt);

void
memtable_unget_lookup_lock(memtable_context *ctxt);

void
memtable_lock_lookup_lock(memtable_context *ctxt);

void
memtable_unlock_lookup_lock(memtable_context *ctxt);

platform_status
memtable_insert(memtable_context *ctxt,
@@ -153,19 +166,6 @@ memtable_insert(memtable_context *ctxt,
message msg,
uint64 *generation);

page_handle *
memtable_get_lookup_lock(memtable_context *ctxt);

void
memtable_unget_lookup_lock(memtable_context *ctxt, page_handle *lock_page);

page_handle *
memtable_uncontended_get_claim_lock_lookup_lock(memtable_context *ctxt);

void
memtable_unlock_unclaim_unget_lookup_lock(memtable_context *ctxt,
page_handle *lock_page);

bool
memtable_dec_ref_maybe_recycle(memtable_context *ctxt, memtable *mt);

@@ -245,13 +245,13 @@ memtable_increment_to_generation_retired(memtable_context *ctxt,
static inline void
memtable_lock_incorporation_lock(memtable_context *ctxt)
{
platform_spin_lock(&ctxt->incorporation_lock);
platform_mutex_lock(&ctxt->incorporation_mutex);
}

static inline void
memtable_unlock_incorporation_lock(memtable_context *ctxt)
{
platform_spin_unlock(&ctxt->incorporation_lock);
platform_mutex_unlock(&ctxt->incorporation_mutex);
}

static inline void
