Skip to content

Commit

Permalink
Remove redundant call between serializer and disk consumer. Also rena…
Browse files Browse the repository at this point in the history
…med a variable to make its purpose more clear. (cmu-db#1235)
  • Loading branch information
mbutrovich authored Oct 14, 2020
1 parent a170a6e commit dc108eb
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DiskLogConsumerTask : public common::DedicatedThreadTask {
common::ConcurrentQueue<SerializedLogs> *filled_buffer_queue_;

// Flag used by the serializer thread to signal the disk log consumer task thread to persist the data on disk
volatile bool do_persist_;
volatile bool force_flush_;

// Synchronisation primitives to synchronise persisting buffers to disk
std::mutex persist_lock_;
Expand Down
6 changes: 3 additions & 3 deletions src/storage/write_ahead_log/disk_log_consumer_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void DiskLogConsumerTask::DiskLogConsumerTaskLoop() {
// 4) Our persist interval timed out

bool signaled = disk_log_writer_thread_cv_.wait_for(
lock, curr_sleep, [&] { return do_persist_ || !filled_buffer_queue_->Empty() || !run_task_; });
lock, curr_sleep, [&] { return force_flush_ || !filled_buffer_queue_->Empty() || !run_task_; });
next_sleep = signaled ? persist_interval_ : curr_sleep * 2;
next_sleep = std::min(next_sleep, max_sleep);
}
Expand All @@ -105,14 +105,14 @@ void DiskLogConsumerTask::DiskLogConsumerTaskLoop() {
bool timeout = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now() -
last_persist) > curr_sleep;

if (timeout || current_data_written_ > persist_threshold_ || do_persist_ || !run_task_) {
if (timeout || current_data_written_ > persist_threshold_ || force_flush_ || !run_task_) {
std::unique_lock<std::mutex> lock(persist_lock_);
num_buffers = PersistLogFile();
num_bytes = current_data_written_;
// Reset meta data
last_persist = std::chrono::high_resolution_clock::now();
current_data_written_ = 0;
do_persist_ = false;
force_flush_ = false;

// Signal anyone who forced a persist that the persist has finished
persist_cv_.notify_all();
Expand Down
4 changes: 2 additions & 2 deletions src/storage/write_ahead_log/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ void LogManager::ForceFlush() {
log_serializer_task_->Process();
// Signal the disk log consumer task thread to persist the buffers to disk
std::unique_lock<std::mutex> lock(disk_log_writer_task_->persist_lock_);
disk_log_writer_task_->do_persist_ = true;
disk_log_writer_task_->force_flush_ = true;
disk_log_writer_task_->disk_log_writer_thread_cv_.notify_one();

// Wait for the disk log consumer task thread to persist the logs
disk_log_writer_task_->persist_cv_.wait(lock, [&] { return !disk_log_writer_task_->do_persist_; });
disk_log_writer_task_->persist_cv_.wait(lock, [&] { return !disk_log_writer_task_->force_flush_; });
}

void LogManager::PersistAndStop() {
Expand Down
3 changes: 0 additions & 3 deletions src/storage/write_ahead_log/log_serializer_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ std::tuple<uint64_t, uint64_t, uint64_t> LogSerializerTask::Process() {
// Mark the last buffer that was written to as full
if (buffers_processed) HandFilledBufferToWriter();

// Mark the last buffer that was written to as full
if (filled_buffer_ != nullptr) HandFilledBufferToWriter();

// Bulk remove all the transactions we serialized. This prevents having to take the TimestampManager's latch once
// for each timestamp we remove.
for (const auto &txns : serialized_txns_) {
Expand Down

0 comments on commit dc108eb

Please sign in to comment.