forked from cubenlp/ChatSQL
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdb_tools.py
157 lines (141 loc) · 4.41 KB
/
db_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
@Time: 2022/11/03
@Author: LiuShu
@File: 数据库操作类库
"""
import pymysql
from utility.loggers import logger
from utility.utils import config
class Cur_db(object):
def __init__(self):
self.config = config
self.db_name = self.config['database']['DB']
def pymysql_cur(self, reback=5):
""" 连接数据库 """
try:
self.conn = pymysql.connect(host=self.config['database']['HOST'], user=self.config['database']['USER'],
password=self.config['database']['PWD'], db=self.db_name,
port=int(self.config['database']['PORT']),
charset='utf8')
except Exception as e:
if reback == 0:
logger.exception('Exception occurred.')
return
else:
logger.exception('Exception occurred.')
reback -= 1
return self.pymysql_cur(reback)
def get_db_name(self):
"""
:return:
"""
return self.db_name
def select(self, sql, params, reback=2):
""" 查询单条语句,并返回查询所有的结果 """
try:
cur = self.conn.cursor()
cur.execute(sql, params)
# 单条
res = cur.fetchone()
cur.close()
if res:
return res
return
except Exception as e:
logger.exception('Exception occurred.')
if reback > 0:
reback -= 1
return self.select(sql, reback)
else:
logger.info(str('*' * 100))
return
def _select(self, sql, reback=2):
try:
cur = self.conn.cursor()
cur.execute(sql)
# 单条
res = cur.fetchone()
cur.close()
if res:
return res[0]
return
except Exception as e:
logger.exception('Exception occurred.')
if reback > 0:
reback -= 1
return self.select(sql, reback)
else:
logger.info(str('*' * 100))
return
def selectMany(self, sql, reback=2):
try:
cur = self.conn.cursor()
cur.execute(sql)
res = cur.fetchall()
cur.close()
if res:
return res
logger.info(str(sql))
return
except Exception as e:
logger.exception('Exception occurred.')
if reback > 0:
reback -= 1
return self.selectMany(sql, reback)
else:
logger.info(str('*' * 100))
return
def insert(self, sql, params):
cur = self.conn.cursor()
cur.execute(sql, params)
self.conn.commit()
return
def _insert(self, sql):
cur = self.conn.cursor()
cur.execute(sql)
self.conn.commit()
def insert_batch(self, sql, data_list):
"""
将dataframe批量入库
:param sql: 插入语句
:return:
"""
cur = self.conn.cursor()
# 开启事务
self.conn.begin()
try:
cur.executemany(sql, data_list)
self.conn.commit()
cur.close()
self.conn.close()
return True
except:
# 万一失败了,要进行回滚操作
self.conn.rollback()
cur.close()
self.conn.close()
return False
def update(self, sql, params):
cur = self.conn.cursor()
cur.execute(sql, params)
self.conn.commit()
return
def _update(self, sql):
try:
cur = self.conn.cursor()
cur.execute(sql)
self.conn.commit()
except Exception as e:
logger.exception('Exception occurred.')
def close(self):
self.conn.close()
pass
if __name__ == '__main__':
db_con = Cur_db()
logger.info(str(db_con.config['database']['HOST']))
print(str(db_con.config['database']['HOST']))
db_con.pymysql_cur()
sql = "SELECT * FROM cargo"
res = db_con.selectMany(sql)
print(str(res))
db_con.close()