fastvideo.v1.distributed.parallel_state#

FastVideo distributed state. It takes over the control of the distributed environment from PyTorch. The typical workflow is:

  • call init_distributed_environment to initialize the distributed environment.

  • call initialize_model_parallel or ensure_model_parallel_initialized to initialize the model parallel groups.

  • run any code that uses the distributed environment.

  • call destroy_model_parallel to destroy the model parallel groups.

  • call destroy_distributed_environment to destroy the distributed environment.

If you only need to use the distributed environment without model parallelism, you can skip the model parallel initialization and destruction steps.
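
A minimal sketch of this lifecycle, assuming the process was started by a launcher (e.g. `torchrun`) that sets the rendezvous environment variables expected by the default `env://` init method:

```python
import os

from fastvideo.v1.distributed.parallel_state import (
    destroy_distributed_environment,
    destroy_model_parallel,
    init_distributed_environment,
    initialize_model_parallel,
)

# 1. Initialize the distributed environment, reading the job layout from the
#    environment variables set by the launcher.
init_distributed_environment(
    world_size=int(os.environ.get("WORLD_SIZE", "1")),
    rank=int(os.environ.get("RANK", "0")),
    local_rank=int(os.environ.get("LOCAL_RANK", "0")),
)

# 2. Initialize the model parallel groups (skip this and the matching destroy
#    call if no model parallelism is needed).
initialize_model_parallel(tensor_model_parallel_size=1,
                          sequence_model_parallel_size=1)

# ... any code that uses the distributed environment ...

# 3. Tear down in reverse order.
destroy_model_parallel()
destroy_distributed_environment()
```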

Module Contents#

Classes#

GraphCaptureContext

GroupCoordinator

PyTorch ProcessGroup wrapper for a group of processes. PyTorch ProcessGroup is bound to one specific communication backend, e.g. NCCL, Gloo, MPI, etc. GroupCoordinator takes charge of all the communication operations among the processes in the group. It manages both CPU and device communication.

Functions#

all_reduce

all_reduce_fake

cleanup_dist_env_and_memory

destroy_distributed_environment

destroy_model_parallel

Set the groups to none and destroy them.

ensure_model_parallel_initialized

Helper to initialize the model parallel groups if they are not initialized, or to ensure that the tensor-parallel and sequence-parallel sizes match the expected values if they are already initialized.

get_sequence_model_parallel_rank

Return my rank for the sequence model parallel group.

get_sequence_model_parallel_world_size

Return world size for the sequence model parallel group.

get_sp_group

get_tensor_model_parallel_rank

Return my rank for the tensor model parallel group.

get_tensor_model_parallel_world_size

Return world size for the tensor model parallel group.

get_tp_group

get_world_group

in_the_same_node_as

This is a collective operation that returns whether each rank is in the same node as the source rank. It tests whether processes are attached to the same memory system (shared access to shared memory).

init_distributed_environment

init_model_parallel_group

init_world_group

initialize_model_parallel

Initialize model parallel groups.

initialize_sequence_parallel_group

Initialize a sequence parallel group for a specific model.

initialize_tensor_parallel_group

Initialize a tensor parallel group for a specific model.

model_parallel_is_initialized

Check whether the tensor and sequence parallel groups are initialized.

patch_tensor_parallel_group

Patch the tp group temporarily until this function ends.

set_custom_all_reduce

Data#

API#

class fastvideo.v1.distributed.parallel_state.GraphCaptureContext[source]#
stream: torch.cuda.Stream[source]#

None

class fastvideo.v1.distributed.parallel_state.GroupCoordinator(group_ranks: List[List[int]], local_rank: int, torch_distributed_backend: Union[str, torch.distributed.Backend], use_device_communicator: bool, use_message_queue_broadcaster: bool = False, group_name: Optional[str] = None)[source]#

PyTorch ProcessGroup wrapper for a group of processes. PyTorch ProcessGroup is bound to one specific communication backend, e.g. NCCL, Gloo, MPI, etc. GroupCoordinator takes charge of all the communication operations among the processes in the group. It manages both CPU and device communication.

Initialization

all_gather(input_: torch.Tensor, dim: int = -1) torch.Tensor[source]#
all_reduce(input_: torch.Tensor, op: Optional[torch.distributed.ReduceOp] = ReduceOp.SUM) torch.Tensor[source]#

User-facing all-reduce function before we actually call the all-reduce operation.

We need this because Dynamo does not support passing an arbitrary object (self in this case) to a custom op. Instead, we pass the group name as a string, look up the group coordinator from that name, and dispatch the all-reduce operation to it.

In addition, PyTorch custom ops do not support mutation or returning a new tensor in the same op. So we always make the all-reduce operation out-of-place.
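
As a usage sketch (assuming the tensor parallel group has already been initialized), the caller simply passes a tensor and gets a new, reduced tensor back; the group-name lookup described above happens behind the scenes:

```python
import torch
from fastvideo.v1.distributed.parallel_state import get_tp_group

# Each rank contributes its own tensor; the result is a new tensor holding
# the elementwise sum across the tensor parallel group (out-of-place).
x = torch.ones(4, device="cuda")
summed = get_tp_group().all_reduce(x)
```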

all_to_all_4D(input_: torch.Tensor, scatter_dim: int = 2, gather_dim: int = 1) torch.Tensor[source]#
barrier() None[source]#

Barrier synchronization among the group. NOTE: don’t use device_group here! barrier in NCCL is terrible because it is internally a broadcast operation with secretly created GPU tensors. It is easy to mess up the current device. Use the CPU group instead.

broadcast(input_: torch.Tensor, src: int = 0)[source]#

Broadcast the input tensor. NOTE: src is the local rank of the source rank.

broadcast_object(obj: Optional[Any] = None, src: int = 0)[source]#

Broadcast the input object. NOTE: src is the local rank of the source rank.

broadcast_object_list(obj_list: List[Any], src: int = 0, group: Optional[torch.distributed.ProcessGroup] = None)[source]#

Broadcast the input object list. NOTE: src is the local rank of the source rank.

broadcast_tensor_dict(tensor_dict: Optional[Dict[str, Union[torch.Tensor, Any]]] = None, src: int = 0, group: Optional[torch.distributed.ProcessGroup] = None, metadata_group: Optional[torch.distributed.ProcessGroup] = None) Optional[Dict[str, Union[torch.Tensor, Any]]][source]#

Broadcast the input tensor dictionary. NOTE: src is the local rank of the source rank.
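
A hedged usage sketch, assuming the common pattern (as in vLLM's equivalent API) where the source rank passes the dictionary and the other ranks call with no dictionary and receive it:

```python
import torch
from fastvideo.v1.distributed.parallel_state import get_world_group

group = get_world_group()
if group.rank_in_group == 0:
    # Source rank supplies the dictionary (src is the rank within the group).
    group.broadcast_tensor_dict({"latents": torch.randn(2, 4, device="cuda"),
                                 "step": 10}, src=0)
else:
    # Non-source ranks receive the broadcast dictionary.
    payload = group.broadcast_tensor_dict(src=0)
```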

cpu_group: torch.distributed.ProcessGroup[source]#

None

destroy() None[source]#
device_communicator: fastvideo.v1.distributed.device_communicators.base_device_communicator.DeviceCommunicatorBase[source]#

None

device_group: torch.distributed.ProcessGroup[source]#

None

property first_rank[source]#

Return the global rank of the first process in the group

gather(input_: torch.Tensor, dst: int = 0, dim: int = -1) Optional[torch.Tensor][source]#

NOTE: We assume that the input tensor is on the same device across all the ranks. NOTE: dst is the local rank of the destination rank.
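
A usage sketch, assuming the usual semantics that only the destination rank receives the concatenated result and the other ranks get `None`:

```python
import torch
from fastvideo.v1.distributed.parallel_state import get_sp_group

group = get_sp_group()
# Every rank contributes a same-shaped chunk; rank `dst` (within the group)
# receives the chunks concatenated along `dim`, other ranks receive None.
chunk = torch.randn(2, 8, device="cuda")
gathered = group.gather(chunk, dst=0, dim=-1)
if group.rank_in_group == 0:
    print(gathered.shape)  # (2, 8 * group.world_size)
```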

graph_capture(graph_capture_context: Optional[fastvideo.v1.distributed.parallel_state.GraphCaptureContext] = None)[source]#
property is_first_rank[source]#

Return whether the caller is the first process in the group

property is_last_rank[source]#

Return whether the caller is the last process in the group

property last_rank[source]#

Return the global rank of the last process in the group

local_rank: int[source]#

None

mq_broadcaster: Optional[Any][source]#

None

property next_rank[source]#

Return the global rank of the process that follows the caller

property prev_rank[source]#

Return the global rank of the process that precedes the caller

rank: int[source]#

None

rank_in_group: int[source]#

None

ranks: List[int][source]#

None

recv(size: torch.Size, dtype: torch.dtype, src: Optional[int] = None) torch.Tensor[source]#

Receives a tensor from the source rank.

recv_object(src: int) Any[source]#

Receive the input object list from the source rank.

recv_tensor_dict(src: Optional[int] = None, all_gather_group: Optional[fastvideo.v1.distributed.parallel_state.GroupCoordinator] = None) Optional[Dict[str, Union[torch.Tensor, Any]]][source]#

Receive the input tensor dictionary. NOTE: src is the local rank of the source rank.

send(tensor: torch.Tensor, dst: Optional[int] = None) None[source]#

Sends a tensor to the destination rank in a non-blocking way

send_object(obj: Any, dst: int) None[source]#

Send the input object list to the destination rank.

send_tensor_dict(tensor_dict: Dict[str, Union[torch.Tensor, Any]], dst: Optional[int] = None, all_gather_group: Optional[fastvideo.v1.distributed.parallel_state.GroupCoordinator] = None) Optional[Dict[str, Union[torch.Tensor, Any]]][source]#

Send the input tensor dictionary. NOTE: dst is the local rank of the destination rank.
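
A point-to-point sketch pairing `send_tensor_dict` with `recv_tensor_dict`, assuming the defaults of `dst=None`/`src=None` address the next/previous rank in the group:

```python
import torch
from fastvideo.v1.distributed.parallel_state import get_world_group

group = get_world_group()
if group.rank_in_group == 0:
    # Rank 0 sends to the next rank in the group (dst defaults to next rank).
    group.send_tensor_dict({"x": torch.ones(3, device="cuda"), "step": 1})
elif group.rank_in_group == 1:
    # Rank 1 receives from the previous rank (src defaults to previous rank).
    received = group.recv_tensor_dict()
```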

use_device_communicator: bool[source]#

None

world_size: int[source]#

None

fastvideo.v1.distributed.parallel_state.TensorMetadata[source]#

'namedtuple(…)'

fastvideo.v1.distributed.parallel_state.all_reduce(tensor: torch.Tensor, group_name: str) torch.Tensor[source]#
fastvideo.v1.distributed.parallel_state.all_reduce_fake(tensor: torch.Tensor, group_name: str) torch.Tensor[source]#
fastvideo.v1.distributed.parallel_state.cleanup_dist_env_and_memory(shutdown_ray: bool = False)[source]#
fastvideo.v1.distributed.parallel_state.destroy_distributed_environment() None[source]#
fastvideo.v1.distributed.parallel_state.destroy_model_parallel() None[source]#

Set the groups to none and destroy them.

fastvideo.v1.distributed.parallel_state.ensure_model_parallel_initialized(tensor_model_parallel_size: int, sequence_model_parallel_size: int, backend: Optional[str] = None) None[source]#

Helper to initialize the model parallel groups if they are not initialized, or to ensure that the tensor-parallel and sequence-parallel sizes match the expected values if they are already initialized.
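
A usage sketch, assuming `init_distributed_environment` has already been called; the helper is safe to call more than once as long as the sizes agree:

```python
from fastvideo.v1.distributed.parallel_state import (
    ensure_model_parallel_initialized,
    get_tensor_model_parallel_world_size,
)

# First call initializes the groups; later calls only verify the sizes match.
ensure_model_parallel_initialized(tensor_model_parallel_size=2,
                                  sequence_model_parallel_size=1)
assert get_tensor_model_parallel_world_size() == 2
```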

fastvideo.v1.distributed.parallel_state.get_sequence_model_parallel_rank() int[source]#

Return my rank for the sequence model parallel group.

fastvideo.v1.distributed.parallel_state.get_sequence_model_parallel_world_size() int[source]#

Return world size for the sequence model parallel group.

fastvideo.v1.distributed.parallel_state.get_sp_group() fastvideo.v1.distributed.parallel_state.GroupCoordinator[source]#
fastvideo.v1.distributed.parallel_state.get_tensor_model_parallel_group[source]#

None

fastvideo.v1.distributed.parallel_state.get_tensor_model_parallel_rank() int[source]#

Return my rank for the tensor model parallel group.

fastvideo.v1.distributed.parallel_state.get_tensor_model_parallel_world_size() int[source]#

Return world size for the tensor model parallel group.

fastvideo.v1.distributed.parallel_state.get_tp_group() fastvideo.v1.distributed.parallel_state.GroupCoordinator[source]#
fastvideo.v1.distributed.parallel_state.get_world_group() fastvideo.v1.distributed.parallel_state.GroupCoordinator[source]#
fastvideo.v1.distributed.parallel_state.in_the_same_node_as(pg: Union[torch.distributed.ProcessGroup, fastvideo.v1.distributed.utils.StatelessProcessGroup], source_rank: int = 0) List[bool][source]#

This is a collective operation that returns whether each rank is in the same node as the source rank. It tests whether processes are attached to the same memory system (shared access to shared memory).
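
A usage sketch, assuming the world group's CPU process group is passed in (the call is collective, so every rank must make it):

```python
from fastvideo.v1.distributed.parallel_state import (
    get_world_group,
    in_the_same_node_as,
)

# One boolean per rank: True if that rank shares a node (i.e. shared memory)
# with source_rank.
same_node = in_the_same_node_as(get_world_group().cpu_group, source_rank=0)
if all(same_node):
    print("all ranks are colocated on a single node")
```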

fastvideo.v1.distributed.parallel_state.init_distributed_environment(world_size: int = -1, rank: int = -1, distributed_init_method: str = 'env://', local_rank: int = -1, backend: str = 'nccl')[source]#
fastvideo.v1.distributed.parallel_state.init_model_parallel_group(group_ranks: List[List[int]], local_rank: int, backend: str, use_message_queue_broadcaster: bool = False, group_name: Optional[str] = None) fastvideo.v1.distributed.parallel_state.GroupCoordinator[source]#
fastvideo.v1.distributed.parallel_state.init_world_group(ranks: List[int], local_rank: int, backend: str) fastvideo.v1.distributed.parallel_state.GroupCoordinator[source]#
fastvideo.v1.distributed.parallel_state.initialize_model_parallel(tensor_model_parallel_size: int = 1, sequence_model_parallel_size: int = 1, backend: Optional[str] = None) None[source]#

Initialize model parallel groups.

Parameters:
  • tensor_model_parallel_size – number of GPUs used for tensor model parallelism.

  • sequence_model_parallel_size – number of GPUs used for sequence model parallelism.
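
For illustration, a hedged sketch for an 8-GPU job split into tensor parallel size 2 and sequence parallel size 4 (assuming `init_distributed_environment` has already run on every rank; how ranks are grouped is decided internally):

```python
from fastvideo.v1.distributed.parallel_state import (
    get_sequence_model_parallel_rank,
    get_tensor_model_parallel_rank,
    initialize_model_parallel,
)

initialize_model_parallel(tensor_model_parallel_size=2,
                          sequence_model_parallel_size=4)

tp_rank = get_tensor_model_parallel_rank()    # rank within the TP group (0..1)
sp_rank = get_sequence_model_parallel_rank()  # rank within the SP group (0..3)
```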

fastvideo.v1.distributed.parallel_state.initialize_sequence_parallel_group(sequence_model_parallel_size: int = 1, backend: Optional[str] = None, group_name_suffix: str = '') fastvideo.v1.distributed.parallel_state.GroupCoordinator[source]#

Initialize a sequence parallel group for a specific model.

This function creates a sequence parallel group that can be used with the patch_sequence_parallel_group context manager. It allows different models to use different sequence parallelism configurations.

Parameters:
  • sequence_model_parallel_size – number of GPUs used for sequence model parallelism.

  • backend – communication backend to use.

  • group_name_suffix – optional suffix to make the group name unique.

Returns:

A GroupCoordinator for sequence parallelism that can be used with the patch_sequence_parallel_group context manager.

Example usage:

```python
# Initialize sequence parallel group for model2
sp_group_model2 = initialize_sequence_parallel_group(
    sequence_model_parallel_size=2,
    group_name_suffix="model2",
)

# Use sequence parallelism for model2
with patch_sequence_parallel_group(sp_group_model2):
    # Run model2 with sequence parallelism
    output2 = model2(input2)
```
fastvideo.v1.distributed.parallel_state.initialize_tensor_parallel_group(tensor_model_parallel_size: int = 1, backend: Optional[str] = None, group_name_suffix: str = '') fastvideo.v1.distributed.parallel_state.GroupCoordinator[source]#

Initialize a tensor parallel group for a specific model.

This function creates a tensor parallel group that can be used with the patch_tensor_parallel_group context manager. It allows different models to use different tensor parallelism configurations.

Parameters:
  • tensor_model_parallel_size – number of GPUs used for tensor model parallelism.

  • backend – communication backend to use.

  • group_name_suffix – optional suffix to make the group name unique.

Returns:

A GroupCoordinator for tensor parallelism that can be used with the patch_tensor_parallel_group context manager.

Example usage:

```python
# Initialize tensor parallel group for model1
tp_group_model1 = initialize_tensor_parallel_group(
    tensor_model_parallel_size=4,
    group_name_suffix="model1",
)

# Use tensor parallelism for model1
with patch_tensor_parallel_group(tp_group_model1):
    # Run model1 with tensor parallelism
    output1 = model1(input1)
```
fastvideo.v1.distributed.parallel_state.logger[source]#

'init_logger(…)'

fastvideo.v1.distributed.parallel_state.model_parallel_is_initialized() bool[source]#

Check whether the tensor and sequence parallel groups are initialized.

fastvideo.v1.distributed.parallel_state.patch_tensor_parallel_group(tp_group: fastvideo.v1.distributed.parallel_state.GroupCoordinator)[source]#

Patch the tp group temporarily until this function ends.

This method is for draft workers of speculative decoding to run draft model with different tp degree from that of target model workers.

Parameters:

tp_group (GroupCoordinator) – the tp group coordinator

fastvideo.v1.distributed.parallel_state.set_custom_all_reduce(enable: bool)[source]#