From 2a08c8e62fc3dedbaa841d1cfa487e1456f031bd Mon Sep 17 00:00:00 2001 From: yedf Date: Sun, 5 Jun 2016 10:46:28 +0800 Subject: [PATCH] udp support added --- .gitignore | 3 + README.md | 8 ++- examples/udp-cli.cc | 17 +++++ examples/udp-hsha.cc | 31 ++++++++++ examples/udp-svr.cc | 16 +++++ handy/conn.cc | 1 - handy/handy.h | 1 + handy/udp.cc | 144 +++++++++++++++++++++++++++++++++++++++++++ handy/udp.h | 83 +++++++++++++++++++++++++ 9 files changed, 302 insertions(+), 2 deletions(-) create mode 100644 examples/udp-cli.cc create mode 100644 examples/udp-hsha.cc create mode 100644 examples/udp-svr.cc create mode 100644 handy/udp.cc create mode 100644 handy/udp.h diff --git a/.gitignore b/.gitignore index 1608afe..74accf9 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,9 @@ examples/idle-close examples/reconnect examples/safe-close examples/timer +examples/udp-cli +examples/udp-svr +examples/udp-hsha raw-examples/epoll raw-examples/epoll-et raw-examples/kqueue diff --git a/README.md b/README.md index e02c8b2..f6d9e15 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,10 @@ int main(int argc, const char* argv[]) { 使用protobuf的消息encode/decode示例在protobuf下 +###udp支持 + +支持udp,udp的客户端采用connect方式使用,类似tcp + ###安装与使用 make && make install @@ -92,7 +96,9 @@ int main(int argc, const char* argv[]) { * stat.cc 一个简单的状态服务器示例,一个内嵌的http服务器,方便外部的工具查看应用程序的状态 * write-on-empty.cc 这个例子演示了需要写出大量数据,例如1G文件这种情景中的使用技巧 * daemon.cc 程序已以daemon方式启动,从conf文件中获取日志相关的配置,并初始化日志参数 - +* udp-cli.cc udp的客户端 +* udp-svr.cc udp服务器 +* udp-hsha.cc udp的半同步半异步服务器 license ==== Use of this source code is governed by a BSD-style diff --git a/examples/udp-cli.cc b/examples/udp-cli.cc new file mode 100644 index 0000000..588700b --- /dev/null +++ b/examples/udp-cli.cc @@ -0,0 +1,17 @@ +// echo client +#include +using namespace handy; + +int main(int argc, const char* argv[]) { + setloglevel("TRACE"); + EventBase base; + Signal::signal(SIGINT, [&]{ base.exit(); }); + UdpConnPtr con = UdpConn::createConnection(&base, "127.0.0.1", 99); + exitif(!con, "start udp conn failed"); + con->onMsg([](const UdpConnPtr& p, Buffer buf) { + info("udp recved %lu bytes", buf.size()); + }); + base.runAfter(0, [=](){con->send("hello");}, 1000); + base.loop(); +} + diff --git a/examples/udp-hsha.cc b/examples/udp-hsha.cc new file mode 100644 index 0000000..0c2ad1a --- /dev/null +++ b/examples/udp-hsha.cc @@ -0,0 +1,31 @@ +#include + +using namespace std; +using namespace handy; + +int main(int argc, const char* argv[]) { + setloglevel("TRACE"); + EventBase base; + HSHAUPtr hsha = HSHAU::startServer(&base, "", 99, 1); + exitif(!hsha, "bind failed"); + Signal::signal(SIGINT, [&, hsha]{ base.exit(); hsha->exit(); signal(SIGINT, SIG_DFL);}); + + hsha->onMsg([](const UdpServerPtr& con, const string& input, Ip4Addr addr){ + int ms = rand() % 1000 + 500; + info("processing a msg: %.*s will using %d ms", (int)input.length(), input.data(), ms); + usleep(ms * 1000); + info("msg processed"); + return util::format("%s used %d ms", input.c_str(), ms); + }); + for (int i = 0; i < 1; i ++) { + UdpConnPtr con = UdpConn::createConnection(&base, "localhost", 99); + con->onMsg([](const UdpConnPtr& con, Buffer buf) { + info("%.*s recved", (int)buf.size(), buf.data()); + con->close(); + }); + con->send("hello"); + } + base.runAfter(500, [&,hsha]{info("exiting"); base.exit(); hsha->exit(); }); + base.loop(); + info("program exited"); +} diff --git a/examples/udp-svr.cc b/examples/udp-svr.cc new file mode 100644 index 0000000..a10944e --- /dev/null +++ b/examples/udp-svr.cc @@ -0,0 +1,16 @@ +// echo server +#include +using namespace handy; + +int main(int argc, const char* argv[]) { + setloglevel("TRACE"); + EventBase base; + Signal::signal(SIGINT, [&]{ base.exit(); }); + UdpServerPtr svr = UdpServer::startServer(&base, "", 99); + exitif(!svr, "start udp server failed"); + svr->onMsg([](const UdpServerPtr& p, Buffer buf, Ip4Addr peer) { + info("echo msg: %s to %s", buf.data(), peer.toString().c_str()); + p->sendTo(buf, peer); + }); + base.loop(); +} diff --git a/handy/conn.cc b/handy/conn.cc index 6e13573..87b45cd 100644 --- a/handy/conn.cc +++ b/handy/conn.cc @@ -7,7 +7,6 @@ using namespace std; namespace handy { -int a; void handyUnregisterIdle(EventBase* base, const IdleId& idle); void handyUpdateIdle(EventBase* base, const IdleId& idle); diff --git a/handy/handy.h b/handy/handy.h index d003887..6405c9b 100644 --- a/handy/handy.h +++ b/handy/handy.h @@ -6,3 +6,4 @@ #include "slice.h" #include "threads.h" #include "util.h" +#include "udp.h" diff --git a/handy/udp.cc b/handy/udp.cc new file mode 100644 index 0000000..d2d8b81 --- /dev/null +++ b/handy/udp.cc @@ -0,0 +1,144 @@ +#include "udp.h" +#include + +using namespace std; + +namespace handy { + +UdpServer::UdpServer(EventBases *bases): + base_(bases->allocBase()), + bases_(bases), + channel_(NULL) +{ +} + +int UdpServer::bind(const std::string &host, short port, bool reusePort) { + addr_ = Ip4Addr(host, port); + int fd = socket(AF_INET, SOCK_DGRAM, 0); + int r = net::setReuseAddr(fd); + fatalif(r, "set socket reuse option failed"); + r = net::setReusePort(fd, reusePort); + fatalif(r, "set socket reuse port option failed"); + r = util::addFdFlag(fd, FD_CLOEXEC); + fatalif(r, "addFdFlag FD_CLOEXEC failed"); + r = ::bind(fd,(struct sockaddr *)&addr_.getAddr(),sizeof(struct sockaddr)); + if (r) { + close(fd); + error("bind to %s failed %d %s", addr_.toString().c_str(), errno, strerror(errno)); + return errno; + } + net::setNonBlock(fd); + info("udp fd %d bind to %s", fd, addr_.toString().c_str()); + channel_ = new Channel(base_, fd, kReadEvent); + channel_->onRead([this]{ + Buffer buf; + struct sockaddr_in raddr; + socklen_t rsz = sizeof(raddr); + if (!channel_ || channel_->fd() < 0) { + return; + } + int fd = channel_->fd(); + ssize_t rn = recvfrom(fd, buf.makeRoom(kUdpPacketSize), buf.space(), 0, (sockaddr*)&raddr, &rsz); + if (rn < 0) { + error("udp %d recv failed: %d %s", fd, errno, strerror(errno)); + return; + } + buf.addSize(rn); + trace("udp %d recv %ld bytes from %s", fd, rn, Ip4Addr(raddr).toString().data()); + this->msgcb_(shared_from_this(), buf, raddr); + }); + return 0; +} + +UdpServerPtr UdpServer::startServer(EventBases* bases, const std::string& host, short port, bool reusePort) { + UdpServerPtr p(new UdpServer(bases)); + int r = p->bind(host, port, reusePort); + if (r) { + error("bind to %s:%d failed %d %s", host.c_str(), port, errno, strerror(errno)); + } + return r == 0 ? p : NULL; +} + +void UdpServer::sendTo(const char* buf, size_t len, Ip4Addr addr) { + if (!channel_ || channel_->fd() < 0) { + warn("udp sending %lu bytes to %s after channel closed", len, addr.toString().data()); + return; + } + int fd = channel_->fd(); + int wn = ::sendto(fd, buf, len, 0, (sockaddr*)&addr.getAddr(), sizeof(sockaddr)); + if (wn < 0) { + error("udp %d sendto %s error: %d %s", fd, addr.toString().c_str(), errno, strerror(errno)); + return; + } + trace("udp %d sendto %s %d bytes", fd, addr.toString().c_str(), wn); +} + +UdpConnPtr UdpConn::createConnection(EventBase* base, const string& host, short port) { + Ip4Addr addr(host, port); + int fd = socket(AF_INET, SOCK_DGRAM, 0); + fatalif(fd<0, "socket failed %d %s", errno, strerror(errno)); + net::setNonBlock(fd); + int t = util::addFdFlag(fd, FD_CLOEXEC); + fatalif(t, "addFdFlag FD_CLOEXEC failed %d %s", t, strerror(t)); + int r = ::connect(fd, (sockaddr *) &addr.getAddr(), sizeof(sockaddr_in)); + if (r != 0 && errno != EINPROGRESS) { + error("connect to %s error %d %s", addr.toString().c_str(), errno, strerror(errno)); + return NULL; + } + trace("udp fd %d connecting to %s ok", fd, addr.toString().data()); + UdpConnPtr con(new UdpConn); + con->destHost_ = host; + con->destPort_ = port; + con->peer_ = addr; + Channel* ch = new Channel(base, fd, kReadEvent); + con->channel_ = ch; + ch->onRead([con]{ + if (!con->channel_ || con->channel_->fd() < 0) { + return; + } + Buffer input; + int fd = con->channel_->fd(); + int rn = ::read(fd, input.makeRoom(kUdpPacketSize), kUdpPacketSize); + if (rn < 0) { + error("udp read from %d error %d %s", fd, errno, strerror(errno)); + return; + } + trace("udp %d read %d bytes", fd, rn); + input.addSize(rn); + con->cb_(con, input); + }); + return con; +} + +void UdpConn::send(const char *buf, size_t len) { + if (!channel_ || channel_->fd() < 0) { + warn("udp sending %lu bytes to %s after channel closed", len, peer_.toString().data()); + return; + } + int fd = channel_->fd(); + int wn = ::write(fd, buf, len); + if (wn < 0) { + error("udp %d write error %d %s", fd, errno, strerror(errno)); + return; + } + trace("udp %d write %d bytes", fd, wn); +} + +HSHAUPtr HSHAU::startServer(EventBase* base, const std::string& host, short port, int threads) { + HSHAUPtr p = HSHAUPtr(new HSHAU(threads)); + p->server_ = UdpServer::startServer(base, host, port); + return p->server_ ? p : NULL; +} + +void HSHAU::onMsg(const RetMsgUdpCallBack& cb) { + server_->onMsg([this, cb](const UdpServerPtr& con, Buffer buf, Ip4Addr addr) { + std::string input(buf.data(), buf.size()); + threadPool_.addTask([=]{ + std::string output = cb(con, input, addr); + server_->getBase()->safeCall([=] {if (output.size()) con->sendTo(output, addr); }); + }); + }); +} + + +} \ No newline at end of file diff --git a/handy/udp.h b/handy/udp.h new file mode 100644 index 0000000..927b32c --- /dev/null +++ b/handy/udp.h @@ -0,0 +1,83 @@ +#pragma once + +#include "event_base.h" + +namespace handy { + + struct UdpServer; + struct UdpConn; + typedef std::shared_ptr UdpConnPtr; + typedef std::shared_ptr UdpServerPtr; + typedef std::function UdpCallBack; + typedef std::function UdpSvrCallBack; + const int kUdpPacketSize = 4096; + //Udp服务器 + struct UdpServer : public std::enable_shared_from_this, private noncopyable { + UdpServer(EventBases* bases); + //return 0 on sucess, errno on error + int bind(const std::string& host, short port, bool reusePort=false); + static UdpServerPtr startServer(EventBases* bases, const std::string& host, short port, bool reusePort=false); + ~UdpServer() { delete channel_; } + Ip4Addr getAddr() { return addr_; } + EventBase* getBase() { return base_; } + void sendTo(Buffer msg, Ip4Addr addr) { sendTo(msg.data(), msg.size(), addr); msg.clear(); } + void sendTo(const char* buf, size_t len, Ip4Addr addr); + void sendTo(const std::string& s, Ip4Addr addr) { sendTo(s.data(), s.size(), addr); } + void sendTo(const char* s, Ip4Addr addr) { sendTo(s, strlen(s), addr); } + + //消息的处理 + void onMsg(const UdpSvrCallBack& cb) { msgcb_ = cb; } + private: + EventBase* base_; + EventBases* bases_; + Ip4Addr addr_; + Channel*channel_; + UdpSvrCallBack msgcb_; + }; + + //Udp连接,使用引用计数 + struct UdpConn: public std::enable_shared_from_this, private noncopyable { + //Udp构造函数,实际可用的连接应当通过createConnection创建 + UdpConn(){}; + virtual ~UdpConn() {close();}; + static UdpConnPtr createConnection(EventBase* base, const std::string& host, short port); + //automatically managed context. allocated when first used, deleted when destruct + template T& context() { return ctx_.context(); } + + EventBase* getBase() { return base_; } + Channel* getChannel() { return channel_; } + + //发送数据 + void send(Buffer msg) { send(msg.data(), msg.size()); msg.clear(); } + void send(const char* buf, size_t len); + void send(const std::string& s) { send(s.data(), s.size()); } + void send(const char* s) { send(s, strlen(s)); } + void onMsg(const UdpCallBack& cb) { cb_ = cb; } + void close() { auto p = channel_; channel_=NULL; delete p; } + //远程地址的字符串 + std::string str() { return peer_.toString(); } + public: + EventBase* base_; + Channel* channel_; + Ip4Addr local_, peer_; + AutoContext ctx_; + std::string destHost_; + int destPort_; + UdpCallBack cb_; + }; + + typedef std::function RetMsgUdpCallBack; + //半同步半异步服务器 + struct HSHAU; + typedef std::shared_ptr HSHAUPtr; + struct HSHAU { + static HSHAUPtr startServer(EventBase* base, const std::string& host, short port, int threads); + HSHAU(int threads): threadPool_(threads) {} + void exit() {threadPool_.exit(); threadPool_.join(); } + void onMsg(const RetMsgUdpCallBack& cb); + UdpServerPtr server_; + ThreadPool threadPool_; + }; + + +} \ No newline at end of file