Skip to content

Commit

Permalink
os/bluestore: Make OpSequencer has own deferred_lock.
Browse files Browse the repository at this point in the history
For 4k randwrite workload & Using perf script futex-contention in
60second, i found:
>bstore_mempool[69278] lock 7fdc0131f3b8 contended 5 times, 2043 avg ns
>bstore_kv_final[69277] lock 7fdc0131f3b8 contended 41835 times, 1285 avg ns
>bstore_aio[69226] lock 7fdc0131f3b8 contended 62294 times, 1217 avg ns

deferred_lock has serious contented. This becasue deferred_lock is a
globla lock. So i add defferred_lock in OpSequencer and make global
deferred_lock only protect deferred_queue.

After this change:
>bstore_kv_final[75810] lock 7f72f931f3b8 contended 1074 times, 1692 avg ns
>bstore_aio[75759] lock 7f72f931f3b8 contended 1491 times, 1133 avg ns

Signed-off-by: Jianpeng Ma <[email protected]>
  • Loading branch information
majianpeng committed Dec 16, 2020
1 parent 3519603 commit abda218
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 34 deletions.
95 changes: 62 additions & 33 deletions src/os/bluestore/BlueStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11569,11 +11569,11 @@ void BlueStore::_osr_drain_preceding(TransContext *txc)
++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
deferred_lock.lock();
osr->deferred_lock.lock();
if (osr->deferred_pending && !osr->deferred_running) {
_deferred_submit_unlock(osr);
} else {
deferred_lock.unlock();
osr->deferred_lock.unlock();
}
}
{
Expand All @@ -11595,11 +11595,11 @@ void BlueStore::_osr_drain(OpSequencer *osr)
++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
deferred_lock.lock();
osr->deferred_lock.lock();
if (osr->deferred_pending && !osr->deferred_running) {
_deferred_submit_unlock(osr);
} else {
deferred_lock.unlock();
osr->deferred_lock.unlock();
}
}
{
Expand Down Expand Up @@ -12131,59 +12131,84 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(
void BlueStore::_deferred_queue(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
deferred_lock.lock();
if (!txc->osr->deferred_pending &&
!txc->osr->deferred_running) {
deferred_queue.push_back(*txc->osr);
}
if (!txc->osr->deferred_pending) {
txc->osr->deferred_pending = new DeferredBatch(cct, txc->osr.get());

DeferredBatch *tmp;
{
txc->osr->deferred_lock.lock();
if (!txc->osr->deferred_pending) {
tmp = new DeferredBatch(cct, txc->osr.get());
} else {
tmp = txc->osr->deferred_pending;
txc->osr->deferred_pending = nullptr;
}
txc->osr->deferred_lock.unlock();
}
++deferred_queue_size;
txc->osr->deferred_pending->txcs.push_back(*txc);

tmp->txcs.push_back(*txc);
bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
for (auto opi = wt.ops.begin(); opi != wt.ops.end(); ++opi) {
const auto& op = *opi;
ceph_assert(op.op == bluestore_deferred_op_t::OP_WRITE);
bufferlist::const_iterator p = op.data.begin();
for (auto e : op.extents) {
txc->osr->deferred_pending->prepare_write(
cct, wt.seq, e.offset, e.length, p);
tmp->prepare_write(cct, wt.seq, e.offset, e.length, p);
}
}
if (deferred_aggressive &&
!txc->osr->deferred_running) {
_deferred_submit_unlock(txc->osr.get());
} else {
deferred_lock.unlock();

{
txc->osr->deferred_lock.lock();
++deferred_queue_size;
txc->osr->deferred_pending = tmp;
if (!txc->osr->deferred_running && (tmp->txcs.size() == 1)) {
deferred_lock.lock();
deferred_queue.push_back(*txc->osr);
deferred_lock.unlock();
}

if (deferred_aggressive &&
!txc->osr->deferred_running) {
_deferred_submit_unlock(txc->osr.get());
} else {
txc->osr->deferred_lock.unlock();
}
}
}

}

void BlueStore::deferred_try_submit()
{
dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
<< deferred_queue_size << " txcs" << dendl;
std::lock_guard l(deferred_lock);
vector<OpSequencerRef> osrs;
osrs.reserve(deferred_queue.size());
for (auto& osr : deferred_queue) {
osrs.push_back(&osr);

{
std::lock_guard l(deferred_lock);
osrs.reserve(deferred_queue.size());
for (auto& osr : deferred_queue) {
osrs.push_back(&osr);
}
}

for (auto& osr : osrs) {
osr->deferred_lock.lock();
if (osr->deferred_pending) {
if (!osr->deferred_running) {
_deferred_submit_unlock(osr.get());
deferred_lock.lock();
} else {
osr->deferred_lock.unlock();
dout(20) << __func__ << " osr " << osr << " already has running"
<< dendl;
}
} else {
osr->deferred_lock.unlock();
dout(20) << __func__ << " osr " << osr << " has no pending" << dendl;
}
}

deferred_last_submitted = ceph_clock_now();
{
std::lock_guard l(deferred_lock);
deferred_last_submitted = ceph_clock_now();
}
}

void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
Expand All @@ -12201,7 +12226,7 @@ void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
osr->deferred_running = osr->deferred_pending;
osr->deferred_pending = nullptr;

deferred_lock.unlock();
osr->deferred_lock.unlock();

for (auto& txc : b->txcs) {
throttle.log_state_latency(txc, logger, l_bluestore_state_deferred_queued_lat);
Expand Down Expand Up @@ -12258,16 +12283,20 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
DeferredBatch *b = osr->deferred_running;

{
deferred_lock.lock();
osr->deferred_lock.lock();
ceph_assert(osr->deferred_running == b);
osr->deferred_running = nullptr;
if (!osr->deferred_pending) {
dout(20) << __func__ << " dequeueing" << dendl;
auto q = deferred_queue.iterator_to(*osr);
deferred_queue.erase(q);
deferred_lock.unlock();
{
deferred_lock.lock();
auto q = deferred_queue.iterator_to(*osr);
deferred_queue.erase(q);
deferred_lock.unlock();
}
osr->deferred_lock.unlock();
} else {
deferred_lock.unlock();
osr->deferred_lock.unlock();
if (deferred_aggressive) {
dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
finisher.queue(new C_DeferredTrySubmit(this));
Expand Down
4 changes: 3 additions & 1 deletion src/os/bluestore/BlueStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,8 @@ class BlueStore : public ObjectStore,
DeferredBatch *deferred_running = nullptr;
DeferredBatch *deferred_pending = nullptr;

ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::OpSequencer::deferred_lock");

BlueStore *store;
coll_t cid;

Expand Down Expand Up @@ -2095,7 +2097,7 @@ class BlueStore : public ObjectStore,
ceph::make_mutex("BlueStore::atomic_alloc_and_submit_lock");
std::atomic<uint64_t> deferred_seq = {0};
deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
int deferred_queue_size = 0; ///< num txc's queued across all osrs
std::atomic_int deferred_queue_size = {0}; ///< num txc's queued across all osrs
std::atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
Finisher finisher;
utime_t deferred_last_submitted = utime_t();
Expand Down

0 comments on commit abda218

Please sign in to comment.