Skip to content

Commit

Permalink
Merge pull request ceph#28539 from dillaman/wip-40072
Browse files Browse the repository at this point in the history
librbd: improve journal performance to match expected degradation

Reviewed-by: Mykola Golub <[email protected]>
  • Loading branch information
trociny authored Jun 20, 2019
2 parents e4114eb + 5f3b043 commit 961df63
Show file tree
Hide file tree
Showing 20 changed files with 430 additions and 399 deletions.
8 changes: 7 additions & 1 deletion src/common/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7304,12 +7304,18 @@ static std::vector<Option> get_rbd_options() {
.set_default(5)
.set_description("commit time interval, seconds"),

Option("rbd_journal_object_writethrough_until_flush", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(true)
.set_description("when enabled, the rbd_journal_object_flush* configuration "
"options are ignored until the first flush so that batched "
"journal IO is known to be safe for consistency"),

Option("rbd_journal_object_flush_interval", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(0)
.set_description("maximum number of pending commits per journal object"),

Option("rbd_journal_object_flush_bytes", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
.set_default(0)
.set_default(1_M)
.set_description("maximum number of pending bytes per journal object"),

Option("rbd_journal_object_flush_age", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
Expand Down
112 changes: 66 additions & 46 deletions src/journal/JournalRecorder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
#define dout_prefix *_dout << "JournalRecorder: " << this << " "
#define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \
<< ": "

using std::shared_ptr;

Expand Down Expand Up @@ -49,12 +50,9 @@ struct C_Flush : public Context {
JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
uint32_t flush_interval, uint64_t flush_bytes,
double flush_age,
uint64_t max_in_flight_appends)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
m_flush_bytes(flush_bytes), m_flush_age(flush_age),
m_journal_metadata(journal_metadata),
m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
m_current_set(m_journal_metadata->get_active_set()) {
Expand All @@ -65,13 +63,14 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,

uint8_t splay_width = m_journal_metadata->get_splay_width();
for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
m_object_locks.push_back(shared_ptr<Mutex>(
new Mutex("ObjectRecorder::m_lock::"+
std::to_string(splay_offset))));
shared_ptr<Mutex> object_lock(new Mutex(
"ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
m_object_locks.push_back(object_lock);

uint64_t object_number = splay_offset + (m_current_set * splay_width);
Mutex::Locker locker(*object_lock);
m_object_ptrs[splay_offset] = create_object_recorder(
object_number,
m_object_locks[splay_offset]);
object_number, m_object_locks[splay_offset]);
}

m_journal_metadata->add_listener(&m_listener);
Expand Down Expand Up @@ -108,8 +107,30 @@ void JournalRecorder::shut_down(Context *on_safe) {
flush(on_safe);
}

/**
 * Update the append batching thresholds used by this recorder.
 *
 * The new values are logged, stored in the recorder's own members and then
 * handed to the object recorder of every splay offset.
 *
 * @param flush_interval value stored in m_flush_interval and forwarded
 * @param flush_bytes    value stored in m_flush_bytes and forwarded
 * @param flush_age      value stored in m_flush_age and forwarded
 *
 * NOTE(review): flush_interval is a signed int here while the
 * m_flush_interval member is declared uint32_t, so a negative argument would
 * be converted to a large unsigned value -- confirm callers never pass one.
 */
void JournalRecorder::set_append_batch_options(int flush_interval,
                                               uint64_t flush_bytes,
                                               double flush_age) {
  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
                  << "flush_bytes=" << flush_bytes << ", "
                  << "flush_age=" << flush_age << dendl;

  // the recorder lock is held for the whole update, including the per-object
  // pass below
  Mutex::Locker locker(m_lock);
  m_flush_age = flush_age;
  m_flush_bytes = flush_bytes;
  m_flush_interval = flush_interval;

  const uint8_t width = m_journal_metadata->get_splay_width();
  for (uint8_t offset = 0; offset < width; ++offset) {
    // each object recorder is updated while its own per-offset lock is held
    Mutex::Locker object_locker(*m_object_locks[offset]);
    get_object(offset)->set_append_batch_options(flush_interval, flush_bytes,
                                                 flush_age);
  }
}

Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;

m_lock.Lock();

Expand All @@ -132,7 +153,9 @@ Future JournalRecorder::append(uint64_t tag_tid,
entry_bl);
ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());

bool object_full = object_ptr->append_unlock({{future, entry_bl}});
bool object_full = object_ptr->append({{future, entry_bl}});
m_object_locks[splay_offset]->Unlock();

if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
Expand All @@ -143,6 +166,8 @@ Future JournalRecorder::append(uint64_t tag_tid,
}

void JournalRecorder::flush(Context *on_safe) {
ldout(m_cct, 20) << dendl;

C_Flush *ctx;
{
Mutex::Locker locker(m_lock);
Expand Down Expand Up @@ -172,7 +197,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {

// entry overflow from open object
if (m_current_set != object_set) {
ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
ldout(m_cct, 20) << "close already in-progress" << dendl;
return;
}

Expand All @@ -186,8 +211,7 @@ void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
++m_current_set;
++m_in_flight_advance_sets;

ldout(m_cct, 20) << __func__ << ": closing active object set "
<< object_set << dendl;
ldout(m_cct, 10) << "closing active object set " << object_set << dendl;
if (close_object_set(m_current_set)) {
advance_object_set();
}
Expand All @@ -197,8 +221,7 @@ void JournalRecorder::advance_object_set() {
ceph_assert(m_lock.is_locked());

ceph_assert(m_in_flight_object_closes == 0);
ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
<< dendl;
ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl;
m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
this));
}
Expand All @@ -213,8 +236,8 @@ void JournalRecorder::handle_advance_object_set(int r) {
--m_in_flight_advance_sets;

if (r < 0 && r != -ESTALE) {
lderr(m_cct) << __func__ << ": failed to advance object set: "
<< cpp_strerror(r) << dendl;
lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r)
<< dendl;
}

if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
Expand All @@ -230,8 +253,7 @@ void JournalRecorder::handle_advance_object_set(int r) {
void JournalRecorder::open_object_set() {
ceph_assert(m_lock.is_locked());

ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
<< dendl;
ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;

uint8_t splay_width = m_journal_metadata->get_splay_width();

Expand All @@ -244,15 +266,14 @@ void JournalRecorder::open_object_set() {
ceph_assert(object_recorder->is_closed());

// ready to close object and open object in active set
create_next_object_recorder_unlock(object_recorder);
} else {
uint8_t splay_offset = object_number % splay_width;
m_object_locks[splay_offset]->Unlock();
create_next_object_recorder(object_recorder);
}
}
unlock_object_recorders();
}

bool JournalRecorder::close_object_set(uint64_t active_set) {
ldout(m_cct, 10) << "active_set=" << active_set << dendl;
ceph_assert(m_lock.is_locked());

// object recorders will invoke overflow handler as they complete
Expand All @@ -263,14 +284,14 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {
it != m_object_ptrs.end(); ++it) {
ObjectRecorderPtr object_recorder = it->second;
if (object_recorder->get_object_number() / splay_width != active_set) {
ldout(m_cct, 10) << __func__ << ": closing object "
<< object_recorder->get_oid() << dendl;
ldout(m_cct, 10) << "closing object " << object_recorder->get_oid()
<< dendl;
// flush out all queued appends and hold future appends
if (!object_recorder->close()) {
++m_in_flight_object_closes;
} else {
ldout(m_cct, 20) << __func__ << ": object "
<< object_recorder->get_oid() << " closed" << dendl;
ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed"
<< dendl;
}
}
}
Expand All @@ -280,30 +301,32 @@ bool JournalRecorder::close_object_set(uint64_t active_set) {

ObjectRecorderPtr JournalRecorder::create_object_recorder(
uint64_t object_number, shared_ptr<Mutex> lock) {
ldout(m_cct, 10) << "object_number=" << object_number << dendl;
ObjectRecorderPtr object_recorder(new ObjectRecorder(
m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
object_number, lock, m_journal_metadata->get_work_queue(),
m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
&m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
m_flush_bytes, m_flush_age, m_max_in_flight_appends));
&m_object_handler, m_journal_metadata->get_order(),
m_max_in_flight_appends));
object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
m_flush_age);
return object_recorder;
}

void JournalRecorder::create_next_object_recorder_unlock(
void JournalRecorder::create_next_object_recorder(
ObjectRecorderPtr object_recorder) {
ceph_assert(m_lock.is_locked());

uint64_t object_number = object_recorder->get_object_number();
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint8_t splay_offset = object_number % splay_width;
ldout(m_cct, 10) << "object_number=" << object_number << dendl;

ceph_assert(m_object_locks[splay_offset]->is_locked());

ObjectRecorderPtr new_object_recorder = create_object_recorder(
(m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);

ldout(m_cct, 10) << __func__ << ": "
<< "old oid=" << object_recorder->get_oid() << ", "
ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
<< "new oid=" << new_object_recorder->get_oid() << dendl;
AppendBuffers append_buffers;
object_recorder->claim_append_buffers(&append_buffers);
Expand All @@ -315,7 +338,7 @@ void JournalRecorder::create_next_object_recorder_unlock(
new_object_recorder->get_object_number());
}

new_object_recorder->append_unlock(std::move(append_buffers));
new_object_recorder->append(std::move(append_buffers));
m_object_ptrs[splay_offset] = new_object_recorder;
}

Expand All @@ -325,15 +348,13 @@ void JournalRecorder::handle_update() {
uint64_t active_set = m_journal_metadata->get_active_set();
if (m_current_set < active_set) {
// peer journal client advanced the active set
ldout(m_cct, 20) << __func__ << ": "
<< "current_set=" << m_current_set << ", "
ldout(m_cct, 10) << "current_set=" << m_current_set << ", "
<< "active_set=" << active_set << dendl;

uint64_t current_set = m_current_set;
m_current_set = active_set;
if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
ldout(m_cct, 20) << __func__ << ": closing current object set "
<< current_set << dendl;
ldout(m_cct, 10) << "closing current object set " << current_set << dendl;
if (close_object_set(active_set)) {
open_object_set();
}
Expand All @@ -342,7 +363,7 @@ void JournalRecorder::handle_update() {
}

void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
ldout(m_cct, 10) << object_recorder->get_oid() << dendl;

Mutex::Locker locker(m_lock);

Expand All @@ -356,8 +377,8 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
--m_in_flight_object_closes;

// object closed after advance active set committed
ldout(m_cct, 20) << __func__ << ": object "
<< active_object_recorder->get_oid() << " closed" << dendl;
ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
<< " closed" << dendl;
if (m_in_flight_object_closes == 0) {
if (m_in_flight_advance_sets == 0) {
// peer forced closing of object set
Expand All @@ -370,7 +391,7 @@ void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
}

void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
ldout(m_cct, 10) << object_recorder->get_oid() << dendl;

Mutex::Locker locker(m_lock);

Expand All @@ -380,9 +401,8 @@ void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
ceph_assert(active_object_recorder->get_object_number() == object_number);

ldout(m_cct, 20) << __func__ << ": object "
<< active_object_recorder->get_oid() << " overflowed"
<< dendl;
ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
<< " overflowed" << dendl;
close_and_advance_object_set(object_number / splay_width);
}

Expand Down
15 changes: 9 additions & 6 deletions src/journal/JournalRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ class JournalRecorder {
public:
JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
const JournalMetadataPtr &journal_metadata,
uint32_t flush_interval, uint64_t flush_bytes,
double flush_age, uint64_t max_in_flight_appends);
uint64_t max_in_flight_appends);
~JournalRecorder();

void shut_down(Context *on_safe);

void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
double flush_age);

Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);

Expand Down Expand Up @@ -79,9 +82,9 @@ class JournalRecorder {

JournalMetadataPtr m_journal_metadata;

uint32_t m_flush_interval;
uint64_t m_flush_bytes;
double m_flush_age;
uint32_t m_flush_interval = 0;
uint64_t m_flush_bytes = 0;
double m_flush_age = 0;
uint64_t m_max_in_flight_appends;

Listener m_listener;
Expand Down Expand Up @@ -109,7 +112,7 @@ class JournalRecorder {

ObjectRecorderPtr create_object_recorder(uint64_t object_number,
std::shared_ptr<Mutex> lock);
void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);
void create_next_object_recorder(ObjectRecorderPtr object_recorder);

void handle_update();

Expand Down
13 changes: 9 additions & 4 deletions src/journal/Journaler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,20 @@ void Journaler::committed(const Future &future) {
m_trimmer->committed(future_impl->get_commit_tid());
}

void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
double flush_age, uint64_t max_in_flight_appends) {
void Journaler::start_append(uint64_t max_in_flight_appends) {
ceph_assert(m_recorder == nullptr);

// TODO verify active object set >= current replay object set

m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
m_metadata, flush_interval, flush_bytes,
flush_age, max_in_flight_appends);
m_metadata, max_in_flight_appends);
}

/**
 * Forward the append batching options to the journal recorder.
 *
 * Asserts that m_recorder is non-null. In the visible code the recorder is
 * created by start_append(), so presumably that has to be called first --
 * confirm against callers.
 *
 * @param flush_interval passed through unchanged to the recorder
 * @param flush_bytes    passed through unchanged to the recorder
 * @param flush_age      passed through unchanged to the recorder
 */
void Journaler::set_append_batch_options(
    int flush_interval, uint64_t flush_bytes, double flush_age) {
  ceph_assert(m_recorder != nullptr);

  m_recorder->set_append_batch_options(
      flush_interval, flush_bytes, flush_age);
}

void Journaler::stop_append(Context *on_safe) {
Expand Down
5 changes: 3 additions & 2 deletions src/journal/Journaler.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ class Journaler {
void stop_replay(Context *on_finish);

uint64_t get_max_append_size() const;
void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
uint64_t max_in_flight_appends);
void start_append(uint64_t max_in_flight_appends);
void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
double flush_age);
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush_append(Context *on_safe);
void stop_append(Context *on_safe);
Expand Down
Loading

0 comments on commit 961df63

Please sign in to comment.