forked from scylladb/seastar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
semaphore.hh
423 lines (395 loc) · 16.1 KB
/
semaphore.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#ifndef CORE_SEMAPHORE_HH_
#define CORE_SEMAPHORE_HH_
#include "future.hh"
#include "chunked_fifo.hh"
#include <stdexcept>
#include <exception>
#include "timer.hh"
#include "expiring_fifo.hh"
namespace seastar {
/// \addtogroup fiber-module
/// @{
/// Exception thrown when a semaphore is broken by
/// \ref semaphore::broken().
class broken_semaphore : public std::exception {
public:
/// Reports the exception reason.
virtual const char* what() const noexcept {
return "Semaphore broken";
}
};
/// Exception thrown when a semaphore wait operation
/// times out.
///
/// \see semaphore::wait(typename timer<>::duration timeout, size_t nr)
class semaphore_timed_out : public std::exception {
public:
/// Reports the exception reason.
virtual const char* what() const noexcept {
return "Semaphore timedout";
}
};
/// Exception Factory for standard semaphore
///
/// constructs standard semaphore exceptions
/// \see semaphore_timed_out and broken_semaphore
struct semaphore_default_exception_factory {
static semaphore_timed_out timeout() {
return semaphore_timed_out();
}
static broken_semaphore broken() {
return broken_semaphore();
}
};
/// \brief Counted resource guard.
///
/// This is a standard computer science semaphore, adapted
/// for futures. You can deposit units into a counter,
/// or take them away. Taking units from the counter may wait
/// if not enough units are available.
///
/// To support exceptional conditions, a \ref broken() method
/// is provided, which causes all current waiters to stop waiting,
/// with an exceptional future returned. This allows causing all
/// fibers that are blocked on a semaphore to continue. This is
/// similar to POSIX's `pthread_cancel()`, with \ref wait() acting
/// as a cancellation point.
///
/// \tparam ExceptionFactory template parameter allows modifying a semaphore to throw
/// customized exceptions on timeout/broken(). It has to provide two static functions
/// ExceptionFactory::timeout() and ExceptionFactory::broken() which return corresponding
/// exception object.
template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
class basic_semaphore {
public:
using duration = typename timer<Clock>::duration;
using clock = typename timer<Clock>::clock;
using time_point = typename timer<Clock>::time_point;
private:
ssize_t _count;
std::exception_ptr _ex;
struct entry {
promise<> pr;
size_t nr;
entry(promise<>&& pr_, size_t nr_) : pr(std::move(pr_)), nr(nr_) {}
};
struct expiry_handler {
void operator()(entry& e) noexcept {
e.pr.set_exception(ExceptionFactory::timeout());
}
};
expiring_fifo<entry, expiry_handler, clock> _wait_list;
bool has_available_units(size_t nr) const {
return _count >= 0 && (static_cast<size_t>(_count) >= nr);
}
bool may_proceed(size_t nr) const {
return has_available_units(nr) && _wait_list.empty();
}
public:
/// Returns the maximum number of units the semaphore counter can hold
static constexpr size_t max_counter() {
return std::numeric_limits<decltype(_count)>::max();
}
/// Constructs a semaphore object with a specific number of units
/// in its internal counter. E.g., starting it at 1 is suitable for use as
/// an unlocked mutex.
///
/// \param count number of initial units present in the counter.
basic_semaphore(size_t count) : _count(count) {}
/// Waits until at least a specific number of units are available in the
/// counter, and reduces the counter by that amount of units.
///
/// \note Waits are serviced in FIFO order, though if several are awakened
/// at once, they may be reordered by the scheduler.
///
/// \param nr Amount of units to wait for (default 1).
/// \return a future that becomes ready when sufficient units are available
/// to satisfy the request. If the semaphore was \ref broken(), may
/// contain an exception.
future<> wait(size_t nr = 1) {
return wait(time_point::max(), nr);
}
/// Waits until at least a specific number of units are available in the
/// counter, and reduces the counter by that amount of units. If the request
/// cannot be satisfied in time, the request is aborted.
///
/// \note Waits are serviced in FIFO order, though if several are awakened
/// at once, they may be reordered by the scheduler.
///
/// \param timeout expiration time.
/// \param nr Amount of units to wait for (default 1).
/// \return a future that becomes ready when sufficient units are available
/// to satisfy the request. On timeout, the future contains a
/// \ref semaphore_timed_out exception. If the semaphore was
/// \ref broken(), may contain an exception.
future<> wait(time_point timeout, size_t nr = 1) {
if (may_proceed(nr)) {
_count -= nr;
return make_ready_future<>();
}
if (_ex) {
return make_exception_future(_ex);
}
promise<> pr;
auto fut = pr.get_future();
_wait_list.push_back(entry(std::move(pr), nr), timeout);
return fut;
}
/// Waits until at least a specific number of units are available in the
/// counter, and reduces the counter by that amount of units. If the request
/// cannot be satisfied in time, the request is aborted.
///
/// \note Waits are serviced in FIFO order, though if several are awakened
/// at once, they may be reordered by the scheduler.
///
/// \param timeout how long to wait.
/// \param nr Amount of units to wait for (default 1).
/// \return a future that becomes ready when sufficient units are available
/// to satisfy the request. On timeout, the future contains a
/// \ref semaphore_timed_out exception. If the semaphore was
/// \ref broken(), may contain an exception.
future<> wait(duration timeout, size_t nr = 1) {
return wait(clock::now() + timeout, nr);
}
/// Deposits a specified number of units into the counter.
///
/// The counter is incremented by the specified number of units.
/// If the new counter value is sufficient to satisfy the request
/// of one or more waiters, their futures (in FIFO order) become
/// ready, and the value of the counter is reduced according to
/// the amount requested.
///
/// \param nr Number of units to deposit (default 1).
void signal(size_t nr = 1) {
if (_ex) {
return;
}
_count += nr;
while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
auto& x = _wait_list.front();
_count -= x.nr;
x.pr.set_value();
_wait_list.pop_front();
}
}
/// Consume the specific number of units without blocking
//
/// Consume the specific number of units now, regardless of how many units are available
/// in the counter, and reduces the counter by that amount of units. This operation may
/// cause the counter to go negative.
///
/// \param nr Amount of units to consume (default 1).
void consume(size_t nr = 1) {
if (_ex) {
return;
}
_count -= nr;
}
/// Attempts to reduce the counter value by a specified number of units.
///
/// If sufficient units are available in the counter, and if no
/// other fiber is waiting, then the counter is reduced. Otherwise,
/// nothing happens. This is useful for "opportunistic" waits where
/// useful work can happen if the counter happens to be ready, but
/// when it is not worthwhile to wait.
///
/// \param nr number of units to reduce the counter by (default 1).
/// \return `true` if the counter had sufficient units, and was decremented.
bool try_wait(size_t nr = 1) {
if (may_proceed(nr)) {
_count -= nr;
return true;
} else {
return false;
}
}
/// Returns the number of units available in the counter.
///
/// Does not take into account any waiters.
size_t current() const { return std::max(_count, ssize_t(0)); }
/// Returns the number of available units.
///
/// Takes into account units consumed using \ref consume() and therefore
/// may return a negative value.
ssize_t available_units() const { return _count; }
/// Returns the current number of waiters
size_t waiters() const { return _wait_list.size(); }
/// Signal to waiters that an error occurred. \ref wait() will see
/// an exceptional future<> containing a \ref broken_semaphore exception.
/// The future is made available immediately.
void broken() { broken(std::make_exception_ptr(ExceptionFactory::broken())); }
/// Signal to waiters that an error occurred. \ref wait() will see
/// an exceptional future<> containing the provided exception parameter.
/// The future is made available immediately.
template <typename Exception>
void broken(const Exception& ex) {
broken(std::make_exception_ptr(ex));
}
/// Signal to waiters that an error occurred. \ref wait() will see
/// an exceptional future<> containing the provided exception parameter.
/// The future is made available immediately.
void broken(std::exception_ptr ex);
/// Reserve memory for waiters so that wait() will not throw.
void ensure_space_for_waiters(size_t n) {
_wait_list.reserve(n);
}
};
template<typename ExceptionFactory, typename Clock>
inline
void
basic_semaphore<ExceptionFactory, Clock>::broken(std::exception_ptr xp) {
_ex = xp;
_count = 0;
while (!_wait_list.empty()) {
auto& x = _wait_list.front();
x.pr.set_exception(xp);
_wait_list.pop_front();
}
}
template<typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
class semaphore_units {
basic_semaphore<ExceptionFactory, Clock>& _sem;
size_t _n;
public:
semaphore_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t n) noexcept : _sem(sem), _n(n) {}
semaphore_units(semaphore_units&& o) noexcept : _sem(o._sem), _n(o._n) {
o._n = 0;
}
semaphore_units& operator=(semaphore_units&& o) noexcept {
if (this != &o) {
this->~semaphore_units();
new (this) semaphore_units(std::move(o));
}
return *this;
}
semaphore_units(const semaphore_units&) = delete;
~semaphore_units() noexcept {
if (_n) {
_sem.signal(_n);
}
}
/// Releases ownership of the units. The semaphore will not be signalled.
///
/// \return the number of units held
size_t release() {
return std::exchange(_n, 0);
}
};
/// \brief Take units from semaphore temporarily
///
/// Takes units from the semaphore and returns them when the \ref semaphore_units object goes out of scope.
/// This provides a safe way to temporarily take units from a semaphore and ensure
/// that they are eventually returned under all circumstances (exceptions, premature scope exits, etc).
///
/// Unlike with_semaphore(), the scope of unit holding is not limited to the scope of a single async lambda.
///
/// \param sem The semaphore to take units from
/// \param units Number of units to take
/// \return a \ref future<> holding \ref semaphore_units object. When the object goes out of scope
/// the units are returned to the semaphore.
///
/// \note The caller must guarantee that \c sem is valid as long as
/// \ref seaphore_units object is alive.
///
/// \related semaphore
template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
future<semaphore_units<ExceptionFactory, Clock>>
get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) {
return sem.wait(units).then([&sem, units] {
return semaphore_units<ExceptionFactory, Clock>{ sem, units };
});
}
/// \brief Take units from semaphore temporarily with time bound on wait
///
/// Like \ref get_units(basic_semaphore<ExceptionFactory>&, size_t) but when
/// timeout is reached before units are granted throws semaphore_timed_out exception.
///
/// \param sem The semaphore to take units from
/// \param units Number of units to take
/// \return a \ref future<> holding \ref semaphore_units object. When the object goes out of scope
/// the units are returned to the semaphore.
///
/// \note The caller must guarantee that \c sem is valid as long as
/// \ref seaphore_units object is alive.
///
/// \related semaphore
template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
future<semaphore_units<ExceptionFactory, Clock>>
get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) {
return sem.wait(timeout, units).then([&sem, units] {
return semaphore_units<ExceptionFactory, Clock>{ sem, units };
});
}
/// \brief Consume units from semaphore temporarily
///
/// Consume units from the semaphore and returns them when the \ref semaphore_units object goes out of scope.
/// This provides a safe way to temporarily take units from a semaphore and ensure
/// that they are eventually returned under all circumstances (exceptions, premature scope exits, etc).
///
/// Unlike get_units(), this calls the non-blocking consume() API.
///
/// Unlike with_semaphore(), the scope of unit holding is not limited to the scope of a single async lambda.
///
/// \param sem The semaphore to take units from
/// \param units Number of units to consume
template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
semaphore_units<ExceptionFactory, Clock>
consume_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) {
sem.consume(units);
return semaphore_units<ExceptionFactory, Clock>{ sem, units };
}
/// \brief Runs a function protected by a semaphore
///
/// Acquires a \ref semaphore, runs a function, and releases
/// the semaphore, returning the the return value of the function,
/// as a \ref future.
///
/// \param sem The semaphore to be held while the \c func is
/// running.
/// \param units Number of units to acquire from \c sem (as
/// with semaphore::wait())
/// \param func The function to run; signature \c void() or
/// \c future<>().
/// \return a \ref future<> holding the function's return value
/// or exception thrown; or a \ref future<> containing
/// an exception from one of the semaphore::broken()
/// variants.
///
/// \note The caller must guarantee that \c sem is valid until
/// the future returned by with_semaphore() resolves.
///
/// \related semaphore
template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
inline
futurize_t<std::result_of_t<Func()>>
with_semaphore(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units, Func&& func) {
return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
return futurize_apply(std::forward<Func>(func)).finally([units = std::move(units)] {});
});
}
/// default basic_semaphore specialization that throws semaphore specific exceptions
/// on error conditions.
using semaphore = basic_semaphore<semaphore_default_exception_factory>;
/// @}
}
#endif /* CORE_SEMAPHORE_HH_ */