flask-(数据连接池的使用,定制命令,信号的使用,表关系的建立和查询)

连接池

    import pymysql
    from dbutils.pooled_db import PooledDB
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,
        # ping MySQL服务端,检查是否服务可用。
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123',
        database='luffy',
        charset='utf8'
    )

    -第二步:使用
    from sql_pool import POOL # 做成单例
	conn = POOL.connection() # 从连接池种取一个链接(如果没有,阻塞在这)
    curser = conn.cursor()
    curser.execute('select * from luffy_order where id<2')
    res=curser.fetchall()
    print(res)

实例

from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_world():  # put application's code here
    conn = POOL.connection()  # 从连接池种取一个链接(如果没有,阻塞在这)
    curser = conn.cursor()
    curser.execute('select * from luffy_course where id<2')
    res = curser.fetchall()
    return '数据查到了 %s' % res
    

if __name__ == '__main__':
    app.run()

flask定制命令

使用 flask-script定制命令(老版本,不用了)
# flask 老版本中,没有命令运行项目,自定制命令
# flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令
# 新版的flask--》官方支持定制命令  click 定制命令,这个模块就弃用了
# flask-migrate 老版本基于flask-script,新版本基于flask-click写的

### 使用步骤
	-1 pip3 install Flask-Script==2.0.3
    -2 pip3 install flask==1.1.4
    -3 pip3 install markupsafe=1.1.1
	-4 使用
    from flask_script import Manager
    manager = Manager(app)
    if __name__ == '__main__':
    	manager.run()
    -5 自定制命令
    @manager.command
    def custom(arg):
        """自定义命令
        python manage.py custom 123
        """
        print(arg)
        
    - 6 执行自定制命令
    python manage.py custom 123

新版本定制命令
from flask import Flask
import click
app = Flask(__name__)


@app.cli.command("create-user")
@click.argument("name")
def create_user(args):
    # from pool import POOL
    # conn=POOL.connection()
    # cursor=conn.cursor()
    # cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'123456'])
    # conn.commit()
    print(name)

@app.route('/')
def index():
    return 'index'

if __name__ == '__main__':
    app.run()

# 运行项目的命令是:flask --app py文件名字:app run
# 命令行中执行
# flask --app 7-flask命令:app create-user lqz
# 简写成 前提条件是 app所在的py文件名字叫 app.py
# flask create-user lqz

# django定制命令
# 1 app下新建文件夹
	management/commands/
# 2 在该文件夹下新建py文件,随便命名(命令名)

# 3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    help = '命令提示'
    def handle(self, *args, **kwargs):
		命令逻辑  
# 4 使用命令
python manage.py  py文件(命令名)

flask 缓存的使用

from flask import Flask
from flask_caching import Cache

config = {
    "DEBUG": True,  # some Flask specific configs
    "CACHE_TYPE": "SimpleCache",  # Flask-Caching related configs
    "CACHE_DEFAULT_TIMEOUT": 300
}

app = Flask(__name__)
# tell Flask to use the above defined config
app.config.from_mapping(config)
cache = Cache(app)

@app.route('/')
def index():
    cache.set('name', 'xxx')
    return 'index'

@app.route('/get')
def get():
    res=cache.get('name')
    return res

if __name__ == '__main__':
    app.run()

flask信号的使用

# 内置信号--》flask请求过程中--》源码中定义的---》不需要我们定义和触发---》只要写了函数跟它对应--》执行到这,就会触发函数执行
# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行

before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

# 内置信号的使用
# 案例 :
### 我们现在想在模板渲染之 : 记录日志  使用内置信号实现

# 1 写一个函数
def before_render(*args, **kwargs):
    print(args)
    print(kwargs)
    # 谁(ip) 在什么时间  访问了哪个页面(template)
    print('记录日志,模板要渲染了')

# 2 跟内置信号绑定
from flask.signals import before_render_template
before_render_template.connect(before_render) # 信号触发的函数

# 3 源码中触发信号执行(我们不需要动)
# before_render_template.send()  源码再模板渲染之前,它写死了

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/login')
def login():
    return render_template('login.html')
    
if __name__ == '__main__':
    app.run()



# 自定义信号
from flask import Flask, render_template
from flask.signals import _signals
import pymysql

app = Flask(__name__)
app.debug = True

# 1 定义信号
# 自定义信号
db_save = _signals.signal('db_save')

# 2 写一个函数
def db_save_fun(*args, **kwargs):
    print(args)
    print(kwargs)
    print('表数据插入了')

# 3 跟自定义置信号绑定
db_save.connect(db_save_fun)

# 3 触发信号执行(需要我们做)
# before_render_template.send()  源码再模板渲染之前,它写死了

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/create_article')
def create_article():
    conn = pymysql.connect(host='127.0.0.1', user='root', password='1234', database='cnblogs')
    cursor = conn.cursor()
    cursor.execute('insert into article (title,author) VALUES (%s,%s)', args=['测试测试标题', '测试作者测试'])
    conn.commit()
    # 手动触发信号
    db_save.send(table_name='article',info={'title':'测试测试标题','author':'测试作者测试'})
    return '插入成功'

if __name__ == '__main__':
    app.run()


# 步骤:
# 1 定义信号
db_save = _signals.signal('db_save')
# 2 写一个函数
def db_save_fun(*args, **kwargs):
    print(args)
    print(kwargs)
    print('表数据插入了')

# 3 跟自定义置信号绑定
db_save.connect(db_save_fun)

# 3 触发信号执行(需要我们做)
db_save.send()  # 需要在代码中写,可以传参数,传入的参数--》db_save_fun 能拿到

sqlalchemy原生操作

# 操作原生sql ---》用得少
import pymysql
import threading
# 1 导入
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
# 2 创建引擎
engine = create_engine(
    "mysql+pymysql://root:1234@127.0.0.1:3306/cnblogs",  
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,     # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 3 使用引擎,拿到链接
# conn = engine.raw_connection()
# # 4 剩下的一样了
# cursor=conn.cursor(pymysql.cursors.DictCursor)
# cursor.execute('select * from article limit 10')
# res=cursor.fetchall()
# print(res)

## 多线程测试:
def task(arg):
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from article"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()

for i in range(20):
    t = threading.Thread(target=task, args=(i,))
    t.start()

sqlalchemy操作表

# 1 建个 models.py  里面写 表模型
# 2 把表模型---》同步到数据库中
# 3 增删查改
# 1 导入
import datetime  # 时间模块
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()  # Base 当成 django的 models.Model

# 2 创建表模型
class User(Base):  # 模型层的名字
    __tablename__ = 'users'  # 数据库表的名字
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32), index=True, nullable=True)
    email = Column(String(32), unique=True)
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text)

# 3 没有迁移命令---》后期使用第三方模块,可以有命令
# 目前需要手动做
# sqlalchemy 不能创建数据库,能创建表,删除表,不能删除增加字段(第三方模块)
# 3.1 创建引擎
engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
## 3.2 把表模型同步到数据库中
Base.metadata.create_all(engine)  

# 3.3 同步和删除表只能执行一条
# Base.metadata.drop_all(engine)

flask orm操作表

基本操作
from models import User
from sqlalchemy import create_engine

# 1 创建引擎
engine = create_engine(
    "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy01",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 2 orm操作--》借助于 engine 得到session(conn)对象
from sqlalchemy.orm import sessionmaker

Connection = sessionmaker(bind=engine) # 绑定连接
session = Connection()

# 3 使用conn---》进行orm操作
# 3.1 增加数据

# user = User(name='lqz', email='3@qq.com')
# # 插入到数据库
# conn.add(user)  # 放个对象

# # 提交
# conn.commit()
# # 关闭链接
# conn.close()

# 3.2 查询数据
# 查询User表中id为1的所有记录--》放到列表中
# res=conn.query(User).filter_by(id=1).all()
# print(res)

## 3.3 删除
# res = conn.query(User).filter_by(name='lqz').delete()
# print(res)
# conn.commit()

## 3.4 修改
# res=conn.query(User).filter_by(name='9999').update({'extra':'xxsss'})
# conn.commit()

一对多的增加和跨表查询 (一对一只需要关联字段加上 ,unique=True)

modles.py文件
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()  # Base 当成 models.Model

# 一对多 :一个兴趣被多个人喜欢  一个人只喜欢一个兴趣
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')

    def __str__(self):
        return self.caption

    def __repr__(self):
        return self.caption

class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名,uselist=False
    # 外键关联--》强外键--》物理外键
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 如果是一对一加上unique=True

    # 跟数据库无关,不会新增字段,只用于快速连表操作
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers') # 重点!!!!!!!! 正反向查询要通过这个字段

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


# views.py中
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Hobby, Person, User

engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Session = sessionmaker(bind=engine)
session = Session()

# 1  增加 Hobby
# hobby = Hobby(caption='足球')
# hobby1 = Hobby(caption='篮球')
# session.add_all([hobby, hobby1])   # 列表套对象
# session.commit()      # 一定要记得提交


# 2 增加Person
# p1 = Person(name='彭于晏', hobby_id=1) # 可以是查询出来的对象 也可以是id 但是id写死了
# p2 = Person(name='刘亦菲', hobby_id=2)
# session.add_all([p1, p2])
# session.commit()


# 3 简便方式增加person---》增加Person,直接新增Hobby
# hobby1 = Hobby(caption='乒乓球')
# # 同时把数据插入两张表
# p1 = Person(name='彭于晏', hobby=hobby1) # 前提是表模型 必须有 relationship
# session.add(p1)
# session.commit()

# # 4 基于对象的跨表查询---正向
# per = session.query(Person).filter_by(name='彭于晏').first()
# print(per)
# # 正向 Person的对象通过 hobby 查询到 Hobby的caption字段
# print(per.hobby.caption)

# 5 基于对象的跨表查询---正向
# hobby=session.query(Hobby).filter_by(caption='乒乓球').first()
# print(hobby)

# 反向--->拿到多条 可以通过索引取值
hobby=session.query(Hobby).filter_by(caption='乒乓球').first()
print(hobby.pers)
print(hobby.pers[0].name)  # 列表套对象

多对多关系的增加和查询

# 多对多 models.py
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))
    # boy = relationship('Boy', backref='boy'
    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以--等同于manytomany
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

if __name__ == '__main__':
    # 3.1 创建引擎
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    ## 3.2 把表模型同步到数据库中
    Base.metadata.create_all(engine)

    # 3.3 删除表
    # Base.metadata.drop_all(engine)


# 视图类中
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Hobby, Person, User, Girl, Boy2Girl, Boy

engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Session = sessionmaker(bind=engine)
session = Session()

# 第一种简便方式增加

# obj2 = Girl(name='张亦菲')
# obj3 = Girl(name='李娜扎')
# obj1 = Boy(name='张小勇', girls=[obj2, obj3]) # 在Boy 添加一个 通过girls关键字往 Girl 加上两个并建立关系
#
# session.add(obj1)
# session.commit()

# # 4 基于对象的跨表查询---正向
boy = session.query(Boy).filter_by(name='张小勇').first()
print(boy.girls[0].name)

# 5 基于对象的跨表查询---反向
# girl=session.query(Girl).filter_by(name='张亦菲').first()
# print(girl.boys)


# 第二中添加方式 一张一张表的添加
# 1  增加 Boy
# boy = Boy(name='王小刚')
# boy2 = Boy(name='王小明')
# boy3 = Boy(name='王小勇')
# session.add_all([boy,boy2,boy3])
# session.commit()

# 2 增加Girl
# girl = Girl(name='张小华')
# girl2 = Girl(name='刘小红')
# girl3 = Girl(name='李小丽')
# session.add_all([girl3, girl2, girl])
# session.commit()

# 3 增加Boy2Girl
# obj1=Boy2Girl(boy_id=1,girl_id=1)
# obj2=Boy2Girl(boy_id=1,girl_id=2)
# obj3=Boy2Girl(boy_id=1,girl_id=3)
# session.add_all([obj2, obj3, obj1])
# session.commit()

多对多基本的增删改查

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import User, Person, Hobby, Boy, Girl, Boy2Girl
from sqlalchemy.sql import text

# 添加
# obj1 = Boy(name='张小勇', girls=[obj2, obj3]) # 在Boy 添加一个 通过girls关键字往 Girl 加上两个并建立关系
# session.add(obj1)

# 2 删除
# 2.1 session.query(Users).filter_by(id=1).delete()
# 2.1 session.delete(对象)
# user = session.query(User).filter_by(id=1).first()
# session.delete(user)
# session.commit()

#  修改
#  1 方式一:
# session.query(Boy).filter_by(id=1).update({'name':'lqz'})
# session.commit()

# # 2 方式二  类名.属性名,作为要修改的key
# session.query(Boy).filter_by(id=4).update({Boy.name:'lqz1'})
# session.commit()

# id为4的人的名字后+  _nb   类似于django的 F 查询
# session.query(User).filter_by(id=2).update({'name':User.name+'_nb'},synchronize_session=False) # 字符串拼接
# session.query(User).filter_by(id=2).update({'id':User.id+6}, synchronize_session="evaluate") # 数字之间加
# session.commit()

# # 3 方式三:
# 对象.name='xxx'
#session.add(对象)
# boy=session.query(Boy).filter_by(id=1).first()
# boy.name='xxzzyy'
# session.add(boy) # 有id就是修改,没有就是新增
# session.commit()

相关推荐

  1. 数据库连接概念原理

    2024-04-01 10:32:03       7 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-01 10:32:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-01 10:32:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-01 10:32:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-01 10:32:03       20 阅读

热门阅读

  1. ctf题目

    ctf题目

    2024-04-01 10:32:03      17 阅读
  2. TS全栈开发(React+Next.js+Nest.js+UniApp/Vue)项目

    2024-04-01 10:32:03       12 阅读
  3. 【Linux的进程篇章 - 冯诺依曼的体系结构】

    2024-04-01 10:32:03       30 阅读
  4. AI 在鞋服零售行业的应用与畅想

    2024-04-01 10:32:03       17 阅读