forked from facebookexperimental/libunifex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio_uring_2_test.cpp
145 lines (127 loc) · 4.59 KB
/
io_uring_2_test.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unifex/config.hpp>
#if !UNIFEX_NO_LIBURING
# if !UNIFEX_NO_COROUTINES
# include <unifex/linux/io_uring_context.hpp>
# include <unifex/finally.hpp>
# include <unifex/inplace_stop_token.hpp>
# include <unifex/never.hpp>
# include <unifex/scheduler_concepts.hpp>
# include <unifex/stop_when.hpp>
# include <unifex/stream_concepts.hpp>
# include <unifex/sync_wait.hpp>
# include <unifex/task.hpp>
# include <unifex/then.hpp>
# include <unifex/when_all.hpp>
# include <chrono>
# include <cstdio>
# include <fcntl.h>
# include <string>
# include <thread>
# include <gtest/gtest.h>
using namespace unifex;
using namespace unifex::linuxos;
namespace {
constexpr std::chrono::milliseconds stopAfter{42};
const char* fdPath = "/proc/self/fd/";
struct IOUringTest : testing::Test {
void SetUp() override {
ASSERT_NE(pipe(pipes_), -1) << "unable to create pipe";
close_ = true;
}
~IOUringTest() {
if (close_) {
close(pipes_[0]);
close(pipes_[1]);
}
stopSource_.request_stop();
t_.join();
}
private:
bool close_{false};
protected:
int pipes_[2];
io_uring_context ctx_;
inplace_stop_source stopSource_;
std::thread t_{[&] {
ctx_.run(stopSource_.get_token());
}};
task<void> accept(io_uring_context::scheduler sched) {
// open on a random port, will hang forever
auto stream = open_listening_socket(sched, 0);
co_await finally(unifex::next(stream), unifex::cleanup(stream));
ADD_FAILURE() << "should cancel and unroll";
}
task<void> read(io_uring_context::scheduler sched) {
auto in = open_file_read_only(sched, fdPath + std::to_string(pipes_[0]));
std::array<char, 1024> buffer;
// will hang forever
co_await async_read_some_at(
in, 0, as_writable_bytes(span{buffer.data(), buffer.size()}));
ADD_FAILURE() << "should cancel and unroll";
}
auto bloat() const {
// pipe is blocking when full (what we want), settings are env. specific
auto size = fcntl(pipes_[1], F_GETPIPE_SZ);
EXPECT_GT(size, 0);
std::printf("Pipe size: %d\n", size);
return std::string(static_cast<std::size_t>(size), '?');
}
task<void> write(io_uring_context::scheduler sched) {
auto data = bloat();
const auto buffer = as_bytes(span{data.data(), data.size()});
auto out = open_file_write_only(sched, fdPath + std::to_string(pipes_[1]));
// Start 8 concurrent writes to the file at different offsets.
co_await when_all(
// Calls the 'async_write_some_at()' CPO on the file object
// returned from 'open_file_write_only()'.
async_write_some_at(out, 0, buffer),
async_write_some_at(out, 1 * buffer.size(), buffer),
async_write_some_at(out, 2 * buffer.size(), buffer),
async_write_some_at(out, 3 * buffer.size(), buffer),
async_write_some_at(out, 4 * buffer.size(), buffer),
async_write_some_at(out, 5 * buffer.size(), buffer),
async_write_some_at(out, 6 * buffer.size(), buffer),
async_write_some_at(out, 7 * buffer.size(), buffer));
ADD_FAILURE() << "should cancel and unroll";
}
};
task<void>
stopTrigger(std::chrono::milliseconds ms, io_uring_context::scheduler sched) {
co_await stop_when(
schedule_at(sched, now(sched) + ms) |
then([ms] { std::printf("Timeout after %ldms\n", ms.count()); }),
never_sender());
}
} // namespace
TEST_F(IOUringTest, AsyncReadCancel) {
auto scheduler = ctx_.get_scheduler();
// cancel the read from *nix pipe
sync_wait(stop_when(read(scheduler), stopTrigger(stopAfter, scheduler)));
}
TEST_F(IOUringTest, AsyncWriteCancel) {
auto scheduler = ctx_.get_scheduler();
// cancel the write into *nix pipe
sync_wait(stop_when(write(scheduler), stopTrigger(stopAfter, scheduler)));
}
TEST_F(IOUringTest, AcceptCancel) {
auto scheduler = ctx_.get_scheduler();
// cancel the accept stream
sync_wait(stop_when(accept(scheduler), stopTrigger(stopAfter, scheduler)));
}
# endif // UNIFEX_NO_LIBURING
#endif // UNIFEX_NO_LIBURING