Skip to content

Commit

Permalink
add GetClusterId interface
Browse files Browse the repository at this point in the history
Change-Id: I6a8d676260b37277ccbd882ae7962a34169f6c5e
  • Loading branch information
wu-hanqing committed Oct 16, 2019
1 parent 757c224 commit d590f4f
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 0 deletions.
5 changes: 5 additions & 0 deletions curvefs_python/curvefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class _object:
CURVE_ERROR_UNKNOWN = _curvefs.CURVE_ERROR_UNKNOWN
CURVE_OP_READ = _curvefs.CURVE_OP_READ
CURVE_OP_WRITE = _curvefs.CURVE_OP_WRITE
CLUSTERIDMAX = _curvefs.CLUSTERIDMAX
class AioContext_t(_object):
__swig_setmethods__ = {}
__setattr__ = lambda self, name, value: _swig_setattr(self, AioContext_t, name, value)
Expand Down Expand Up @@ -358,6 +359,10 @@ def Rmdir(dirpath, info):
def UnInit():
return _curvefs.UnInit()
UnInit = _curvefs.UnInit

def GetClusterId(buf=None, len=0):
return _curvefs.GetClusterId(buf, len)
GetClusterId = _curvefs.GetClusterId
# This file is compatible with both classic and new-style classes.


54 changes: 54 additions & 0 deletions curvefs_python/curvefs_wrap.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -5600,6 +5600,58 @@ SWIGINTERN PyObject *_wrap_UnInit(PyObject *SWIGUNUSEDPARM(self), PyObject *args
}


SWIGINTERN PyObject *_wrap_GetClusterId(PyObject *SWIGUNUSEDPARM(self), PyObject *args) {
PyObject *resultobj = 0;
char *arg1 = (char *) nullptr ;
int arg2 = (int) 0 ;
int res1 ;
char *buf1 = 0 ;
int alloc1 = 0 ;
int val2 ;
int ecode2 = 0 ;
PyObject * obj0 = 0 ;
PyObject * obj1 = 0 ;
int result;
int retlen;

if (!PyArg_ParseTuple(args,(char *)"|OO:GetClusterId",&obj0,&obj1)) SWIG_fail;
if (obj0) {
res1 = SWIG_AsCharPtrAndSize(obj0, &buf1, NULL, &alloc1);
if (!SWIG_IsOK(res1)) {
SWIG_exception_fail(SWIG_ArgError(res1), "in method '" "GetClusterId" "', argument " "1"" of type '" "char *""'");
}
arg1 = reinterpret_cast< char * >(buf1);
}
if (obj1) {
ecode2 = SWIG_AsVal_int(obj1, &val2);
if (!SWIG_IsOK(ecode2)) {
SWIG_exception_fail(SWIG_ArgError(ecode2), "in method '" "GetClusterId" "', argument " "2"" of type '" "int""'");
}
arg2 = static_cast< int >(val2);
}
if (arg2 == 0) {
arg2 = CLUSTERIDMAX;
}
arg1 = new char[arg2];
result = (int)GetClusterId(arg1,arg2);
if (result < 0 ) {
resultobj = SWIG_From_int(static_cast< int >(result));
if (alloc1 == SWIG_NEWOBJ) delete[] buf1;
delete[] arg1;
return resultobj;
}
retlen = strlen(arg1);
resultobj = SWIG_Python_AppendOutput(resultobj, SWIG_FromCharPtrAndSize(arg1, retlen));
if (alloc1 == SWIG_NEWOBJ) delete[] buf1;
delete[] arg1;
return resultobj;
fail:
if (alloc1 == SWIG_NEWOBJ) delete[] buf1;
delete[] arg1;
return NULL;
}


static PyMethodDef SwigMethods[] = {
{ (char *)"SWIG_PyInstanceMethod_New", (PyCFunction)SWIG_PyInstanceMethod_New, METH_O, NULL},
{ (char *)"AioContext_t_offset_set", _wrap_AioContext_t_offset_set, METH_VARARGS, NULL},
Expand Down Expand Up @@ -5674,6 +5726,7 @@ static PyMethodDef SwigMethods[] = {
{ (char *)"Mkdir", _wrap_Mkdir, METH_VARARGS, NULL},
{ (char *)"Rmdir", _wrap_Rmdir, METH_VARARGS, NULL},
{ (char *)"UnInit", _wrap_UnInit, METH_VARARGS, NULL},
{ (char *)"GetClusterId", _wrap_GetClusterId, METH_VARARGS, NULL},
{ NULL, NULL, 0, NULL }
};

Expand Down Expand Up @@ -6472,6 +6525,7 @@ SWIG_init(void) {
SWIG_Python_SetConstant(d, "CURVE_ERROR_UNKNOWN",SWIG_From_int(static_cast< int >(100)));
SWIG_Python_SetConstant(d, "CURVE_OP_READ",SWIG_From_int(static_cast< int >(0)));
SWIG_Python_SetConstant(d, "CURVE_OP_WRITE",SWIG_From_int(static_cast< int >(1)));
SWIG_Python_SetConstant(d, "CLUSTERIDMAX",SWIG_From_int(static_cast< int >(256)));
#if PY_VERSION_HEX >= 0x03000000
return m;
#else
Expand Down
4 changes: 4 additions & 0 deletions curvefs_python/libcurvefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ extern "C" {
#define CURVE_OP_READ 0
#define CURVE_OP_WRITE 1

#define CLUSTERIDMAX 256


typedef void (*AioCallBack)(struct AioContext* context);
typedef struct AioContext {
Expand Down Expand Up @@ -141,6 +143,8 @@ int Rmdir(const char* dirpath, UserInfo_t* info);

void UnInit();

int GetClusterId(char* buf = nullptr, int len = 0);

#ifdef __cplusplus
}
#endif
Expand Down
8 changes: 8 additions & 0 deletions include/client/libcurve.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ int Close(int fd);

void UnInit();

/**
* @brief: 获取集群id, id用UUID标识
* @param: buf存放集群id
* @param: buf的长度
* @return: 成功返回0, 否则返回-LIBCURVE_ERROR::FAILED
*/
int GetClusterId(char* buf, int len);

#ifdef __cplusplus
}
#endif
Expand Down
5 changes: 5 additions & 0 deletions src/client/client_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ struct ChunkServerAddr {
return addr_ == other.addr_;
}
};

struct ClusterContext {
std::string clusterId;
};

} // namespace client
} // namespace curve

Expand Down
37 changes: 37 additions & 0 deletions src/client/libcurve_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,34 @@ bool FileClient::CheckAligned(off_t offset, size_t length) {
(length % IO_ALIGNED_BLOCK_SIZE == 0);
}

int FileClient::GetClusterId(char* buf, int len) {
if (mdsClient_ == nullptr) {
LOG(ERROR) << "global mds client not inited!";
return -LIBCURVE_ERROR::FAILED;
}

if (buf == nullptr) {
LOG(ERROR) << "invalid argument: buffer is nullptr";
return -LIBCURVE_ERROR::FAILED;
}

ClusterContext clsctx;
int ret = mdsClient_->GetClusterInfo(&clsctx);
if (ret == LIBCURVE_ERROR::OK) {
if (len >= clsctx.clusterId.size() + 1) {
snprintf(buf, len, "%s", clsctx.clusterId.c_str());
return LIBCURVE_ERROR::OK;
}

LOG(ERROR) << "buffer length is too small, "
<< "cluster id length is "
<< clsctx.clusterId.size() + 1;
return -LIBCURVE_ERROR::FAILED;
}

return -LIBCURVE_ERROR::FAILED;
}

} // namespace client
} // namespace curve

Expand Down Expand Up @@ -752,6 +780,15 @@ void UnInit() {
GlobalUnInit();
}

int GetClusterId(char* buf, int len) {
if (globalclient == nullptr) {
LOG(ERROR) << "not inited!";
return -LIBCURVE_ERROR::FAILED;
}

return globalclient->GetClusterId(buf, len);
}

int GlobalInit(const char* path) {
int ret = 0;
if (globalclientinited_) {
Expand Down
8 changes: 8 additions & 0 deletions src/client/libcurve_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ class FileClient {
*/
virtual void UnInit();

/**
* @brief: 获取集群id
* @param: buf存放集群id
* @param: buf的长度
* @return: 成功返回0, 失败返回-LIBCURVE_ERROR::FAILED
*/
int GetClusterId(char* buf, int len);

private:
inline bool CheckAligned(off_t offset, size_t length);

Expand Down
71 changes: 71 additions & 0 deletions src/client/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,77 @@ LIBCURVE_ERROR MDSClient::GetServerList(const LogicPoolID& logicalpooid,
return LIBCURVE_ERROR::FAILED;
}

LIBCURVE_ERROR MDSClient::GetClusterInfo(ClusterContext* clsctx) {
// 记录当前mds重试次数
int count = 0;
// 记录重试中timeOut次数
int timeOutTimes = 0;
// 记录还没重试的mds addr数量
int mdsAddrleft = metaServerOpt_.metaaddrvec.size() - 1;

while (count < metaServerOpt_.rpcRetryTimes) {
brpc::Controller cntl;
curve::mds::topology::GetClusterInfoResponse response;

{
std::unique_lock<bthread::Mutex> lk(mutex_);
mdsClientBase_.GetClusterInfo(&response, &cntl, channel_);
}

if (cntl.Failed()) {
LOG(ERROR) << "get cluster info from mds failed, status code = "
<< response.statuscode()
<< ", error content: "
<< cntl.ErrorText()
<< ", retry GetClusterInfo, retry times = "
<< count;

// 1. 访问不存在的IP地址会报错:ETIMEDOUT
// 2. 访问存在的IP地址,但无人监听:ECONNREFUSED
// 3. 正常发送RPC情况下,对端进程挂掉了:EHOSTDOWN
// 4. 链接建立,对端主机挂掉了:brpc::ERPCTIMEDOUT
// 5. 对端server调用了Stop:ELOGOFF
// 6. 对端链接已关闭:ECONNRESET
// 在这几种场景下,主动切换mds。
// GetClusterInfo在主IO路劲上,所以即使切换mds server失败
// 也不能直接向上返回,也需要重试到规定次数。
// 因为返回失败就会导致qemu一侧磁盘IO错误,上层应用就crash了。
// 所以这里的rpc超时次数要设置大一点
if (cntl.ErrorCode() == brpc::ERPCTIMEDOUT ||
cntl.ErrorCode() == ETIMEDOUT) {
timeOutTimes++;
}

// rpc超时次数达到synchronizeRPCRetryTime次的时候就触发切换mds
if (timeOutTimes > metaServerOpt_.synchronizeRPCRetryTime ||
cntl.ErrorCode() == EHOSTDOWN ||
cntl.ErrorCode() == ECONNRESET ||
cntl.ErrorCode() == ECONNREFUSED ||
cntl.ErrorCode() == brpc::ELOGOFF) {
count++;
if (!ChangeMDServer(&mdsAddrleft)) {
LOG(ERROR) << "change mds server failed!";
bthread_usleep(metaServerOpt_.retryIntervalUs);
} else {
timeOutTimes = 0;
}
} else {
if (!UpdateRetryinfoOrChangeServer(&count, &mdsAddrleft, false)) { // NOLINT
LOG(ERROR) << "UpdateRetryinfoOrChangeServer failed!";
}
}
continue;
}

if (response.statuscode() == 0) {
clsctx->clusterId = response.clusterid();
return LIBCURVE_ERROR::OK;
}
return LIBCURVE_ERROR::FAILED;
}
return LIBCURVE_ERROR::FAILED;
}

LIBCURVE_ERROR MDSClient::CreateCloneFile(const std::string &destination,
const UserInfo_t& userinfo,
uint64_t size,
Expand Down
8 changes: 8 additions & 0 deletions src/client/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ class MDSClient {
LIBCURVE_ERROR GetServerList(const LogicPoolID &logicPoolId,
const std::vector<CopysetID>& csid,
std::vector<CopysetInfo_t>* cpinfoVec);

/**
* 获取当前mds所属的集群信息
* @param[out]: clsctx 为要获取的集群信息
* @return: 成功返回LIBCURVE_ERROR::OK,否则返回LIBCURVE_ERROR::FAILED
*/
LIBCURVE_ERROR GetClusterInfo(ClusterContext* clsctx);

/**
* 获取segment的chunk信息,并更新到Metacache
* @param: allocate为true的时候mds端发现不存在就分配,为false的时候不分配
Expand Down
12 changes: 12 additions & 0 deletions src/client/mds_client_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,18 @@ void MDSClientBase::GetServerList(const LogicPoolID& logicalpooid,
stub.GetChunkServerListInCopySets(cntl, &request, response, nullptr);
}

void MDSClientBase::GetClusterInfo(GetClusterInfoResponse* response,
brpc::Controller* cntl,
brpc::Channel* channel) {
GetClusterInfoRequest request;

cntl->set_log_id(GetLogId());
cntl->set_timeout_ms(metaServerOpt_.rpcTimeoutMs);

curve::mds::topology::TopologyService_Stub stub(channel);
stub.GetClusterInfo(cntl, &request, response, nullptr);
}

void MDSClientBase::CreateCloneFile(const std::string &destination,
const UserInfo_t& userinfo,
uint64_t size,
Expand Down
13 changes: 13 additions & 0 deletions src/client/mds_client_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ using curve::mds::GetOrAllocateSegmentRequest;
using curve::mds::GetOrAllocateSegmentResponse;
using curve::mds::topology::GetChunkServerListInCopySetsRequest;
using curve::mds::topology::GetChunkServerListInCopySetsResponse;
using curve::mds::topology::GetClusterInfoRequest;
using curve::mds::topology::GetClusterInfoResponse;

extern const char* kRootUserName;

Expand Down Expand Up @@ -245,6 +247,17 @@ class MDSClientBase {
GetChunkServerListInCopySetsResponse* response,
brpc::Controller* cntl,
brpc::Channel* channel);

/**
* 获取mds对应的cluster id
* @param[out]: response为该rpc的respoonse,提供给外部处理
* @param[in|out]: cntl既是入参,也是出参,返回RPC状态
* @param[in]: channel是当前与mds建立的通道
*/
void GetClusterInfo(GetClusterInfoResponse* response,
brpc::Controller* cntl,
brpc::Channel* channel);

/**
* 创建clone文件
* @param:destination clone目标文件名
Expand Down
14 changes: 14 additions & 0 deletions test/client/fake/fakeMDS.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "src/common/timeutility.h"
#include "src/common/authenticator.h"
#include "proto/heartbeat.pb.h"
#include "src/common/uuid.h"

using curve::common::Authenticator;

Expand All @@ -34,6 +35,8 @@ using ::curve::mds::topology::GetChunkServerListInCopySetsResponse;
using ::curve::mds::topology::GetChunkServerListInCopySetsRequest;
using ::curve::mds::topology::ChunkServerRegistRequest;
using ::curve::mds::topology::ChunkServerRegistResponse;
using ::curve::mds::topology::GetClusterInfoRequest;
using ::curve::mds::topology::GetClusterInfoResponse;

using HeartbeatRequest = curve::mds::heartbeat::ChunkServerHeartbeatRequest;
using HeartbeatResponse = curve::mds::heartbeat::ChunkServerHeartbeatResponse;
Expand Down Expand Up @@ -671,6 +674,17 @@ class FakeMDSTopologyService : public curve::mds::topology::TopologyService {
response->set_token(request->hostip());
}

void GetClusterInfo(::google::protobuf::RpcController* controller,
const GetClusterInfoRequest* request,
GetClusterInfoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);

std::string uuid = curve::common::UUIDGenerator().GenerateUUID();
response->set_statuscode(0);
response->set_clusterid(uuid);
}

void SetFakeReturn(FakeReturn* fakeret) {
fakeret_ = fakeret;
}
Expand Down
Loading

0 comments on commit d590f4f

Please sign in to comment.