Skip to content

Commit

Permalink
reconnect ok
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf committed Aug 24, 2015
1 parent 283594f commit 199cb29
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 47 deletions.
17 changes: 6 additions & 11 deletions examples/codec-cli.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,21 @@
using namespace std;
using namespace handy;

void reconnect2(EventBase* base, string host, short port) {
TcpConnPtr con = TcpConn::createConnection(base, host, port);
int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
TcpConnPtr con = TcpConn::createConnection(&base, "127.0.0.1", 99, 3000);
con->setReconnectInterval(3000);
con->onMsg(new LengthCodec, [](const TcpConnPtr& con, Slice msg) {
info("recv msg: %.*s", (int)msg.size(), msg.data());
});
con->onState([=](const TcpConnPtr& con) {
info("onState called state: %d", con->getState());
if (con->getState() == TcpConn::Connected) {
con->sendMsg("hello");
} else if (con->getState() == TcpConn::Closed || con->getState() == TcpConn::Failed) {
base->runAfter(3000, [=] { reconnect2(base, host, port); });
}
});
}

int main(int argc, const char* argv[]) {
setloglevel("TRACE");
EventBase base;
Signal::signal(SIGINT, [&]{ base.exit(); });
reconnect2(&base, "localhost", 99);
base.loop();
info("program exited");
}
61 changes: 34 additions & 27 deletions handy/conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ void handyUpdateIdle(EventBase* base, const IdleId& idle);

void TcpConn::attach(EventBase* base, int fd, Ip4Addr local, Ip4Addr peer)
{
fatalif((!isClient_ && state_ != State::Invalid) || (isClient_ && state_ != State::Handshaking),
fatalif((destPort_<=0 && state_ != State::Invalid) || (destPort_>=0 && state_ != State::Handshaking),
"you should use a new TcpConn to attach. state: %d", state_);
base_ = base;
state_ = State::Handshaking;
local_ = local;
peer_ = peer;
if (channel_) { delete channel_; }
channel_ = new Channel(base, fd, kWriteEvent|kReadEvent);
trace("tcp constructed %s - %s fd: %d",
local_.toString().c_str(),
Expand All @@ -29,48 +31,48 @@ void TcpConn::attach(EventBase* base, int fd, Ip4Addr local, Ip4Addr peer)
con->channel_->onWrite([=] { con->handleWrite(con); });
}

int TcpConn::connect(EventBase* base, const string& host, short port, int timeout, const string& localip) {
fatalif(state_ != State::Invalid, "you should use a new TcpConn to connect. state: %d", state_);
isClient_ = true;
void TcpConn::connect(EventBase* base, const string& host, short port, int timeout, const string& localip) {
fatalif(state_ != State::Invalid && state_ != State::Closed && state_ != State::Failed,
"current state is bad state to connect. state: %d", state_);
destHost_ = host;
destPort_ = port;
connectTimeout_ = timeout;
localIp_ = localip;
Ip4Addr addr(host, port);
int fd = socket(AF_INET, SOCK_STREAM, 0);
fatalif(fd<0, "socket failed %d %s", errno, strerror(errno));
net::setNonBlock(fd);
int t = util::addFdFlag(fd, FD_CLOEXEC);
fatalif(t, "addFdFlag FD_CLOEXEC failed %d %s", t, strerror(t));
int r = 0;
if (localip.size()) {
Ip4Addr addr(localip, 0);
int r = ::bind(fd,(struct sockaddr *)&addr.getAddr(),sizeof(struct sockaddr));
if (r) {
error("bind to %s failed error %d %s", addr.toString().c_str(), errno, strerror(errno));
::close(fd);
return errno;
}
r = ::bind(fd,(struct sockaddr *)&addr.getAddr(),sizeof(struct sockaddr));
error("bind to %s failed error %d %s", addr.toString().c_str(), errno, strerror(errno));
}
int r = ::connect(fd, (sockaddr*)&addr.getAddr(), sizeof (sockaddr_in));
if (r != 0 && errno != EINPROGRESS) {
error("connect to %s error %d %s", addr.toString().c_str(), errno, strerror(errno));
::close(fd);
return errno;
if (r == 0) {
r = ::connect(fd, (sockaddr *) &addr.getAddr(), sizeof(sockaddr_in));
if (r != 0 && errno != EINPROGRESS) {
error("connect to %s error %d %s", addr.toString().c_str(), errno, strerror(errno));
}
}

sockaddr_in local;
socklen_t alen = sizeof(local);
r = getsockname(fd, (sockaddr*)&local, &alen);
if (r < 0) {
error("getsockname failed %d %s", errno, strerror(errno));
::close(fd);
return errno;
if (r == 0) {
r = getsockname(fd, (sockaddr *) &local, &alen);
if (r < 0) {
error("getsockname failed %d %s", errno, strerror(errno));
}
}
state_ = State::Handshaking;
attach(base, fd, Ip4Addr(local), addr);
if (timeout) {
TcpConnPtr con = shared_from_this();
base->runAfter(timeout, [con] {
timeoutId_ = base->runAfter(timeout, [con] {
if (con->getState() == Handshaking) { con->channel_->close(); }
});
}
return 0;
}

void TcpConn::close() {
Expand All @@ -92,17 +94,22 @@ void TcpConn::cleanup(const TcpConnPtr& con) {
trace("tcp closing %s - %s fd %d %d",
local_.toString().c_str(),
peer_.toString().c_str(),
channel_->fd(), errno);
for (auto& idle: idleIds_) {
handyUnregisterIdle(getBase(), idle);
}
channel_ ? channel_->fd(): -1, errno);
getBase()->cancel(timeoutId_);
if (statecb_) {
statecb_(con);
}
if (reconnectInterval_ >= 0 && !getBase()->exited()) { //reconnect
reconnect();
return;
}
for (auto& idle: idleIds_) {
handyUnregisterIdle(getBase(), idle);
}
//channel may have hold TcpConnPtr, set channel_ to NULL before delete
readcb_ = writablecb_ = statecb_ = nullptr;
Channel* ch = channel_;
channel_ = NULL;
readcb_ = writablecb_ = statecb_ = nullptr;
delete ch;
}

Expand Down
17 changes: 12 additions & 5 deletions handy/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ namespace handy {
//可传入连接类型,返回智能指针
template<class C=TcpConn> static TcpConnPtr createConnection(EventBase* base, const std::string& host, short port,
int timeout=0, const std::string& localip="") {
TcpConnPtr con(new C); con->isClient_ = true; return con->connect(base, host, port, timeout, localip) ? NULL : con;
TcpConnPtr con(new C); con->connect(base, host, port, timeout, localip); return con;
}
template<class C=TcpConn> static TcpConnPtr createConnection(EventBase* base, int fd, Ip4Addr local, Ip4Addr peer) {
TcpConnPtr con(new C); con->attach(base, fd, local, peer); return con;
}

bool isClient() { return isClient_; }
bool isClient() { return destPort_ > 0; }
//automatically managed context. allocated when first used, deleted when destruct
template<class T> T& context() { return ctx_.context<T>(); }

EventBase* getBase() { return channel_ ? channel_->getBase() : NULL; }
EventBase* getBase() { return base_; }
State getState() { return state_; }
//TcpConn的输入输出缓冲区
Buffer& getInput() { return input_; }
Expand Down Expand Up @@ -56,27 +56,34 @@ namespace handy {

//conn会在下个事件周期进行处理
void close();
//设置重连时间间隔,-1: 不重连,0:立即重连,其它:等待毫秒数,未设置不重连
void setReconnectInterval(int milli) { reconnectInterval_ = milli; }

//!慎用。立即关闭连接,清理相关资源,可能导致该连接的引用计数变为0,从而使当前调用者引用的连接被析构
void closeNow() { if (channel_) channel_->close(); }

//远程地址的字符串
std::string str() { return peer_.toString(); }

public:
EventBase* base_;
Channel* channel_;
Buffer input_, output_;
Ip4Addr local_, peer_;
State state_;
TcpCallBack readcb_, writablecb_, statecb_;
std::list<IdleId> idleIds_;
TimerId timeoutId_;
AutoContext ctx_, internalCtx_;
bool isClient_;
std::string destHost_, localIp_;
int destPort_, connectTimeout_, reconnectInterval_;
std::unique_ptr<CodecBase> codec_;
void handleRead(const TcpConnPtr& con);
void handleWrite(const TcpConnPtr& con);
ssize_t isend(const char* buf, size_t len);
void cleanup(const TcpConnPtr& con);
int connect(EventBase* base, const std::string& host, short port, int timeout, const std::string& localip);
void connect(EventBase* base, const std::string& host, short port, int timeout, const std::string& localip);
void reconnect();
void attach(EventBase* base, int fd, Ip4Addr local, Ip4Addr peer);
virtual int readImp(int fd, void* buf, size_t bytes) { return ::read(fd, buf, bytes); }
virtual int writeImp(int fd, const void* buf, size_t bytes) { return ::write(fd, buf, bytes); }
Expand Down
27 changes: 25 additions & 2 deletions handy/event_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct EventsImp {
std::map<TimerId, Task> timers_;
std::atomic<int64_t> timerSeq_;
std::map<int, std::list<IdleNode>> idleConns_;
std::set<TcpConnPtr> reconnectConns_;
bool idleEnabled;

EventsImp(EventBase* base, int taskCap);
Expand All @@ -61,7 +62,7 @@ struct EventsImp {
void repeatableTimeout(TimerRepeatable* tr);

//eventbase functions
EventBase& exit() { exit_ = true; wakeup(); return *base_; }
EventBase& exit();
bool exited() { return exit_; }
void safeCall(Task&& task) { tasks_.push(move(task)); wakeup(); }
void loop() { while (!exit_) loop_once(10000); loop_once(0); }
Expand Down Expand Up @@ -132,6 +133,17 @@ void EventsImp::init() {
});
}

EventBase& EventsImp::exit() {
exit_ = true;
wakeup();
timerReps_.clear();
timers_.clear();
idleConns_.clear();
for (auto recon: reconnectConns_) { //重连的连接无法通过channel清理,因此单独清理
recon->cleanup(recon);
}
return *base_; }

void EventsImp::handleTimeouts() {
int64_t now = util::timeMilli();
TimerId tid { now, 1L<<62 };
Expand Down Expand Up @@ -323,7 +335,7 @@ void handyUpdateIdle(EventBase* base, const IdleId& idle) {
}

TcpConn::TcpConn()
:channel_(NULL), state_(State::Invalid), isClient_(false)
:base_(NULL), channel_(NULL), state_(State::Invalid), destPort_(-1), connectTimeout_(0), reconnectInterval_(-1)
{
}

Expand All @@ -338,4 +350,15 @@ void TcpConn::addIdleCB(int idle, const TcpCallBack& cb) {
}
}

void TcpConn::reconnect() {
auto con = shared_from_this();
getBase()->imp_->reconnectConns_.insert(con);
getBase()->runAfter(reconnectInterval_, [this, con]() {
getBase()->imp_->reconnectConns_.erase(con);
connect(getBase(), destHost_, (short)destPort_, connectTimeout_, localIp_);
});
delete channel_;
channel_ = NULL;
}

}
4 changes: 2 additions & 2 deletions handy/http.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void HttpConnPtr::onHttpMsg(const HttpCallBack& cb) const {
}

void HttpConnPtr::handleRead(const HttpCallBack& cb) const {
if (!tcp->isClient_) { //server
if (!tcp->isClient()) { //server
HttpRequest& req = getRequest();
HttpMsg::Result r = req.tryDecode(tcp->getInput());
if (r == HttpMsg::Error) {
Expand Down Expand Up @@ -211,7 +211,7 @@ void HttpConnPtr::handleRead(const HttpCallBack& cb) const {
}

void HttpConnPtr::clearData() const {
if (tcp->isClient_) {
if (tcp->isClient()) {
tcp->getInput().consume(getResponse().getByte());
getResponse().clear();
} else {
Expand Down

0 comments on commit 199cb29

Please sign in to comment.