悠闲博客-blog.yxrjt.cn

python 数据库mysql连接池

更新时间:2025-09-24 11:24点击:53

Python中的数据库连接池

在Python中,数据库连接池是一种重要的技术,用于管理和复用数据库连接,以提高应用程序的性能和效率。连接池允许多个线程或进程共享数据库连接,而无需频繁地打开和关闭连接。这样可以减少连接建立和释放的开销,提高响应速度。

实现数据库连接池

Python中有几个流行的库可以用来实现数据库连接池,如SQLAlchemy的QueuePool和DBUtils的PooledDB。这些库提供了类似的功能,但也有一些差异。例如,QueuePool是SQLAlchemy内置的连接池实现,它使用Python的queue模块来管理连接队列,并支持配置最大连接数和预处理语句。而PooledDB则是DBUtils库提供的连接池实现,它支持多种类型的连接池,并使用threading模块实现线程安全,具有更高的性能和稳定性。

以下是使用DBUtils.PooledDB实现的数据库连接池示例代码:

import pymysql
from DBUtils.PooledDB import PooledDB

class MySQLConnectionPool:
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
mincached=10, # 初始化时,链接池中至少创建的链接,0表示不创建
maxconnections=200, # 连接池允许的最大连接数,0和None表示不限制连接数
blocking=True# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
host='localhost',
port=3306,
user='root',
password='123456',
database='mytest'
)

def open(self):
self.conn = self.pool.connection()
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
return self.conn, self.cursor

def close(self, cursor, conn):
cursor.close()
conn.close()

# 使用连接池
mysql = MySQLConnectionPool()
conn, cursor = mysql.open()
# 执行数据库操作
cursor.execute('SELECT * FROM your_table')
results = cursor.fetchall()
# 关闭连接
mysql.close(cursor, conn)

连接池的配置和性能优化

连接池的配置和性能优化是确保数据库连接池稳定、高效运行的关键因素。例如,可以设置最大连接数、最小空闲连接数、连接超时时间和最大连接空闲时间等参数。这些参数可以根据应用的并发访问量和数据库的容量来调整,以防止连接池过载和性能下降。

异步连接池

在异步Web框架中,异步连接池是确保应用程序能够高效处理并发请求的关键组件。可以使用asyncio库和aiomysql库来创建异步数据库连接池。以下是一个异步连接池的示例:

import asyncio
import aiomysql

class AsyncConnectionPool:
def __init__(self, max_connections, loop, **kwargs):
self.max_connections = max_connections
self.connections = asyncio.Queue(max_connections)
self.loop = loop
self.kwargs = kwargs

async def _create_connection(self):
return await aiomysql.connect(loop=self.loop, **self.kwargs)

async def get_connection(self):
if self.connections.qsize() < self.max_connections:
connection = await self._create_connection()
await self.connections.put(connection)
return await self.connections.get()

async def release_connection(self, connection):
await self.connections.put(connection)

# 使用异步连接池
async def example_usage():
loop = asyncio.get_event_loop()
async_pool = AsyncConnectionPool(
max_connections=10,
loop=loop,
user='user',
password='password',
host='localhost',
port=3306,
db='mydb'
)
async with async_pool.get_connection() as conn:
# 执行数据库操作
pass

loop.run_until_complete(async_pool.release_connection(conn))

loop = asyncio.get_event_loop()
loop.run_until_complete(example_usage())

安全性和异常处理

在连接池的实现中,确保安全性和有效地处理异常是非常重要的。应该在代码中添加适当的异常处理,以确保系统在异常情况下能够正常工作。同时,应该避免在代码中硬编码敏感信息,例如数据库密码,并在生产环境中使用安全的配置管理工具或环境变量来存储敏感信息。

通过本文,读者将深入了解数据库连接池的实现原理和在Python中的具体应用。丰富的示例代码和全面的讲解将帮助读者更好地理解并成功实现一个高效可靠的数据库连接池。希望这篇文章能够成为读者学习和应用数据库连接池的有力指南。


‌关于dbutils模块的说明

代码示例中导入的方式是:


from DBUtils.PooledDB import PooledDB

1.

使用这种方法导入需要指定1.3版本install:


pip3 install DBUtils==1.3

1.

如果DBUtils的版本高于1版本需要按照这种方式引入:


from dbutils.pooled_db import PooledDB

1.

准备工作

MySQL库表

在t1库创建一张名为test_table的表,里面的数据如下:




配置文件

跟项目脚本同级目录存放MySQL的配置settings.ini:


[MYSQL]

HOST = 127.0.0.1

PORT = 3306

USER = root

PASSWORD = 123

DATABASE = t1

CHARSET = utf8



pymysql的基本操作

# -*- coding:utf-8 -*-

import os

import pymysql

import configparser



# 获取配置

current_path = os.path.abspath(".")

config = configparser.ConfigParser()

config.read(os.path.join(current_path,"settings.ini"))


mysql_conf = dict(

    host=config["MYSQL"]["HOST"],

    port=int(config["MYSQL"]["PORT"]), # 注意这里得把port转换成int!

    user=config["MYSQL"]["USER"],

    password=config["MYSQL"]["PASSWORD"],

    database=config["MYSQL"]["DATABASE"],

    charset=config["MYSQL"]["CHARSET"],

)


# 连接数据库 —— 注意这里password得写成字符串类型

conn = pymysql.connect(**mysql_conf)

# 获取光标对象

cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)


## 执行sql语句

sql_str1 = """ select * from test_table """

# 获得受影响的信息条数

res_count = cursor.execute(sql_str1)

print(res_count) # 3

# 获取数据

res_datas = cursor.fetchall()

print(res_datas) # [{'id': 1, 'name': 'whw', 'age': 18}, {'id': 2, 'name': 'naruto', 'age': 19}, {'id': 3, 'name': 'sasuke', 'age': 20}]


## 关闭链接

cursor.close()

conn.close()


  将连接数据库封装为一个方法,每次需要连接的时候调用该方法获取conn和cursor对象,但是这样会有损耗,因为每次都需要 建立连接->执行数据库操作->释放连接。而数据库连接池为维护一个保存有多个数据库连接的池子,每次需要连接数据库时,从连接池中取出一个连接进行使用即可,使用完毕后连接不会释放,而是归还给连接池进行管理,节省了不断建立连接和释放连接的过程。


使用DBUtils+pymysql建立数据库连接池

  这里还使用了with上下文管理与装饰器两种方式实现了链接池的效果。


  另外还使用pymysql实现了事物的操作。


# -*- coding:utf-8 -*-

import os

import pymysql

import configparser

from DBUtils.PooledDB import PooledDB



# 获取配置

current_path = os.path.abspath(".")

config = configparser.ConfigParser()

config.read(os.path.join(current_path,"settings.ini"))


mysql_conf = dict(

    host=config["MYSQL"]["HOST"],

    port=int(config["MYSQL"]["PORT"]), # 注意这里得把port转换成int!

    user=config["MYSQL"]["USER"],

    password=config["MYSQL"]["PASSWORD"],

    database=config["MYSQL"]["DATABASE"],

    charset=config["MYSQL"]["CHARSET"],

)


class MySQLPool(object):

    # 类属性

    pool = PooledDB(creator=pymysql,**mysql_conf) # 注意creator参数


    def __enter__(self):

        self.conn = MySQLPool.pool.connection()

        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

        return self


    def __exit__(self, exc_type, exc_val, exc_tb):

        # 关闭链接

        self.cursor.close()

        self.conn.close()


## with上下文使用方法

def func1():

    with MySQLPool() as db:

        sql_str1 = """ select * from test_table """

        res_count = db.cursor.execute(sql_str1)

        print(res_count)

        res_datas = db.cursor.fetchall()

        print(res_datas)

        # 添加一条数据

        sql_insert = """insert into test_table(name,age) values(%s,%s)"""

        res = db.cursor.execute(sql_insert,["wanghw11",18])

        print(res)

        db.conn.commit()

# func1

func1()


## 装饰器方式执行

def mysql_wrapper(func):

    def wrapper(*args,**kwargs):

        with MySQLPool() as db:

            result = func(db,*args,**kwargs)

        # return

        return result

    return wrapper


@mysql_wrapper

def func2(db,*args,**kwargs):

    # 执行事物

    try:

        # 添加一条数据

        sql_insert = """insert into test_table(name,age) values(%s,%s)"""

        # 修改一条数据

        sql_update = """ update test_table set age=22 where name='whw' """

        db.cursor.execute(sql_insert,["whw666",20])

        db.cursor.execute(sql_update)

    except Exception as e:

        # 回滚

        db.conn.rollback()

        print(f"事物执行失败:{e}")

    else:

        # 提交事物

        db.conn.commit()

        print("事物执行成功:",db.cursor.rowcount)

# func2

func2()


  可以看到,通过DBUtils,实例化一个PooledDB对象作为MysqlPool类的类属性,通过重写__enter__和__exit__方法让我们在进入with语句时从连接池中获取到数据库连接,在with语句结束后,自动释放连接池,归还给连接池。


  成功执行事物后可以在数据库中查看一下数据的变化。


模拟一下事物执行失败的情形

  这里使用自定义的异常模拟事物执行失败时的现象,数据库中的数据都没有任何变化。


复制

# -*- coding:utf-8 -*-

import os

import pymysql

import configparser

from DBUtils.PooledDB import PooledDB



# 获取配置

current_path = os.path.abspath(".")

config = configparser.ConfigParser()

config.read(os.path.join(current_path,"settings.ini"))


mysql_conf = dict(

    host=config["MYSQL"]["HOST"],

    port=int(config["MYSQL"]["PORT"]), # 注意这里得把port转换成int!

    user=config["MYSQL"]["USER"],

    password=config["MYSQL"]["PASSWORD"],

    database=config["MYSQL"]["DATABASE"],

    charset=config["MYSQL"]["CHARSET"],

)


class MySQLPool(object):

    # 类属性

    pool = PooledDB(creator=pymysql,**mysql_conf) # 注意creator参数


    def __enter__(self):

        self.conn = MySQLPool.pool.connection()

        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

        return self  # 记得return self


def __exit__(self, exc_type, exc_val, exc_tb):

        # 关闭链接

        self.cursor.close()

        self.conn.close()


# 自定义异常类

class MyException(BaseException):

    def __init__(self,msg):

        super().__init__()

        self.msg = msg


    def __str__(self):

        return self.msg


## 装饰器方式执行 —— 执行事物是出现异常的测试

def mysql_wrapper(func):

    def wrapper(*args,**kwargs):

        with MySQLPool() as db:

            result = func(db,*args,**kwargs)

        # return

        return result

    return wrapper


flag = True


@mysql_wrapper

def func2(db,*args,**kwargs):

    # 执行事物

    try:

        # 添加一条数据

        sql_insert = """insert into test_table(name,age) values(%s,%s)"""

        # 修改一条数据

        sql_update = """ update test_table set age=23 where name='whw' """

        db.cursor.execute(sql_insert,["whw123",20])

        db.cursor.execute(sql_update)

        # 这里做一个逻辑判断 —— 可根据具体的业务而定

        if flag:

            raise MyException("执行事物是发生异常!")

    except MyException as e:

        # 回滚

        db.conn.rollback()

        print(f"事物执行失败:{e}")

    except Exception as e:

        # 回滚

        db.conn.rollback()

        print(f"事物执行失败:{e}")

    else:

        # 提交事物

        db.conn.commit()

        print("事物执行成功:",db.cursor.rowcount)

# func2

func2()


栏目分类

联系方式
  • help@yxrjt.cn
  • lgc@yxrjt.cn
  • admin@yxrjt.cn