-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlibrados_asio.h
293 lines (259 loc) · 11.5 KB
/
librados_asio.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2017 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*/
#ifndef LIBRADOS_ASIO_H
#define LIBRADOS_ASIO_H
#include <memory>
#include <boost/asio.hpp>
#include "include/rados/librados.hpp"
/// Defines asynchronous librados operations that satisfy all of the
/// "Requirements on asynchronous operations" imposed by the C++ Networking TS
/// in section 13.2.7. Many of the type and variable names below are taken
/// directly from those requirements.
///
/// The current draft of the Networking TS (as of 2017-11-27) is available here:
/// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/n4711.pdf
///
/// The boost::asio documentation duplicates these requirements here:
/// http://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/asynchronous_operations.html
namespace librados {
namespace detail {
/// unique_ptr with custom deleter for AioCompletion
struct release_completion {
void operator()(AioCompletion *c) { c->release(); }
};
using unique_completion_ptr =
std::unique_ptr<AioCompletion, release_completion>;
/// Calls a completion handler on behalf of an operation. This primary template
/// is used when Result is not void: it holds storage for the result, and that
/// result is moved into the handler call as the argument following the
/// error_code.
template <typename Result>
struct invoker {
  Result result; ///< storage for the operation's result

  /// Call handler(ec, result), moving the stored result into the call.
  template <typename Handler>
  void invoke(Handler& handler, boost::system::error_code ec)
  {
    handler(ec, std::move(result));
  }
};
/// Specialization for Result=void: there is no result to store, so the
/// handler is called with the error_code alone.
template <>
struct invoker<void> {
  /// Call handler(ec).
  template <typename Handler>
  void invoke(Handler& handler, boost::system::error_code ec)
  {
    handler(ec);
  }
};
/// Nullary function object that eventually gets dispatched to the completion
/// handler's associated executor. Calling it invokes completion_handler with
/// the bound error_code and, when Result is not void, the stored result.
/// Derives from invoker<Result> so the empty base optimization applies when
/// Result=void.
template <typename CompletionHandler, typename Result, typename Executor2>
struct bound_completion_handler : public invoker<Result> {
  /// Allocator queries are forwarded to the wrapped CompletionHandler
  using allocator_type = boost::asio::associated_allocator_t<CompletionHandler>;
  /// The executor associated with this function object is Executor2
  using executor_type = Executor2;

  CompletionHandler completion_handler; ///< upcall handler from CompletionToken
  boost::system::error_code ec; ///< bound error; not set by the constructor
  Executor2 ex2; ///< associated completion handler executor

  /// Store a copy of the given handler and executor.
  bound_completion_handler(CompletionHandler& handler, Executor2 executor)
    : completion_handler(handler), ex2(executor)
  {
    // compile-time check that this type meets the CompletionHandler concept
    // (must be CopyConstructible and callable with no arguments)
    using namespace boost::asio;
    BOOST_ASIO_COMPLETION_HANDLER_CHECK(bound_completion_handler, *this) type_check;
  }

  /// Return the allocator associated with the wrapped completion handler
  allocator_type get_allocator() const noexcept
  {
    return boost::asio::get_associated_allocator(completion_handler);
  }

  /// Return the completion handler executor stored at construction
  executor_type get_executor() const noexcept
  {
    return ex2;
  }

  /// Invoke the completion handler with the bound arguments
  void operator()()
  {
    this->invoke(completion_handler, ec);
  }
};
/// Per-operation state required to call the handler once the operation
/// completes. It is allocated so that its address can travel through the
/// AioCompletion callback argument; the memory comes from the
/// CompletionHandler's associated allocator, following the "Allocation of
/// intermediate storage" requirements.
template <typename CompletionHandler, typename Result, typename Executor1>
struct op_state {
  /// completion handler executor: CompletionHandler's associated executor,
  /// falling back to the io executor when it has none
  using Executor2 = boost::asio::associated_executor_t<CompletionHandler, Executor1>;
  /// alias expected by BOOST_ASIO_DEFINE_HANDLER_PTR below
  using Handler = CompletionHandler;

  // NOTE: the declaration order below is also the initialization order, and
  // 'f' is constructed from work2's executor.

  /// keeps work outstanding on the io executor
  boost::asio::executor_work_guard<Executor1> work1;
  /// keeps work outstanding on the completion handler executor
  boost::asio::executor_work_guard<Executor2> work2;
  /// function object that invokes the completion handler
  bound_completion_handler<CompletionHandler, Result, Executor2> f;
  /// the AioCompletion for this operation
  unique_completion_ptr completion;

  /// Start work on both executors, bind the handler to the completion handler
  /// executor and take ownership of the AioCompletion.
  op_state(CompletionHandler& handler, Executor1 io_executor,
           unique_completion_ptr&& aio_completion)
    : work1(io_executor),
      work2(boost::asio::get_associated_executor(handler, io_executor)),
      f(handler, work2.get_executor()),
      completion(std::move(aio_completion))
  {}

  /// Defines a scoped 'op_state::ptr' type that manages this object's memory
  /// through CompletionHandler's associated allocator
  BOOST_ASIO_DEFINE_HANDLER_PTR(op_state);
};
/// Handler allocators require their memory to be released before the handler
/// itself runs. Destroys/deallocates the op_state owned by p and returns the
/// bound completion handler that was moved out of it beforehand.
template <typename StatePtr>
auto release_handler(StatePtr&& p) -> decltype(p.p->f)
{
  // lift the bound handler out of the storage that is about to be released
  auto bound = std::move(p.p->f);
  // re-point the scoped ptr at the moved handler, so that reset() hands the
  // memory back to that handler's associated allocator
  p.h = std::addressof(bound.completion_handler);
  p.reset();
  return bound;
}
/// AioCompletion callback function, executed in the librados finisher thread.
///
/// @param arg  address of the State (op_state) for the finished operation;
///             its memory is released here before the handler is dispatched.
///
/// The first (completion_t) argument is unused: the AioCompletion is reached
/// through the op_state instead.
template <typename State, typename StatePtr = typename State::ptr>
inline void aio_op_dispatch(completion_t /*cb*/, void *arg)
{
  auto op = static_cast<State*>(arg);
  // read the result first; the AioCompletion is released along with op_state
  const int ret = op->completion->get_return_value();
  // maintain work until the completion handler is dispatched. these would
  // otherwise be destroyed with op_state in release_handler()
  auto work1 = std::move(op->work1);
  auto work2 = std::move(op->work2);
  // return the memory to the handler allocator
  auto f = release_handler<StatePtr>({nullptr, op, op});
  if (ret < 0) {
    // assign the bound error code
    f.ec.assign(-ret, boost::system::system_category());
  }
  // dispatch the completion handler using its associated allocator/executor.
  // the executor is copied out of f first: dispatch() receives f as an rvalue
  // and may move it into its own storage, which would also move f.ex2 while
  // dispatch() is still running on it
  auto alloc2 = boost::asio::get_associated_allocator(f);
  auto ex2 = f.get_executor();
  ex2.dispatch(std::move(f), alloc2);
}
/// Create an AioCompletion whose callback argument is 'op' and whose callback
/// is aio_op_dispatch<State>, and return it wrapped in a unique_completion_ptr.
template <typename State>
inline unique_completion_ptr make_completion(void *op)
{
  return unique_completion_ptr{
    Rados::aio_create_completion(op, nullptr, aio_op_dispatch<State>)};
}
/// Allocate and construct the op state with the CompletionHandler's associated
/// allocator, returning the scoped StatePtr that owns it.
template <typename Result, typename Executor1, typename CompletionHandler,
          typename State = op_state<CompletionHandler, Result, Executor1>,
          typename StatePtr = typename State::ptr>
StatePtr make_op_state(Executor1&& ex1, CompletionHandler& handler)
{
  // obtain a raw block from StatePtr::allocate(); nothing is constructed yet
  StatePtr state = {std::addressof(handler), StatePtr::allocate(handler),
                    nullptr};
  // construct the op_state in place, handing it an AioCompletion that will
  // call aio_op_dispatch() with the address of this block
  state.p = new (state.v) State(handler, ex1,
                                make_completion<State>(state.v));
  return state;
}
} // namespace detail
/// Starts IoCtx::aio_read() on 'oid' for 'len' bytes at offset 'off', and
/// arranges for the AioCompletion to call a handler with signature
/// (boost::system::error_code, bufferlist).
template <typename ExecutionContext, typename CompletionToken,
          typename Signature = void(boost::system::error_code, bufferlist)>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
           size_t len, uint64_t off, CompletionToken&& token)
{
  boost::asio::async_completion<CompletionToken, Signature> init(token);
  auto state = detail::make_op_state<bufferlist>(ctx.get_executor(),
                                                 init.completion_handler);
  const int r = io.aio_read(oid, state.p->completion.get(),
                            &state.p->f.result, len, off);
  if (r >= 0) {
    // submitted: give up ownership until completion, where
    // detail::aio_op_dispatch() releases the op state
    state.p = nullptr;
    state.v = nullptr;
  } else {
    // submission failed: bind the error, release the handler-allocated
    // memory, then post the completion
    state.p->f.ec.assign(-r, boost::system::system_category());
    boost::asio::post(detail::release_handler(std::move(state)));
  }
  return init.result.get();
}
/// Starts IoCtx::aio_write() of 'len' bytes from 'bl' to 'oid' at offset
/// 'off', and arranges for the AioCompletion to call a handler with signature
/// (boost::system::error_code).
template <typename ExecutionContext, typename CompletionToken,
          typename Signature = void(boost::system::error_code)>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
            bufferlist &bl, size_t len, uint64_t off,
            CompletionToken&& token)
{
  boost::asio::async_completion<CompletionToken, Signature> init(token);
  auto state = detail::make_op_state<void>(ctx.get_executor(),
                                           init.completion_handler);
  const int r = io.aio_write(oid, state.p->completion.get(), bl, len, off);
  if (r >= 0) {
    // submitted: give up ownership until completion, where
    // detail::aio_op_dispatch() releases the op state
    state.p = nullptr;
    state.v = nullptr;
  } else {
    // submission failed: bind the error, release the handler-allocated
    // memory, then post the completion
    state.p->f.ec.assign(-r, boost::system::system_category());
    boost::asio::post(detail::release_handler(std::move(state)));
  }
  return init.result.get();
}
/// Starts IoCtx::aio_operate() with an ObjectReadOperation on 'oid', and
/// arranges for the AioCompletion to call a handler with signature
/// (boost::system::error_code, bufferlist).
template <typename ExecutionContext, typename CompletionToken,
          typename Signature = void(boost::system::error_code, bufferlist)>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
              ObjectReadOperation *op, int flags,
              CompletionToken&& token)
{
  boost::asio::async_completion<CompletionToken, Signature> init(token);
  auto state = detail::make_op_state<bufferlist>(ctx.get_executor(),
                                                 init.completion_handler);
  const int r = io.aio_operate(oid, state.p->completion.get(), op,
                               flags, &state.p->f.result);
  if (r >= 0) {
    // submitted: give up ownership until completion, where
    // detail::aio_op_dispatch() releases the op state
    state.p = nullptr;
    state.v = nullptr;
  } else {
    // submission failed: bind the error, release the handler-allocated
    // memory, then post the completion
    state.p->f.ec.assign(-r, boost::system::system_category());
    boost::asio::post(detail::release_handler(std::move(state)));
  }
  return init.result.get();
}
/// Starts IoCtx::aio_operate() with an ObjectWriteOperation on 'oid', and
/// arranges for the AioCompletion to call a handler with signature
/// (boost::system::error_code).
template <typename ExecutionContext, typename CompletionToken,
          typename Signature = void(boost::system::error_code)>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)
async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
              ObjectWriteOperation *op, int flags,
              CompletionToken &&token)
{
  boost::asio::async_completion<CompletionToken, Signature> init(token);
  auto state = detail::make_op_state<void>(ctx.get_executor(),
                                           init.completion_handler);
  const int r = io.aio_operate(oid, state.p->completion.get(), op, flags);
  if (r >= 0) {
    // submitted: give up ownership until completion, where
    // detail::aio_op_dispatch() releases the op state
    state.p = nullptr;
    state.v = nullptr;
  } else {
    // submission failed: bind the error, release the handler-allocated
    // memory, then post the completion
    state.p->f.ec.assign(-r, boost::system::system_category());
    boost::asio::post(detail::release_handler(std::move(state)));
  }
  return init.result.get();
}
} // namespace librados
#endif // LIBRADOS_ASIO_H