Skip to content

pipeline_registry

Classes

fastvideo.pipelines.pipeline_registry.PipelineType

Bases: str, Enum

Enumeration for different pipeline types.

Inherits from str to allow string comparison for backward compatibility.

Functions

fastvideo.pipelines.pipeline_registry.PipelineType.choices classmethod
choices() -> list[str]

Get all available choices as strings.

Source code in fastvideo/pipelines/pipeline_registry.py
@classmethod
def choices(cls) -> list[str]:
    """Return the ``value`` of every member of this enum as a list."""
    # Iterating the enum class yields its members one by one.
    return [member.value for member in cls]
fastvideo.pipelines.pipeline_registry.PipelineType.from_string classmethod
from_string(value: str) -> PipelineType

Convert string to PipelineType enum.

Source code in fastvideo/pipelines/pipeline_registry.py
@classmethod
def from_string(cls, value: str) -> "PipelineType":
    """
    Look up the enum member whose value equals ``value`` lower-cased.

    Args:
        value: Name of the pipeline type; matching is case-insensitive
            because the input is lower-cased before the lookup.

    Returns:
        The matching enum member.

    Raises:
        ValueError: If no member has the lower-cased value. The message
            lists every valid value.
    """
    try:
        member = cls(value.lower())
    except ValueError:
        valid_values = ', '.join([candidate.value for candidate in cls])
        # "from None" hides the enum's own lookup error so callers only see
        # the friendlier message below.
        raise ValueError(
            f"Invalid pipeline type: {value}. Must be one of: {valid_values}"
        ) from None
    return member

Functions

fastvideo.pipelines.pipeline_registry.get_pipeline_registry

get_pipeline_registry(pipeline_type: PipelineType | str | None = None) -> _PipelineRegistry

Get a pipeline registry for the specified pipeline type.

Parameters:

Name Type Description Default
pipeline_type PipelineType | str | None

Pipeline type to load, given as a PipelineType or its string name. If None, pipelines of all types are loaded.

None

Returns:

Type Description
_PipelineRegistry

A pipeline registry instance.

Source code in fastvideo/pipelines/pipeline_registry.py
def get_pipeline_registry(
        pipeline_type: PipelineType | str | None = None) -> _PipelineRegistry:
    """
    Build a pipeline registry from the pipeline classes of the given type.

    Args:
        pipeline_type: Pipeline type to load. A string is first converted
            with ``PipelineType.from_string``; the resulting value (or the
            argument as given, including None) is passed on to
            ``import_pipeline_classes``.

    Returns:
        A ``_PipelineRegistry`` constructed from the imported pipeline classes.
    """
    # Normalize string input to the enum before importing.
    resolved_type = (PipelineType.from_string(pipeline_type) if isinstance(
        pipeline_type, str) else pipeline_type)
    return _PipelineRegistry(import_pipeline_classes(resolved_type))

fastvideo.pipelines.pipeline_registry.import_pipeline_classes cached

import_pipeline_classes(pipeline_types: list[PipelineType] | PipelineType | None = None) -> dict[str, dict[str, dict[str, type[ComposedPipelineBase] | None]]]

Import pipeline classes for the requested pipeline type(s).

Parameters:

Name Type Description Default
pipeline_types list[PipelineType] | PipelineType | None

The pipeline types to load (basic, preprocess, training). If None, loads all types.

None

Returns:

Type Description
dict[str, dict[str, dict[str, type[ComposedPipelineBase] | None]]]

A three-level nested dictionary of the form {pipeline_type: {architecture_name: {pipeline_name: pipeline_cls}}},

e.g., {"basic": {"wan": {"WanPipeline": WanPipeline}}}

Source code in fastvideo/pipelines/pipeline_registry.py
def _register_entry_classes(
        pipeline_module,
        pipeline_dict: dict[str, type[ComposedPipelineBase] | None],
        pipeline_type_str: str, arch_package_name: str) -> None:
    """Add the class(es) a module exposes as ``EntryClass`` to ``pipeline_dict``.

    Modules without an ``EntryClass`` attribute are ignored. ``EntryClass`` may
    be a single class or a list of classes; each is keyed by its ``__name__``.

    Raises:
        AssertionError: If a class name is already present in ``pipeline_dict``.
    """
    if not hasattr(pipeline_module, "EntryClass"):
        return

    entry_class = pipeline_module.EntryClass
    entry_classes = entry_class if isinstance(entry_class,
                                              list) else [entry_class]
    for pipeline_cls in entry_classes:
        pipeline_name = pipeline_cls.__name__
        # Explicit raise (rather than ``assert``) so the duplicate check still
        # runs under ``python -O``; the exception type is kept for callers.
        if pipeline_name in pipeline_dict:
            raise AssertionError(
                f"Duplicated pipeline implementation for {pipeline_name} in {pipeline_type_str}.{arch_package_name}"
            )
        pipeline_dict[pipeline_name] = pipeline_cls


@lru_cache
def _import_pipeline_classes_cached(
    pipeline_type_names: tuple[str, ...]
) -> dict[str, dict[str, dict[str, type[ComposedPipelineBase] | None]]]:
    """Scan ``fastvideo.pipelines.<type>`` packages and collect entry classes.

    Cached on the (hashable) tuple of pipeline type names, so the scan and its
    log messages only happen once per distinct request.
    """
    package_name: str = "fastvideo.pipelines"
    type_to_arch_to_pipeline_dict: dict[str,
                                        dict[str,
                                             dict[str,
                                                  type[ComposedPipelineBase]
                                                  | None]]] = {}

    logger.info("Loading pipelines for types: %s", list(pipeline_type_names))

    for pipeline_type_str in pipeline_type_names:
        arch_to_pipeline_dict: dict[str, dict[str, type[ComposedPipelineBase]
                                              | None]] = {}
        pipeline_type_package_name = f"{package_name}.{pipeline_type_str}"

        try:
            pipeline_type_package = importlib.import_module(
                pipeline_type_package_name)
            logger.debug("Successfully imported %s", pipeline_type_package_name)

            # Each direct child of the pipeline-type package is one architecture.
            for _, arch, arch_is_pkg in pkgutil.iter_modules(
                    pipeline_type_package.__path__):
                pipeline_dict: dict[str, type[ComposedPipelineBase] | None] = {}
                arch_package_name = f"{pipeline_type_package_name}.{arch}"

                # Only architecture *packages* are scanned; a plain module at
                # this level is still recorded, with an empty mapping.
                if arch_is_pkg:
                    arch_package = importlib.import_module(arch_package_name)
                    for _, module_name, module_is_pkg in pkgutil.walk_packages(
                            arch_package.__path__, arch_package_name + "."):
                        if module_is_pkg:
                            continue
                        _register_entry_classes(
                            importlib.import_module(module_name),
                            pipeline_dict, pipeline_type_str,
                            arch_package_name)

                arch_to_pipeline_dict[arch] = pipeline_dict

        except ImportError as e:
            # Chain the original error: the failure may come from a nested
            # module, and its traceback is needed to locate the real cause.
            raise ImportError(
                f"Could not import {pipeline_type_package_name} when importing pipeline classes: {e}"
            ) from e

        type_to_arch_to_pipeline_dict[pipeline_type_str] = arch_to_pipeline_dict

    # Log summary
    total_pipelines = sum(
        len(pipeline_dict)
        for arch_to_pipeline_dict in type_to_arch_to_pipeline_dict.values()
        for pipeline_dict in arch_to_pipeline_dict.values())
    logger.info("Loaded %d pipeline classes across %d types", total_pipelines,
                len(pipeline_type_names))

    return type_to_arch_to_pipeline_dict


def import_pipeline_classes(
    pipeline_types: list[PipelineType] | PipelineType | None = None
) -> dict[str, dict[str, dict[str, type[ComposedPipelineBase] | None]]]:
    """
    Import pipeline classes for the requested pipeline type(s).

    Results are cached per distinct set of requested types. The argument is
    normalized to a tuple of type names before the cache lookup, so a list
    argument (which is unhashable and cannot be an ``lru_cache`` key) works.

    Args:
        pipeline_types: A single PipelineType, a list of PipelineType members,
                      or None. Any value that is neither a list nor a
                      PipelineType (including None) loads all types.

    Returns:
        A three-level nested dictionary:
        {pipeline_type: {architecture_name: {pipeline_name: pipeline_cls}}}
        e.g., {"basic": {"wan": {"WanPipeline": WanPipeline}}}

    Raises:
        ImportError: If a pipeline-type package or any module below it cannot
            be imported.
        AssertionError: If two modules of one architecture expose a pipeline
            class with the same name.
    """
    # Determine which pipeline types to scan
    if isinstance(pipeline_types, list):
        pipeline_type_names = tuple(pipeline_type.value
                                    for pipeline_type in pipeline_types)
    elif isinstance(pipeline_types, PipelineType):
        pipeline_type_names = (pipeline_types.value, )
    else:
        pipeline_type_names = tuple(pt.value for pt in PipelineType)

    return _import_pipeline_classes_cached(pipeline_type_names)


# Keep the cache-management helpers that callers of the previously
# lru_cache-decorated function could rely on.
import_pipeline_classes.cache_clear = _import_pipeline_classes_cached.cache_clear  # type: ignore[attr-defined]
import_pipeline_classes.cache_info = _import_pipeline_classes_cached.cache_info  # type: ignore[attr-defined]