Python 中创建 PostgreSQL 数据库连接池
习惯于使用数据库之前都必须创建一个连接池,即使是单线程的应用,只要有多个方法中需用到数据库连接,建立一两个连接的也会考虑先池化他们。连接池的好处多多,
- 1) 如果反复创建连接相当耗时,
 - 2) 对于单个连接一路用到底的应用,有连接池时避免了数据库连接对象传来传去,
 - 3) 忘记关连接了,连接池幸许还能帮忙在一定时长后关掉,当然密集取连接的应用势将耗尽连接,
 - 4) 一个应用打开连接的数量是可控的
 
接触到 Python 后,在使用 PostgreSQL 也自然而然的考虑创建连接池,使用时从池中取,用完后还回去,而不是每次需要连接时创建一个物理的。Python 连接 PostgreSQL 是主要有两个包, py-postgresql 和 psycopg2 , 而本文的实例将使用后者。
Psycopg 在 psycopg2.pool 模块中提供了两个连接池的实现在,它们都继承自 psycopg2.pool.AbstractConnectionPool,
该抽象类的基本方法是
getconn(key=None):获取连接putconn(conn, key=None, close=False):归还连接closeall():关闭连接池中的所有连接
两个连接池的实现类是
psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwars): 给单线程应用用的psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwars): 多线程时更安全,其实就是在getconn()和putconn()时加了锁来控制
所以最安全保险的做法还是使用 ThreadedConnectionPool, 在单线程应用中, SimpleConnectionPool 也不见得比 ThreadedConnectionPool 效率高多少。
下面来看一个具体的连接池实现,其中用到了 Context Manager, 使用时结合 with 键字更方便,用完后不用显式的调用 putconn() 归还连接
db_helper.py
from psycopg2 import pool from psycopg2.extras import RealDictCursor from contextlib import contextmanager import atexit class DBHelper: def __init__(self): self._connection_pool = None def initialize_connection_pool(self): db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5' self._connection_pool = pool.ThreadedConnectionPool(1, 3,db_dsn) @contextmanager def get_resource(self, autocommit=True): if self._connection_pool is None: self.initialize_connection_pool() conn = self._connection_pool.getconn() conn.autocommit = autocommit cursor = conn.cursor(cursor_factory=RealDictCursor) try: yield cursor, conn finally: cursor.close() self._connection_pool.putconn(conn) def shutdown_connection_pool(self): if self._connection_pool is not None: self._connection_pool.closeall() db_helper = DBHelper() @atexit.register def shutdown_connection_pool(): db_helper.shutdown_connection_pool() from psycopg2 import pool from psycopg2 . extras import RealDictCursor from contextlib import contextmanager import atexit class DBHelper : def __init__ ( self ) : self . _connection_pool = None def initialize_connection_pool ( self ) : db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5' self . _connection_pool = pool . ThreadedConnectionPool ( 1 , 3 , db_dsn ) @ contextmanager def get_resource ( self , autocommit = True ) : if self . _connection_pool is None : self . initialize_connection_pool ( ) conn = self . _connection_pool . getconn ( ) conn . autocommit = autocommit cursor = conn . cursor ( cursor_factory = RealDictCursor ) try : yield cursor , conn finally : cursor . close ( ) self . _connection_pool . putconn ( conn ) def shutdown_connection_pool ( self ) : if self . _connection_pool is not None : self . _connection_pool . closeall ( ) db_helper = DBHelper ( ) @ atexit . register def shutdown_connection_pool ( ) : db_helper . shutdown_connection_pool ( )
几点说明:
- 只在第一次调用 
get_resource()时创建连接池,而不是在from db_helper import db_helper引用时就创建连接池 Context Manager返回了两个对象,cursor和connection, 需要用connection管理事物时用它- 默认时 
cursor返回的记录是字典,而非数组 - 默认时连接为自动提交
 - 最后的
@atexit.register那个ShutdownHook可能有点多余,在进程退出时连接也被关闭,TIME_WAIT时间应该会稍长些
 
使用方式:
如果不用事物
from db_helper import db_helper
with db_helper.get_resource() as (cursor, _):
 cursor.execute('select * from users')
 for record in cursor.fetchall():
  ... process record, record['name'] ...
from db_helper import db_helper
with db_helper . get_resource ( ) as ( cursor , _ ) :
  cursor . execute ( 'select * from users' )
  for record in cursor . fetchall ( ) :
. . . process record , record [ 'name' ] . . .
如果需要用到事物
with db_helper.get_resource(autocommit=False) as (cursor, _):
 try:
  cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
  cursor.execute('delete from orders where user_id = %s', (1,))
  conn.commit()
 except:
  conn.rollback()
with db_helper . get_resource ( autocommit = False ) as ( cursor , _ ) :
  try :
cursor . execute ( 'update users set name = %s where id = %s' , ( 'new_name' , 1 ) )
cursor . execute ( 'delete from orders where user_id = %s' , ( 1 , ) )
conn . commit ( )
  except :
conn . rollback ( )
在写作本文时,查看 psycopg 的官网时,发现 Psycopg 3.0 正式版在 2021-10-13 日发布了( Psycopg 3.0 released ), 更好的支持 async。在 Psycopg2 2.2 版本时就开始支持异步了。而且还注意到 Psycopg 的主要部分是用 C 实现的,才使得它效率比较高,也难怪经常用 pip install psycopg2 安装不成功,而要用 pip install psycopg2-binary 来安装的原因。
在创建连接池时加上参数 keepalivesXxx 能让服务器及时断掉死链接,否则在 Linux 下默认要 2 个小时后才断开。死链接的情况发生在客户端异常退出(如断电)时先前建立的链接就变为死链接了。
pool.ThreadedConnectionPool(1, 3, db_dsn, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
PostgreSQL 服务端会对连接在空闲 tcp_keepalives_idle 秒后,主动发送tcp_keepalives_count 个 tcp_keeplive 侦测包,每个侦探包在 tcp_keepalives_interval 秒内都没有回应,就认为是死连接,于是切断它。
到此这篇关于Python 中创建 PostgreSQL 数据库连接池的文章就介绍到这了,更多相关PostgreSQL Python内容请搜索本站以前的文章或继续浏览下面的相关文章希望大家以后多多支持本站!
版权声明:本站文章来源标注为YINGSOO的内容版权均为本站所有,欢迎引用、转载,请保持原文完整并注明来源及原文链接。禁止复制或仿造本网站,禁止在非www.yingsoo.com所属的服务器上建立镜像,否则将依法追究法律责任。本站部分内容来源于网友推荐、互联网收集整理而来,仅供学习参考,不代表本站立场,如有内容涉嫌侵权,请联系alex-e#qq.com处理。
                    关注官方微信