Skip to content

workflow_base

Classes

fastvideo.workflow.workflow_base.WorkflowBase

WorkflowBase(fastvideo_args: FastVideoArgs)

Bases: ABC

Abstract base class for defining video processing workflows.

A workflow serves as the top-level orchestrator that coordinates multiple pipelines and components to accomplish a specific video processing task. The workflow pattern provides several key benefits:

  1. Separation of Concerns: Workflows separate high-level orchestration logic from low-level processing implementations in pipelines.

  2. Modularity: Different workflows can be created for different execution modes (preprocess, inference, etc.) while sharing common pipeline components.

  3. Configuration Management: Workflows manage the configuration and initialization of multiple related pipelines and components in a centralized manner.

  4. Environment Setup: Workflows handle system-level setup and resource allocation before pipeline execution begins.

  5. Lifecycle Management: Workflows control the complete lifecycle from initialization through execution to cleanup.

The workflow acts as a factory and coordinator, creating the appropriate pipelines based on configuration, setting up the execution environment, and orchestrating the overall processing flow.

Initialize the workflow with configuration arguments.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

Configuration object containing all parameters needed for workflow and pipeline setup.

required
Source code in fastvideo/workflow/workflow_base.py
def __init__(self, fastvideo_args: FastVideoArgs):
    """
    Initialize the workflow with configuration arguments.

    Args:
        fastvideo_args: Configuration object containing all parameters
                      needed for workflow and pipeline setup.
    """
    self.fastvideo_args = fastvideo_args

    # TODO: pipeline_config should be: dict[str, PipelineConfig]
    # pipeline_type should be included in the PipelineConfig
    # pipeline_config[pipeline_name] = (pipeline_type, fastvideo_args)
    self._pipeline_configs: dict[str, tuple[PipelineType,
                                            FastVideoArgs]] = {}
    self._pipelines: dict[str, ComposedPipelineBase] = {}
    self._components: dict[str, Any] = {}
    self.register_pipelines()
    self.register_components()

    self.prepare_system_environment()
    self.load_pipelines()

Functions

fastvideo.workflow.workflow_base.WorkflowBase.add_component
add_component(component_name: str, component: Any) -> None

Register a component instance with the workflow.

Components are auxiliary objects that may be shared across pipelines or used for workflow-level functionality (e.g., databases, caches, external services).

Parameters:

Name Type Description Default
component_name str

Unique identifier for the component.

required
component Any

The component instance to register.

required
Source code in fastvideo/workflow/workflow_base.py
def add_component(self, component_name: str, component: Any) -> None:
    """
    Register a component instance with the workflow.

    Components are auxiliary objects that may be shared across pipelines
    or used for workflow-level functionality (e.g., databases, caches,
    external services).

    Args:
        component_name: Unique identifier for the component.
        component: The component instance to register.
    """
    self._components[component_name] = component
    setattr(self, component_name, component)
fastvideo.workflow.workflow_base.WorkflowBase.add_pipeline_config
add_pipeline_config(pipeline_name: str, pipeline_config: tuple[PipelineType, FastVideoArgs]) -> None

Register a pipeline configuration for later instantiation.

Parameters:

Name Type Description Default
pipeline_name str

Unique identifier for the pipeline.

required
pipeline_config tuple[PipelineType, FastVideoArgs]

Tuple containing the pipeline type and configuration arguments.

required
Source code in fastvideo/workflow/workflow_base.py
def add_pipeline_config(
        self, pipeline_name: str,
        pipeline_config: tuple[PipelineType, FastVideoArgs]) -> None:
    """
    Register a pipeline configuration for later instantiation.

    Args:
        pipeline_name: Unique identifier for the pipeline.
        pipeline_config: Tuple containing the pipeline type and
                       configuration arguments.
    """
    self._pipeline_configs[pipeline_name] = pipeline_config
fastvideo.workflow.workflow_base.WorkflowBase.get_component
get_component(component_name: str) -> Any

Retrieve a registered component by name.

Parameters:

Name Type Description Default
component_name str

The name of the component to retrieve.

required

Returns:

Type Description
Any

The component instance.

Source code in fastvideo/workflow/workflow_base.py
def get_component(self, component_name: str) -> Any:
    """
    Retrieve a registered component by name.

    Args:
        component_name: The name of the component to retrieve.

    Returns:
        The component instance.
    """
    return self._components[component_name]
fastvideo.workflow.workflow_base.WorkflowBase.get_workflow_cls classmethod
get_workflow_cls(fastvideo_args: FastVideoArgs) -> Optional[WorkflowBase]

Factory method to get the appropriate workflow class based on execution mode.

This method acts as a workflow factory, returning the appropriate workflow class implementation based on the specified execution mode in the configuration arguments.

Parameters:

Name Type Description Default
fastvideo_args FastVideoArgs

Configuration object containing the execution mode and other parameters.

required

Returns:

Type Description
Optional[WorkflowBase]

The appropriate workflow class for the specified execution mode,

Optional[WorkflowBase]

or None if no workflow is available for the given mode.

Source code in fastvideo/workflow/workflow_base.py
@classmethod
def get_workflow_cls(
        cls, fastvideo_args: FastVideoArgs) -> Optional["WorkflowBase"]:
    """
    Factory method to get the appropriate workflow class based on execution mode.

    This method acts as a workflow factory, returning the appropriate
    workflow class implementation based on the specified execution mode
    in the configuration arguments.

    Args:
        fastvideo_args: Configuration object containing the execution mode
                      and other parameters.

    Returns:
        The appropriate workflow class for the specified execution mode,
        or None if no workflow is available for the given mode.
    """
    if fastvideo_args.mode == ExecutionMode.PREPROCESS:
        from fastvideo.workflow.preprocess.preprocess_workflow import (
            PreprocessWorkflow)
        return PreprocessWorkflow.get_workflow_cls(fastvideo_args)
    else:
        raise ValueError(
            f"Execution mode: {fastvideo_args.mode} is not supported in workflow."
        )
fastvideo.workflow.workflow_base.WorkflowBase.load_pipelines
load_pipelines() -> None

Create and initialize all registered pipelines.

This method instantiates pipeline objects from their configurations and makes them available as both dictionary entries and instance attributes for convenient access.

Source code in fastvideo/workflow/workflow_base.py
def load_pipelines(self) -> None:
    """
    Create and initialize all registered pipelines.

    This method instantiates pipeline objects from their configurations
    and makes them available as both dictionary entries and instance
    attributes for convenient access.
    """
    for pipeline_name, pipeline_config in self._pipeline_configs.items():
        pipeline_type, fastvideo_args = pipeline_config
        pipeline = build_pipeline(fastvideo_args, pipeline_type)
        self._pipelines[pipeline_name] = pipeline
        setattr(self, pipeline_name, pipeline)
fastvideo.workflow.workflow_base.WorkflowBase.prepare_system_environment abstractmethod
prepare_system_environment() -> None

Prepare the system environment for workflow execution.

Subclasses must implement this method to handle any system-level setup required before pipeline execution (e.g., GPU initialization, temporary directories, resource allocation).

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def prepare_system_environment(self) -> None:
    """
    Prepare the system environment for workflow execution.

    Subclasses must implement this method to handle any system-level
    setup required before pipeline execution (e.g., GPU initialization,
    temporary directories, resource allocation).
    """
    pass
fastvideo.workflow.workflow_base.WorkflowBase.register_components abstractmethod
register_components() -> None

Register workflow-specific components.

Subclasses must implement this method to register any components needed for their specific workflow (e.g., databases, external APIs, shared resources).

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def register_components(self) -> None:
    """
    Register workflow-specific components.

    Subclasses must implement this method to register any components
    needed for their specific workflow (e.g., databases, external APIs,
    shared resources).
    """
    pass
fastvideo.workflow.workflow_base.WorkflowBase.register_pipelines abstractmethod
register_pipelines() -> None

Register workflow-specific pipelines.

Subclasses must implement this method to define which pipelines are needed for their specific workflow and how they should be configured.

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def register_pipelines(self) -> None:
    """
    Register workflow-specific pipelines.

    Subclasses must implement this method to define which pipelines
    are needed for their specific workflow and how they should be
    configured.
    """
    pass
fastvideo.workflow.workflow_base.WorkflowBase.run abstractmethod
run()

Execute the main workflow logic.

Subclasses must implement this method to define the specific execution flow for their workflow, coordinating the registered pipelines and components to accomplish the desired task.

Source code in fastvideo/workflow/workflow_base.py
@abstractmethod
def run(self):
    """
    Execute the main workflow logic.

    Subclasses must implement this method to define the specific
    execution flow for their workflow, coordinating the registered
    pipelines and components to accomplish the desired task.
    """
    pass

Functions