# Adding a New Diffusion Pipeline
This guide explains how to implement a custom diffusion pipeline in FastVideo, leveraging the framework's modular architecture for high-performance video generation.
## Implementation Process Overview
1. **Port Required Modules** - Identify and implement the necessary model components
2. **Create Directory Structure** - Set up pipeline files and folders
3. **Implement Pipeline Class** - Build the pipeline using existing or custom stages
4. **Register Your Pipeline** - Make it discoverable by the framework
5. **Configure Your Pipeline** - (Coming soon)
Need help? Join our Slack community.
## Step 1: Pipeline Modules
### Identifying Required Modules
FastVideo uses the Hugging Face Diffusers format for model organization:
Examine the `model_index.json` in the HF model repository:
```json
{
  "_class_name": "WanImageToVideoPipeline",
  "_diffusers_version": "0.33.0.dev0",
  "image_encoder": ["transformers", "CLIPVisionModelWithProjection"],
  "image_processor": ["transformers", "CLIPImageProcessor"],
  "scheduler": ["diffusers", "UniPCMultistepScheduler"],
  "text_encoder": ["transformers", "UMT5EncoderModel"],
  "tokenizer": ["transformers", "T5TokenizerFast"],
  "transformer": ["diffusers", "WanTransformer3DModel"],
  "vae": ["diffusers", "AutoencoderKLWan"]
}
```
For each component (the sketch after this list shows how to enumerate them programmatically):

- Note the originating library (`transformers` or `diffusers`)
- Identify the class name
- Check if it's already available in FastVideo
- Review the config files in each component's directory for architecture details
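For example, a minimal sketch for listing the components of a locally downloaded checkpoint (the path below is hypothetical):

```python
import json
from pathlib import Path

# Hypothetical local checkpoint directory
model_dir = Path("checkpoints/Wan2.1-I2V-14B-480P-Diffusers")

with open(model_dir / "model_index.json") as f:
    index = json.load(f)

for name, spec in index.items():
    if name.startswith("_"):  # skip metadata keys such as _class_name
        continue
    library, class_name = spec
    print(f"{name}: {class_name} (from {library})")
```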
### Implementing Modules
Place new modules in the appropriate directories:
- **Encoders**: `fastvideo/v1/models/encoders/`
- **VAEs**: `fastvideo/v1/models/vaes/`
- **Transformer models**: `fastvideo/v1/models/dits/`
- **Schedulers**: `fastvideo/v1/models/schedulers/`
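Note that the module file name you choose here is what you will reference later when registering the model: the `(subdirectory, module, class)` tuple in the registry (see "Registering Models" below) must match these paths.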
### Adapting Model Layers
#### Layer Replacements
Replace standard PyTorch layers with FastVideo's optimized versions:

- `nn.LayerNorm` → `fastvideo.v1.layers.layernorm.RMSNorm`
- Embedding layers → `fastvideo.v1.layers.vocab_parallel_embedding` modules
- Activation functions → versions from `fastvideo.v1.layers.activation`
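For instance, swapping a block's normalization layer might look like the following (a minimal sketch; the `RMSNorm` constructor is assumed to take `(hidden_size, eps)`, as most RMSNorm implementations do):

```python
import torch.nn as nn

from fastvideo.v1.layers.layernorm import RMSNorm


class TransformerBlock(nn.Module):
    def __init__(self, hidden_size: int):
        super().__init__()
        # Before: self.norm = nn.LayerNorm(hidden_size, eps=1e-6)
        # After: FastVideo's RMSNorm (constructor signature assumed)
        self.norm = RMSNorm(hidden_size, eps=1e-6)
```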
#### Distributed Linear Layers
Use the appropriate parallel layers to distribute computation:
```python
# Output dimension parallelism
from fastvideo.v1.layers.linear import ColumnParallelLinear

self.q_proj = ColumnParallelLinear(
    input_size=hidden_size,
    output_size=head_size * num_heads,
    bias=bias,
    gather_output=False,
)

# Fused QKV projection
from fastvideo.v1.layers.linear import QKVParallelLinear

self.qkv_proj = QKVParallelLinear(
    hidden_size=hidden_size,
    head_size=attention_head_dim,
    total_num_heads=num_attention_heads,
    bias=True,
)

# Input dimension parallelism
from fastvideo.v1.layers.linear import RowParallelLinear

self.out_proj = RowParallelLinear(
    input_size=head_size * num_heads,
    output_size=hidden_size,
    bias=bias,
    input_is_parallel=True,
)
```
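Pairing a `ColumnParallelLinear` (`gather_output=False`) with a downstream `RowParallelLinear` (`input_is_parallel=True`) is the standard tensor-parallel pattern: intermediate activations stay sharded across ranks, and only a single all-reduce is needed at the row-parallel output.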
#### Attention Layers
Replace standard attention with FastVideo's optimized attention:
```python
# Local attention patterns
from fastvideo.v1.attention import LocalAttention
from fastvideo.v1.attention.backends.abstract import _Backend

self.attn = LocalAttention(
    num_heads=num_heads,
    head_size=head_dim,
    dropout_rate=0.0,
    softmax_scale=None,
    causal=False,
    supported_attention_backends=(_Backend.FLASH_ATTN, _Backend.TORCH_SDPA),
)

# Distributed attention for long sequences
from fastvideo.v1.attention import DistributedAttention

self.attn = DistributedAttention(
    num_heads=num_heads,
    head_size=head_dim,
    dropout_rate=0.0,
    softmax_scale=None,
    causal=False,
    supported_attention_backends=(_Backend.SLIDING_TILE_ATTN, _Backend.FLASH_ATTN, _Backend.TORCH_SDPA),
)

# Define supported backend selection
_supported_attention_backends = (_Backend.FLASH_ATTN, _Backend.TORCH_SDPA)
```
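The practical difference is scaling behavior: `LocalAttention` keeps the full sequence on each rank, while `DistributedAttention` shards the sequence across ranks, making it the variant suited to the very long token sequences produced by video latents.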
### Registering Models
Register implemented modules in the model registry:
```python
# In fastvideo/v1/models/registry.py
_TEXT_TO_VIDEO_DIT_MODELS = {
    "YourTransformerModel": ("dits", "yourmodule", "YourTransformerClass"),
}

_VAE_MODELS = {
    "YourVAEModel": ("vaes", "yourvae", "YourVAEClass"),
}
```
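Each registry entry maps an architecture name to a `(subdirectory, module, class)` tuple, so `("dits", "yourmodule", "YourTransformerClass")` resolves to `YourTransformerClass` in `fastvideo/v1/models/dits/yourmodule.py`.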
## Step 2: Directory Structure
Create a new directory for your pipeline:
```
fastvideo/v1/pipelines/
├── your_pipeline/
│   ├── __init__.py
│   └── your_pipeline.py
```
## Step 3: Implement Pipeline Class
Pipelines are composed of stages, each handling a specific part of the diffusion process:
- `InputValidationStage`: Validates input parameters
- Text encoding stages: Handle text encoding (CLIP/Llama/T5)
- `CLIPImageEncodingStage`: Processes image inputs
- `TimestepPreparationStage`: Prepares diffusion timesteps
- `LatentPreparationStage`: Manages latent representations
- `ConditioningStage`: Processes conditioning inputs
- `DenoisingStage`: Performs denoising diffusion
- `DecodingStage`: Converts latents to pixels
### Creating Your Pipeline
```python
from typing import List

import torch

from fastvideo.v1.fastvideo_args import FastVideoArgs
from fastvideo.v1.pipelines.composed_pipeline_base import ComposedPipelineBase
from fastvideo.v1.pipelines.pipeline_batch_info import ForwardBatch
from fastvideo.v1.pipelines.stages import (
    InputValidationStage, CLIPTextEncodingStage, TimestepPreparationStage,
    LatentPreparationStage, DenoisingStage, DecodingStage
)


class MyCustomPipeline(ComposedPipelineBase):
    """Custom diffusion pipeline implementation."""

    # Define required model components from model_index.json
    _required_config_modules = [
        "text_encoder", "tokenizer", "vae", "transformer", "scheduler"
    ]

    @property
    def required_config_modules(self) -> List[str]:
        return self._required_config_modules

    def initialize_pipeline(self, fastvideo_args: FastVideoArgs):
        """Initialize pipeline-specific components."""
        pass

    def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
        """Set up pipeline stages with proper dependency injection."""
        self.add_stage(
            stage_name="input_validation_stage",
            stage=InputValidationStage()
        )
        self.add_stage(
            stage_name="prompt_encoding_stage",
            stage=CLIPTextEncodingStage(
                text_encoder=self.get_module("text_encoder"),
                tokenizer=self.get_module("tokenizer")
            )
        )
        self.add_stage(
            stage_name="timestep_preparation_stage",
            stage=TimestepPreparationStage(
                scheduler=self.get_module("scheduler")
            )
        )
        self.add_stage(
            stage_name="latent_preparation_stage",
            stage=LatentPreparationStage(
                scheduler=self.get_module("scheduler"),
                vae=self.get_module("vae")
            )
        )
        self.add_stage(
            stage_name="denoising_stage",
            stage=DenoisingStage(
                transformer=self.get_module("transformer"),
                scheduler=self.get_module("scheduler")
            )
        )
        self.add_stage(
            stage_name="decoding_stage",
            stage=DecodingStage(
                vae=self.get_module("vae")
            )
        )


# Register the pipeline class
EntryClass = MyCustomPipeline
```
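Once added, the stages run in registration order, each receiving and returning the shared `ForwardBatch`. Conceptually (a simplified sketch, not FastVideo's actual implementation), execution looks like this:

```python
def run_stages(stages, batch, fastvideo_args):
    """Run each stage in order, threading the batch from one to the next."""
    for stage in stages:
        batch = stage.forward(batch, fastvideo_args)
    return batch
```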
### Creating Custom Stages (Optional)
If existing stages don't meet your needs, create custom ones:
```python
from fastvideo.v1.fastvideo_args import FastVideoArgs
from fastvideo.v1.pipelines.pipeline_batch_info import ForwardBatch
from fastvideo.v1.pipelines.stages.base import PipelineStage


class MyCustomStage(PipelineStage):
    """Custom processing stage for the pipeline."""

    def __init__(self, custom_module, other_param=None):
        super().__init__()
        self.custom_module = custom_module
        self.other_param = other_param

    def forward(self, batch: ForwardBatch, fastvideo_args: FastVideoArgs) -> ForwardBatch:
        # Access input data
        input_data = batch.some_attribute

        # Validate inputs
        if input_data is None:
            raise ValueError("Required input is missing")

        # Process with your module
        result = self.custom_module(input_data)

        # Update batch with results
        batch.some_output = result
        return batch
```
Add your custom stage to the pipeline:
```python
self.add_stage(
    stage_name="my_custom_stage",
    stage=MyCustomStage(
        custom_module=self.get_module("custom_module"),
        other_param="some_value"
    )
)
```
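Note that `self.get_module("custom_module")` can only return a component the pipeline actually loaded, so `"custom_module"` would need to be listed in `required_config_modules` (and present in the model's `model_index.json`) or attached by your own `initialize_pipeline` logic; the name here is purely illustrative.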
### Stage Design Principles
- **Single Responsibility**: Focus on one specific task
- **Functional Pattern**: Receive and return a `ForwardBatch` object
- **Dependency Injection**: Pass dependencies through the constructor
- **Input Validation**: Validate inputs for clear error messages
## Step 4: Register Your Pipeline
Define `EntryClass` at the end of your pipeline file:
```python
# Single pipeline class
EntryClass = MyCustomPipeline

# Or multiple pipeline classes
EntryClass = [MyCustomPipeline, MyOtherPipeline]
```
The registry will automatically:
- Scan all packages under `fastvideo/v1/pipelines/`
- Look for `EntryClass` variables
- Register pipelines using their class names as identifiers
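A simplified sketch of how this kind of discovery can be implemented (illustrative only, not the actual registry code):

```python
import importlib
import pkgutil

import fastvideo.v1.pipelines as pipelines_pkg


def discover_pipelines() -> dict:
    """Collect every EntryClass exported by packages under fastvideo/v1/pipelines/."""
    registry = {}
    for info in pkgutil.iter_modules(pipelines_pkg.__path__):
        module = importlib.import_module(f"{pipelines_pkg.__name__}.{info.name}")
        entry = getattr(module, "EntryClass", None)
        if entry is None:
            continue
        # EntryClass may be a single class or a list of classes
        for cls in (entry if isinstance(entry, list) else [entry]):
            registry[cls.__name__] = cls
    return registry
```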
## Best Practices
- **Reuse Existing Components**: Leverage built-in stages and modules
- **Follow Module Organization**: Place new modules in the appropriate directories
- **Match Model Patterns**: Follow existing code patterns and conventions