
workflow

Modules

fastvideo.workflow.preprocess

Modules

fastvideo.workflow.preprocess.components
Classes
fastvideo.workflow.preprocess.components.ParquetDatasetSaver
ParquetDatasetSaver(flush_frequency: int, samples_per_file: int, schema: Schema, record_creator: Callable[..., list[dict[str, Any]]])

Component for saving and writing Parquet datasets using shared parquet_io.

Source code in fastvideo/workflow/preprocess/components.py
def __init__(self, flush_frequency: int, samples_per_file: int,
             schema: pa.Schema,
             record_creator: Callable[..., list[dict[str, Any]]]):
    self.flush_frequency = flush_frequency
    self.samples_per_file = samples_per_file
    self.schema = schema
    self.create_records_from_batch = record_creator
    self.num_processed_samples: int = 0
    self._writer: ParquetDatasetWriter | None = None
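
A minimal construction sketch. The schema fields and record creator below are illustrative stand-ins, not the columns FastVideo's preprocessing actually writes:

import pyarrow as pa

from fastvideo.workflow.preprocess.components import ParquetDatasetSaver

# Hypothetical two-column schema; the real preprocessing schema is defined elsewhere.
schema = pa.schema([
    pa.field("id", pa.string()),
    pa.field("caption", pa.string()),
])

def make_records(batch) -> list[dict]:
    # Hypothetical record creator: turn one PreprocessBatch into row dicts matching the schema.
    return [{"id": name, "caption": text}
            for name, text in zip(batch.names, batch.captions)]  # attribute names are placeholders

saver = ParquetDatasetSaver(
    flush_frequency=1000,   # flush to disk once this many buffered samples accumulate
    samples_per_file=256,   # rows per parquet output file
    schema=schema,
    record_creator=make_records,
)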
Functions
fastvideo.workflow.preprocess.components.ParquetDatasetSaver.clean_up
clean_up() -> None

Clean up all tables

Source code in fastvideo/workflow/preprocess/components.py
def clean_up(self) -> None:
    """Clean up all tables"""
    self.flush_tables(write_remainder=True)
    self._writer = None
    self.num_processed_samples = 0
    gc.collect()
fastvideo.workflow.preprocess.components.ParquetDatasetSaver.flush_tables
flush_tables(write_remainder: bool = False)

Flush buffered records to disk.

Parameters:

Name Type Description Default
write_remainder bool

If True, also write any leftover rows smaller than samples_per_file as a final small file. Useful for the last flush.

False
Source code in fastvideo/workflow/preprocess/components.py
def flush_tables(self, write_remainder: bool = False):
    """Flush buffered records to disk.

    Args:
        write_remainder: If True, also write any leftover rows smaller than
            ``samples_per_file`` as a final small file. Useful for the last flush.
    """
    if self._writer is None:
        return
    _ = self._writer.flush(write_remainder=write_remainder)
    # Reset processed sample count modulo samples_per_file
    remainder = self.num_processed_samples % self.samples_per_file
    self.num_processed_samples = 0 if write_remainder else remainder
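
Sketch of the two flush modes, assuming the saver built in the construction sketch above; the default call is expected to write only complete files, per the docstring and the modulo-based counter reset:

saver.flush_tables()                      # write complete files of samples_per_file rows;
                                          # leftover rows remain buffered for the next flush
saver.flush_tables(write_remainder=True)  # also write the leftover rows as a final, smaller file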
fastvideo.workflow.preprocess.components.ParquetDatasetSaver.save_and_write_parquet_batch
save_and_write_parquet_batch(batch: PreprocessBatch, output_dir: str, extra_features: dict[str, Any] | None = None) -> None

Save and write Parquet dataset batch

Parameters:

Name Type Description Default
batch PreprocessBatch

PreprocessBatch containing video and metadata information

required
output_dir str

Output directory

required
extra_features dict[str, Any] | None

Extra features

None

Returns:

Type Description
None

None. The processed-sample count is accumulated in num_processed_samples.

Source code in fastvideo/workflow/preprocess/components.py
def save_and_write_parquet_batch(
        self,
        batch: PreprocessBatch,
        output_dir: str,
        extra_features: dict[str, Any] | None = None) -> None:
    """
    Save and write Parquet dataset batch

    Args:
        batch: PreprocessBatch containing video and metadata information
        output_dir: Output directory
        extra_features: Extra features

    Returns:
        None. The processed-sample count is accumulated in ``num_processed_samples``.
    """
    assert isinstance(batch.latents, torch.Tensor)
    assert isinstance(batch.prompt_embeds, list)
    assert isinstance(batch.prompt_attention_mask, list)

    # Process non-padded embeddings (if needed)
    if batch.prompt_attention_mask is not None:
        batch.prompt_embeds = self._process_non_padded_embeddings(
            batch.prompt_embeds[0], batch.prompt_attention_mask[0])
    else:
        raise ValueError("prompt_attention_mask is None")

    # Prepare batch data for Parquet dataset
    batch_data: list[dict[str, Any]] = []

    for key in dataclasses.fields(batch):
        value = getattr(batch, key.name)
        if isinstance(value, list):
            for idx in range(len(value)):
                if isinstance(value[idx], torch.Tensor):
                    value[idx] = value[idx].cpu().numpy()
        elif isinstance(value, torch.Tensor):
            value = value.cpu().numpy()
            setattr(batch, key.name, value)

    # Create record for Parquet dataset
    records = self.create_records_from_batch(batch)
    batch_data.extend(records)

    if batch_data:
        self.num_processed_samples += len(batch_data)
        table = records_to_table(batch_data, self.schema)
        if self._writer is None:
            os.makedirs(output_dir, exist_ok=True)
            self._writer = ParquetDatasetWriter(
                out_dir=output_dir, samples_per_file=self.samples_per_file)
        self._writer.append_table(table)
        logger.debug("Collected batch with %s samples", len(table))

    # If flush is needed
    if self.num_processed_samples >= self.flush_frequency:
        self.flush_tables()
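
Putting the pieces together, a hedged sketch of the save/flush/clean-up cycle; preprocess_dataloader is a placeholder iterable of PreprocessBatch objects:

output_dir = "outputs/parquet_dataset"

for batch in preprocess_dataloader:  # placeholder; yields PreprocessBatch instances
    # Buffers the batch's records; flush_tables() fires automatically once
    # num_processed_samples reaches flush_frequency.
    saver.save_and_write_parquet_batch(batch, output_dir)

# Final flush: write any remaining rows as a smaller last file and reset state.
saver.clean_up()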
fastvideo.workflow.preprocess.components.PreprocessingDataValidator
PreprocessingDataValidator(max_height: int = 1024, max_width: int = 1024, max_h_div_w_ratio: float = 17 / 16, min_h_div_w_ratio: float = 8 / 16, num_frames: int = 16, train_fps: int = 24, speed_factor: float = 1.0, video_length_tolerance_range: float = 5.0, drop_short_ratio: float = 0.0, hw_aspect_threshold: float = 1.5)
Source code in fastvideo/workflow/preprocess/components.py
def __init__(self,
             max_height: int = 1024,
             max_width: int = 1024,
             max_h_div_w_ratio: float = 17 / 16,
             min_h_div_w_ratio: float = 8 / 16,
             num_frames: int = 16,
             train_fps: int = 24,
             speed_factor: float = 1.0,
             video_length_tolerance_range: float = 5.0,
             drop_short_ratio: float = 0.0,
             hw_aspect_threshold: float = 1.5):
    self.max_height = max_height
    self.max_width = max_width
    self.max_h_div_w_ratio = max_h_div_w_ratio
    self.min_h_div_w_ratio = min_h_div_w_ratio
    self.num_frames = num_frames
    self.train_fps = train_fps
    self.speed_factor = speed_factor
    self.video_length_tolerance_range = video_length_tolerance_range
    self.drop_short_ratio = drop_short_ratio
    self.hw_aspect_threshold = hw_aspect_threshold
    self.validators: dict[str, Callable[[dict[str, Any]], bool]] = {}
    self.filter_counts: dict[str, int] = {}

    self.num_items_before_filtering = 0
    self.num_items_after_filtering = 0

    self.register_validators()
Functions
fastvideo.workflow.preprocess.components.PreprocessingDataValidator.__call__
__call__(batch: dict[str, Any]) -> bool

Check whether a preprocessing data batch passes all registered validators.

Source code in fastvideo/workflow/preprocess/components.py
def __call__(self, batch: dict[str, Any]) -> bool:
    """
    Check whether a preprocessing data batch passes all registered validators.
    """
    self.num_items_before_filtering += 1

    for name, validator in self.validators.items():
        if not validator(batch):
            self.filter_counts[name] += 1
            return False

    self.num_items_after_filtering += 1
    return True
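
A small usage sketch. The keys in the metadata dict are guesses suggested by the constructor's thresholds (resolution, fps, frame count), not a documented contract; the actual checks are registered in register_validators():

from fastvideo.workflow.preprocess.components import PreprocessingDataValidator

validator = PreprocessingDataValidator(max_height=1024, max_width=1024,
                                       num_frames=16, train_fps=24)

sample = {"height": 720, "width": 1280, "fps": 30, "num_frames": 121}  # hypothetical record
if validator(sample):
    pass  # keep the sample for preprocessing
else:
    print(validator.filter_counts)  # per-validator rejection counts

print(validator.num_items_before_filtering, validator.num_items_after_filtering)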
Functions

fastvideo.workflow.workflow_base

Classes

fastvideo.workflow.workflow_base.WorkflowBase
WorkflowBase(fastvideo_args: FastVideoArgs)

Bases: ABC

Abstract base class for defining video processing workflows.

A workflow serves as the top-level orchestrator that coordinates multiple pipelines and components to accomplish a specific video processing task. The workflow pattern provides several key benefits:

  1. Separation of Concerns: Workflows separate high-level orchestration logic from low-level processing implementations in pipelines.

  2. Modularity: Different workflows can be created for different execution modes (preprocess, inference, etc.) while sharing common pipeline components.

  3. Configuration Management: Workflows manage the configuration and initialization of multiple related pipelines and components in a centralized manner.

  4. Environment Setup: Workflows handle system-level setup and resource allocation before pipeline execution begins.

  5. Lifecycle Management: Workflows control the complete lifecycle from initialization through execution to cleanup.

The workflow acts as a factory and coordinator, creating the appropriate pipelines based on configuration, setting up the execution environment, and orchestrating the overall processing flow.

Initialize the workflow with configuration arguments.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

Configuration object containing all parameters needed for workflow and pipeline setup.

required
Source code in fastvideo/workflow/workflow_base.py
def __init__(self, fastvideo_args: FastVideoArgs):
    """
    Initialize the workflow with configuration arguments.

    Args:
        fastvideo_args: Configuration object containing all parameters
                      needed for workflow and pipeline setup.
    """
    self.fastvideo_args = fastvideo_args

    # TODO: pipeline_config should be: dict[str, PipelineConfig]
    # pipeline_type should be included in the PipelineConfig
    # pipeline_config[pipeline_name] = (pipeline_type, fastvideo_args)
    self._pipeline_configs: dict[str, tuple[PipelineType,
                                            FastVideoArgs]] = {}
    self._pipelines: dict[str, ComposedPipelineBase] = {}
    self._components: dict[str, Any] = {}
    self.register_pipelines()
    self.register_components()

    self.prepare_system_environment()
    self.load_pipelines()
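
A skeletal subclass sketch showing where the abstract hooks fit. The PipelineType member, the output_path attribute, and the forward() call are placeholders rather than names confirmed by these docs:

import os

from fastvideo.workflow.workflow_base import WorkflowBase
from fastvideo.workflow.preprocess.components import PreprocessingDataValidator
# PipelineType comes from fastvideo's pipeline definitions (exact import path not shown here).

class MyPreprocessWorkflow(WorkflowBase):
    def register_pipelines(self) -> None:
        # Hypothetical PipelineType member; real workflows pick an actual enum value.
        self.add_pipeline_config("preprocess_pipeline",
                                 (PipelineType.PREPROCESS, self.fastvideo_args))

    def register_components(self) -> None:
        self.add_component("validator", PreprocessingDataValidator())

    def prepare_system_environment(self) -> None:
        # output_path is an assumed attribute; any system-level setup belongs here.
        os.makedirs(self.fastvideo_args.output_path, exist_ok=True)

    def run(self):
        # load_pipelines() has already attached the pipeline as an attribute.
        self.preprocess_pipeline.forward(...)  # placeholder call

Note the order enforced by __init__ above: register_pipelines() and register_components() run first, then prepare_system_environment() and load_pipelines(), so by the time run() executes the registered pipelines are already built and reachable as attributes.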
Functions
fastvideo.workflow.workflow_base.WorkflowBase.add_component
add_component(component_name: str, component: Any) -> None

Register a component instance with the workflow.

Components are auxiliary objects that may be shared across pipelines or used for workflow-level functionality (e.g., databases, caches, external services).

Parameters:

Name Type Description Default
component_name str

Unique identifier for the component.

required
component Any

The component instance to register.

required
Source code in fastvideo/workflow/workflow_base.py
def add_component(self, component_name: str, component: Any) -> None:
    """
    Register a component instance with the workflow.

    Components are auxiliary objects that may be shared across pipelines
    or used for workflow-level functionality (e.g., databases, caches,
    external services).

    Args:
        component_name: Unique identifier for the component.
        component: The component instance to register.
    """
    self._components[component_name] = component
    setattr(self, component_name, component)
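
For instance, inside register_components() of a workflow subclass (the component name is arbitrary):

self.add_component("preprocess_validator", PreprocessingDataValidator(max_height=1024))

# Later, the same instance is reachable two ways:
validator = self.get_component("preprocess_validator")
assert validator is self.preprocess_validator  # attribute set by add_component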
fastvideo.workflow.workflow_base.WorkflowBase.add_pipeline_config
add_pipeline_config(pipeline_name: str, pipeline_config: tuple[PipelineType, FastVideoArgs]) -> None

Register a pipeline configuration for later instantiation.

Parameters:

Name Type Description Default
pipeline_name str

Unique identifier for the pipeline.

required
pipeline_config tuple[PipelineType, FastVideoArgs]

Tuple containing the pipeline type and configuration arguments.

required
Source code in fastvideo/workflow/workflow_base.py
def add_pipeline_config(
        self, pipeline_name: str,
        pipeline_config: tuple[PipelineType, FastVideoArgs]) -> None:
    """
    Register a pipeline configuration for later instantiation.

    Args:
        pipeline_name: Unique identifier for the pipeline.
        pipeline_config: Tuple containing the pipeline type and
                       configuration arguments.
    """
    self._pipeline_configs[pipeline_name] = pipeline_config
fastvideo.workflow.workflow_base.WorkflowBase.get_component
get_component(component_name: str) -> Any

Retrieve a registered component by name.

Parameters:

Name Type Description Default
component_name str

The name of the component to retrieve.

required

Returns:

Type Description
Any

The component instance.

Source code in fastvideo/workflow/workflow_base.py
def get_component(self, component_name: str) -> Any:
    """
    Retrieve a registered component by name.

    Args:
        component_name: The name of the component to retrieve.

    Returns:
        The component instance.
    """
    return self._components[component_name]
fastvideo.workflow.workflow_base.WorkflowBase.get_workflow_cls classmethod
get_workflow_cls(fastvideo_args: FastVideoArgs) -> Optional[WorkflowBase]

Factory method to get the appropriate workflow class based on execution mode.

This method acts as a workflow factory, returning the appropriate workflow class implementation based on the specified execution mode in the configuration arguments.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

Configuration object containing the execution mode and other parameters.

required

Returns:

Type Description
Optional[WorkflowBase]

The appropriate workflow class for the specified execution mode, or None if no workflow is available for the given mode.

Source code in fastvideo/workflow/workflow_base.py
@classmethod
def get_workflow_cls(
        cls, fastvideo_args: FastVideoArgs) -> Optional["WorkflowBase"]:
    """
    Factory method to get the appropriate workflow class based on execution mode.

    This method acts as a workflow factory, returning the appropriate
    workflow class implementation based on the specified execution mode
    in the configuration arguments.

    Args:
        fastvideo_args: Configuration object containing the execution mode
                      and other parameters.

    Returns:
        The appropriate workflow class for the specified execution mode,
        or None if no workflow is available for the given mode.
    """
    if fastvideo_args.mode == ExecutionMode.PREPROCESS:
        from fastvideo.workflow.preprocess.preprocess_workflow import (
            PreprocessWorkflow)
        return PreprocessWorkflow.get_workflow_cls(fastvideo_args)
    else:
        raise ValueError(
            f"Execution mode: {fastvideo_args.mode} is not supported in workflow."
        )
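
A hedged usage sketch of the factory; constructing FastVideoArgs is omitted since its fields are documented elsewhere:

# Assume `args` is a FastVideoArgs configured with mode=ExecutionMode.PREPROCESS.
workflow_cls = WorkflowBase.get_workflow_cls(args)
if workflow_cls is not None:
    workflow = workflow_cls(args)  # __init__ registers configs/components and loads pipelines
    workflow.run()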
fastvideo.workflow.workflow_base.WorkflowBase.load_pipelines
load_pipelines() -> None

Create and initialize all registered pipelines.

This method instantiates pipeline objects from their configurations and makes them available as both dictionary entries and instance attributes for convenient access.

Source code in fastvideo/workflow/workflow_base.py
def load_pipelines(self) -> None:
    """
    Create and initialize all registered pipelines.

    This method instantiates pipeline objects from their configurations
    and makes them available as both dictionary entries and instance
    attributes for convenient access.
    """
    for pipeline_name, pipeline_config in self._pipeline_configs.items():
        pipeline_type, fastvideo_args = pipeline_config
        pipeline = build_pipeline(fastvideo_args, pipeline_type)
        self._pipelines[pipeline_name] = pipeline
        setattr(self, pipeline_name, pipeline)
fastvideo.workflow.workflow_base.WorkflowBase.prepare_system_environment abstractmethod
prepare_system_environment() -> None

Prepare the system environment for workflow execution.

Subclasses must implement this method to handle any system-level setup required before pipeline execution (e.g., GPU initialization, temporary directories, resource allocation).

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def prepare_system_environment(self) -> None:
    """
    Prepare the system environment for workflow execution.

    Subclasses must implement this method to handle any system-level
    setup required before pipeline execution (e.g., GPU initialization,
    temporary directories, resource allocation).
    """
    pass
fastvideo.workflow.workflow_base.WorkflowBase.register_components abstractmethod
register_components() -> None

Register workflow-specific components.

Subclasses must implement this method to register any components needed for their specific workflow (e.g., databases, external APIs, shared resources).

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def register_components(self) -> None:
    """
    Register workflow-specific components.

    Subclasses must implement this method to register any components
    needed for their specific workflow (e.g., databases, external APIs,
    shared resources).
    """
    pass
fastvideo.workflow.workflow_base.WorkflowBase.register_pipelines abstractmethod
register_pipelines() -> None

Register workflow-specific pipelines.

Subclasses must implement this method to define which pipelines are needed for their specific workflow and how they should be configured.

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def register_pipelines(self) -> None:
    """
    Register workflow-specific pipelines.

    Subclasses must implement this method to define which pipelines
    are needed for their specific workflow and how they should be
    configured.
    """
    pass
fastvideo.workflow.workflow_base.WorkflowBase.run abstractmethod
run()

Execute the main workflow logic.

Subclasses must implement this method to define the specific execution flow for their workflow, coordinating the registered pipelines and components to accomplish the desired task.

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def run(self):
    """
    Execute the main workflow logic.

    Subclasses must implement this method to define the specific
    execution flow for their workflow, coordinating the registered
    pipelines and components to accomplish the desired task.
    """
    pass

Functions