Skip to content

Commit

Permalink
osdc: properly acquire locks for getters
Browse files Browse the repository at this point in the history
This was left as a TODO. : /

Signed-off-by: Patrick Donnelly <[email protected]>
  • Loading branch information
batrick committed Sep 25, 2024
1 parent ce5d84d commit 55652f0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 38 deletions.
46 changes: 23 additions & 23 deletions src/osdc/Journaler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class Journaler::C_ReProbe : public Context {
void Journaler::recover(Context *onread)
{
lock_guard l(lock);
if (is_stopping()) {
if (state == STATE_STOPPING) {
onread->complete(-EAGAIN);
return;
}
Expand Down Expand Up @@ -218,7 +218,7 @@ void Journaler::_reread_head(Context *onfinish)
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
{
lock_guard l(lock);
if (is_stopping()) {
if (state == STATE_STOPPING) {
finish->complete(-EAGAIN);
return;
}
Expand Down Expand Up @@ -250,7 +250,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
void Journaler::_finish_read_head(int r, bufferlist& bl)
{
lock_guard l(lock);
if (is_stopping())
if (state == STATE_STOPPING)
return;

ceph_assert(state == STATE_READHEAD);
Expand Down Expand Up @@ -342,7 +342,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
C_OnFinisher *onfinish)
{
lock_guard l(lock);
if (is_stopping()) {
if (state == STATE_STOPPING) {
onfinish->complete(-EAGAIN);
return;
}
Expand All @@ -359,7 +359,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
void Journaler::_finish_probe_end(int r, uint64_t end)
{
lock_guard l(lock);
if (is_stopping())
if (state == STATE_STOPPING)
return;

ceph_assert(state == STATE_PROBING);
Expand Down Expand Up @@ -413,7 +413,7 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
{
// Expect to be called back from finish_reread_head, which already takes lock
// lock is locked
if (is_stopping()) {
if (state == STATE_STOPPING) {
onfinish->complete(-EAGAIN);
return;
}
Expand Down Expand Up @@ -605,7 +605,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
write_pos += wrote;

// flush previous object?
uint64_t su = get_layout_period();
uint64_t su = layout.get_period();
ceph_assert(su > 0);
uint64_t write_off = write_pos % su;
uint64_t write_obj = write_pos / su;
Expand All @@ -630,7 +630,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)

void Journaler::_do_flush(unsigned amount)
{
if (is_stopping())
if (state == STATE_STOPPING)
return;
if (write_pos == flush_pos)
return;
Expand All @@ -645,7 +645,7 @@ void Journaler::_do_flush(unsigned amount)

// zero at least two full periods ahead. this ensures
// that the next object will not exist.
uint64_t period = get_layout_period();
uint64_t period = layout.get_period();
if (flush_pos + len + 2*period > prezero_pos) {
_issue_prezero();

Expand Down Expand Up @@ -718,7 +718,7 @@ void Journaler::_do_flush(unsigned amount)
void Journaler::wait_for_flush(Context *onsafe)
{
lock_guard l(lock);
if (is_stopping()) {
if (state == STATE_STOPPING) {
if (onsafe)
onsafe->complete(-EAGAIN);
return;
Expand Down Expand Up @@ -752,7 +752,7 @@ void Journaler::_wait_for_flush(Context *onsafe)
void Journaler::flush(Context *onsafe)
{
lock_guard l(lock);
if (is_stopping()) {
if (state == STATE_STOPPING) {
if (onsafe)
onsafe->complete(-EAGAIN);
return;
Expand Down Expand Up @@ -812,7 +812,7 @@ void Journaler::_issue_prezero()
* issue zero requests based on write_pos, even though the invariant
* is that we zero ahead of flush_pos.
*/
uint64_t period = get_layout_period();
uint64_t period = layout.get_period();
uint64_t to = write_pos + period * num_periods + period - 1;
to -= to % period;

Expand Down Expand Up @@ -1062,7 +1062,7 @@ void Journaler::_issue_read(uint64_t len)
// here because it will wait for all object reads to complete before
// giving us back any data. this way we can process whatever bits
// come in that are contiguous.
uint64_t period = get_layout_period();
uint64_t period = layout.get_period();
while (len > 0) {
uint64_t e = requested_pos + period;
e -= e % period;
Expand All @@ -1079,7 +1079,7 @@ void Journaler::_issue_read(uint64_t len)

void Journaler::_prefetch()
{
if (is_stopping())
if (state == STATE_STOPPING)
return;

ldout(cct, 10) << "_prefetch" << dendl;
Expand All @@ -1096,7 +1096,7 @@ void Journaler::_prefetch()
uint64_t raw_target = read_pos + pf;

// read full log segments, so increase if necessary
uint64_t period = get_layout_period();
uint64_t period = layout.get_period();
uint64_t remainder = raw_target % period;
uint64_t adjustment = remainder ? period - remainder : 0;
uint64_t target = raw_target + adjustment;
Expand Down Expand Up @@ -1215,8 +1215,8 @@ void Journaler::erase(Context *completion)
lock_guard l(lock);

// Async delete the journal data
uint64_t first = trimmed_pos / get_layout_period();
uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
uint64_t first = trimmed_pos / layout.get_period();
uint64_t num = (write_pos - trimmed_pos) / layout.get_period() + 2;
filer.purge_range(ino, &layout, SnapContext(), first, num,
ceph::real_clock::now(), 0,
wrap_finisher(new C_EraseFinish(
Expand All @@ -1231,7 +1231,7 @@ void Journaler::erase(Context *completion)
void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
{
lock_guard l(lock);
if (is_stopping()) {
if (state == STATE_STOPPING) {
completion->complete(-EAGAIN);
return;
}
Expand Down Expand Up @@ -1309,7 +1309,7 @@ void Journaler::wait_for_readable(Context *onreadable)

void Journaler::_wait_for_readable(Context *onreadable)
{
if (is_stopping()) {
if (state == STATE_STOPPING) {
finisher->queue(onreadable, -EAGAIN);
return;
}
Expand Down Expand Up @@ -1354,11 +1354,11 @@ void Journaler::trim()

void Journaler::_trim()
{
if (is_stopping())
if (state == STATE_STOPPING)
return;

ceph_assert(!readonly);
uint64_t period = get_layout_period();
uint64_t period = layout.get_period();
uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period;
ldout(cct, 10) << "trim last_commited head was " << last_committed
Expand Down Expand Up @@ -1633,8 +1633,8 @@ void Journaler::check_isreadable()
{
std::unique_lock l(lock);
while (!_is_readable() &&
get_read_pos() < get_write_pos() &&
!get_error()) {
read_pos < write_pos &&
!error) {
C_SaferCond readable_waiter;
_wait_for_readable(&readable_waiter);
l.unlock();
Expand Down
73 changes: 58 additions & 15 deletions src/osdc/Journaler.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ class Journaler {
private:
// me
CephContext *cct;
std::mutex lock;
mutable ceph::mutex lock;
const std::string name;
typedef std::lock_guard<std::mutex> lock_guard;
typedef std::unique_lock<std::mutex> unique_lock;
typedef std::lock_guard<ceph::mutex> lock_guard;
typedef std::unique_lock<ceph::mutex> unique_lock;
Finisher *finisher;
Header last_written;
inodeno_t ino;
Expand Down Expand Up @@ -408,7 +408,7 @@ class Journaler {
Journaler(const std::string &name_, inodeno_t ino_, int64_t pool,
const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) :
last_committed(mag),
cct(obj->cct), name(name_), finisher(f), last_written(mag),
cct(obj->cct), lock(ceph::make_mutex("Journaler::" + name_)), name(name_), finisher(f), last_written(mag),
ino(ino_), pg_pool(pool), readonly(true),
stream_format(-1), journal_stream(-1),
magic(mag),
Expand Down Expand Up @@ -528,24 +528,67 @@ class Journaler {

// Synchronous getters
// ===================
// TODO: need some locks on reads for true safety
uint64_t get_layout_period() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return layout.get_period();
}
file_layout_t& get_layout() { return layout; }
bool is_active() { return state == STATE_ACTIVE; }
bool is_stopping() { return state == STATE_STOPPING; }
int get_error() { return error; }
bool is_readonly() { return readonly; }
file_layout_t get_layout() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return layout;
}
bool is_active() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return state == STATE_ACTIVE;
}
bool is_stopping() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return state == STATE_STOPPING;
}
int get_error() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return error;
}
bool is_readonly() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return readonly;
}
bool is_readable();
bool _is_readable();
bool try_read_entry(bufferlist& bl);
uint64_t get_write_pos() const { return write_pos; }
uint64_t get_write_safe_pos() const { return safe_pos; }
uint64_t get_read_pos() const { return read_pos; }
uint64_t get_expire_pos() const { return expire_pos; }
uint64_t get_trimmed_pos() const { return trimmed_pos; }
uint64_t get_write_pos() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return write_pos;
}
uint64_t get_write_safe_pos() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return safe_pos;
}
uint64_t get_read_pos() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return read_pos;
}
uint64_t get_expire_pos() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return expire_pos;
}
uint64_t get_trimmed_pos() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return trimmed_pos;
}
size_t get_journal_envelope_size() const {
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
lock_guard l(lock);
return journal_stream.get_envelope_size();
}
void check_isreadable();
Expand Down

0 comments on commit 55652f0

Please sign in to comment.