连接池
import pymysql
from dbutils. pooled_db import PooledDB
POOL = PooledDB(
creator= pymysql,
maxconnections= 6 ,
mincached= 2 ,
maxcached= 5 ,
maxshared= 3 ,
blocking= True ,
maxusage= None ,
setsession= [ ] ,
ping= 0 ,
host= '127.0.0.1' ,
port= 3306 ,
user= 'root' ,
password= '123' ,
database= 'luffy' ,
charset= 'utf8'
)
- 第二步: 使用
from sql_pool import POOL
conn = POOL. connection( )
curser = conn. cursor( )
curser. execute( 'select * from luffy_order where id<2' )
res= curser. fetchall( )
print ( res)
实例
from flask import Flask
app = Flask( __name__)
@app. route ( '/' )
def hello_world ( ) :
conn = POOL. connection( )
curser = conn. cursor( )
curser. execute( 'select * from luffy_course where id<2' )
res = curser. fetchall( )
return '数据查到了 %s' % res
if __name__ == '__main__' :
app. run( )
flask定制命令
使用 flask- script定制命令( 老版本,不用了)
- 1 pip3 install Flask- Script== 2.0 .3
- 2 pip3 install flask== 1.1 .4
- 3 pip3 install markupsafe= 1.1 .1
- 4 使用
from flask_script import Manager
manager = Manager( app)
if __name__ == '__main__' :
manager. run( )
- 5 自定制命令
@manager. command
def custom ( arg) :
"""自定义命令
python manage.py custom 123
"""
print ( arg)
- 6 执行自定制命令
python manage. py custom 123
新版本定制命令
from flask import Flask
import click
app = Flask( __name__)
@app. cli. command ( "create-user" )
@click. argument ( "name" )
def create_user ( args) :
print ( name)
@app. route ( '/' )
def index ( ) :
return 'index'
if __name__ == '__main__' :
app. run( )
management/ commands/
from django. core. management. base import BaseCommand
class Command ( BaseCommand) :
help = '命令提示'
def handle ( self, * args, ** kwargs) :
命令逻辑
python manage. py py文件( 命令名)
flask 缓存的使用
from flask import Flask
from flask_caching import Cache
config = {
"DEBUG" : True ,
"CACHE_TYPE" : "SimpleCache" ,
"CACHE_DEFAULT_TIMEOUT" : 300
}
app = Flask( __name__)
app. config. from_mapping( config)
cache = Cache( app)
@app. route ( '/' )
def index ( ) :
cache. set ( 'name' , 'xxx' )
return 'index'
@app. route ( '/get' )
def get ( ) :
res= cache. get( 'name' )
return res
if __name__ == '__main__' :
app. run( )
flask信号的使用
request_started = _signals. signal( 'request-started' )
request_finished = _signals. signal( 'request-finished' )
before_render_template = _signals. signal( 'before-render-template' )
template_rendered = _signals. signal( 'template-rendered' )
got_request_exception = _signals. signal( 'got-request-exception' )
request_tearing_down = _signals. signal( 'request-tearing-down' )
appcontext_tearing_down = _signals. signal( 'appcontext-tearing-down' )
appcontext_pushed = _signals. signal( 'appcontext-pushed' )
appcontext_popped = _signals. signal( 'appcontext-popped' )
message_flashed = _signals. signal( 'message-flashed' )
def before_render ( * args, ** kwargs) :
print ( args)
print ( kwargs)
print ( '记录日志,模板要渲染了' )
from flask. signals import before_render_template
before_render_template. connect( before_render)
@app. route ( '/' )
def index ( ) :
return render_template( 'index.html' )
@app. route ( '/login' )
def login ( ) :
return render_template( 'login.html' )
if __name__ == '__main__' :
app. run( )
from flask import Flask, render_template
from flask. signals import _signals
import pymysql
app = Flask( __name__)
app. debug = True
db_save = _signals. signal( 'db_save' )
def db_save_fun ( * args, ** kwargs) :
print ( args)
print ( kwargs)
print ( '表数据插入了' )
db_save. connect( db_save_fun)
@app. route ( '/' )
def index ( ) :
return render_template( 'index.html' )
@app. route ( '/create_article' )
def create_article ( ) :
conn = pymysql. connect( host= '127.0.0.1' , user= 'root' , password= '1234' , database= 'cnblogs' )
cursor = conn. cursor( )
cursor. execute( 'insert into article (title,author) VALUES (%s,%s)' , args= [ '测试测试标题' , '测试作者测试' ] )
conn. commit( )
db_save. send( table_name= 'article' , info= { 'title' : '测试测试标题' , 'author' : '测试作者测试' } )
return '插入成功'
if __name__ == '__main__' :
app. run( )
db_save = _signals. signal( 'db_save' )
def db_save_fun ( * args, ** kwargs) :
print ( args)
print ( kwargs)
print ( '表数据插入了' )
db_save. connect( db_save_fun)
db_save. send( )
sqlalchemy原生操作
import pymysql
import threading
from sqlalchemy import create_engine
from sqlalchemy. engine. base import Engine
engine = create_engine(
"mysql+pymysql://root:1234@127.0.0.1:3306/cnblogs" ,
max_overflow= 0 ,
pool_size= 5 ,
pool_timeout= 30 ,
pool_recycle= - 1
)
def task ( arg) :
conn = engine. raw_connection( )
cursor = conn. cursor( )
cursor. execute(
"select * from article"
)
result = cursor. fetchall( )
print ( result)
cursor. close( )
conn. close( )
for i in range ( 20 ) :
t = threading. Thread( target= task, args= ( i, ) )
t. start( )
sqlalchemy操作表
import datetime
from sqlalchemy import create_engine
from sqlalchemy. orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base( )
class User ( Base) :
__tablename__ = 'users'
id = Column( Integer, primary_key= True , autoincrement= True )
name = Column( String( 32 ) , index= True , nullable= True )
email = Column( String( 32 ) , unique= True )
ctime = Column( DateTime, default= datetime. datetime. now)
extra = Column( Text)
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01" ,
max_overflow= 0 ,
pool_size= 5 ,
pool_timeout= 30 ,
pool_recycle= - 1
)
Base. metadata. create_all( engine)
flask orm操作表
基本操作
from models import User
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy01" ,
max_overflow= 0 ,
pool_size= 5 ,
pool_timeout= 30 ,
pool_recycle= - 1
)
from sqlalchemy. orm import sessionmaker
Connection = sessionmaker( bind= engine)
session = Connection( )
一对多的增加和跨表查询 (一对一只需要关联字段加上 ,unique=True)
modles. py文件
import datetime
from sqlalchemy import create_engine
from sqlalchemy. orm import declarative_base, relationship
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base( )
class Hobby ( Base) :
__tablename__ = 'hobby'
id = Column( Integer, primary_key= True )
caption = Column( String( 50 ) , default= '篮球' )
def __str__ ( self) :
return self. caption
def __repr__ ( self) :
return self. caption
class Person ( Base) :
__tablename__ = 'person'
id = Column( Integer, primary_key= True )
name = Column( String( 32 ) , index= True , nullable= True )
hobby_id = Column( Integer, ForeignKey( "hobby.id" ) )
hobby = relationship( 'Hobby' , backref= 'pers' )
def __str__ ( self) :
return self. name
def __repr__ ( self) :
return self. name
from sqlalchemy import create_engine
from sqlalchemy. orm import sessionmaker
from models import Hobby, Person, User
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01" ,
max_overflow= 0 ,
pool_size= 5 ,
pool_timeout= 30 ,
pool_recycle= - 1
)
Session = sessionmaker( bind= engine)
session = Session( )
hobby= session. query( Hobby) . filter_by( caption= '乒乓球' ) . first( )
print ( hobby. pers)
print ( hobby. pers[ 0 ] . name)
多对多关系的增加和查询
class Boy2Girl ( Base) :
__tablename__ = 'boy2girl'
id = Column( Integer, primary_key= True , autoincrement= True )
girl_id = Column( Integer, ForeignKey( 'girl.id' ) )
boy_id = Column( Integer, ForeignKey( 'boy.id' ) )
def __str__ ( self) :
return self. name
def __repr__ ( self) :
return self. name
class Girl ( Base) :
__tablename__ = 'girl'
id = Column( Integer, primary_key= True )
name = Column( String( 64 ) , unique= True , nullable= False )
class Boy ( Base) :
__tablename__ = 'boy'
id = Column( Integer, primary_key= True , autoincrement= True )
name = Column( String( 64 ) , unique= True , nullable= False )
girls = relationship( 'Girl' , secondary= 'boy2girl' , backref= 'boys' )
def __str__ ( self) :
return self. name
def __repr__ ( self) :
return self. name
if __name__ == '__main__' :
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01" ,
max_overflow= 0 ,
pool_size= 5 ,
pool_timeout= 30 ,
pool_recycle= - 1
)
Base. metadata. create_all( engine)
from sqlalchemy import create_engine
from sqlalchemy. orm import sessionmaker
from models import Hobby, Person, User, Girl, Boy2Girl, Boy
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01" ,
max_overflow= 0 ,
pool_size= 5 ,
pool_timeout= 30 ,
pool_recycle= - 1
)
Session = sessionmaker( bind= engine)
session = Session( )
boy = session. query( Boy) . filter_by( name= '张小勇' ) . first( )
print ( boy. girls[ 0 ] . name)
多对多基本的增删改查
from sqlalchemy. orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy. orm import scoped_session
from models import User, Person, Hobby, Boy, Girl, Boy2Girl
from sqlalchemy. sql import text