Flask-SQLAlchemy 是如何执行一条查询的

最近在排查一个问题时发现,对于使用 flask-sqlalchemy 的 db.session.exeute() 提交的查询是如何被执行的不甚明了,这篇文章就是探究这一过程的。

下面我们分俩个部分来看下 db.session.execute 是如何执行的:

  1. db.session:session 的创建
  2. session.execute:查询的执行

在 1 和 2 之间,为了更好的理解查询的执过程,先看下 SQLAlchemy 的整体架构。此外,之前对 autocommit 参数有一些误解,这里说明一下。

软件版本: SQLAlchemy 1.3.10,flask-sqlalchmey 2.4.1。

session 的创建

db 对象是从 SQLAlchemy 实例化来的,在 SQLAlchemy 的初始化方法中

class SQLAlchemy(object):

    def __init(self, ...):
        # ...
        self.session = self.create_scoped_session(session_options)

    # ...

    def create_scoped_session(self, options=None):
          if options is None:
              options = {}

          scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
          options.setdefault('query_cls', self.Query)
          return orm.scoped_session(
              self.create_session(options), scopefunc=scopefunc
          )

create_scoped_session 的注释来看,所谓的 scoped 其实是类似于 ThreadLocal ,session 只在请求上下文内有效,传入的 __ident_func__ 其实就是获取线程 ID 的,可以猜想,会有一个字典,当前请求的线程 ID 为 KEY,每个请求会通过工厂方法创建一个请求自己的 session。

下面去 sqlalchemy 的代码中验证:

class scoped_session(object):

    def __init__(self, session_factory, scopefunc=None):

    self.session_factory = session_factory

    if scopefunc:
        self.registry = ScopedRegistry(session_factory, scopefunc)
    else:
        self.registry = ThreadLocalRegistry(session_factory)

    def __call__(self, **kw):
        if kw:
            if self.registry.has():
                raise sa_exc.InvalidRequestError(
                    "Scoped session is already present; "
                    "no new arguments may be specified."
                )
            else:
                sess = self.session_factory(**kw)
                self.registry.set(sess)
                return sess
        else:
            return self.registry()

scoped_session源码)只是一个中转站,


class ScopedRegistry(object):

    def __init__(self, createfunc, scopefunc):
        self.createfunc = createfunc
        self.scopefunc = scopefunc
        self.registry = {}

    def __call__(self):
        key = self.scopefunc()
        try:
            return self.registry[key]
        except KeyError:
            return self.registry.setdefault(key, self.createfunc())

    def has(self):
        """Return True if an object is present in the current scope."""

        return self.scopefunc() in self.registry

通过上面两段代码可以发现,事实正如我们想的一样,ScopedRegistry 内部维护了一个字典,通过传入的 scopefunc 生成 key,传入的 session_factory 工厂方法生成 sessoin 对象。

下面看看工厂方法 create_session 创建 session 的过程:

class SQLAlchemy(object):

    # ...

    def create_session(self, options):
        return orm.sessionmaker(class_=SignallingSession, db=self, **options)

SignallingSession 是对 SQLAlchemy 内部的 Session 类做了一些数据库事件信号方面的拓展封装,这里我们忽略这些拓展,就认为传入的是 SQLAlchemy 原生的 Session 类。sessionmaker 也是一个工厂方法,它做的事是将传入的参数经过一些处理,在传入到 Session 类中,最终返回了一个 session 对象。

这样,每个 HTTP 请求内调用 db.session 都会创建一个只在这个请求范围内有效的 session。

SQLAlchemy 架构

img

上图是 SQLAlchemy 整体的架构,可以分成三个部分:

  • ORM:对象关系映射处理。
  • Core:底层的查询处理、连接池等。
  • DBAPI:最底层的数据访问接口。
img

上图是 Core 里面核心类之间的关系。

DBAPI:Python 在 pep249 中定义的数据库访问接口规范,核心是 ConnectionCursor 两个类,Connection 类封装数据库连 接,Cursor 封装了数据库查询的上下文信息。

Dialect:本身是一个抽象类,如何与数据库交互都是通过这个类定义的,包括元信息、查询语句生成、执行、结果集处理等,不同的数据库对应不同的实现。源码

Connection:对 DB-API 更高层次的封装,提供了对 SQL 语句执行的相关支持,里面包含了一个从数据库连接池拿来的一个 DBAPI 连接。

Engine:协调 Connection、Pool,提供用户级的接口,一般是通过外观方法 create_engine()来创建。

ExecutionContext:执行的过程中由 Connection 创建,Dialect 的信使,表示一个查询执行。

每执行一个查询,Engine 会创建一个 Connection 对象,并从连接池 Pool 中取一个数据库连接传入 Connection,connection 通过 Dialect 与不同的数据库对话,最终的查询结果封装中 ResultProxy 中。

查询的执行

由上面的分析我们知道 db.session 最终返回的是一个 session 对象。从下面的 Session 类的代码中我们发现 session 对象在初始化时,就默认开始了一个事务(autocommit 参数默认为 False),也就是说 db.session 开启了一个事务。

class Session(_SessionClassMethods):

		def __init__(self, ...):
      	# ...
      	if not self.autocommit:
            self.begin()

  	def begin(self, subtransactions=False, nested=False):
        if self.transaction is not None:
            # ...
        else:
            self.transaction = SessionTransaction(self, nested=nested)
        return self.transaction  # needed for __enter__/__exit__ hook
class Session(_SessionClassMethods):

    # ...

    def execute(self, clause, params=None, mapper=None, bind=None, **kw):
        # 处理查询语句
   		  clause = expression._literal_as_text(
            clause, allow_coercion_to_text=True
        )
			  # 获取 Engine 对象
        if bind is None:
            bind = self.get_bind(mapper, clause=clause, **kw)
				# 创建 Connection 对象,执行查询
        return self._connection_for_bind(bind, close_with_result=True).execute(
            clause, params or {}
        )

事务在 session 初始化时自动开启了,但是在 execute 里面没有看到提交,因为一个事务里面可能有多次查询,对应多个 execute,所以,所有 execute 执行完后要自己调用 commit 方法提交事务。

class Session(_SessionClassMethods):

    def commit(self):
        # ...
        self.transaction.commit()

class SessionTransaction(object):

    def commit(self):
        # ...
        self.close()
        return self._parent

    def close(self, invalidate=False):
        # 关闭事务...
        # 同时开启另一个事务
        if not self.session.autocommit:
           self.session.begin()

Autocommit

原文1

The autocommit flag is not for general use, and if it is used, queries should only be invoked within the span of a Session.begin() / Session.commit() pair. Executing queries outside of a demarcated transaction is a legacy mode of usage, and can in some cases lead to concurrent connection checkouts.

autocommit 并不是字面意思那样会自动提交,相反,开启了 autocommit 后,要手动的调用, sesession.begin()/session.commit()。在事务外执行查询时历史遗留问题,可能会导致并发问题。

Defaults to False. When True, the Session does not keep a persistent transaction running, and will acquire connections from the engine on an as-needed basis, returning them immediately after their use. Flushes will begin and commit (or possibly rollback) their own transaction if no transaction is present. When using this mode, the Session.begin() method is used to explicitly start transactions.

默认为 Flase。当为 True 时,session 不会维持一个持久化的事务,并且会需要时才从 engine 获取 connection,用完后立刻归还。

原文2

Deprecated since version 1.4: “autocommit” mode is a legacy mode of use and should not be considered for new projects. The feature will be deprecated in SQLAlchemy 1.4 and removed in version 2.0; both versions provide a more refined “autobegin” approach that allows the Session.begin() method to be used normally. If autocommit mode is used, it is strongly advised that the application at least ensure that transaction scope is made present via the Session.begin() method, rather than using the session in pure autocommit mode.

概述在 1.4 被废弃,2.0 移除,会提供一个更合理的参数 autobegin

参考

[1] The Architecture of SQLAlchemy

[2] Python Database API Specification v2.0

[3] session_basics