Skip to content

Commit

Permalink
finish pool
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCaius committed Jun 13, 2023
1 parent 85f5902 commit 4ef6b05
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 0 deletions.
32 changes: 32 additions & 0 deletions code/pool/sqlconnRAII.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@


#ifndef SQLCONNRAII_H
#define SQLCONNRAII_H
#include "sqlconnpool.h"

/* 资源在对象构造初始化 资源在对象析构时释放*/
class SqlConnRAII
{
public:
SqlConnRAII(MYSQL **sql, SqlConnPool *connpool)
{
assert(connpool);
*sql = connpool->GetConn();
sql_ = *sql;
connpool_ = connpool;
}

~SqlConnRAII()
{
if (sql_)
{
connpool_->FreeConn(sql_);
}
}

private:
MYSQL *sql_;
SqlConnPool *connpool_;
};

#endif // SQLCONNRAII_H
91 changes: 91 additions & 0 deletions code/pool/sqlconnpool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@


#include "sqlconnpool.h"
using namespace std;

SqlConnPool::SqlConnPool()
{
useCount_ = 0;
freeCount_ = 0;
}

SqlConnPool *SqlConnPool::Instance()
{
static SqlConnPool connPool;
return &connPool;
}

void SqlConnPool::Init(const char *host, int port,
const char *user, const char *pwd, const char *dbName,
int connSize = 10)
{
assert(connSize > 0);
for (int i = 0; i < connSize; i++)
{
MYSQL *sql = nullptr;
sql = mysql_init(sql);
if (!sql)
{
LOG_ERROR("MySql init error!");
assert(sql);
}
sql = mysql_real_connect(sql, host,
user, pwd,
dbName, port, nullptr, 0);
if (!sql)
{
LOG_ERROR("MySql Connect error!");
}
connQue_.push(sql);
}
MAX_CONN_ = connSize;
sem_init(&semId_, 0, MAX_CONN_);
}

MYSQL *SqlConnPool::GetConn()
{
MYSQL *sql = nullptr;
if (connQue_.empty())
{
LOG_WARN("SqlConnPool busy!");
return nullptr;
}
sem_wait(&semId_);
{
lock_guard<mutex> locker(mtx_);
sql = connQue_.front();
connQue_.pop();
}
return sql;
}

void SqlConnPool::FreeConn(MYSQL *sql)
{
assert(sql);
lock_guard<mutex> locker(mtx_);
connQue_.push(sql);
sem_post(&semId_);
}

void SqlConnPool::ClosePool()
{
lock_guard<mutex> locker(mtx_);
while (!connQue_.empty())
{
auto item = connQue_.front();
connQue_.pop();
mysql_close(item);
}
mysql_library_end();
}

int SqlConnPool::GetFreeConnCount()
{
lock_guard<mutex> locker(mtx_);
return connQue_.size();
}

SqlConnPool::~SqlConnPool()
{
ClosePool();
}
40 changes: 40 additions & 0 deletions code/pool/sqlconnpool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H

#include <mysql/mysql.h>
#include <string>
#include <queue>
#include <mutex>
#include <semaphore.h>
#include <thread>
#include "../log/log.h"

class SqlConnPool
{
public:
static SqlConnPool *Instance();

MYSQL *GetConn();
void FreeConn(MYSQL *conn);
int GetFreeConnCount();

void Init(const char *host, int port,
const char *user, const char *pwd,
const char *dbName, int connSize);
void ClosePool();

private:
SqlConnPool();
~SqlConnPool();

int MAX_CONN_;
int useCount_;
int freeCount_;

std::queue<MYSQL *> connQue_;
std::mutex mtx_;
sem_t semId_;
};

#endif // SQLCONNPOOL_H
74 changes: 74 additions & 0 deletions code/pool/threadpool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@


#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
class ThreadPool
{
public:
explicit ThreadPool(size_t threadCount = 8) : pool_(std::make_shared<Pool>())
{
assert(threadCount > 0);
for (size_t i = 0; i < threadCount; i++)
{
std::thread([pool = pool_]
{
std::unique_lock<std::mutex> locker(pool->mtx);
while(true) {
if(!pool->tasks.empty()) {
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
locker.unlock();
task();
locker.lock();
}
else if(pool->isClosed) break;
else pool->cond.wait(locker);
} })
.detach();
}
}

ThreadPool() = default;

ThreadPool(ThreadPool &&) = default;

~ThreadPool()
{
if (static_cast<bool>(pool_))
{
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}
}

template <class F>
void AddTask(F &&task)
{
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}
pool_->cond.notify_one();
}

private:
struct Pool
{
std::mutex mtx;
std::condition_variable cond;
bool isClosed;
std::queue<std::function<void()>> tasks;
};
std::shared_ptr<Pool> pool_;
};

#endif // THREADPOOL_H

0 comments on commit 4ef6b05

Please sign in to comment.