Skip to content

Commit

Permalink
hsha ok
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf committed Jun 20, 2015
1 parent 38b15d3 commit cf7a86d
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 30 deletions.
17 changes: 15 additions & 2 deletions doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Handy是一个简洁高效的C++11网络库,支持linux与mac平台,使用
[EventBase事件分发器](#event-base)
[tcp连接](#tcp-conn)
[tcp服务器](#tcp-server)
[http服务器](#http-server)
[http服务器](#http-server)
[半同步半异步服务器](#hsha)
<h2 id="sample">使用示例--echo</h2>
```c
#include <handy/handy.h>
Expand Down Expand Up @@ -165,4 +166,16 @@ sample.onGet("/hello", [](const HttpConnPtr& con) {
con.sendResponse(resp);
});
```
持续更新中......
<h2 id="hsha">半同步半异步服务器</h2>
```c
//cb返回空string,表示无需返回数据。如果用户需要更灵活的控制,可以直接操作cb的con参数
void onMsg(CodecBase* codec, const RetMsgCallBack& cb);

hsha.onMsg(new LineCodec, [](const TcpConnPtr& con, const string& input){
int ms = rand() % 1000;
info("processing a msg");
usleep(ms * 1000);
return util::format("%s used %d ms", input.c_str(), ms);
});
```
持续更新中......
42 changes: 22 additions & 20 deletions examples/hsha.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,33 @@
using namespace std;
using namespace handy;


int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
ThreadPool workers(4);
Signal::signal(SIGINT, [&]{ base.exit(); workers.exit(); signal(SIGINT, SIG_DFL);});

TcpServer hsha(&base);
HSHA hsha(&base, 4);
int r = hsha.bind("", 99);
exitif(r, "bind failed %d %s", errno, strerror(errno));
hsha.onConnCreate([&]{
TcpConnPtr con(new TcpConn);
con->onMsg(new LineCodec, [&] (const TcpConnPtr& con, Slice msg) {
string s(msg);
workers.addTask([s, con, &base] {
int ms = rand() % 1000;
usleep(ms * 1000);
base.safeCall([s, con, ms] {
con->sendMsg(util::format("%s used %d ms", s.c_str(), ms));
});
});
});
return con;
exitif(r, "bind failed");
Signal::signal(SIGINT, [&]{ base.exit(); hsha.exit(); signal(SIGINT, SIG_DFL);});

hsha.onMsg(new LineCodec, [](const TcpConnPtr& con, const string& input){
int ms = rand() % 1000;
info("processing a msg");
usleep(ms * 1000);
return util::format("%s used %d ms", input.c_str(), ms);
});
for (int i = 0; i < 5; i ++) {
TcpConnPtr con = TcpConn::createConnection(&base, "localhost", 99);
con->onMsg(new LineCodec, [](const TcpConnPtr& con, Slice msg) {
info("%.*s recved", (int)msg.size(), msg.data());
con->close();
});
con->onState([](const TcpConnPtr& con) {
if (con->getState() == TcpConn::Connected) {
con->sendMsg("hello");
}
});
}
base.runAfter(1000, [&]{base.exit(); hsha.exit(); });
base.loop();
workers.join();
info("program exited");
}
11 changes: 7 additions & 4 deletions handy/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ struct CodecBase {
// < 0 解析错误
virtual int tryDecode(Slice data, Slice& msg) = 0;
virtual void encode(Slice msg, Buffer& buf) = 0;
virtual CodecBase* clone() = 0;
};

//以\r\n结尾的消息
struct LineCodec: public CodecBase{
virtual int tryDecode(Slice data, Slice& msg);
virtual void encode(Slice msg, Buffer& buf);
int tryDecode(Slice data, Slice& msg) override;
void encode(Slice msg, Buffer& buf) override;
CodecBase* clone() override { return new LineCodec(); }
};

//给出长度的消息
struct LengthCodec:public CodecBase {
virtual int tryDecode(Slice data, Slice& msg);
virtual void encode(Slice msg, Buffer& buf);
int tryDecode(Slice data, Slice& msg) override;
void encode(Slice msg, Buffer& buf) override;
CodecBase* clone() override { return new LengthCodec(); }
};

};
14 changes: 14 additions & 0 deletions handy/conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ void TcpConn::send(const char* buf, size_t len) {
}

void TcpConn::onMsg(CodecBase* codec, const MsgCallBack& cb) {
assert(!readcb_);
codec_.reset(codec);
onRead([cb](const TcpConnPtr& con) {
int r = 1;
Expand Down Expand Up @@ -306,6 +307,9 @@ void TcpServer::handleAccept() {
if (readcb_) {
con->onRead(readcb_);
}
if (msgcb_) {
con->onMsg(codec_->clone(), msgcb_);
}
};
if (b == base_) {
addcon();
Expand All @@ -318,4 +322,14 @@ void TcpServer::handleAccept() {
}
}

void HSHA::onMsg(CodecBase* codec, const RetMsgCallBack& cb) {
server_.onConnMsg(codec, [this, cb](const TcpConnPtr& con, Slice msg) {
std::string input = msg;
threadPool_.addTask([=]{
std::string output = cb(con, input);
server_.getBase()->safeCall([=] {if (output.size()) con->sendMsg(output); });
});
});
}

}
23 changes: 20 additions & 3 deletions handy/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ namespace handy {
void send(const char* s) { send(s, strlen(s)); }

//数据到达时回调
void onRead(const TcpCallBack& cb) { readcb_ = cb; };
void onRead(const TcpCallBack& cb) { assert(!readcb_); readcb_ = cb; };
//当tcp缓冲区可写时回调
void onWritable(const TcpCallBack& cb) { writablecb_ = cb;}
//tcp状态改变时回调
void onState(const TcpCallBack& cb) { statecb_ = cb; }
//tcp空闲回调
void addIdleCB(int idle, const TcpCallBack& cb);

//消息回调,此回调与onRead回调只有一个生效,后设置的生效
//消息回调,此回调与onRead回调冲突,只能够调用一个
//codec所有权交给onMsg
void onMsg(CodecBase* codec, const MsgCallBack& cb);
//发送消息
Expand Down Expand Up @@ -89,16 +89,33 @@ namespace handy {
int bind(const std::string& host, short port);
~TcpServer() { delete listen_channel_; }
Ip4Addr getAddr() { return addr_; }
EventBase* getBase() { return base_; }
void onConnCreate(const std::function<TcpConnPtr()>& cb) { createcb_ = cb; }
void onConnRead(const TcpCallBack& cb) { readcb_ = cb; }
void onConnRead(const TcpCallBack& cb) { readcb_ = cb; assert(!msgcb_); }
// 消息处理与Read回调冲突,只能调用一个
void onConnMsg(CodecBase* codec, const MsgCallBack& cb) { codec_.reset(codec); msgcb_ = cb; assert(!readcb_); }
private:
EventBase* base_;
EventBases* bases_;
Ip4Addr addr_;
Channel* listen_channel_;
TcpCallBack readcb_;
MsgCallBack msgcb_;
std::function<TcpConnPtr()> createcb_;
std::unique_ptr<CodecBase> codec_;
void handleAccept();
};

typedef std::function<std::string (const TcpConnPtr&, const std::string& msg)> RetMsgCallBack;
//半同步半异步服务器
struct HSHA {
HSHA(EventBase* base, int threads): server_(base), threadPool_(threads) {}
int bind(const std::string& host, short port) { return server_.bind(host, port); }
void exit() {threadPool_.exit(); threadPool_.join(); }
void onMsg(CodecBase* codec, const RetMsgCallBack& cb);
TcpServer server_;
ThreadPool threadPool_;
};


}
2 changes: 2 additions & 0 deletions handy/threads.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "threads.h"
#include <assert.h>
#include <utility>
using namespace std;

Expand All @@ -15,6 +16,7 @@ tasks_(maxWaiting), threads_(threads)
}

ThreadPool::~ThreadPool() {
assert(tasks_.exited());
if (tasks_.size()) {
fprintf(stderr, "%lu tasks not processed when thread pool exited\n",
tasks_.size());
Expand Down
4 changes: 3 additions & 1 deletion protobuf/test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ int main() {
testencode();

EventBase base;
TcpServer echo(&base, "", 99);
TcpServer echo(&base);
int r = echo.bind("", 99);
exitif(r, "bind failed %d %s", errno, strerror(errno));
ProtoMsgDispatcher dispatch;
echo.onConnRead(
[&](TcpConnPtr con) {
Expand Down

0 comments on commit cf7a86d

Please sign in to comment.