Skip to content

Commit

Permalink
Merge "std::thread" into feature
Browse files Browse the repository at this point in the history
  • Loading branch information
hzchenwei7 authored and Gerrit Code Review committed Jul 29, 2019
2 parents 89af318 + 1f09319 commit 41a5e11
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 27 deletions.
9 changes: 5 additions & 4 deletions src/mds/nameserver2/clean_task_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CleanTaskManager::CleanTaskManager(int threadNum, int checkPeriod) {
void CleanTaskManager::CheckCleanResult(void) {
while (true) {
{
std::lock_guard<std::mutex> lck(mutex_);
common::LockGuard lck(mutex_);
for (auto iter = cleanTasks_.begin();
stopFlag_ != true && iter != cleanTasks_.end();) {
// TODO(hzsunjianliang): check if task is too long to warning
Expand Down Expand Up @@ -58,7 +58,8 @@ bool CleanTaskManager::Start(void) {
}

// start check thread
checkThread_ = new std::thread(&CleanTaskManager::CheckCleanResult, this);
checkThread_ = new common::Thread(&CleanTaskManager::CheckCleanResult,
this);
LOG(INFO) << "TaskManger check thread started";
return true;
}
Expand All @@ -78,7 +79,7 @@ bool CleanTaskManager::Stop(void) {
}

bool CleanTaskManager::PushTask(std::shared_ptr<Task> task) {
std::lock_guard<std::mutex> lck(mutex_);
common::LockGuard lck(mutex_);
if (stopFlag_) {
LOG(ERROR) << "task manager not started, taskID = "
<< task->GetTaskID();
Expand All @@ -97,7 +98,7 @@ bool CleanTaskManager::PushTask(std::shared_ptr<Task> task) {
}

std::shared_ptr<Task> CleanTaskManager::GetTask(TaskIDType id) {
std::lock_guard<std::mutex> lck(mutex_);
common::LockGuard lck(mutex_);

auto iter = cleanTasks_.begin();
if ((iter = cleanTasks_.find(id)) == cleanTasks_.end()) {
Expand Down
8 changes: 4 additions & 4 deletions src/mds/nameserver2/clean_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class CleanTaskManager {
~CleanTaskManager() {}

/**
* @brief 启动worker线程池、启动检查线程
*
* @brief 启动worker线程池、启动检查线程
*
*/
bool Start(void);

Expand Down Expand Up @@ -62,8 +62,8 @@ class CleanTaskManager {
::curve::common::TaskThreadPool *cleanWorkers_;
// for period check snapshot delete status
std::unordered_map<TaskIDType, std::shared_ptr<Task>> cleanTasks_;
std::mutex mutex_;
std::thread *checkThread_;
common::Mutex mutex_;
common::Thread *checkThread_;
int checkPeriod_;

bool stopFlag_;
Expand Down
4 changes: 2 additions & 2 deletions src/mds/nameserver2/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ bool SessionManager::Init(const struct SessionOptions &sessionOptions) {
}

void SessionManager::Start() {
scanThread = new std::thread(&SessionManager::SessionScanFunc, this);
scanThread = new common::Thread(&SessionManager::SessionScanFunc, this);
return;
}

Expand Down Expand Up @@ -537,7 +537,7 @@ void SessionManager::SessionScanFunc() {
HandleDeleteSessionList();

// 3、睡眠一段时间
std::unique_lock<std::mutex> lk(exitmtx_);
common::UniqueLock lk(exitmtx_);
exitcv_.wait_for(lk, std::chrono::microseconds(intevalTime_),
[&]()->bool{ return sessionScanStop_;});
}
Expand Down
4 changes: 2 additions & 2 deletions src/mds/nameserver2/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Session {
uint64_t updateTime_;
// 能够容忍的client和mds之间的时钟不同步的时间,单位us
uint32_t toleranceTime_;
std::mutex sessionLock_;
common::Mutex sessionLock_;
// proto中定义的session结构,可用来返回给client
ProtoSession protoSession_;
// client的ip
Expand Down Expand Up @@ -219,7 +219,7 @@ class SessionManager {
curve::common::Atomic<bool> sessionScanStop_;

// session的后台扫描线程,扫描回收过期的session
std::thread *scanThread;
common::Thread *scanThread;

// 对sessionMap_进行操作时,需要进行加锁
curve::common::RWLock rwLock_;
Expand Down
4 changes: 2 additions & 2 deletions src/mds/schedule/coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void Coordinator::Run() {
SetSchedulerRunning(true);
// run different scheduler at interval in different threads
for (auto &v : schedulerController_) {
runSchedulerThreads_[v.first] = std::thread(
runSchedulerThreads_[v.first] = common::Thread(
&Coordinator::RunScheduler, this, v.second);
}
}
Expand Down Expand Up @@ -236,7 +236,7 @@ bool Coordinator::ChunkserverGoingToAdd(


void Coordinator::SetSchedulerRunning(bool flag) {
std::lock_guard<std::mutex> guard(mutex_);
common::LockGuard guard(mutex_);
schedulerRunning_ = flag;
}

Expand Down
2 changes: 1 addition & 1 deletion src/mds/schedule/coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class Coordinator {
std::shared_ptr<TopoAdapter> topo_;

std::map<SchedulerType, std::shared_ptr<Scheduler>> schedulerController_;
std::map<SchedulerType, std::thread> runSchedulerThreads_;
std::map<SchedulerType, common::Thread> runSchedulerThreads_;
std::shared_ptr<OperatorController> opController_;

bool schedulerRunning_;
Expand Down
6 changes: 3 additions & 3 deletions test/mds/nameserver2/etcd_id_generator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ TEST_F(TestEtcdIdGenerator, test_multiclient) {
storeKey_, strAlloc2, NameSpaceStorageCodec::EncodeID(alloc3)))
.WillOnce(Return(EtcdErrCode::OK));

std::thread thread1 = std::thread(
common::Thread thread1 = common::Thread(
&TestEtcdIdGenerator::GenID1000Times, this);
std::thread thread2 = std::thread(
common::Thread thread2 = common::Thread(
&TestEtcdIdGenerator::GenID1000Times, this);
std::thread thread3 = std::thread(
common::Thread thread3 = common::Thread(
&TestEtcdIdGenerator::GenID500Times, this);
thread1.join();
thread2.join();
Expand Down
14 changes: 7 additions & 7 deletions test/mds/nameserver2/etcdclient_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ TEST_F(TestEtcdClinetImp, test_CampaignLeader) {
// 启动一个线程竞选leader
int electionTimeoutMs = 0;
uint64_t targetOid;
std::thread thread1(&EtcdClientImp::CampaignLeader, client_, pfx,
common::Thread thread1(&EtcdClientImp::CampaignLeader, client_, pfx,
leaderName1, sessionnInterSec, electionTimeoutMs, &targetOid);
// 等待线程1执行完成, 线程1执行完成就说明竞选成功,
// 否则electionTimeoutMs为0的情况下会一直hung在里面
Expand All @@ -310,7 +310,7 @@ TEST_F(TestEtcdClinetImp, test_CampaignLeader) {
// 启动第二个线程竞选leader
auto client2 = std::make_shared<EtcdClientImp>();
ASSERT_EQ(0, client2->Init(conf, dialtTimeout, retryTimes));
std::thread thread2(&EtcdClientImp::CampaignLeader, client2, pfx,
common::Thread thread2(&EtcdClientImp::CampaignLeader, client2, pfx,
leaderName2, sessionnInterSec, electionTimeoutMs, &leaderOid);
// 线程1退出后,leader2会当选
thread2.join();
Expand All @@ -327,13 +327,13 @@ TEST_F(TestEtcdClinetImp, test_CampaignLeader) {
int electionTimeoutMs = 1000;
auto client1 = std::make_shared<EtcdClientImp>();
ASSERT_EQ(0, client1->Init(conf, dialtTimeout, retryTimes));
std::thread thread1(&EtcdClientImp::CampaignLeader, client1, pfx,
common::Thread thread1(&EtcdClientImp::CampaignLeader, client1, pfx,
leaderName1, sessionnInterSec, electionTimeoutMs, &leaderOid);
thread1.join();
LOG(INFO) << "thread 1 exit.";

// leader2再次竞选
std::thread thread2(&EtcdClientImp::CampaignLeader, client1, pfx,
common::Thread thread2(&EtcdClientImp::CampaignLeader, client1, pfx,
leaderName2, sessionnInterSec, electionTimeoutMs, &leaderOid);
thread2.join();
client1->CloseClient();
Expand All @@ -348,7 +348,7 @@ TEST_F(TestEtcdClinetImp, test_CampaignLeader) {
int electionTimeoutMs = 0;
auto client1 = std::make_shared<EtcdClientImp>();
ASSERT_EQ(0, client1->Init(conf, dialtTimeout, retryTimes));
std::thread thread1(&EtcdClientImp::CampaignLeader, client1, pfx,
common::Thread thread1(&EtcdClientImp::CampaignLeader, client1, pfx,
leaderName1, sessionnInterSec, electionTimeoutMs, &targetOid);
thread1.join();
LOG(INFO) << "thread 1 exit.";
Expand All @@ -357,15 +357,15 @@ TEST_F(TestEtcdClinetImp, test_CampaignLeader) {
client1->LeaderResign(targetOid, 500));

// leader2当选
std::thread thread2(&EtcdClientImp::CampaignLeader, client1, pfx,
common::Thread thread2(&EtcdClientImp::CampaignLeader, client1, pfx,
leaderName2, sessionnInterSec, electionTimeoutMs, &leaderOid);
thread2.join();
// leader1观察到leader改变
ASSERT_EQ(EtcdErrCode::ObserverLeaderChange,
client1->LeaderObserve(targetOid, 1000, leaderName1));

// leader2启动线程observe
std::thread thread3(&EtcdClientImp::LeaderObserve, client1,
common::Thread thread3(&EtcdClientImp::LeaderObserve, client1,
targetOid, 1000, leaderName2);
std::this_thread::sleep_for(std::chrono::seconds(1));
system("killall etcd");
Expand Down
4 changes: 2 additions & 2 deletions test/mds/nameserver2/file_lock_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ TEST_F(FileLockManagerTest, Basic) {
}

TEST_F(FileLockManagerTest, RandomReadWriteLock) {
std::vector<std::thread> threads;
std::vector<common::Thread> threads;
std::srand(std::time(nullptr));
std::string filePath = "/home/dir1/file1";
for (int i = 0; i < 10; i++) {
Expand All @@ -94,7 +94,7 @@ TEST_F(FileLockManagerTest, RandomReadWriteLock) {

TEST_F(FileLockManagerTest, UnlockInAnotherThread) {
std::string filePath = "/home/dir1/file1";
std::thread t1(std::bind(WriteLock, filePath, false));
common::Thread t1(std::bind(WriteLock, filePath, false));
// wait for task to be executed
t1.join();
Unlock(filePath);
Expand Down

0 comments on commit 41a5e11

Please sign in to comment.