Robj/task system config (vmware#497)
* add a task_system_config and plumb it through the tests
* allow using bg and fg threads at the same time
* make all possible task configs valid
* fix task shutdown race
* rename platform_sleep to platform_sleep_ns
* cleanup redundant code in task.c
rtjohnso authored Jan 6, 2023
1 parent bcd7933 commit 20a926d
Showing 36 changed files with 952 additions and 819 deletions.
69 changes: 54 additions & 15 deletions include/splinterdb/splinterdb.h
@@ -50,6 +50,21 @@ typedef struct {
bool cache_use_stats;
const char *cache_logfile;

+// task system
+// Background threads configuration:
+//
+// - Memtable bg-threads work on memtable tasks, which are short but
+//   latency-sensitive. A rule of thumb is to allocate around 1 memtable
+//   bg-thread for every 10 threads performing insertions. Too few memtable
+//   threads can cause some insertions to have high latency.
+// - Normal bg-threads work on tasks such as compacting branches in the trunk
+//   and building filters. These tasks take longer and are less
+//   latency-sensitive. A rule of thumb is to allocate 1-2 normal background
+//   threads for every thread performing insertions. Too few "normal"
+//   background threads can leave disk I/O bandwidth underutilized.
+uint64 num_memtable_bg_threads;
+uint64 num_normal_bg_threads;

// btree
uint64 btree_rough_count_height;

@@ -67,21 +82,45 @@ typedef struct {
uint64 use_stats;
uint64 reclaim_threshold;

-// Background threads configuration: Both have to be non-zero in order for
-// background threads to be started. (It is an error for one to be zero
-// while the other is non-zero.)
-//
-// - Memtable bg-threads work on Memtables tasks which are short but latency
-//   sensitive. A rule of thumb is to allocate around 1 memtable bg-thread
-//   for every 10 threads performing insertions. Too few memtable threads
-//   can cause some insertions to have high latency.
-// - Normal bg-threads work on task such as compacting branches in the trunk
-//   and building filters. These tasks take longer and are less latency
-//   sensitive. A rule of thumb is to allocate 1-2 normal background
-//   threads for every thread performing insertions. Too few "normal"
-//   background threads can cause disk I/O bandwidth to go underutilized.
-uint64 num_memtable_bg_threads;
-uint64 num_normal_bg_threads;
+// The following parameter governs when foreground threads
+// performing an update to the database will perform queued
+// background tasks. When a foreground thread performs a
+// background task, the latency of that update can be very large,
+// because some background tasks can take many milliseconds to
+// execute. However, if foreground threads never perform
+// background tasks, then queues of background tasks may grow
+// unboundedly if there are not enough background threads, and this
+// may cause some processes, such as memtable rotation, to stall
+// updates to the database.
+//
+// When queue_scale_percent is 0, foreground threads will perform a
+// background task whenever one is available. This will result in
+// high tail latencies for database updates, but will ensure that
+// background task queues are always short.
+//
+// When queue_scale_percent is UINT64_MAX, foreground threads will
+// never perform background tasks unless there are no background
+// threads allocated to that task group. This will ensure that
+// foreground tasks have low latency, but requires that you
+// configure enough background threads to keep up with arriving
+// background tasks. Thus you should use this option only if you
+// know how many background threads you need for each task type.
+//
+// The default value of 100 says that foreground threads will begin
+// performing background tasks if there are more queued tasks than
+// there are background threads to serve them. This heuristic
+// allows you to configure the number of background threads as you
+// see fit; the system will do its best to execute tasks on the
+// provided background threads, but will perform tasks on
+// foreground threads if needed.
+//
+// Increasing this value (e.g. to 200, 300, etc.) will cause more
+// work to take place on background threads, but task queues may
+// grow longer, causing other parts of the system to stall.
+// Decreasing it (e.g. to 50, 25, 10, etc.) will cause more work to
+// be performed on foreground threads, increasing tail latencies.
+uint64 queue_scale_percent;

} splinterdb_config;
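The comment block above pins down all three edge cases, which makes the heuristic easy to sketch. The following is an illustrative reading only, with hypothetical names (fg_should_help, num_queued_tasks, num_bg_threads); the real check lives in task.c and may differ in detail:

// Illustrative sketch of the queue_scale_percent heuristic (hypothetical
// names; not the actual task.c implementation).
static bool
fg_should_help(uint64 num_queued_tasks,
               uint64 num_bg_threads,
               uint64 queue_scale_percent)
{
   if (num_bg_threads == 0) {
      return TRUE; // no bg threads for this group: fg must do the work
   }
   if (queue_scale_percent == UINT64_MAX) {
      return FALSE; // never help while bg threads exist
   }
   // Default of 100: help once queued tasks outnumber bg threads.
   return 100 * num_queued_tasks > queue_scale_percent * num_bg_threads;
}

Note that with queue_scale_percent == 0 the right-hand side is 0, so any queued task triggers foreground help, matching the first case described above.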

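Taken together, the new knobs slot into splinterdb_config like any other field. A minimal sketch with made-up values; the other required fields (data_cfg, filename, cache_size) are elided:

splinterdb_config cfg = {0};
// ... data_cfg, filename, cache_size, etc. ...

// Suppose ~20 application threads perform insertions concurrently:
cfg.num_memtable_bg_threads = 2;   // rule of thumb: ~1 per 10 insert threads
cfg.num_normal_bg_threads   = 30;  // rule of thumb: 1-2 per insert thread
cfg.queue_scale_percent     = 100; // default: fg helps once queue > bg threads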
2 changes: 1 addition & 1 deletion src/btree.c
@@ -1898,7 +1898,7 @@ btree_insert(cache *cc, // IN
bool need_to_rebuild_spec = FALSE;
while (!btree_node_claim(cc, cfg, &child_node)) {
btree_node_unget(cc, cfg, &child_node);
-platform_sleep(leaf_wait);
+platform_sleep_ns(leaf_wait);
leaf_wait = leaf_wait > 2048 ? leaf_wait : 2 * leaf_wait;
btree_node_get(cc, cfg, &child_node, PAGE_TYPE_MEMTABLE);
need_to_rebuild_spec = TRUE;
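Every platform_sleep_ns call site in this commit sits inside the same retry idiom: sleep, then double the wait until it reaches a small cap (1024 or 2048 ns depending on the call site). Distilled into an illustrative wrapper that mirrors the shard_log_write loop below:

// Capped exponential backoff around a claim attempt (illustrative; the
// real call sites inline this pattern rather than call a helper).
static void
wait_for_claim(cache *cc, page_handle *page)
{
   uint64 wait = 1;
   while (!cache_claim(cc, page)) {
      platform_sleep_ns(wait);
      wait = wait > 1024 ? wait : 2 * wait; // double until ~1-2 us, then hold
   }
}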
9 changes: 4 additions & 5 deletions src/clockcache.c
@@ -13,7 +13,6 @@
#include "allocator.h"
#include "clockcache.h"
#include "io.h"
#include "task.h"

#include <stddef.h>
#include "util.h"
@@ -990,7 +989,7 @@ clockcache_get_read(clockcache *cc, uint32 entry_number)

uint64 wait = 1;
while (rc == GET_RC_CONFLICT) {
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait = wait > 1024 ? wait : 2 * wait;
rc = clockcache_try_get_read(cc, entry_number, TRUE);
}
@@ -1082,12 +1081,12 @@ clockcache_get_write(clockcache *cc, uint32 entry_number)
for (threadid thr_i = 0; thr_i < CC_RC_WIDTH; thr_i++) {
if (tid % CC_RC_WIDTH != thr_i) {
while (clockcache_get_ref(cc, entry_number, thr_i)) {
-platform_sleep(1);
+platform_sleep_ns(1);
}
} else {
// we have a single ref, so wait for others to drop
while (clockcache_get_ref(cc, entry_number, thr_i) > 1) {
-platform_sleep(1);
+platform_sleep_ns(1);
}
}
}
@@ -1589,7 +1588,7 @@ clockcache_get_free_page(clockcache *cc,
clockcache_entry *entry;
timestamp wait_start;

-debug_assert(tid < MAX_THREADS - 1);
+debug_assert(tid < MAX_THREADS);
if (cc->per_thread[tid].free_hand == CC_UNMAPPED_ENTRY) {
clockcache_move_hand(cc, FALSE);
}
1 change: 0 additions & 1 deletion src/clockcache.h
@@ -12,7 +12,6 @@
#include "allocator.h"
#include "cache.h"
#include "io.h"
#include "task.h"

//#define ADDR_TRACING
#define TRACE_ADDR (UINT64_MAX - 1)
6 changes: 3 additions & 3 deletions src/memtable.c
@@ -57,7 +57,7 @@ memtable_maybe_rotate_and_get_insert_lock(memtable_context *ctxt,
if (mt->state != MEMTABLE_STATE_READY) {
// The next memtable is not ready yet, back off and wait.
cache_unget(cc, *lock_page);
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait = wait > 2048 ? wait : 2 * wait;
continue;
}
@@ -78,7 +78,7 @@ memtable_maybe_rotate_and_get_insert_lock(memtable_context *ctxt,
memtable_process(ctxt, process_generation);
} else {
cache_unget(cc, *lock_page);
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait *= 2;
}
continue;
@@ -216,7 +216,7 @@ memtable_force_finalize(memtable_context *ctxt)
uint64 wait = 100;
while (!cache_claim(cc, lock_page)) {
cache_unget(cc, lock_page);
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait *= 2;
lock_page = cache_get(cc, lock_addr, TRUE, PAGE_TYPE_LOCK_NO_DATA);
}
8 changes: 4 additions & 4 deletions src/mini_allocator.c
@@ -168,7 +168,7 @@ mini_full_lock_meta_tail(mini_allocator *mini)
break;
}
cache_unget(mini->cc, meta_page);
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait = wait > 1024 ? wait : 2 * wait;
}
cache_lock(mini->cc, meta_page);
@@ -210,7 +210,7 @@ mini_get_claim_meta_page(cache *cc, uint64 meta_addr, page_type type)
break;
}
cache_unget(cc, meta_page);
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait = wait > 1024 ? wait : 2 * wait;
}
return meta_page;
@@ -455,7 +455,7 @@ mini_lock_batch_get_next_addr(mini_allocator *mini, uint64 batch)
|| !__sync_bool_compare_and_swap(
&mini->next_addr[batch], next_addr, MINI_WAIT))
{
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait = wait > 1024 ? wait : 2 * wait;
next_addr = mini->next_addr[batch];
}
@@ -1125,7 +1125,7 @@ mini_wait_for_blockers(cache *cc, uint64 meta_head)
allocator *al = cache_get_allocator(cc);
uint64 wait = 1;
while (allocator_get_ref(al, base_addr(cc, meta_head)) != AL_ONE_REF) {
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait = wait > 1024 ? wait : 2 * wait;
}
}
1 change: 0 additions & 1 deletion src/platform_linux/laio.h
@@ -10,7 +10,6 @@
#pragma once

#include "io.h"
#include "task.h"
#include <libaio.h>

/*
2 changes: 1 addition & 1 deletion src/platform_linux/platform.c
@@ -7,7 +7,7 @@

#include <sys/mman.h>

-__thread threadid xxxtid;
+__thread threadid xxxtid = INVALID_TID;

bool platform_use_hugetlb = FALSE;
bool platform_use_mlock = FALSE;
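Zero-initialized thread-local storage would make an unregistered thread indistinguishable from thread 0; initializing xxxtid to INVALID_TID (plausibly part of the "fix task shutdown race" item in the commit message) lets callers detect unregistered threads explicitly. A hypothetical guard, assuming platform_get_tid() returns xxxtid:

// Hypothetical check enabled by the INVALID_TID initializer:
platform_assert(platform_get_tid() != INVALID_TID,
                "thread is not registered with the task system");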
2 changes: 1 addition & 1 deletion src/platform_linux/platform.h
@@ -578,7 +578,7 @@ static inline timestamp
platform_get_real_time(void);

static inline void
-platform_sleep(uint64 ns);
+platform_sleep_ns(uint64 ns);

static inline void
platform_semaphore_destroy(platform_semaphore *sema);
2 changes: 1 addition & 1 deletion src/platform_linux/platform_inline.h
@@ -85,7 +85,7 @@ platform_pause()
}

static inline void
-platform_sleep(uint64 ns)
+platform_sleep_ns(uint64 ns)
{
if (ns < USEC_TO_NSEC(50)) {
for (uint64 i = 0; i < ns / 5 + 1; i++) {
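The hunk cuts off mid-function: the visible branch busy-waits for requests under 50 µs, burning roughly one platform_pause per 5 ns of requested sleep. A hedged sketch of the whole shape, under the assumption that the elided branch yields to the OS via nanosleep(2):

#include <time.h>

// Sketch only: the spin branch comes from the diff; the else branch is
// an assumption about the elided code (hypothetical stand-in name).
static inline void
platform_sleep_ns_sketch(uint64 ns)
{
   if (ns < USEC_TO_NSEC(50)) {
      for (uint64 i = 0; i < ns / 5 + 1; i++) {
         platform_pause(); // busy-wait; each pause covers ~5 ns
      }
   } else {
      struct timespec ts = {.tv_sec  = ns / 1000000000ULL,
                            .tv_nsec = ns % 1000000000ULL};
      nanosleep(&ts, NULL);
   }
}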
2 changes: 1 addition & 1 deletion src/shard_log.c
@@ -228,7 +228,7 @@ shard_log_write(log_handle *logh, key tuple_key, message msg, uint64 generation)
page = cache_get(cc, thread_data->addr, TRUE, PAGE_TYPE_LOG);
uint64 wait = 1;
while (!cache_claim(cc, page)) {
-platform_sleep(wait);
+platform_sleep_ns(wait);
wait = wait > 1024 ? wait : 2 * wait;
}
cache_lock(cc, page);
1 change: 0 additions & 1 deletion src/shard_log.h
@@ -14,7 +14,6 @@
#include "iterator.h"
#include "splinterdb/data.h"
#include "mini_allocator.h"
#include "task.h"

/*
* Configuration structure to set up the sharded log sub-system.
24 changes: 14 additions & 10 deletions src/splinterdb.c
@@ -38,6 +38,7 @@ typedef struct splinterdb {
clockcache_config cache_cfg;
clockcache cache_handle;
shard_log_config log_cfg;
+task_system_config task_cfg;
allocator_root_id trunk_id;
trunk_config trunk_cfg;
trunk_handle *spl;
@@ -188,6 +189,16 @@ splinterdb_init_config(const splinterdb_config *kvs_cfg, // IN

shard_log_config_init(&kvs->log_cfg, &kvs->cache_cfg.super, kvs->data_cfg);

+uint64 num_bg_threads[NUM_TASK_TYPES] = {0};
+num_bg_threads[TASK_TYPE_MEMTABLE] = kvs_cfg->num_memtable_bg_threads;
+num_bg_threads[TASK_TYPE_NORMAL] = kvs_cfg->num_normal_bg_threads;
+
+rc = task_system_config_init(
+   &kvs->task_cfg, cfg.use_stats, num_bg_threads, trunk_get_scratch_size());
+if (!SUCCESS(rc)) {
+   return rc;
+}

trunk_config_init(&kvs->trunk_cfg,
&kvs->cache_cfg.super,
kvs->data_cfg,
@@ -199,6 +210,7 @@
cfg.filter_remainder_size,
cfg.filter_index_size,
cfg.reclaim_threshold,
+cfg.queue_scale_percent,
cfg.use_log,
cfg.use_stats,
FALSE,
@@ -245,16 +257,8 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN
goto deinit_kvhandle;
}

-uint64 num_bg_threads[NUM_TASK_TYPES] = {0};
-num_bg_threads[TASK_TYPE_MEMTABLE] = kvs_cfg->num_memtable_bg_threads;
-num_bg_threads[TASK_TYPE_NORMAL] = kvs_cfg->num_normal_bg_threads;
-
-status = task_system_create(kvs->heap_id,
-                            &kvs->io_handle,
-                            &kvs->task_sys,
-                            TRUE,
-                            num_bg_threads,
-                            trunk_get_scratch_size());
+status = task_system_create(
+   kvs->heap_id, &kvs->io_handle, &kvs->task_sys, &kvs->task_cfg);
if (!SUCCESS(status)) {
platform_error_log(
"Failed to initialize SplinterDB task system state: %s\n",
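Net effect of the splinterdb.c changes: the background-thread plumbing moves out of splinterdb_create_or_open() and into splinterdb_init_config(), where task_system_config_init() validates it once (per the "make all possible task configs valid" item); creation then consumes the prebuilt config instead of raw knobs. Condensed from the hunks above:

// In splinterdb_init_config(): build and validate the task config once.
uint64 num_bg_threads[NUM_TASK_TYPES] = {0};
num_bg_threads[TASK_TYPE_MEMTABLE] = kvs_cfg->num_memtable_bg_threads;
num_bg_threads[TASK_TYPE_NORMAL]   = kvs_cfg->num_normal_bg_threads;
rc = task_system_config_init(
   &kvs->task_cfg, cfg.use_stats, num_bg_threads, trunk_get_scratch_size());

// In splinterdb_create_or_open(): hand the validated config to the task system.
status = task_system_create(
   kvs->heap_id, &kvs->io_handle, &kvs->task_sys, &kvs->task_cfg);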