components

Classes

fastvideo.workflow.preprocess.components.ParquetDatasetSaver

ParquetDatasetSaver(flush_frequency: int, samples_per_file: int, schema: Schema, record_creator: Callable[..., list[dict[str, Any]]])

Component for saving and writing Parquet datasets using shared parquet_io.

Source code in fastvideo/workflow/preprocess/components.py
def __init__(self, flush_frequency: int, samples_per_file: int,
             schema: pa.Schema,
             record_creator: Callable[..., list[dict[str, Any]]]):
    self.flush_frequency = flush_frequency
    self.samples_per_file = samples_per_file
    self.schema = schema
    self.create_records_from_batch = record_creator
    self.num_processed_samples: int = 0
    self._writer: ParquetDatasetWriter | None = None

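The constructor wires together a flush policy, a target file size, a pyarrow schema, and a callable that turns a batch into row dicts. A minimal construction sketch, assuming a toy schema and record creator (both hypothetical, not part of the library):

import pyarrow as pa

from fastvideo.workflow.preprocess.components import ParquetDatasetSaver

# Hypothetical schema: one id column and one flat float list per row.
schema = pa.schema([
    ("id", pa.string()),
    ("latents", pa.list_(pa.float32())),
])

def make_records(batch) -> list[dict]:
    # Hypothetical record creator: map one batch to a list of row dicts
    # whose keys match the schema above.
    return [{"id": "sample-0", "latents": [0.0, 1.0]}]

saver = ParquetDatasetSaver(
    flush_frequency=256,    # flush once this many samples are buffered
    samples_per_file=64,    # target number of rows per parquet file
    schema=schema,
    record_creator=make_records,
)
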
Functions

fastvideo.workflow.preprocess.components.ParquetDatasetSaver.clean_up
clean_up() -> None

Flush any remaining rows to disk and reset the saver's buffered state.

Source code in fastvideo/workflow/preprocess/components.py
def clean_up(self) -> None:
    """Clean up all tables"""
    self.flush_tables(write_remainder=True)
    self._writer = None
    self.num_processed_samples = 0
    gc.collect()

fastvideo.workflow.preprocess.components.ParquetDatasetSaver.flush_tables
flush_tables(write_remainder: bool = False)

Flush buffered records to disk.

Parameters:

    write_remainder (bool, default False): If True, also write any leftover
        rows smaller than samples_per_file as a final small file. Useful for
        the last flush.

Note: the output directory is not a parameter here; it is configured on the
underlying ParquetDatasetWriter when it is created.

Source code in fastvideo/workflow/preprocess/components.py
def flush_tables(self, write_remainder: bool = False):
    """Flush buffered records to disk.

    Args:
        output_dir: Directory where parquet files are written. Kept for API
            symmetry (writer already configured with this path).
        write_remainder: If True, also write any leftover rows smaller than
            ``samples_per_file`` as a final small file. Useful for the last flush.
    """
    if self._writer is None:
        return
    _ = self._writer.flush(write_remainder=write_remainder)
    # Reset processed sample count modulo samples_per_file
    remainder = self.num_processed_samples % self.samples_per_file
    self.num_processed_samples = 0 if write_remainder else remainder
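
A sketch of the intended call pattern, reusing the hypothetical saver from above: a periodic flush writes only complete files of samples_per_file rows and keeps the remainder buffered, while the final flush also writes the leftovers as one smaller file:

# Periodic flush: complete files only; the counter keeps the remainder
# (num_processed_samples % samples_per_file) so buffered rows are not lost.
saver.flush_tables()

# Final flush: also writes leftover rows as a last, smaller file and
# resets the counter to zero.
saver.flush_tables(write_remainder=True)
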
fastvideo.workflow.preprocess.components.ParquetDatasetSaver.save_and_write_parquet_batch
save_and_write_parquet_batch(batch: PreprocessBatch, output_dir: str, extra_features: dict[str, Any] | None = None) -> None

Save and write one Parquet dataset batch.

Parameters:

    batch (PreprocessBatch): PreprocessBatch containing video and metadata
        information. Required.
    output_dir (str): Output directory where parquet files are written.
        Required.
    extra_features (dict[str, Any] | None, default None): Extra features to
        attach to each record.

Returns:

    None. The running count of processed samples is tracked in
    num_processed_samples.

Source code in fastvideo/workflow/preprocess/components.py
def save_and_write_parquet_batch(
        self,
        batch: PreprocessBatch,
        output_dir: str,
        extra_features: dict[str, Any] | None = None) -> None:
    """
    Save and write Parquet dataset batch

    Args:
        batch: PreprocessBatch containing video and metadata information
        output_dir: Output directory
        extra_features: Extra features

    Returns:
        Number of processed samples
    """
    assert isinstance(batch.latents, torch.Tensor)
    assert isinstance(batch.prompt_embeds, list)
    assert isinstance(batch.prompt_attention_mask, list)

    # Process non-padded embeddings (if needed)
    if batch.prompt_attention_mask is not None:
        batch.prompt_embeds = self._process_non_padded_embeddings(
            batch.prompt_embeds[0], batch.prompt_attention_mask[0])
    else:
        raise ValueError("prompt_attention_mask is None")

    # Prepare batch data for Parquet dataset
    batch_data: list[dict[str, Any]] = []

    for key in dataclasses.fields(batch):
        value = getattr(batch, key.name)
        if isinstance(value, list):
            for idx in range(len(value)):
                if isinstance(value[idx], torch.Tensor):
                    value[idx] = value[idx].cpu().numpy()
        elif isinstance(value, torch.Tensor):
            value = value.cpu().numpy()
            setattr(batch, key.name, value)

    # Create record for Parquet dataset
    records = self.create_records_from_batch(batch)
    batch_data.extend(records)

    if batch_data:
        self.num_processed_samples += len(batch_data)
        table = records_to_table(batch_data, self.schema)
        if self._writer is None:
            os.makedirs(output_dir, exist_ok=True)
            self._writer = ParquetDatasetWriter(
                out_dir=output_dir, samples_per_file=self.samples_per_file)
        self._writer.append_table(table)
        logger.debug("Collected batch with %s samples", len(table))

    # If flush is needed
    if self.num_processed_samples >= self.flush_frequency:
        self.flush_tables()

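Putting the pieces together, a hedged end-to-end sketch; the batches iterable and the output path are placeholders, not library API:

output_dir = "/tmp/parquet_dataset"  # placeholder path

for batch in batches:  # `batches` yields PreprocessBatch objects (placeholder)
    # Buffers records and flushes automatically once `flush_frequency`
    # samples have accumulated.
    saver.save_and_write_parquet_batch(batch, output_dir)

# Write any remaining rows as a final small file and release buffers.
saver.clean_up()
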
fastvideo.workflow.preprocess.components.PreprocessingDataValidator

PreprocessingDataValidator(max_height: int = 1024, max_width: int = 1024, max_h_div_w_ratio: float = 17 / 16, min_h_div_w_ratio: float = 8 / 16, num_frames: int = 16, train_fps: int = 24, speed_factor: float = 1.0, video_length_tolerance_range: float = 5.0, drop_short_ratio: float = 0.0, hw_aspect_threshold: float = 1.5)

Source code in fastvideo/workflow/preprocess/components.py
def __init__(self,
             max_height: int = 1024,
             max_width: int = 1024,
             max_h_div_w_ratio: float = 17 / 16,
             min_h_div_w_ratio: float = 8 / 16,
             num_frames: int = 16,
             train_fps: int = 24,
             speed_factor: float = 1.0,
             video_length_tolerance_range: float = 5.0,
             drop_short_ratio: float = 0.0,
             hw_aspect_threshold: float = 1.5):
    self.max_height = max_height
    self.max_width = max_width
    self.max_h_div_w_ratio = max_h_div_w_ratio
    self.min_h_div_w_ratio = min_h_div_w_ratio
    self.num_frames = num_frames
    self.train_fps = train_fps
    self.speed_factor = speed_factor
    self.video_length_tolerance_range = video_length_tolerance_range
    self.drop_short_ratio = drop_short_ratio
    self.hw_aspect_threshold = hw_aspect_threshold
    self.validators: dict[str, Callable[[dict[str, Any]], bool]] = {}
    self.filter_counts: dict[str, int] = {}

    self.num_items_before_filtering = 0
    self.num_items_after_filtering = 0

    self.register_validators()

Functions

fastvideo.workflow.preprocess.components.PreprocessingDataValidator.__call__
__call__(batch: dict[str, Any]) -> bool

Return True if the preprocessing data batch passes all registered validators.

Source code in fastvideo/workflow/preprocess/components.py
def __call__(self, batch: dict[str, Any]) -> bool:
    """
    Return True if the batch passes all registered validators.
    """
    self.num_items_before_filtering += 1

    for name, validator in self.validators.items():
        if not validator(batch):
            self.filter_counts[name] += 1
            return False

    self.num_items_after_filtering += 1
    return True

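A hedged usage sketch: the validator is called once per metadata dict, returns False on the first failing check, and keeps per-check drop counts. The keys in the sample dict below are assumptions for illustration:

validator = PreprocessingDataValidator(max_height=720, max_width=1280)

# Hypothetical metadata for one clip; real field names depend on the
# registered validators.
sample = {"height": 720, "width": 1280, "fps": 24, "num_frames": 96}

if validator(sample):
    ...  # keep the sample for preprocessing

# Per-check drop counts plus overall before/after totals.
print(validator.filter_counts)
print(validator.num_items_before_filtering,
      validator.num_items_after_filtering)
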
Functions