Skip to content

Commit

Permalink
add ban logic to socket pool
Browse files Browse the repository at this point in the history
  • Loading branch information
astrozhou committed Aug 8, 2019
1 parent c90c2c2 commit f4ac301
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
4 changes: 2 additions & 2 deletions mars/comm/verinfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
#ifndef Mars_verinfo_h
#define Mars_verinfo_h

#define MARS_REVISION "450ea52"
#define MARS_REVISION "c90c2c2"
#define MARS_PATH "feature/20190724_shortlink_keepalive"
#define MARS_URL ""
#define MARS_BUILD_TIME "2019-08-05 17:25:49"
#define MARS_BUILD_TIME "2019-08-08 14:33:13"
#define MARS_TAG ""

#endif
8 changes: 2 additions & 6 deletions mars/stn/src/shortlink_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,7 @@ void ShortLinkTaskManager::__OnResponse(ShortLinkInterface* _worker, ErrCmdType
if(_worker->IsKeepAlive() && _conn_profile.socket_fd != INVALID_SOCKET) {
if(_err_type != kEctOK) {
close(_conn_profile.socket_fd);
if(_conn_profile.is_reused_fd) {
socket_pool_.Report(false, false);
}
socket_pool_.Report(_conn_profile.is_reused_fd, false, false);
} else if(_conn_profile.ip_index >=0 && _conn_profile.ip_index < (int)_conn_profile.ip_items.size()) {
IPPortItem item = _conn_profile.ip_items[_conn_profile.ip_index];
CacheSocketItem cacheItem(item, _conn_profile.socket_fd, _conn_profile.keepalive_timeout);
Expand Down Expand Up @@ -386,9 +384,7 @@ void ShortLinkTaskManager::__OnResponse(ShortLinkInterface* _worker, ErrCmdType
int err_code = 0;
int handle_type = Buf2Resp(it->task.taskid, it->task.user_context, _body, _extension, err_code, Task::kChannelShort);
xinfo2(TSF"err_code %_ ",err_code);
if(_worker->IsKeepAlive() && _conn_profile.socket_fd != INVALID_SOCKET && _conn_profile.is_reused_fd) {
socket_pool_.Report(true, handle_type==kTaskFailHandleNoError);
}
socket_pool_.Report(_conn_profile.is_reused_fd, true, handle_type==kTaskFailHandleNoError);

switch(handle_type){
case kTaskFailHandleNoError:
Expand Down
22 changes: 18 additions & 4 deletions mars/stn/src/socket_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
#include "mars/comm/xlogger/xlogger.h"
#include "mars/comm/thread/mutex.h"
#include "mars/comm/thread/lock.h"
#include "mars/comm/tickcount.h"

namespace mars {
namespace stn {

#define BAN_INTERVAL (5*60*1000)

class CacheSocketItem {
public:
CacheSocketItem(const IPPortItem& _item, SOCKET _fd, uint32_t _timeout)
Expand All @@ -57,14 +60,14 @@ namespace stn {
public:
const int DEFAULT_MAX_KEEPALIVE_TIME = 5*1000; //same as apache default

SocketPool():use_cache_(true) {}
SocketPool():use_cache_(true), is_baned_(false) {}
~SocketPool() {
Clear();
}

SOCKET GetSocket(const IPPortItem& _item) {
ScopedLock lock(mutex_);
if(!use_cache_)
if(!use_cache_ || _isBaned())
return INVALID_SOCKET;

auto iter = socket_pool_.begin();
Expand Down Expand Up @@ -119,11 +122,20 @@ namespace stn {
socket_pool_.clear();
}

void Report(bool hasReceived, bool isDecodeOk) {

void Report(bool isReused, bool hasReceived, bool isDecodeOk) {
if(isReused && (!hasReceived || !isDecodeOk)) {
ban_start_tick_.gettickcount();
is_baned_ = true;
} else if(isReused && hasReceived && isDecodeOk) {
is_baned_ = false;
}
}

private:
bool _isBaned() {
return is_baned_ && ban_start_tick_.isValid() && ban_start_tick_.gettickspan() <= BAN_INTERVAL;
}

bool _IsSocketClosed(SOCKET fd) {
char buff[2];
int tryCount = 0;
Expand Down Expand Up @@ -154,6 +166,8 @@ namespace stn {
Mutex mutex_;
bool use_cache_;
std::vector<CacheSocketItem> socket_pool_;
bool is_baned_;
tickcount_t ban_start_tick_;
};

}
Expand Down

0 comments on commit f4ac301

Please sign in to comment.