Flask-SQLAlchemy 是如何执行一条查询的
最近在排查一个问题时发现,对于使用 flask-sqlalchemy 的 db.session.exeute()
提交的查询是如何被执行的不甚明了,这篇文章就是探究这一过程的。
下面我们分俩个部分来看下 db.session.execute 是如何执行的:
- db.session:session 的创建
- session.execute:查询的执行
在 1 和 2 之间,为了更好的理解查询的执过程,先看下 SQLAlchemy 的整体架构。此外,之前对 autocommit 参数有一些误解,这里说明一下。
软件版本: SQLAlchemy 1.3.10,flask-sqlalchmey 2.4.1。
session 的创建
db
对象是从 SQLAlchemy
实例化来的,在 SQLAlchemy
的初始化方法中
class SQLAlchemy(object):
def __init(self, ...):
# ...
self.session = self.create_scoped_session(session_options)
# ...
def create_scoped_session(self, options=None):
if options is None:
options = {}
scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
options.setdefault('query_cls', self.Query)
return orm.scoped_session(
self.create_session(options), scopefunc=scopefunc
)
从 create_scoped_session
的注释来看,所谓的 scoped
其实是类似于 ThreadLocal
,session 只在请求上下文内有效,传入的 __ident_func__
其实就是获取线程 ID 的,可以猜想,会有一个字典,当前请求的线程 ID 为 KEY,每个请求会通过工厂方法创建一个请求自己的 session。
下面去 sqlalchemy
的代码中验证:
class scoped_session(object):
def __init__(self, session_factory, scopefunc=None):
self.session_factory = session_factory
if scopefunc:
self.registry = ScopedRegistry(session_factory, scopefunc)
else:
self.registry = ThreadLocalRegistry(session_factory)
def __call__(self, **kw):
if kw:
if self.registry.has():
raise sa_exc.InvalidRequestError(
"Scoped session is already present; "
"no new arguments may be specified."
)
else:
sess = self.session_factory(**kw)
self.registry.set(sess)
return sess
else:
return self.registry()
scoped_session
(源码)只是一个中转站,
class ScopedRegistry(object):
def __init__(self, createfunc, scopefunc):
self.createfunc = createfunc
self.scopefunc = scopefunc
self.registry = {}
def __call__(self):
key = self.scopefunc()
try:
return self.registry[key]
except KeyError:
return self.registry.setdefault(key, self.createfunc())
def has(self):
"""Return True if an object is present in the current scope."""
return self.scopefunc() in self.registry
通过上面两段代码可以发现,事实正如我们想的一样,ScopedRegistry
内部维护了一个字典,通过传入的 scopefunc
生成 key,传入的 session_factory
工厂方法生成 sessoin 对象。
下面看看工厂方法 create_session
创建 session 的过程:
class SQLAlchemy(object):
# ...
def create_session(self, options):
return orm.sessionmaker(class_=SignallingSession, db=self, **options)
SignallingSession
是对 SQLAlchemy
内部的 Session
类做了一些数据库事件信号方面的拓展封装,这里我们忽略这些拓展,就认为传入的是 SQLAlchemy 原生的 Session 类。sessionmaker
也是一个工厂方法,它做的事是将传入的参数经过一些处理,在传入到 Session
类中,最终返回了一个 session 对象。
这样,每个 HTTP 请求内调用 db.session
都会创建一个只在这个请求范围内有效的 session。
SQLAlchemy 架构
上图是 SQLAlchemy 整体的架构,可以分成三个部分:
- ORM:对象关系映射处理。
- Core:底层的查询处理、连接池等。
- DBAPI:最底层的数据访问接口。
上图是 Core 里面核心类之间的关系。
DBAPI:Python 在 pep249 中定义的数据库访问接口规范,核心是 Connection
和 Cursor
两个类,Connection
类封装数据库连 接,Cursor
封装了数据库查询的上下文信息。
Dialect:本身是一个抽象类,如何与数据库交互都是通过这个类定义的,包括元信息、查询语句生成、执行、结果集处理等,不同的数据库对应不同的实现。源码
Connection:对 DB-API 更高层次的封装,提供了对 SQL 语句执行的相关支持,里面包含了一个从数据库连接池拿来的一个 DBAPI 连接。
Engine:协调 Connection、Pool,提供用户级的接口,一般是通过外观方法 create_engine()
来创建。
ExecutionContext:执行的过程中由 Connection 创建,Dialect
的信使,表示一个查询执行。
每执行一个查询,Engine 会创建一个 Connection 对象,并从连接池 Pool 中取一个数据库连接传入 Connection,connection 通过 Dialect 与不同的数据库对话,最终的查询结果封装中 ResultProxy 中。
查询的执行
由上面的分析我们知道 db.session
最终返回的是一个 session 对象。从下面的 Session 类的代码中我们发现 session 对象在初始化时,就默认开始了一个事务(autocommit 参数默认为 False),也就是说 db.session
开启了一个事务。
class Session(_SessionClassMethods):
def __init__(self, ...):
# ...
if not self.autocommit:
self.begin()
def begin(self, subtransactions=False, nested=False):
if self.transaction is not None:
# ...
else:
self.transaction = SessionTransaction(self, nested=nested)
return self.transaction # needed for __enter__/__exit__ hook
class Session(_SessionClassMethods):
# ...
def execute(self, clause, params=None, mapper=None, bind=None, **kw):
# 处理查询语句
clause = expression._literal_as_text(
clause, allow_coercion_to_text=True
)
# 获取 Engine 对象
if bind is None:
bind = self.get_bind(mapper, clause=clause, **kw)
# 创建 Connection 对象,执行查询
return self._connection_for_bind(bind, close_with_result=True).execute(
clause, params or {}
)
事务在 session 初始化时自动开启了,但是在 execute 里面没有看到提交,因为一个事务里面可能有多次查询,对应多个 execute,所以,所有 execute 执行完后要自己调用 commit 方法提交事务。
class Session(_SessionClassMethods):
def commit(self):
# ...
self.transaction.commit()
class SessionTransaction(object):
def commit(self):
# ...
self.close()
return self._parent
def close(self, invalidate=False):
# 关闭事务...
# 同时开启另一个事务
if not self.session.autocommit:
self.session.begin()
Autocommit
The autocommit flag is not for general use, and if it is used, queries should only be invoked within the span of a
Session.begin()
/Session.commit()
pair. Executing queries outside of a demarcated transaction is a legacy mode of usage, and can in some cases lead to concurrent connection checkouts.
autocommit 并不是字面意思那样会自动提交,相反,开启了 autocommit 后,要手动的调用, sesession.begin()/session.commit()。在事务外执行查询时历史遗留问题,可能会导致并发问题。
Defaults to
False
. WhenTrue
, theSession
does not keep a persistent transaction running, and will acquire connections from the engine on an as-needed basis, returning them immediately after their use. Flushes will begin and commit (or possibly rollback) their own transaction if no transaction is present. When using this mode, theSession.begin()
method is used to explicitly start transactions.
默认为 Flase。当为 True 时,session 不会维持一个持久化的事务,并且会需要时才从 engine 获取 connection,用完后立刻归还。
Deprecated since version 1.4: “autocommit” mode is a legacy mode of use and should not be considered for new projects. The feature will be deprecated in SQLAlchemy 1.4 and removed in version 2.0; both versions provide a more refined “autobegin” approach that allows the
Session.begin()
method to be used normally. If autocommit mode is used, it is strongly advised that the application at least ensure that transaction scope is made present via theSession.begin()
method, rather than using the session in pure autocommit mode.
概述在 1.4 被废弃,2.0 移除,会提供一个更合理的参数 autobegin
。
参考
[1] The Architecture of SQLAlchemy
[2] Python Database API Specification v2.0
[3] session_basics