attention

Classes

fastvideo.attention.AttentionBackend

Bases: ABC

Abstract class for attention backends.

fastvideo.attention.AttentionMetadata dataclass

AttentionMetadata(current_timestep: int)

Attention metadata for prefill and decode batched together.

Functions

fastvideo.attention.AttentionMetadata.asdict_zerocopy
asdict_zerocopy(skip_fields: set[str] | None = None) -> dict[str, Any]

Similar to dataclasses.asdict, but avoids deepcopying.

Source code in fastvideo/attention/backends/abstract.py
def asdict_zerocopy(self,
                    skip_fields: set[str] | None = None) -> dict[str, Any]:
    """Similar to dataclasses.asdict, but avoids deepcopying."""
    if skip_fields is None:
        skip_fields = set()
    # Note that if we add dataclasses as fields, they will need
    # similar handling.
    return {
        field.name: getattr(self, field.name)
        for field in fields(self) if field.name not in skip_fields
    }
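
For illustration, a minimal usage sketch of asdict_zerocopy (assuming only the documented current_timestep field); skip_fields simply filters fields out of the returned dict without copying the remaining values.

from fastvideo.attention import AttentionMetadata

meta = AttentionMetadata(current_timestep=3)
print(meta.asdict_zerocopy())                                   # {'current_timestep': 3}
print(meta.asdict_zerocopy(skip_fields={'current_timestep'}))   # {}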

fastvideo.attention.AttentionMetadataBuilder

AttentionMetadataBuilder()

Bases: ABC, Generic[T]

Abstract class for attention metadata builders.

Create the builder, remember some configuration and parameters.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def __init__(self) -> None:
    """Create the builder, remember some configuration and parameters."""
    raise NotImplementedError

Functions

fastvideo.attention.AttentionMetadataBuilder.build abstractmethod
build(**kwargs: dict[str, Any]) -> AttentionMetadata

Build attention metadata with on-device tensors.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def build(
    self,
    **kwargs: dict[str, Any],
) -> AttentionMetadata:
    """Build attention metadata with on-device tensors."""
    raise NotImplementedError
fastvideo.attention.AttentionMetadataBuilder.prepare abstractmethod
prepare() -> None

Prepare for one batch.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def prepare(self) -> None:
    """Prepare for one batch."""
    raise NotImplementedError
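
As a rough illustration (not an actual FastVideo backend), a minimal builder might look like the sketch below; the class name and the decision to track only current_timestep are assumptions made for the example.

from typing import Any

from fastvideo.attention import AttentionMetadata, AttentionMetadataBuilder

class MinimalMetadataBuilder(AttentionMetadataBuilder):
    """Hypothetical builder that only tracks the current timestep."""

    def __init__(self) -> None:
        self._timestep = 0

    def prepare(self) -> None:
        # Reset per-batch state before build() is called.
        self._timestep = 0

    def build(self, **kwargs: Any) -> AttentionMetadata:
        # A real builder would also assemble backend-specific on-device tensors here.
        self._timestep = int(kwargs.get("current_timestep", 0))
        return AttentionMetadata(current_timestep=self._timestep)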

fastvideo.attention.DistributedAttention

DistributedAttention(num_heads: int, head_size: int, num_kv_heads: int | None = None, softmax_scale: float | None = None, causal: bool = False, supported_attention_backends: tuple[AttentionBackendEnum, ...] | None = None, prefix: str = '', **extra_impl_args)

Bases: Module

Distributed attention layer.

Source code in fastvideo/attention/layer.py
def __init__(self,
             num_heads: int,
             head_size: int,
             num_kv_heads: int | None = None,
             softmax_scale: float | None = None,
             causal: bool = False,
             supported_attention_backends: tuple[AttentionBackendEnum, ...]
             | None = None,
             prefix: str = "",
             **extra_impl_args) -> None:
    super().__init__()
    if softmax_scale is None:
        self.softmax_scale = head_size**-0.5
    else:
        self.softmax_scale = softmax_scale

    if num_kv_heads is None:
        num_kv_heads = num_heads

    dtype = get_compute_dtype()
    attn_backend = get_attn_backend(
        head_size,
        dtype,
        supported_attention_backends=supported_attention_backends)
    impl_cls = attn_backend.get_impl_cls()
    self.attn_impl = impl_cls(num_heads=num_heads,
                              head_size=head_size,
                              causal=causal,
                              softmax_scale=self.softmax_scale,
                              num_kv_heads=num_kv_heads,
                              prefix=f"{prefix}.impl",
                              **extra_impl_args)
    self.num_heads = num_heads
    self.head_size = head_size
    self.num_kv_heads = num_kv_heads
    self.backend = backend_name_to_enum(attn_backend.get_name())
    self.dtype = dtype

Functions

fastvideo.attention.DistributedAttention.forward
forward(q: Tensor, k: Tensor, v: Tensor, replicated_q: Tensor | None = None, replicated_k: Tensor | None = None, replicated_v: Tensor | None = None) -> tuple[Tensor, Tensor | None]

Forward pass for distributed attention.

Parameters:

Name Type Description Default
q Tensor

Query tensor [batch_size, seq_len, num_heads, head_dim]

required
k Tensor

Key tensor [batch_size, seq_len, num_heads, head_dim]

required
v Tensor

Value tensor [batch_size, seq_len, num_heads, head_dim]

required
replicated_q Optional[Tensor]

Replicated query tensor, typically for text tokens

None
replicated_k Optional[Tensor]

Replicated key tensor

None
replicated_v Optional[Tensor]

Replicated value tensor

None

Returns:

Type Description
tuple[Tensor, Tensor | None]

Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
- o (torch.Tensor): Output tensor after attention for the main sequence
- replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided

Source code in fastvideo/attention/layer.py
@torch.compiler.disable
def forward(
    self,
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    replicated_q: torch.Tensor | None = None,
    replicated_k: torch.Tensor | None = None,
    replicated_v: torch.Tensor | None = None,
) -> tuple[torch.Tensor, torch.Tensor | None]:
    """Forward pass for distributed attention.

    Args:
        q (torch.Tensor): Query tensor [batch_size, seq_len, num_heads, head_dim]
        k (torch.Tensor): Key tensor [batch_size, seq_len, num_heads, head_dim]
        v (torch.Tensor): Value tensor [batch_size, seq_len, num_heads, head_dim]
        replicated_q (Optional[torch.Tensor]): Replicated query tensor, typically for text tokens
        replicated_k (Optional[torch.Tensor]): Replicated key tensor
        replicated_v (Optional[torch.Tensor]): Replicated value tensor

    Returns:
        Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
            - o (torch.Tensor): Output tensor after attention for the main sequence
            - replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided
    """
    # Check input shapes
    assert q.dim() == 4 and k.dim() == 4 and v.dim(
    ) == 4, "Expected 4D tensors"
    batch_size, seq_len, num_heads, head_dim = q.shape
    local_rank = get_sp_parallel_rank()
    world_size = get_sp_world_size()

    forward_context: ForwardContext = get_forward_context()
    ctx_attn_metadata = forward_context.attn_metadata

    # Stack QKV
    qkv = torch.cat([q, k, v], dim=0)  # [3 * batch_size, seq_len, num_heads, head_dim]

    # Redistribute heads across sequence dimension
    qkv = sequence_model_parallel_all_to_all_4D(qkv,
                                                scatter_dim=2,
                                                gather_dim=1)
    # Apply backend-specific preprocess_qkv
    qkv = self.attn_impl.preprocess_qkv(qkv, ctx_attn_metadata)

    # Concatenate with replicated QKV if provided
    if replicated_q is not None:
        assert replicated_k is not None and replicated_v is not None
        replicated_qkv = torch.cat(
            [replicated_q, replicated_k, replicated_v],
            dim=0)  # [3, seq_len, num_heads, head_dim]
        heads_per_rank = num_heads // world_size
        replicated_qkv = replicated_qkv[:, :, local_rank *
                                        heads_per_rank:(local_rank + 1) *
                                        heads_per_rank]
        qkv = torch.cat([qkv, replicated_qkv], dim=1)

    q, k, v = qkv.chunk(3, dim=0)

    output = self.attn_impl.forward(q, k, v, ctx_attn_metadata)

    # Redistribute back if using sequence parallelism
    replicated_output = None
    if replicated_q is not None:
        replicated_output = output[:, seq_len * world_size:]
        output = output[:, :seq_len * world_size]
        # TODO: make this asynchronous
        replicated_output = sequence_model_parallel_all_gather(
            replicated_output.contiguous(), dim=2)
    # Apply backend-specific postprocess_output
    output = self.attn_impl.postprocess_output(output, ctx_attn_metadata)

    output = sequence_model_parallel_all_to_all_4D(output,
                                                   scatter_dim=1,
                                                   gather_dim=2)
    return output, replicated_output
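
To make the redistribution above concrete, here is a self-contained, single-process stand-in for the head/sequence all-to-all (a sketch of what scatter_dim=2 / gather_dim=1 achieves, not the library's collective): each rank trades its full set of heads on a sequence shard for a subset of heads on the full sequence.

import torch

def simulated_all_to_all_4d(shards: list[torch.Tensor], scatter_dim: int,
                            gather_dim: int) -> list[torch.Tensor]:
    # Split every rank's shard along scatter_dim and regroup the pieces
    # destined for each rank along gather_dim.
    world_size = len(shards)
    return [
        torch.cat([s.chunk(world_size, dim=scatter_dim)[dst] for s in shards],
                  dim=gather_dim) for dst in range(world_size)
    ]

# Two "ranks", each holding stacked qkv for its sequence shard: [3 * batch, seq_shard, heads, dim]
shards = [torch.randn(3, 8, 4, 16) for _ in range(2)]
outputs = simulated_all_to_all_4d(shards, scatter_dim=2, gather_dim=1)
print(outputs[0].shape)  # torch.Size([3, 16, 2, 16]): full sequence, half the heads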

fastvideo.attention.DistributedAttention_VSA

DistributedAttention_VSA(num_heads: int, head_size: int, num_kv_heads: int | None = None, softmax_scale: float | None = None, causal: bool = False, supported_attention_backends: tuple[AttentionBackendEnum, ...] | None = None, prefix: str = '', **extra_impl_args)

Bases: DistributedAttention

Distributed attention layer with VSA support.

Source code in fastvideo/attention/layer.py
def __init__(self,
             num_heads: int,
             head_size: int,
             num_kv_heads: int | None = None,
             softmax_scale: float | None = None,
             causal: bool = False,
             supported_attention_backends: tuple[AttentionBackendEnum, ...]
             | None = None,
             prefix: str = "",
             **extra_impl_args) -> None:
    super().__init__()
    if softmax_scale is None:
        self.softmax_scale = head_size**-0.5
    else:
        self.softmax_scale = softmax_scale

    if num_kv_heads is None:
        num_kv_heads = num_heads

    dtype = get_compute_dtype()
    attn_backend = get_attn_backend(
        head_size,
        dtype,
        supported_attention_backends=supported_attention_backends)
    impl_cls = attn_backend.get_impl_cls()
    self.attn_impl = impl_cls(num_heads=num_heads,
                              head_size=head_size,
                              causal=causal,
                              softmax_scale=self.softmax_scale,
                              num_kv_heads=num_kv_heads,
                              prefix=f"{prefix}.impl",
                              **extra_impl_args)
    self.num_heads = num_heads
    self.head_size = head_size
    self.num_kv_heads = num_kv_heads
    self.backend = backend_name_to_enum(attn_backend.get_name())
    self.dtype = dtype

Functions

fastvideo.attention.DistributedAttention_VSA.forward
forward(q: Tensor, k: Tensor, v: Tensor, replicated_q: Tensor | None = None, replicated_k: Tensor | None = None, replicated_v: Tensor | None = None, gate_compress: Tensor | None = None) -> tuple[Tensor, Tensor | None]

Forward pass for distributed attention.

Parameters:

Name Type Description Default
q Tensor

Query tensor [batch_size, seq_len, num_heads, head_dim]

required
k Tensor

Key tensor [batch_size, seq_len, num_heads, head_dim]

required
v Tensor

Value tensor [batch_size, seq_len, num_heads, head_dim]

required
gate_compress Optional[Tensor]

Gate compress tensor [batch_size, seq_len, num_heads, head_dim]

None
replicated_q Optional[Tensor]

Replicated query tensor, typically for text tokens

None
replicated_k Optional[Tensor]

Replicated key tensor

None
replicated_v Optional[Tensor]

Replicated value tensor

None

Returns:

Type Description
tuple[Tensor, Tensor | None]

Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
- o (torch.Tensor): Output tensor after attention for the main sequence
- replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided

Source code in fastvideo/attention/layer.py
@torch.compiler.disable
def forward(
    self,
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    replicated_q: torch.Tensor | None = None,
    replicated_k: torch.Tensor | None = None,
    replicated_v: torch.Tensor | None = None,
    gate_compress: torch.Tensor | None = None,
) -> tuple[torch.Tensor, torch.Tensor | None]:
    """Forward pass for distributed attention.

    Args:
        q (torch.Tensor): Query tensor [batch_size, seq_len, num_heads, head_dim]
        k (torch.Tensor): Key tensor [batch_size, seq_len, num_heads, head_dim]
        v (torch.Tensor): Value tensor [batch_size, seq_len, num_heads, head_dim]
        gate_compress (torch.Tensor): Gate compress tensor [batch_size, seq_len, num_heads, head_dim]
        replicated_q (Optional[torch.Tensor]): Replicated query tensor, typically for text tokens
        replicated_k (Optional[torch.Tensor]): Replicated key tensor
        replicated_v (Optional[torch.Tensor]): Replicated value tensor

    Returns:
        Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
            - o (torch.Tensor): Output tensor after attention for the main sequence
            - replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided
    """
    # Replicated (text-token) QKV is not yet supported for VSA
    assert replicated_q is None and replicated_k is None and replicated_v is None, "Replicated QKV is not yet supported for VSA"
    # Check input shapes
    assert q.dim() == 4 and k.dim() == 4 and v.dim(
    ) == 4, "Expected 4D tensors"

    forward_context: ForwardContext = get_forward_context()
    ctx_attn_metadata = forward_context.attn_metadata

    # Stack QKV
    qkvg = torch.cat([q, k, v, gate_compress],
                     dim=0)  # [4 * batch_size, seq_len, num_heads, head_dim]

    # Redistribute heads across sequence dimension
    qkvg = sequence_model_parallel_all_to_all_4D(qkvg,
                                                 scatter_dim=2,
                                                 gather_dim=1)

    qkvg = self.attn_impl.preprocess_qkv(qkvg, ctx_attn_metadata)

    q, k, v, gate_compress = qkvg.chunk(4, dim=0)
    output = self.attn_impl.forward(
        q, k, v, gate_compress, ctx_attn_metadata)  # type: ignore[call-arg]

    # Redistribute back if using sequence parallelism
    replicated_output = None

    # Apply backend-specific postprocess_output
    output = self.attn_impl.postprocess_output(output, ctx_attn_metadata)

    output = sequence_model_parallel_all_to_all_4D(output,
                                                   scatter_dim=1,
                                                   gather_dim=2)
    return output, replicated_output

fastvideo.attention.LocalAttention

LocalAttention(num_heads: int, head_size: int, num_kv_heads: int | None = None, softmax_scale: float | None = None, causal: bool = False, supported_attention_backends: tuple[AttentionBackendEnum, ...] | None = None, **extra_impl_args)

Bases: Module

Attention layer.

Source code in fastvideo/attention/layer.py
def __init__(self,
             num_heads: int,
             head_size: int,
             num_kv_heads: int | None = None,
             softmax_scale: float | None = None,
             causal: bool = False,
             supported_attention_backends: tuple[AttentionBackendEnum, ...]
             | None = None,
             **extra_impl_args) -> None:
    super().__init__()
    if softmax_scale is None:
        self.softmax_scale = head_size**-0.5
    else:
        self.softmax_scale = softmax_scale
    if num_kv_heads is None:
        num_kv_heads = num_heads

    dtype = get_compute_dtype()
    attn_backend = get_attn_backend(
        head_size,
        dtype,
        supported_attention_backends=supported_attention_backends)
    impl_cls = attn_backend.get_impl_cls()
    self.attn_impl = impl_cls(num_heads=num_heads,
                              head_size=head_size,
                              softmax_scale=self.softmax_scale,
                              num_kv_heads=num_kv_heads,
                              causal=causal,
                              **extra_impl_args)
    self.num_heads = num_heads
    self.head_size = head_size
    self.num_kv_heads = num_kv_heads
    self.backend = backend_name_to_enum(attn_backend.get_name())
    self.dtype = dtype

Functions

fastvideo.attention.LocalAttention.forward
forward(q: Tensor, k: Tensor, v: Tensor) -> Tensor

Apply local attention between query, key and value tensors.

Parameters:

Name Type Description Default
q Tensor

Query tensor of shape [batch_size, seq_len, num_heads, head_dim]

required
k Tensor

Key tensor of shape [batch_size, seq_len, num_heads, head_dim]

required
v Tensor

Value tensor of shape [batch_size, seq_len, num_heads, head_dim]

required

Returns:

Type Description
Tensor

torch.Tensor: Output tensor after local attention

Source code in fastvideo/attention/layer.py
def forward(
    self,
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
) -> torch.Tensor:
    """
    Apply local attention between query, key and value tensors.

    Args:
        q (torch.Tensor): Query tensor of shape [batch_size, seq_len, num_heads, head_dim]
        k (torch.Tensor): Key tensor of shape [batch_size, seq_len, num_heads, head_dim] 
        v (torch.Tensor): Value tensor of shape [batch_size, seq_len, num_heads, head_dim]

    Returns:
        torch.Tensor: Output tensor after local attention
    """
    # Check input shapes
    assert q.dim() == 4 and k.dim() == 4 and v.dim(
    ) == 4, "Expected 4D tensors"

    forward_context: ForwardContext = get_forward_context()
    ctx_attn_metadata = forward_context.attn_metadata

    output = self.attn_impl.forward(q, k, v, ctx_attn_metadata)
    return output
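
For reference, what a local backend computes on these [batch_size, seq_len, num_heads, head_dim] tensors is equivalent in spirit to the plain-PyTorch sketch below (assuming PyTorch 2.1+ for the scale argument of scaled_dot_product_attention; this is not the FastVideo kernel itself).

import torch
import torch.nn.functional as F

def reference_local_attention(q: torch.Tensor, k: torch.Tensor, v: torch.Tensor,
                              softmax_scale: float | None = None,
                              causal: bool = False) -> torch.Tensor:
    # Inputs are [batch_size, seq_len, num_heads, head_dim]; SDPA wants heads before sequence.
    q, k, v = (t.transpose(1, 2) for t in (q, k, v))
    out = F.scaled_dot_product_attention(q, k, v, is_causal=causal, scale=softmax_scale)
    return out.transpose(1, 2)  # back to [batch_size, seq_len, num_heads, head_dim]

q = k = v = torch.randn(2, 128, 8, 64)
print(reference_local_attention(q, k, v).shape)  # torch.Size([2, 128, 8, 64])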

Modules

fastvideo.attention.backends

Modules

fastvideo.attention.backends.STA_configuration
Functions
fastvideo.attention.backends.STA_configuration.average_head_losses
average_head_losses(results: list[dict[str, Any]], selected_masks: list[list[int]]) -> dict[str, dict[str, ndarray]]

Average losses across all prompts for each mask strategy.

Source code in fastvideo/attention/backends/STA_configuration.py
def average_head_losses(
        results: list[dict[str, Any]],
        selected_masks: list[list[int]]) -> dict[str, dict[str, np.ndarray]]:
    """Average losses across all prompts for each mask strategy."""
    # Initialize a dictionary to store the averaged results
    averaged_losses: dict[str, dict[str, np.ndarray]] = {}
    loss_type = 'L2_loss'
    # Get all loss types (e.g., 'L2_loss')
    averaged_losses[loss_type] = {}

    for mask in selected_masks:
        mask_str = str(mask)
        data_shape = np.array(results[0][loss_type][mask_str]).shape
        accumulated_data = np.zeros(data_shape)

        # Sum across all prompts
        for prompt_result in results:
            accumulated_data += np.array(prompt_result[loss_type][mask_str])

        # Average by dividing by number of prompts
        averaged_data = accumulated_data / len(results)
        averaged_losses[loss_type][mask_str] = averaged_data

    return averaged_losses
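
A toy illustration of the expected input layout (two prompts, one mask strategy, losses shaped [time_steps, layers, heads]); the numbers are made up purely to show the averaging.

import numpy as np

from fastvideo.attention.backends.STA_configuration import average_head_losses

results = [
    {'L2_loss': {'[3, 3, 3]': np.ones((2, 2, 2)).tolist()}},
    {'L2_loss': {'[3, 3, 3]': (3 * np.ones((2, 2, 2))).tolist()}},
]
averaged = average_head_losses(results, selected_masks=[[3, 3, 3]])
print(averaged['L2_loss']['[3, 3, 3]'][0, 0, 0])  # 2.0
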
fastvideo.attention.backends.STA_configuration.configure_sta
configure_sta(mode: str = 'STA_searching', layer_num: int = 40, time_step_num: int = 50, head_num: int = 40, **kwargs) -> list[list[list[Any]]]

Configure Sliding Tile Attention (STA) parameters based on the specified mode.

Parameters:

mode : str
    The STA mode to use. Options are:
    - 'STA_searching': Generate a set of mask candidates for initial search
    - 'STA_tuning': Select best mask strategy based on previously saved results
    - 'STA_inference': Load and use a previously tuned mask strategy

layer_num : int, number of layers
time_step_num : int, number of timesteps
head_num : int, number of heads

**kwargs : dict Mode-specific parameters:

For 'STA_searching':
- mask_candidates: list of str, optional, mask candidates to use
- mask_selected: list of int, optional, indices of selected masks

For 'STA_tuning':
- mask_search_files_path: str, required, path to mask search results
- mask_candidates: list of str, optional, mask candidates to use
- mask_selected: list of int, optional, indices of selected masks
- skip_time_steps: int, optional, number of time steps to use full attention (default 12)
- save_dir: str, optional, directory to save mask strategy (default "mask_candidates")

For 'STA_inference':
- load_path: str, optional, path to load mask strategy (default "mask_candidates/mask_strategy.json")
Source code in fastvideo/attention/backends/STA_configuration.py
def configure_sta(mode: str = 'STA_searching',
                  layer_num: int = 40,
                  time_step_num: int = 50,
                  head_num: int = 40,
                  **kwargs) -> list[list[list[Any]]]:
    """
    Configure Sliding Tile Attention (STA) parameters based on the specified mode.

    Parameters:
    ----------
    mode : str
        The STA mode to use. Options are:
        - 'STA_searching': Generate a set of mask candidates for initial search
        - 'STA_tuning': Select best mask strategy based on previously saved results
        - 'STA_inference': Load and use a previously tuned mask strategy
    layer_num: int, number of layers
    time_step_num: int, number of timesteps
    head_num: int, number of heads

    **kwargs : dict
        Mode-specific parameters:

        For 'STA_searching':
        - mask_candidates: list of str, optional, mask candidates to use
        - mask_selected: list of int, optional, indices of selected masks

        For 'STA_tuning':
        - mask_search_files_path: str, required, path to mask search results
        - mask_candidates: list of str, optional, mask candidates to use
        - mask_selected: list of int, optional, indices of selected masks
        - skip_time_steps: int, optional, number of time steps to use full attention (default 12)
        - save_dir: str, optional, directory to save mask strategy (default "mask_candidates")

        For 'STA_inference':
        - load_path: str, optional, path to load mask strategy (default "mask_candidates/mask_strategy.json")
    """
    valid_modes = [
        'STA_searching', 'STA_tuning', 'STA_inference', 'STA_tuning_cfg'
    ]
    if mode not in valid_modes:
        raise ValueError(f"Mode must be one of {valid_modes}, got {mode}")

    if mode == 'STA_searching':
        # Get parameters with defaults
        mask_candidates: list[str] | None = kwargs.get('mask_candidates')
        if mask_candidates is None:
            raise ValueError(
                "mask_candidates is required for STA_searching mode")
        mask_selected: list[int] = kwargs.get('mask_selected',
                                              list(range(len(mask_candidates))))

        # Parse selected masks
        selected_masks: list[list[int]] = []
        for index in mask_selected:
            mask = mask_candidates[index]
            masks_list = [int(x) for x in mask.split(',')]
            selected_masks.append(masks_list)

        # Create 3D mask structure sized [time_step_num][layer_num]
        masks_3d: list[list[list[list[int]]]] = []
        for i in range(time_step_num):
            row = []
            for j in range(layer_num):
                row.append(selected_masks)  # Add all masks at each position
            masks_3d.append(row)

        return masks_3d

    elif mode == 'STA_tuning':
        # Get required parameters
        mask_search_files_path: str | None = kwargs.get(
            'mask_search_files_path')
        if not mask_search_files_path:
            raise ValueError(
                "mask_search_files_path is required for STA_tuning mode")

        # Get optional parameters with defaults
        mask_candidates_tuning: list[str] | None = kwargs.get('mask_candidates')
        if mask_candidates_tuning is None:
            raise ValueError("mask_candidates is required for STA_tuning mode")
        mask_selected_tuning: list[int] = kwargs.get(
            'mask_selected', list(range(len(mask_candidates_tuning))))
        skip_time_steps_tuning: int | None = kwargs.get('skip_time_steps')
        save_dir_tuning: str | None = kwargs.get('save_dir', "mask_candidates")

        # Parse selected masks
        selected_masks_tuning: list[list[int]] = []
        for index in mask_selected_tuning:
            mask = mask_candidates_tuning[index]
            masks_list = [int(x) for x in mask.split(',')]
            selected_masks_tuning.append(masks_list)

        # Read JSON results
        results = read_specific_json_files(mask_search_files_path)
        averaged_results = average_head_losses(results, selected_masks_tuning)

        # Add full attention mask for specific cases
        full_attention_mask_tuning: list[int] | None = kwargs.get(
            'full_attention_mask')
        if full_attention_mask_tuning is not None:
            selected_masks_tuning.append(full_attention_mask_tuning)

        # Select best mask strategy
        timesteps_tuning: int = kwargs.get('timesteps', time_step_num)
        if skip_time_steps_tuning is None:
            skip_time_steps_tuning = 12
        mask_strategy, sparsity, strategy_counts = select_best_mask_strategy(
            averaged_results, selected_masks_tuning, skip_time_steps_tuning,
            timesteps_tuning, head_num)

        # Save mask strategy
        if save_dir_tuning is not None:
            os.makedirs(save_dir_tuning, exist_ok=True)
            file_path = os.path.join(
                save_dir_tuning,
                f'mask_strategy_s{skip_time_steps_tuning}.json')
            with open(file_path, 'w') as f:
                json.dump(mask_strategy, f, indent=4)
            print(f"Successfully saved mask_strategy to {file_path}")

        # Print sparsity and strategy counts for information
        print(f"Overall sparsity: {sparsity:.4f}")
        print("\nStrategy usage counts:")
        total_heads = time_step_num * layer_num * head_num  # Fixed dimensions
        for strategy, count in strategy_counts.items():
            print(
                f"Strategy {strategy}: {count} heads ({count/total_heads*100:.2f}%)"
            )

        # Convert dictionary to 3D list with fixed dimensions
        mask_strategy_3d = dict_to_3d_list(mask_strategy,
                                           t_max=time_step_num,
                                           l_max=layer_num,
                                           h_max=head_num)

        return mask_strategy_3d
    elif mode == 'STA_tuning_cfg':
        # Get required parameters for both positive and negative paths
        mask_search_files_path_pos: str | None = kwargs.get(
            'mask_search_files_path_pos')
        mask_search_files_path_neg: str | None = kwargs.get(
            'mask_search_files_path_neg')
        save_dir_cfg: str | None = kwargs.get('save_dir')

        if not mask_search_files_path_pos or not mask_search_files_path_neg or not save_dir_cfg:
            raise ValueError(
                "mask_search_files_path_pos, mask_search_files_path_neg, and save_dir are required for STA_tuning_cfg mode"
            )

        # Get optional parameters with defaults
        mask_candidates_cfg: list[str] | None = kwargs.get('mask_candidates')
        if mask_candidates_cfg is None:
            raise ValueError(
                "mask_candidates is required for STA_tuning_cfg mode")
        mask_selected_cfg: list[int] = kwargs.get(
            'mask_selected', list(range(len(mask_candidates_cfg))))
        skip_time_steps_cfg: int | None = kwargs.get('skip_time_steps')

        # Parse selected masks
        selected_masks_cfg: list[list[int]] = []
        for index in mask_selected_cfg:
            mask = mask_candidates_cfg[index]
            masks_list = [int(x) for x in mask.split(',')]
            selected_masks_cfg.append(masks_list)

        # Read JSON results for both positive and negative paths
        pos_results = read_specific_json_files(mask_search_files_path_pos)
        neg_results = read_specific_json_files(mask_search_files_path_neg)
        # Combine positive and negative results into one list
        combined_results = pos_results + neg_results

        # Average the combined results
        averaged_results = average_head_losses(combined_results,
                                               selected_masks_cfg)

        # Add full attention mask for specific cases
        full_attention_mask_cfg: list[int] | None = kwargs.get(
            'full_attention_mask')
        if full_attention_mask_cfg is not None:
            selected_masks_cfg.append(full_attention_mask_cfg)

        timesteps_cfg: int = kwargs.get('timesteps', time_step_num)
        if skip_time_steps_cfg is None:
            skip_time_steps_cfg = 12
        # Select best mask strategy using combined results
        mask_strategy, sparsity, strategy_counts = select_best_mask_strategy(
            averaged_results, selected_masks_cfg, skip_time_steps_cfg,
            timesteps_cfg, head_num)

        # Save mask strategy
        os.makedirs(save_dir_cfg, exist_ok=True)
        file_path = os.path.join(save_dir_cfg,
                                 f'mask_strategy_s{skip_time_steps_cfg}.json')
        with open(file_path, 'w') as f:
            json.dump(mask_strategy, f, indent=4)
        print(f"Successfully saved mask_strategy to {file_path}")

        # Print sparsity and strategy counts for information
        print(f"Overall sparsity: {sparsity:.4f}")
        print("\nStrategy usage counts:")
        total_heads = time_step_num * layer_num * head_num  # Fixed dimensions
        for strategy, count in strategy_counts.items():
            print(
                f"Strategy {strategy}: {count} heads ({count/total_heads*100:.2f}%)"
            )

        # Convert dictionary to 3D list with fixed dimensions
        mask_strategy_3d = dict_to_3d_list(mask_strategy,
                                           t_max=time_step_num,
                                           l_max=layer_num,
                                           h_max=head_num)

        return mask_strategy_3d

    else:  # STA_inference
        # Get parameters with defaults
        load_path: str | None = kwargs.get(
            'load_path', "mask_candidates/mask_strategy.json")
        if load_path is None:
            raise ValueError("load_path is required for STA_inference mode")

        # Load previously saved mask strategy
        with open(load_path) as f:
            mask_strategy = json.load(f)

        # Convert dictionary to 3D list with fixed dimensions
        mask_strategy_3d = dict_to_3d_list(mask_strategy,
                                           t_max=time_step_num,
                                           l_max=layer_num,
                                           h_max=head_num)

        return mask_strategy_3d
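
For instance, a search-time configuration might look like the sketch below; the mask candidate strings are illustrative values, not recommended settings.

from fastvideo.attention.backends.STA_configuration import configure_sta

masks_3d = configure_sta(
    mode='STA_searching',
    layer_num=40,
    time_step_num=50,
    head_num=40,
    mask_candidates=['3,3,3', '1,5,5', '5,1,1'],
)
# masks_3d[t][l] holds the full candidate list for every (timestep, layer) pair.
print(len(masks_3d), len(masks_3d[0]))  # 50 40
print(masks_3d[0][0])                   # [[3, 3, 3], [1, 5, 5], [5, 1, 1]]
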
fastvideo.attention.backends.STA_configuration.read_specific_json_files
read_specific_json_files(folder_path: str) -> list[dict[str, Any]]

Read and parse JSON files containing mask search results.

Source code in fastvideo/attention/backends/STA_configuration.py
def read_specific_json_files(folder_path: str) -> list[dict[str, Any]]:
    """Read and parse JSON files containing mask search results."""
    json_contents: list[dict[str, Any]] = []

    # List files only in the current directory (no walk)
    files = os.listdir(folder_path)
    # Filter files
    matching_files = [f for f in files if 'mask' in f and f.endswith('.json')]
    print(f"Found {len(matching_files)} matching files: {matching_files}")

    for file_name in matching_files:
        file_path = os.path.join(folder_path, file_name)
        with open(file_path) as file:
            data = json.load(file)
            json_contents.append(data)

    return json_contents
fastvideo.attention.backends.STA_configuration.select_best_mask_strategy
select_best_mask_strategy(averaged_results: dict[str, dict[str, ndarray]], selected_masks: list[list[int]], skip_time_steps: int = 12, timesteps: int = 50, head_num: int = 40) -> tuple[dict[str, list[int]], float, dict[str, int]]

Select the best mask strategy for each head based on loss minimization.

Source code in fastvideo/attention/backends/STA_configuration.py
def select_best_mask_strategy(
        averaged_results: dict[str, dict[str, np.ndarray]],
        selected_masks: list[list[int]],
        skip_time_steps: int = 12,
        timesteps: int = 50,
        head_num: int = 40
) -> tuple[dict[str, list[int]], float, dict[str, int]]:
    """Select the best mask strategy for each head based on loss minimization."""
    best_mask_strategy: dict[str, list[int]] = {}
    loss_type = 'L2_loss'
    # Get the shape of time steps and layers
    layers = len(averaged_results[loss_type][str(selected_masks[0])][0])

    # Counter for sparsity calculation
    total_tokens = 0  # total number of masked tokens
    total_length = 0  # total sequence length

    strategy_counts: dict[str, int] = {
        str(strategy): 0
        for strategy in selected_masks
    }
    full_attn_strategy = selected_masks[-1]  # Last strategy is full attention
    print(f"Strategy {full_attn_strategy}, skip first {skip_time_steps} steps ")

    for t in range(timesteps):
        for layer_idx in range(layers):
            for h in range(head_num):
                if t < skip_time_steps:  # First steps use full attention
                    strategy = full_attn_strategy
                else:
                    # Get losses for this head across all strategies
                    head_losses = []
                    for strategy in selected_masks[:
                                                   -1]:  # Exclude full attention
                        head_losses.append(averaged_results[loss_type][str(
                            strategy)][t][layer_idx][h])

                    # Find which strategy gives minimum loss
                    best_strategy_idx = np.argmin(head_losses)
                    strategy = selected_masks[best_strategy_idx]

                best_mask_strategy[f'{t}_{layer_idx}_{h}'] = strategy

                # Calculate sparsity
                nums = strategy  # strategy is already a list of numbers
                total_tokens += nums[0] * nums[1] * nums[
                    2]  # masked tokens for chosen strategy
                total_length += full_attn_strategy[0] * full_attn_strategy[
                    1] * full_attn_strategy[2]

                # Count strategy usage
                strategy_counts[str(strategy)] += 1

    overall_sparsity = 1 - total_tokens / total_length

    return best_mask_strategy, overall_sparsity, strategy_counts
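
Per head, the sparsity bookkeeping above reduces to the arithmetic below (window sizes here are hypothetical): the chosen strategy contributes the product of its three window sizes, measured against the full-attention window that ends selected_masks.

chosen = [3, 3, 3]             # hypothetical per-head choice
full_attention = [5, 6, 10]    # hypothetical full-attention window (last entry of selected_masks)
masked = chosen[0] * chosen[1] * chosen[2]                        # 27
full = full_attention[0] * full_attention[1] * full_attention[2]  # 300
print(f"{1 - masked / full:.2f}")  # 0.91
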
fastvideo.attention.backends.abstract
Classes
fastvideo.attention.backends.abstract.AttentionBackend

Bases: ABC

Abstract class for attention backends.

fastvideo.attention.backends.abstract.AttentionImpl
AttentionImpl(num_heads: int, head_size: int, softmax_scale: float, causal: bool = False, num_kv_heads: int | None = None, prefix: str = '', **extra_impl_args)

Bases: ABC, Generic[T]

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def __init__(
    self,
    num_heads: int,
    head_size: int,
    softmax_scale: float,
    causal: bool = False,
    num_kv_heads: int | None = None,
    prefix: str = "",
    **extra_impl_args,
) -> None:
    raise NotImplementedError
Functions
fastvideo.attention.backends.abstract.AttentionImpl.postprocess_output
postprocess_output(output: Tensor, attn_metadata: T) -> Tensor

Postprocess the output tensor after the attention operation.

Default implementation returns the tensor unchanged. Subclasses can override this to implement custom postprocessing like untiling, scaling, or other transformations.

Called BEFORE all_to_all for distributed attention

Parameters:

Name Type Description Default
output Tensor

The output tensor from the attention operation

required
attn_metadata T

Metadata for the attention operation

required

Returns:

Type Description
Tensor

Postprocessed output tensor

Source code in fastvideo/attention/backends/abstract.py
def postprocess_output(
    self,
    output: torch.Tensor,
    attn_metadata: T,
) -> torch.Tensor:
    """Postprocess the output tensor after the attention operation.

    Default implementation returns the tensor unchanged.
    Subclasses can override this to implement custom postprocessing
    like untiling, scaling, or other transformations.

    Called BEFORE all_to_all for distributed attention

    Args:
        output: The output tensor from the attention operation
        attn_metadata: Metadata for the attention operation

    Returns:
        Postprocessed output tensor
    """

    return output
fastvideo.attention.backends.abstract.AttentionImpl.preprocess_qkv
preprocess_qkv(qkv: Tensor, attn_metadata: T) -> Tensor

Preprocess QKV tensor before performing attention operation.

Default implementation returns the tensor unchanged. Subclasses can override this to implement custom preprocessing like reshaping, tiling, scaling, or other transformations.

Called AFTER all_to_all for distributed attention

Parameters:

Name Type Description Default
qkv Tensor

The query-key-value tensor

required
attn_metadata T

Metadata for the attention operation

required

Returns:

Type Description
Tensor

Processed QKV tensor

Source code in fastvideo/attention/backends/abstract.py
def preprocess_qkv(self, qkv: torch.Tensor,
                   attn_metadata: T) -> torch.Tensor:
    """Preprocess QKV tensor before performing attention operation.

    Default implementation returns the tensor unchanged.
    Subclasses can override this to implement custom preprocessing
    like reshaping, tiling, scaling, or other transformations.

    Called AFTER all_to_all for distributed attention

    Args:
        qkv: The query-key-value tensor
        attn_metadata: Metadata for the attention operation

    Returns:
        Processed QKV tensor
    """
    return qkv
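
As a rough sketch of where the two hooks sit in the distributed path (not an actual FastVideo backend; the scaling is purely illustrative and __init__/forward are omitted), a subclass would override them like so:

import torch

from fastvideo.attention.backends.abstract import AttentionImpl

class IllustrativeImpl(AttentionImpl):
    """Hypothetical backend showing only the hook order."""

    def preprocess_qkv(self, qkv: torch.Tensor, attn_metadata) -> torch.Tensor:
        # Runs AFTER the sequence-parallel all-to-all in DistributedAttention.
        return qkv * 2.0

    def postprocess_output(self, output: torch.Tensor, attn_metadata) -> torch.Tensor:
        # Runs BEFORE the output all-to-all.
        return output / 2.0
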
fastvideo.attention.backends.abstract.AttentionMetadata dataclass
AttentionMetadata(current_timestep: int)

Attention metadata for prefill and decode batched together.

Functions
fastvideo.attention.backends.abstract.AttentionMetadata.asdict_zerocopy
asdict_zerocopy(skip_fields: set[str] | None = None) -> dict[str, Any]

Similar to dataclasses.asdict, but avoids deepcopying.

Source code in fastvideo/attention/backends/abstract.py
def asdict_zerocopy(self,
                    skip_fields: set[str] | None = None) -> dict[str, Any]:
    """Similar to dataclasses.asdict, but avoids deepcopying."""
    if skip_fields is None:
        skip_fields = set()
    # Note that if we add dataclasses as fields, they will need
    # similar handling.
    return {
        field.name: getattr(self, field.name)
        for field in fields(self) if field.name not in skip_fields
    }
fastvideo.attention.backends.abstract.AttentionMetadataBuilder
AttentionMetadataBuilder()

Bases: ABC, Generic[T]

Abstract class for attention metadata builders.

Create the builder, remember some configuration and parameters.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def __init__(self) -> None:
    """Create the builder, remember some configuration and parameters."""
    raise NotImplementedError
Functions
fastvideo.attention.backends.abstract.AttentionMetadataBuilder.build abstractmethod
build(**kwargs: dict[str, Any]) -> AttentionMetadata

Build attention metadata with on-device tensors.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def build(
    self,
    **kwargs: dict[str, Any],
) -> AttentionMetadata:
    """Build attention metadata with on-device tensors."""
    raise NotImplementedError
fastvideo.attention.backends.abstract.AttentionMetadataBuilder.prepare abstractmethod
prepare() -> None

Prepare for one batch.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def prepare(self) -> None:
    """Prepare for one batch."""
    raise NotImplementedError
fastvideo.attention.backends.video_sparse_attn
Classes
Functions
fastvideo.attention.backends.video_sparse_attn.construct_variable_block_sizes cached
construct_variable_block_sizes(dit_seq_shape: tuple[int, int, int], num_tiles: tuple[int, int, int], device: device) -> LongTensor

Compute the number of valid (non‑padded) tokens inside every (ts_t × ts_h × ts_w) tile after padding ‑‑ flattened in the order (t‑tile, h‑tile, w‑tile) that rearrange uses.

Returns

torch.LongTensor # shape: [∏ full_window_size]

Source code in fastvideo/attention/backends/video_sparse_attn.py
@functools.lru_cache(maxsize=10)
def construct_variable_block_sizes(
    dit_seq_shape: tuple[int, int, int],
    num_tiles: tuple[int, int, int],
    device: torch.device,
) -> torch.LongTensor:
    """
    Compute the number of valid (non‑padded) tokens inside every
    (ts_t × ts_h × ts_w) tile after padding ‑‑ flattened in the order
    (t‑tile, h‑tile, w‑tile) that `rearrange` uses.

    Returns
    -------
    torch.LongTensor  # shape: [∏ full_window_size]
    """
    # unpack
    t, h, w = dit_seq_shape
    ts_t, ts_h, ts_w = VSA_TILE_SIZE
    n_t, n_h, n_w = num_tiles

    def _sizes(dim_len: int, tile: int, n_tiles: int) -> torch.LongTensor:
        """Vector with the size of each tile along one dimension."""
        sizes = torch.full((n_tiles, ), tile, dtype=torch.int, device=device)
        # size of last (possibly partial) tile
        remainder = dim_len - (n_tiles - 1) * tile
        sizes[-1] = remainder if remainder > 0 else tile
        return sizes

    t_sizes = _sizes(t, ts_t, n_t)  # [n_t]
    h_sizes = _sizes(h, ts_h, n_h)  # [n_h]
    w_sizes = _sizes(w, ts_w, n_w)  # [n_w]

    # broadcast‑multiply to get voxels per tile, then flatten
    block_sizes = (
        t_sizes[:, None, None]  # [n_t, 1,   1]
        * h_sizes[None, :, None]  # [1,   n_h, 1]
        * w_sizes[None, None, :]  # [1,   1,   n_w]
    ).reshape(-1)  # [n_t * n_h * n_w]

    return block_sizes
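
In isolation, the per-dimension tile-size computation above behaves like this standalone sketch: a length-18 axis covered by tiles of size 4 yields sizes [4, 4, 4, 4, 2].

import torch

def tile_sizes(dim_len: int, tile: int) -> torch.Tensor:
    # Mirror of the _sizes helper: full tiles plus a possibly partial last tile.
    n_tiles = -(-dim_len // tile)  # ceiling division
    sizes = torch.full((n_tiles,), tile, dtype=torch.int)
    sizes[-1] = dim_len - (n_tiles - 1) * tile
    return sizes

print(tile_sizes(18, 4))  # tensor([4, 4, 4, 4, 2], dtype=torch.int32)
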
fastvideo.attention.backends.vmoba
Classes
fastvideo.attention.backends.vmoba.VMOBAAttentionImpl
VMOBAAttentionImpl(num_heads, head_size, softmax_scale, causal=False, num_kv_heads=None, prefix='', **extra_impl_args)

Bases: AttentionImpl

Source code in fastvideo/attention/backends/vmoba.py
def __init__(self,
             num_heads,
             head_size,
             softmax_scale,
             causal=False,
             num_kv_heads=None,
             prefix="",
             **extra_impl_args) -> None:
    self.prefix = prefix
    self.layer_idx = self._get_layer_idx(prefix)
    from flash_attn.bert_padding import pad_input
    self.pad_input = pad_input
Functions
fastvideo.attention.backends.vmoba.VMOBAAttentionImpl.forward
forward(query: Tensor, key: Tensor, value: Tensor, attn_metadata: AttentionMetadata) -> Tensor

query: [B, L, H, D] key: [B, L, H, D] value: [B, L, H, D] attn_metadata: AttentionMetadata

Source code in fastvideo/attention/backends/vmoba.py
def forward(
    self,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attn_metadata: AttentionMetadata,
) -> torch.Tensor:
    """
    query: [B, L, H, D]
    key:   [B, L, H, D]
    value: [B, L, H, D]
    attn_metadata: AttentionMetadata
    """
    batch_size, sequence_length, num_heads, head_dim = query.shape

    # select chunk type according to layer idx:
    loop_layer_num = attn_metadata.temporal_layer + attn_metadata.spatial_layer + attn_metadata.st_layer
    moba_layer = self.layer_idx - attn_metadata.first_full_layer
    if moba_layer % loop_layer_num < attn_metadata.temporal_layer:
        moba_chunk_size = attn_metadata.temporal_chunk_size
        moba_topk = attn_metadata.temporal_topk
    elif moba_layer % loop_layer_num < attn_metadata.temporal_layer + attn_metadata.spatial_layer:
        moba_chunk_size = attn_metadata.spatial_chunk_size
        moba_topk = attn_metadata.spatial_topk
    elif moba_layer % loop_layer_num < attn_metadata.temporal_layer + attn_metadata.spatial_layer + attn_metadata.st_layer:
        moba_chunk_size = attn_metadata.st_chunk_size
        moba_topk = attn_metadata.st_topk

    query, chunk_size = process_moba_input(query,
                                           attn_metadata.patch_resolution,
                                           moba_chunk_size)
    key, chunk_size = process_moba_input(key,
                                         attn_metadata.patch_resolution,
                                         moba_chunk_size)
    value, chunk_size = process_moba_input(value,
                                           attn_metadata.patch_resolution,
                                           moba_chunk_size)
    max_seqlen = query.shape[1]
    indices_q = torch.arange(0,
                             query.shape[0] * query.shape[1],
                             device=query.device)
    cu_seqlens = torch.arange(0,
                              query.shape[0] * query.shape[1] + 1,
                              query.shape[1],
                              dtype=torch.int32,
                              device=query.device)
    query = rearrange(query, "b s ... -> (b s) ...")
    key = rearrange(key, "b s ... -> (b s) ...")
    value = rearrange(value, "b s ... -> (b s) ...")

    # current_timestep=attn_metadata.current_timestep
    hidden_states = moba_attn_varlen(
        query,
        key,
        value,
        cu_seqlens=cu_seqlens,
        max_seqlen=max_seqlen,
        moba_chunk_size=chunk_size,
        moba_topk=moba_topk,
        select_mode=attn_metadata.moba_select_mode,
        simsum_threshold=attn_metadata.moba_threshold,
        threshold_type=attn_metadata.moba_threshold_type,
    )
    hidden_states = self.pad_input(hidden_states, indices_q, batch_size,
                                   sequence_length)
    hidden_states = process_moba_output(hidden_states,
                                        attn_metadata.patch_resolution,
                                        moba_chunk_size)

    return hidden_states
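
The varlen bookkeeping in the middle of this forward pass can be viewed in isolation (batch_size=2 and seq_len=3 are toy values): cu_seqlens marks where each flattened sequence starts and ends, which is what moba_attn_varlen consumes.

import torch

batch_size, seq_len = 2, 3
indices_q = torch.arange(0, batch_size * seq_len)
cu_seqlens = torch.arange(0, batch_size * seq_len + 1, seq_len, dtype=torch.int32)
print(indices_q)   # tensor([0, 1, 2, 3, 4, 5])
print(cu_seqlens)  # tensor([0, 3, 6], dtype=torch.int32)
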
Functions

fastvideo.attention.layer

Classes

fastvideo.attention.layer.DistributedAttention
DistributedAttention(num_heads: int, head_size: int, num_kv_heads: int | None = None, softmax_scale: float | None = None, causal: bool = False, supported_attention_backends: tuple[AttentionBackendEnum, ...] | None = None, prefix: str = '', **extra_impl_args)

Bases: Module

Distributed attention layer.

Source code in fastvideo/attention/layer.py
def __init__(self,
             num_heads: int,
             head_size: int,
             num_kv_heads: int | None = None,
             softmax_scale: float | None = None,
             causal: bool = False,
             supported_attention_backends: tuple[AttentionBackendEnum, ...]
             | None = None,
             prefix: str = "",
             **extra_impl_args) -> None:
    super().__init__()
    if softmax_scale is None:
        self.softmax_scale = head_size**-0.5
    else:
        self.softmax_scale = softmax_scale

    if num_kv_heads is None:
        num_kv_heads = num_heads

    dtype = get_compute_dtype()
    attn_backend = get_attn_backend(
        head_size,
        dtype,
        supported_attention_backends=supported_attention_backends)
    impl_cls = attn_backend.get_impl_cls()
    self.attn_impl = impl_cls(num_heads=num_heads,
                              head_size=head_size,
                              causal=causal,
                              softmax_scale=self.softmax_scale,
                              num_kv_heads=num_kv_heads,
                              prefix=f"{prefix}.impl",
                              **extra_impl_args)
    self.num_heads = num_heads
    self.head_size = head_size
    self.num_kv_heads = num_kv_heads
    self.backend = backend_name_to_enum(attn_backend.get_name())
    self.dtype = dtype
Functions
fastvideo.attention.layer.DistributedAttention.forward
forward(q: Tensor, k: Tensor, v: Tensor, replicated_q: Tensor | None = None, replicated_k: Tensor | None = None, replicated_v: Tensor | None = None) -> tuple[Tensor, Tensor | None]

Forward pass for distributed attention.

Parameters:

Name Type Description Default
q Tensor

Query tensor [batch_size, seq_len, num_heads, head_dim]

required
k Tensor

Key tensor [batch_size, seq_len, num_heads, head_dim]

required
v Tensor

Value tensor [batch_size, seq_len, num_heads, head_dim]

required
replicated_q Optional[Tensor]

Replicated query tensor, typically for text tokens

None
replicated_k Optional[Tensor]

Replicated key tensor

None
replicated_v Optional[Tensor]

Replicated value tensor

None

Returns:

Type Description
tuple[Tensor, Tensor | None]

Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
- o (torch.Tensor): Output tensor after attention for the main sequence
- replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided

Source code in fastvideo/attention/layer.py
@torch.compiler.disable
def forward(
    self,
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    replicated_q: torch.Tensor | None = None,
    replicated_k: torch.Tensor | None = None,
    replicated_v: torch.Tensor | None = None,
) -> tuple[torch.Tensor, torch.Tensor | None]:
    """Forward pass for distributed attention.

    Args:
        q (torch.Tensor): Query tensor [batch_size, seq_len, num_heads, head_dim]
        k (torch.Tensor): Key tensor [batch_size, seq_len, num_heads, head_dim]
        v (torch.Tensor): Value tensor [batch_size, seq_len, num_heads, head_dim]
        replicated_q (Optional[torch.Tensor]): Replicated query tensor, typically for text tokens
        replicated_k (Optional[torch.Tensor]): Replicated key tensor
        replicated_v (Optional[torch.Tensor]): Replicated value tensor

    Returns:
        Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
            - o (torch.Tensor): Output tensor after attention for the main sequence
            - replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided
    """
    # Check input shapes
    assert q.dim() == 4 and k.dim() == 4 and v.dim(
    ) == 4, "Expected 4D tensors"
    batch_size, seq_len, num_heads, head_dim = q.shape
    local_rank = get_sp_parallel_rank()
    world_size = get_sp_world_size()

    forward_context: ForwardContext = get_forward_context()
    ctx_attn_metadata = forward_context.attn_metadata

    # Stack QKV
    qkv = torch.cat([q, k, v], dim=0)  # [3 * batch_size, seq_len, num_heads, head_dim]

    # Redistribute heads across sequence dimension
    qkv = sequence_model_parallel_all_to_all_4D(qkv,
                                                scatter_dim=2,
                                                gather_dim=1)
    # Apply backend-specific preprocess_qkv
    qkv = self.attn_impl.preprocess_qkv(qkv, ctx_attn_metadata)

    # Concatenate with replicated QKV if provided
    if replicated_q is not None:
        assert replicated_k is not None and replicated_v is not None
        replicated_qkv = torch.cat(
            [replicated_q, replicated_k, replicated_v],
            dim=0)  # [3, seq_len, num_heads, head_dim]
        heads_per_rank = num_heads // world_size
        replicated_qkv = replicated_qkv[:, :, local_rank *
                                        heads_per_rank:(local_rank + 1) *
                                        heads_per_rank]
        qkv = torch.cat([qkv, replicated_qkv], dim=1)

    q, k, v = qkv.chunk(3, dim=0)

    output = self.attn_impl.forward(q, k, v, ctx_attn_metadata)

    # Redistribute back if using sequence parallelism
    replicated_output = None
    if replicated_q is not None:
        replicated_output = output[:, seq_len * world_size:]
        output = output[:, :seq_len * world_size]
        # TODO: make this asynchronous
        replicated_output = sequence_model_parallel_all_gather(
            replicated_output.contiguous(), dim=2)
    # Apply backend-specific postprocess_output
    output = self.attn_impl.postprocess_output(output, ctx_attn_metadata)

    output = sequence_model_parallel_all_to_all_4D(output,
                                                   scatter_dim=1,
                                                   gather_dim=2)
    return output, replicated_output
fastvideo.attention.layer.DistributedAttention_VSA
DistributedAttention_VSA(num_heads: int, head_size: int, num_kv_heads: int | None = None, softmax_scale: float | None = None, causal: bool = False, supported_attention_backends: tuple[AttentionBackendEnum, ...] | None = None, prefix: str = '', **extra_impl_args)

Bases: DistributedAttention

Distributed attention layer with VSA support.

Source code in fastvideo/attention/layer.py
def __init__(self,
             num_heads: int,
             head_size: int,
             num_kv_heads: int | None = None,
             softmax_scale: float | None = None,
             causal: bool = False,
             supported_attention_backends: tuple[AttentionBackendEnum, ...]
             | None = None,
             prefix: str = "",
             **extra_impl_args) -> None:
    super().__init__()
    if softmax_scale is None:
        self.softmax_scale = head_size**-0.5
    else:
        self.softmax_scale = softmax_scale

    if num_kv_heads is None:
        num_kv_heads = num_heads

    dtype = get_compute_dtype()
    attn_backend = get_attn_backend(
        head_size,
        dtype,
        supported_attention_backends=supported_attention_backends)
    impl_cls = attn_backend.get_impl_cls()
    self.attn_impl = impl_cls(num_heads=num_heads,
                              head_size=head_size,
                              causal=causal,
                              softmax_scale=self.softmax_scale,
                              num_kv_heads=num_kv_heads,
                              prefix=f"{prefix}.impl",
                              **extra_impl_args)
    self.num_heads = num_heads
    self.head_size = head_size
    self.num_kv_heads = num_kv_heads
    self.backend = backend_name_to_enum(attn_backend.get_name())
    self.dtype = dtype
Functions
fastvideo.attention.layer.DistributedAttention_VSA.forward
forward(q: Tensor, k: Tensor, v: Tensor, replicated_q: Tensor | None = None, replicated_k: Tensor | None = None, replicated_v: Tensor | None = None, gate_compress: Tensor | None = None) -> tuple[Tensor, Tensor | None]

Forward pass for distributed attention.

Parameters:

Name Type Description Default
q Tensor

Query tensor [batch_size, seq_len, num_heads, head_dim]

required
k Tensor

Key tensor [batch_size, seq_len, num_heads, head_dim]

required
v Tensor

Value tensor [batch_size, seq_len, num_heads, head_dim]

required
gate_compress Optional[Tensor]

Gate compress tensor [batch_size, seq_len, num_heads, head_dim]

None
replicated_q Optional[Tensor]

Replicated query tensor, typically for text tokens

None
replicated_k Optional[Tensor]

Replicated key tensor

None
replicated_v Optional[Tensor]

Replicated value tensor

None

Returns:

Type Description
tuple[Tensor, Tensor | None]

Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
- o (torch.Tensor): Output tensor after attention for the main sequence
- replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided

Source code in fastvideo/attention/layer.py
@torch.compiler.disable
def forward(
    self,
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    replicated_q: torch.Tensor | None = None,
    replicated_k: torch.Tensor | None = None,
    replicated_v: torch.Tensor | None = None,
    gate_compress: torch.Tensor | None = None,
) -> tuple[torch.Tensor, torch.Tensor | None]:
    """Forward pass for distributed attention.

    Args:
        q (torch.Tensor): Query tensor [batch_size, seq_len, num_heads, head_dim]
        k (torch.Tensor): Key tensor [batch_size, seq_len, num_heads, head_dim]
        v (torch.Tensor): Value tensor [batch_size, seq_len, num_heads, head_dim]
        gate_compress (torch.Tensor): Gate compress tensor [batch_size, seq_len, num_heads, head_dim]
        replicated_q (Optional[torch.Tensor]): Replicated query tensor, typically for text tokens
        replicated_k (Optional[torch.Tensor]): Replicated key tensor
        replicated_v (Optional[torch.Tensor]): Replicated value tensor

    Returns:
        Tuple[torch.Tensor, Optional[torch.Tensor]]: A tuple containing:
            - o (torch.Tensor): Output tensor after attention for the main sequence
            - replicated_o (Optional[torch.Tensor]): Output tensor for replicated tokens, if provided
    """
    # Replicated (text-token) QKV is not yet supported for VSA
    assert replicated_q is None and replicated_k is None and replicated_v is None, "Replicated QKV is not yet supported for VSA"
    # Check input shapes
    assert q.dim() == 4 and k.dim() == 4 and v.dim(
    ) == 4, "Expected 4D tensors"

    forward_context: ForwardContext = get_forward_context()
    ctx_attn_metadata = forward_context.attn_metadata

    # Stack QKV
    qkvg = torch.cat([q, k, v, gate_compress],
                     dim=0)  # [4 * batch_size, seq_len, num_heads, head_dim]

    # Redistribute heads across sequence dimension
    qkvg = sequence_model_parallel_all_to_all_4D(qkvg,
                                                 scatter_dim=2,
                                                 gather_dim=1)

    qkvg = self.attn_impl.preprocess_qkv(qkvg, ctx_attn_metadata)

    q, k, v, gate_compress = qkvg.chunk(4, dim=0)
    output = self.attn_impl.forward(
        q, k, v, gate_compress, ctx_attn_metadata)  # type: ignore[call-arg]

    # Redistribute back if using sequence parallelism
    replicated_output = None

    # Apply backend-specific postprocess_output
    output = self.attn_impl.postprocess_output(output, ctx_attn_metadata)

    output = sequence_model_parallel_all_to_all_4D(output,
                                                   scatter_dim=1,
                                                   gather_dim=2)
    return output, replicated_output
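
The following is a minimal, shape-level sketch of calling this layer (not part of the library's documented examples). It assumes the sequence-parallel process groups and a forward context carrying VSA attention metadata have already been set up by the surrounding pipeline, and that the constructor mirrors DistributedAttention's arguments; all sizes are illustrative.

import torch
from fastvideo.attention.layer import DistributedAttention_VSA

batch_size, seq_len, num_heads, head_dim = 1, 1024, 16, 64

# Assumes distributed/sequence-parallel state and the forward context
# (attention metadata) are already initialized elsewhere.
attn = DistributedAttention_VSA(num_heads=num_heads, head_size=head_dim)

q = torch.randn(batch_size, seq_len, num_heads, head_dim)
k = torch.randn_like(q)
v = torch.randn_like(q)
gate_compress = torch.randn_like(q)  # same layout as q/k/v

# Replicated (text-token) QKV must be left as None for VSA.
output, replicated_output = attn(q, k, v, gate_compress=gate_compress)
# output keeps the [batch_size, seq_len, num_heads, head_dim] layout of q;
# replicated_output is None because no replicated tensors were passed.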
fastvideo.attention.layer.LocalAttention
LocalAttention(num_heads: int, head_size: int, num_kv_heads: int | None = None, softmax_scale: float | None = None, causal: bool = False, supported_attention_backends: tuple[AttentionBackendEnum, ...] | None = None, **extra_impl_args)

Bases: Module

Attention layer.

Source code in fastvideo/attention/layer.py
def __init__(self,
             num_heads: int,
             head_size: int,
             num_kv_heads: int | None = None,
             softmax_scale: float | None = None,
             causal: bool = False,
             supported_attention_backends: tuple[AttentionBackendEnum, ...]
             | None = None,
             **extra_impl_args) -> None:
    super().__init__()
    if softmax_scale is None:
        self.softmax_scale = head_size**-0.5
    else:
        self.softmax_scale = softmax_scale
    if num_kv_heads is None:
        num_kv_heads = num_heads

    dtype = get_compute_dtype()
    attn_backend = get_attn_backend(
        head_size,
        dtype,
        supported_attention_backends=supported_attention_backends)
    impl_cls = attn_backend.get_impl_cls()
    self.attn_impl = impl_cls(num_heads=num_heads,
                              head_size=head_size,
                              softmax_scale=self.softmax_scale,
                              num_kv_heads=num_kv_heads,
                              causal=causal,
                              **extra_impl_args)
    self.num_heads = num_heads
    self.head_size = head_size
    self.num_kv_heads = num_kv_heads
    self.backend = backend_name_to_enum(attn_backend.get_name())
    self.dtype = dtype
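
As a small sketch of constructing the layer (values are illustrative; backend resolution inside __init__ assumes a usable attention backend exists for the current compute dtype):

from fastvideo.attention.layer import LocalAttention

# softmax_scale defaults to head_size ** -0.5 and num_kv_heads falls back
# to num_heads, exactly as in the __init__ shown above.
attn = LocalAttention(num_heads=16, head_size=64, causal=False)
assert attn.softmax_scale == 64 ** -0.5
assert attn.num_kv_heads == attn.num_heads == 16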
Functions
fastvideo.attention.layer.LocalAttention.forward
forward(q: Tensor, k: Tensor, v: Tensor) -> Tensor

Apply local attention between query, key and value tensors.

Parameters:

  • q (Tensor, required): Query tensor of shape [batch_size, seq_len, num_heads, head_dim]
  • k (Tensor, required): Key tensor of shape [batch_size, seq_len, num_heads, head_dim]
  • v (Tensor, required): Value tensor of shape [batch_size, seq_len, num_heads, head_dim]

Returns:

  • Tensor: Output tensor after local attention

Source code in fastvideo/attention/layer.py
def forward(
    self,
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
) -> torch.Tensor:
    """
    Apply local attention between query, key and value tensors.

    Args:
        q (torch.Tensor): Query tensor of shape [batch_size, seq_len, num_heads, head_dim]
        k (torch.Tensor): Key tensor of shape [batch_size, seq_len, num_heads, head_dim] 
        v (torch.Tensor): Value tensor of shape [batch_size, seq_len, num_heads, head_dim]

    Returns:
        torch.Tensor: Output tensor after local attention
    """
    # Check input shapes
    assert q.dim() == 4 and k.dim() == 4 and v.dim(
    ) == 4, "Expected 4D tensors"

    forward_context: ForwardContext = get_forward_context()
    ctx_attn_metadata = forward_context.attn_metadata

    output = self.attn_impl.forward(q, k, v, ctx_attn_metadata)
    return output
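
A minimal call sketch follows (illustrative sizes). It assumes a forward context with attention metadata has been installed by the enclosing runner, since the layer fetches it internally via get_forward_context():

import torch
from fastvideo.attention.layer import LocalAttention

attn = LocalAttention(num_heads=16, head_size=64)

# All three inputs must be 4D: [batch_size, seq_len, num_heads, head_dim].
q = torch.randn(1, 512, 16, 64)
k = torch.randn_like(q)
v = torch.randn_like(q)

out = attn(q, k, v)  # output keeps the same [b, s, h, d] layout as q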

Functions

fastvideo.attention.selector

Functions

fastvideo.attention.selector.backend_name_to_enum
backend_name_to_enum(backend_name: str) -> AttentionBackendEnum | None

Convert a string backend name to an AttentionBackendEnum value.

Returns:

  • AttentionBackendEnum: enum value if backend_name is a valid in-tree type
  • None: otherwise, i.e. backend_name is an invalid in-tree type or an out-of-tree platform is loaded.

Source code in fastvideo/attention/selector.py
def backend_name_to_enum(backend_name: str) -> AttentionBackendEnum | None:
    """
    Convert a string backend name to a _Backend enum value.

    Returns:
    * _Backend: enum value if backend_name is a valid in-tree type
    * None: otherwise it's an invalid in-tree type or an out-of-tree platform is
            loaded.
    """
    assert backend_name is not None
    return AttentionBackendEnum[backend_name] if backend_name in AttentionBackendEnum.__members__ else \
          None
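
For illustration (concrete member names depend on AttentionBackendEnum; importing it from the selector module below simply re-uses that module's own import):

from fastvideo.attention.selector import (AttentionBackendEnum,
                                          backend_name_to_enum)

# Unknown names map to None rather than raising.
assert backend_name_to_enum("NOT_A_REAL_BACKEND") is None

# Any registered member name round-trips to its enum value.
name = next(iter(AttentionBackendEnum.__members__))
assert backend_name_to_enum(name) is AttentionBackendEnum[name]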
fastvideo.attention.selector.get_env_variable_attn_backend
get_env_variable_attn_backend() -> AttentionBackendEnum | None

Get the backend override specified by the FastVideo attention backend environment variable, if one is specified.

Returns:

  • AttentionBackendEnum value if an override is specified
  • None otherwise
Source code in fastvideo/attention/selector.py
def get_env_variable_attn_backend() -> AttentionBackendEnum | None:
    '''
    Get the backend override specified by the FastVideo attention
    backend environment variable, if one is specified.

    Returns:

    * _Backend enum value if an override is specified
    * None otherwise
    '''
    backend_name = os.environ.get(STR_BACKEND_ENV_VAR)
    return (None
            if backend_name is None else backend_name_to_enum(backend_name))
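
A sketch of driving the override through the environment (the variable's concrete name is whatever STR_BACKEND_ENV_VAR holds; importing that constant from the selector module is an assumption):

import os
from fastvideo.attention.selector import (STR_BACKEND_ENV_VAR,
                                          get_env_variable_attn_backend)

os.environ.pop(STR_BACKEND_ENV_VAR, None)
assert get_env_variable_attn_backend() is None   # no override set

# Unrecognized names fall through to None via backend_name_to_enum.
os.environ[STR_BACKEND_ENV_VAR] = "NOT_A_REAL_BACKEND"
assert get_env_variable_attn_backend() is None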
fastvideo.attention.selector.get_global_forced_attn_backend
get_global_forced_attn_backend() -> AttentionBackendEnum | None

Get the currently-forced choice of attention backend, or None if auto-selection is currently enabled.

Source code in fastvideo/attention/selector.py
def get_global_forced_attn_backend() -> AttentionBackendEnum | None:
    '''
    Get the currently-forced choice of attention backend,
    or None if auto-selection is currently enabled.
    '''
    return forced_attn_backend
fastvideo.attention.selector.global_force_attn_backend
global_force_attn_backend(attn_backend: AttentionBackendEnum | None) -> None

Force all attention operations to use a specified backend.

Passing None for the argument re-enables automatic backend selection.

Arguments:

  • attn_backend: backend selection (None to revert to auto)
Source code in fastvideo/attention/selector.py
def global_force_attn_backend(
        attn_backend: AttentionBackendEnum | None) -> None:
    '''
    Force all attention operations to use a specified backend.

    Passing `None` for the argument re-enables automatic
    backend selection.

    Arguments:

    * attn_backend: backend selection (None to revert to auto)
    '''
    global forced_attn_backend
    forced_attn_backend = attn_backend
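
Used together with get_global_forced_attn_backend, the override can be set and later cleared. The backend below is picked generically because this page does not list the concrete enum members; AttentionBackendEnum is re-used from the selector module's own namespace:

from fastvideo.attention.selector import (AttentionBackendEnum,
                                          get_global_forced_attn_backend,
                                          global_force_attn_backend)

backend = next(iter(AttentionBackendEnum))  # any registered backend, for demonstration

global_force_attn_backend(backend)
assert get_global_forced_attn_backend() is backend

# Passing None restores automatic backend selection.
global_force_attn_backend(None)
assert get_global_forced_attn_backend() is None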
fastvideo.attention.selector.global_force_attn_backend_context_manager
global_force_attn_backend_context_manager(attn_backend: AttentionBackendEnum) -> Generator[None, None, None]

Globally force a FastVideo attention backend override within a context manager, reverting the global attention backend override to its prior state upon exiting the context manager.

Arguments:

  • attn_backend: attention backend to force

Returns:

  • Generator
Source code in fastvideo/attention/selector.py
@contextmanager
def global_force_attn_backend_context_manager(
        attn_backend: AttentionBackendEnum) -> Generator[None, None, None]:
    '''
    Globally force a FastVideo attention backend override within a
    context manager, reverting the global attention backend
    override to its prior state upon exiting the context
    manager.

    Arguments:

    * attn_backend: attention backend to force

    Returns:

    * Generator
    '''

    # Save the current state of the global backend override (if any)
    original_value = get_global_forced_attn_backend()

    # Globally force the new backend override
    global_force_attn_backend(attn_backend)

    # Yield control back to the enclosed code block
    try:
        yield
    finally:
        # Revert the original global backend override, if any
        global_force_attn_backend(original_value)
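
A usage sketch; as above, the backend is chosen generically and AttentionBackendEnum is taken from the selector module's namespace:

from fastvideo.attention.selector import (
    AttentionBackendEnum, get_global_forced_attn_backend,
    global_force_attn_backend_context_manager)

backend = next(iter(AttentionBackendEnum))

with global_force_attn_backend_context_manager(backend):
    # Inside the block every attention layer resolves to `backend`.
    assert get_global_forced_attn_backend() is backend

# On exit the prior override is restored (None here, since nothing was
# forced before entering the block).
assert get_global_forced_attn_backend() is None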