前言
回归 Python 栈,相较 Go 的 Coding,Python 确实偏向复杂,看似编码方便快捷的背后,是越来越庞杂的细枝末节,稍不注意就是偏差。如果项目只是“能跑就行”,那大概率遍地是坑。开启踩坑记~
内存泄漏
服务内存缓慢持续上涨,内存泄漏,Python 相对还是好找一些,第一反应全局 Mutable 的变量再被持续 append? 寻迹之下发现不是,而是另外一种形式,危险的 Mutable 默认参数!
def some_decorator(default={}):
@functools.wraps(func)
def wrapper(*args, **kwargs):
res = func(*args, **kwargs)
if isinstance(kwargs['target'], dict):
for k, v in kwargs['target'].items():
default[k] = v
record(default)
return res
return wrapper
本意是该装饰器在使用时会传入字典变量 default
用以后续记录内容中做一些基本信息植入。当不传时默认给一个空 dict
。
问题就出现在这。
参数默认值的设定,都是在定义时设置一次,也就是说 default={}
的默认参数,并不是在每次调用时候,去执行 new
一个新 dict
传入 default
,那么在大量都是缺省default 参数指定的默认值调用中,后续装饰器的内就会去不停写入 default
默认值的 dict
中的内容,如果外层 kwargs['target']
中的 kv 又是大面积不重复的 kv 对,因此这里就变成一个 内存泄漏点,并且实际行为也是不符合预期的。
优化上文,剔除可变参数:
def some_decorator(default=None):
@functools.wraps(func)
def wrapper(*args, **kwargs):
local = default or {}
res = func(*args, **kwargs)
if isinstance(kwargs['target'], dict):
for k, v in kwargs['target'].items():
local[k] = v
record(local)
return res
return wrapper
真的修复了么?当使用是,不传入行参给 default
本地 new
一个 dict
貌似可以了。
# 如果这样使用呢?
@some_decorator(default={'test_key': 'test_value'})
def some_fun(*args, **kwargs):
pass
闭包陷阱
,装饰器声明时候调用一次,此时 loacl 被固定指向了 声明时传入的对象,后续所有的修改,都是在持续修改该对象,还是潜在泄漏
,修复如下:
def some_decorator():
@functools.wraps(func)
def wrapper(*args, **kwargs):
local = kwargs.get('default', {})
res = func(*args, **kwargs)
if isinstance(kwargs['target'], dict):
for k, v in kwargs['target'].items():
local[k] = v
record(local)
return res
return wrapper
pydantic
package 的安装使用不再赘述,参考 https://docs.pydantic.dev/latest/ 介绍,但是个别细节还是容易踩坑
首先是 从 BaseModel
集成来的子类定义式中的 变量命名
pydantic.BaseModel
的很大一个应用场景是 帮我们做一个 实例对象 属性类型的校验,下例会有什么效果?
import datetime
from pydantic import BaseModel
class Test(BaseModel):
attr: dict
_flag: str
t = Test(
attr={'time': datetime.datetime(2018, 1, 30, 13, 55, 28)},
_flag='hello'
)
print(t._flag)
print(t.json())
执行发
Traceback (most recent call last):
File "/Users/machao/miniconda3/envs/py3.9/lib/python3.9/site-packages/pydantic/main.py", line 746, in __getattr__
return self.__pydantic_private__[item] # type: ignore
KeyError: '_region'
_region
这种 ‘_’ 命名的私有变量不会再 实例对象中初始化,不妨 打印 t.__dict__
看下便知道
注释 print(t._flag)
程序顺利执行,这里 attr
中的 datetime
对象 顺利序列化了……
普通的 json.dumps()
都是要手动转一下 datetime
的,这里 json 顺利就完成了。
这种宽字典,其中 可选存在 datetime
字段时,如果想做中间缓存,手动 json.dumps()
存入 kv
又会遭中。
pydantic = 2.6.4
的较新版本中,对于转化就仅有的几句介绍:model_dump_json,能够对常规josn.dumps
中不兼容的 datetime
、date
或 UUID
做到兼容。
这里的内部实现没有明确说明,源码也没有,不过对于老一些的 pydantic = 2.6.4
他的兼容实现,就是类似调用 josn.dumps
的时候指定 default
,笔者在 旧版本上做过 debug
,内部就是指定了不同类型的 encoder
,可以参考 deprecated 中的 ENCODERS_BY_TYPE
线程
可能是 Go
的 Goroutine
用多了,Python
threading
很多细节被颠覆了,总觉得开了 threading
不 join
主程序执行完退出整个程序就退了,分别看下原始 threading
包和 concurrent
中的 ThreadPoolExecutor
threading
def rand_sleep(i):
time.sleep(random.randint(2, 10))
print(f'close thread: {i}')
# 样例一
if __name__ == '__main__':
threads = [Thread(target=rand_sleep, args=(i,)) for i in range(5)]
for t in threads:
t.start()
print('end main')
执行结果:
end main
close thread: 4
close thread: 3
close thread: 0
close thread: 1
close thread: 2
Process finished with exit code 0
主程序执行完后会等待开的子线程执行完毕方才退出,顺序上,threads
开启后主程序继续往后执行,如果想要 threads
开启后阻塞住当前主进程,需调整如下
# 样例二
if __name__ == '__main__':
threads = [Thread(target=rand_sleep, args=(i,)) for i in range(5)]
for t in threads:
t.start()
# t.join() 此处 join 变成串行了
for t in threads:
t.join()
print('end main')
执行结果
close thread: 1
close thread: 2
close thread: 3
close thread: 4
close thread: 0
end main
Process finished with exit code 0
Thread
初始化中有个默认参数 daemon
用来设置 守护线程,在默认情况下是,当前进程,如果
在样例一的编码中,维持其他不变,添加参数 daemon=True
# 样例三
if __name__ == '__main__':
threads = [Thread(target=rand_sleep, args=(i,), daemon=True) for i in range(5)]
for t in threads:
t.start()
print('end main')
此时主进程执行完即退出,不会等待子线程,除非类似 样例二中 手动 join
concurrent.futures
Python
3.2 引入的并发库,让我们的开启并行的形式更简明,
# 样例一
if __name__ == '__main__':
tasks = []
with ThreadPoolExecutor(max_workers=10) as executor:
for i in range(5):
tasks.append(executor.submit(rand_sleep, i))
print('end main')
执行结果
close thread: 4
close thread: 1
close thread: 0
close thread: 2
close thread: 3
end main
Process finished with exit code 0
没有手动去调用 join
啊,为什么会阻塞主线程了,ThreadPoolExecutor
的上下文协议管理做了什么?
# ThreadPoolExecutor 的父类
class Executor(object):
....
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
class ThreadPoolExecutor(_base.Executor):
......
def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
# Drain all work items from the queue, and then cancel their
# associated futures.
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.cancel()
# Send a wake-up to prevent threads calling
# _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()
shutdown
的方法里,默认是开启 wait
会 join
住主线程
# 样例二
if __name__ == '__main__':
tasks = []
executor = ThreadPoolExecutor(max_workers=10)
for i in range(5):
tasks.append(executor.submit(rand_sleep, i))
print('end main')
执行结果
end main
close thread: 1
close thread: 0
close thread: 2
close thread: 3
close thread: 4
Process finished with exit code 0
类似 threading
没有 join
的效果,翻看源码会发现,submit
内部会开启 Thread
效果仿佛类似 threading
等待底层的线程调度器来做逻辑处理,其实还是不一样
在 concurrent.futures.thread
中,定义了
def _python_exit():
global _shutdown
with _global_shutdown_lock:
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
t.join()
# 注册到 threading 包的 _register_atexit 中
threading._register_atexit(_python_exit)
# threading 中如下
def _register_atexit(func, *arg, **kwargs):
"""CPython internal: register *func* to be called before joining threads.
The registered *func* is called with its arguments just before all
non-daemon threads are joined in `_shutdown()`. It provides a similar
purpose to `atexit.register()`, but its functions are called prior to
threading shutdown instead of interpreter shutdown.
For similarity to atexit, the registered functions are called in reverse.
"""
if _SHUTTING_DOWN:
raise RuntimeError("can't register atexit after shutdown")
call = functools.partial(func, *arg, **kwargs)
_threading_atexits.append(call)
体会下这里的区别吧,并且在 concurrent.futures
还提供了两个模块级的函数 concurrent.futures.wait
和 concurrent.futures.as_completed
可以在细读下 doc
先到这~ 未完待续~