Python 踩坑记


回归 Python 栈,相较 Go 的 Coding,Python 确实偏向复杂,看似编码方便快捷的背后,是越来越庞杂的细枝末节,稍不注意就是偏差。如果项目只是“能跑就行”,那大概率遍地是坑。开启踩坑记~


服务内存缓慢持续上涨,内存泄漏,Python 相对还是好找一些,第一反应全局 Mutable 的变量再被持续 append? 寻迹之下发现不是,而是另外一种形式,危险的 Mutable 默认参数!

def some_decorator(default={}):

    def wrapper(*args, **kwargs):
    	res = func(*args, **kwargs)
    	if isinstance(kwargs['target'], dict):
            for k, v in kwargs['target'].items():
                default[k] = v
        return res
    return wrapper

本意是该装饰器在使用时会传入字典变量 default 用以后续记录内容中做一些基本信息植入。当不传时默认给一个空 dict
参数默认值的设定,都是在定义时设置一次,也就是说 default={} 的默认参数,并不是在每次调用时候,去执行 new 一个新 dict 传入 default,那么在大量都是缺省default 参数指定的默认值调用中,后续装饰器的内就会去不停写入 default 默认值的 dict 中的内容,如果外层 kwargs['target'] 中的 kv 又是大面积不重复的 kv 对,因此这里就变成一个 内存泄漏点,并且实际行为也是不符合预期的。

def some_decorator(default=None):

    def wrapper(*args, **kwargs):
    	local = default or {}
    	res = func(*args, **kwargs)
    	if isinstance(kwargs['target'], dict):
            for k, v in kwargs['target'].items():
                local[k] = v
        return res
    return wrapper

真的修复了么?当使用是,不传入行参给 default 本地 new 一个 dict 貌似可以了。

# 如果这样使用呢?
@some_decorator(default={'test_key': 'test_value'})
def some_fun(*args, **kwargs):

闭包陷阱,装饰器声明时候调用一次,此时 loacl 被固定指向了 声明时传入的对象,后续所有的修改,都是在持续修改该对象,还是潜在泄漏,修复如下:

def some_decorator():

    def wrapper(*args, **kwargs):
    	local = kwargs.get('default', {})
    	res = func(*args, **kwargs)
    	if isinstance(kwargs['target'], dict):
            for k, v in kwargs['target'].items():
                local[k] = v
        return res
    return wrapper


package 的安装使用不再赘述,参考 介绍,但是个别细节还是容易踩坑
首先是 从 BaseModel 集成来的子类定义式中的 变量命名
pydantic.BaseModel 的很大一个应用场景是 帮我们做一个 实例对象 属性类型的校验,下例会有什么效果?

import datetime
from pydantic import BaseModel

class Test(BaseModel):
    attr: dict
    _flag: str

t = Test(
	attr={'time': datetime.datetime(2018, 1, 30, 13, 55, 28)}, 


Traceback (most recent call last):
  File "/Users/machao/miniconda3/envs/py3.9/lib/python3.9/site-packages/pydantic/", line 746, in __getattr__
    return self.__pydantic_private__[item]  # type: ignore
KeyError: '_region'

_region 这种 ‘_’ 命名的私有变量不会再 实例对象中初始化,不妨 打印 t.__dict__ 看下便知道
注释 print(t._flag) 程序顺利执行,这里 attr 中的 datetime 对象 顺利序列化了……
普通的 json.dumps() 都是要手动转一下 datetime 的,这里 json 顺利就完成了。
这种宽字典,其中 可选存在 datetime 字段时,如果想做中间缓存,手动 json.dumps() 存入 kv 又会遭中。
pydantic = 2.6.4 的较新版本中,对于转化就仅有的几句介绍:model_dump_json,能够对常规josn.dumps 中不兼容的 datetimedateUUID 做到兼容。
这里的内部实现没有明确说明,源码也没有,不过对于老一些的 pydantic = 2.6.4 他的兼容实现,就是类似调用 josn.dumps的时候指定 default,笔者在 旧版本上做过 debug,内部就是指定了不同类型的 encoder,可以参考 deprecated 中的 ENCODERS_BY_TYPE


可能是 GoGoroutine 用多了,Python threading 很多细节被颠覆了,总觉得开了 threadingjoin 主程序执行完退出整个程序就退了,分别看下原始 threading 包和 concurrent 中的 ThreadPoolExecutor


def rand_sleep(i):
    time.sleep(random.randint(2, 10))
    print(f'close thread: {i}')

#  样例一
if __name__ == '__main__':
    threads = [Thread(target=rand_sleep, args=(i,)) for i in range(5)]
    for t in threads:
    print('end main')


end main
close thread: 4
close thread: 3
close thread: 0
close thread: 1
close thread: 2

Process finished with exit code 0

主程序执行完后会等待开的子线程执行完毕方才退出,顺序上,threads 开启后主程序继续往后执行,如果想要 threads 开启后阻塞住当前主进程,需调整如下

#  样例二
if __name__ == '__main__':
    threads = [Thread(target=rand_sleep, args=(i,)) for i in range(5)]
    for t in threads:
        # t.join() 此处 join 变成串行了
	for t in threads:
    print('end main')


close thread: 1
close thread: 2
close thread: 3
close thread: 4
close thread: 0
end main

Process finished with exit code 0

Thread 初始化中有个默认参数 daemon 用来设置 守护线程,在默认情况下是,当前进程,如果
在样例一的编码中,维持其他不变,添加参数 daemon=True

#  样例三
if __name__ == '__main__':
    threads = [Thread(target=rand_sleep, args=(i,), daemon=True) for i in range(5)]
    for t in threads:
    print('end main')

此时主进程执行完即退出,不会等待子线程,除非类似 样例二中 手动 join


Python 3.2 引入的并发库,让我们的开启并行的形式更简明,

# 样例一
if __name__ == '__main__':
	tasks = []
	with ThreadPoolExecutor(max_workers=10) as executor:
	    for i in range(5):
	        tasks.append(executor.submit(rand_sleep, i))
	print('end main')


close thread: 4
close thread: 1
close thread: 0
close thread: 2
close thread: 3
end main

Process finished with exit code 0

没有手动去调用 join 啊,为什么会阻塞主线程了,ThreadPoolExecutor上下文协议管理做了什么?

# ThreadPoolExecutor 的父类
class Executor(object):
    def __exit__(self, exc_type, exc_val, exc_tb):
        return False
class ThreadPoolExecutor(_base.Executor):
	def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                    if work_item is not None:

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
        if wait:
            for t in self._threads:

shutdown 的方法里,默认是开启 waitjoin 住主线程

# 样例二
if __name__ == '__main__':
	tasks = []
	executor = ThreadPoolExecutor(max_workers=10)
    for i in range(5):
        tasks.append(executor.submit(rand_sleep, i))
	print('end main')


end main
close thread: 1
close thread: 0
close thread: 2
close thread: 3
close thread: 4

Process finished with exit code 0

类似 threading 没有 join 的效果,翻看源码会发现,submit 内部会开启 Thread 效果仿佛类似 threading 等待底层的线程调度器来做逻辑处理,其实还是不一样

concurrent.futures.thread 中,定义了

def _python_exit():
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
    for t, q in items:

# 注册到 threading 包的 _register_atexit 中

# threading 中如下
def _register_atexit(func, *arg, **kwargs):
    """CPython internal: register *func* to be called before joining threads.

    The registered *func* is called with its arguments just before all
    non-daemon threads are joined in `_shutdown()`. It provides a similar
    purpose to `atexit.register()`, but its functions are called prior to
    threading shutdown instead of interpreter shutdown.

    For similarity to atexit, the registered functions are called in reverse.
        raise RuntimeError("can't register atexit after shutdown")

    call = functools.partial(func, *arg, **kwargs)

体会下这里的区别吧,并且在 concurrent.futures 还提供了两个模块级的函数 concurrent.futures.waitconcurrent.futures.as_completed 可以在细读下 doc

先到这~ 未完待续~


