Skip to content

Commit

Permalink
Merge pull request ceph#60304 from cbodley/wip-async-completion-recyc…
Browse files Browse the repository at this point in the history
…ling

common/async: Completion uses asio::recycling_allocator by default

Reviewed-by: Adam Emerson <[email protected]>
  • Loading branch information
cbodley authored Dec 6, 2024
2 parents 0ee82db + 843daa9 commit 4603b3e
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 84 deletions.
29 changes: 14 additions & 15 deletions src/common/async/bind_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
#define CEPH_ASYNC_BIND_HANDLER_H

#include <tuple>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/associator.hpp>

namespace ceph::async {

Expand Down Expand Up @@ -52,25 +51,25 @@ struct CompletionHandler {
void operator()() && {
std::apply(std::move(handler), std::move(args));
}

using allocator_type = boost::asio::associated_allocator_t<Handler>;
allocator_type get_allocator() const noexcept {
return boost::asio::get_associated_allocator(handler);
}
};

} // namespace ceph::async

namespace boost::asio {

// specialize boost::asio::associated_executor<> for CompletionHandler
template <typename Handler, typename Tuple, typename Executor>
struct associated_executor<ceph::async::CompletionHandler<Handler, Tuple>, Executor> {
using type = boost::asio::associated_executor_t<Handler, Executor>;

static type get(const ceph::async::CompletionHandler<Handler, Tuple>& handler,
const Executor& ex = Executor()) noexcept {
return boost::asio::get_associated_executor(handler.handler, ex);
// forward the handler's associated executor, allocator, cancellation slot, etc
template <template <typename, typename> class Associator,
typename Handler, typename Tuple, typename DefaultCandidate>
struct associator<Associator,
ceph::async::CompletionHandler<Handler, Tuple>, DefaultCandidate>
: Associator<Handler, DefaultCandidate>
{
static auto get(const ceph::async::CompletionHandler<Handler, Tuple>& h) noexcept {
return Associator<Handler, DefaultCandidate>::get(h.handler);
}
static auto get(const ceph::async::CompletionHandler<Handler, Tuple>& h,
const DefaultCandidate& c) noexcept {
return Associator<Handler, DefaultCandidate>::get(h.handler, c);
}
};

Expand Down
39 changes: 0 additions & 39 deletions src/common/async/bind_like.h

This file was deleted.

16 changes: 9 additions & 7 deletions src/common/async/completion.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <boost/asio/defer.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/recycling_allocator.hpp>
#include <boost/asio/post.hpp>

#include "bind_handler.h"
Expand Down Expand Up @@ -173,7 +174,8 @@ class CompletionImpl final : public Completion<void(Args...), T> {
Handler handler;

// use Handler's associated allocator
using Alloc2 = boost::asio::associated_allocator_t<Handler>;
using DefaultAlloc = boost::asio::recycling_allocator<void>;
using Alloc2 = boost::asio::associated_allocator_t<Handler, DefaultAlloc>;
using Traits2 = std::allocator_traits<Alloc2>;
using RebindAlloc2 = typename Traits2::template rebind_alloc<CompletionImpl>;
using RebindTraits2 = std::allocator_traits<RebindAlloc2>;
Expand All @@ -196,16 +198,16 @@ class CompletionImpl final : public Completion<void(Args...), T> {
void destroy_defer(std::tuple<Args...>&& args) override {
auto w = std::move(work);
auto ex2 = w.second.get_executor();
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler, DefaultAlloc{});
auto f = bind_and_forward(ex2, std::move(handler), std::move(args));
RebindTraits2::destroy(alloc2, this);
RebindTraits2::deallocate(alloc2, this, 1);
boost::asio::defer(boost::asio::bind_executor(ex2, std::move(f)));
boost::asio::defer(std::move(f));
}
void destroy_dispatch(std::tuple<Args...>&& args) override {
auto w = std::move(work);
auto ex2 = w.second.get_executor();
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler, DefaultAlloc{});
auto f = bind_and_forward(ex2, std::move(handler), std::move(args));
RebindTraits2::destroy(alloc2, this);
RebindTraits2::deallocate(alloc2, this, 1);
Expand All @@ -214,14 +216,14 @@ class CompletionImpl final : public Completion<void(Args...), T> {
void destroy_post(std::tuple<Args...>&& args) override {
auto w = std::move(work);
auto ex2 = w.second.get_executor();
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler, DefaultAlloc{});
auto f = bind_and_forward(ex2, std::move(handler), std::move(args));
RebindTraits2::destroy(alloc2, this);
RebindTraits2::deallocate(alloc2, this, 1);
boost::asio::post(std::move(f));
}
void destroy() override {
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler);
RebindAlloc2 alloc2 = boost::asio::get_associated_allocator(handler, DefaultAlloc{});
RebindTraits2::destroy(alloc2, this);
RebindTraits2::deallocate(alloc2, this, 1);
}
Expand All @@ -238,7 +240,7 @@ class CompletionImpl final : public Completion<void(Args...), T> {
public:
template <typename ...TArgs>
static auto create(const Executor1& ex, Handler&& handler, TArgs&& ...args) {
auto alloc2 = boost::asio::get_associated_allocator(handler);
auto alloc2 = boost::asio::get_associated_allocator(handler, DefaultAlloc{});
using Ptr = std::unique_ptr<CompletionImpl>;
return Ptr{new (alloc2) CompletionImpl(ex, std::move(handler),
std::forward<TArgs>(args)...)};
Expand Down
13 changes: 5 additions & 8 deletions src/common/async/detail/shared_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <optional>
#include <shared_mutex> // for std::shared_lock

#include <boost/asio/append.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/list.hpp>
Expand Down Expand Up @@ -134,10 +135,8 @@ auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token)
state = Exclusive;

// post a successful completion
auto ex2 = boost::asio::get_associated_executor(handler, ex1);
auto h = boost::asio::bind_executor(ex2, std::move(handler));
boost::asio::post(bind_handler(std::move(h), ec,
std::unique_lock{mtx, std::adopt_lock}));
boost::asio::post(ex1, boost::asio::append(std::move(handler),
ec, std::unique_lock{mtx, std::adopt_lock}));
} else {
// create a request and add it to the exclusive list
using LockCompletion = typename Request::LockCompletion;
Expand Down Expand Up @@ -224,10 +223,8 @@ auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token)
if (exclusive_queue.empty() && state < MaxShared) {
state++;

auto ex2 = boost::asio::get_associated_executor(handler, ex1);
auto h = boost::asio::bind_executor(ex2, std::move(handler));
boost::asio::post(bind_handler(std::move(h), ec,
std::shared_lock{mtx, std::adopt_lock}));
boost::asio::post(ex1, boost::asio::append(std::move(handler),
ec, std::shared_lock{mtx, std::adopt_lock}));
} else {
using LockCompletion = typename Request::LockCompletion;
auto request = LockCompletion::create(ex1, std::move(handler), mtx);
Expand Down
29 changes: 14 additions & 15 deletions src/common/async/forward_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
#ifndef CEPH_ASYNC_FORWARD_HANDLER_H
#define CEPH_ASYNC_FORWARD_HANDLER_H

#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/associator.hpp>

namespace ceph::async {

Expand Down Expand Up @@ -47,25 +46,25 @@ struct ForwardingHandler {
void operator()(Args&& ...args) {
std::move(handler)(std::forward<Args>(args)...);
}

using allocator_type = boost::asio::associated_allocator_t<Handler>;
allocator_type get_allocator() const noexcept {
return boost::asio::get_associated_allocator(handler);
}
};

} // namespace ceph::async

namespace boost::asio {

// specialize boost::asio::associated_executor<> for ForwardingHandler
template <typename Handler, typename Executor>
struct associated_executor<ceph::async::ForwardingHandler<Handler>, Executor> {
using type = boost::asio::associated_executor_t<Handler, Executor>;

static type get(const ceph::async::ForwardingHandler<Handler>& handler,
const Executor& ex = Executor()) noexcept {
return boost::asio::get_associated_executor(handler.handler, ex);
// forward the handler's associated executor, allocator, cancellation slot, etc
template <template <typename, typename> class Associator,
typename Handler, typename DefaultCandidate>
struct associator<Associator,
ceph::async::ForwardingHandler<Handler>, DefaultCandidate>
: Associator<Handler, DefaultCandidate>
{
static auto get(const ceph::async::ForwardingHandler<Handler>& h) noexcept {
return Associator<Handler, DefaultCandidate>::get(h.handler);
}
static auto get(const ceph::async::ForwardingHandler<Handler>& h,
const DefaultCandidate& c) noexcept {
return Associator<Handler, DefaultCandidate>::get(h.handler, c);
}
};

Expand Down

0 comments on commit 4603b3e

Please sign in to comment.