Skip to content

backends

Modules

fastvideo.attention.backends.STA_configuration

Functions

fastvideo.attention.backends.STA_configuration.average_head_losses
average_head_losses(results: list[dict[str, Any]], selected_masks: list[list[int]]) -> dict[str, dict[str, ndarray]]

Average losses across all prompts for each mask strategy.

Source code in fastvideo/attention/backends/STA_configuration.py
def average_head_losses(
        results: list[dict[str, Any]],
        selected_masks: list[list[int]]) -> dict[str, dict[str, np.ndarray]]:
    """Average losses across all prompts for each mask strategy."""
    # Initialize a dictionary to store the averaged results
    averaged_losses: dict[str, dict[str, np.ndarray]] = {}
    loss_type = 'L2_loss'
    # Get all loss types (e.g., 'L2_loss')
    averaged_losses[loss_type] = {}

    for mask in selected_masks:
        mask_str = str(mask)
        data_shape = np.array(results[0][loss_type][mask_str]).shape
        accumulated_data = np.zeros(data_shape)

        # Sum across all prompts
        for prompt_result in results:
            accumulated_data += np.array(prompt_result[loss_type][mask_str])

        # Average by dividing by number of prompts
        averaged_data = accumulated_data / len(results)
        averaged_losses[loss_type][mask_str] = averaged_data

    return averaged_losses
fastvideo.attention.backends.STA_configuration.configure_sta
configure_sta(mode: str = 'STA_searching', layer_num: int = 40, time_step_num: int = 50, head_num: int = 40, **kwargs) -> list[list[list[Any]]]

Configure Sliding Tile Attention (STA) parameters based on the specified mode.

Parameters:

mode : str The STA mode to use. Options are: - 'STA_searching': Generate a set of mask candidates for initial search - 'STA_tuning': Select best mask strategy based on previously saved results - 'STA_inference': Load and use a previously tuned mask strategy layer_num: int, number of layers time_step_num: int, number of timesteps head_num: int, number of heads

**kwargs : dict Mode-specific parameters:

For 'STA_searching':
- mask_candidates: list of str, optional, mask candidates to use
- mask_selected: list of int, optional, indices of selected masks

For 'STA_tuning':
- mask_search_files_path: str, required, path to mask search results
- mask_candidates: list of str, optional, mask candidates to use
- mask_selected: list of int, optional, indices of selected masks
- skip_time_steps: int, optional, number of time steps to use full attention (default 12)
- save_dir: str, optional, directory to save mask strategy (default "mask_candidates")

For 'STA_inference':
- load_path: str, optional, path to load mask strategy (default "mask_candidates/mask_strategy.json")
Source code in fastvideo/attention/backends/STA_configuration.py
def configure_sta(mode: str = 'STA_searching',
                  layer_num: int = 40,
                  time_step_num: int = 50,
                  head_num: int = 40,
                  **kwargs) -> list[list[list[Any]]]:
    """
    Configure Sliding Tile Attention (STA) parameters based on the specified mode.

    Parameters:
    ----------
    mode : str
        The STA mode to use. Options are:
        - 'STA_searching': Generate a set of mask candidates for initial search
        - 'STA_tuning': Select best mask strategy based on previously saved results
        - 'STA_inference': Load and use a previously tuned mask strategy
    layer_num: int, number of layers
    time_step_num: int, number of timesteps
    head_num: int, number of heads

    **kwargs : dict
        Mode-specific parameters:

        For 'STA_searching':
        - mask_candidates: list of str, optional, mask candidates to use
        - mask_selected: list of int, optional, indices of selected masks

        For 'STA_tuning':
        - mask_search_files_path: str, required, path to mask search results
        - mask_candidates: list of str, optional, mask candidates to use
        - mask_selected: list of int, optional, indices of selected masks
        - skip_time_steps: int, optional, number of time steps to use full attention (default 12)
        - save_dir: str, optional, directory to save mask strategy (default "mask_candidates")

        For 'STA_inference':
        - load_path: str, optional, path to load mask strategy (default "mask_candidates/mask_strategy.json")
    """
    valid_modes = [
        'STA_searching', 'STA_tuning', 'STA_inference', 'STA_tuning_cfg'
    ]
    if mode not in valid_modes:
        raise ValueError(f"Mode must be one of {valid_modes}, got {mode}")

    if mode == 'STA_searching':
        # Get parameters with defaults
        mask_candidates: list[str] | None = kwargs.get('mask_candidates')
        if mask_candidates is None:
            raise ValueError(
                "mask_candidates is required for STA_searching mode")
        mask_selected: list[int] = kwargs.get('mask_selected',
                                              list(range(len(mask_candidates))))

        # Parse selected masks
        selected_masks: list[list[int]] = []
        for index in mask_selected:
            mask = mask_candidates[index]
            masks_list = [int(x) for x in mask.split(',')]
            selected_masks.append(masks_list)

        # Create 3D mask structure with fixed dimensions (t=50, l=60)
        masks_3d: list[list[list[list[int]]]] = []
        for i in range(time_step_num):  # Fixed t dimension = 50
            row = []
            for j in range(layer_num):  # Fixed l dimension = 60
                row.append(selected_masks)  # Add all masks at each position
            masks_3d.append(row)

        return masks_3d

    elif mode == 'STA_tuning':
        # Get required parameters
        mask_search_files_path: str | None = kwargs.get(
            'mask_search_files_path')
        if not mask_search_files_path:
            raise ValueError(
                "mask_search_files_path is required for STA_tuning mode")

        # Get optional parameters with defaults
        mask_candidates_tuning: list[str] | None = kwargs.get('mask_candidates')
        if mask_candidates_tuning is None:
            raise ValueError("mask_candidates is required for STA_tuning mode")
        mask_selected_tuning: list[int] = kwargs.get(
            'mask_selected', list(range(len(mask_candidates_tuning))))
        skip_time_steps_tuning: int | None = kwargs.get('skip_time_steps')
        save_dir_tuning: str | None = kwargs.get('save_dir', "mask_candidates")

        # Parse selected masks
        selected_masks_tuning: list[list[int]] = []
        for index in mask_selected_tuning:
            mask = mask_candidates_tuning[index]
            masks_list = [int(x) for x in mask.split(',')]
            selected_masks_tuning.append(masks_list)

        # Read JSON results
        results = read_specific_json_files(mask_search_files_path)
        averaged_results = average_head_losses(results, selected_masks_tuning)

        # Add full attention mask for specific cases
        full_attention_mask_tuning: list[int] | None = kwargs.get(
            'full_attention_mask')
        if full_attention_mask_tuning is not None:
            selected_masks_tuning.append(full_attention_mask_tuning)

        # Select best mask strategy
        timesteps_tuning: int = kwargs.get('timesteps', time_step_num)
        if skip_time_steps_tuning is None:
            skip_time_steps_tuning = 12
        mask_strategy, sparsity, strategy_counts = select_best_mask_strategy(
            averaged_results, selected_masks_tuning, skip_time_steps_tuning,
            timesteps_tuning, head_num)

        # Save mask strategy
        if save_dir_tuning is not None:
            os.makedirs(save_dir_tuning, exist_ok=True)
            file_path = os.path.join(
                save_dir_tuning,
                f'mask_strategy_s{skip_time_steps_tuning}.json')
            with open(file_path, 'w') as f:
                json.dump(mask_strategy, f, indent=4)
            print(f"Successfully saved mask_strategy to {file_path}")

        # Print sparsity and strategy counts for information
        print(f"Overall sparsity: {sparsity:.4f}")
        print("\nStrategy usage counts:")
        total_heads = time_step_num * layer_num * head_num  # Fixed dimensions
        for strategy, count in strategy_counts.items():
            print(
                f"Strategy {strategy}: {count} heads ({count/total_heads*100:.2f}%)"
            )

        # Convert dictionary to 3D list with fixed dimensions
        mask_strategy_3d = dict_to_3d_list(mask_strategy,
                                           t_max=time_step_num,
                                           l_max=layer_num,
                                           h_max=head_num)

        return mask_strategy_3d
    elif mode == 'STA_tuning_cfg':
        # Get required parameters for both positive and negative paths
        mask_search_files_path_pos: str | None = kwargs.get(
            'mask_search_files_path_pos')
        mask_search_files_path_neg: str | None = kwargs.get(
            'mask_search_files_path_neg')
        save_dir_cfg: str | None = kwargs.get('save_dir')

        if not mask_search_files_path_pos or not mask_search_files_path_neg or not save_dir_cfg:
            raise ValueError(
                "mask_search_files_path_pos, mask_search_files_path_neg, and save_dir are required for STA_tuning_cfg mode"
            )

        # Get optional parameters with defaults
        mask_candidates_cfg: list[str] | None = kwargs.get('mask_candidates')
        if mask_candidates_cfg is None:
            raise ValueError(
                "mask_candidates is required for STA_tuning_cfg mode")
        mask_selected_cfg: list[int] = kwargs.get(
            'mask_selected', list(range(len(mask_candidates_cfg))))
        skip_time_steps_cfg: int | None = kwargs.get('skip_time_steps')

        # Parse selected masks
        selected_masks_cfg: list[list[int]] = []
        for index in mask_selected_cfg:
            mask = mask_candidates_cfg[index]
            masks_list = [int(x) for x in mask.split(',')]
            selected_masks_cfg.append(masks_list)

        # Read JSON results for both positive and negative paths
        pos_results = read_specific_json_files(mask_search_files_path_pos)
        neg_results = read_specific_json_files(mask_search_files_path_neg)
        # Combine positive and negative results into one list
        combined_results = pos_results + neg_results

        # Average the combined results
        averaged_results = average_head_losses(combined_results,
                                               selected_masks_cfg)

        # Add full attention mask for specific cases
        full_attention_mask_cfg: list[int] | None = kwargs.get(
            'full_attention_mask')
        if full_attention_mask_cfg is not None:
            selected_masks_cfg.append(full_attention_mask_cfg)

        timesteps_cfg: int = kwargs.get('timesteps', time_step_num)
        if skip_time_steps_cfg is None:
            skip_time_steps_cfg = 12
        # Select best mask strategy using combined results
        mask_strategy, sparsity, strategy_counts = select_best_mask_strategy(
            averaged_results, selected_masks_cfg, skip_time_steps_cfg,
            timesteps_cfg, head_num)

        # Save mask strategy
        os.makedirs(save_dir_cfg, exist_ok=True)
        file_path = os.path.join(save_dir_cfg,
                                 f'mask_strategy_s{skip_time_steps_cfg}.json')
        with open(file_path, 'w') as f:
            json.dump(mask_strategy, f, indent=4)
        print(f"Successfully saved mask_strategy to {file_path}")

        # Print sparsity and strategy counts for information
        print(f"Overall sparsity: {sparsity:.4f}")
        print("\nStrategy usage counts:")
        total_heads = time_step_num * layer_num * head_num  # Fixed dimensions
        for strategy, count in strategy_counts.items():
            print(
                f"Strategy {strategy}: {count} heads ({count/total_heads*100:.2f}%)"
            )

        # Convert dictionary to 3D list with fixed dimensions
        mask_strategy_3d = dict_to_3d_list(mask_strategy,
                                           t_max=time_step_num,
                                           l_max=layer_num,
                                           h_max=head_num)

        return mask_strategy_3d

    else:  # STA_inference
        # Get parameters with defaults
        load_path: str | None = kwargs.get(
            'load_path', "mask_candidates/mask_strategy.json")
        if load_path is None:
            raise ValueError("load_path is required for STA_inference mode")

        # Load previously saved mask strategy
        with open(load_path) as f:
            mask_strategy = json.load(f)

        # Convert dictionary to 3D list with fixed dimensions
        mask_strategy_3d = dict_to_3d_list(mask_strategy,
                                           t_max=time_step_num,
                                           l_max=layer_num,
                                           h_max=head_num)

        return mask_strategy_3d
fastvideo.attention.backends.STA_configuration.read_specific_json_files
read_specific_json_files(folder_path: str) -> list[dict[str, Any]]

Read and parse JSON files containing mask search results.

Source code in fastvideo/attention/backends/STA_configuration.py
def read_specific_json_files(folder_path: str) -> list[dict[str, Any]]:
    """Read and parse JSON files containing mask search results."""
    json_contents: list[dict[str, Any]] = []

    # List files only in the current directory (no walk)
    files = os.listdir(folder_path)
    # Filter files
    matching_files = [f for f in files if 'mask' in f and f.endswith('.json')]
    print(f"Found {len(matching_files)} matching files: {matching_files}")

    for file_name in matching_files:
        file_path = os.path.join(folder_path, file_name)
        with open(file_path) as file:
            data = json.load(file)
            json_contents.append(data)

    return json_contents
fastvideo.attention.backends.STA_configuration.select_best_mask_strategy
select_best_mask_strategy(averaged_results: dict[str, dict[str, ndarray]], selected_masks: list[list[int]], skip_time_steps: int = 12, timesteps: int = 50, head_num: int = 40) -> tuple[dict[str, list[int]], float, dict[str, int]]

Select the best mask strategy for each head based on loss minimization.

Source code in fastvideo/attention/backends/STA_configuration.py
def select_best_mask_strategy(
        averaged_results: dict[str, dict[str, np.ndarray]],
        selected_masks: list[list[int]],
        skip_time_steps: int = 12,
        timesteps: int = 50,
        head_num: int = 40
) -> tuple[dict[str, list[int]], float, dict[str, int]]:
    """Select the best mask strategy for each head based on loss minimization."""
    best_mask_strategy: dict[str, list[int]] = {}
    loss_type = 'L2_loss'
    # Get the shape of time steps and layers
    layers = len(averaged_results[loss_type][str(selected_masks[0])][0])

    # Counter for sparsity calculation
    total_tokens = 0  # total number of masked tokens
    total_length = 0  # total sequence length

    strategy_counts: dict[str, int] = {
        str(strategy): 0
        for strategy in selected_masks
    }
    full_attn_strategy = selected_masks[-1]  # Last strategy is full attention
    print(f"Strategy {full_attn_strategy}, skip first {skip_time_steps} steps ")

    for t in range(timesteps):
        for layer_idx in range(layers):
            for h in range(head_num):
                if t < skip_time_steps:  # First steps use full attention
                    strategy = full_attn_strategy
                else:
                    # Get losses for this head across all strategies
                    head_losses = []
                    for strategy in selected_masks[:
                                                   -1]:  # Exclude full attention
                        head_losses.append(averaged_results[loss_type][str(
                            strategy)][t][layer_idx][h])

                    # Find which strategy gives minimum loss
                    best_strategy_idx = np.argmin(head_losses)
                    strategy = selected_masks[best_strategy_idx]

                best_mask_strategy[f'{t}_{layer_idx}_{h}'] = strategy

                # Calculate sparsity
                nums = strategy  # strategy is already a list of numbers
                total_tokens += nums[0] * nums[1] * nums[
                    2]  # masked tokens for chosen strategy
                total_length += full_attn_strategy[0] * full_attn_strategy[
                    1] * full_attn_strategy[2]

                # Count strategy usage
                strategy_counts[str(strategy)] += 1

    overall_sparsity = 1 - total_tokens / total_length

    return best_mask_strategy, overall_sparsity, strategy_counts

fastvideo.attention.backends.abstract

Classes

fastvideo.attention.backends.abstract.AttentionBackend

Bases: ABC

Abstract class for attention backends.

fastvideo.attention.backends.abstract.AttentionImpl
AttentionImpl(num_heads: int, head_size: int, softmax_scale: float, causal: bool = False, num_kv_heads: int | None = None, prefix: str = '', **extra_impl_args)

Bases: ABC, Generic[T]

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def __init__(
    self,
    num_heads: int,
    head_size: int,
    softmax_scale: float,
    causal: bool = False,
    num_kv_heads: int | None = None,
    prefix: str = "",
    **extra_impl_args,
) -> None:
    raise NotImplementedError
Functions
fastvideo.attention.backends.abstract.AttentionImpl.postprocess_output
postprocess_output(output: Tensor, attn_metadata: T) -> Tensor

Postprocess the output tensor after the attention operation.

Default implementation returns the tensor unchanged. Subclasses can override this to implement custom postprocessing like untiling, scaling, or other transformations.

Called BEFORE all_to_all for distributed attention

Parameters:

Name Type Description Default
output Tensor

The output tensor from the attention operation

required
attn_metadata T

Metadata for the attention operation

required

Returns:

Type Description
Tensor

Postprocessed output tensor

Source code in fastvideo/attention/backends/abstract.py
def postprocess_output(
    self,
    output: torch.Tensor,
    attn_metadata: T,
) -> torch.Tensor:
    """Postprocess the output tensor after the attention operation.

    Default implementation returns the tensor unchanged.
    Subclasses can override this to implement custom postprocessing
    like untiling, scaling, or other transformations.

    Called BEFORE all_to_all for distributed attention

    Args:
        output: The output tensor from the attention operation
        attn_metadata: Metadata for the attention operation

    Returns:
        Postprocessed output tensor
    """

    return output
fastvideo.attention.backends.abstract.AttentionImpl.preprocess_qkv
preprocess_qkv(qkv: Tensor, attn_metadata: T) -> Tensor

Preprocess QKV tensor before performing attention operation.

Default implementation returns the tensor unchanged. Subclasses can override this to implement custom preprocessing like reshaping, tiling, scaling, or other transformations.

Called AFTER all_to_all for distributed attention

Parameters:

Name Type Description Default
qkv Tensor

The query-key-value tensor

required
attn_metadata T

Metadata for the attention operation

required

Returns:

Type Description
Tensor

Processed QKV tensor

Source code in fastvideo/attention/backends/abstract.py
def preprocess_qkv(self, qkv: torch.Tensor,
                   attn_metadata: T) -> torch.Tensor:
    """Preprocess QKV tensor before performing attention operation.

    Default implementation returns the tensor unchanged.
    Subclasses can override this to implement custom preprocessing
    like reshaping, tiling, scaling, or other transformations.

    Called AFTER all_to_all for distributed attention

    Args:
        qkv: The query-key-value tensor
        attn_metadata: Metadata for the attention operation

    Returns:
        Processed QKV tensor
    """
    return qkv
fastvideo.attention.backends.abstract.AttentionMetadata dataclass
AttentionMetadata(current_timestep: int)

Attention metadata for prefill and decode batched together.

Functions
fastvideo.attention.backends.abstract.AttentionMetadata.asdict_zerocopy
asdict_zerocopy(skip_fields: set[str] | None = None) -> dict[str, Any]

Similar to dataclasses.asdict, but avoids deepcopying.

Source code in fastvideo/attention/backends/abstract.py
def asdict_zerocopy(self,
                    skip_fields: set[str] | None = None) -> dict[str, Any]:
    """Similar to dataclasses.asdict, but avoids deepcopying."""
    if skip_fields is None:
        skip_fields = set()
    # Note that if we add dataclasses as fields, they will need
    # similar handling.
    return {
        field.name: getattr(self, field.name)
        for field in fields(self) if field.name not in skip_fields
    }
fastvideo.attention.backends.abstract.AttentionMetadataBuilder
AttentionMetadataBuilder()

Bases: ABC, Generic[T]

Abstract class for attention metadata builders.

Create the builder, remember some configuration and parameters.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def __init__(self) -> None:
    """Create the builder, remember some configuration and parameters."""
    raise NotImplementedError
Functions
fastvideo.attention.backends.abstract.AttentionMetadataBuilder.build abstractmethod
build(**kwargs: dict[str, Any]) -> AttentionMetadata

Build attention metadata with on-device tensors.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def build(
    self,
    **kwargs: dict[str, Any],
) -> AttentionMetadata:
    """Build attention metadata with on-device tensors."""
    raise NotImplementedError
fastvideo.attention.backends.abstract.AttentionMetadataBuilder.prepare abstractmethod
prepare() -> None

Prepare for one batch.

Source code in fastvideo/attention/backends/abstract.py
@abstractmethod
def prepare(self) -> None:
    """Prepare for one batch."""
    raise NotImplementedError

fastvideo.attention.backends.sla

Classes

fastvideo.attention.backends.sla.SLAAttentionBackend

Bases: AttentionBackend

Sparse-Linear Attention backend.

fastvideo.attention.backends.sla.SLAAttentionImpl
SLAAttentionImpl(num_heads: int, head_size: int, causal: bool = False, softmax_scale: float | None = None, num_kv_heads: int | None = None, prefix: str = '', topk_ratio: float = 0.1, feature_map: str = 'softmax', BLKQ: int = 128, BLKK: int = 64, use_bf16: bool = True, **extra_impl_args)

Bases: AttentionImpl, Module

SLA attention implementation with learnable linear projection.

This implementation combines sparse attention with linear attention, using a learnable projection to blend the outputs. The sparse attention uses a block-sparse pattern determined by QK similarity.

Parameters:

Name Type Description Default
num_heads int

Number of attention heads

required
head_size int

Dimension of each head

required
topk_ratio float

Ratio of key blocks to attend to (0-1), default 0.5

0.1
feature_map str

Feature map for linear attention ('softmax', 'elu', 'relu')

'softmax'
BLKQ int

Query block size for sparse attention

128
BLKK int

Key block size for sparse attention

64
use_bf16 bool

Whether to use bfloat16 for computation

True
Source code in fastvideo/attention/backends/sla.py
def __init__(
    self,
    num_heads: int,
    head_size: int,
    causal: bool = False,
    softmax_scale: float | None = None,
    num_kv_heads: int | None = None,
    prefix: str = "",
    # SLA-specific parameters - matched to TurboDiffusion defaults
    topk_ratio: float = 0.1,  # TurboDiffusion uses topk=0.1
    feature_map: str = "softmax",
    BLKQ: int = 128,  # TurboDiffusion uses BLKQ=128
    BLKK: int = 64,  # TurboDiffusion uses BLKK=64
    use_bf16: bool = True,
    **extra_impl_args,
) -> None:
    nn.Module.__init__(self)

    self.num_heads = num_heads
    self.head_size = head_size
    self.softmax_scale = softmax_scale if softmax_scale else head_size**-0.5
    self.causal = causal
    self.prefix = prefix

    # SLA-specific config
    self.topk_ratio = topk_ratio
    self.BLKQ = BLKQ
    self.BLKK = BLKK
    self.dtype = torch.bfloat16 if use_bf16 else torch.float16

    # Learnable linear projection for combining sparse + linear attention
    self.proj_l = nn.Linear(head_size, head_size, dtype=torch.float32)

    # Feature map for linear attention
    # Type annotation for callables
    self.feature_map_q: Callable[[torch.Tensor], torch.Tensor]
    self.feature_map_k: Callable[[torch.Tensor], torch.Tensor]
    if feature_map == "elu":
        self.feature_map_q = lambda x: F.elu(x) + 1
        self.feature_map_k = lambda x: F.elu(x) + 1
    elif feature_map == "relu":
        self.feature_map_q = F.relu
        self.feature_map_k = F.relu
    elif feature_map == "softmax":
        self.feature_map_q = lambda x: F.softmax(x, dim=-1)
        self.feature_map_k = lambda x: F.softmax(x, dim=-1)
    else:
        raise ValueError(f"Unknown feature map: {feature_map}")

    self._init_weights()
Functions
fastvideo.attention.backends.sla.SLAAttentionImpl.forward
forward(query: Tensor, key: Tensor, value: Tensor, attn_metadata: AttentionMetadata) -> Tensor

Forward pass for SLA attention.

Input tensors are in FastVideo format: (B, L, H, D) Internally converted to SLA format: (B, H, L, D)

Parameters:

Name Type Description Default
query Tensor

Query tensor (B, L, H, D)

required
key Tensor

Key tensor (B, L, H, D)

required
value Tensor

Value tensor (B, L, H, D)

required
attn_metadata AttentionMetadata

Attention metadata

required

Returns:

Type Description
Tensor

Output tensor (B, L, H, D)

Source code in fastvideo/attention/backends/sla.py
def forward(
    self,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attn_metadata: AttentionMetadata,
) -> torch.Tensor:
    """Forward pass for SLA attention.

    Input tensors are in FastVideo format: (B, L, H, D)
    Internally converted to SLA format: (B, H, L, D)

    Args:
        query: Query tensor (B, L, H, D)
        key: Key tensor (B, L, H, D)
        value: Value tensor (B, L, H, D)
        attn_metadata: Attention metadata

    Returns:
        Output tensor (B, L, H, D)
    """
    original_dtype = query.dtype

    # Convert from FastVideo format (B, L, H, D) to SLA format (B, H, L, D)
    q = query.transpose(1, 2).contiguous()
    k = key.transpose(1, 2).contiguous()
    v = value.transpose(1, 2).contiguous()

    # Get topk ratio from metadata if available
    topk_ratio = self.topk_ratio
    if hasattr(attn_metadata, 'topk_ratio'):
        topk_ratio = attn_metadata.topk_ratio  # type: ignore[union-attr]

    # Compute block-sparse attention pattern
    sparse_map, lut, real_topk = get_block_map(q,
                                               k,
                                               topk_ratio=topk_ratio,
                                               BLKQ=self.BLKQ,
                                               BLKK=self.BLKK)

    # Convert to compute dtype
    q = q.to(self.dtype)
    k = k.to(self.dtype)
    v = v.to(self.dtype)

    # Sparse attention
    o_s = _attention.apply(q, k, v, sparse_map, lut, real_topk, self.BLKQ,
                           self.BLKK)

    # Linear attention with feature maps
    q_linear = self.feature_map_q(q).contiguous().to(self.dtype)
    k_linear = self.feature_map_k(k).contiguous().to(self.dtype)
    o_l = self._calc_linear_attention(q_linear, k_linear, v)

    # Project linear attention output and combine
    with torch.amp.autocast('cuda', dtype=self.dtype):
        o_l = self.proj_l(o_l)

    # Combine sparse and linear outputs
    output = (o_s + o_l).to(original_dtype)

    # Convert back to FastVideo format (B, L, H, D)
    output = output.transpose(1, 2)

    return output
fastvideo.attention.backends.sla.SLAAttentionMetadata dataclass
SLAAttentionMetadata(current_timestep: int, topk_ratio: float = 0.5)

Bases: AttentionMetadata

Metadata for SLA attention.

fastvideo.attention.backends.sla.SLAAttentionMetadataBuilder
SLAAttentionMetadataBuilder()

Bases: AttentionMetadataBuilder

Builder for SLA attention metadata.

Source code in fastvideo/attention/backends/sla.py
def __init__(self) -> None:
    pass
fastvideo.attention.backends.sla.SageSLAAttentionBackend

Bases: AttentionBackend

Quantized Sparse-Linear Attention backend using SageAttention kernels.

fastvideo.attention.backends.sla.SageSLAAttentionImpl
SageSLAAttentionImpl(num_heads: int, head_size: int, causal: bool = False, softmax_scale: float | None = None, num_kv_heads: int | None = None, prefix: str = '', topk_ratio: float = 0.5, feature_map: str = 'softmax', use_bf16: bool = True, **extra_impl_args)

Bases: AttentionImpl, Module

SageSLA attention implementation using quantized SageAttention kernels.

This uses INT8 quantization for Q/K and FP8 for V to achieve better performance while maintaining accuracy. Requires spas_sage_attn package.

Parameters:

Name Type Description Default
num_heads int

Number of attention heads

required
head_size int

Dimension of each head (must be 64 or 128)

required
topk_ratio float

Ratio of key blocks to attend to (0-1), default 0.5

0.5
feature_map str

Feature map for linear attention ('softmax', 'elu', 'relu')

'softmax'
use_bf16 bool

Whether to use bfloat16 for computation

True
Source code in fastvideo/attention/backends/sla.py
def __init__(
    self,
    num_heads: int,
    head_size: int,
    causal: bool = False,
    softmax_scale: float | None = None,
    num_kv_heads: int | None = None,
    prefix: str = "",
    # SageSLA-specific parameters
    topk_ratio: float = 0.5,
    feature_map: str = "softmax",
    use_bf16: bool = True,
    **extra_impl_args,
) -> None:
    nn.Module.__init__(self)

    if not SAGESLA_ENABLED:
        raise ImportError(
            "SageSLA requires spas_sage_attn. "
            "Install with: pip install git+https://github.com/thu-ml/SpargeAttn.git"
        )

    assert head_size in [
        64, 128
    ], f"SageSLA requires head_size in [64, 128], got {head_size}"

    self.num_heads = num_heads
    self.head_size = head_size
    self.softmax_scale = softmax_scale if softmax_scale else head_size**-0.5
    self.causal = causal
    self.prefix = prefix

    # SageSLA-specific config
    self.topk_ratio = topk_ratio
    self.dtype = torch.bfloat16 if use_bf16 else torch.float16

    # Learnable linear projection for combining sparse + linear attention
    self.proj_l = nn.Linear(head_size, head_size, dtype=torch.float32)

    # Feature map for linear attention
    # Type annotation for callables
    self.feature_map_q: Callable[[torch.Tensor], torch.Tensor]
    self.feature_map_k: Callable[[torch.Tensor], torch.Tensor]
    if feature_map == "elu":
        self.feature_map_q = lambda x: F.elu(x) + 1
        self.feature_map_k = lambda x: F.elu(x) + 1
    elif feature_map == "relu":
        self.feature_map_q = F.relu
        self.feature_map_k = F.relu
    elif feature_map == "softmax":
        self.feature_map_q = lambda x: F.softmax(x, dim=-1)
        self.feature_map_k = lambda x: F.softmax(x, dim=-1)
    else:
        raise ValueError(f"Unknown feature map: {feature_map}")

    self._init_weights()
Functions
fastvideo.attention.backends.sla.SageSLAAttentionImpl.forward
forward(query: Tensor, key: Tensor, value: Tensor, attn_metadata: AttentionMetadata) -> Tensor

Forward pass for SageSLA attention with quantized kernels.

Input tensors are in FastVideo format: (B, L, H, D)

Parameters:

Name Type Description Default
query Tensor

Query tensor (B, L, H, D)

required
key Tensor

Key tensor (B, L, H, D)

required
value Tensor

Value tensor (B, L, H, D)

required
attn_metadata AttentionMetadata

Attention metadata

required

Returns:

Type Description
Tensor

Output tensor (B, L, H, D)

Source code in fastvideo/attention/backends/sla.py
def forward(
    self,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attn_metadata: AttentionMetadata,
) -> torch.Tensor:
    """Forward pass for SageSLA attention with quantized kernels.

    Input tensors are in FastVideo format: (B, L, H, D)

    Args:
        query: Query tensor (B, L, H, D)
        key: Key tensor (B, L, H, D)
        value: Value tensor (B, L, H, D)
        attn_metadata: Attention metadata

    Returns:
        Output tensor (B, L, H, D)
    """
    original_dtype = query.dtype

    # Convert from FastVideo format (B, L, H, D) to SLA format (B, H, L, D)
    q = query.transpose(1, 2).contiguous()
    k = key.transpose(1, 2).contiguous()
    v = value.transpose(1, 2).contiguous()

    # Get topk ratio from metadata if available
    topk_ratio = self.topk_ratio
    if hasattr(attn_metadata, 'topk_ratio'):
        topk_ratio = attn_metadata.topk_ratio  # type: ignore[union-attr]

    # Determine block sizes based on GPU architecture
    arch = _get_cuda_arch(q.device.index)
    if arch == "sm90":
        BLKQ, BLKK = 64, 128
    else:
        BLKQ, BLKK = 128, 64

    # Compute block-sparse attention pattern
    sparse_map, lut, real_topk = get_block_map(q,
                                               k,
                                               topk_ratio=topk_ratio,
                                               BLKQ=BLKQ,
                                               BLKK=BLKK)

    # Convert to compute dtype
    q = q.to(self.dtype)
    k = k.to(self.dtype)
    v = v.to(self.dtype)

    # ========== SPARGE QUANTIZED ATTENTION ==========
    km = k.mean(dim=-2, keepdim=True)
    headdim = q.size(-1)
    scale = 1.0 / (headdim**0.5)

    # Quantize Q, K to INT8
    q_int8, q_scale, k_int8, k_scale = get_vanilla_qk_quant(
        q, k, km, BLKQ, BLKK)
    lut_triton, valid_block_num = block_map_lut_triton(sparse_map)

    # Quantize V to FP8
    b, h_kv, kv_len, head_dim = v.shape
    padded_len = (kv_len + 127) // 128 * 128
    v_transposed_permutted = torch.empty((b, h_kv, head_dim, padded_len),
                                         dtype=v.dtype,
                                         device=v.device)
    fused.transpose_pad_permute_cuda(v, v_transposed_permutted, 1)
    v_fp8 = torch.empty(v_transposed_permutted.shape,
                        dtype=torch.float8_e4m3fn,
                        device=v.device)
    v_scale = torch.empty((b, h_kv, head_dim),
                          dtype=torch.float32,
                          device=v.device)
    fused.scale_fuse_quant_cuda(v_transposed_permutted, v_fp8, v_scale,
                                kv_len, 2.25, 1)

    # Sparse attention with quantized kernels
    o_s = torch.empty_like(q)
    if arch == "sm90":
        qattn.qk_int8_sv_f8_accum_f32_block_sparse_attn_inst_buf_fuse_v_scale_sm90(
            q_int8, k_int8, v_fp8, o_s, lut_triton, valid_block_num,
            q_scale, k_scale, v_scale, 1, False, 1, scale)
    else:
        pvthreshold = torch.full((q.shape[-3], ),
                                 1e6,
                                 dtype=torch.float32,
                                 device=q.device)
        if SAGE2PP_ENABLED:
            qk_int8_sv_f8_accum_f16_block_sparse_attn_inst_buf_fuse_v_scale_with_pv_threshold(
                q_int8, k_int8, v_fp8, o_s, lut_triton, valid_block_num,
                pvthreshold, q_scale, k_scale, v_scale, 1, False, 1, scale,
                0)
        else:
            qattn.qk_int8_sv_f8_accum_f32_block_sparse_attn_inst_buf_fuse_v_scale_with_pv_threshold(
                q_int8, k_int8, v_fp8, o_s, lut_triton, valid_block_num,
                pvthreshold, q_scale, k_scale, v_scale, 1, False, 1, scale,
                0)
    # ========== END SPARGE ==========

    # Linear attention with feature maps
    q_linear = self.feature_map_q(q).contiguous().to(self.dtype)
    k_linear = self.feature_map_k(k).contiguous().to(self.dtype)
    o_l = self._calc_linear_attention(q_linear, k_linear, v)

    # Project linear attention output and combine
    with torch.amp.autocast('cuda', dtype=self.dtype):
        o_l = self.proj_l(o_l)

    # Combine sparse and linear outputs
    output = (o_s + o_l).to(original_dtype)

    # Convert back to FastVideo format (B, L, H, D)
    output = output.transpose(1, 2)

    return output

Functions

fastvideo.attention.backends.sla.get_block_map
get_block_map(q: Tensor, k: Tensor, topk_ratio: float, BLKQ: int = 64, BLKK: int = 64) -> tuple[Tensor, Tensor, int]

Compute sparse block map for attention based on QK similarity.

Parameters:

Name Type Description Default
q Tensor

Query tensor of shape (B, H, L, D)

required
k Tensor

Key tensor of shape (B, H, L, D)

required
topk_ratio float

Ratio of key blocks to attend to (0-1)

required
BLKQ int

Query block size

64
BLKK int

Key block size

64

Returns:

Name Type Description
sparse_map Tensor

Binary mask of shape (B, H, num_q_blocks, num_k_blocks)

lut Tensor

Top-k indices of shape (B, H, num_q_blocks, topk)

topk int

Number of key blocks selected

Source code in fastvideo/attention/backends/sla.py
def get_block_map(
    q: torch.Tensor,
    k: torch.Tensor,
    topk_ratio: float,
    BLKQ: int = 64,
    BLKK: int = 64,
) -> tuple[torch.Tensor, torch.Tensor, int]:
    """Compute sparse block map for attention based on QK similarity.

    Args:
        q: Query tensor of shape (B, H, L, D)
        k: Key tensor of shape (B, H, L, D)
        topk_ratio: Ratio of key blocks to attend to (0-1)
        BLKQ: Query block size
        BLKK: Key block size

    Returns:
        sparse_map: Binary mask of shape (B, H, num_q_blocks, num_k_blocks)
        lut: Top-k indices of shape (B, H, num_q_blocks, topk)
        topk: Number of key blocks selected
    """
    arg_k = k - torch.mean(
        k, dim=-2, keepdim=True)  # smooth-k technique from SageAttention
    pooled_qblocks = mean_pool(q, BLKQ)
    pooled_kblocks = mean_pool(arg_k, BLKK)
    pooled_score = pooled_qblocks @ pooled_kblocks.transpose(-1, -2)

    K = pooled_score.shape[-1]
    topk = min(K, int(topk_ratio * K))
    lut = torch.topk(pooled_score, topk, dim=-1, sorted=False).indices

    sparse_map = torch.zeros_like(pooled_score, dtype=torch.int8)
    sparse_map.scatter_(-1, lut, 1)
    return sparse_map, lut, topk
fastvideo.attention.backends.sla.mean_pool
mean_pool(x: Tensor, BLK: int) -> Tensor

Mean pool tensor along sequence dimension with block size BLK.

Source code in fastvideo/attention/backends/sla.py
def mean_pool(x: torch.Tensor, BLK: int) -> torch.Tensor:
    """Mean pool tensor along sequence dimension with block size BLK."""
    assert x.is_contiguous()

    B, H, L, D = x.shape
    L_BLOCKS = (L + BLK - 1) // BLK
    x_mean = torch.empty((B, H, L_BLOCKS, D), device=x.device, dtype=x.dtype)

    grid = (L_BLOCKS, B * H)
    compress_kernel[grid](x, x_mean, L, D, BLK)
    return x_mean

fastvideo.attention.backends.video_sparse_attn

Classes

Functions

fastvideo.attention.backends.video_sparse_attn.construct_variable_block_sizes cached
construct_variable_block_sizes(dit_seq_shape: tuple[int, int, int], num_tiles: tuple[int, int, int], device: device) -> LongTensor

Compute the number of valid (non‑padded) tokens inside every (ts_t × ts_h × ts_w) tile after padding ‑‑ flattened in the order (t‑tile, h‑tile, w‑tile) that rearrange uses.

Returns

torch.LongTensor # shape: [∏ full_window_size]

Source code in fastvideo/attention/backends/video_sparse_attn.py
@functools.lru_cache(maxsize=10)
def construct_variable_block_sizes(
    dit_seq_shape: tuple[int, int, int],
    num_tiles: tuple[int, int, int],
    device: torch.device,
) -> torch.LongTensor:
    """
    Compute the number of valid (non‑padded) tokens inside every
    (ts_t × ts_h × ts_w) tile after padding ‑‑ flattened in the order
    (t‑tile, h‑tile, w‑tile) that `rearrange` uses.

    Returns
    -------
    torch.LongTensor  # shape: [∏ full_window_size]
    """
    # unpack
    t, h, w = dit_seq_shape
    ts_t, ts_h, ts_w = VSA_TILE_SIZE
    n_t, n_h, n_w = num_tiles

    def _sizes(dim_len: int, tile: int, n_tiles: int) -> torch.LongTensor:
        """Vector with the size of each tile along one dimension."""
        sizes = torch.full((n_tiles, ), tile, dtype=torch.int, device=device)
        # size of last (possibly partial) tile
        remainder = dim_len - (n_tiles - 1) * tile
        sizes[-1] = remainder if remainder > 0 else tile
        return sizes

    t_sizes = _sizes(t, ts_t, n_t)  # [n_t]
    h_sizes = _sizes(h, ts_h, n_h)  # [n_h]
    w_sizes = _sizes(w, ts_w, n_w)  # [n_w]

    # broadcast‑multiply to get voxels per tile, then flatten
    block_sizes = (
        t_sizes[:, None, None]  # [n_t, 1,   1]
        * h_sizes[None, :, None]  # [1,   n_h, 1]
        * w_sizes[None, None, :]  # [1,   1,   n_w]
    ).reshape(-1)  # [n_t * n_h * n_w]

    return block_sizes

fastvideo.attention.backends.vmoba

Classes

fastvideo.attention.backends.vmoba.VMOBAAttentionImpl
VMOBAAttentionImpl(num_heads, head_size, softmax_scale, causal=False, num_kv_heads=None, prefix='', **extra_impl_args)

Bases: AttentionImpl

Source code in fastvideo/attention/backends/vmoba.py
def __init__(self,
             num_heads,
             head_size,
             softmax_scale,
             causal=False,
             num_kv_heads=None,
             prefix="",
             **extra_impl_args) -> None:
    self.prefix = prefix
    self.layer_idx = self._get_layer_idx(prefix)
    from flash_attn.bert_padding import pad_input
    self.pad_input = pad_input
Functions
fastvideo.attention.backends.vmoba.VMOBAAttentionImpl.forward
forward(query: Tensor, key: Tensor, value: Tensor, attn_metadata: AttentionMetadata) -> Tensor

query: [B, L, H, D] key: [B, L, H, D] value: [B, L, H, D] attn_metadata: AttentionMetadata

Source code in fastvideo/attention/backends/vmoba.py
def forward(
    self,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attn_metadata: AttentionMetadata,
) -> torch.Tensor:
    """
    query: [B, L, H, D]
    key:   [B, L, H, D]
    value: [B, L, H, D]
    attn_metadata: AttentionMetadata
    """
    batch_size, sequence_length, num_heads, head_dim = query.shape

    # select chunk type according to layer idx:
    loop_layer_num = attn_metadata.temporal_layer + attn_metadata.spatial_layer + attn_metadata.st_layer
    moba_layer = self.layer_idx - attn_metadata.first_full_layer
    if moba_layer % loop_layer_num < attn_metadata.temporal_layer:
        moba_chunk_size = attn_metadata.temporal_chunk_size
        moba_topk = attn_metadata.temporal_topk
    elif moba_layer % loop_layer_num < attn_metadata.temporal_layer + attn_metadata.spatial_layer:
        moba_chunk_size = attn_metadata.spatial_chunk_size
        moba_topk = attn_metadata.spatial_topk
    elif moba_layer % loop_layer_num < attn_metadata.temporal_layer + attn_metadata.spatial_layer + attn_metadata.st_layer:
        moba_chunk_size = attn_metadata.st_chunk_size
        moba_topk = attn_metadata.st_topk

    query, chunk_size = process_moba_input(query,
                                           attn_metadata.patch_resolution,
                                           moba_chunk_size)
    key, chunk_size = process_moba_input(key,
                                         attn_metadata.patch_resolution,
                                         moba_chunk_size)
    value, chunk_size = process_moba_input(value,
                                           attn_metadata.patch_resolution,
                                           moba_chunk_size)
    max_seqlen = query.shape[1]
    indices_q = torch.arange(0,
                             query.shape[0] * query.shape[1],
                             device=query.device)
    cu_seqlens = torch.arange(0,
                              query.shape[0] * query.shape[1] + 1,
                              query.shape[1],
                              dtype=torch.int32,
                              device=query.device)
    query = rearrange(query, "b s ... -> (b s) ...")
    key = rearrange(key, "b s ... -> (b s) ...")
    value = rearrange(value, "b s ... -> (b s) ...")

    # current_timestep=attn_metadata.current_timestep
    hidden_states = moba_attn_varlen(
        query,
        key,
        value,
        cu_seqlens=cu_seqlens,
        max_seqlen=max_seqlen,
        moba_chunk_size=chunk_size,
        moba_topk=moba_topk,
        select_mode=attn_metadata.moba_select_mode,
        simsum_threshold=attn_metadata.moba_threshold,
        threshold_type=attn_metadata.moba_threshold_type,
    )
    hidden_states = self.pad_input(hidden_states, indices_q, batch_size,
                                   sequence_length)
    hidden_states = process_moba_output(hidden_states,
                                        attn_metadata.patch_resolution,
                                        moba_chunk_size)

    return hidden_states

Functions