Skip to content

Commit

Permalink
Merge PR ceph#37095 into master
Browse files Browse the repository at this point in the history
* refs/pull/37095/head:
	mds: make purge_queue delete objects asynchronously and keep accepting pushes

Reviewed-by: Patrick Donnelly <[email protected]>
  • Loading branch information
batrick committed Jan 13, 2021
2 parents 72c3b5e + 50aafa4 commit 8326c45
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 50 deletions.
157 changes: 107 additions & 50 deletions src/mds/PurgeQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ void PurgeItem::decode(bufferlist::const_iterator &p)
DECODE_FINISH(p);
}

// TODO: if Objecter has any slow requests, take that as a hint and
// slow down our rate of purging (keep accepting pushes though)
// if Objecter has any slow requests, take that as a hint and
// slow down our rate of purging
PurgeQueue::PurgeQueue(
CephContext *cct_,
mds_rank_t rank_,
Expand Down Expand Up @@ -493,6 +493,99 @@ bool PurgeQueue::_consume()
return could_consume;
}

class C_IO_PurgeItem_Commit : public Context {
public:
C_IO_PurgeItem_Commit(PurgeQueue *pq, std::vector<PurgeItemCommitOp> ops, uint64_t expire_to)
: purge_queue(pq), ops_vec(std::move(ops)), expire_to(expire_to) {
}

void finish(int r) override {
purge_queue->_commit_ops(r, ops_vec, expire_to);
}

private:
PurgeQueue *purge_queue;
std::vector<PurgeItemCommitOp> ops_vec;
uint64_t expire_to;
};

/**
 * Submit the backend operations recorded for one purge item and arrange for
 * the item to be completed once every one of them has finished.
 *
 * @param r          result delivered to the queued context; a negative value
 *                   is logged and nothing is submitted
 * @param ops_vec    operations to issue (range purge, object remove, zero)
 * @param expire_to  passed to _execute_item_complete() after all
 *                   sub-operations have completed
 */
void PurgeQueue::_commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to)
{
  if (r < 0) {
    derr << " r = " << r << dendl;
    return;
  }

  SnapContext nullsnapc;
  C_GatherBuilder gather(cct);

  for (auto &op : ops_vec) {
    dout(10) << op.item.get_type_str() << dendl;
    if (op.type == PurgeItemCommitOp::PURGE_OP_RANGE) {
      const uint64_t num = Striper::get_num_objects(op.item.layout, op.item.size);
      uint64_t first_obj = 0;
      uint64_t num_obj = num;

      if (op.item.action == PurgeItem::TRUNCATE_FILE) {
        // Truncation purges from object 1 onwards; with at most one object
        // there is nothing to purge for this op.
        if (num <= 1)
          continue;
        first_obj = 1;
        num_obj = num - 1;
      }

      filer.purge_range(op.item.ino, &op.item.layout, op.item.snapc,
                        first_obj, num_obj, ceph::real_clock::now(), op.flags,
                        gather.new_sub());
    } else if (op.type == PurgeItemCommitOp::PURGE_OP_REMOVE) {
      if (op.item.action == PurgeItem::PURGE_DIR) {
        // Directory purges remove their objects with an empty snap context.
        objecter->remove(op.oid, op.oloc, nullsnapc,
                         ceph::real_clock::now(), op.flags,
                         gather.new_sub());
      } else {
        objecter->remove(op.oid, op.oloc, op.item.snapc,
                         ceph::real_clock::now(), op.flags,
                         gather.new_sub());
      }
    } else if (op.type == PurgeItemCommitOp::PURGE_OP_ZERO) {
      // Zero the first object_size bytes of the file.
      filer.zero(op.item.ino, &op.item.layout, op.item.snapc,
                 0, op.item.layout.object_size, ceph::real_clock::now(), 0, true,
                 gather.new_sub());
    } else {
      derr << "Invalid purge op: " << op.type << dendl;
      ceph_abort();
    }
  }

  ceph_assert(gather.has_subs());

  gather.set_finisher(new C_OnFinisher(
    new LambdaContext([this, expire_to](int r) {
      std::lock_guard l(lock);

      if (r == -EBLOCKLISTED) {
        // on_error is cleared after its first use, so only queue it while it
        // is still set; a later completion reporting the same error must not
        // hand a null context to the finisher.
        if (on_error) {
          finisher.queue(on_error, r);
          on_error = nullptr;
        }
        return;
      }

      _execute_item_complete(expire_to);
      _consume();

      // Have we gone idle? If so, do an extra write_head now instead of
      // waiting for next flush after journaler_write_head_interval.
      // Also do this periodically even if not idle, so that the persisted
      // expire_pos doesn't fall too far behind our progress when consuming
      // a very long queue.
      if (!readonly &&
          (in_flight.empty() || journaler.write_head_needed())) {
        journaler.write_head(nullptr);
      }
    }), &finisher));

  gather.activate();
}

void PurgeQueue::_execute_item(
const PurgeItem &item,
uint64_t expire_to)
Expand All @@ -510,38 +603,34 @@ void PurgeQueue::_execute_item(
ops_high_water = std::max(ops_high_water, ops_in_flight);
logger->set(l_pq_executing_ops_high_water, ops_high_water);

SnapContext nullsnapc;
std::vector<PurgeItemCommitOp> ops_vec;
auto submit_ops = [&]() {
finisher.queue(new C_IO_PurgeItem_Commit(this, std::move(ops_vec), expire_to));
};

C_GatherBuilder gather(cct);
if (item.action == PurgeItem::PURGE_FILE) {
if (item.size > 0) {
uint64_t num = Striper::get_num_objects(item.layout, item.size);
dout(10) << " 0~" << item.size << " objects 0~" << num
<< " snapc " << item.snapc << " on " << item.ino << dendl;
filer.purge_range(item.ino, &item.layout, item.snapc,
0, num, ceph::real_clock::now(), 0,
gather.new_sub());
ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
}

// remove the backtrace object if it was not purged
object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
if (ops_vec.empty() || !item.layout.pool_ns.empty()) {
object_locator_t oloc(item.layout.pool_id);
dout(10) << " remove backtrace object " << oid
<< " pool " << oloc.pool << " snapc " << item.snapc << dendl;
objecter->remove(oid, oloc, item.snapc,
ceph::real_clock::now(), 0,
gather.new_sub());
ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
}

// remove old backtrace objects
for (const auto &p : item.old_pools) {
object_locator_t oloc(p);
dout(10) << " remove backtrace object " << oid
<< " old pool " << p << " snapc " << item.snapc << dendl;
objecter->remove(oid, oloc, item.snapc,
ceph::real_clock::now(), 0,
gather.new_sub());
ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
}
} else if (item.action == PurgeItem::PURGE_DIR) {
object_locator_t oloc(metadata_pool);
Expand All @@ -552,9 +641,7 @@ void PurgeQueue::_execute_item(
for (const auto &leaf : leaves) {
object_t oid = CInode::get_object_name(item.ino, leaf, "");
dout(10) << " remove dirfrag " << oid << dendl;
objecter->remove(oid, oloc, nullsnapc,
ceph::real_clock::now(),
0, gather.new_sub());
ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
}
} else if (item.action == PurgeItem::TRUNCATE_FILE) {
const uint64_t num = Striper::get_num_objects(item.layout, item.size);
Expand All @@ -563,14 +650,9 @@ void PurgeQueue::_execute_item(

// keep backtrace object
if (num > 1) {
filer.purge_range(item.ino, &item.layout, item.snapc,
1, num - 1, ceph::real_clock::now(),
0, gather.new_sub());
ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
}
filer.zero(item.ino, &item.layout, item.snapc,
0, item.layout.object_size,
ceph::real_clock::now(),
0, true, gather.new_sub());
ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_ZERO, 0);
} else {
derr << "Invalid item (action=" << item.action << ") in purge queue, "
"dropping it" << dendl;
Expand All @@ -585,33 +667,8 @@ void PurgeQueue::_execute_item(
logger->set(l_pq_executing_high_water, files_high_water);
return;
}
ceph_assert(gather.has_subs());

gather.set_finisher(new C_OnFinisher(
new LambdaContext([this, expire_to](int r){
std::lock_guard l(lock);

if (r == -EBLOCKLISTED) {
finisher.queue(on_error, r);
on_error = nullptr;
return;
}

_execute_item_complete(expire_to);
_consume();

// Have we gone idle? If so, do an extra write_head now instead of
// waiting for next flush after journaler_write_head_interval.
// Also do this periodically even if not idle, so that the persisted
// expire_pos doesn't fall too far behind our progress when consuming
// a very long queue.
if (!readonly &&
(in_flight.empty() || journaler.write_head_needed())) {
journaler.write_head(nullptr);
}
}), &finisher));

gather.activate();
submit_ops();
}

void PurgeQueue::_execute_item_complete(
Expand Down
24 changes: 24 additions & 0 deletions src/mds/PurgeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@ enum {
l_pq_last
};

struct PurgeItemCommitOp {
public:
enum PurgeType : uint8_t {
PURGE_OP_RANGE = 0,
PURGE_OP_REMOVE = 1,
PURGE_OP_ZERO
};

PurgeItemCommitOp(PurgeItem _item, PurgeType _type, int _flags)
: item(_item), type(_type), flags(_flags) {}

PurgeItemCommitOp(PurgeItem _item, PurgeType _type, int _flags,
object_t _oid, object_locator_t _oloc)
: item(_item), type(_type), flags(_flags), oid(_oid), oloc(_oloc) {}

PurgeItem item;
PurgeType type;
int flags;
object_t oid;
object_locator_t oloc;
};

/**
* A persistent queue of PurgeItems. This class both writes and reads
* to the queue. There is one of these per MDS rank.
Expand Down Expand Up @@ -131,6 +153,8 @@ class PurgeQueue
// to the queue (there is no callback for when it is executed)
void push(const PurgeItem &pi, Context *completion);

void _commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to);

// If the on-disk queue is empty and we are not currently processing
// anything.
bool is_idle() const;
Expand Down

0 comments on commit 8326c45

Please sign in to comment.