diff --git a/curvefs_python/curvefs.py b/curvefs_python/curvefs.py index 2a3f75441f..f52d385ea7 100644 --- a/curvefs_python/curvefs.py +++ b/curvefs_python/curvefs.py @@ -130,6 +130,7 @@ class _object: CURVE_ERROR_UNKNOWN = _curvefs.CURVE_ERROR_UNKNOWN CURVE_OP_READ = _curvefs.CURVE_OP_READ CURVE_OP_WRITE = _curvefs.CURVE_OP_WRITE +CLUSTERIDMAX = _curvefs.CLUSTERIDMAX class AioContext_t(_object): __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, AioContext_t, name, value) @@ -358,6 +359,10 @@ def Rmdir(dirpath, info): def UnInit(): return _curvefs.UnInit() UnInit = _curvefs.UnInit + +def GetClusterId(buf=None, len=0): + return _curvefs.GetClusterId(buf, len) +GetClusterId = _curvefs.GetClusterId # This file is compatible with both classic and new-style classes. diff --git a/curvefs_python/curvefs_wrap.cxx b/curvefs_python/curvefs_wrap.cxx index fefa29ddab..1b76fc97cc 100644 --- a/curvefs_python/curvefs_wrap.cxx +++ b/curvefs_python/curvefs_wrap.cxx @@ -5600,6 +5600,58 @@ SWIGINTERN PyObject *_wrap_UnInit(PyObject *SWIGUNUSEDPARM(self), PyObject *args } +SWIGINTERN PyObject *_wrap_GetClusterId(PyObject *SWIGUNUSEDPARM(self), PyObject *args) { + PyObject *resultobj = 0; + char *arg1 = (char *) nullptr ; + int arg2 = (int) 0 ; + int res1 ; + char *buf1 = 0 ; + int alloc1 = 0 ; + int val2 ; + int ecode2 = 0 ; + PyObject * obj0 = 0 ; + PyObject * obj1 = 0 ; + int result; + int retlen; + + if (!PyArg_ParseTuple(args,(char *)"|OO:GetClusterId",&obj0,&obj1)) SWIG_fail; + if (obj0) { + res1 = SWIG_AsCharPtrAndSize(obj0, &buf1, NULL, &alloc1); + if (!SWIG_IsOK(res1)) { + SWIG_exception_fail(SWIG_ArgError(res1), "in method '" "GetClusterId" "', argument " "1"" of type '" "char *""'"); + } + arg1 = reinterpret_cast< char * >(buf1); + } + if (obj1) { + ecode2 = SWIG_AsVal_int(obj1, &val2); + if (!SWIG_IsOK(ecode2)) { + SWIG_exception_fail(SWIG_ArgError(ecode2), "in method '" "GetClusterId" "', argument " "2"" of type '" "int""'"); + } + arg2 = static_cast< int >(val2); + } + if (arg2 == 0) { + arg2 = CLUSTERIDMAX; + } + arg1 = new char[arg2]; + result = (int)GetClusterId(arg1,arg2); + if (result < 0 ) { + resultobj = SWIG_From_int(static_cast< int >(result)); + if (alloc1 == SWIG_NEWOBJ) delete[] buf1; + delete[] arg1; + return resultobj; + } + retlen = strlen(arg1); + resultobj = SWIG_Python_AppendOutput(resultobj, SWIG_FromCharPtrAndSize(arg1, retlen)); + if (alloc1 == SWIG_NEWOBJ) delete[] buf1; + delete[] arg1; + return resultobj; +fail: + if (alloc1 == SWIG_NEWOBJ) delete[] buf1; + delete[] arg1; + return NULL; +} + + static PyMethodDef SwigMethods[] = { { (char *)"SWIG_PyInstanceMethod_New", (PyCFunction)SWIG_PyInstanceMethod_New, METH_O, NULL}, { (char *)"AioContext_t_offset_set", _wrap_AioContext_t_offset_set, METH_VARARGS, NULL}, @@ -5674,6 +5726,7 @@ static PyMethodDef SwigMethods[] = { { (char *)"Mkdir", _wrap_Mkdir, METH_VARARGS, NULL}, { (char *)"Rmdir", _wrap_Rmdir, METH_VARARGS, NULL}, { (char *)"UnInit", _wrap_UnInit, METH_VARARGS, NULL}, + { (char *)"GetClusterId", _wrap_GetClusterId, METH_VARARGS, NULL}, { NULL, NULL, 0, NULL } }; @@ -6472,6 +6525,7 @@ SWIG_init(void) { SWIG_Python_SetConstant(d, "CURVE_ERROR_UNKNOWN",SWIG_From_int(static_cast< int >(100))); SWIG_Python_SetConstant(d, "CURVE_OP_READ",SWIG_From_int(static_cast< int >(0))); SWIG_Python_SetConstant(d, "CURVE_OP_WRITE",SWIG_From_int(static_cast< int >(1))); + SWIG_Python_SetConstant(d, "CLUSTERIDMAX",SWIG_From_int(static_cast< int >(256))); #if PY_VERSION_HEX >= 0x03000000 return m; #else diff --git a/curvefs_python/libcurvefs.h b/curvefs_python/libcurvefs.h index 0a58d7d50b..e98b9deb5f 100644 --- a/curvefs_python/libcurvefs.h +++ b/curvefs_python/libcurvefs.h @@ -77,6 +77,8 @@ extern "C" { #define CURVE_OP_READ 0 #define CURVE_OP_WRITE 1 +#define CLUSTERIDMAX 256 + typedef void (*AioCallBack)(struct AioContext* context); typedef struct AioContext { @@ -141,6 +143,8 @@ int Rmdir(const char* dirpath, UserInfo_t* info); void UnInit(); +int GetClusterId(char* buf = nullptr, int len = 0); + #ifdef __cplusplus } #endif diff --git a/include/client/libcurve.h b/include/client/libcurve.h index 8900a540b0..6fb1822cb5 100644 --- a/include/client/libcurve.h +++ b/include/client/libcurve.h @@ -221,6 +221,14 @@ int Close(int fd); void UnInit(); +/** + * @brief: 获取集群id, id用UUID标识 + * @param: buf存放集群id + * @param: buf的长度 + * @return: 成功返回0, 否则返回-LIBCURVE_ERROR::FAILED + */ +int GetClusterId(char* buf, int len); + #ifdef __cplusplus } #endif diff --git a/src/client/client_common.h b/src/client/client_common.h index 1d6d430645..77d2bf9ce9 100644 --- a/src/client/client_common.h +++ b/src/client/client_common.h @@ -218,6 +218,11 @@ struct ChunkServerAddr { return addr_ == other.addr_; } }; + +struct ClusterContext { + std::string clusterId; +}; + } // namespace client } // namespace curve diff --git a/src/client/libcurve_file.cpp b/src/client/libcurve_file.cpp index 5ec4abfe46..7108c35c5b 100644 --- a/src/client/libcurve_file.cpp +++ b/src/client/libcurve_file.cpp @@ -456,6 +456,34 @@ bool FileClient::CheckAligned(off_t offset, size_t length) { (length % IO_ALIGNED_BLOCK_SIZE == 0); } +int FileClient::GetClusterId(char* buf, int len) { + if (mdsClient_ == nullptr) { + LOG(ERROR) << "global mds client not inited!"; + return -LIBCURVE_ERROR::FAILED; + } + + if (buf == nullptr) { + LOG(ERROR) << "invalid argument: buffer is nullptr"; + return -LIBCURVE_ERROR::FAILED; + } + + ClusterContext clsctx; + int ret = mdsClient_->GetClusterInfo(&clsctx); + if (ret == LIBCURVE_ERROR::OK) { + if (len >= clsctx.clusterId.size() + 1) { + snprintf(buf, len, "%s", clsctx.clusterId.c_str()); + return LIBCURVE_ERROR::OK; + } + + LOG(ERROR) << "buffer length is too small, " + << "cluster id length is " + << clsctx.clusterId.size() + 1; + return -LIBCURVE_ERROR::FAILED; + } + + return -LIBCURVE_ERROR::FAILED; +} + } // namespace client } // namespace curve @@ -752,6 +780,15 @@ void UnInit() { GlobalUnInit(); } +int GetClusterId(char* buf, int len) { + if (globalclient == nullptr) { + LOG(ERROR) << "not inited!"; + return -LIBCURVE_ERROR::FAILED; + } + + return globalclient->GetClusterId(buf, len); +} + int GlobalInit(const char* path) { int ret = 0; if (globalclientinited_) { diff --git a/src/client/libcurve_file.h b/src/client/libcurve_file.h index c7abb8101e..30a0f86f32 100644 --- a/src/client/libcurve_file.h +++ b/src/client/libcurve_file.h @@ -193,6 +193,14 @@ class FileClient { */ virtual void UnInit(); + /** + * @brief: 获取集群id + * @param: buf存放集群id + * @param: buf的长度 + * @return: 成功返回0, 失败返回-LIBCURVE_ERROR::FAILED + */ + int GetClusterId(char* buf, int len); + private: inline bool CheckAligned(off_t offset, size_t length); diff --git a/src/client/mds_client.cpp b/src/client/mds_client.cpp index 0007774946..a2c755a576 100644 --- a/src/client/mds_client.cpp +++ b/src/client/mds_client.cpp @@ -1032,6 +1032,77 @@ LIBCURVE_ERROR MDSClient::GetServerList(const LogicPoolID& logicalpooid, return LIBCURVE_ERROR::FAILED; } +LIBCURVE_ERROR MDSClient::GetClusterInfo(ClusterContext* clsctx) { + // 记录当前mds重试次数 + int count = 0; + // 记录重试中timeOut次数 + int timeOutTimes = 0; + // 记录还没重试的mds addr数量 + int mdsAddrleft = metaServerOpt_.metaaddrvec.size() - 1; + + while (count < metaServerOpt_.rpcRetryTimes) { + brpc::Controller cntl; + curve::mds::topology::GetClusterInfoResponse response; + + { + std::unique_lock lk(mutex_); + mdsClientBase_.GetClusterInfo(&response, &cntl, channel_); + } + + if (cntl.Failed()) { + LOG(ERROR) << "get cluster info from mds failed, status code = " + << response.statuscode() + << ", error content: " + << cntl.ErrorText() + << ", retry GetClusterInfo, retry times = " + << count; + + // 1. 访问不存在的IP地址会报错:ETIMEDOUT + // 2. 访问存在的IP地址,但无人监听:ECONNREFUSED + // 3. 正常发送RPC情况下,对端进程挂掉了:EHOSTDOWN + // 4. 链接建立,对端主机挂掉了:brpc::ERPCTIMEDOUT + // 5. 对端server调用了Stop:ELOGOFF + // 6. 对端链接已关闭:ECONNRESET + // 在这几种场景下,主动切换mds。 + // GetClusterInfo在主IO路劲上,所以即使切换mds server失败 + // 也不能直接向上返回,也需要重试到规定次数。 + // 因为返回失败就会导致qemu一侧磁盘IO错误,上层应用就crash了。 + // 所以这里的rpc超时次数要设置大一点 + if (cntl.ErrorCode() == brpc::ERPCTIMEDOUT || + cntl.ErrorCode() == ETIMEDOUT) { + timeOutTimes++; + } + + // rpc超时次数达到synchronizeRPCRetryTime次的时候就触发切换mds + if (timeOutTimes > metaServerOpt_.synchronizeRPCRetryTime || + cntl.ErrorCode() == EHOSTDOWN || + cntl.ErrorCode() == ECONNRESET || + cntl.ErrorCode() == ECONNREFUSED || + cntl.ErrorCode() == brpc::ELOGOFF) { + count++; + if (!ChangeMDServer(&mdsAddrleft)) { + LOG(ERROR) << "change mds server failed!"; + bthread_usleep(metaServerOpt_.retryIntervalUs); + } else { + timeOutTimes = 0; + } + } else { + if (!UpdateRetryinfoOrChangeServer(&count, &mdsAddrleft, false)) { // NOLINT + LOG(ERROR) << "UpdateRetryinfoOrChangeServer failed!"; + } + } + continue; + } + + if (response.statuscode() == 0) { + clsctx->clusterId = response.clusterid(); + return LIBCURVE_ERROR::OK; + } + return LIBCURVE_ERROR::FAILED; + } + return LIBCURVE_ERROR::FAILED; +} + LIBCURVE_ERROR MDSClient::CreateCloneFile(const std::string &destination, const UserInfo_t& userinfo, uint64_t size, diff --git a/src/client/mds_client.h b/src/client/mds_client.h index 786e6ae725..a95815296a 100644 --- a/src/client/mds_client.h +++ b/src/client/mds_client.h @@ -85,6 +85,14 @@ class MDSClient { LIBCURVE_ERROR GetServerList(const LogicPoolID &logicPoolId, const std::vector& csid, std::vector* cpinfoVec); + + /** + * 获取当前mds所属的集群信息 + * @param[out]: clsctx 为要获取的集群信息 + * @return: 成功返回LIBCURVE_ERROR::OK,否则返回LIBCURVE_ERROR::FAILED + */ + LIBCURVE_ERROR GetClusterInfo(ClusterContext* clsctx); + /** * 获取segment的chunk信息,并更新到Metacache * @param: allocate为true的时候mds端发现不存在就分配,为false的时候不分配 diff --git a/src/client/mds_client_base.cpp b/src/client/mds_client_base.cpp index 0285459ab1..384a6bf1b5 100644 --- a/src/client/mds_client_base.cpp +++ b/src/client/mds_client_base.cpp @@ -264,6 +264,18 @@ void MDSClientBase::GetServerList(const LogicPoolID& logicalpooid, stub.GetChunkServerListInCopySets(cntl, &request, response, nullptr); } +void MDSClientBase::GetClusterInfo(GetClusterInfoResponse* response, + brpc::Controller* cntl, + brpc::Channel* channel) { + GetClusterInfoRequest request; + + cntl->set_log_id(GetLogId()); + cntl->set_timeout_ms(metaServerOpt_.rpcTimeoutMs); + + curve::mds::topology::TopologyService_Stub stub(channel); + stub.GetClusterInfo(cntl, &request, response, nullptr); +} + void MDSClientBase::CreateCloneFile(const std::string &destination, const UserInfo_t& userinfo, uint64_t size, diff --git a/src/client/mds_client_base.h b/src/client/mds_client_base.h index 8e039691f7..e9fd576faa 100644 --- a/src/client/mds_client_base.h +++ b/src/client/mds_client_base.h @@ -68,6 +68,8 @@ using curve::mds::GetOrAllocateSegmentRequest; using curve::mds::GetOrAllocateSegmentResponse; using curve::mds::topology::GetChunkServerListInCopySetsRequest; using curve::mds::topology::GetChunkServerListInCopySetsResponse; +using curve::mds::topology::GetClusterInfoRequest; +using curve::mds::topology::GetClusterInfoResponse; extern const char* kRootUserName; @@ -245,6 +247,17 @@ class MDSClientBase { GetChunkServerListInCopySetsResponse* response, brpc::Controller* cntl, brpc::Channel* channel); + + /** + * 获取mds对应的cluster id + * @param[out]: response为该rpc的respoonse,提供给外部处理 + * @param[in|out]: cntl既是入参,也是出参,返回RPC状态 + * @param[in]: channel是当前与mds建立的通道 + */ + void GetClusterInfo(GetClusterInfoResponse* response, + brpc::Controller* cntl, + brpc::Channel* channel); + /** * 创建clone文件 * @param:destination clone目标文件名 diff --git a/test/client/fake/fakeMDS.h b/test/client/fake/fakeMDS.h index 0efe33681f..22830d7d7d 100644 --- a/test/client/fake/fakeMDS.h +++ b/test/client/fake/fakeMDS.h @@ -24,6 +24,7 @@ #include "src/common/timeutility.h" #include "src/common/authenticator.h" #include "proto/heartbeat.pb.h" +#include "src/common/uuid.h" using curve::common::Authenticator; @@ -34,6 +35,8 @@ using ::curve::mds::topology::GetChunkServerListInCopySetsResponse; using ::curve::mds::topology::GetChunkServerListInCopySetsRequest; using ::curve::mds::topology::ChunkServerRegistRequest; using ::curve::mds::topology::ChunkServerRegistResponse; +using ::curve::mds::topology::GetClusterInfoRequest; +using ::curve::mds::topology::GetClusterInfoResponse; using HeartbeatRequest = curve::mds::heartbeat::ChunkServerHeartbeatRequest; using HeartbeatResponse = curve::mds::heartbeat::ChunkServerHeartbeatResponse; @@ -671,6 +674,17 @@ class FakeMDSTopologyService : public curve::mds::topology::TopologyService { response->set_token(request->hostip()); } + void GetClusterInfo(::google::protobuf::RpcController* controller, + const GetClusterInfoRequest* request, + GetClusterInfoResponse* response, + ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + + std::string uuid = curve::common::UUIDGenerator().GenerateUUID(); + response->set_statuscode(0); + response->set_clusterid(uuid); + } + void SetFakeReturn(FakeReturn* fakeret) { fakeret_ = fakeret; } diff --git a/test/client/libcurve_interface_unittest.cpp b/test/client/libcurve_interface_unittest.cpp index 7dc3a2bfb6..0bb021ceb0 100644 --- a/test/client/libcurve_interface_unittest.cpp +++ b/test/client/libcurve_interface_unittest.cpp @@ -86,6 +86,22 @@ TEST(TestLibcurveInterface, InterfaceTest) { ASSERT_EQ(0, Init(configpath.c_str())); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + // test get cluster id + const int CLUSTERIDMAX = 256; + char clusterId[CLUSTERIDMAX]; + + ASSERT_EQ(GetClusterId(nullptr, 0), -LIBCURVE_ERROR::FAILED); + + memset(clusterId, 0, sizeof(clusterId)); + ASSERT_EQ(GetClusterId(clusterId, CLUSTERIDMAX), LIBCURVE_ERROR::OK); + ASSERT_GT(strlen(clusterId), 0); + ASSERT_EQ(strlen(clusterId), 36); + + memset(clusterId, 0, sizeof(clusterId)); + ASSERT_EQ(GetClusterId(clusterId, 0), -LIBCURVE_ERROR::FAILED); + ASSERT_EQ(GetClusterId(clusterId, 1), -LIBCURVE_ERROR::FAILED); + // libcurve file operation int temp = Create(filename.c_str(), &userinfo, FLAGS_test_disk_size);