如何魔改 diffusers 中的 pipelines

如何魔改 diffusers 中的 pipelines

整个 Stable Diffusion 及其 pipeline 长得就很适合 hack 的样子。不管是通过简单地调整采样过程中的一些参数,还是直接魔改 pipeline 内部甚至 UNet 内部的 Attention,都可以实现很多有趣的功能或采样生图结果。

本文主要介绍两种魔改 diffusers pipelines 的方式,一是通过注册 callback 函数,来在采样生图过程中执行某些操作,二是直接自己写 custom pipelines。

pipeline callbacks

可参考官方文档:Pipelines Callback

通过在 pipe 推理生图时传入自定义的回调函数,不用动底层代码,可以在每个时间步结束时动态地执行一些我们想要的动作,比如在特定的时间步修改特定的采样参数或张量。目前仅支持 callback_on_step_end 在单步结束时执行回调函数。我们通过两个例子来介绍如何通过 callback 函数来魔改 diffusers pipelines。

Dynamic classifier-free guidance

classifier-free guidance (cfg)用于通过 prompt 来引导图像生成的内容。在 diffusers 中,会同时用 CLIP 文本编码器同时编码 prompt 的 embeds 和空 prompt (空字符串或 negative prompt)的 embeds,然后拼接起来,一起通过交叉注意力与 UNet 交互。

通过 callback 函数,我们可以按照自己的需求动态控制 cfg,比如说,我想在特定的时间步之后停止使用 cfg,从而节省计算开销,并且性能不会有很大的损失。 callback 函数需要接收以下参数:

  • pipeline:通过 pipe 可以访问和编辑许多重要的采样参数,如 pipe.num_timestepspipe.guidance_scale 等。在本例中,我们就可以通过将 pipe._guidance_scale 来停用 cfg。

  • timestep 和 step_index:这两个参数可以让我们知道本次采样过程一共有多少时间步,以及当前我们位于哪一步。从而可以根据当前位于整个采样过程中的位置,来选择进行什么操作。在本例中,我们可以设置在整个采样过程 40% 及以后的位置,停用 cfg。

  • callback_kwargs:callback_kwargs 是一个字典,包含了在采样生图的过程中你可以编辑的张量。具体包含哪些张量,需要再调用 pipe 采样生图时通过 callback_on_step_end_tensor_inputs 参数传入。不同的 pipe 可能包含不同的可编辑张量,具体可以通过 pipe 的 _callback_tensor_inputs 属性查看。在本例中,我们需要在停用 cfg 之后调整 prompt_embeds 张量的批尺寸,丢掉空 prompt 部分。这是因为 sd pipe 是根据 _guidance_scale 的值来判断是否进行 cfg,所以我们将这个值改为零,就不会进行 cfg 了,所以需要将 prompt_embeds 不带 prompt 的部分丢掉,只保存带 prompt 的部分。

返回值方面,回调函数必须返回(修改好的) callback_kwargs。

最终,我们的 callback 函数是这样的:

def callback_dynamic_cfg(pipe, step_index, timestep, callback_kwargs, percent):
    if step_index == int(pipe.num_timesteps * percent):
        prompt_embeds = callback_kwargs['prompt_embeds']
        prompt_embeds = prompt_embeds.chunk(2)[-1]
        pipe._guidance_scale = 0.0
        callback_kwargs['prompt_embeds'] = prompt_embeds
    return callback_kwargs

然后在推理生图时传入该参数:

pipeline = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
pipeline = pipeline.to("cuda")

prompt = "a photo of an astronaut riding a horse on mars"

generator = torch.Generator(device="cuda").manual_seed(1)
out = pipeline(
    prompt,
    generator=generator,
    callback_on_step_end=callback_dynamic_cfg,
    callback_on_step_end_tensor_inputs=['prompt_embeds']
)

out.images[0].save("out_custom_cfg.png")
Display image after each generation step

在搭建生图 UI 时,一般需要支持用户在看到前几个时间步的结果不符合预期时,手动终止采样生图过程。这就需要两个功能,一是展示每一步的生图结果,二是支持在过程中终止本次生图。下面以展示中间步结果为例,介绍回调函数的使用。

我们知道,在 SD 中,去噪采样生图过程是发生在隐空间的,以 SDXL 为例,隐空间的特征图尺寸为 128 × 128 × 3 128\times 128\times 3 128×128×3 。我们需要将其转换到像素空间,才能看到这一步的实际生图结果。SDXL 还有点特殊,需要先将四通道的隐空间转换为 RGB 三通道,详情见 Explaining the SDXL latent space

def latents_to_rgb(latents):
    weights = (
        (60, -60, 25, -70),
        (60,  -5, 15, -50),
        (60,  10, -5, -35)
    )

    weights_tensor = torch.t(torch.tensor(weights, dtype=latents.dtype).to(latents.device))
    biases_tensor = torch.tensor((150, 140, 130), dtype=latents.dtype).to(latents.device)
    rgb_tensor = torch.einsum("...lxy,lr -> ...rxy", latents, weights_tensor) + biases_tensor.unsqueeze(-1).unsqueeze(-1)
    image_array = rgb_tensor.clamp(0, 255)[0].byte().cpu().numpy()
    image_array = image_array.transpose(1, 2, 0)

    return Image.fromarray(image_array)
  
def decode_tensors(pipe, step, timestep, callback_kwargs):
    latents = callback_kwargs["latents"]
    
    image = latents_to_rgb(latents)
    image.save(f"{step}.png")

    return callback_kwargs

然后再推理生图时传入该参数回调函数:

from diffusers import AutoPipelineForText2Image
import torch
from PIL import Image

pipeline = AutoPipelineForText2Image.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
    variant="fp16",
    use_safetensors=True
).to("cuda")

image = pipeline(
    prompt = "A croissant shaped like a cute bear.",
    negative_prompt = "Deformed, ugly, bad anatomy",
    callback_on_step_end=decode_tensors,
    callback_on_step_end_tensor_inputs=["latents"],
).images[0]

在这里插入图片描述

Custom Pipelines

可参考官方文档:Custome Pipelinescontribute-pipeline

在 diffusers 中,我们可以很方便的自定义并加载自己的定制化 pipelines。在实现自己的自定义 pipelines 时,需要继承基类 DiffusionPipeline

加载 custom pipelines

1 从 diffusers 仓库中加载自定义的 pipeline

从 hf hub 加载自定义的 pipeline 非常简单,只需要将 model_id 传入 custom_pipeline 参数,然后就会加载该仓库中对应的 pipeline.py

from diffusers import DiffusionPipeline

pipeline = DiffusionPipeline.from_pretrained(
    "google/ddpm-cifar10-32", custom_pipeline="hf-internal-testing/diffusers-dummy-pipeline"
)

2 从本地文件加载自定义的 pipeline

如果想从本地文件加载 pipeline,需要将 pipeline.py 所在的文件目录(注意是目录名)传给 custom_pipeline 参数。

from diffusers import DiffusionPipeline

pipeline = DiffusionPipeline.from_pretrained(
    "google/ddpm-cifar10-32", custom_pipeline="path/to/dir"
)

3 加载官方收录的社区 custom pipelines

这里是合入 diffuses 官方仓库的一些社区的自定义 pipelines。我们只需要将对应文件名(不含 py 后缀,如 clip_guided_stable_diffusion)传给 custom_pipeline 参数。

由于自定义 pipelines 的通常比较复杂,所以我们也可以通过官方 pipeline 来加载模型,再将模型传入自定义 pipelines。

from diffusers import DiffusionPipeline
from transformers import CLIPFeatureExtractor, CLIPModel

clip_model_id = "laion/CLIP-ViT-B-32-laion2B-s34B-b79K"

feature_extractor = CLIPFeatureExtractor.from_pretrained(clip_model_id)
clip_model = CLIPModel.from_pretrained(clip_model_id)

pipeline = DiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    custom_pipeline="clip_guided_stable_diffusion",
    clip_model=clip_model,
    feature_extractor=feature_extractor,
)
实现 custom pipelines

我们可以继承 DiffusionPipeline 基类并实现自己的 custom pipeline,这样所有人就都可以加载我们实现的 pipeline。一个 custom pipeline 的框架大致如下:

import torch
from diffusers import DiffusionPipeline


class MyPipeline(DiffusionPipeline):
    def __init__(self, unet, scheduler):
        super().__init__()

        self.register_modules(unet=unet, scheduler=scheduler)

    @torch.no_grad()
    def __call__(self, batch_size: int = 1, num_inference_steps: int = 50):
        # Sample gaussian noise to begin loop
        image = torch.randn((batch_size, self.unet.in_channels, self.unet.sample_size, self.unet.sample_size))

        image = image.to(self.device)

        # set step values
        self.scheduler.set_timesteps(num_inference_steps)

        for t in self.progress_bar(self.scheduler.timesteps):
            # 1. predict noise model_output
            model_output = self.unet(image, t).sample

            # 2. predict previous mean of image x_t-1 and add variance depending on eta
            # eta corresponds to η in paper and should be between [0, 1]
            # do x_t -> x_t-1
            image = self.scheduler.step(model_output, t, image, eta).prev_sample

        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).numpy()

        return image

确保 xxxPipeline 这个类的相关实现都在这一个文件,而且该文件中只包含 xxxPipeline 一个类。因为 pipeline 的识别加载是自动的。

接下来,我们以一个最简单的 one-step pipeline 为例,简单介绍自己实现 custom pipeline 的过程。在这个one-step pipeline 中,只会用到 UNet 一个模型,并将 timestep 固定为 1,只进行一次模型前向。

首先新建一个 one_step_unet.py 文件,然后在其中继承 DiffusionPipeline 基类并实现 UnetSchedulerOneForwardPipeline 类。

初始化:定义 __init__ 方法

在初始化方法中,我们简单地的 one-step pipeline 只需要接收 unet 和 scheduler 两个初始化参数,我们在初始化方法中将变量定义好。注意,为了使得 save_pretrained 方法能够将我们的模型完整地保存下来,需要通过 register_modules 方法将我们想要保存的 unet 和 scheduler 注册进来。

from diffusers import DiffusionPipeline
import torch

class UnetSchedulerOneForwardPipeline(DiffusionPipeline):
    def __init__(self, unet, scheduler):
        super().__init__()
        self.register_modules(unet=unet, scheduler=scheduler)

前向推理:定义 __call__ 方法

定义好初始化方法 __init__ 之后,再来实现 pipeline 推理生图的 __call__ 方法。在这里,我们可以任意发挥,任意组合,魔改扩散模型的采样过程,实现自己想要的功能。在我们的 one-step pipeline 中,这里要做的非常简单:采样一个噪声图,UNet 进行一次前向。

@torch.no_grad()
def __call__(self):
    image = torch.randn(1, self.unet.config.in_channels, self.unet.config.sample_size, self.unet.sanple_size)
    timestep = 1
    model_output = self.unet(image, timestep).sample
    scheduler_output = self.scheduler.step(model_output, timestep, image).prev_sample
    return scheduler_output

这就 ok 了,我们已经实现好了自定义的 one-step pipeline。

推理

我们传入 unet 和 scheduler,实例化一个刚刚自定义好的 UnetSchedulerOneForwardPipeline,然后进行推理生图:

from diffusers import DDPMScheduler, UNet2DModel

scheduler = DDPMScheduler()
unet = UNet2DModel()

pipeline = UnetSchedulerOneForwardPipeline(unet=unet, scheduler=scheduler)

output = pipeline()

如果我们的 custom pipeline 结果如果跟某个已有的 pipeline 的预训练权重是完全一样的,我们还可以直接通过 from_pretrained 方法来加载它们的权重。比如说,我们的 UnetSchedulerOneForwardPipeline 就可以直接加载 google/ddpm-cifar10-32 的权重:

pipeline = UnetSchedulerOneForwardPipeline.from_pretrained("google/ddpm-cifar10-32", use_safetensors=True)

output = pipeline()
分享 custom pipelines

要共享自己的 custom pipeline 有三个方法:

  1. 将自己实现的 custom pipeline 推送到 hf hub 仓库,需要将文件名命名为 pipeline.py

  2. 像 diffusers 官方仓库提交 PR,合并之后可以我们的 pipeline 就会出现在这里,别人可以通过文件名来加载

  3. 共享出自己的源码文件,如 clip_guided_stable_diffusion.py,别人也可以导入我们的 pipeline

而作为使用者,使用 custom pipeline 的方式有两种:

# 方式 1
from diffusers import DiffusionPipeline
pipe = DiffusionPipeline.from_pretrained(
    "google/ddpm-cifar10-32", custom_pipeline="hf-internal-testing/diffusers-dummy-pipeline"
)

# 方式 2
from cus_pipe import CustomPipeline    # cus_pipe is copied from hf-internal-testing/diffusers-dummy-pipeline
pipe = CustomPipeline.from_pretrained("google/ddpm-cifar10-32")

这两种使用方式应该是等价的,这可以从源码中看到:

if custom_pipeline is not None:
    pipeline_class = get_class_from_dynamic_module(
        custom_pipeline, module_file=CUSTOM_PIPELINE_FILE_NAME, cache_dir=custom_pipeline
    )
elif cls != DiffusionPipeline:
    pipeline_class = cls
else:
    diffusers_module = importlib.import_module(cls.__module__.split(".")[0])
    pipeline_class = getattr(diffusers_module, config_dict["_class_name"])

总结

diffusers 的 api 设计非常友好,我们可以通过 pipeline callback 和 custom pipeline 等方式定制化实现自己想要的功能,其中前者不用动底层代码,简单优雅,后者则是功能强大,现在最新的 AIGC 相关的论文基本都是通过 custom diffusion 的方式公开自己的源码,非常方便。

相关推荐

  1. AIGC笔记--Diffuser训练pipeline

    2024-04-08 11:02:03       16 阅读
  2. 不可靠不重传 tcp 新

    2024-04-08 11:02:03       20 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-08 11:02:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-08 11:02:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-08 11:02:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-08 11:02:03       20 阅读

热门阅读

  1. 自动化脚本代码appium+pytest+adb

    2024-04-08 11:02:03       21 阅读
  2. 【flask快速上手(二)】

    2024-04-08 11:02:03       20 阅读
  3. Windows DLL(动态链接库)的用处

    2024-04-08 11:02:03       19 阅读
  4. Spring Boot中整合Redis

    2024-04-08 11:02:03       17 阅读
  5. C#实践作业1(类、接口、委托)

    2024-04-08 11:02:03       12 阅读
  6. 动态规划算法 - LC354. 俄罗斯套娃信封问题

    2024-04-08 11:02:03       12 阅读