SQLAIchemy 异步DBManager封装-02熟悉掌握

一、引言

在上一篇文章中 SQLAIchemy 异步DBManager封装-01入门理解 我们深入讨论了SQLAlchemy异步DBManager整体的封装结构与思路。详细地介绍了如何封装添加和批量添加的操作方法,并通过实际示例进行了演示。SQL 全称是结构化查询语言,无疑查询是最复杂的部分。因此,在这篇文章中,我将详细介绍如何封装通用的数据库查询方法,并通过具体的示例来讲解这一过程,使得这一复杂的任务变得更为简单。

二、通用查询封装

指定主键id查询

class DBManager(metaclass=SingletonMetaCls):
    DB_CLIENT: SQLAlchemyManager = None
    orm_table: Type[BaseOrmTable] = None
    
    @with_session
    async def query_by_id(
            self,
            pk_id: int,
            *,
            orm_table: Type[BaseOrmTable] = None,
            session: AsyncSession = None,
    ) -> Union[T_BaseOrmTable, None]:
        """
        根据主键id查询
        Args:
            pk_id: 主键id
            orm_table: orm表映射类
            session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

        Returns:
            orm映射类的实例对象
        """
        orm_table = orm_table or self.orm_table
        ret = await session.get(orm_table, pk_id)
        return ret

这个封装很简单,直接看demo吧

class UserTable(BaseOrmTableWithTS):
    """用户表"""

    __tablename__ = "user"
    username: Mapped[str] = mapped_column(String(30), default="", comment="用户昵称")
    age: Mapped[int] = mapped_column(default=0, comment="年龄")
    password: Mapped[str] = mapped_column(String(30), default="", comment="用户密码")
    phone: Mapped[str] = mapped_column(String(11), default="", comment="手机号")
    email: Mapped[str] = mapped_column(String(30), default="", comment="邮箱")
    avatar: Mapped[str] = mapped_column(String(100), default="", comment="头像")



class UserManager(DBManager):
    orm_table = UserTable

    async def get_name_by_email(self, email):
        username = await self.query_one(cols=["username"], conds=[self.orm_table.email == email], flat=True)
        return username
        
async def query_demo():
    user = await UserManager().query_by_id(pk_id=1)
    print("user", user)

>>> out
user {'id': 1, 'username': 'hui', 'age': 18, 'password': '', 'phone': '', 'email': 'huidbk.163.com', 'avatar': '', 'created_at': datetime.datetime(2024, 4, 15, 1, 0, 43), 'updated_at': datetime.datetime(2024, 4, 15, 1, 0, 43)}

查询单条

@with_session
async def _query(
        self,
        *,
        cols: list = None,
        orm_table: BaseOrmTable = None,
        conds: list = None,
        orders: list = None,
        limit: int = None,
        offset: int = 0,
        session: AsyncSession = None,
) -> Result[Any]:
    """
    通用查询
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表, 默认id升序
        limit: 限制数量大小
        offset: 偏移量
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns: 查询结果集
        cursor_result
    """
    cols = cols or []
    cols = [column(col_obj) if isinstance(col_obj, str) else col_obj for col_obj in cols]  # 兼容字符串列表

    conditions = conds or []
    orders = orders or [column("id")]
    orm_table = orm_table or self.orm_table

    # 构造查询
    if cols:
        # 查询指定列
        query_sql = select(*cols).select_from(orm_table).where(*conditions).order_by(*orders)
    else:
        # 查询全部字段
        query_sql = select(orm_table).where(*conditions).order_by(*orders)

    if limit:
        query_sql = query_sql.limit(limit).offset(offset)

    # 执行查询
    cursor_result = await session.execute(query_sql)
    return cursor_result

@with_session
async def query_one(
        self,
        *,
        cols: list = None,
        orm_table: Type[BaseOrmTable] = None,
        conds: list = None,
        orders: list = None,
        flat: bool = False,
        session: AsyncSession = None,
) -> Union[dict, T_BaseOrmTable, Any]:
    """
    查询单行
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表
        flat: 单字段时扁平化处理
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Examples:
        # 指定列名
        ret = await UserManager().query_one(cols=["username", "age"], conds=[UserTable.id == 1])
        sql => select username, age from user where id=1
        ret => {"username": "hui", "age": 18}

        # 指定列名,单字段扁平化处理
        ret = await UserManager().query_one(cols=["username"], conds=[UserTable.id == 1])
        sql => select username from user where id=1
        ret => {"username": "hui"} => "hui"

        # 计算总数
        ret = await UserManager().query_one(cols=[func.count()], flat=True)
        sql => select count(*) as count from user
        ret => {"count": 10} => 10

        # 不指定列名,查询全部字段, 返回表实例对象
        ret = await UserManager().query_one(conds=[UserTable.id == 1])
        sql => select id, username, age from user where id=1
        ret => UserTable(id=1, username="hui", age=18)

    Returns:
        Union[dict, BaseOrmTable(), Any(flat=True)]
    """
    cursor_result = await self._query(cols=cols, orm_table=orm_table, conds=conds, orders=orders, session=session)
    if cols:
        if flat and len(cols) == 1:
            # 单行单字段查询: 直接返回字段结果
            # eg: select count(*) as count from user 从 {"count": 100} => 100
            # eg: select username from user where id=1 从 {"username": "hui"} => "hui"
            return cursor_result.scalar_one()

        # eg: select username, age from user where id=1 => {"username": "hui", "age": 18}
        return cursor_result.mappings().one() or {}
    else:
        # 未指定列名查询默认全部字段,返回的是表实例对象 BaseOrmTable()
        # eg: select id, username, age from user where id=1 => UserTable(id=1, username="hui", age=18)
        return cursor_result.scalar_one()

查询无疑就只有两种结果单条、多条结果数据。这里统一封装一个 _query 通用查询方法,以供内部使用。

  • 支持指定查询的列(cols)
  • 条件查询(conds)
  • 排序(orders)
  • 分页(limit、offset)

主要封装就是利用 sqlaichemy 提供的 select 语法进行组织sql,通过 column 兼容列名字段字符串列表。query_one 方法,如果指定了 cols 返回字典格式,不指定则是库表映射类实例对象,一开始封装的时候我想统一出参都是返回 库表映射类实例对象 。

query_ret = cursor_result.mappings().one() or {}
return orm_table(**query_ret)

如果是 id as user_id 取别名查询会导致映射不上,但可以查询时不指定别名,orm_table_obj.to_dict(alias_dict={"id": "user_id"}) 时进行别名转换,还有一些flat 扁平化、统计数量的时候都不能使用 orm_table(**query_ret) 故而不好统一,再实际web场景中,出参还是要转成dict、json格式化进行响应,故而进行保留。看看具体使用效果

from sqlalchemy import String, func, label

async def query_demo():
    ret = await UserManager().query_one(cols=["username", "age"], conds=[UserTable.id == 1])
    print("指定列名 ret", ret)
    
    ret = await UserManager().query_one(
        cols=[UserTable.username, label("user_age", UserTable.age)], conds=[UserTable.id == 1]
    )
    print("取别名 ret", ret)

    ret = await UserManager().query_one(cols=["username"], conds=[UserTable.id == 1], flat=True)
    print("指定列名,单字段扁平化处理", ret)

    ret = await UserManager().query_one(cols=[func.count()], flat=True)
    print("计算总数", ret)

    ret = await UserManager().query_one(conds=[UserTable.id == 1])
    print("不指定列名,查询全部字段, 返回表实例对象", ret)

查询结果

指定列名 ret {'username': 'hui', 'age': 18}

取别名 ret {'username': 'hui', 'user_age': 18}

指定列名,单字段扁平化处理 hui

计算总数 6

不指定列名,查询全部字段, 返回表实例对象 {'username': 'hui', 'age': 18, 'password': '', 'phone': '', 'email': 'huidbk.163.com', 'avatar': '', 'id': 1, 'created_at': datetime.datetime(2024, 4, 15, 1, 0, 43), 'updated_at': datetime.datetime(2024, 4, 15, 1, 0, 43)}

查询多条

@with_session
async def query_all(
        self,
        *,
        cols: list = None,
        orm_table: BaseOrmTable = None,
        conds: list = None,
        orders: list = None,
        flat: bool = False,
        limit: int = None,
        offset: int = None,
        session: AsyncSession = None,
) -> Union[List[dict], List[T_BaseOrmTable], Any]:
    """
    查询多行
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表
        flat: 单字段时扁平化处理
        limit: 限制数量大小
        offset: 偏移量
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务
    """
    cursor_result = await self._query(
        cols=cols, orm_table=orm_table, conds=conds, orders=orders, limit=limit, offset=offset, session=session
    )
    if cols:
        if flat and len(cols) == 1:
            # 扁平化处理
            # eg: select id from user 从 [{"id": 1}, {"id": 2}, {"id": 3}] => [1, 2, 3]
            return cursor_result.scalars().all()

        # eg: select username, age from user => [{"username": "hui", "age": 18}, [{"username": "dbk", "age": 18}]]
        return cursor_result.mappings().all() or []
    else:
        # 未指定列名查询默认全部字段,返回的是表实例对象 [BaseOrmTable()]
        # eg: select id, username, age from user
        # [User(id=1, username="hui", age=18), User(id=2, username="dbk", age=18)
        return cursor_result.scalars().all()

查询多条与query_one一致内部调用 _query() 获取查询结果集,最后通过 cursor_result.mappings().all()cursor_result.scalars().all() 获取列表数据,同样支持单字段扁平化处理,还支持分页处理。具体看如下例子

from sqlalchemy import String, func, label, or_

ret = await UserManager().query_all()
user_ids = [user.id for user in ret]
print("查询全部", user_ids)

user_ids = await UserManager().query_all(cols=[UserTable.id], flat=True)
print("查询全部的用户id(扁平化处理)", user_ids)

ret = await UserManager().query_all(
    cols=[UserTable.username],
    conds=[
        UserTable.id > 1,
        or_(UserTable.age < 20, UserTable.email == "huidbk.163.com")
    ],
    orders=[UserTable.id],
    flat=True
)
# sql => select username from user where user.id > 1 and (age < 20 or email='huidbk.163.com') order by id
print("条件查询", ret)

查询结果

查询全部 [1, 2, 3, 4, 5, 6]
查询全部的用户id(扁平化处理) [1, 2, 3, 4, 5, 6]
条件查询 ['zack']

单字段扁平化处理,可以节省获取查询数据后再进行扁平化处理的一步操作。看看下面没有扁平化处理

user_infos = await UserManager().query_all(cols=[UserTable.id])
user_ids = [user_info.get("id") for user_info in user_infos]
print("查询全部的用户id", user_ids)

ret = await UserManager().query_one(cols=[func.count()])
count = ret.get("count") or 0
print("计算总数", count)

上面的获取某业务的所有id,计算总数等,直接获取扁平化的结果,有时还是比较实用。

分页查询

async def list_page(
        self,
        *,
        cols: list = None,
        orm_table: BaseOrmTable = None,
        conds: list = None,
        orders: list = None,
        curr_page: int = 1,
        page_size: int = 20,
        session: AsyncSession = None,
):
    """
    单表通用分页查询
    Args:
        cols: 查询的列表字段
        orm_table: orm表映射类
        conds: 查询的条件列表
        orders: 排序列表
        curr_page: 页码
        page_size: 每页数量
        session: 数据库会话对象,如果为 None,则通过装饰器在方法内部开启新的事务

    Returns: 
        total_count, data_list
    """
    conds = conds or []
    orders = orders or [column("id")]
    orm_table = orm_table or self.orm_table

    limit = page_size
    offset = (curr_page - 1) * page_size
    total_count, data_list = await asyncio.gather(
        self.query_one(
            cols=[func.count()], orm_table=orm_table, conds=conds, orders=orders, flat=True, session=session
        ),
        self.query_all(
            cols=cols, orm_table=orm_table, conds=conds, orders=orders, limit=limit, offset=offset, session=session
        ),
    )

    return total_count, data_list

这里分页查询就用 query_one 查询总数,query_all 分页查询,curr_page 当前页与 page_size 每页大小计算数据偏移量 offset,然后通过 asyncio.gather 并发执行获取结果。

total_count, data_list = await UserManager().list_page(
    cols=[UserTable.id, UserTable.username, UserTable.age],
    conds=[UserTable.id > 1],
    curr_page=2,
    page_size=3,
    orders=[desc(UserTable.age)]
)
print("分页查询 total_count", total_count)
print("分页查询 data_list", data_list)

分页查询结果

分页查询 total_count 5
分页查询 data_list [{'id': 3, 'username': 'wang', 'age': 20}, {'id': 2, 'username': 'zack', 'age': 19}]

这里的分页查询没有使用 with_session 装饰器,由于 asyncio.gather 并发操作原因不能共享数据库会话 session,需要单独的 session,不然会报如下错误。

sqlalchemy.exc.InvalidRequestError:无法在上下文管理器内的已关闭事务上进行操作。 请先完成上下文管理器,然后再发出进一步的命令。

三、封装说明

SQL 的话还是查询用的多,查询也复杂,这里的话只封装了一些通用的查询操作,有一些分组查询、连表查询等我都没有封装,我认为这些操作还是写原生sql更直观一些,用ORM进行组装这些操作会感觉语法很别扭不简洁。如何执行原始sql,请看下一篇。SQLAIchemy 异步DBManager封装-03得心应手

四、Github源代码

源代码已上传到了Github,里面也有具体的使用Demo,欢迎大家一起体验、贡献。

HuiDBK/py-tools: 打造 Python 开发常用的工具,让Coding变得更简单 (github.com)

相关推荐

  1. C# 串口通讯异步封装

    2024-04-22 18:40:04       70 阅读
  2. Flask实现异步调用sqlalchemy的模型类

    2024-04-22 18:40:04       49 阅读
  3. 一篇文章熟练掌握 Axios

    2024-04-22 18:40:04       58 阅读
  4. 我的 Lisp 学习历程:从新手到熟练掌握

    2024-04-22 18:40:04       43 阅读
  5. android 全局异常处理封装

    2024-04-22 18:40:04       51 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-22 18:40:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-22 18:40:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-22 18:40:04       82 阅读
  4. Python语言-面向对象

    2024-04-22 18:40:04       91 阅读

热门阅读

  1. 百钱买百鸡

    2024-04-22 18:40:04       34 阅读
  2. MAC: 使用技巧

    2024-04-22 18:40:04       36 阅读
  3. C#面:使用 IEnumerable 实现斐波那契数列生成

    2024-04-22 18:40:04       38 阅读
  4. 自学新标日第十二课(完结)

    2024-04-22 18:40:04       39 阅读
  5. redis分布式锁到底怎么用

    2024-04-22 18:40:04       37 阅读
  6. python创建sqlite,并使用flask-sqlalchemy连接

    2024-04-22 18:40:04       34 阅读
  7. mysql 删除数据的四种方法

    2024-04-22 18:40:04       42 阅读
  8. tomcat热部署热加载原理剖析

    2024-04-22 18:40:04       38 阅读
  9. [leetcode] 796. 旋转字符串

    2024-04-22 18:40:04       37 阅读