如何魔改 diffusers 中的 pipelines

整个 Stable Diffusion 及其 pipeline 长得就很适合 hack 的样子。不管是通过简单地调整采样过程中的一些参数,还是直接魔改 pipeline 内部甚至 UNet 内部的 Attention,都可以实现很多有趣的功能或采样生图结果。

本文主要介绍两种魔改 diffusers pipelines 的方式,一是通过注册 callback 函数,来在采样生图过程中执行某些操作,二是直接自己写 custom pipelines。

pipeline callbacks

可参考官方文档:Pipelines Callback

通过在 pipe 推理生图时传入自定义的回调函数,不用动底层代码,可以在每个时间步结束时动态地执行一些我们想要的动作,比如在特定的时间步修改特定的采样参数或张量。目前仅支持 callback_on_step_end 在单步结束时执行回调函数。我们通过两个例子来介绍如何通过 callback 函数来魔改 diffusers pipelines。

Dynamic classifier-free guidance

classifier-free guidance (cfg)用于通过 prompt 来引导图像生成的内容。在 diffusers 中,会同时用 CLIP 文本编码器同时编码 prompt 的 embeds 和空 prompt (空字符串或 negative prompt)的 embeds,然后拼接起来,一起通过交叉注意力与 UNet 交互。

通过 callback 函数,我们可以按照自己的需求动态控制 cfg,比如说,我想在特定的时间步之后停止使用 cfg,从而节省计算开销,并且性能不会有很大的损失。 callback 函数需要接收以下参数:

  • pipeline:通过 pipe 可以访问和编辑许多重要的采样参数,如 pipe.num_timestepspipe.guidance_scale 等。在本例中,我们就可以通过将 pipe._guidance_scale 来停用 cfg。

  • timestep 和 step_index:这两个参数可以让我们知道本次采样过程一共有多少时间步,以及当前我们位于哪一步。从而可以根据当前位于整个采样过程中的位置,来选择进行什么操作。在本例中,我们可以设置在整个采样过程 40% 及以后的位置,停用 cfg。

  • callback_kwargs:callback_kwargs 是一个字典,包含了在采样生图的过程中你可以编辑的张量。具体包含哪些张量,需要再调用 pipe 采样生图时通过 callback_on_step_end_tensor_inputs 参数传入。不同的 pipe 可能包含不同的可编辑张量,具体可以通过 pipe 的 _callback_tensor_inputs 属性查看。在本例中,我们需要在停用 cfg 之后调整 prompt_embeds 张量的批尺寸,丢掉空 prompt 部分。这是因为 sd pipe 是根据 _guidance_scale 的值来判断是否进行 cfg,所以我们将这个值改为零,就不会进行 cfg 了,所以需要将 prompt_embeds 不带 prompt 的部分丢掉,只保存带 prompt 的部分。

返回值方面,回调函数必须返回(修改好的) callback_kwargs。

最终,我们的 callback 函数是这样的:

def callback_dynamic_cfg(pipe, step_index, timestep, callback_kwargs, percent):
    if step_index == int(pipe.num_timesteps * percent):
        prompt_embeds = callback_kwargs['prompt_embeds']
        prompt_embeds = prompt_embeds.chunk(2)[-1]
        pipe._guidance_scale = 0.0
        callback_kwargs['prompt_embeds'] = prompt_embeds
    return callback_kwargs


pipeline = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
pipeline = pipeline.to("cuda")

prompt = "a photo of an astronaut riding a horse on mars"

generator = torch.Generator(device="cuda").manual_seed(1)
out = pipeline(

Display image after each generation step

在搭建生图 UI 时,一般需要支持用户在看到前几个时间步的结果不符合预期时,手动终止采样生图过程。这就需要两个功能,一是展示每一步的生图结果,二是支持在过程中终止本次生图。下面以展示中间步结果为例,介绍回调函数的使用。

我们知道,在 SD 中,去噪采样生图过程是发生在隐空间的,以 SDXL 为例,隐空间的特征图尺寸为 128 × 128 × 3 128\times 128\times 3 128×128×3 。我们需要将其转换到像素空间,才能看到这一步的实际生图结果。SDXL 还有点特殊,需要先将四通道的隐空间转换为 RGB 三通道,详情见 Explaining the SDXL latent space

def latents_to_rgb(latents):
    weights = (
        (60, -60, 25, -70),
        (60,  -5, 15, -50),
        (60,  10, -5, -35)

    weights_tensor = torch.t(torch.tensor(weights, dtype=latents.dtype).to(latents.device))
    biases_tensor = torch.tensor((150, 140, 130), dtype=latents.dtype).to(latents.device)
    rgb_tensor = torch.einsum("...lxy,lr -> ...rxy", latents, weights_tensor) + biases_tensor.unsqueeze(-1).unsqueeze(-1)
    image_array = rgb_tensor.clamp(0, 255)[0].byte().cpu().numpy()
    image_array = image_array.transpose(1, 2, 0)

    return Image.fromarray(image_array)
def decode_tensors(pipe, step, timestep, callback_kwargs):
    latents = callback_kwargs["latents"]
    image = latents_to_rgb(latents)

    return callback_kwargs


from diffusers import AutoPipelineForText2Image
import torch
from PIL import Image

pipeline = AutoPipelineForText2Image.from_pretrained(

image = pipeline(
    prompt = "A croissant shaped like a cute bear.",
    negative_prompt = "Deformed, ugly, bad anatomy",


Custom Pipelines

可参考官方文档:Custome Pipelinescontribute-pipeline

在 diffusers 中,我们可以很方便的自定义并加载自己的定制化 pipelines。在实现自己的自定义 pipelines 时,需要继承基类 DiffusionPipeline

加载 custom pipelines

1 从 diffusers 仓库中加载自定义的 pipeline

从 hf hub 加载自定义的 pipeline 非常简单,只需要将 model_id 传入 custom_pipeline 参数,然后就会加载该仓库中对应的 pipeline.py

from diffusers import DiffusionPipeline

pipeline = DiffusionPipeline.from_pretrained(
    "google/ddpm-cifar10-32", custom_pipeline="hf-internal-testing/diffusers-dummy-pipeline"

2 从本地文件加载自定义的 pipeline

如果想从本地文件加载 pipeline,需要将 pipeline.py 所在的文件目录(注意是目录名)传给 custom_pipeline 参数。

from diffusers import DiffusionPipeline

pipeline = DiffusionPipeline.from_pretrained(
    "google/ddpm-cifar10-32", custom_pipeline="path/to/dir"

3 加载官方收录的社区 custom pipelines

这里是合入 diffuses 官方仓库的一些社区的自定义 pipelines。我们只需要将对应文件名(不含 py 后缀,如 clip_guided_stable_diffusion)传给 custom_pipeline 参数。

由于自定义 pipelines 的通常比较复杂,所以我们也可以通过官方 pipeline 来加载模型,再将模型传入自定义 pipelines。

from diffusers import DiffusionPipeline
from transformers import CLIPFeatureExtractor, CLIPModel

clip_model_id = "laion/CLIP-ViT-B-32-laion2B-s34B-b79K"

feature_extractor = CLIPFeatureExtractor.from_pretrained(clip_model_id)
clip_model = CLIPModel.from_pretrained(clip_model_id)

pipeline = DiffusionPipeline.from_pretrained(
实现 custom pipelines

我们可以继承 DiffusionPipeline 基类并实现自己的 custom pipeline,这样所有人就都可以加载我们实现的 pipeline。一个 custom pipeline 的框架大致如下:

import torch
from diffusers import DiffusionPipeline

class MyPipeline(DiffusionPipeline):
    def __init__(self, unet, scheduler):

        self.register_modules(unet=unet, scheduler=scheduler)

    def __call__(self, batch_size: int = 1, num_inference_steps: int = 50):
        # Sample gaussian noise to begin loop
        image = torch.randn((batch_size, self.unet.in_channels, self.unet.sample_size, self.unet.sample_size))

        image = image.to(self.device)

        # set step values

        for t in self.progress_bar(self.scheduler.timesteps):
            # 1. predict noise model_output
            model_output = self.unet(image, t).sample

            # 2. predict previous mean of image x_t-1 and add variance depending on eta
            # eta corresponds to η in paper and should be between [0, 1]
            # do x_t -> x_t-1
            image = self.scheduler.step(model_output, t, image, eta).prev_sample

        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).numpy()

        return image

确保 xxxPipeline 这个类的相关实现都在这一个文件,而且该文件中只包含 xxxPipeline 一个类。因为 pipeline 的识别加载是自动的。

接下来,我们以一个最简单的 one-step pipeline 为例,简单介绍自己实现 custom pipeline 的过程。在这个one-step pipeline 中,只会用到 UNet 一个模型,并将 timestep 固定为 1,只进行一次模型前向。

首先新建一个 one_step_unet.py 文件,然后在其中继承 DiffusionPipeline 基类并实现 UnetSchedulerOneForwardPipeline 类。

初始化:定义 __init__ 方法

在初始化方法中,我们简单地的 one-step pipeline 只需要接收 unet 和 scheduler 两个初始化参数,我们在初始化方法中将变量定义好。注意,为了使得 save_pretrained 方法能够将我们的模型完整地保存下来,需要通过 register_modules 方法将我们想要保存的 unet 和 scheduler 注册进来。

from diffusers import DiffusionPipeline
import torch

class UnetSchedulerOneForwardPipeline(DiffusionPipeline):
    def __init__(self, unet, scheduler):
        self.register_modules(unet=unet, scheduler=scheduler)

前向推理:定义 __call__ 方法

定义好初始化方法 __init__ 之后,再来实现 pipeline 推理生图的 __call__ 方法。在这里,我们可以任意发挥,任意组合,魔改扩散模型的采样过程,实现自己想要的功能。在我们的 one-step pipeline 中,这里要做的非常简单:采样一个噪声图,UNet 进行一次前向。

def __call__(self):
    image = torch.randn(1, self.unet.config.in_channels, self.unet.config.sample_size, self.unet.sanple_size)
    timestep = 1
    model_output = self.unet(image, timestep).sample
    scheduler_output = self.scheduler.step(model_output, timestep, image).prev_sample
    return scheduler_output

这就 ok 了,我们已经实现好了自定义的 one-step pipeline。


我们传入 unet 和 scheduler,实例化一个刚刚自定义好的 UnetSchedulerOneForwardPipeline,然后进行推理生图:

from diffusers import DDPMScheduler, UNet2DModel

scheduler = DDPMScheduler()
unet = UNet2DModel()

pipeline = UnetSchedulerOneForwardPipeline(unet=unet, scheduler=scheduler)

output = pipeline()

如果我们的 custom pipeline 结果如果跟某个已有的 pipeline 的预训练权重是完全一样的,我们还可以直接通过 from_pretrained 方法来加载它们的权重。比如说,我们的 UnetSchedulerOneForwardPipeline 就可以直接加载 google/ddpm-cifar10-32 的权重:

pipeline = UnetSchedulerOneForwardPipeline.from_pretrained("google/ddpm-cifar10-32", use_safetensors=True)

output = pipeline()
分享 custom pipelines

要共享自己的 custom pipeline 有三个方法:

  1. 将自己实现的 custom pipeline 推送到 hf hub 仓库,需要将文件名命名为 pipeline.py

  2. 像 diffusers 官方仓库提交 PR,合并之后可以我们的 pipeline 就会出现在这里,别人可以通过文件名来加载

  3. 共享出自己的源码文件,如 clip_guided_stable_diffusion.py,别人也可以导入我们的 pipeline

而作为使用者,使用 custom pipeline 的方式有两种:

# 方式 1
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained(
    "google/ddpm-cifar10-32", custom_pipeline="hf-internal-testing/diffusers-dummy-pipeline"

# 方式 2
from cus_pipe import CustomPipeline    # cus_pipe is copied from hf-internal-testing/diffusers-dummy-pipeline
pipe = CustomPipeline.from_pretrained("google/ddpm-cifar10-32")


if custom_pipeline is not None:
    pipeline_class = get_class_from_dynamic_module(
        custom_pipeline, module_file=CUSTOM_PIPELINE_FILE_NAME, cache_dir=custom_pipeline
elif cls != DiffusionPipeline:
    pipeline_class = cls
    diffusers_module = importlib.import_module(cls.__module__.split(".")[0])
    pipeline_class = getattr(diffusers_module, config_dict["_class_name"])


diffusers 的 api 设计非常友好,我们可以通过 pipeline callback 和 custom pipeline 等方式定制化实现自己想要的功能,其中前者不用动底层代码,简单优雅,后者则是功能强大,现在最新的 AIGC 相关的论文基本都是通过 custom diffusion 的方式公开自己的源码,非常方便。


