forked from scylladb/seastar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.hh
228 lines (200 loc) · 5.36 KB
/
queue.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#ifndef QUEUE_HH_
#define QUEUE_HH_
#include "circular_buffer.hh"
#include "future.hh"
#include <queue>
#include <experimental/optional>
// Bounded FIFO of T whose readers and writers can wait, through futures,
// for the queue to become non-empty or non-full.
template <typename T>
class queue {
    // Item storage: a std::queue adaptor over circular_buffer.
    std::queue<T, circular_buffer<T>> _q;
    // Capacity bound checked by push() and full().
    size_t _max;
    // Engaged while a future handed out by not_empty() is pending.
    std::experimental::optional<promise<>> _not_empty;
    // Engaged while a future handed out by not_full() is pending.
    std::experimental::optional<promise<>> _not_full;
    // Exception recorded by abort(); null until abort() is called.
    std::exception_ptr _ex = nullptr;
private:
    // Fulfil and drop the pending "not empty" promise, if there is one.
    void notify_not_empty();
    // Fulfil and drop the pending "not full" promise, if there is one.
    void notify_not_full();
public:
    // Creates a queue bounded to @size items.
    explicit queue(size_t size);

    // Pushes an item.
    //
    // Returns false if the queue was full and the item was not pushed.
    bool push(T&& a);

    // Pops an item.
    T pop();

    // Consumes items from the queue, passing them to @func, until @func
    // returns false or the queue is empty.
    //
    // Returns false if @func returned false.
    template <typename Func>
    bool consume(Func&& func);

    // Returns true when the queue is empty.
    bool empty() const;

    // Returns true when the queue is full.
    bool full() const;

    // Returns a future<> that becomes available when pop() or consume()
    // can be called.
    future<> not_empty();

    // Returns a future<> that becomes available when push() can be called.
    future<> not_full();

    // Pops an element now, or as soon as there is one. Returns a future
    // that becomes available when an element is available.
    future<T> pop_eventually();

    // Pushes the element now, or as soon as there is room. Returns a
    // future<> which resolves when the data was pushed.
    future<> push_eventually(T&& data);

    // Number of items currently queued.
    size_t size() const { return _q.size(); }

    // Capacity bound given at construction.
    size_t max_size() const { return _max; }

    // Destroys any items in the queue, and passes the provided exception to
    // any waiting readers or writers.
    void abort(std::exception_ptr ex) {
        // Discard everything that is still queued.
        for (; !_q.empty(); _q.pop()) {
        }
        // Remember the exception; pop_eventually() consults it later.
        _ex = ex;
        if (_not_full) {
            auto& pending_writer = *_not_full;
            pending_writer.set_exception(ex);
            _not_full = std::experimental::nullopt;
        }
        if (_not_empty) {
            auto& pending_reader = *_not_empty;
            // Last use of ex, so it can be moved from.
            pending_reader.set_exception(std::move(ex));
            _not_empty = std::experimental::nullopt;
        }
    }
};
template <typename T>
inline
queue<T>::queue(size_t size)
: _max(size) {
}
// Wakes the reader waiting on not_empty(), if any: fulfils the pending
// promise and then disengages the optional that held it.
template <typename T>
inline
void queue<T>::notify_not_empty() {
    if (!_not_empty) {
        return;  // nobody is waiting
    }
    _not_empty->set_value();
    _not_empty = std::experimental::nullopt;
}
// Wakes the writer waiting on not_full(), if any: fulfils the pending
// promise and then disengages the optional that held it.
template <typename T>
inline
void queue<T>::notify_not_full() {
    if (!_not_full) {
        return;  // nobody is waiting
    }
    _not_full->set_value();
    _not_full = std::experimental::nullopt;
}
// Tries to enqueue `data` without waiting.
//
// Returns true after moving `data` into the queue and notifying a pending
// not_empty() waiter; returns false, leaving `data` untouched, when the
// queue already holds `_max` items or more.
template <typename T>
inline
bool queue<T>::push(T&& data) {
    if (_q.size() >= _max) {
        return false;
    }
    _q.push(std::move(data));
    notify_not_empty();
    return true;
}
// Removes and returns the front item.
//
// If the queue is exactly full on entry, a pending not_full() waiter is
// notified before the item is taken out. The queue is not checked for
// emptiness here.
template <typename T>
inline
T queue<T>::pop() {
    const bool at_capacity = (_q.size() == _max);
    if (at_capacity) {
        notify_not_full();
    }
    auto front_item = std::move(_q.front());
    _q.pop();
    return front_item;
}
// Pops the front item, waiting for one first if the queue is empty.
//
// Returns a future holding the popped item. If the queue has been aborted,
// the future instead carries the exception that was passed to abort().
template <typename T>
inline
future<T> queue<T>::pop_eventually() {
    // abort() drains the queue and drops the waiter promise. Without this
    // check, a call made after abort() would block on a fresh promise that
    // nothing is going to fulfil.
    if (_ex) {
        return make_exception_future<T>(_ex);
    }
    if (empty()) {
        return not_empty().then([this] {
            // _ex may have been set after the wait began; never pop from an
            // aborted queue.
            if (_ex) {
                return make_exception_future<T>(_ex);
            } else {
                return make_ready_future<T>(pop());
            }
        });
    }
    return make_ready_future<T>(pop());
}
// Pushes `data`, waiting for room first if the queue is full.
//
// Returns a future<> that resolves once the item has been enqueued. If the
// queue has been aborted, nothing is enqueued and the future carries the
// exception that was passed to abort().
template <typename T>
inline
future<> queue<T>::push_eventually(T&& data) {
    // pop_eventually() refuses to hand out items once _ex is set, so an item
    // accepted after abort() could be left stranded in the queue. Fail the
    // writer instead of silently accepting it.
    if (_ex) {
        return make_exception_future<>(_ex);
    }
    if (full()) {
        // Move the item into the continuation so it outlives this call.
        return not_full().then([this, data = std::move(data)] () mutable {
            _q.push(std::move(data));
            notify_not_empty();
        });
    }
    _q.push(std::move(data));
    notify_not_empty();
    return make_ready_future<>();
}
// Feeds queued items to `func`, front first, until `func` returns false or
// the queue runs dry.
//
// Each item is handed to `func` as an rvalue and is removed from the queue
// after `func` returns, whatever it returned. If the queue is exactly full
// on entry, a pending not_full() waiter is notified before any item is
// consumed.
//
// Returns the last value returned by `func`, or true if `func` was never
// called.
template <typename T>
template <typename Func>
inline
bool queue<T>::consume(Func&& func) {
    const bool at_capacity = (_q.size() == _max);
    if (at_capacity) {
        notify_not_full();
    }
    bool keep_going = true;
    while (keep_going && !_q.empty()) {
        keep_going = func(std::move(_q.front()));
        _q.pop();
    }
    return keep_going;
}
// Reports whether the underlying storage currently holds no items.
template <typename T>
inline
bool queue<T>::empty() const {
    const bool nothing_queued = _q.empty();
    return nothing_queued;
}
// Reports whether the number of queued items equals the capacity bound.
template <typename T>
inline
bool queue<T>::full() const {
    const auto queued = _q.size();
    return queued == _max;
}
// Returns a future<> that becomes available when pop() or consume() can be
// called.
//
// If the queue has been aborted, the future carries the exception that was
// passed to abort(). Otherwise it is ready immediately when items are
// queued, or is tied to a new promise that notify_not_empty() fulfils.
template <typename T>
inline
future<> queue<T>::not_empty() {
    // abort() leaves the queue empty with no waiter promise. Without this
    // check, a caller arriving after abort() would get a future on a fresh
    // promise that stays unresolved unless someone pushes again.
    if (_ex) {
        return make_exception_future<>(_ex);
    }
    if (!empty()) {
        return make_ready_future<>();
    }
    // Replaces any promise stored by an earlier call.
    _not_empty = promise<>();
    return _not_empty->get_future();
}
// Returns a future<> that becomes available when push() can be called.
//
// The future is ready immediately unless the queue is full; in that case it
// is tied to a new promise (replacing any stored by an earlier call) that
// notify_not_full() fulfils.
template <typename T>
inline
future<> queue<T>::not_full() {
    if (full()) {
        _not_full = promise<>();
        return _not_full->get_future();
    }
    return make_ready_future<>();
}
#endif /* QUEUE_HH_ */