DBUtils 是Python 的一个用于实现数据库连接池的模块。

此连接池有两种连接模式:

DBUtils :提供两种外部接口:

PersistentDB :提供线程专用的数据库连接,并自动管理连接。

PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

介绍

PersistentDB模式

为每个线程创建一个连接,线程即使调用了 close 方法,也不会关闭,只是把链接重新放到链接池,供自己线程再次使用,当线程终止时,链接自动关闭。

from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0, # ping MySQL服务端,检查是否服务可用。
closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='',
database='test',
charset='utf8'
) def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
if __name__ == '__main__': func()

PooledDB模式

创建一批连接到连接池,供所有线程共享使用。

import pymysql

from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0, # ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='',
database='test',
charset='utf8'
) def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则等待或报raise TooManyConnections异常
# 否则则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
conn.close() if __name__ == '__main__': func()

工具类

# TEST数据库信息
DB_TEST_HOST = "127.0.0.1"
DB_TEST_PORT = 3306
DB_TEST_DBNAME = ""
DB_TEST_USER = "root"
DB_TEST_PASSWORD = "root" # 数据库连接编码
DB_CHARSET = "utf8" # mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 10 # maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 10 # maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 20 # maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100 # blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......>; 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True # maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0 # setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

db_config.py

import pymysql
from DBUtils.PooledDB import PooledDB
import db_config as Config class ConnectionPool(object):
__pool = None def __enter__(self):
self.conn = self.__getConn()
self.cursor = self.conn.cursor()
print("数据库创建conn和cursor")
return self def __getConn(self):
if self.__pool is None:
self.__pool = PooledDB(creator=pymysql, mincached=Config.DB_MIN_CACHED, maxcached=Config.DB_MAX_CACHED,
maxshared=Config.DB_MAX_SHARED, maxconnections=Config.DB_MAX_CONNECYIONS,
blocking=Config.DB_BLOCKING, maxusage=Config.DB_MAX_USAGE,
setsession=Config.DB_SET_SESSION,
host=Config.DB_TEST_HOST, port=Config.DB_TEST_PORT,
user=Config.DB_TEST_USER, passwd=Config.DB_TEST_PASSWORD,
db=Config.DB_TEST_DBNAME, use_unicode=False, charset=Config.DB_CHARSET)
return self.__pool.connection() def __exit__(self, type, value, trace):
"""
@summary: 释放连接池资源
"""
self.cursor.close()
self.conn.close()
print("连接池释放conn和cursor") def getconn(self):
'''
从连接池中取出一个连接
'''
conn = self.__getConn()
cursor = conn.cursor(pymysql.cursors.DictCursor)
return cursor, conn def close(self):
'''
关闭连接归还给连接池
'''
self.cursor.close()
self.conn.close()
print("连接池释放conn和cursor") POOL = ConnectionPool() class MysqlHelper(object):
mysql = None def __init__(self):
self.db = POOL def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'inst'):
cls.inst = super(MysqlHelper, cls).__new__(cls, *args, **kwargs)
return cls.inst def selectall(self, sql='', param=()):
'''
查询所有
''' try:
cursor, conn = self.execute(sql, param)
res = cursor.fetchall()
self.close(cursor, conn)
return res
except Exception as e:
print('selectall except ', e.args)
self.close(cursor, conn)
return None def selectone(self, sql='', param=()):
'''
查询一条
'''
try:
cursor, conn = self.execute(sql, param)
res = cursor.fetchone()
self.close(cursor, conn)
return res
except Exception as e:
print('selectone except ', e.args)
self.close(cursor, conn)
return None def insert(self, sql='', param=()):
'''
增加
'''
try:
cursor, conn = self.execute(sql, param)
print('============')
_id = cursor.lastrowid
print('_id ', _id)
conn.commit()
self.close(cursor, conn)
# 防止表中没有id返回0
if _id == 0:
return True
return _id
except Exception as e:
print('insert except ', e.args)
conn.rollback()
self.close(cursor, conn)
# self.conn.rollback()
return 0 def insertmany(self, sql='', param=()):
'''
增加多行
'''
cursor, conn = self.db.getconn()
try:
cursor.executemany(sql, param)
conn.commit()
self.close(cursor, conn)
return True
except Exception as e:
print('insert many except ', e.args)
conn.rollback()
self.close(cursor, conn)
return False def delete(self, sql='', param=()):
'''
删除
'''
try:
cursor, conn = self.execute(sql, param)
self.close(cursor, conn)
return True
except Exception as e:
print('delete except ', e.args)
conn.rollback()
self.close(cursor, conn)
return False def update(self, sql='', param=()):
'''
更新
'''
try:
cursor, conn = self.execute(sql, param)
self.close(cursor, conn)
return True
except Exception as e:
print('update except ', e.args)
conn.rollback()
self.close(cursor, conn)
return False @classmethod
def getInstance(self):
if MysqlHelper.mysql == None:
MysqlHelper.mysql = MysqlHelper()
return MysqlHelper.mysql # 执行命令
def execute(self, sql='', param=(), autoclose=False):
cursor, conn = self.db.getconn()
try:
if param:
cursor.execute(sql, param)
else:
cursor.execute(sql)
conn.commit()
if autoclose:
self.close(cursor, conn)
except Exception as e:
pass
return cursor, conn def executemany(self, list=[]):
'''
# 执行多条命令
'[{"sql":"xxx","param":"xx"}....]'
'''
cursor, conn = self.db.getconn()
try:
for order in list:
sql = order['sql']
param = order['param']
if param:
cursor.execute(sql, param)
else:
cursor.execute(sql)
conn.commit()
self.close(cursor, conn)
return True
except Exception as e:
print('execute failed========', e.args)
conn.rollback()
self.close(cursor, conn)
return False def close(self, cursor, conn):
cursor.close()
conn.close()
print("PT连接池释放con和cursor")

mysqlhelper

最新文章

  1. linux c 笔记-1
  2. 在navgationController中添加UISegmentedControl
  3. ORACLE变量定义及使用(另,T-SQL EXISTS的PLSQL替代写法)
  4. 写出优秀论文How To Write A Great Essay About Anything
  5. Thinkphp 缓存微信jssdk相关认证参数
  6. Android项目包装apk和apk反编译,xml反编译
  7. 【CSS学习笔记】a标签的四种伪类
  8. volume 生命周期管理 - 每天5分钟玩转 Docker 容器技术(44)
  9. 微信公众平台开发,API接入与推送事件(1)
  10. TensorFlow-Slim使用方法说明
  11. Ubuntu上更改MySQL数据库数据存储目录
  12. 数据库的数据进行改动,Cognos报表展示未及时更新
  13. 高性能mysql 事务笔记
  14. python 二维数组读入
  15. 十五、springcloud(一)注册中心Eureka
  16. 20165316 实验四 Android程序设计
  17. jquery中data()和js中dataset属性的区别
  18. 删除vector指定位置的元素
  19. ActiveMQ实现消息的发送与接受
  20. DataSnap对象传递

热门文章

  1. 【CFD之道】2017年原创文章汇总
  2. vue2.0引入现有css文件
  3. python – 基于pandas中的列中的值从DataFrame中选择行
  4. dedecms 5.7sp2 20170405运行PHP7.1的大坑(dedecms PHP7.1)
  5. 如何查看.Net Framework版本
  6. VS调试不能进入断点,提示当前不会命中断点还未为文档加载任何符号
  7. Houdini技术体系 基础管线(三) :UE4以选择区域的方式对地形做生成和更新 上篇
  8. js 对象转&amp;拼接
  9. IIS 请求 超时设置
  10. springboot-multisource