Skip to content

preprocess_pipeline_i2v

I2V Data Preprocessing pipeline implementation.

This module contains an implementation of the I2V Data Preprocessing pipeline using the modular pipeline architecture.

Classes

fastvideo.pipelines.preprocess.preprocess_pipeline_i2v.PreprocessPipeline_I2V

PreprocessPipeline_I2V(model_path: str, fastvideo_args: FastVideoArgs | TrainingArgs, required_config_modules: list[str] | None = None, loaded_modules: dict[str, Module] | None = None)

Bases: BasePreprocessPipeline

I2V preprocessing pipeline implementation.

Source code in fastvideo/pipelines/composed_pipeline_base.py
def __init__(self,
             model_path: str,
             fastvideo_args: FastVideoArgs | TrainingArgs,
             required_config_modules: list[str] | None = None,
             loaded_modules: dict[str, torch.nn.Module] | None = None):
    """
    Initialize the pipeline. After __init__, the pipeline should be ready to
    use. The pipeline should be stateless and not hold any batch state.

    Args:
        model_path: Stored on the instance as ``self.model_path``.
        fastvideo_args: Stored as ``self.fastvideo_args``. Its ``tp_size``
            and ``sp_size`` are handed to the distributed-environment
            setup, and the whole object is forwarded to ``load_modules``.
        required_config_modules: When not None, replaces
            ``self._required_config_modules`` for this instance.
        loaded_modules: Forwarded unchanged to ``load_modules``.

    Raises:
        NotImplementedError: If ``_required_config_modules`` is still None
            after the optional override has been applied.
    """
    self.fastvideo_args = fastvideo_args

    self.model_path: str = model_path
    # Stage bookkeeping is initialised empty here.
    self._stages: list[PipelineStage] = []
    self._stage_name_mapping: dict[str, PipelineStage] = {}

    # An explicitly passed list takes precedence over any existing value.
    if required_config_modules is not None:
        self._required_config_modules = required_config_modules

    # NOTE(review): when no override is passed, this read presumes the
    # attribute already exists (presumably declared at class level) --
    # confirm; if it does not, this line raises AttributeError rather than
    # the NotImplementedError below.
    if self._required_config_modules is None:
        raise NotImplementedError(
            "Subclass must set _required_config_modules")

    # Called before the world group is queried and before modules load.
    maybe_init_distributed_environment_and_model_parallel(
        fastvideo_args.tp_size, fastvideo_args.sp_size)

    # Torch profiler. Enabled and configured through env vars:
    # FASTVIDEO_TORCH_PROFILER_DIR=/path/to/save/trace
    trace_dir = envs.FASTVIDEO_TORCH_PROFILER_DIR
    self.profiler_controller = get_or_create_profiler(trace_dir)
    self.profiler = self.profiler_controller.profiler

    # Keep the local rank reported by the world group on the instance.
    self.local_rank = get_world_group().local_rank

    # Load modules directly in initialization
    logger.info("Loading pipeline modules...")
    # Loading runs inside a named profiler region.
    with self.profiler_controller.region("profiler_region_model_loading"):
        self.modules = self.load_modules(fastvideo_args, loaded_modules)

Functions

fastvideo.pipelines.preprocess.preprocess_pipeline_i2v.PreprocessPipeline_I2V.create_record
create_record(video_name: str, vae_latent: ndarray, text_embedding: ndarray, valid_data: dict[str, Any], idx: int, extra_features: dict[str, Any] | None = None) -> dict[str, Any]

Create a record for the Parquet dataset, extending the base record with CLIP feature, first-frame latent, and PIL image fields.

Source code in fastvideo/pipelines/preprocess/preprocess_pipeline_i2v.py
def create_record(
        self,
        video_name: str,
        vae_latent: np.ndarray,
        text_embedding: np.ndarray,
        valid_data: dict[str, Any],
        idx: int,
        extra_features: dict[str, Any] | None = None) -> dict[str, Any]:
    """Build one Parquet row, extending the base record with I2V extras.

    The parent class produces the base record. For each of
    ``clip_feature``, ``first_frame_latent`` and ``pil_image`` this method
    then writes ``<name>_bytes``, ``<name>_shape`` and ``<name>_dtype``.
    A feature missing from ``extra_features`` (or an empty/None
    ``extra_features``) still gets its three columns, filled with the
    placeholders ``b""``, ``[]`` and ``""``.

    Args:
        video_name: Forwarded to the parent ``create_record``.
        vae_latent: Forwarded to the parent ``create_record``.
        text_embedding: Forwarded to the parent ``create_record``.
        valid_data: Forwarded to the parent ``create_record``.
        idx: Forwarded to the parent ``create_record``.
        extra_features: Optional mapping of feature name to a value that
            exposes ``tobytes()``, ``shape`` and ``dtype``.

    Returns:
        The parent's record dict with the nine extra entries set.
    """
    row = super().create_record(video_name=video_name,
                                vae_latent=vae_latent,
                                text_embedding=text_embedding,
                                valid_data=valid_data,
                                idx=idx,
                                extra_features=extra_features)

    # Treat None/empty as "no features" so one membership test suffices.
    available = extra_features or {}

    # The tuple order fixes the order in which columns are written.
    for name in ("clip_feature", "first_frame_latent", "pil_image"):
        if name in available:
            feature = available[name]
            payload, shape, dtype = (feature.tobytes(), list(feature.shape),
                                     str(feature.dtype))
        else:
            payload, shape, dtype = b"", [], ""

        row[f"{name}_bytes"] = payload
        row[f"{name}_shape"] = shape
        row[f"{name}_dtype"] = dtype

    return row
fastvideo.pipelines.preprocess.preprocess_pipeline_i2v.PreprocessPipeline_I2V.get_pyarrow_schema
get_pyarrow_schema()

Return the PyArrow schema for I2V pipeline.

Source code in fastvideo/pipelines/preprocess/preprocess_pipeline_i2v.py
def get_pyarrow_schema(self):
    """Provide the PyArrow schema used by the I2V pipeline.

    Returns:
        The ``pyarrow_schema_i2v`` object, returned as-is.
    """
    schema = pyarrow_schema_i2v
    return schema

Functions