diff --git a/uvco/close.cc b/uvco/close.cc index 7d81e78..86cd4a0 100644 --- a/uvco/close.cc +++ b/uvco/close.cc @@ -3,6 +3,7 @@ #include #include "uvco/close.h" +#include "uvco/internal/internal_utils.h" #include "uvco/loop/loop.h" #include "uvco/run.h" @@ -20,7 +21,7 @@ bool CloseAwaiter::await_ready() const { return closed_; } void CloseAwaiter::await_resume() {} void onCloseCallback(uv_handle_t *stream) { - auto *awaiter = (CloseAwaiter *)stream->data; + auto *awaiter = getData(stream); awaiter->closed_ = true; if (awaiter->handle_) { auto handle = *awaiter->handle_; diff --git a/uvco/close.h b/uvco/close.h index e0d8dc0..03cc68e 100644 --- a/uvco/close.h +++ b/uvco/close.h @@ -28,10 +28,10 @@ template Promise closeHandle(T *handle, C closer) { BOOST_ASSERT(handle != nullptr); CloseAwaiter awaiter{}; - handle->data = &awaiter; + setData(handle, &awaiter); closer(handle, onCloseCallback); co_await awaiter; - handle->data = nullptr; + setData(handle, (void *)nullptr); BOOST_ASSERT(awaiter.closed_); } diff --git a/uvco/fs.h b/uvco/fs.h index 126130c..f19c351 100644 --- a/uvco/fs.h +++ b/uvco/fs.h @@ -39,9 +39,12 @@ class Directory { uv_dirent_type_t type; }; + /// Create a directory. static Promise mkdir(const Loop &loop, std::string_view path, int mode = 0755); + /// Remove a directory. It must be empty. static Promise rmdir(const Loop &loop, std::string_view path); + /// Open a directory for reading. static Promise open(const Loop &loop, std::string_view path); /// Read all directory entries of the given directory. static MultiPromise readAll(const Loop &loop, std::string_view path); diff --git a/uvco/name_resolution.cc b/uvco/name_resolution.cc index f40211a..c2c36aa 100644 --- a/uvco/name_resolution.cc +++ b/uvco/name_resolution.cc @@ -176,7 +176,7 @@ Promise Resolver::gai(std::string_view host, void Resolver::onAddrinfo(uv_getaddrinfo_t *req, uv_status status, struct addrinfo *result) { - auto *awaiter = (AddrinfoAwaiter_ *)req->data; + auto *awaiter = getRequestData(req); awaiter->addrinfo_ = result; awaiter->status_ = status; BOOST_ASSERT(awaiter->handle_); diff --git a/uvco/stream.cc b/uvco/stream.cc index 54be205..2c6208f 100644 --- a/uvco/stream.cc +++ b/uvco/stream.cc @@ -143,7 +143,7 @@ size_t StreamBase::InStreamAwaiter_::await_resume() { void StreamBase::InStreamAwaiter_::allocate(uv_handle_t *handle, size_t /*suggested_size*/, uv_buf_t *buf) { - const InStreamAwaiter_ *awaiter = (InStreamAwaiter_ *)handle->data; + const InStreamAwaiter_ *awaiter = getData(handle); BOOST_ASSERT(awaiter != nullptr); buf->base = awaiter->buffer_.data(); buf->len = awaiter->buffer_.size(); @@ -162,7 +162,7 @@ void StreamBase::InStreamAwaiter_::stop_read() { void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t * /*buf*/) { - auto *awaiter = (InStreamAwaiter_ *)stream->data; + auto *awaiter = getData(stream); BOOST_ASSERT(awaiter != nullptr); awaiter->stop_read(); awaiter->status_ = nread; @@ -172,7 +172,7 @@ void StreamBase::InStreamAwaiter_::onInStreamRead(uv_stream_t *stream, awaiter->handle_.reset(); Loop::enqueue(handle); } - stream->data = nullptr; + setData(stream, (void*)nullptr); } StreamBase::OutStreamAwaiter_::OutStreamAwaiter_(StreamBase &stream, @@ -223,14 +223,14 @@ uv_status StreamBase::OutStreamAwaiter_::await_resume() { void StreamBase::OutStreamAwaiter_::onOutStreamWrite(uv_write_t *write, uv_status status) { - auto *awaiter = (OutStreamAwaiter_ *)write->data; + auto *awaiter = getRequestData(write); BOOST_ASSERT(awaiter != nullptr); awaiter->status_ = status; BOOST_ASSERT(awaiter->handle_); auto handle = awaiter->handle_.value(); awaiter->handle_.reset(); Loop::enqueue(handle); - write->data = nullptr; + setData(write, (void*) nullptr); } bool StreamBase::ShutdownAwaiter_::await_ready() { return false; } @@ -251,7 +251,7 @@ void StreamBase::ShutdownAwaiter_::await_resume() { void StreamBase::ShutdownAwaiter_::onShutdown(uv_shutdown_t *req, uv_status status) { - auto *awaiter = (ShutdownAwaiter_ *)req->data; + auto *awaiter = getRequestData(req); awaiter->status_ = status; if (awaiter->handle_) { auto handle = awaiter->handle_.value(); diff --git a/uvco/stream_server_base_impl.cc b/uvco/stream_server_base_impl.cc index 4dedd14..4301879 100644 --- a/uvco/stream_server_base_impl.cc +++ b/uvco/stream_server_base_impl.cc @@ -41,14 +41,16 @@ StreamServerBase::~StreamServerBase() { template Promise StreamServerBase::close() { - auto *awaiter = (ConnectionAwaiter_ *)socket_->data; - // Resume listener coroutine and tell it to exit. - // If awaiter == nullptr, one of two things is true: - // 1. listener is currently not running - // 2. listener has yielded and is suspended there: the listener generator will - // be cancelled when its MultiPromise is dropped. - if (awaiter != nullptr && awaiter->handle_) { - awaiter->stop(); + if (!dataIsNull(socket_.get())) { + auto *awaiter = getData(socket_.get()); + // Resume listener coroutine and tell it to exit. + // If awaiter == nullptr, one of two things is true: + // 1. listener is currently not running + // 2. listener has yielded and is suspended there: the listener generator + // will be cancelled when its MultiPromise is dropped. + if (awaiter->handle_) { + awaiter->stop(); + } } co_await closeHandle(socket_.get()); socket_.reset(); @@ -59,13 +61,13 @@ MultiPromise StreamServerBase::listen(int backlog) { BOOST_ASSERT(socket_); ConnectionAwaiter_ awaiter{*socket_}; - socket_->data = &awaiter; + setData(socket_.get(), &awaiter); const uv_status listenStatus = uv_listen((uv_stream_t *)socket_.get(), backlog, StreamServerBase::onNewConnection); if (listenStatus != 0) { - socket_->data = nullptr; + setData(socket_.get(), (void *)nullptr); throw UvcoException{listenStatus, "StreamServerBase::listen(): failed to listen"}; } @@ -90,7 +92,7 @@ StreamServerBase::listen(int backlog) { // and will process the remaining connections. Therefore, first remove // the already processed connections. awaiter.accepted_.erase(awaiter.accepted_.begin(), it); - socket_->data = nullptr; + setData(socket_.get(), (void *)nullptr); throw UvcoException{status, "UnixStreamServer failed to accept a connection!"}; } else { @@ -100,14 +102,14 @@ StreamServerBase::listen(int backlog) { // // `close()` also relies on whether `socket_->data` is `nullptr` or not // to decide if the socket has been closed already. - socket_->data = nullptr; + setData(socket_.get(), (void *)nullptr); co_yield std::move(std::get<1>(streamSlot)); - socket_->data = &awaiter; + setData(socket_.get(), &awaiter); } } awaiter.accepted_.clear(); } - socket_->data = nullptr; + setData(socket_.get(), (void *)nullptr); } template @@ -143,7 +145,7 @@ template void StreamServerBase::onNewConnection( uv_stream_t *stream, uv_status status) { const auto *server = (UvStreamType *)stream; - auto *connectionAwaiter = (ConnectionAwaiter_ *)server->data; + auto *connectionAwaiter = getData(server); uv_loop_t *const loop = connectionAwaiter->socket_.loop; if (status == 0) { diff --git a/uvco/tcp.cc b/uvco/tcp.cc index 5681b9f..1562e5c 100644 --- a/uvco/tcp.cc +++ b/uvco/tcp.cc @@ -77,7 +77,7 @@ Promise TcpClient::connect() { } void TcpClient::onConnect(uv_connect_t *req, uv_status status) { - auto *connect = static_cast(req->data); + auto *connect = getRequestData(req); connect->onConnect(status); } diff --git a/uvco/timer.cc b/uvco/timer.cc index 4b7dbfd..3b4078e 100644 --- a/uvco/timer.cc +++ b/uvco/timer.cc @@ -29,7 +29,7 @@ class TimerAwaiter { TimerAwaiter(TimerAwaiter &&other) noexcept : timer_{std::move(other.timer_)}, handle_{other.handle_}, stopped_{other.stopped_} { - timer_->data = this; + setData(timer_.get(), this); other.closed_ = true; } TimerAwaiter &operator=(const TimerAwaiter &) = delete; @@ -37,14 +37,14 @@ class TimerAwaiter { timer_ = std::move(other.timer_); handle_ = other.handle_; stopped_ = other.stopped_; - timer_->data = this; + setData(timer_.get(), this); other.closed_ = true; return *this; } TimerAwaiter(const Loop &loop, uint64_t millis, bool repeating = false) : timer_{std::make_unique()} { uv_timer_init(loop.uvloop(), timer_.get()); - timer_->data = this; + setData(timer_.get(), this); if (repeating) { uv_timer_start(timer_.get(), onMultiTimerFired, millis, millis); } else { @@ -110,13 +110,13 @@ class TimerAwaiter { }; void onSingleTimerDone(uv_timer_t *handle) { - auto *awaiter = (TimerAwaiter *)handle->data; + auto *awaiter = getData(handle); awaiter->stop(); awaiter->resume(); } void onMultiTimerFired(uv_timer_t *handle) { - auto *awaiter = (TimerAwaiter *)handle->data; + auto *awaiter = getData(handle); awaiter->resume(); } diff --git a/uvco/udp.cc b/uvco/udp.cc index 6bad14c..11b5a7f 100644 --- a/uvco/udp.cc +++ b/uvco/udp.cc @@ -187,8 +187,8 @@ MultiPromise> Udp::receiveMany() { Promise Udp::close() { BOOST_ASSERT(udp_); - RecvAwaiter_ *const awaiter = (RecvAwaiter_ *)udp_->data; - if (awaiter != nullptr) { + if (!dataIsNull(udp_.get())) { + auto *const awaiter = getData(udp_.get()); fmt::print(stderr, "Udp::close(): stopping receiving. Please instead use " "Udp::stopReceivingMany() explicitly.\n"); // Force return from receiveMany() generator. @@ -208,10 +208,10 @@ void Udp::stopReceiveMany( udpStopReceive(); // Cancel receiving generator if currently suspended by co_yield. packets.cancel(); - auto *const currentAwaiter = (RecvAwaiter_ *)udp_->data; - if (currentAwaiter == nullptr) { + if (dataIsNull(udp_.get())) { return; } + auto *const currentAwaiter = getData(udp_.get()); // If generator is suspended on co_await, resume it synchronously so it can // exit before the Udp instance is possibly destroyed. if (currentAwaiter->handle_) { @@ -236,8 +236,8 @@ int Udp::udpStartReceive() { void Udp::onReceiveOne(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned int flags) { - auto *awaiter = (RecvAwaiter_ *)handle->data; - BOOST_ASSERT(awaiter != nullptr); + BOOST_ASSERT(!dataIsNull(handle)); + auto *awaiter = getData(handle); if (addr == nullptr) { // Error or asking to free buffers. @@ -379,7 +379,7 @@ Udp::RecvAwaiter_::await_resume() { } void Udp::onSendDone(uv_udp_send_t *req, uv_status status) { - auto *const awaiter = (SendAwaiter_ *)req->data; + auto *const awaiter = getRequestData(req); awaiter->status_ = status; if (awaiter->handle_) { auto resumeHandle = *awaiter->handle_; diff --git a/uvco/udp.h b/uvco/udp.h index c2b3691..668be79 100644 --- a/uvco/udp.h +++ b/uvco/udp.h @@ -79,7 +79,8 @@ class Udp { MultiPromise> receiveMany(); /// Stop receiving with `receiveMany()` by cancelling the receiving generator - /// coroutine. + /// coroutine. Supply the MultiPromise obtained from receiveMany() in order to + /// guarantee a complete clean-up. void stopReceiveMany(MultiPromise> &packets); diff --git a/uvco/uds.cc b/uvco/uds.cc index cfe71d9..aa19d0f 100644 --- a/uvco/uds.cc +++ b/uvco/uds.cc @@ -63,7 +63,7 @@ UnixStreamClient::ConnectAwaiter_::ConnectAwaiter_(const Loop &loop, void UnixStreamClient::ConnectAwaiter_::onConnect(uv_connect_t *req, uv_status status) { - auto *awaiter = (ConnectAwaiter_ *)req->data; + auto *awaiter = getRequestData(req); awaiter->status_ = status; if (awaiter->handle_) { Loop::enqueue(awaiter->handle_.value());