Skip to content

Commit

Permalink
udp support added
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf committed Jun 5, 2016
1 parent e6f814b commit 2a08c8e
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ examples/idle-close
examples/reconnect
examples/safe-close
examples/timer
examples/udp-cli
examples/udp-svr
examples/udp-hsha
raw-examples/epoll
raw-examples/epoll-et
raw-examples/kqueue
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ int main(int argc, const char* argv[]) {
使用protobuf的消息encode/decode示例在protobuf下
###udp支持
支持udp,udp的客户端采用connect方式使用,类似tcp
###安装与使用
make && make install
Expand Down Expand Up @@ -92,7 +96,9 @@ int main(int argc, const char* argv[]) {
* stat.cc 一个简单的状态服务器示例,一个内嵌的http服务器,方便外部的工具查看应用程序的状态
* write-on-empty.cc 这个例子演示了需要写出大量数据,例如1G文件这种情景中的使用技巧
* daemon.cc 程序已以daemon方式启动,从conf文件中获取日志相关的配置,并初始化日志参数
* udp-cli.cc udp的客户端
* udp-svr.cc udp服务器
* udp-hsha.cc udp的半同步半异步服务器
license
====
Use of this source code is governed by a BSD-style
Expand Down
17 changes: 17 additions & 0 deletions examples/udp-cli.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// echo client
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
UdpConnPtr con = UdpConn::createConnection(&base, "127.0.0.1", 99);
exitif(!con, "start udp conn failed");
con->onMsg([](const UdpConnPtr& p, Buffer buf) {
info("udp recved %lu bytes", buf.size());
});
base.runAfter(0, [=](){con->send("hello");}, 1000);
base.loop();
}

31 changes: 31 additions & 0 deletions examples/udp-hsha.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include <handy/handy.h>

using namespace std;
using namespace handy;

int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
HSHAUPtr hsha = HSHAU::startServer(&base, "", 99, 1);
exitif(!hsha, "bind failed");
Signal::signal(SIGINT, [&, hsha]{ base.exit(); hsha->exit(); signal(SIGINT, SIG_DFL);});

hsha->onMsg([](const UdpServerPtr& con, const string& input, Ip4Addr addr){
int ms = rand() % 1000 + 500;
info("processing a msg: %.*s will using %d ms", (int)input.length(), input.data(), ms);
usleep(ms * 1000);
info("msg processed");
return util::format("%s used %d ms", input.c_str(), ms);
});
for (int i = 0; i < 1; i ++) {
UdpConnPtr con = UdpConn::createConnection(&base, "localhost", 99);
con->onMsg([](const UdpConnPtr& con, Buffer buf) {
info("%.*s recved", (int)buf.size(), buf.data());
con->close();
});
con->send("hello");
}
base.runAfter(500, [&,hsha]{info("exiting"); base.exit(); hsha->exit(); });
base.loop();
info("program exited");
}
16 changes: 16 additions & 0 deletions examples/udp-svr.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// echo server
#include <handy/handy.h>
using namespace handy;

int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
UdpServerPtr svr = UdpServer::startServer(&base, "", 99);
exitif(!svr, "start udp server failed");
svr->onMsg([](const UdpServerPtr& p, Buffer buf, Ip4Addr peer) {
info("echo msg: %s to %s", buf.data(), peer.toString().c_str());
p->sendTo(buf, peer);
});
base.loop();
}
1 change: 0 additions & 1 deletion handy/conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

using namespace std;
namespace handy {
int a;

void handyUnregisterIdle(EventBase* base, const IdleId& idle);
void handyUpdateIdle(EventBase* base, const IdleId& idle);
Expand Down
1 change: 1 addition & 0 deletions handy/handy.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
#include "slice.h"
#include "threads.h"
#include "util.h"
#include "udp.h"
144 changes: 144 additions & 0 deletions handy/udp.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#include "udp.h"
#include <fcntl.h>

using namespace std;

namespace handy {

UdpServer::UdpServer(EventBases *bases):
base_(bases->allocBase()),
bases_(bases),
channel_(NULL)
{
}

int UdpServer::bind(const std::string &host, short port, bool reusePort) {
addr_ = Ip4Addr(host, port);
int fd = socket(AF_INET, SOCK_DGRAM, 0);
int r = net::setReuseAddr(fd);
fatalif(r, "set socket reuse option failed");
r = net::setReusePort(fd, reusePort);
fatalif(r, "set socket reuse port option failed");
r = util::addFdFlag(fd, FD_CLOEXEC);
fatalif(r, "addFdFlag FD_CLOEXEC failed");
r = ::bind(fd,(struct sockaddr *)&addr_.getAddr(),sizeof(struct sockaddr));
if (r) {
close(fd);
error("bind to %s failed %d %s", addr_.toString().c_str(), errno, strerror(errno));
return errno;
}
net::setNonBlock(fd);
info("udp fd %d bind to %s", fd, addr_.toString().c_str());
channel_ = new Channel(base_, fd, kReadEvent);
channel_->onRead([this]{
Buffer buf;
struct sockaddr_in raddr;
socklen_t rsz = sizeof(raddr);
if (!channel_ || channel_->fd() < 0) {
return;
}
int fd = channel_->fd();
ssize_t rn = recvfrom(fd, buf.makeRoom(kUdpPacketSize), buf.space(), 0, (sockaddr*)&raddr, &rsz);
if (rn < 0) {
error("udp %d recv failed: %d %s", fd, errno, strerror(errno));
return;
}
buf.addSize(rn);
trace("udp %d recv %ld bytes from %s", fd, rn, Ip4Addr(raddr).toString().data());
this->msgcb_(shared_from_this(), buf, raddr);
});
return 0;
}

UdpServerPtr UdpServer::startServer(EventBases* bases, const std::string& host, short port, bool reusePort) {
UdpServerPtr p(new UdpServer(bases));
int r = p->bind(host, port, reusePort);
if (r) {
error("bind to %s:%d failed %d %s", host.c_str(), port, errno, strerror(errno));
}
return r == 0 ? p : NULL;
}

void UdpServer::sendTo(const char* buf, size_t len, Ip4Addr addr) {
if (!channel_ || channel_->fd() < 0) {
warn("udp sending %lu bytes to %s after channel closed", len, addr.toString().data());
return;
}
int fd = channel_->fd();
int wn = ::sendto(fd, buf, len, 0, (sockaddr*)&addr.getAddr(), sizeof(sockaddr));
if (wn < 0) {
error("udp %d sendto %s error: %d %s", fd, addr.toString().c_str(), errno, strerror(errno));
return;
}
trace("udp %d sendto %s %d bytes", fd, addr.toString().c_str(), wn);
}

UdpConnPtr UdpConn::createConnection(EventBase* base, const string& host, short port) {
Ip4Addr addr(host, port);
int fd = socket(AF_INET, SOCK_DGRAM, 0);
fatalif(fd<0, "socket failed %d %s", errno, strerror(errno));
net::setNonBlock(fd);
int t = util::addFdFlag(fd, FD_CLOEXEC);
fatalif(t, "addFdFlag FD_CLOEXEC failed %d %s", t, strerror(t));
int r = ::connect(fd, (sockaddr *) &addr.getAddr(), sizeof(sockaddr_in));
if (r != 0 && errno != EINPROGRESS) {
error("connect to %s error %d %s", addr.toString().c_str(), errno, strerror(errno));
return NULL;
}
trace("udp fd %d connecting to %s ok", fd, addr.toString().data());
UdpConnPtr con(new UdpConn);
con->destHost_ = host;
con->destPort_ = port;
con->peer_ = addr;
Channel* ch = new Channel(base, fd, kReadEvent);
con->channel_ = ch;
ch->onRead([con]{
if (!con->channel_ || con->channel_->fd() < 0) {
return;
}
Buffer input;
int fd = con->channel_->fd();
int rn = ::read(fd, input.makeRoom(kUdpPacketSize), kUdpPacketSize);
if (rn < 0) {
error("udp read from %d error %d %s", fd, errno, strerror(errno));
return;
}
trace("udp %d read %d bytes", fd, rn);
input.addSize(rn);
con->cb_(con, input);
});
return con;
}

void UdpConn::send(const char *buf, size_t len) {
if (!channel_ || channel_->fd() < 0) {
warn("udp sending %lu bytes to %s after channel closed", len, peer_.toString().data());
return;
}
int fd = channel_->fd();
int wn = ::write(fd, buf, len);
if (wn < 0) {
error("udp %d write error %d %s", fd, errno, strerror(errno));
return;
}
trace("udp %d write %d bytes", fd, wn);
}

HSHAUPtr HSHAU::startServer(EventBase* base, const std::string& host, short port, int threads) {
HSHAUPtr p = HSHAUPtr(new HSHAU(threads));
p->server_ = UdpServer::startServer(base, host, port);
return p->server_ ? p : NULL;
}

void HSHAU::onMsg(const RetMsgUdpCallBack& cb) {
server_->onMsg([this, cb](const UdpServerPtr& con, Buffer buf, Ip4Addr addr) {
std::string input(buf.data(), buf.size());
threadPool_.addTask([=]{
std::string output = cb(con, input, addr);
server_->getBase()->safeCall([=] {if (output.size()) con->sendTo(output, addr); });
});
});
}


}
83 changes: 83 additions & 0 deletions handy/udp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#pragma once

#include "event_base.h"

namespace handy {

struct UdpServer;
struct UdpConn;
typedef std::shared_ptr<UdpConn> UdpConnPtr;
typedef std::shared_ptr<UdpServer> UdpServerPtr;
typedef std::function<void(const UdpConnPtr&, Buffer)> UdpCallBack;
typedef std::function<void(const UdpServerPtr&, Buffer, Ip4Addr)> UdpSvrCallBack;
const int kUdpPacketSize = 4096;
//Udp服务器
struct UdpServer : public std::enable_shared_from_this<UdpServer>, private noncopyable {
UdpServer(EventBases* bases);
//return 0 on sucess, errno on error
int bind(const std::string& host, short port, bool reusePort=false);
static UdpServerPtr startServer(EventBases* bases, const std::string& host, short port, bool reusePort=false);
~UdpServer() { delete channel_; }
Ip4Addr getAddr() { return addr_; }
EventBase* getBase() { return base_; }
void sendTo(Buffer msg, Ip4Addr addr) { sendTo(msg.data(), msg.size(), addr); msg.clear(); }
void sendTo(const char* buf, size_t len, Ip4Addr addr);
void sendTo(const std::string& s, Ip4Addr addr) { sendTo(s.data(), s.size(), addr); }
void sendTo(const char* s, Ip4Addr addr) { sendTo(s, strlen(s), addr); }

//消息的处理
void onMsg(const UdpSvrCallBack& cb) { msgcb_ = cb; }
private:
EventBase* base_;
EventBases* bases_;
Ip4Addr addr_;
Channel*channel_;
UdpSvrCallBack msgcb_;
};

//Udp连接,使用引用计数
struct UdpConn: public std::enable_shared_from_this<UdpConn>, private noncopyable {
//Udp构造函数,实际可用的连接应当通过createConnection创建
UdpConn(){};
virtual ~UdpConn() {close();};
static UdpConnPtr createConnection(EventBase* base, const std::string& host, short port);
//automatically managed context. allocated when first used, deleted when destruct
template<class T> T& context() { return ctx_.context<T>(); }

EventBase* getBase() { return base_; }
Channel* getChannel() { return channel_; }

//发送数据
void send(Buffer msg) { send(msg.data(), msg.size()); msg.clear(); }
void send(const char* buf, size_t len);
void send(const std::string& s) { send(s.data(), s.size()); }
void send(const char* s) { send(s, strlen(s)); }
void onMsg(const UdpCallBack& cb) { cb_ = cb; }
void close() { auto p = channel_; channel_=NULL; delete p; }
//远程地址的字符串
std::string str() { return peer_.toString(); }
public:
EventBase* base_;
Channel* channel_;
Ip4Addr local_, peer_;
AutoContext ctx_;
std::string destHost_;
int destPort_;
UdpCallBack cb_;
};

typedef std::function<std::string (const UdpServerPtr&, const std::string&, Ip4Addr)> RetMsgUdpCallBack;
//半同步半异步服务器
struct HSHAU;
typedef std::shared_ptr<HSHAU> HSHAUPtr;
struct HSHAU {
static HSHAUPtr startServer(EventBase* base, const std::string& host, short port, int threads);
HSHAU(int threads): threadPool_(threads) {}
void exit() {threadPool_.exit(); threadPool_.join(); }
void onMsg(const RetMsgUdpCallBack& cb);
UdpServerPtr server_;
ThreadPool threadPool_;
};


}

0 comments on commit 2a08c8e

Please sign in to comment.