Python教程:用sqlalchemy实现连接池管理,操作简化

发布时间:2026/7/5 4:44:31
Python教程:用sqlalchemy实现连接池管理,操作简化 一、背景对数据库有了解的同学都知道数据库的连接并不是无限的在开发实践中必然要面对Engine、Connection生命周期的管理如何创建全局的事例连接线程不会无节制的膨胀就是这账要讲的内容二、如何创建全局实例以FastApi为例在main界面创建连接池将其添加到FastAPI的state中db DB() asynccontextmanager async def lifespan(app: FastAPI): # 预热连接池 db.get_engine(warehousedata) db.get_engine(salarydata) db.get_engine(permission) db.get_engine(analysisdata) app.state.db db yield db.close_all() # 再创建依赖就可以了 def get_db(request: Request) - DB: return request.app.state.db像是QT一类的应用程序开发框架也是相同的思路在创建页面的时候作为初始化参数传入就行了class WarehouseDataWidget(QWidget,Ui_MainWin): def __init__(self,userlogin:UserLogin,thread_pool:QThreadPool): super().__init__() self.userlogin userlogin这样这样一来就不会出现N个界面N个实例了三、引擎统一管理完成了上一部之后若你还是用一次引擎就创建一次新的那连接膨胀的问题只是减缓了url ( fmysqlpymysql://{user}:{password} f{host}:{int(port)}/{database}?charsetutf8mb4 ) engine create_engine( url, pool_size10, # 常驻连接数 max_overflow20, # 临时扩展连接 pool_timeout30, # 等待连接时间 pool_recycle1800, # 回收连接 pool_pre_pingTrue, # 防断线 echoFalse, futureTrue, connect_args{ **ssl_args, connect_timeout: 5, }, ) # 预热连接 with engine.connect() as conn: conn.exec_driver_sql(SELECT 1)要解决这个问题我们要创建一个缓存管理器使用之前查看引擎是否存在存在直接复用就行class DBManager: def __init__(self): self._engines {} def get(self,db_name): return self._engines.get(db_name) def set(self,db_name,engine): self._engines[db_name] engine def close_all(self): for engine in self._engines.values(): engine.dispose() self._engines.clear()四、熟练使用WITH若你前三点都做的不错但是连接回收问题还是没有解决那你取出的Connection大概率就是连接没有及时的close若你嫌麻烦那就用with用完之后自动关闭with self.db.get_conn(salarydata) as conn: return self.db.execute_fetch( conn, query, params, return_typedict )五、简化操作下面是我实践中常用的一些方法直接贴在下面了##------------SQL执行 def execute(self, conn: Connection, sql_query, paramsNone) - int: 执行SQL语句;注意:conn现在由应用层管理声明周期,使用完成后需要用[close]关闭 return self._execute(conn, sql_query, params) overload def execute_fetch(self, conn: Connection, sql_query, params, return_type: Literal[scalar]) - Any: ... overload def execute_fetch(self, conn: Connection, sql_query, params, return_type: Literal[all]) - List[Row]: ... overload def execute_fetch(self, conn: Connection, sql_query, params, return_type: Literal[single_column]) - List[Any]: ... overload def execute_fetch(self, conn: Connection, sql_query, params, return_type: Literal[rows]) - List[List[Any]]: ... overload def execute_fetch(self, conn: Connection, sql_query, params, return_type: Literal[dict]) - List[Dict[str, Any]]: ... def execute_fetch(self,conn: Connection,sql_query,paramsNone,return_typescalar): 执行查询语句,并获取执行结果;注意:conn现在由应用层管理声明周期,使用完成后需要用[close]关闭 :param conn: 数据库引擎 :type conn: Connection :param sql_query: sql语句 :param parameter: 参数字典 :param return_type: 执行类型:scalar,single_column,rows,dict return self._execute(conn,sql_query,params,fetchreturn_type) def _execute(self,conn: Connection,sql,paramsNone,*,fetch: str): fetch: scalar - scalar() 的单值结果 all - list[Row] single_column - scalars().all() rows - 多行 list[list] dict - 多行 dict try: if not isinstance(sql, TextClause): sql text(sql) result conn.execute(sql, params) if fetch scalar: return result.scalar() if fetch all: return result.fetchall() if fetch single_column: return result.scalars().all() if fetch rows: return [list(r) for r in result] if fetch dict: return [dict(r._mapping) for r in result] raise ValueError(f非法 fetch 参数: {fetch}) except (OperationalError, IntegrityError, DBAPIError) as e: self.log_sql_error(sql,params,e) raise except Exception as e: self.log_sql_error(sql,params,e) raise def transaction(self, db_name:str, sql_list:list) - list[CursorResult,]: 增删改使用,连接生命周期由方法自动管理。 :param conn: 数据库连接 :param sql_list: [(sql, params),] if not sql_list: raise ValueError(sql_list不能为空) conn:Connection self.get_conn(db_name) try: results [] current_sql None current_params None with conn.begin(): for (sql, params) in sql_list: current_sql sql current_params params result self._execute_raw(conn, sql, params) results.append(result) return results except Exception as e: self.log_sql_error(current_sql,current_params,e) raise finally: conn.close() def transaction_insert_chain(self,db_name: str,lastrowid_col: str,sql_query_list,parameter_listNone): 事务执行SQL语句,插入返回ID,连接生命周期由调用方管理。 if not sql_query_list: raise ValueError(sql_query_list不能为空) if parameter_list is None: parameter_list [{}] * len(sql_query_list) if len(sql_query_list) ! len(parameter_list): raise ValueError(sql_query_list和parameter_list长度不一致) conn:Connection self.get_conn(db_name) current_sql None current_params None try: with conn.begin(): results [] lastrowid None for idx, (sql, params) in enumerate(zip(sql_query_list, parameter_list)): exec_params dict(params) current_sql sql current_params exec_params if idx 0 and lastrowid is not None: exec_params[lastrowid_col] lastrowid result self._execute_raw(conn, sql, exec_params) if idx 0: lastrowid result.lastrowid if lastrowid is None: raise RuntimeError(INSERT 未返回 lastrowid) results.append(result) return results except Exception as e: self.log_sql_error(current_sql,current_params,e) raise finally: conn.close() def _execute_raw(self,conn: Connection,sql,params:dict | None None) - CursorResult: if not isinstance(sql, TextClause): sql text(sql) try: return conn.execute(sql, params or {}) except Exception as e: self.log_sql_error(sql, params, e) raise