Skip to content

basic

Basic inference pipelines for fastvideo.

This package contains basic pipelines for video and image generation.

Modules

fastvideo.pipelines.basic.cosmos

Modules

fastvideo.pipelines.basic.cosmos.cosmos_pipeline

Cosmos video diffusion pipeline implementation.

This module contains an implementation of the Cosmos video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.cosmos.cosmos_pipeline.Cosmos2VideoToWorldPipeline
Cosmos2VideoToWorldPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: ComposedPipelineBase

Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(self,
             model_path: str,
             fastvideo_args: FastVideoArgs | TrainingArgs,
             required_config_modules: list[str] | None = None,
             loaded_modules: dict[str, torch.nn.Module] | None = None):
    """
    Initialize the pipeline. After __init__, the pipeline should be ready to
    use. The pipeline should be stateless and not hold any batch state.
    """
    self.fastvideo_args = fastvideo_args

    self.model_path: str = model_path
    self._stages: list[PipelineStage] = []
    self._stage_name_mapping: dict[str, PipelineStage] = {}

    if required_config_modules is not None:
        self._required_config_modules = required_config_modules

    if self._required_config_modules is None:
        raise NotImplementedError(
            "Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size, fastvideo_args.sp_size)

    # Torch profiler. Enabled and configured through env vars:
    # FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    trace_dir = envs.FASTVIDEO_TORCH_PROFILER_DIR
    self.profiler_controller = get_or_create_profiler(trace_dir)
    self.profiler = self.profiler_controller.profiler

    self.local_rank = get_world_group().local_rank

    # Load modules directly in initialization
    logger.info("Loading pipeline modules...")
    with self.profiler_controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
Functions
fastvideo.pipelines.basic.cosmos.cosmos_pipeline.Cosmos2VideoToWorldPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs)

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/cosmos/cosmos_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder")],
                       tokenizers=[self.get_module("tokenizer")],
                   ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=CosmosLatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer"),
                       vae=self.get_module("vae")))

    self.add_stage(stage_name="denoising_stage",
                   stage=CosmosDenoisingStage(
                       transformer=self.get_module("transformer"),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
Functions

fastvideo.pipelines.basic.hunyuan

Modules

fastvideo.pipelines.basic.hunyuan.hunyuan_pipeline

Hunyuan video diffusion pipeline implementation.

This module contains an implementation of the Hunyuan video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.hunyuan.hunyuan_pipeline.HunyuanVideoPipeline
HunyuanVideoPipeline(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: ComposedPipelineBase

Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(self,
             model_path: str,
             fastvideo_args: FastVideoArgs | TrainingArgs,
             required_config_modules: list[str] | None = None,
             loaded_modules: dict[str, torch.nn.Module] | None = None):
    """
    Initialize the pipeline. After __init__, the pipeline should be ready to
    use. The pipeline should be stateless and not hold any batch state.
    """
    self.fastvideo_args = fastvideo_args

    self.model_path: str = model_path
    self._stages: list[PipelineStage] = []
    self._stage_name_mapping: dict[str, PipelineStage] = {}

    if required_config_modules is not None:
        self._required_config_modules = required_config_modules

    if self._required_config_modules is None:
        raise NotImplementedError(
            "Subclass must set _required_config_modules")

    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size, fastvideo_args.sp_size)

    # Torch profiler. Enabled and configured through env vars:
    # FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    trace_dir = envs.FASTVIDEO_TORCH_PROFILER_DIR
    self.profiler_controller = get_or_create_profiler(trace_dir)
    self.profiler = self.profiler_controller.profiler

    self.local_rank = get_world_group().local_rank

    # Load modules directly in initialization
    logger.info("Loading pipeline modules...")
    with self.profiler_controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)
Functions
fastvideo.pipelines.basic.hunyuan.hunyuan_pipeline.HunyuanVideoPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs)

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/hunyuan/hunyuan_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage_primary",
                   stage=TextEncodingStage(
                       text_encoders=[
                           self.get_module("text_encoder"),
                           self.get_module("text_encoder_2")
                       ],
                       tokenizers=[
                           self.get_module("tokenizer"),
                           self.get_module("tokenizer_2")
                       ],
                   ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer")))

    self.add_stage(stage_name="denoising_stage",
                   stage=DenoisingStage(
                       transformer=self.get_module("transformer"),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
Functions

fastvideo.pipelines.basic.stepvideo

Modules

fastvideo.pipelines.basic.stepvideo.stepvideo_pipeline

Hunyuan video diffusion pipeline implementation.

This module contains an implementation of the Hunyuan video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.stepvideo.stepvideo_pipeline.StepVideoPipeline
StepVideoPipeline(*args, **kwargs)

Bases: LoRAPipeline, ComposedPipelineBase

Source code in fastvideo/pipelines/lora_pipeline.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.device = get_local_torch_device()
    self.exclude_lora_layers = self.modules[
        "transformer"].config.arch_config.exclude_lora_layers
    self.lora_target_modules = self.fastvideo_args.lora_target_modules
    self.lora_path = self.fastvideo_args.lora_path
    self.lora_nickname = self.fastvideo_args.lora_nickname
    self.training_mode = self.fastvideo_args.training_mode
    if self.training_mode and getattr(self.fastvideo_args, "lora_training",
                                      False):
        assert isinstance(self.fastvideo_args, TrainingArgs)
        if self.fastvideo_args.lora_alpha is None:
            self.fastvideo_args.lora_alpha = self.fastvideo_args.lora_rank
        self.lora_rank = self.fastvideo_args.lora_rank  # type: ignore
        self.lora_alpha = self.fastvideo_args.lora_alpha  # type: ignore
        logger.info("Using LoRA training with rank %d and alpha %d",
                    self.lora_rank, self.lora_alpha)
        if self.lora_target_modules is None:
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj", "to_q", "to_k",
                "to_v", "to_out", "to_qkv"
            ]
        self.convert_to_lora_layers()
    # Inference
    elif not self.training_mode and self.lora_path is not None:
        self.convert_to_lora_layers()
        self.set_lora_adapter(
            self.lora_nickname,  # type: ignore
            self.lora_path)  # type: ignore
Functions
fastvideo.pipelines.basic.stepvideo.stepvideo_pipeline.StepVideoPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs)

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/stepvideo/stepvideo_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=StepvideoPromptEncodingStage(
                       stepllm=self.get_module("text_encoder"),
                       clip=self.get_module("text_encoder_2"),
                   ))

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer"),
                   ))

    self.add_stage(stage_name="denoising_stage",
                   stage=DenoisingStage(
                       transformer=self.get_module("transformer"),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
fastvideo.pipelines.basic.stepvideo.stepvideo_pipeline.StepVideoPipeline.initialize_pipeline
initialize_pipeline(fastvideo_args: FastVideoArgs)

Initialize the pipeline.

Source code in fastvideo/pipelines/basic/stepvideo/stepvideo_pipeline.py
def initialize_pipeline(self, fastvideo_args: FastVideoArgs):
    """
    Initialize the pipeline.
    """
    target_device = get_local_torch_device()
    llm_dir = os.path.join(self.model_path, "step_llm")
    clip_dir = os.path.join(self.model_path, "hunyuan_clip")
    text_enc = self.build_llm(llm_dir, target_device)
    clip_enc = self.build_clip(clip_dir, target_device)
    self.add_module("text_encoder", text_enc)
    self.add_module("text_encoder_2", clip_enc)
    lib_path = (
        os.path.join(
            fastvideo_args.model_path,
            'lib/liboptimus_ths-torch2.5-cu124.cpython-310-x86_64-linux-gnu.so'
        ) if os.path.isdir(fastvideo_args.model_path)  # local checkout
        else hf_hub_download(
            repo_id=fastvideo_args.model_path,
            filename=
            'lib/liboptimus_ths-torch2.5-cu124.cpython-310-x86_64-linux-gnu.so'
        ))
    torch.ops.load_library(lib_path)
fastvideo.pipelines.basic.stepvideo.stepvideo_pipeline.StepVideoPipeline.load_modules
load_modules(fastvideo_args: FastVideoArgs) -> dict[str, Any]

Load the modules from the config.

Source code in fastvideo/pipelines/basic/stepvideo/stepvideo_pipeline.py
def load_modules(self, fastvideo_args: FastVideoArgs) -> dict[str, Any]:
    """
    Load the modules from the config.
    """
    model_index = self._load_config(self.model_path)
    logger.info("Loading pipeline modules from config: %s", model_index)

    # remove keys that are not pipeline modules
    model_index.pop("_class_name")
    model_index.pop("_diffusers_version")

    # some sanity checks
    assert len(
        model_index
    ) > 1, "model_index.json must contain at least one pipeline module"

    required_modules = ["transformer", "scheduler", "vae"]
    for module_name in required_modules:
        if module_name not in model_index:
            raise ValueError(
                f"model_index.json must contain a {module_name} module")
    logger.info("Diffusers config passed sanity checks")

    # all the component models used by the pipeline
    modules = {}
    for module_name, (transformers_or_diffusers,
                      architecture) in model_index.items():
        component_model_path = os.path.join(self.model_path, module_name)
        module = PipelineComponentLoader.load_module(
            module_name=module_name,
            component_model_path=component_model_path,
            transformers_or_diffusers=transformers_or_diffusers,
            fastvideo_args=fastvideo_args,
        )
        logger.info("Loaded module %s from %s", module_name,
                    component_model_path)

        if module_name in modules:
            logger.warning("Overwriting module %s", module_name)
        modules[module_name] = module

    required_modules = self.required_config_modules
    # Check if all required modules were loaded
    for module_name in required_modules:
        if module_name not in modules or modules[module_name] is None:
            raise ValueError(
                f"Required module {module_name} was not loaded properly")

    return modules
Functions

fastvideo.pipelines.basic.wan

Modules

fastvideo.pipelines.basic.wan.wan_causal_dmd_pipeline

Wan causal DMD pipeline implementation.

This module wires the causal DMD denoising stage into the modular pipeline.

Classes
fastvideo.pipelines.basic.wan.wan_causal_dmd_pipeline.WanCausalDMDPipeline
WanCausalDMDPipeline(*args, **kwargs)

Bases: LoRAPipeline, ComposedPipelineBase

Source code in fastvideo/pipelines/lora_pipeline.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.device = get_local_torch_device()
    self.exclude_lora_layers = self.modules[
        "transformer"].config.arch_config.exclude_lora_layers
    self.lora_target_modules = self.fastvideo_args.lora_target_modules
    self.lora_path = self.fastvideo_args.lora_path
    self.lora_nickname = self.fastvideo_args.lora_nickname
    self.training_mode = self.fastvideo_args.training_mode
    if self.training_mode and getattr(self.fastvideo_args, "lora_training",
                                      False):
        assert isinstance(self.fastvideo_args, TrainingArgs)
        if self.fastvideo_args.lora_alpha is None:
            self.fastvideo_args.lora_alpha = self.fastvideo_args.lora_rank
        self.lora_rank = self.fastvideo_args.lora_rank  # type: ignore
        self.lora_alpha = self.fastvideo_args.lora_alpha  # type: ignore
        logger.info("Using LoRA training with rank %d and alpha %d",
                    self.lora_rank, self.lora_alpha)
        if self.lora_target_modules is None:
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj", "to_q", "to_k",
                "to_v", "to_out", "to_qkv"
            ]
        self.convert_to_lora_layers()
    # Inference
    elif not self.training_mode and self.lora_path is not None:
        self.convert_to_lora_layers()
        self.set_lora_adapter(
            self.lora_nickname,  # type: ignore
            self.lora_path)  # type: ignore
Functions
fastvideo.pipelines.basic.wan.wan_causal_dmd_pipeline.WanCausalDMDPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs) -> None

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/wan/wan_causal_dmd_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs) -> None:
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder")],
                       tokenizers=[self.get_module("tokenizer")],
                   ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer", None)))

    self.add_stage(stage_name="denoising_stage",
                   stage=CausalDMDDenosingStage(
                       transformer=self.get_module("transformer"),
                       transformer_2=self.get_module("transformer_2", None),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
Functions
fastvideo.pipelines.basic.wan.wan_dmd_pipeline

Wan video diffusion pipeline implementation.

This module contains an implementation of the Wan video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.wan.wan_dmd_pipeline.WanDMDPipeline
WanDMDPipeline(*args, **kwargs)

Bases: LoRAPipeline, ComposedPipelineBase

Wan video diffusion pipeline with LoRA support.

Source code in fastvideo/pipelines/lora_pipeline.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.device = get_local_torch_device()
    self.exclude_lora_layers = self.modules[
        "transformer"].config.arch_config.exclude_lora_layers
    self.lora_target_modules = self.fastvideo_args.lora_target_modules
    self.lora_path = self.fastvideo_args.lora_path
    self.lora_nickname = self.fastvideo_args.lora_nickname
    self.training_mode = self.fastvideo_args.training_mode
    if self.training_mode and getattr(self.fastvideo_args, "lora_training",
                                      False):
        assert isinstance(self.fastvideo_args, TrainingArgs)
        if self.fastvideo_args.lora_alpha is None:
            self.fastvideo_args.lora_alpha = self.fastvideo_args.lora_rank
        self.lora_rank = self.fastvideo_args.lora_rank  # type: ignore
        self.lora_alpha = self.fastvideo_args.lora_alpha  # type: ignore
        logger.info("Using LoRA training with rank %d and alpha %d",
                    self.lora_rank, self.lora_alpha)
        if self.lora_target_modules is None:
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj", "to_q", "to_k",
                "to_v", "to_out", "to_qkv"
            ]
        self.convert_to_lora_layers()
    # Inference
    elif not self.training_mode and self.lora_path is not None:
        self.convert_to_lora_layers()
        self.set_lora_adapter(
            self.lora_nickname,  # type: ignore
            self.lora_path)  # type: ignore
Functions
fastvideo.pipelines.basic.wan.wan_dmd_pipeline.WanDMDPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs) -> None

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/wan/wan_dmd_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs) -> None:
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder")],
                       tokenizers=[self.get_module("tokenizer")],
                   ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer", None)))

    self.add_stage(stage_name="denoising_stage",
                   stage=DmdDenoisingStage(
                       transformer=self.get_module("transformer"),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
Functions
fastvideo.pipelines.basic.wan.wan_i2v_dmd_pipeline

Wan video diffusion pipeline implementation.

This module contains an implementation of the Wan video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.wan.wan_i2v_dmd_pipeline.WanImageToVideoDmdPipeline
WanImageToVideoDmdPipeline(*args, **kwargs)

Bases: LoRAPipeline, ComposedPipelineBase

Source code in fastvideo/pipelines/lora_pipeline.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.device = get_local_torch_device()
    self.exclude_lora_layers = self.modules[
        "transformer"].config.arch_config.exclude_lora_layers
    self.lora_target_modules = self.fastvideo_args.lora_target_modules
    self.lora_path = self.fastvideo_args.lora_path
    self.lora_nickname = self.fastvideo_args.lora_nickname
    self.training_mode = self.fastvideo_args.training_mode
    if self.training_mode and getattr(self.fastvideo_args, "lora_training",
                                      False):
        assert isinstance(self.fastvideo_args, TrainingArgs)
        if self.fastvideo_args.lora_alpha is None:
            self.fastvideo_args.lora_alpha = self.fastvideo_args.lora_rank
        self.lora_rank = self.fastvideo_args.lora_rank  # type: ignore
        self.lora_alpha = self.fastvideo_args.lora_alpha  # type: ignore
        logger.info("Using LoRA training with rank %d and alpha %d",
                    self.lora_rank, self.lora_alpha)
        if self.lora_target_modules is None:
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj", "to_q", "to_k",
                "to_v", "to_out", "to_qkv"
            ]
        self.convert_to_lora_layers()
    # Inference
    elif not self.training_mode and self.lora_path is not None:
        self.convert_to_lora_layers()
        self.set_lora_adapter(
            self.lora_nickname,  # type: ignore
            self.lora_path)  # type: ignore
Functions
fastvideo.pipelines.basic.wan.wan_i2v_dmd_pipeline.WanImageToVideoDmdPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs)

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/wan/wan_i2v_dmd_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder")],
                       tokenizers=[self.get_module("tokenizer")],
                   ))

    self.add_stage(stage_name="image_encoding_stage",
                   stage=ImageEncodingStage(
                       image_encoder=self.get_module("image_encoder"),
                       image_processor=self.get_module("image_processor"),
                   ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer")))

    self.add_stage(stage_name="image_latent_preparation_stage",
                   stage=ImageVAEEncodingStage(vae=self.get_module("vae")))

    self.add_stage(stage_name="denoising_stage",
                   stage=DmdDenoisingStage(
                       transformer=self.get_module("transformer"),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
Functions
fastvideo.pipelines.basic.wan.wan_i2v_pipeline

Wan video diffusion pipeline implementation.

This module contains an implementation of the Wan video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.wan.wan_i2v_pipeline.WanImageToVideoPipeline
WanImageToVideoPipeline(*args, **kwargs)

Bases: LoRAPipeline, ComposedPipelineBase

Source code in fastvideo/pipelines/lora_pipeline.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.device = get_local_torch_device()
    self.exclude_lora_layers = self.modules[
        "transformer"].config.arch_config.exclude_lora_layers
    self.lora_target_modules = self.fastvideo_args.lora_target_modules
    self.lora_path = self.fastvideo_args.lora_path
    self.lora_nickname = self.fastvideo_args.lora_nickname
    self.training_mode = self.fastvideo_args.training_mode
    if self.training_mode and getattr(self.fastvideo_args, "lora_training",
                                      False):
        assert isinstance(self.fastvideo_args, TrainingArgs)
        if self.fastvideo_args.lora_alpha is None:
            self.fastvideo_args.lora_alpha = self.fastvideo_args.lora_rank
        self.lora_rank = self.fastvideo_args.lora_rank  # type: ignore
        self.lora_alpha = self.fastvideo_args.lora_alpha  # type: ignore
        logger.info("Using LoRA training with rank %d and alpha %d",
                    self.lora_rank, self.lora_alpha)
        if self.lora_target_modules is None:
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj", "to_q", "to_k",
                "to_v", "to_out", "to_qkv"
            ]
        self.convert_to_lora_layers()
    # Inference
    elif not self.training_mode and self.lora_path is not None:
        self.convert_to_lora_layers()
        self.set_lora_adapter(
            self.lora_nickname,  # type: ignore
            self.lora_path)  # type: ignore
Functions
fastvideo.pipelines.basic.wan.wan_i2v_pipeline.WanImageToVideoPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs)

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/wan/wan_i2v_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder")],
                       tokenizers=[self.get_module("tokenizer")],
                   ))

    if (self.get_module("image_encoder") is not None
            and self.get_module("image_processor") is not None):
        self.add_stage(
            stage_name="image_encoding_stage",
            stage=ImageEncodingStage(
                image_encoder=self.get_module("image_encoder"),
                image_processor=self.get_module("image_processor"),
            ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer")))

    self.add_stage(stage_name="image_latent_preparation_stage",
                   stage=ImageVAEEncodingStage(vae=self.get_module("vae")))

    self.add_stage(stage_name="denoising_stage",
                   stage=DenoisingStage(
                       transformer=self.get_module("transformer"),
                       transformer_2=self.get_module("transformer_2"),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
Functions
fastvideo.pipelines.basic.wan.wan_pipeline

Wan video diffusion pipeline implementation.

This module contains an implementation of the Wan video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.wan.wan_pipeline.WanPipeline
WanPipeline(*args, **kwargs)

Bases: LoRAPipeline, ComposedPipelineBase

Wan video diffusion pipeline with LoRA support.

Source code in fastvideo/pipelines/lora_pipeline.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.device = get_local_torch_device()
    self.exclude_lora_layers = self.modules[
        "transformer"].config.arch_config.exclude_lora_layers
    self.lora_target_modules = self.fastvideo_args.lora_target_modules
    self.lora_path = self.fastvideo_args.lora_path
    self.lora_nickname = self.fastvideo_args.lora_nickname
    self.training_mode = self.fastvideo_args.training_mode
    if self.training_mode and getattr(self.fastvideo_args, "lora_training",
                                      False):
        assert isinstance(self.fastvideo_args, TrainingArgs)
        if self.fastvideo_args.lora_alpha is None:
            self.fastvideo_args.lora_alpha = self.fastvideo_args.lora_rank
        self.lora_rank = self.fastvideo_args.lora_rank  # type: ignore
        self.lora_alpha = self.fastvideo_args.lora_alpha  # type: ignore
        logger.info("Using LoRA training with rank %d and alpha %d",
                    self.lora_rank, self.lora_alpha)
        if self.lora_target_modules is None:
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj", "to_q", "to_k",
                "to_v", "to_out", "to_qkv"
            ]
        self.convert_to_lora_layers()
    # Inference
    elif not self.training_mode and self.lora_path is not None:
        self.convert_to_lora_layers()
        self.set_lora_adapter(
            self.lora_nickname,  # type: ignore
            self.lora_path)  # type: ignore
Functions
fastvideo.pipelines.basic.wan.wan_pipeline.WanPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs) -> None

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/wan/wan_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs) -> None:
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder")],
                       tokenizers=[self.get_module("tokenizer")],
                   ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer", None)))

    self.add_stage(stage_name="denoising_stage",
                   stage=DenoisingStage(
                       transformer=self.get_module("transformer"),
                       transformer_2=self.get_module("transformer_2", None),
                       scheduler=self.get_module("scheduler"),
                       vae=self.get_module("vae"),
                       pipeline=self))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae"),
                                       pipeline=self))
Functions
fastvideo.pipelines.basic.wan.wan_v2v_pipeline

Wan video-to-video diffusion pipeline implementation.

This module contains an implementation of the Wan video-to-video diffusion pipeline using the modular pipeline architecture.

Classes
fastvideo.pipelines.basic.wan.wan_v2v_pipeline.WanVideoToVideoPipeline
WanVideoToVideoPipeline(*args, **kwargs)

Bases: LoRAPipeline, ComposedPipelineBase

Source code in fastvideo/pipelines/lora_pipeline.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.device = get_local_torch_device()
    self.exclude_lora_layers = self.modules[
        "transformer"].config.arch_config.exclude_lora_layers
    self.lora_target_modules = self.fastvideo_args.lora_target_modules
    self.lora_path = self.fastvideo_args.lora_path
    self.lora_nickname = self.fastvideo_args.lora_nickname
    self.training_mode = self.fastvideo_args.training_mode
    if self.training_mode and getattr(self.fastvideo_args, "lora_training",
                                      False):
        assert isinstance(self.fastvideo_args, TrainingArgs)
        if self.fastvideo_args.lora_alpha is None:
            self.fastvideo_args.lora_alpha = self.fastvideo_args.lora_rank
        self.lora_rank = self.fastvideo_args.lora_rank  # type: ignore
        self.lora_alpha = self.fastvideo_args.lora_alpha  # type: ignore
        logger.info("Using LoRA training with rank %d and alpha %d",
                    self.lora_rank, self.lora_alpha)
        if self.lora_target_modules is None:
            self.lora_target_modules = [
                "q_proj", "k_proj", "v_proj", "o_proj", "to_q", "to_k",
                "to_v", "to_out", "to_qkv"
            ]
        self.convert_to_lora_layers()
    # Inference
    elif not self.training_mode and self.lora_path is not None:
        self.convert_to_lora_layers()
        self.set_lora_adapter(
            self.lora_nickname,  # type: ignore
            self.lora_path)  # type: ignore
Functions
fastvideo.pipelines.basic.wan.wan_v2v_pipeline.WanVideoToVideoPipeline.create_pipeline_stages
create_pipeline_stages(fastvideo_args: FastVideoArgs)

Set up pipeline stages with proper dependency injection.

Source code in fastvideo/pipelines/basic/wan/wan_v2v_pipeline.py
def create_pipeline_stages(self, fastvideo_args: FastVideoArgs):
    """Set up pipeline stages with proper dependency injection."""

    self.add_stage(stage_name="input_validation_stage",
                   stage=InputValidationStage())

    self.add_stage(stage_name="prompt_encoding_stage",
                   stage=TextEncodingStage(
                       text_encoders=[self.get_module("text_encoder")],
                       tokenizers=[self.get_module("tokenizer")],
                   ))

    if (self.get_module("image_encoder") is not None
            and self.get_module("image_processor") is not None):
        self.add_stage(
            stage_name="ref_image_encoding_stage",
            stage=RefImageEncodingStage(
                image_encoder=self.get_module("image_encoder"),
                image_processor=self.get_module("image_processor"),
            ))

    self.add_stage(stage_name="conditioning_stage",
                   stage=ConditioningStage())

    self.add_stage(stage_name="timestep_preparation_stage",
                   stage=TimestepPreparationStage(
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="latent_preparation_stage",
                   stage=LatentPreparationStage(
                       scheduler=self.get_module("scheduler"),
                       transformer=self.get_module("transformer")))

    self.add_stage(stage_name="video_latent_preparation_stage",
                   stage=VideoVAEEncodingStage(vae=self.get_module("vae")))

    self.add_stage(stage_name="denoising_stage",
                   stage=DenoisingStage(
                       transformer=self.get_module("transformer"),
                       transformer_2=self.get_module("transformer_2"),
                       scheduler=self.get_module("scheduler")))

    self.add_stage(stage_name="decoding_stage",
                   stage=DecodingStage(vae=self.get_module("vae")))
Functions