SQLAlchemy中常用的查询方法[示例学习]

SQLAlchemy 是一个强大的 Python ORM(对象关系映射)工具,它提供了多种方法来执行数据库查询操作。以下是 SQLAlchemy 中常用的查询方法的总结:

  1. session.query():使用 session.query(Model) 来创建一个查询对象,其中 Model 是你要查询的数据库模型类。

  2. filter():在查询对象上使用 filter() 方法可以添加过滤条件,例如 filter(Model.column == value)

  3. all():使用 all() 方法可以获取查询的所有结果,并以列表的形式返回。

  4. first():使用 first() 方法可以获取查询结果的第一条记录。

  5. get():使用 get(primary_key_value) 方法可以根据主键值直接获取相应的记录。

  6. filter_by():使用 filter_by(column=value) 方法可以根据指定列的数值进行过滤。

  7. join():使用 join() 方法可以进行表的内连接查询。

  8. outerjoin():使用 outerjoin() 方法可以进行表的外连接查询。

  9. group_by():使用 group_by() 方法可以按指定列分组查询。

  10. order_by():使用 order_by() 方法可以对查询结果进行排序。

  11. count():使用 count() 方法可以统计查询结果的数量。

  12. delete():使用 delete() 方法可以删除满足条件的记录。

  13. update():使用 update() 方法可以更新满足条件的记录。

from sqlalchemy.orm import declarative_base,sessionmaker
from sqlalchemy import create_engine, ForeignKey

from sqlalchemy import Column, Integer, String

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

class Address(Base):
    __tablename__ = 'addresses'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    street = Column(String)
    city = Column(String)

# 创建数据库连接
engine = create_engine('sqlite:///users_addresses_example.db')
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 清空 User 表数据
session.query(User).delete()
session.commit()

# 清空 Address 表数据
session.query(Address).delete()
session.commit()

# 插入数据到 User 表
user1 = User(name='Alice', age=25)
user2 = User(name='Bob', age=30)
user3 = User(name='Charlie', age=22)
user4 = User(name='pemp', age=34)
user5 = User(name='pita', age=39)

session.add_all([user1, user2, user3, user4, user5])
session.commit()

# 插入数据到 Address 表
address1 = Address(user_id=user1.id, street='123 Main St', city='New York')
address2 = Address(user_id=user2.id, street='456 Park Ave', city='Los Angeles')
address3 = Address(user_id=user3.id, street='789 Elm St', city='Chicago')

session.add_all([address1, address2, address3])
session.commit()

# 进行表的联合查询
result = session.query(User, Address).join(Address, User.id == Address.user_id).all()

for user, address in result:
    print(f"User: {user.name}, Age: {user.age}, Address: {address.street}, {address.city}")

print('*'*40)

from sqlalchemy import and_

# 进行表的外连接查询
# outerjoin外连接查询将返回左表User的所有记录,以及右表Address 表中与左表记录关联的数据,如果没有匹配的记录,则右表数据部分为 NULL。
# result = session.query(User.id, Address.id).outerjoin(Address, User.id == Address.user_id).all()

# # 找出没有关联的记录
# unassociated_records = [(user_id, address_id) for user_id, address_id in result if address_id is None]

# print("User 和 Address 表中没有关联的 ID:")
# for user_id, _ in unassociated_records:
#     print(f"独立User ID: {user_id}")

# 进行表的外连接查询
result = session.query(User, Address).outerjoin(Address, User.id == Address.user_id).all()

for user, address in result:
    if address is None:
        print(f"独立User: {user.name}, Age: {user.age}, Address: None")
    else:
        print(f"User(关联ID): {user.name}, Age: {user.age}, Address: {address.street}, {address.city}")

# 关闭会话
session.close()
from sqlalchemy import create_engine, Column, Integer, String,select
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import func

# 创建数据库连接
engine = create_engine('sqlite:///users_example.db')

Base = declarative_base()

# 定义模型类
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

# 创建数据表
Base.metadata.create_all(engine)

# 创建 Session 类
Session = sessionmaker(bind=engine)
session = Session()

# 事先清空数据
session.query(User).delete() 
session.commit()

# 插入更多数据
users_data = [
    {'name': 'Charlie', 'age': 28},
    {'name': 'David', 'age': 32},
    {'name': 'Eve', 'age': 27},
    {'name': 'gemm', 'age': 43},
    {'name': 'riyu', 'age': 43}
]

for user_data in users_data:
    user = User(**user_data)
    session.add(user)
session.commit()

# 查询数据
query = session.query(User)

# 使用 filter() 方法添加过滤条件
result = query.filter(User.age > 25).all()
print("年龄大于25的用户:", [(user.name, user.age) for user in result])

# 使用 func.group_concat() 函数将同一年龄下的用户名合并成一个字符串
result = session.query(User.age, func.group_concat(User.name)).group_by(User.age).all()
print("按年龄分组查询:")
for age, names in result:
    name_list = names.split(',')
    print(f"年龄 {age} 的用户有:{', '.join(name_list)}")


# # 使用 group_by() 方法按年龄分组查询
# result = query.with_entities(User.age, func.group_concat(User.name)).group_by(User.age).all()
# print("按年龄分组查询:")
# for age, names in result:
#     name_list = names.split(',')
#     print(f"年龄 {age} 的用户有:{', '.join(name_list)}")

# 使用 order_by() 方法对结果进行排序
result = query.order_by(User.age).all()
print("按年龄排序:", [(user.name, user.age) for user in result])

# 统计查询结果的数量
count = query.count()
print("查询结果的数量:", count)

# 删除满足条件的记录
session.query(User).filter(User.name == 'Charlie').delete()
session.commit()

# 更新满足条件的记录
session.query(User).filter(User.name == 'David').update({User.age: 35})
session.commit()

# 执行 SELECT 查询
# 使用 select(User) 会创建一个 SELECT 查询,查询的对象是 User 这个表格(模型类)。这意味着你将检索 User 表中的所有列
with engine.connect() as connection:
    stmt = select(User).where(User.age > 25)
    result = connection.execute(stmt)
    for row in result:
        print(row)

---------------
sqlalchemy
select 函数的参数可以是一个或多个表达式,用于指定要查询的列。
它接受一个可迭代对象作为参数,该可迭代对象包含要选择的列或其他表达式。

以下是 select 函数的基本语法:

select(columns, whereclause=None, from_obj=[], **kwargs)

  • columns:要选择的列或其他表达式,可以是一个或多个。select(User模型类)
  • whereclause:可选参数,用于指定 WHERE 子句中的条件表达式。
  • from_obj:可选参数,用于指定查询的来源表(FROM 子句)。
  • **kwargs:其他可选参数,例如 group_byhavingorder_by 等。

在使用 select 函数时,你至少需要提供一个 columns 参数来指定要选择的列。其他参数都是可选的,根据实际需要来决定是否使用。
------------------------

session.scalars() 方法是 SQLAlchemy 中用于执行查询并返回标量值(Scalar)的方法。在 ORM 查询中,当你只需要获取一列数据的值而不是整个对象时,可以使用 session.scalars() 方法。 

from sqlalchemy import select

# 创建一个 SELECT 查询,选择名字为 "spongebob" 或 "sandy" 的用户的 id 列
stmt = select(User.id).where(User.name.in_(["spongebob", "sandy"]))

# 使用会话执行查询
with Session(engine) as session:
    # 执行查询并返回标量值
    for user_id in session.scalars(stmt):
        print(user_id)

 

相关推荐

  1. SQLAlchemy常用查询方法[示例学习]

    2024-03-31 11:30:03       14 阅读
  2. node.js常用命令及示例

    2024-03-31 11:30:03       20 阅读
  3. React Router 常用方法总结

    2024-03-31 11:30:03       15 阅读
  4. python学习笔记(常用方法

    2024-03-31 11:30:03       17 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-31 11:30:03       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-31 11:30:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-31 11:30:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-31 11:30:03       20 阅读

热门阅读

  1. k8s学习

    k8s学习

    2024-03-31 11:30:03      20 阅读
  2. Mybatis相关面试题详细总结

    2024-03-31 11:30:03       14 阅读
  3. 采药采药~

    2024-03-31 11:30:03       16 阅读
  4. linux redis 快速安装

    2024-03-31 11:30:03       15 阅读
  5. 2952. 需要添加的硬币的最小数量

    2024-03-31 11:30:03       19 阅读
  6. springcloud第4季 远程调用openfegin的介绍4

    2024-03-31 11:30:03       14 阅读