基于 SQLAlchemy 批量更新数据

订单系统上线,迁移历史数据后,要批量更新一些数据,需要一种简单快速的方法。

解法

通过 Python + SQLAlchemy 实现批量更新,找到了下面这种方法,实测速度是挺快的。

from sqlalchemy import update
from sqlalchemy.sql.expression import bindparam

# Build ONE parameterised UPDATE that acts as a template. The bind parameter
# in the WHERE clause is called '_id' rather than 'id' so that it cannot
# collide with the bind names SQLAlchemy generates for the table's own columns.
stmt = (
    update(TableName)
    .where(TableName.id == bindparam('_id'))
    .values({
        'user_id': bindparam('user_id'),
        'email_address': bindparam('email_address'),
    })
)

# Passing a list of dicts runs the statement once per dict ("executemany");
# every dict must supply a value for each bind parameter named above.
conn.execute(stmt, [
    {'user_id': 1, 'email_address': 'jack@yahoo.com', '_id': 1},
    {'user_id': 1, 'email_address': 'jack@msn.com', '_id': 2},
    {'user_id': 2, 'email_address': 'www@www.org', '_id': 3},
    {'user_id': 2, 'email_address': 'wendy@aol.com', '_id': 4},
])

核心是基于 bindparam,将待更新的行的主键及待更新字段一起构造一个字典列表,基于构造好的类似于模板的语句实现批量更新。

实战

import sys
import logging
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, Session
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Float, Text
from sqlalchemy.sql.expression import bindparam

# Send everything from DEBUG upwards to a log file (path is relative to the
# current working directory), appending to it across runs.
logging.basicConfig(
    filename='sync_order_address_id.log',
    filemode='a',
    # %(msecs)03d zero-pads the millisecond part; a bare %(msecs)d would print
    # 5 ms as ",5", which is indistinguishable at a glance from half a second.
    format='%(asctime)s,%(msecs)03d %(name)s %(levelname)s %(message)s',
    datefmt='%H:%M:%S',
    level=logging.DEBUG,
)


# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Environment switch: the first command-line argument must be exactly 'dev'
# to pick the development settings; any other value picks the alternative
# (presumably production -- confirm before running).
is_dev = sys.argv[1] == 'dev'

# Both connection strings are left blank in this listing; fill in the one for
# each environment (dev first, the other environment second).
DATABASE_URI = '' if is_dev else ''
PROJECT_ID = 34 if is_dev else 2


# Declarative base class that the ORM models in this script inherit from.
Base = declarative_base()


class Order(Base):
    """ORM mapping for the ``od_order`` table."""

    __tablename__ = 'od_order'

    id = Column(Integer, primary_key=True)
    # ... remaining columns are omitted from this listing; main() also uses
    # lanqiao_project_id, search_field4, create_time and address_id.


class Address(Base):
    """ORM mapping for the ``od_address`` table."""

    __tablename__ = 'od_address'

    id = Column(Integer, primary_key=True)
    # ... remaining columns are omitted from this listing; main() also uses
    # old_id.


def main():
    """Backfill ``address_id`` on orders from the migrated address table.

    Loads every order of ``PROJECT_ID`` whose ``search_field4`` is set, reads
    that field as an integer and looks it up among the ``old_id`` values of
    the address rows to find the matching new address ``id``. Orders without
    a match get ``address_id`` 0. All rows are then written with a single
    executemany-style UPDATE and committed.

    Raises:
        ValueError: if an order's ``search_field4`` cannot be parsed by
            ``int()``.
    """
    # The former ``encoding='utf8'`` argument is intentionally not passed:
    # it is deprecated in SQLAlchemy 1.4 and rejected by 2.0.
    engine = create_engine(DATABASE_URI, echo=True)

    with Session(engine) as session:
        orders = (
            session.query(Order)
            .filter(
                Order.lanqiao_project_id == PROJECT_ID,
                Order.search_field4.isnot(None),
            )
            .order_by(Order.create_time.desc())
            .all()
        )
        addresses = (
            session.query(Address)
            .filter(Address.old_id.isnot(None))
            .all()
        )

        # old address id -> new address id
        new_id_by_old_id = {
            address.old_id: address.id for address in addresses
        }

        # One parameter dict per order; keys match the bindparam names below.
        params = [
            {
                '_id': order.id,
                'new_address_id': new_id_by_old_id.get(
                    int(order.search_field4), 0
                ),
            }
            for order in orders
        ]

        # An empty list is treated as "no parameters", which would leave the
        # required bind parameters without values and make execute() fail.
        if not params:
            log.info('no orders to update for project %s', PROJECT_ID)
            return

        order_table = Order.__table__
        stmt = (
            order_table.update()
            .where(order_table.c.id == bindparam('_id'))
            .values({'address_id': bindparam('new_address_id')})
        )
        session.execute(stmt, params)
        session.commit()


if __name__ == '__main__':
    main()

要点:

  1. 通过 `TableClass.__table__` 访问到 ORM 类对应的 Table 对象;
  2. 通过 `TableClass.__table__.c.id` 访问到具体某个列;

参考

[1] how-to-bulk-update-in-sqlalchemy-python