
The Annotated Diffusion Model

Published June 7, 2022

In this blog post, we'll take a deeper look into Denoising Diffusion Probabilistic Models (also known as DDPMs, diffusion models, score-based generative models or simply autoencoders) as researchers have been able to achieve remarkable results with them for (un)conditional image/audio/video generation. Popular examples (at the time of writing) include GLIDE and DALL-E 2 by OpenAI, Latent Diffusion by the University of Heidelberg and ImageGen by Google Brain.

We'll go over the original DDPM paper by (Ho et al., 2020), implementing it step-by-step in PyTorch, based on Phil Wang's implementation - which itself is based on the original TensorFlow implementation. Note that the idea of diffusion for generative modeling was actually already introduced in (Sohl-Dickstein et al., 2015). However, it took until (Song et al., 2019) (at Stanford University), and later (Ho et al., 2020) (at Google Brain), for the approach to be independently improved.

Note that there are several perspectives on diffusion models. Here, we employ the discrete-time (latent variable model) perspective, but be sure to check out the other perspectives as well.

Alright, let's dive in!

from IPython.display import Image
Image(filename='assets/78_annotated-diffusion/ddpm_paper.png')

We'll install and import the required libraries first (assuming you have PyTorch installed).

!pip install -q -U einops datasets matplotlib tqdm

import math
from inspect import isfunction
from functools import partial

%matplotlib inline
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from einops import rearrange, reduce
from einops.layers.torch import Rearrange

import torch
from torch import nn, einsum
import torch.nn.functional as F

What is a diffusion model?

A (denoising) diffusion model isn't that complex if you compare it to other generative models such as Normalizing Flows, GANs or VAEs: they all convert noise from some simple distribution to a data sample. This is also the case here where a neural network learns to gradually denoise data starting from pure noise.

In a bit more detail for images, the set-up consists of 2 processes:

  • a fixed (or predefined) forward diffusion process $q$ of our choosing, that gradually adds Gaussian noise to an image, until you end up with pure noise
  • a learned reverse denoising diffusion process $p_\theta$, where a neural network is trained to gradually denoise an image starting from pure noise, until you end up with an actual image.

Both the forward and reverse processes indexed by $t$ happen for some number of finite time steps $T$ (the DDPM authors use $T=1000$). You start with $t=0$ where you sample a real image $\mathbf{x}_0$ from your data distribution (let's say an image of a cat from ImageNet), and the forward process samples some noise from a Gaussian distribution at each time step $t$, which is added to the image of the previous time step. Given a sufficiently large $T$ and a well behaved schedule for adding noise at each time step, you end up with what is called an isotropic Gaussian distribution at $t=T$ via a gradual process.

In more mathematical form

Let's write this down more formally, as ultimately we need a tractable loss function which our neural network needs to optimize.

Let $q(\mathbf{x}_0)$ be the real data distribution, say of "real images". We can sample from this distribution to get an image, $\mathbf{x}_0 \sim q(\mathbf{x}_0)$. We define the forward diffusion process $q(\mathbf{x}_t | \mathbf{x}_{t-1})$ which adds Gaussian noise at each time step $t$, according to a known variance schedule $0 < \beta_1 < \beta_2 < ... < \beta_T < 1$ as

$$q(\mathbf{x}_t | \mathbf{x}_{t-1}) = \mathcal{N}(\mathbf{x}_t; \sqrt{1 - \beta_t} \mathbf{x}_{t-1}, \beta_t \mathbf{I}).$$

Recall that a normal distribution (also called Gaussian distribution) is defined by 2 parameters: a mean $\mu$ and a variance $\sigma^2 \geq 0$. Basically, each new (slightly noisier) image at time step $t$ is drawn from a conditional Gaussian distribution with $\mathbf{\mu}_t = \sqrt{1 - \beta_t} \mathbf{x}_{t-1}$ and $\sigma^2_t = \beta_t$, which we can do by sampling $\mathbf{\epsilon} \sim \mathcal{N}(\mathbf{0}, \mathbf{I})$ and then setting $\mathbf{x}_t = \sqrt{1 - \beta_t} \mathbf{x}_{t-1} + \sqrt{\beta_t} \mathbf{\epsilon}$.
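
To make this concrete, a single forward step can be written in a couple of lines of PyTorch. This is purely an illustrative sketch, using the imports from above; the names forward_step and x_prev are hypothetical and not part of the implementation that follows later in this post.

# illustrative sketch of one forward step q(x_t | x_{t-1})
def forward_step(x_prev, beta_t):
    eps = torch.randn_like(x_prev)  # epsilon ~ N(0, I)
    return math.sqrt(1.0 - beta_t) * x_prev + math.sqrt(beta_t) * eps

x_prev = torch.randn(1, 3, 128, 128)  # stand-in for an image at time step t-1
x_t = forward_step(x_prev, beta_t=0.01)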

Note that the $\beta_t$ aren't constant at each time step $t$ (hence the subscript) --- in fact one defines a so-called "variance schedule", which can be linear, quadratic, cosine, etc. as we will see further (a bit like a learning rate schedule).

So starting from $\mathbf{x}_0$, we end up with $\mathbf{x}_1, ..., \mathbf{x}_t, ..., \mathbf{x}_T$, where $\mathbf{x}_T$ is pure Gaussian noise if we set the schedule appropriately.

Now, if we knew the conditional distribution $p(\mathbf{x}_{t-1} | \mathbf{x}_t)$, then we could run the process in reverse: by sampling some random Gaussian noise $\mathbf{x}_T$, and then gradually "denoise" it so that we end up with a sample from the real distribution $\mathbf{x}_0$.

However, we don't know $p(\mathbf{x}_{t-1} | \mathbf{x}_t)$. It's intractable since it requires knowing the distribution of all possible images in order to calculate this conditional probability. Hence, we're going to leverage a neural network to approximate (learn) this conditional probability distribution, let's call it $p_\theta (\mathbf{x}_{t-1} | \mathbf{x}_t)$, with $\theta$ being the parameters of the neural network, updated by gradient descent.

Ok, so we need a neural network to represent a (conditional) probability distribution of the backward process. If we assume this reverse process is Gaussian as well, then recall that any Gaussian distribution is defined by 2 parameters:

  • a mean parametrized by $\mu_\theta$;
  • a variance parametrized by $\Sigma_\theta$;

so we can parametrize the process as

$$p_\theta (\mathbf{x}_{t-1} | \mathbf{x}_t) = \mathcal{N}(\mathbf{x}_{t-1}; \mu_\theta(\mathbf{x}_{t},t), \Sigma_\theta (\mathbf{x}_{t},t))$$

where the mean and variance are also conditioned on the noise level $t$.

Hence, our neural network needs to learn/represent the mean and variance. However, the DDPM authors decided to keep the variance fixed, and let the neural network only learn (represent) the mean $\mu_\theta$ of this conditional probability distribution. From the paper:

First, we set $\Sigma_\theta ( \mathbf{x}_t, t) = \sigma^2_t \mathbf{I}$ to untrained time dependent constants. Experimentally, both $\sigma^2_t = \beta_t$ and $\sigma^2_t = \tilde{\beta}_t$ (see paper) had similar results.

This was then later improved in the Improved diffusion models paper, where a neural network also learns the variance of this backwards process, besides the mean.

So we continue, assuming that our neural network only needs to learn/represent the mean of this conditional probability distribution.

Defining an objective function (by reparametrizing the mean)

To derive an objective function to learn the mean of the backward process, the authors observe that the combination of $q$ and $p_\theta$ can be seen as a variational auto-encoder (VAE) (Kingma et al., 2013). Hence, the variational lower bound (also called ELBO) can be used to minimize the negative log-likelihood with respect to ground truth data sample $\mathbf{x}_0$ (we refer to the VAE paper for details regarding ELBO). It turns out that the ELBO for this process is a sum of losses at each time step $t$, $L = L_0 + L_1 + ... + L_T$. By construction of the forward $q$ process and backward process, each term (except for $L_0$) of the loss is actually the KL divergence between 2 Gaussian distributions which can be written explicitly as an L2-loss with respect to the means!

A direct consequence of the constructed forward process $q$, as shown by Sohl-Dickstein et al., is that we can sample $\mathbf{x}_t$ at any arbitrary noise level conditioned on $\mathbf{x}_0$ (since sums of Gaussians is also Gaussian). This is very convenient: we don't need to apply $q$ repeatedly in order to sample $\mathbf{x}_t$. We have that

$$q(\mathbf{x}_t | \mathbf{x}_0) = \mathcal{N}(\mathbf{x}_t; \sqrt{\bar{\alpha}_t} \mathbf{x}_0, (1- \bar{\alpha}_t) \mathbf{I})$$

with $\alpha_t := 1 - \beta_t$ and $\bar{\alpha}_t := \Pi_{s=1}^{t} \alpha_s$. Let's refer to this equation as the "nice property". This means we can sample Gaussian noise and scale it appropriately and add it to $\mathbf{x}_0$ to get $\mathbf{x}_t$ directly. Note that the $\bar{\alpha}_t$ are functions of the known $\beta_t$ variance schedule and thus are also known and can be precomputed. This then allows us, during training, to optimize random terms of the loss function $L$ (or in other words, to randomly sample $t$ during training and optimize $L_t$).
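
To see where this comes from (a sketch, not a full derivation), expand $\mathbf{x}_{t-1}$ one step further and merge the two independent Gaussian noise terms, whose sum is again Gaussian:

$$\mathbf{x}_t = \sqrt{\alpha_t} \mathbf{x}_{t-1} + \sqrt{1 - \alpha_t} \mathbf{\epsilon}_{t-1} = \sqrt{\alpha_t \alpha_{t-1}} \mathbf{x}_{t-2} + \sqrt{1 - \alpha_t \alpha_{t-1}} \bar{\mathbf{\epsilon}} = \ldots = \sqrt{\bar{\alpha}_t} \mathbf{x}_0 + \sqrt{1 - \bar{\alpha}_t} \mathbf{\epsilon},$$

since the sum of two independent zero-mean Gaussians with variances $\alpha_t (1 - \alpha_{t-1})$ and $1 - \alpha_t$ is a zero-mean Gaussian with variance $1 - \alpha_t \alpha_{t-1}$. Repeating this all the way down to $\mathbf{x}_0$ gives the expression above.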

Another beauty of this property, as shown in Ho et al., is that one can (after some math, for which we refer the reader to this excellent blog post) instead reparametrize the mean to make the neural network learn (predict) the added noise (via a network $\mathbf{\epsilon}_\theta(\mathbf{x}_t, t)$) for noise level $t$ in the KL terms which constitute the losses. This means that our neural network becomes a noise predictor, rather than a (direct) mean predictor. The mean can be computed as follows:

$$\mathbf{\mu}_\theta(\mathbf{x}_t, t) = \frac{1}{\sqrt{\alpha_t}} \left( \mathbf{x}_t - \frac{\beta_t}{\sqrt{1- \bar{\alpha}_t}} \mathbf{\epsilon}_\theta(\mathbf{x}_t, t) \right)$$

The final objective function $L_t$ then looks as follows (for a random time step $t$ given $\mathbf{\epsilon} \sim \mathcal{N}(\mathbf{0}, \mathbf{I})$):

$$\| \mathbf{\epsilon} - \mathbf{\epsilon}_\theta(\mathbf{x}_t, t) \|^2 = \| \mathbf{\epsilon} - \mathbf{\epsilon}_\theta( \sqrt{\bar{\alpha}_t} \mathbf{x}_0 + \sqrt{(1- \bar{\alpha}_t) } \mathbf{\epsilon}, t) \|^2.$$

Here, $\mathbf{x}_0$ is the initial (real, uncorrupted) image, and we see the direct noise level $t$ sample given by the fixed forward process. $\mathbf{\epsilon}$ is the pure noise sampled at time step $t$, and $\mathbf{\epsilon}_\theta (\mathbf{x}_t, t)$ is our neural network. The neural network is optimized using a simple mean squared error (MSE) between the true and the predicted Gaussian noise.

The training algorithm now looks as follows:

In other words:

  • we take a random sample $\mathbf{x}_0$ from the real unknown and possibly complex data distribution $q(\mathbf{x}_0)$
  • we sample a noise level $t$ uniformly between $1$ and $T$ (i.e., a random time step)
  • we sample some noise from a Gaussian distribution and corrupt the input by this noise at level $t$ (using the nice property defined above)
  • the neural network is trained to predict this noise based on the corrupted image $\mathbf{x}_t$ (i.e. noise applied on $\mathbf{x}_0$ based on known schedule $\beta_t$)

In reality, all of this is done on batches of data, as one uses stochastic gradient descent to optimize neural networks.
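
Putting the four steps above together, a single training step looks roughly like the following sketch. It is purely illustrative: training_step is a hypothetical helper, and the actual model, variance schedule and loss are defined step-by-step in the remainder of this post.

# rough sketch of Algorithm 1 for a single batch (illustrative only);
# assumes `model(x, t)` predicts the noise and `betas` is a 1-D tensor
# holding the variance schedule, on the same device as `x_start`
def training_step(model, x_start, betas):
    T = betas.shape[0]
    alphas_cumprod = torch.cumprod(1.0 - betas, dim=0)

    t = torch.randint(0, T, (x_start.shape[0],), device=x_start.device)  # random time step per example
    noise = torch.randn_like(x_start)                                    # epsilon ~ N(0, I)

    # corrupt x_0 using the nice property: x_t = sqrt(abar_t) x_0 + sqrt(1 - abar_t) epsilon
    abar_t = alphas_cumprod[t].view(-1, 1, 1, 1)
    x_noisy = abar_t.sqrt() * x_start + (1 - abar_t).sqrt() * noise

    # simple MSE between the true and the predicted noise
    return F.mse_loss(model(x_noisy, t), noise)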

The neural network

The neural network needs to take in a noised image at a particular time step and return the predicted noise. Note that the predicted noise is a tensor that has the same size/resolution as the input image. So technically, the network takes in and outputs tensors of the same shape. What type of neural network can we use for this?

What is typically used here is very similar to that of an Autoencoder, which you may remember from typical "intro to deep learning" tutorials. Autoencoders have a so-called "bottleneck" layer in between the encoder and decoder. The encoder first encodes an image into a smaller hidden representation called the "bottleneck", and the decoder then decodes that hidden representation back into an actual image. This forces the network to only keep the most important information in the bottleneck layer.

In terms of architecture, the DDPM authors went for a U-Net, introduced by (Ronneberger et al., 2015) (which, at the time, achieved state-of-the-art results for medical image segmentation). This network, like any autoencoder, consists of a bottleneck in the middle that makes sure the network learns only the most important information. Importantly, it introduced residual connections between the encoder and decoder, greatly improving gradient flow (inspired by ResNet in He et al., 2015).

As can be seen, a U-Net model first downsamples the input (i.e. makes the input smaller in terms of spatial resolution), after which upsampling is performed.

Below, we implement this network, step-by-step.

Network helpers

First, we define some helper functions and classes which will be used when implementing the neural network. Importantly, we define a Residual module, which simply adds the input to the output of a particular function (in other words, adds a residual connection to a particular function).

We also define aliases for the up- and downsampling operations.

def exists(x):
    return x is not None

def default(val, d):
    if exists(val):
        return val
    return d() if isfunction(d) else d


def num_to_groups(num, divisor):
    groups = num // divisor
    remainder = num % divisor
    arr = [divisor] * groups
    if remainder > 0:
        arr.append(remainder)
    return arr


class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, *args, **kwargs):
        return self.fn(x, *args, **kwargs) + x


def Upsample(dim, dim_out=None):
    return nn.Sequential(
        nn.Upsample(scale_factor=2, mode="nearest"),
        nn.Conv2d(dim, default(dim_out, dim), 3, padding=1),
    )


def Downsample(dim, dim_out=None):
    # No More Strided Convolutions or Pooling
    return nn.Sequential(
        Rearrange("b c (h p1) (w p2) -> b (c p1 p2) h w", p1=2, p2=2),
        nn.Conv2d(dim * 4, default(dim_out, dim), 1),
    )

Position embeddings

As the parameters of the neural network are shared across time (noise level), the authors employ sinusoidal position embeddings to encode $t$, inspired by the Transformer (Vaswani et al., 2017). This makes the neural network "know" at which particular time step (noise level) it is operating, for every image in a batch.

The SinusoidalPositionEmbeddings module takes a tensor of shape (batch_size,) as input (i.e. the noise levels of several noisy images in a batch), and turns this into a tensor of shape (batch_size, dim), with dim being the dimensionality of the position embeddings. This is then added to each residual block, as we will see further.

class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, time):
        device = time.device
        half_dim = self.dim // 2
        embeddings = math.log(10000) / (half_dim - 1)
        embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings)
        embeddings = time[:, None] * embeddings[None, :]
        embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
        return embeddings
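
As a quick sanity check (not part of the original implementation), a batch of 8 integer time steps should be mapped to a tensor of shape (8, dim):

emb = SinusoidalPositionEmbeddings(dim=32)
t = torch.randint(0, 300, (8,))  # 8 random time steps
print(emb(t).shape)              # torch.Size([8, 32])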

ResNet block

Next, we define the core building block of the U-Net model. The DDPM authors employed a Wide ResNet block (Zagoruyko et al., 2016), but Phil Wang has replaced the standard convolutional layer by a "weight standardized" version, which works better in combination with group normalization (see (Kolesnikov et al., 2019) for details).

class WeightStandardizedConv2d(nn.Conv2d):
    """
    https://arxiv.org/abs/1903.10520
    weight standardization purportedly works synergistically with group normalization
    """

    def forward(self, x):
        eps = 1e-5 if x.dtype == torch.float32 else 1e-3

        weight = self.weight
        mean = reduce(weight, "o ... -> o 1 1 1", "mean")
        var = reduce(weight, "o ... -> o 1 1 1", partial(torch.var, unbiased=False))
        normalized_weight = (weight - mean) * (var + eps).rsqrt()

        return F.conv2d(
            x,
            normalized_weight,
            self.bias,
            self.stride,
            self.padding,
            self.dilation,
            self.groups,
        )


class Block(nn.Module):
    def __init__(self, dim, dim_out, groups=8):
        super().__init__()
        self.proj = WeightStandardizedConv2d(dim, dim_out, 3, padding=1)
        self.norm = nn.GroupNorm(groups, dim_out)
        self.act = nn.SiLU()

    def forward(self, x, scale_shift=None):
        x = self.proj(x)
        x = self.norm(x)

        if exists(scale_shift):
            scale, shift = scale_shift
            x = x * (scale + 1) + shift

        x = self.act(x)
        return x


class ResnetBlock(nn.Module):
    """https://arxiv.org/abs/1512.03385"""

    def __init__(self, dim, dim_out, *, time_emb_dim=None, groups=8):
        super().__init__()
        self.mlp = (
            nn.Sequential(nn.SiLU(), nn.Linear(time_emb_dim, dim_out * 2))
            if exists(time_emb_dim)
            else None
        )

        self.block1 = Block(dim, dim_out, groups=groups)
        self.block2 = Block(dim_out, dim_out, groups=groups)
        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, time_emb=None):
        scale_shift = None
        if exists(self.mlp) and exists(time_emb):
            time_emb = self.mlp(time_emb)
            time_emb = rearrange(time_emb, "b c -> b c 1 1")
            scale_shift = time_emb.chunk(2, dim=1)

        h = self.block1(x, scale_shift=scale_shift)
        h = self.block2(h)
        return h + self.res_conv(x)
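
Again as a quick (hypothetical) check, the block should preserve the spatial resolution while changing the channel dimension:

block = ResnetBlock(dim=32, dim_out=64, time_emb_dim=128)
x = torch.randn(2, 32, 16, 16)  # (batch, channels, height, width)
t_emb = torch.randn(2, 128)     # time embeddings for the batch
print(block(x, t_emb).shape)    # torch.Size([2, 64, 16, 16])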

Attention module

Next, we define the attention module, which the DDPM authors added in between the convolutional blocks. Attention is the building block of the famous Transformer architecture (Vaswani et al., 2017), which has shown great success in various domains of AI, from NLP and vision to protein folding. Phil Wang employs 2 variants of attention: one is regular multi-head self-attention (as used in the Transformer), the other one is a linear attention variant (Shen et al., 2018), whose time and memory requirements scale linearly in the sequence length, as opposed to quadratically for regular attention.

For an extensive explanation of the attention mechanism, we refer the reader to Jay Alammar's wonderful blog post.

class Attention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)
        self.to_out = nn.Conv2d(hidden_dim, dim, 1)

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=1)
        q, k, v = map(
            lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv
        )
        q = q * self.scale

        sim = einsum("b h d i, b h d j -> b h i j", q, k)
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        out = einsum("b h i j, b h d j -> b h i d", attn, v)
        out = rearrange(out, "b h (x y) d -> b (h d) x y", x=h, y=w)
        return self.to_out(out)

class LinearAttention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)

        self.to_out = nn.Sequential(nn.Conv2d(hidden_dim, dim, 1), 
                                    nn.GroupNorm(1, dim))

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=1)
        q, k, v = map(
            lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv
        )

        q = q.softmax(dim=-2)
        k = k.softmax(dim=-1)

        q = q * self.scale
        context = torch.einsum("b h d n, b h e n -> b h d e", k, v)

        out = torch.einsum("b h d e, b h d n -> b h e n", context, q)
        out = rearrange(out, "b h c (x y) -> b (h c) x y", h=self.heads, x=h, y=w)
        return self.to_out(out)

Group normalization

The DDPM authors interleave the convolutional/attention layers of the U-Net with group normalization (Wu et al., 2018). Below, we define a PreNorm class, which will be used to apply groupnorm before the attention layer, as we'll see further. Note that there's been a debate about whether to apply normalization before or after attention in Transformers.

class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.fn = fn
        self.norm = nn.GroupNorm(1, dim)

    def forward(self, x):
        x = self.norm(x)
        return self.fn(x)

Conditional U-Net

Now that we've defined all building blocks (position embeddings, ResNet blocks, attention and group normalization), it's time to define the entire neural network. Recall that the job of the network $\mathbf{\epsilon}_\theta(\mathbf{x}_t, t)$ is to take in a batch of noisy images and their respective noise levels, and output the noise added to the input. More formally:

  • the network takes a batch of noisy images of shape (batch_size, num_channels, height, width) and a batch of noise levels of shape (batch_size,) as input, and returns a tensor of shape (batch_size, num_channels, height, width)

The network is built up as follows:

  • first, a convolutional layer is applied on the batch of noisy images, and position embeddings are computed for the noise levels
  • next, a sequence of downsampling stages are applied. Each downsampling stage consists of 2 ResNet blocks + groupnorm + attention + residual connection + a downsample operation
  • at the middle of the network, again ResNet blocks are applied, interleaved with attention
  • next, a sequence of upsampling stages are applied. Each upsampling stage consists of 2 ResNet blocks + groupnorm + attention + residual connection + an upsample operation
  • finally, a ResNet block followed by a convolutional layer is applied.

Ultimately, neural networks stack up layers as if they were lego blocks (but it's important to understand how they work).

class Unet(nn.Module):
    def __init__(
        self,
        dim,
        init_dim=None,
        out_dim=None,
        dim_mults=(1, 2, 4, 8),
        channels=3,
        self_condition=False,
        resnet_block_groups=4,
    ):
        super().__init__()

        # determine dimensions
        self.channels = channels
        self.self_condition = self_condition
        input_channels = channels * (2 if self_condition else 1)

        init_dim = default(init_dim, dim)
        self.init_conv = nn.Conv2d(input_channels, init_dim, 1, padding=0) # changed to 1 and 0 from 7,3

        dims = [init_dim, *map(lambda m: dim * m, dim_mults)]
        in_out = list(zip(dims[:-1], dims[1:]))

        block_klass = partial(ResnetBlock, groups=resnet_block_groups)

        # time embeddings
        time_dim = dim * 4

        self.time_mlp = nn.Sequential(
            SinusoidalPositionEmbeddings(dim),
            nn.Linear(dim, time_dim),
            nn.GELU(),
            nn.Linear(time_dim, time_dim),
        )

        # layers
        self.downs = nn.ModuleList([])
        self.ups = nn.ModuleList([])
        num_resolutions = len(in_out)

        for ind, (dim_in, dim_out) in enumerate(in_out):
            is_last = ind >= (num_resolutions - 1)

            self.downs.append(
                nn.ModuleList(
                    [
                        block_klass(dim_in, dim_in, time_emb_dim=time_dim),
                        block_klass(dim_in, dim_in, time_emb_dim=time_dim),
                        Residual(PreNorm(dim_in, LinearAttention(dim_in))),
                        Downsample(dim_in, dim_out)
                        if not is_last
                        else nn.Conv2d(dim_in, dim_out, 3, padding=1),
                    ]
                )
            )

        mid_dim = dims[-1]
        self.mid_block1 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)
        self.mid_attn = Residual(PreNorm(mid_dim, Attention(mid_dim)))
        self.mid_block2 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)

        for ind, (dim_in, dim_out) in enumerate(reversed(in_out)):
            is_last = ind == (len(in_out) - 1)

            self.ups.append(
                nn.ModuleList(
                    [
                        block_klass(dim_out + dim_in, dim_out, time_emb_dim=time_dim),
                        block_klass(dim_out + dim_in, dim_out, time_emb_dim=time_dim),
                        Residual(PreNorm(dim_out, LinearAttention(dim_out))),
                        Upsample(dim_out, dim_in)
                        if not is_last
                        else nn.Conv2d(dim_out, dim_in, 3, padding=1),
                    ]
                )
            )

        self.out_dim = default(out_dim, channels)

        self.final_res_block = block_klass(dim * 2, dim, time_emb_dim=time_dim)
        self.final_conv = nn.Conv2d(dim, self.out_dim, 1)

    def forward(self, x, time, x_self_cond=None):
        if self.self_condition:
            x_self_cond = default(x_self_cond, lambda: torch.zeros_like(x))
            x = torch.cat((x_self_cond, x), dim=1)

        x = self.init_conv(x)
        r = x.clone()

        t = self.time_mlp(time)

        h = []

        for block1, block2, attn, downsample in self.downs:
            x = block1(x, t)
            h.append(x)

            x = block2(x, t)
            x = attn(x)
            h.append(x)

            x = downsample(x)

        x = self.mid_block1(x, t)
        x = self.mid_attn(x)
        x = self.mid_block2(x, t)

        for block1, block2, attn, upsample in self.ups:
            x = torch.cat((x, h.pop()), dim=1)
            x = block1(x, t)

            x = torch.cat((x, h.pop()), dim=1)
            x = block2(x, t)
            x = attn(x)

            x = upsample(x)

        x = torch.cat((x, r), dim=1)

        x = self.final_res_block(x, t)
        return self.final_conv(x)
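
To make the input/output contract above concrete, here is a quick (hypothetical) forward pass with random data; the output has the same shape as the noisy input:

unet = Unet(dim=28, channels=1, dim_mults=(1, 2, 4))
x = torch.randn(8, 1, 28, 28)           # a batch of "noisy" images
t = torch.randint(0, 300, (8,)).long()  # one noise level per image
print(unet(x, t).shape)                 # torch.Size([8, 1, 28, 28])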

Defining the forward diffusion process

The forward diffusion process gradually adds noise to an image from the real distribution, in a number of time steps $T$. This happens according to a variance schedule. The original DDPM authors employed a linear schedule:

We set the forward process variances to constants increasing linearly from $\beta_1 = 10^{-4}$ to $\beta_T = 0.02$.

However, it was shown in (Nichol et al., 2021) that better results can be achieved when employing a cosine schedule.

Below, we define various schedules for the $T$ timesteps (we'll choose one later on).

def cosine_beta_schedule(timesteps, s=0.008):
    """
    cosine schedule as proposed in https://arxiv.org/abs/2102.09672
    """
    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps)
    alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * torch.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return torch.clip(betas, 0.0001, 0.9999)

def linear_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    return torch.linspace(beta_start, beta_end, timesteps)

def quadratic_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    return torch.linspace(beta_start**0.5, beta_end**0.5, timesteps) ** 2

def sigmoid_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    betas = torch.linspace(-6, 6, timesteps)
    return torch.sigmoid(betas) * (beta_end - beta_start) + beta_start

To start with, let's use the linear schedule for $T=300$ time steps and define the various variables from the $\beta_t$ which we will need, such as the cumulative product of the variances $\bar{\alpha}_t$. Each of the variables below are just 1-dimensional tensors, storing values from $t$ to $T$. Importantly, we also define an extract function, which will allow us to extract the appropriate $t$ index for a batch of indices.

timesteps = 300

# define beta schedule
betas = linear_beta_schedule(timesteps=timesteps)

# define alphas 
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, axis=0)
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas)

# calculations for diffusion q(x_t | x_{t-1}) and others
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)

# calculations for posterior q(x_{t-1} | x_t, x_0)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)

def extract(a, t, x_shape):
    batch_size = t.shape[0]
    out = a.gather(-1, t.cpu())
    return out.reshape(batch_size, *((1,) * (len(x_shape) - 1))).to(t.device)
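
To see what extract does, a small (illustrative) example: for a batch of two time steps it gathers the corresponding values from a schedule tensor and reshapes them so they broadcast against a batch of images.

t = torch.tensor([0, 299])  # two time steps, one per batch element
out = extract(sqrt_alphas_cumprod, t, x_shape=(2, 3, 128, 128))
print(out.shape)            # torch.Size([2, 1, 1, 1]), ready to broadcast over (2, 3, 128, 128)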

We'll illustrate how noise is added at each time step of the diffusion process with an image of cats.

from PIL import Image
import requests

url = 'http://images.cocodataset.org/val2017/000000039769.jpg'
image = Image.open(requests.get(url, stream=True).raw) # PIL image of shape HWC
image

Noise is added to PyTorch tensors, rather than Pillow Images. We'll first define image transformations that allow us to go from a PIL image to a PyTorch tensor (on which we can add the noise), and vice versa.

These transformations are fairly simple: we first normalize images by dividing by $255$ (such that they are in the $[0,1]$ range), and then make sure they are in the $[-1, 1]$ range. From the DDPM paper:

We assume that image data consists of integers in $\{0, 1, ..., 255\}$ scaled linearly to $[-1, 1]$. This ensures that the neural network reverse process operates on consistently scaled inputs starting from the standard normal prior $p(\mathbf{x}_T)$.

from torchvision.transforms import Compose, ToTensor, Lambda, ToPILImage, CenterCrop, Resize

image_size = 128
transform = Compose([
    Resize(image_size),
    CenterCrop(image_size),
    ToTensor(), # turn into torch Tensor of shape CHW, divide by 255
    Lambda(lambda t: (t * 2) - 1),
    
])

x_start = transform(image).unsqueeze(0)
x_start.shape
Output:
----------------------------------------------------------------------------------------------------
torch.Size([1, 3, 128, 128])

We also define the reverse transform, which takes in a PyTorch tensor containing values in $[-1, 1]$ and turns them back into a PIL image:

import numpy as np

reverse_transform = Compose([
     Lambda(lambda t: (t + 1) / 2),
     Lambda(lambda t: t.permute(1, 2, 0)), # CHW to HWC
     Lambda(lambda t: t * 255.),
     Lambda(lambda t: t.numpy().astype(np.uint8)),
     ToPILImage(),
])

Let's verify this:

reverse_transform(x_start.squeeze())

We can now define the forward diffusion process as in the paper:

# forward diffusion (using the nice property)
def q_sample(x_start, t, noise=None):
    if noise is None:
        noise = torch.randn_like(x_start)

    sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(
        sqrt_one_minus_alphas_cumprod, t, x_start.shape
    )

    return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise

Let's test it on a particular time step:

def get_noisy_image(x_start, t):
  # add noise
  x_noisy = q_sample(x_start, t=t)

  # turn back into PIL image
  noisy_image = reverse_transform(x_noisy.squeeze())

  return noisy_image
# take time step
t = torch.tensor([40])

get_noisy_image(x_start, t)

Let's visualize this for various time steps:

import matplotlib.pyplot as plt

# use seed for reproducability
torch.manual_seed(0)

# source: https://pytorch.org/vision/stable/auto_examples/plot_transforms.html#sphx-glr-auto-examples-plot-transforms-py
def plot(imgs, with_orig=False, row_title=None, **imshow_kwargs):
    if not isinstance(imgs[0], list):
        # Make a 2d grid even if there's just 1 row
        imgs = [imgs]

    num_rows = len(imgs)
    num_cols = len(imgs[0]) + with_orig
    fig, axs = plt.subplots(figsize=(200,200), nrows=num_rows, ncols=num_cols, squeeze=False)
    for row_idx, row in enumerate(imgs):
        row = [image] + row if with_orig else row
        for col_idx, img in enumerate(row):
            ax = axs[row_idx, col_idx]
            ax.imshow(np.asarray(img), **imshow_kwargs)
            ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])

    if with_orig:
        axs[0, 0].set(title='Original image')
        axs[0, 0].title.set_size(8)
    if row_title is not None:
        for row_idx in range(num_rows):
            axs[row_idx, 0].set(ylabel=row_title[row_idx])

    plt.tight_layout()
plot([get_noisy_image(x_start, torch.tensor([t])) for t in [0, 50, 100, 150, 199]])

This means that we can now define the loss function given the model as follows:

def p_losses(denoise_model, x_start, t, noise=None, loss_type="l1"):
    if noise is None:
        noise = torch.randn_like(x_start)

    x_noisy = q_sample(x_start=x_start, t=t, noise=noise)
    predicted_noise = denoise_model(x_noisy, t)

    if loss_type == 'l1':
        loss = F.l1_loss(noise, predicted_noise)
    elif loss_type == 'l2':
        loss = F.mse_loss(noise, predicted_noise)
    elif loss_type == "huber":
        loss = F.smooth_l1_loss(noise, predicted_noise)
    else:
        raise NotImplementedError()

    return loss

The denoise_model will be our U-Net defined above. We'll employ the Huber loss between the true and the predicted noise.

Define a PyTorch Dataset + DataLoader

Here we define a regular PyTorch Dataset. The dataset simply consists of images from a real dataset, like Fashion-MNIST, CIFAR-10 or ImageNet, scaled linearly to $[-1, 1]$.

Each image is resized to the same size. Interesting to note is that images are also randomly horizontally flipped. From the paper:

We used random horizontal flips during training for CIFAR10; we tried training both with and without flips, and found flips to improve sample quality slightly.

Here we use the 🤗 Datasets library to easily load the Fashion MNIST dataset from the hub. This dataset consists of images which already have the same resolution, namely 28x28.

from datasets import load_dataset

# load dataset from the hub
dataset = load_dataset("fashion_mnist")
image_size = 28
channels = 1
batch_size = 128

Next, we define a function which we'll apply on-the-fly on the entire dataset. We use the with_transform functionality for that. The function just applies some basic image preprocessing: random horizontal flips, rescaling and finally making sure the values are in the $[-1,1]$ range.

from torchvision import transforms
from torch.utils.data import DataLoader

# define image transformations (e.g. using torchvision)
transform = Compose([
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Lambda(lambda t: (t * 2) - 1)
])

# define function
def transforms(examples):
   examples["pixel_values"] = [transform(image.convert("L")) for image in examples["image"]]
   del examples["image"]

   return examples

transformed_dataset = dataset.with_transform(transforms).remove_columns("label")

# create dataloader
dataloader = DataLoader(transformed_dataset["train"], batch_size=batch_size, shuffle=True)
batch = next(iter(dataloader))
print(batch.keys())
Output:
----------------------------------------------------------------------------------------------------
dict_keys(['pixel_values'])

Sampling

As we'll sample from the model during training (in order to track progress), we define the code for that below. Sampling is summarized in the paper as Algorithm 2:

Generating new images from a diffusion model happens by reversing the diffusion process: we start from $T$, where we sample pure noise from a Gaussian distribution, and then use our neural network to gradually denoise it (using the conditional probability it has learned), until we end up at time step $t = 0$. As shown above, we can derive a slightly less noisy image $\mathbf{x}_{t-1}$ by plugging in the reparametrization of the mean, using our noise predictor. Remember that the variance is known ahead of time.

Ideally, we end up with an image that looks like it came from the real data distribution.

The code below implements this.

@torch.no_grad()
def p_sample(model, x, t, t_index):
    betas_t = extract(betas, t, x.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(
        sqrt_one_minus_alphas_cumprod, t, x.shape
    )
    sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)
    
    # Equation 11 in the paper
    # Use our model (noise predictor) to predict the mean
    model_mean = sqrt_recip_alphas_t * (
        x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t
    )

    if t_index == 0:
        return model_mean
    else:
        posterior_variance_t = extract(posterior_variance, t, x.shape)
        noise = torch.randn_like(x)
        # Algorithm 2 line 4:
        return model_mean + torch.sqrt(posterior_variance_t) * noise 

# Algorithm 2 (including returning all images)
@torch.no_grad()
def p_sample_loop(model, shape):
    device = next(model.parameters()).device

    b = shape[0]
    # start from pure noise (for each example in the batch)
    img = torch.randn(shape, device=device)
    imgs = []

    for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps):
        img = p_sample(model, img, torch.full((b,), i, device=device, dtype=torch.long), i)
        imgs.append(img.cpu().numpy())
    return imgs

@torch.no_grad()
def sample(model, image_size, batch_size=16, channels=3):
    return p_sample_loop(model, shape=(batch_size, channels, image_size, image_size))

Note that the code above is a simplified version of the original implementation. We found our simplification (which is in line with Algorithm 2 in the paper) to work just as well as the original, more complex implementation, which employs clipping.

Train the model

Next, we train the model in regular PyTorch fashion. We also define some logic to periodically save generated images, using the sample method defined above.

from pathlib import Path

def num_to_groups(num, divisor):
    groups = num // divisor
    remainder = num % divisor
    arr = [divisor] * groups
    if remainder > 0:
        arr.append(remainder)
    return arr

results_folder = Path("./results")
results_folder.mkdir(exist_ok = True)
save_and_sample_every = 1000

Below, we define the model, and move it to the GPU. We also define a standard optimizer (Adam).

from torch.optim import Adam

device = "cuda" if torch.cuda.is_available() else "cpu"

model = Unet(
    dim=image_size,
    channels=channels,
    dim_mults=(1, 2, 4,)
)
model.to(device)

optimizer = Adam(model.parameters(), lr=1e-3)

Let's start training!

from torchvision.utils import save_image

epochs = 6

for epoch in range(epochs):
    for step, batch in enumerate(dataloader):
      optimizer.zero_grad()

      batch_size = batch["pixel_values"].shape[0]
      batch = batch["pixel_values"].to(device)

      # Algorithm 1 line 3: sample t uniformally for every example in the batch
      t = torch.randint(0, timesteps, (batch_size,), device=device).long()

      loss = p_losses(model, batch, t, loss_type="huber")

      if step % 100 == 0:
        print("Loss:", loss.item())

      loss.backward()
      optimizer.step()

      # save generated images
      if step != 0 and step % save_and_sample_every == 0:
        milestone = step // save_and_sample_every
        batches = num_to_groups(4, batch_size)
        all_images_list = list(map(lambda n: sample(model, image_size=image_size, batch_size=n, channels=channels), batches))
        # sample() returns a list of arrays (one per timestep); keep the final denoised step
        all_images = torch.cat([torch.from_numpy(images[-1]) for images in all_images_list], dim=0)
        all_images = (all_images + 1) * 0.5
        save_image(all_images, str(results_folder / f'sample-{milestone}.png'), nrow = 6)
Output:
----------------------------------------------------------------------------------------------------
Loss: 0.46477368474006653
Loss: 0.12143351882696152
Loss: 0.08106148988008499
Loss: 0.0801810547709465
Loss: 0.06122320517897606
Loss: 0.06310459971427917
Loss: 0.05681884288787842
Loss: 0.05729678273200989
Loss: 0.05497899278998375
Loss: 0.04439849033951759
Loss: 0.05415581166744232
Loss: 0.06020551547408104
Loss: 0.046830907464027405
Loss: 0.051029372960329056
Loss: 0.0478244312107563
Loss: 0.046767622232437134
Loss: 0.04305662214756012
Loss: 0.05216279625892639
Loss: 0.04748568311333656
Loss: 0.05107741802930832
Loss: 0.04588869959115982
Loss: 0.043014321476221085
Loss: 0.046371955424547195
Loss: 0.04952816292643547
Loss: 0.04472338408231735

Sampling (inference)

To sample from the model, we can just use our sample function defined above:

# sample 64 images
samples = sample(model, image_size=image_size, batch_size=64, channels=channels)

# show a random one
random_index = 5
plt.imshow(samples[-1][random_index].reshape(image_size, image_size, channels), cmap="gray")

Seems like the model is capable of generating a nice T-shirt! Keep in mind that the dataset we trained on is pretty low-resolution (28x28).

We can also create a gif of the denoising process:

import matplotlib.animation as animation

random_index = 53

fig = plt.figure()
ims = []
for i in range(timesteps):
    im = plt.imshow(samples[i][random_index].reshape(image_size, image_size, channels), cmap="gray", animated=True)
    ims.append([im])

animate = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=1000)
animate.save('diffusion.gif')
plt.show()

Follow-up reads

Note that the DDPM paper showed that diffusion models are a promising direction for (un)conditional image generation. This has since then (immensely) been improved, most notably for text-conditional image generation. Below, we list some important (but far from exhaustive) follow-up works:

  • Improved Denoising Diffusion Probabilistic Models (Nichol et al., 2021): finds that learning the variance of the conditional distribution (besides the mean) helps in improving performance
  • Cascaded Diffusion Models for High Fidelity Image Generation (Ho et al., 2021): introduces cascaded diffusion, which comprises a pipeline of multiple diffusion models that generate images of increasing resolution for high-fidelity image synthesis
  • Diffusion Models Beat GANs on Image Synthesis (Dhariwal et al., 2021): show that diffusion models can achieve image sample quality superior to the current state-of-the-art generative models by improving the U-Net architecture, as well as introducing classifier guidance
  • Classifier-Free Diffusion Guidance (Ho et al., 2021): shows that you don't need a classifier for guiding a diffusion model by jointly training a conditional and an unconditional diffusion model with a single neural network
  • Hierarchical Text-Conditional Image Generation with CLIP Latents (DALL-E 2) (Ramesh et al., 2022): uses a prior to turn a text caption into a CLIP image embedding, after which a diffusion model decodes it into an image
  • Photorealistic Text-to-Image Diffusion Models with Deep Language Understanding (ImageGen) (Saharia et al., 2022): shows that combining a large pre-trained language model (e.g. T5) with cascaded diffusion works well for text-to-image synthesis

Note that this list only includes important works until the time of writing, which is June 7th, 2022.

For now, it seems that the main (perhaps only) disadvantage of diffusion models is that they require multiple forward passes to generate an image (which is not the case for generative models like GANs). However, there's research going on that enables high-fidelity generation in as few as 10 denoising steps.