基于 SQLAlchemy 批量更新数据
订单系统上线,迁移历史数据后,要批量更新一些数据,需要一种简单快速的方法。
解法
通过 Python + SQLAlchemy 实现批量更新,找到了下面这种方法,实测速度是挺快的。
from sqlalchemy.sql.expression import bindparam
stmt = update(TableName)\
where(TableName.id == bindparam('_id')).\
values({
'user_id': bindparam('user_id'),
'email_address': bindparam('email_address'),
})
conn.execute(stmt, [
{'user_id': 1, 'email_address' : 'jack@yahoo.com', '_id':1},
{'user_id': 1, 'email_address' : 'jack@msn.com', '_id':2},
{'user_id': 2, 'email_address' : 'www@www.org', '_id':3},
{'user_id': 2, 'email_address' : 'wendy@aol.com', '_id':4},
])
核心是基于 bindparam,将待更新的行的主键及待更新字段一起构造一个字典列表,基于构造好的类似于模板的语句实现批量更新。
实战
import sys
import logging
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, Session
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Float, Text
from sqlalchemy.sql.expression import bindparam
logging.basicConfig(filename='sync_order_address_id.log',
filemode='a',
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%H:%M:%S',
level=logging.DEBUG)
log = logging.getLogger(__name__)
is_dev = sys.argv[1] == 'dev'
if is_dev:
DATABASE_URI = ''
PROJECT_ID = 34
else:
DATABASE_URI = ''
PROJECT_ID = 2
Base = declarative_base()
class Order(Base):
__tablename__ = 'od_order'
id = Column(Integer, primary_key=True)
# ...
class Address(Base):
__tablename__ = 'od_address'
id = Column(Integer, primary_key=True)
# ...
def main():
engine = create_engine(DATABASE_URI, encoding='utf8', echo=True)
map2 = {}
lst = []
with Session(engine) as session:
orders = session.query(Order).filter(Order.lanqiao_project_id==PROJECT_ID, Order.search_field4.isnot(None)).order_by(Order.create_time.desc()).all()
addresses = session.query(Address).filter(Address.old_id.isnot(None)).all()
for address in addresses:
map2[address.old_id] = address.id
for order in orders:
lst.append({
'_id': order.id,
'new_address_id': map2.get(int(order.search_field4), 0)
})
stmp = Order.__table__.update().\
where(Order.__table__.c.id==bindparam('_id')).\
values({
'address_id': bindparam('new_address_id')
})
session.execute(stmp, lst)
session.commit()
if __name__ == '__main__':
main()
要点:
- TableClass.table 访问到 table;
- TableClass.__table.c.id 访问到具体某个列;