1.需求及场景概述
现有系统中因历史数据量过大,产生了将历史数据进行按月存储的要求,系统和数据库交互使用的是sqlalchemy,假设系统的原来的历史记录表(record)如下:
为了将历史数据按月分表存储,我们需要以此表为基础按月创建对应的月表来进行分表存储,同时又要使用orm的功能。面对这样的需求我们很自然的会想到创建如下模型
class Recode_202405(Base):
__tablename__ = 'record_202405'
id = Column(INT(11), primary_key=True)
name = Column(String(100, 'utf8mb4_unicode_ci'))
这样当然可以,但是我们不可能每月手动去创建这个模型,然后重启自己的服务,这明显有问题, 那如何解决呢,下面就介绍一种在这种场景下基于sqlalchemy实现的分表存储方案。
首先,我们对我们应用场景及需求进行一下描述:
1.我们有一张基础表,这个表作为我们创建月表的模板。
2.当有新数据需要入库时,我们将数据按月存储到当前月份的月表中,如果当月表不存在系统自动创建。
3.支持使用OMR方便数据存储及查询。
2.实现方案
既然要支持使用ORM ,我们势必要获取月表的model。sqlalchemy0.9.1版本推出了Automap,它可以自动映射数据库的表,通过数据表名映射model。大概的用法如下:
from sqlalchemy.ext.automap import automap_base
AutoBase = automap_base()
# reflect the tables
AutoBase.prepare(engine, reflect=True)
tablename = "record_202405"
RecordDao = getattr(AutoBase.classes, tablename)
既然可以通过表名映射回model,那么现在的问题就是如何基于已有的基础表创建当月的月表,可以参考如下代码:
from sqlalchemy.ext.automap import automap_base
AutoBase = automap_base()
table_name = 'record'
date = '202405'
# 基于基础表创建指定月份的月表
base_table = autobase.metadata.tables[table_name]
mdata = MetaData()
new_table = base_table.tometadata(metadata=mdata, name=f'{table_name}_{date}')
try:
# 创建表
new_table.create(bind=engine)
except BaseException as e:
print(e)
基于上述方案,我们可以封装一个方法,该方法接收两个参数,一个是表名,一个是分表的月份,其实可以进行任何程度的分表,返回该表的model,通过sqlalchemy 的ORM 进行增删改查操作,整体代码如下:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.automap import automap_base
from urllib import parse
from contextlib import contextmanager
import time
mysql_user = 'root'
mysql_password = 'root'
mysql_host = '127.0.0.1'
mysql_port = 3306
mysql_db = 'db'
SQLALCHEMY_DATABASE_URL = f'mysql+pymysql://{mysql_user}:{parse.quote_plus(mysql_password)}@{mysql_host}:{mysql_port}/{mysql_db}?charset=utf8'
engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_recycle=7200)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@contextmanager
def session_maker():
try:
db: Session = SessionLocal()
yield db
db.commit()
except Exception as e:
print(e)
db.rollback()
finally:
db.close()
def get_table_model(table_name, date=None):
autobase = automap_base()
autobase.prepare(engine, reflect=True)
print(autobase)
base_model = None
try:
base_model = getattr(autobase.classes, table_name)
except Exception as e:
print(e)
return base_model
if date == None: # 不分表的情况
return base_model
else: # 分表的情况
# 获取基础类
base_table = autobase.metadata.tables[table_name]
mdata = MetaData()
new_table = base_table.tometadata(metadata=mdata, name=f'{table_name}_{date}')
# new_table = base_model.tometadata(metadata=mdata, name=f'{table_name}_{date}')
try:
# 创建表
new_table.create(bind=engine)
except BaseException as e:
# 忽略建表异常,存在多进程同时建表情况,忽略后刷新autobase再试
print(e)
'''Automap的映射虽然是自动的,但是只有在启动的时候生效,也就是说如果新建一个数据表,
而没有告诉Automap,那这个表是找不到的。在实际使用中,可以捕获AttributeError异常,
并再次调用AutoBase.prepare(engine, reflect=True) 刷新映射关系。'''
autobase = automap_base()
autobase.prepare(engine, reflect=True)
base_model = getattr(autobase.classes, f'{table_name}_{date}')
return base_model
def get_table_model(table_name, date=None):
autobase = automap_base()
autobase.prepare(engine, reflect=True)
print(autobase)
base_model = None
try:
base_model = getattr(autobase.classes, table_name)
except Exception as e:
print(e)
return base_model
if date == None: # 不分表的情况
return base_model
else: # 分表的情况
# 获取基础类
base_table = autobase.metadata.tables[table_name]
mdata = MetaData()
new_table = base_table.tometadata(metadata=mdata, name=f'{table_name}_{date}')
# new_table = base_model.tometadata(metadata=mdata, name=f'{table_name}_{date}')
try:
# 创建表
new_table.create(bind=engine)
except BaseException as e:
# 忽略建表异常,存在多进程同时建表情况,忽略后刷新autobase再试
print(e)
'''Automap的映射虽然是自动的,但是只有在启动的时候生效,也就是说如果新建一个数据表,
而没有告诉Automap,那这个表是找不到的。在实际使用中,可以捕获AttributeError异常,
并再次调用AutoBase.prepare(engine, reflect=True) 刷新映射关系。'''
autobase = automap_base()
autobase.prepare(engine, reflect=True)
base_model = getattr(autobase.classes, f'{table_name}_{date}')
return base_model
# 使用
Record = database.get_table_model("record", '202406')
ll = Record(name="1111")
with database.session_maker() as db:
db.add(ll)
db.commit()
ll = db.query(Record).all()
print(ll)