diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index b30a34a5ef5d5..745c04df64394 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -579,8 +579,9 @@ template void Journal::handle_replay_complete(int r) { CephContext *cct = m_image_ctx.cct; - Mutex::Locker locker(m_lock); + m_lock.Lock(); if (m_state != STATE_REPLAYING) { + m_lock.Unlock(); return; } @@ -588,11 +589,13 @@ void Journal::handle_replay_complete(int r) { m_journaler->stop_replay(); if (r < 0) { transition_state(STATE_FLUSHING_RESTART, r); + m_lock.Unlock(); m_journal_replay->flush(create_context_callback< Journal, &Journal::handle_flushing_restart>(this)); } else { transition_state(STATE_FLUSHING_REPLAY, 0); + m_lock.Unlock(); m_journal_replay->flush(create_context_callback< Journal, &Journal::handle_flushing_replay>(this)); diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc index 95a412f519d7f..bd645e09e421c 100644 --- a/src/librbd/journal/Replay.cc +++ b/src/librbd/journal/Replay.cc @@ -38,7 +38,7 @@ Replay::~Replay() { assert(m_in_flight_aio == 0); assert(m_aio_modify_unsafe_contexts.empty()); assert(m_aio_modify_safe_contexts.empty()); - assert(m_op_contexts.empty()); + assert(m_op_events.empty()); } template @@ -56,7 +56,6 @@ void Replay::process(bufferlist::iterator *it, Context *on_ready, return; } - Mutex::Locker locker(m_lock); RWLock::RLocker owner_lock(m_image_ctx.owner_lock); boost::apply_visitor(EventVisitor(this, on_ready, on_safe), event_entry.event); @@ -67,22 +66,35 @@ void Replay::flush(Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; + OpTids cancel_op_tids; on_finish = util::create_async_context_callback( m_image_ctx, on_finish); + { + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); Mutex::Locker locker(m_lock); - assert(m_flush_ctx == nullptr); - m_flush_ctx = on_finish; + // safely commit any remaining AIO modify operations if (m_in_flight_aio != 0) { flush_aio(); } - if (!m_op_contexts.empty() || m_in_flight_aio != 0) { - return; + for (auto &op_event_pair : m_op_events) { + cancel_op_tids.push_back(op_event_pair.first); + } + + assert(m_flush_ctx == nullptr); + if (!m_op_events.empty() || m_in_flight_aio != 0) { + std::swap(m_flush_ctx, on_finish); } } - on_finish->complete(0); + + for (auto op_tid : cancel_op_tids) { + handle_op_complete(op_tid, -ERESTART); + } + if (on_finish != nullptr) { + on_finish->complete(0); + } } template @@ -97,6 +109,7 @@ void Replay::handle_event(const journal::AioDiscardEvent &event, AioImageRequest::aio_discard(&m_image_ctx, aio_comp, event.offset, event.length); if (flush_required) { + Mutex::Locker locker(m_lock); flush_aio(); } } @@ -114,6 +127,7 @@ void Replay::handle_event(const journal::AioWriteEvent &event, AioImageRequest::aio_write(&m_image_ctx, aio_comp, event.offset, event.length, data.c_str(), 0); if (flush_required) { + Mutex::Locker locker(m_lock); flush_aio(); } } @@ -124,7 +138,11 @@ void Replay::handle_event(const journal::AioFlushEvent &event, CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl; - AioCompletion *aio_comp = create_aio_flush_completion(on_ready, on_safe); + AioCompletion *aio_comp; + { + Mutex::Locker locker(m_lock); + aio_comp = create_aio_flush_completion(on_ready, on_safe); + } AioImageRequest::aio_flush(&m_image_ctx, aio_comp); } @@ -132,7 +150,40 @@ template void Replay::handle_event(const journal::OpFinishEvent &event, Context *on_ready, Context *on_safe) { CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl; + ldout(cct, 20) << this << " " << __func__ << ": Op finish event: " + << "op_tid=" << event.op_tid << dendl; + + Context *on_op_finish_event = nullptr; + { + Mutex::Locker locker(m_lock); + auto op_it = m_op_events.find(event.op_tid); + if (op_it == m_op_events.end()) { + ldout(cct, 10) << "unable to locate associated op: assuming previously " + << "committed." << dendl; + on_ready->complete(0); + m_image_ctx.op_work_queue->queue(on_safe, 0); + return; + } + + OpEvent &op_event = op_it->second; + assert(op_event.on_finish_safe == nullptr); + op_event.on_finish_ready = on_ready; + op_event.on_finish_safe = on_safe; + std::swap(on_op_finish_event, op_event.on_op_finish_event); + } + + if (event.r < 0) { + // TODO handle snap create / resize + + // journal recorded failure of op -- no-op the operation + delete on_op_finish_event; + handle_op_complete(event.op_tid, 0); + return; + } + + // apply the op now -- each op is responsible for filtering the + // recorded result to know if the op completed successfully + on_op_finish_event->complete(0); } template @@ -142,8 +193,11 @@ void Replay::handle_event(const journal::SnapCreateEvent &event, ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl; // TODO not-ready until state machine lets us know - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_finish); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + + m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_op_complete); } template @@ -152,8 +206,16 @@ void Replay::handle_event(const journal::SnapRemoveEvent &event, CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl; - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->snap_remove(event.snap_name.c_str(), on_finish); + Mutex::Locker locker(m_lock); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + op_event->on_op_finish_event = new FunctionContext( + [this, event, on_op_complete](int r) { + m_image_ctx.operations->snap_remove(event.snap_name.c_str(), + on_op_complete); + }); + on_ready->complete(0); } @@ -163,9 +225,17 @@ void Replay::handle_event(const journal::SnapRenameEvent &event, CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl; - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->snap_rename(event.snap_id, event.snap_name.c_str(), - on_finish); + Mutex::Locker locker(m_lock); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + op_event->on_op_finish_event = new FunctionContext( + [this, event, on_op_complete](int r) { + m_image_ctx.operations->snap_rename(event.snap_id, + event.snap_name.c_str(), + on_op_complete); + }); + on_ready->complete(0); } @@ -175,8 +245,16 @@ void Replay::handle_event(const journal::SnapProtectEvent &event, CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl; - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->snap_protect(event.snap_name.c_str(), on_finish); + Mutex::Locker locker(m_lock); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + op_event->on_op_finish_event = new FunctionContext( + [this, event, on_op_complete](int r) { + m_image_ctx.operations->snap_protect(event.snap_name.c_str(), + on_op_complete); + }); + on_ready->complete(0); } @@ -187,8 +265,16 @@ void Replay::handle_event(const journal::SnapUnprotectEvent &event, ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event" << dendl; - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), on_finish); + Mutex::Locker locker(m_lock); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + op_event->on_op_finish_event = new FunctionContext( + [this, event, on_op_complete](int r) { + m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), + on_op_complete); + }); + on_ready->complete(0); } @@ -199,9 +285,17 @@ void Replay::handle_event(const journal::SnapRollbackEvent &event, ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event" << dendl; - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->snap_rollback(event.snap_name.c_str(), - no_op_progress_callback, on_finish); + Mutex::Locker locker(m_lock); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + op_event->on_op_finish_event = new FunctionContext( + [this, event, on_op_complete](int r) { + m_image_ctx.operations->snap_rollback(event.snap_name.c_str(), + no_op_progress_callback, + on_op_complete); + }); + on_ready->complete(0); } @@ -211,8 +305,15 @@ void Replay::handle_event(const journal::RenameEvent &event, CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl; - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->rename(event.image_name.c_str(), on_finish); + Mutex::Locker locker(m_lock); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + op_event->on_op_finish_event = new FunctionContext( + [this, event, on_op_complete](int r) { + m_image_ctx.operations->rename(event.image_name.c_str(), on_op_complete); + }); + on_ready->complete(0); } @@ -223,9 +324,12 @@ void Replay::handle_event(const journal::ResizeEvent &event, ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl; // TODO not-ready until state machine lets us know - Context *on_finish = create_op_context_callback(on_safe); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + m_image_ctx.operations->resize(event.size, no_op_progress_callback, - on_finish); + on_op_complete); } template @@ -234,8 +338,15 @@ void Replay::handle_event(const journal::FlattenEvent &event, CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl; - Context *on_finish = create_op_context_callback(on_safe); - m_image_ctx.operations->flatten(no_op_progress_callback, on_finish); + Mutex::Locker locker(m_lock); + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe, + &op_event); + op_event->on_op_finish_event = new FunctionContext( + [this, event, on_op_complete](int r) { + m_image_ctx.operations->flatten(no_op_progress_callback, on_op_complete); + }); + on_ready->complete(0); } @@ -291,7 +402,7 @@ void Replay::handle_aio_flush_complete(Context *on_flush_safe, m_in_flight_aio -= on_safe_ctxs.size(); std::swap(on_aio_ready, m_on_aio_ready); - if (m_op_contexts.empty() && m_in_flight_aio == 0) { + if (m_op_events.empty() && m_in_flight_aio == 0) { on_flush = m_flush_ctx; } @@ -321,34 +432,53 @@ void Replay::handle_aio_flush_complete(Context *on_flush_safe, } template -Context *Replay::create_op_context_callback(Context *on_safe) { +Context *Replay::create_op_context_callback(uint64_t op_tid, + Context *on_safe, + OpEvent **op_event) { assert(m_lock.is_locked()); - C_OpOnFinish *on_finish; - { - on_finish = new C_OpOnFinish(this); - m_op_contexts[on_finish] = on_safe; - } - return on_finish; + *op_event = &m_op_events[op_tid]; + (*op_event)->on_start_safe = on_safe; + return new C_OpOnComplete(this, op_tid); } template -void Replay::handle_op_context_callback(Context *on_op_finish, int r) { - Context *on_safe = nullptr; +void Replay::handle_op_complete(uint64_t op_tid, int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": op_tid=" << op_tid << ", " + << "r=" << r << dendl; + + OpEvent op_event; Context *on_flush = nullptr; { Mutex::Locker locker(m_lock); - auto it = m_op_contexts.find(on_op_finish); - assert(it != m_op_contexts.end()); + auto op_it = m_op_events.find(op_tid); + assert(op_it != m_op_events.end()); + + op_event = std::move(op_it->second); + m_op_events.erase(op_it); - on_safe = it->second; - m_op_contexts.erase(it); - if (m_op_contexts.empty() && m_in_flight_aio == 0) { + // TODO handle paused snap create / resize + + if (m_op_events.empty() && m_in_flight_aio == 0) { on_flush = m_flush_ctx; } } - on_safe->complete(r); + assert((op_event.on_finish_ready != nullptr && + op_event.on_finish_safe != nullptr) || r == -ERESTART); + + // skipped upon error -- so clean up if non-null + delete op_event.on_op_finish_event; + + if (op_event.on_finish_ready != nullptr) { + op_event.on_finish_ready->complete(0); + } + + op_event.on_start_safe->complete(r); + if (op_event.on_finish_safe != nullptr) { + op_event.on_finish_safe->complete(r); + } if (on_flush != nullptr) { on_flush->complete(0); } @@ -358,8 +488,8 @@ template AioCompletion *Replay::create_aio_modify_completion(Context *on_ready, Context *on_safe, bool *flush_required) { + Mutex::Locker locker(m_lock); CephContext *cct = m_image_ctx.cct; - assert(m_lock.is_locked()); assert(m_on_aio_ready == nullptr); ++m_in_flight_aio; @@ -400,6 +530,8 @@ AioCompletion *Replay::create_aio_modify_completion(Context *on_ready, template AioCompletion *Replay::create_aio_flush_completion(Context *on_ready, Context *on_safe) { + assert(m_lock.is_locked()); + // associate all prior write/discard ops to this flush request AioCompletion *aio_comp = AioCompletion::create( new C_AioFlushComplete(this, on_safe, diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h index c71157da04dc0..19956c9178edc 100644 --- a/src/librbd/journal/Replay.h +++ b/src/librbd/journal/Replay.h @@ -7,13 +7,14 @@ #include "include/int_types.h" #include "include/buffer_fwd.h" #include "include/Context.h" -#include "include/unordered_set.h" -#include "include/unordered_map.h" #include "include/rbd/librbd.hpp" #include "common/Mutex.h" #include "librbd/journal/Entries.h" #include #include +#include +#include +#include namespace librbd { @@ -36,16 +37,26 @@ class Replay { void flush(Context *on_finish); private: + struct OpEvent { + Context *on_op_finish_event = nullptr; + Context *on_start_safe = nullptr; + Context *on_finish_ready = nullptr; + Context *on_finish_safe = nullptr; + }; + + typedef std::list OpTids; typedef std::list Contexts; - typedef ceph::unordered_set ContextSet; - typedef ceph::unordered_map OpContexts; + typedef std::unordered_set ContextSet; + typedef std::unordered_map OpEvents; - struct C_OpOnFinish : public Context { + struct C_OpOnComplete : public Context { Replay *replay; - C_OpOnFinish(Replay *replay) : replay(replay) { + uint64_t op_tid; + C_OpOnComplete(Replay *replay, uint64_t op_tid) + : replay(replay), op_tid(op_tid) { } virtual void finish(int r) override { - replay->handle_op_context_callback(this, r); + replay->handle_op_complete(op_tid, r); } }; @@ -97,7 +108,7 @@ class Replay { Contexts m_aio_modify_unsafe_contexts; ContextSet m_aio_modify_safe_contexts; - OpContexts m_op_contexts; + OpEvents m_op_events; Context *m_flush_ctx = nullptr; Context *m_on_aio_ready = nullptr; @@ -136,8 +147,9 @@ class Replay { void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs, int r); - Context *create_op_context_callback(Context *on_safe); - void handle_op_context_callback(Context *on_op_finish, int r); + Context *create_op_context_callback(uint64_t op_tid, Context *on_safe, + OpEvent **op_event); + void handle_op_complete(uint64_t op_tid, int r); AioCompletion *create_aio_modify_completion(Context *on_ready, Context *on_safe,