Skip to content

Commit

Permalink
Merge pull request ceph#49116 from Matan-B/wip-matanb-c-balanced-reads
Browse files Browse the repository at this point in the history
crimson/osd: Support balance reads

Reviewed-by: Samuel Just <[email protected]>
  • Loading branch information
Matan-B authored Mar 7, 2023
2 parents f96dd0d + b1632db commit 5841654
Show file tree
Hide file tree
Showing 21 changed files with 374 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ tasks:
- workunit:
clients:
client.0:
- rbd/test_librbd.sh
- rbd/crimson/test_crimson_librbd.sh
35 changes: 35 additions & 0 deletions qa/workunits/rbd/crimson/test_crimson_librbd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/bin/sh -e

if [ -n "${VALGRIND}" ]; then
valgrind ${VALGRIND} --suppressions=${TESTDIR}/valgrind.supp \
--error-exitcode=1 ceph_test_librbd
else
# Run test cases indivually to allow better selection
# of ongoing Crimson development.
# Disabled test groups are tracked here:
# https://tracker.ceph.com/issues/58791
ceph_test_librbd --gtest_filter='TestLibRBD.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/0.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/1.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/2.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/3.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/4.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/5.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/6.*'
ceph_test_librbd --gtest_filter='EncryptedFlattenTest/7.*'
# ceph_test_librbd --gtest_filter='DiffIterateTest/0.*'
# ceph_test_librbd --gtest_filter='DiffIterateTest/1.*'
ceph_test_librbd --gtest_filter='TestImageWatcher.*'
ceph_test_librbd --gtest_filter='TestInternal.*'
ceph_test_librbd --gtest_filter='TestMirroring.*'
# ceph_test_librbd --gtest_filter='TestDeepCopy.*'
ceph_test_librbd --gtest_filter='TestGroup.*'
# ceph_test_librbd --gtest_filter='TestMigration.*'
ceph_test_librbd --gtest_filter='TestMirroringWatcher.*'
ceph_test_librbd --gtest_filter='TestObjectMap.*'
ceph_test_librbd --gtest_filter='TestOperations.*'
ceph_test_librbd --gtest_filter='TestTrash.*'
ceph_test_librbd --gtest_filter='TestJournalEntries.*'
ceph_test_librbd --gtest_filter='TestJournalReplay.*'
fi
exit 0
27 changes: 27 additions & 0 deletions src/common/intrusive_lru.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,33 @@ class intrusive_lru {
}
}

/*
* Clears unreferenced elements from the lru set [from, to]
*/
void clear_range(
const K& from,
const K& to) {
auto from_iter = lru_set.lower_bound(from);
auto to_iter = lru_set.upper_bound(to);
for (auto i = from_iter; i != to_iter; ) {
if (!(*i).lru) {
unreferenced_list.erase(lru_list_t::s_iterator_to(*i));
i = lru_set.erase_and_dispose(i, [](auto *p)
{ delete p; } );
} else {
i++;
}
}
}

template <class F>
void for_each(F&& f) {
for (auto& v : lru_set) {
access(v);
f(TRef{static_cast<T*>(&v)});
}
}

/**
* Returns the TRef corresponding to k if it exists or
* nullptr otherwise.
Expand Down
26 changes: 14 additions & 12 deletions src/crimson/osd/object_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class ObjectContext : public ceph::common::intrusive_lru_base<
hobject_t, ObjectContext, obc_to_hoid<ObjectContext>>>
{
public:
Ref head; // Ref defined as part of ceph::common::intrusive_lru_base
ObjectState obs;
SnapSetContextRef ssc;
// the watch / notify machinery rather stays away from the hot and
Expand All @@ -81,21 +80,14 @@ class ObjectContext : public ceph::common::intrusive_lru_base<
return get_oid().is_head();
}

Ref get_head_obc() const {
return head;
}

hobject_t get_head_oid() const {
return get_oid().get_head();
}

const SnapSet &get_ro_ss() const {
if (is_head()) {
ceph_assert(ssc);
return ssc->snapset;
} else {
return head->get_ro_ss();
}
const SnapSet &get_head_ss() const {
ceph_assert(is_head());
ceph_assert(ssc);
return ssc->snapset;
}

void set_head_state(ObjectState &&_obs, SnapSetContextRef &&_ssc) {
Expand Down Expand Up @@ -263,6 +255,16 @@ class ObjectContextRegistry : public md_config_obs_t {
return obc_lru.get(hoid);
}

void clear_range(const hobject_t &from,
const hobject_t &to) {
obc_lru.clear_range(from, to);
}

template <class F>
void for_each(F&& f) {
obc_lru.for_each(std::forward<F>(f));
}

const char** get_tracked_conf_keys() const final;
void handle_conf_change(const crimson::common::ConfigProxy& conf,
const std::set <std::string> &changed) final;
Expand Down
49 changes: 46 additions & 3 deletions src/crimson/osd/object_context_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ using crimson::common::local_conf;
crimson::ct_error::enoent::make()
};
}
return this->with_clone_obc_only<State>(head,
return this->with_clone_obc_only<State>(std::move(head),
oid,
std::move(func));
});
Expand All @@ -60,7 +60,7 @@ using crimson::common::local_conf;
with_obc_func_t&& func)
{
LOG_PREFIX(ObjectContextLoader::with_clone_obc_only);
auto coid = resolve_oid(head->get_ro_ss(), oid);
auto coid = resolve_oid(head->get_head_ss(), oid);
if (!coid) {
ERRORDPP("clone {} not found", dpp, oid);
return load_obc_iertr::future<>{
Expand All @@ -73,14 +73,52 @@ using crimson::common::local_conf;
func=std::move(func), head=std::move(head), this]()
-> load_obc_iertr::future<> {
auto loaded = get_or_load_obc<State>(clone, existed);
clone->head = std::move(head);
return loaded.safe_then_interruptible(
[func = std::move(func)](auto clone) {
return std::move(func)(std::move(clone));
});
});
}

template<RWState::State State>
ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_head_and_clone_obc(
hobject_t oid,
with_both_obc_func_t&& func)
{
LOG_PREFIX(ObjectContextLoader::with_head_and_clone_obc);
assert(!oid.is_head());
return with_obc<RWState::RWREAD>(
oid.get_head(),
[FNAME, oid, func=std::move(func), this](auto head) mutable
-> load_obc_iertr::future<> {
if (!head->obs.exists) {
ERRORDPP("head doesn't exist for object {}", dpp, head->obs.oi.soid);
return load_obc_iertr::future<>{
crimson::ct_error::enoent::make()
};
}
auto coid = resolve_oid(head->get_head_ss(), oid);
if (!coid) {
ERRORDPP("clone {} not found", dpp, oid);
return load_obc_iertr::future<>{
crimson::ct_error::enoent::make()
};
}
auto [clone, existed] = obc_registry.get_cached_obc(*coid);
return clone->template with_lock<State, IOInterruptCondition>(
[existed=existed, clone=std::move(clone),
func=std::move(func), head=std::move(head), this]()
-> load_obc_iertr::future<> {
auto loaded = get_or_load_obc<State>(clone, existed);
return loaded.safe_then_interruptible(
[func = std::move(func), head=std::move(head)](auto clone) {
return std::move(func)(std::move(head), std::move(clone));
});
});
});
}

template<RWState::State State>
ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_obc(hobject_t oid,
Expand Down Expand Up @@ -187,4 +225,9 @@ using crimson::common::local_conf;
template ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_obc<RWState::RWEXCL>(hobject_t,
with_obc_func_t&&);

template ObjectContextLoader::load_obc_iertr::future<>
ObjectContextLoader::with_head_and_clone_obc<RWState::RWWRITE>(
hobject_t,
with_both_obc_func_t&&);
}
45 changes: 29 additions & 16 deletions src/crimson/osd/object_context_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,45 @@ class ObjectContextLoader {
using with_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef)>;

using with_both_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;

// Use this variant by default
template<RWState::State State>
load_obc_iertr::future<> with_obc(hobject_t oid,
with_obc_func_t&& func);

template<RWState::State State>
load_obc_iertr::future<> with_clone_obc(hobject_t oid,
with_obc_func_t&& func);

// Use this variant in the case where the head object
// obc is already locked. Avoid nesting
// with_head_obc() as in using with_clone_obc().
// obc is already locked and only the clone obc is needed.
// Avoid nesting with_head_obc() calls by using with_clone_obc()
// with an already locked head.
template<RWState::State State>
load_obc_iertr::future<> with_clone_obc_only(ObjectContextRef head,
hobject_t oid,
with_obc_func_t&& func);

// Use this variant in the case where both the head
// object *and* the matching clone object are being used
// in func.
template<RWState::State State>
load_obc_iertr::future<> with_head_and_clone_obc(
hobject_t oid,
with_both_obc_func_t&& func);

load_obc_iertr::future<> reload_obc(ObjectContext& obc) const;

void notify_on_change(bool is_primary);

private:
ObjectContextRegistry& obc_registry;
PGBackend& backend;
DoutPrefixProvider& dpp;
obc_accessing_list_t obc_set_accessing;

template<RWState::State State>
load_obc_iertr::future<> with_clone_obc(hobject_t oid,
with_obc_func_t&& func);

template<RWState::State State>
load_obc_iertr::future<> with_head_obc(ObjectContextRef obc,
bool existed,
Expand All @@ -60,15 +83,5 @@ class ObjectContextLoader {

load_obc_iertr::future<ObjectContextRef>
load_obc(ObjectContextRef obc);

load_obc_iertr::future<> reload_obc(ObjectContext& obc) const;

void notify_on_change(bool is_primary);

private:
ObjectContextRegistry& obc_registry;
PGBackend& backend;
DoutPrefixProvider& dpp;
obc_accessing_list_t obc_set_accessing;
};
}
1 change: 0 additions & 1 deletion src/crimson/osd/ops_executer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,6 @@ std::pair<object_info_t, ObjectContextRef> OpsExecuter::prepare_clone(
c_obc->obs.oi = static_snap_oi;
c_obc->obs.exists = true;
c_obc->ssc = obc->ssc;
c_obc->head = obc->head;
logger().debug("clone_obc: {}", c_obc->obs.oi);
clone_obc = std::move(c_obc);
}
Expand Down
36 changes: 24 additions & 12 deletions src/crimson/osd/osd_operations/client_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,13 @@ ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
*this
).then_interruptible(
[this, pg]() mutable {
return do_recover_missing(pg, m->get_hobj());
if (pg->is_primary()) {
return do_recover_missing(pg, m->get_hobj());
} else {
logger().debug("process_op: Skipping do_recover_missing"
"on non primary pg");
return interruptor::now();
}
}).then_interruptible([this, pg, &ihref]() mutable {
return pg->already_complete(m->get_reqid()).then_interruptible(
[this, pg, &ihref](auto completed) mutable
Expand Down Expand Up @@ -252,16 +258,6 @@ ClientRequest::do_process(
instance_handle_t &ihref,
Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
{
if (!pg->is_primary()) {
// primary can handle both normal ops and balanced reads
if (is_misdirected(*pg)) {
logger().trace("do_process: dropping misdirected op");
return seastar::now();
} else if (const hobject_t& hoid = m->get_hobj();
!pg->get_peering_state().can_serve_replica_read(hoid)) {
return reply_op_error(pg, -EAGAIN);
}
}
if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
return reply_op_error(pg, -EINVAL);
}
Expand Down Expand Up @@ -293,6 +289,22 @@ ClientRequest::do_process(
return reply_op_error(pg, -ENOENT);
}

if (!pg->is_primary()) {
// primary can handle both normal ops and balanced reads
if (is_misdirected(*pg)) {
logger().trace("do_process: dropping misdirected op");
return seastar::now();
} else if (const hobject_t& hoid = m->get_hobj();
!pg->get_peering_state().can_serve_replica_read(hoid)) {
logger().debug("{}: unstable write on replica, "
"bouncing to primary",
__func__);
return reply_op_error(pg, -EAGAIN);
} else {
logger().debug("{}: serving replica read on oid {}",
__func__, m->get_hobj());
}
}
return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
[this, pg, &ihref](auto submitted, auto all_completed) mutable {
return submitted.then_interruptible([this, pg, &ihref] {
Expand Down Expand Up @@ -331,7 +343,7 @@ bool ClientRequest::is_misdirected(const PG& pg) const
return true;
}
// balanced reads; any replica will do
return pg.is_nonprimary();
return false;
}
// neither balanced nor localize reads
return true;
Expand Down
1 change: 1 addition & 0 deletions src/crimson/osd/osd_operations/client_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,

public:
class PGPipeline : public CommonPGPipeline {
public:
struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> {
static constexpr auto type_name = "ClientRequest::PGPipeline::await_map";
} await_map;
Expand Down
1 change: 1 addition & 0 deletions src/crimson/osd/osd_operations/client_request_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ CommonClientRequest::do_recover_missing(
Ref<PG>& pg, const hobject_t& soid)
{
eversion_t ver;
assert(pg->is_primary());
logger().debug("{} check for recovery, {}", __func__, soid);
if (!pg->is_unreadable_object(soid, &ver) &&
!pg->is_degraded_or_backfilling_object(soid)) {
Expand Down
1 change: 1 addition & 0 deletions src/crimson/osd/osd_operations/internal_client_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ InternalClientRequest::InternalClientRequest(Ref<PG> pg)
: pg(std::move(pg))
{
assert(bool(this->pg));
assert(this->pg->is_primary());
}

InternalClientRequest::~InternalClientRequest()
Expand Down
18 changes: 15 additions & 3 deletions src/crimson/osd/osd_operations/replicated_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,23 @@ seastar::future<> RepRequest::with_pg(
ShardServices &shard_services, Ref<PG> pg)
{
logger().debug("{}: RepRequest::with_pg", *this);

IRef ref = this;
return interruptor::with_interruption([this, pg] {
return pg->handle_rep_op(req);
}, [ref](std::exception_ptr) { return seastar::now(); }, pg);
logger().debug("{}: pg present", *this);
return this->template enter_stage<interruptor>(pp(*pg).await_map
).then_interruptible([this, pg] {
return this->template with_blocking_event<
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
>([this, pg](auto &&trigger) {
return pg->osdmap_gate.wait_for_map(
std::move(trigger), req->min_epoch);
});
}).then_interruptible([this, pg] (auto) {
return pg->handle_rep_op(req);
});
}, [ref](std::exception_ptr) {
return seastar::now();
}, pg);
}

}
Loading

0 comments on commit 5841654

Please sign in to comment.