Skip to content

Commit

Permalink
Use two-phase construction for the DbClientImpl and the RedisClientIm…
Browse files Browse the repository at this point in the history
  • Loading branch information
an-tao authored May 10, 2021
1 parent f8e56d8 commit 471488e
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 50 deletions.
32 changes: 17 additions & 15 deletions nosql_lib/redis/src/RedisClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ std::shared_ptr<RedisClient> RedisClient::newRedisClient(
size_t connectionNumber,
const std::string &password)
{
return std::make_shared<RedisClientImpl>(serverAddress,
connectionNumber,
password);
auto client = std::make_shared<RedisClientImpl>(serverAddress,
connectionNumber,
password);
client->init();
return client;
}

RedisClientImpl::RedisClientImpl(const trantor::InetAddress &serverAddress,
Expand All @@ -37,20 +39,20 @@ RedisClientImpl::RedisClientImpl(const trantor::InetAddress &serverAddress,
password_(std::move(password)),
numberOfConnections_(numberOfConnections)
{
loops_.start();

std::thread([this]() {
for (size_t i = 0; i < numberOfConnections_; ++i)
{
auto loop = loops_.getNextLoop();
loop->queueInLoop([this, loop]() {
std::lock_guard<std::mutex> lock(connectionsMutex_);
connections_.insert(newConnection(loop));
});
}
}).detach();
}

void RedisClientImpl::init()
{
loops_.start();
for (size_t i = 0; i < numberOfConnections_; ++i)
{
auto loop = loops_.getNextLoop();
loop->queueInLoop([this, loop]() {
std::lock_guard<std::mutex> lock(connectionsMutex_);
connections_.insert(newConnection(loop));
});
}
}
RedisConnectionPtr RedisClientImpl::newConnection(trantor::EventLoop *loop)
{
auto conn = std::make_shared<RedisConnection>(serverAddr_, password_, loop);
Expand Down
1 change: 1 addition & 0 deletions nosql_lib/redis/src/RedisClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class RedisClientImpl final
{
timeout_ = timeout;
}
void init();

private:
trantor::EventLoopThreadPool loops_;
Expand Down
20 changes: 13 additions & 7 deletions orm_lib/src/DbClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ std::shared_ptr<DbClient> DbClient::newPgClient(const std::string &connInfo,
const size_t connNum)
{
#if USE_POSTGRESQL
return std::make_shared<DbClientImpl>(connInfo,
connNum,
ClientType::PostgreSQL);
auto client = std::make_shared<DbClientImpl>(connInfo,
connNum,
ClientType::PostgreSQL);
client->init();
return client;
#else
LOG_FATAL << "PostgreSQL is not supported!";
exit(1);
Expand All @@ -47,7 +49,10 @@ std::shared_ptr<DbClient> DbClient::newMysqlClient(const std::string &connInfo,
const size_t connNum)
{
#if USE_MYSQL
return std::make_shared<DbClientImpl>(connInfo, connNum, ClientType::Mysql);
auto client =
std::make_shared<DbClientImpl>(connInfo, connNum, ClientType::Mysql);
client->init();
return client;
#else
LOG_FATAL << "Mysql is not supported!";
exit(1);
Expand All @@ -61,9 +66,10 @@ std::shared_ptr<DbClient> DbClient::newSqlite3Client(
const size_t connNum)
{
#if USE_SQLITE3
return std::make_shared<DbClientImpl>(connInfo,
connNum,
ClientType::Sqlite3);
auto client =
std::make_shared<DbClientImpl>(connInfo, connNum, ClientType::Sqlite3);
client->init();
return client;
#else
LOG_FATAL << "Sqlite3 is not supported!";
exit(1);
Expand Down
44 changes: 16 additions & 28 deletions orm_lib/src/DbClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,36 +61,24 @@ DbClientImpl::DbClientImpl(const std::string &connInfo,
type_ = type;
connectionInfo_ = connInfo;
LOG_TRACE << "type=" << (int)type;
// LOG_DEBUG << loops_.getLoopNum();
assert(connNum > 0);
}
void DbClientImpl::init()
{
// LOG_DEBUG << loops_.getLoopNum();
loops_.start();
if (type == ClientType::PostgreSQL)
{
std::thread([this]() {
for (size_t i = 0; i < numberOfConnections_; ++i)
{
auto loop = loops_.getNextLoop();
loop->runInLoop([this, loop]() {
std::lock_guard<std::mutex> lock(connectionsMutex_);
connections_.insert(newConnection(loop));
});
}
}).detach();
}
else if (type == ClientType::Mysql)
if (type_ == ClientType::PostgreSQL || type_ == ClientType::Mysql)
{
std::thread([this]() {
for (size_t i = 0; i < numberOfConnections_; ++i)
{
auto loop = loops_.getNextLoop();
loop->runAfter(0.1 * (i + 1), [this, loop]() {
std::lock_guard<std::mutex> lock(connectionsMutex_);
connections_.insert(newConnection(loop));
});
}
}).detach();
for (size_t i = 0; i < numberOfConnections_; ++i)
{
auto loop = loops_.getNextLoop();
loop->runInLoop([this, loop]() {
std::lock_guard<std::mutex> lock(connectionsMutex_);
connections_.insert(newConnection(loop));
});
}
}
else if (type == ClientType::Sqlite3)
else if (type_ == ClientType::Sqlite3)
{
sharedMutexPtr_ = std::make_shared<SharedMutex>();
assert(sharedMutexPtr_);
Expand All @@ -104,7 +92,6 @@ DbClientImpl::DbClientImpl(const std::string &connInfo,
});
}
}

DbClientImpl::~DbClientImpl() noexcept
{
std::lock_guard<std::mutex> lock(connectionsMutex_);
Expand Down Expand Up @@ -460,7 +447,8 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
{
std::lock_guard<std::mutex> guard(thisPtr->connectionsMutex_);
thisPtr->busyConnections_.insert(
okConnPtr); // For new connections, this sentence is necessary
okConnPtr); // For new connections, this sentence is
// necessary
}
thisPtr->handleNewTask(okConnPtr);
});
Expand Down
1 change: 1 addition & 0 deletions orm_lib/src/DbClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class DbClientImpl : public DbClient,
{
timeout_ = timeout;
}
void init();

private:
size_t numberOfConnections_;
Expand Down

0 comments on commit 471488e

Please sign in to comment.