Skip to content

Commit

Permalink
curvefs/metaserver: now we support rocksdb storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
Wine93 committed Apr 10, 2022
1 parent 5107c09 commit 79e09d3
Show file tree
Hide file tree
Showing 108 changed files with 7,833 additions and 1,658 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,9 @@ curvefs/docker/base/*
!curvefs/docker/base/Dockerfile
!curvefs/docker/base/Makefile
curvefs/BUILD_MODE

.BUILD_MODE
__not_found__
thirdparties/rocksdb/lib/
thirdparties/rocksdb/include/
thirdparties/rocksdb/rocksdb/
9 changes: 8 additions & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ http_archive(
sha256 = "9510dd2afc29e7245e9e884336f848c8a6600a14ae726adb6befdb4f786f0be2",
urls = ["https://github.com/google/protobuf/archive/v3.6.1.3.zip"],
)

bind(
name = "protobuf",
actual = "@com_google_protobuf//:protobuf",
Expand Down Expand Up @@ -229,3 +229,10 @@ http_archive(
strip_prefix = "platforms-98939346da932eef0b54cf808622f5bb0928f00b",
urls = ["https://github.com/bazelbuild/platforms/archive/98939346da932eef0b54cf808622f5bb0928f00b.zip"],
)

# RocksDB
new_local_repository(
name = "rocksdb",
build_file = "external/bazel/rocksdb.BUILD",
path = "thirdparties/rocksdb",
)
3 changes: 2 additions & 1 deletion curvefs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

prefix?= "$(PWD)/devops/projects"
release?= 0
build_rocksdb?= 0
only?= "*"
hosts?= "*"
tag?= "curvefs:unknown"
Expand Down Expand Up @@ -32,7 +33,7 @@ define only_specify
endef

build:
@bash util/build.sh --only=$(only) --release=$(release) --os=$(os)
@bash util/build.sh --only=$(only) --release=$(release) --os=$(os) --build_rocksdb=$(build_rocksdb)

install:
@bash util/install.sh --prefix=$(prefix) --only=$(only)
Expand Down
2 changes: 2 additions & 0 deletions curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ excutorOpt.maxRetry=1000000
excutorOpt.retryIntervalUS=500
# RPC timeout for communicating with metaserver
excutorOpt.rpcTimeoutMS=1000
# RPC stream idle timeout
excutorOpt.rpcStreamIdleTimeoutMS=500
# The maximum timeout RPC time of the retry request.
# The timeout time will follow the exponential backoff policy.
# Because the timeout occurs when the network is congested, the RPC timeout needs to be increased
Expand Down
23 changes: 21 additions & 2 deletions curvefs/conf/metaserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,31 @@ mdsOpt.rpcRetryOpt.normalRetryTimesBeforeTriggerWait=3
# Sleep interval for wait
mdsOpt.rpcRetryOpt.waitSleepMs=1000
mdsOpt.rpcRetryOpt.addrs=127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702 # __CURVEADM_TEMPLATE__ ${cluster_mds_addr} __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ groups.mds | join_peer(hostvars, "mds_listen_port") }} __ANSIBLE_TEMPLATE__

#
# storage settings
#
# metaserver storage data directory
storage.data_dir=/tmp/storage # __CURVEADM_TEMPLATE__ ${prefix}/data __CURVEADM_TEMPLATE__
# storage type, "memory" or "rocksdb"
storage.type=rocksdb
# metaserver max memory quota bytes (default: 30GB)
storage.max_memory_quota_bytes=32212254720
# metaserver max disk quota bytes (default: 2TB)
storage.max_disk_quota_bytes=2199023255552
# storage root directory; all of rocksdb's SST files and other files are stored under it
storage.data_dir=./storage # __CURVEADM_TEMPLATE__ ${prefix}/data/storage __CURVEADM_TEMPLATE__
# whether to compress values for memory storage (default: False)
storage.memory.compression=False
# write_buffer_size of the rocksdb column family
# that stores inodes, excluding their s3chunkinfo lists (unit: bytes, default: 128MB)
storage.rocksdb.unordered_write_buffer_size=134217728
# max_write_buffer_number of the rocksdb column family
# that stores inodes, excluding their s3chunkinfo lists (default: 5)
storage.rocksdb.unordered_max_write_buffer_number=5
# write_buffer_size of the rocksdb column family
# that stores dentries and inodes' s3chunkinfo lists (unit: bytes, default: 128MB)
storage.rocksdb.ordered_write_buffer_size=134217728
# max_write_buffer_number of the rocksdb column family
# that stores dentries and inodes' s3chunkinfo lists (default: 15)
storage.rocksdb.ordered_max_write_buffer_number=15
# rocksdb block cache(LRU) capacity (default: 128MB)
storage.rocksdb.block_cache_capacity=134217728
2 changes: 2 additions & 0 deletions curvefs/docker/debian10/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ function create_directory() {
chmod 700 "$g_prefix/data"
if [ "$g_role" == "etcd" ]; then
mkdir -p "$g_prefix/data/wal"
elif [ "$g_role" == "metaserver" ]; then
mkdir -p "$g_prefix/data/storage"
elif [ "$g_role" == "client" ]; then
mkdir -p "$g_prefix/mnt"
fi
Expand Down
2 changes: 2 additions & 0 deletions curvefs/docker/debian11/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ function create_directory() {
chmod 700 "$g_prefix/data"
if [ "$g_role" == "etcd" ]; then
mkdir -p "$g_prefix/data/wal"
elif [ "$g_role" == "metaserver" ]; then
mkdir -p "$g_prefix/data/storage"
elif [ "$g_role" == "client" ]; then
mkdir -p "$g_prefix/mnt"
fi
Expand Down
2 changes: 2 additions & 0 deletions curvefs/docker/debian9/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ function create_directory() {
chmod 700 "$g_prefix/data"
if [ "$g_role" == "etcd" ]; then
mkdir -p "$g_prefix/data/wal"
elif [ "$g_role" == "metaserver" ]; then
mkdir -p "$g_prefix/data/storage"
elif [ "$g_role" == "client" ]; then
mkdir -p "$g_prefix/mnt"
fi
Expand Down
8 changes: 6 additions & 2 deletions curvefs/proto/metaserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ enum MetaStatusCode {
S3_DELETE_ERR = 19;
PARTITION_ID_MISSMATCH = 20;
IDEMPOTENCE_OK = 21;
SERIALIZE_TO_STRING_FAILED = 22;
PARSE_FROM_STRING_FAILED = 23;
STORAGE_INTERNAL_ERROR = 24;
RPC_STREAM_ERROR = 25;
}

// dentry interface
Expand Down Expand Up @@ -382,13 +386,13 @@ message BatchGetXAttrRequest {
repeated uint64 inodeId = 5;
optional uint64 appliedIndex = 6;
}

message XAttr {
required uint64 inodeId = 1;
required uint32 fsId = 2;
map<string, string> xAttrInfos = 3;
}

message BatchGetXAttrResponse {
required MetaStatusCode statusCode = 1;
repeated XAttr xattr = 2;
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void InitExcutorOption(Configuration *conf, ExcutorOpt *opts) {
conf->GetValueFatalIfFail("excutorOpt.retryIntervalUS",
&opts->retryIntervalUS);
conf->GetValueFatalIfFail("excutorOpt.rpcTimeoutMS", &opts->rpcTimeoutMS);
conf->GetValueFatalIfFail("excutorOpt.rpcStreamIdleTimeoutMS",
&opts->rpcStreamIdleTimeoutMS);
conf->GetValueFatalIfFail("excutorOpt.maxRPCTimeoutMS",
&opts->maxRPCTimeoutMS);
conf->GetValueFatalIfFail("excutorOpt.maxRetrySleepIntervalUS",
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct ExcutorOpt {
uint32_t maxRetry = 3;
uint64_t retryIntervalUS = 200;
uint64_t rpcTimeoutMS = 1000;
uint64_t rpcStreamIdleTimeoutMS = 500;
uint64_t maxRPCTimeoutMS = 64000;
uint64_t maxRetrySleepIntervalUS = 64ull * 1000 * 1000;
uint64_t minRetryTimesForceTimeoutBackoff = 5;
Expand Down
9 changes: 9 additions & 0 deletions curvefs/src/client/inode_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ CURVEFS_ERROR InodeCacheManagerImpl::GetInode(uint64_t inodeid,
out = std::make_shared<InodeWrapper>(
std::move(inode), metaClient_);

// NOTE: the s3chunkinfo in the inode is now empty because
// it is stored separately, so we must invoke
// RefreshS3ChunkInfo() to fill in the inode's s3chunkinfo.
CURVEFS_ERROR rc = out->RefreshS3ChunkInfo();
if (rc != CURVEFS_ERROR::OK) {
LOG(ERROR) << "RefreshS3ChunkInfo() failed, retCode = " << rc;
return rc;
}

std::shared_ptr<InodeWrapper> eliminatedOne;
bool eliminated = iCache_->Put(inodeid, out, &eliminatedOne);
if (eliminated) {
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/lease/lease_excutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#ifndef CURVEFS_SRC_CLIENT_LEASE_LEASE_EXCUTOR_H_
#define CURVEFS_SRC_CLIENT_LEASE_LEASE_EXCUTOR_H_

#include <memory>

#include "curvefs/src/client/rpcclient/metacache.h"
#include "curvefs/src/client/rpcclient/mds_client.h"
#include "curvefs/src/client/common/config.h"
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/rpcclient/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ cc_library(
"//curvefs/proto:space_cc_proto",
"//curvefs/src/client/common",
"//curvefs/src/client/metric:client_metric",
"//curvefs/src/common:curvefs_common",
"//external:brpc",
"//external:gflags",
"//external:glog",
"//src/client:curve_client",
"@com_google_absl//absl/cleanup",
],
)
65 changes: 60 additions & 5 deletions curvefs/src/client/rpcclient/channel_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,24 @@ namespace rpcclient {
// Keeps brpc channels keyed by an id of type T, in two separate pools:
// one for normal RPCs and one for streaming RPCs. Both pools are guarded
// by the same read/write lock.
template <typename T>
class ChannelManager {
 public:
    using ChannelPtr = std::shared_ptr<brpc::Channel>;

    // Returns a channel for `id` from the normal pool; when a new channel
    // has to be initialised against `leaderAddr` and that fails, nullptr
    // is returned.
    ChannelPtr GetOrCreateChannel(const T &id,
                                  const butil::EndPoint &leaderAddr);

    // Same contract as GetOrCreateChannel(), but served from the streaming
    // pool; newly created channels use the "streaming" connection group.
    ChannelPtr GetOrCreateStreamChannel(const T &id,
                                        const butil::EndPoint &leaderAddr);

    // Applies ResetSenderIfNotHealthInternal() to the normal pool and then
    // to the streaming pool for the given id.
    void ResetSenderIfNotHealth(const T &csId);

 private:
    // Per-pool worker behind ResetSenderIfNotHealth(): operates on the
    // entry for `csId` in `channelPool` while holding the write lock.
    void ResetSenderIfNotHealthInternal(
        std::unordered_map<T, ChannelPtr>* channelPool,
        const T &csId);

    // Guards both pools below.
    curve::common::BthreadRWLock rwlock_;
    // Channels used for normal RPCs.
    std::unordered_map<T, ChannelPtr> channelPool_;
    // Channels used for streaming RPCs.
    std::unordered_map<T, ChannelPtr> streamChannelPool_;
};

template <typename T>
Expand All @@ -67,7 +78,7 @@ ChannelManager<T>::GetOrCreateChannel(const T &id,
}

auto channel = std::make_shared<brpc::Channel>();
if (0 != channel->Init(leaderAddr, NULL)) {
if (0 != channel->Init(leaderAddr, nullptr)) {
LOG(ERROR) << "failed to init channel to server, " << id << ", "
<< butil::endpoint2str(leaderAddr).c_str();
return nullptr;
Expand All @@ -78,11 +89,48 @@ ChannelManager<T>::GetOrCreateChannel(const T &id,
}

template <typename T>
void ChannelManager<T>::ResetSenderIfNotHealth(const T &id) {
typename ChannelManager<T>::ChannelPtr
ChannelManager<T>::GetOrCreateStreamChannel(const T &id,
const butil::EndPoint &leaderAddr) {
{
curve::common::ReadLockGuard guard(rwlock_);
auto iter = streamChannelPool_.find(id);
if (streamChannelPool_.end() != iter) {
return iter->second;
}
}

curve::common::WriteLockGuard guard(rwlock_);
auto iter = channelPool_.find(id);
auto iter = streamChannelPool_.find(id);
if (streamChannelPool_.end() != iter) {
return iter->second;
}

if (iter == channelPool_.end()) {
// NOTE: we must separate the normal channel from the streaming channel,
// because BRPC can't distinguish normal RPCs from
// streaming RPCs on one connection.
// see issue: https://github.com/apache/incubator-brpc/issues/392
auto channel = std::make_shared<brpc::Channel>();
brpc::ChannelOptions options;
options.connection_group = "streaming";
if (0 != channel->Init(leaderAddr, &options)) {
LOG(ERROR) << "failed to init channel to server, " << id << ", "
<< butil::endpoint2str(leaderAddr).c_str();
return nullptr;
} else {
streamChannelPool_.emplace(id, channel);
return channel;
}
}

template <typename T>
void ChannelManager<T>::ResetSenderIfNotHealthInternal(
std::unordered_map<T, ChannelPtr>* channelPool,
const T &id) {
curve::common::WriteLockGuard guard(rwlock_);
auto iter = channelPool->find(id);

if (iter == channelPool->end()) {
return;
}

Expand All @@ -91,8 +139,15 @@ void ChannelManager<T>::ResetSenderIfNotHealth(const T &id) {
return;
}

channelPool_.erase(iter);
channelPool->erase(iter);
}

// Runs the per-pool reset for `id` on the normal channel pool first and
// then on the streaming channel pool. Each call to the internal helper
// takes the write lock on its own.
template <typename T>
void ChannelManager<T>::ResetSenderIfNotHealth(const T &id) {
    for (auto *pool : {&channelPool_, &streamChannelPool_}) {
        ResetSenderIfNotHealthInternal(pool, id);
    }
}

} // namespace rpcclient
} // namespace client
} // namespace curvefs
Expand Down
Loading

0 comments on commit 79e09d3

Please sign in to comment.