fastvideo.v1.distributed.parallel_state#
FastVideo distributed state. It takes over control of the distributed environment from PyTorch. The typical workflow is:

1. call init_distributed_environment to initialize the distributed environment.
2. call initialize_model_parallel or ensure_model_parallel_initialized to initialize the model parallel groups.
3. any code dealing with the distributed stuff
4. call destroy_model_parallel to destroy the model parallel groups.
5. call destroy_distributed_environment to destroy the distributed environment.
If you only need to use the distributed environment without model parallelism, you can skip the model parallel initialization and destruction steps.
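A minimal sketch of this workflow, assuming the script is launched with torchrun so that the default env:// rendezvous variables are available; destroy_distributed_environment is the teardown call named by the docstring above:

```python
# Sketch of the typical workflow, assuming launch via
# `torchrun --nproc_per_node=<N> script.py` (sets MASTER_ADDR, RANK, WORLD_SIZE, ...).
from fastvideo.v1.distributed.parallel_state import (
    destroy_distributed_environment,
    destroy_model_parallel,
    init_distributed_environment,
    initialize_model_parallel,
)

# 1. Initialize the distributed environment (world size/rank read from the env).
init_distributed_environment()

# 2. Initialize the model parallel groups.
initialize_model_parallel(tensor_model_parallel_size=1,
                          sequence_model_parallel_size=1)

# 3. ... any code dealing with the distributed stuff ...

# 4. / 5. Destroy the model parallel groups, then the distributed environment.
destroy_model_parallel()
destroy_distributed_environment()
```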
Module Contents#
Classes#

| Class | Summary |
|---|---|
| GroupCoordinator | PyTorch ProcessGroup wrapper for a group of processes. PyTorch ProcessGroup is bound to one specific communication backend, e.g. NCCL, Gloo, MPI, etc. GroupCoordinator takes charge of all the communication operations among the processes in the group. It manages both CPU and device communication. |

Functions#

| Function | Summary |
|---|---|
| destroy_model_parallel | Set the groups to none and destroy them. |
| ensure_model_parallel_initialized | Helper to initialize model parallel groups if they are not initialized, or to ensure the tensor-parallel and sequence-parallel sizes equal the expected values if they are. |
| get_sequence_model_parallel_rank | Return my rank for the sequence model parallel group. |
| get_sequence_model_parallel_world_size | Return world size for the sequence model parallel group. |
| get_tensor_model_parallel_rank | Return my rank for the tensor model parallel group. |
| get_tensor_model_parallel_world_size | Return world size for the tensor model parallel group. |
| in_the_same_node_as | Collective operation that returns whether each rank is in the same node as the source rank, i.e. whether the processes share the same memory system. |
| initialize_model_parallel | Initialize model parallel groups. |
| initialize_sequence_parallel_group | Initialize a sequence parallel group for a specific model. |
| initialize_tensor_parallel_group | Initialize a tensor parallel group for a specific model. |
| model_parallel_is_initialized | Check if tensor and sequence parallel groups are initialized. |
| patch_tensor_parallel_group | Patch the tp group temporarily until this function ends. |
Data#
API#
- class fastvideo.v1.distributed.parallel_state.GraphCaptureContext[source]#
- stream: torch.cuda.Stream[source]#
None
- class fastvideo.v1.distributed.parallel_state.GroupCoordinator(group_ranks: List[List[int]], local_rank: int, torch_distributed_backend: Union[str, torch.distributed.Backend], use_device_communicator: bool, use_message_queue_broadcaster: bool = False, group_name: Optional[str] = None)[source]#
PyTorch ProcessGroup wrapper for a group of processes. PyTorch ProcessGroup is bound to one specific communication backend, e.g. NCCL, Gloo, MPI, etc. GroupCoordinator takes charge of all the communication operations among the processes in the group. It manages both CPU and device communication.
Initialization
- all_gather(input_: torch.Tensor, dim: int = -1) torch.Tensor [source]#
- all_reduce(input_: torch.Tensor, op: Optional[torch.distributed.ReduceOp] = ReduceOp.SUM) torch.Tensor [source]#
User-facing all-reduce function, called before the actual all-reduce operation.
We need this because Dynamo does not support passing an arbitrary object (self in this case) to a custom op. We pass the group name as a string, look up the group coordinator from that name, and dispatch the all-reduce operation to it. In addition, PyTorch custom ops do not support mutation or returning a new tensor in the same op, so we always make the all-reduce operation out-of-place.
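A hedged sketch of how the out-of-place all-reduce is typically reached through a group coordinator, assuming the distributed environment and model parallel groups are already initialized:

```python
# Sketch: out-of-place all-reduce through the tensor-parallel coordinator.
import torch
from fastvideo.v1.distributed.parallel_state import get_tp_group

tp_group = get_tp_group()
x = torch.ones(4)                # with an NCCL device group this should live on the GPU
summed = tp_group.all_reduce(x)  # returns a new tensor; `x` itself is not mutated
```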
- all_to_all_4D(input_: torch.Tensor, scatter_dim: int = 2, gather_dim: int = 1) torch.Tensor [source]#
- barrier() None [source]#
Barrier synchronization among the group. NOTE: don't use device_group here! barrier in NCCL is terrible because it is internally a broadcast operation with secretly created GPU tensors. It is easy to mess up the current device. Use the CPU group instead.
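A minimal sketch, assuming the world group is already initialized:

```python
# Sketch: synchronize every rank in the world group.
from fastvideo.v1.distributed.parallel_state import get_world_group

get_world_group().barrier()  # per the note above, this goes through the CPU group
```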
- broadcast(input_: torch.Tensor, src: int = 0)[source]#
Broadcast the input tensor. NOTE: src is the local rank of the source rank.
- broadcast_object(obj: Optional[Any] = None, src: int = 0)[source]#
Broadcast the input object. NOTE: src is the local rank of the source rank.
- broadcast_object_list(obj_list: List[Any], src: int = 0, group: Optional[torch.distributed.ProcessGroup] = None)[source]#
Broadcast the input object list. NOTE: src is the local rank of the source rank.
- broadcast_tensor_dict(tensor_dict: Optional[Dict[str, Union[torch.Tensor, Any]]] = None, src: int = 0, group: Optional[torch.distributed.ProcessGroup] = None, metadata_group: Optional[torch.distributed.ProcessGroup] = None) Optional[Dict[str, Union[torch.Tensor, Any]]] [source]#
Broadcast the input tensor dictionary. NOTE: src is the local rank of the source rank.
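A hedged sketch of broadcasting a dictionary that mixes tensors and plain metadata from local rank 0, assuming torch.distributed is already initialized:

```python
# Sketch: rank 0 broadcasts a dict of tensors and metadata to the world group.
import torch
import torch.distributed as dist
from fastvideo.v1.distributed.parallel_state import get_world_group

world = get_world_group()
if dist.get_rank() == 0:
    payload = world.broadcast_tensor_dict(
        {"latents": torch.randn(2, 4), "step": 10}, src=0)
else:
    payload = world.broadcast_tensor_dict(src=0)  # receives tensors and metadata
```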
- device_communicator: fastvideo.v1.distributed.device_communicators.base_device_communicator.DeviceCommunicatorBase[source]#
None
- gather(input_: torch.Tensor, dst: int = 0, dim: int = -1) Optional[torch.Tensor] [source]#
NOTE: We assume that the input tensor is on the same device across all the ranks. NOTE: dst is the local rank of the destination rank.
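A hedged sketch of gathering per-rank shards onto local rank 0, assuming the distributed environment is initialized:

```python
# Sketch: gather per-rank tensors onto local rank 0 along the last dimension.
import torch
import torch.distributed as dist
from fastvideo.v1.distributed.parallel_state import get_world_group

world = get_world_group()
shard = torch.full((2,), float(dist.get_rank()))  # should be a GPU tensor under NCCL
gathered = world.gather(shard, dst=0, dim=-1)     # None on every rank except dst
```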
- graph_capture(graph_capture_context: Optional[fastvideo.v1.distributed.parallel_state.GraphCaptureContext] = None)[source]#
- recv(size: torch.Size, dtype: torch.dtype, src: Optional[int] = None) torch.Tensor [source]#
Receives a tensor from the source rank.
- recv_tensor_dict(src: Optional[int] = None, all_gather_group: Optional[fastvideo.v1.distributed.parallel_state.GroupCoordinator] = None) Optional[Dict[str, Union[torch.Tensor, Any]]] [source]#
Receive the input tensor dictionary. NOTE: src is the local rank of the source rank.
- send(tensor: torch.Tensor, dst: Optional[int] = None) None [source]#
Sends a tensor to the destination rank in a non-blocking way.
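A hedged sketch of a point-to-point transfer between two ranks; note that the receiver must know the size and dtype up front, matching the recv() signature documented above. Assumes the distributed environment is initialized:

```python
# Sketch: rank 0 sends a tensor to rank 1.
import torch
import torch.distributed as dist
from fastvideo.v1.distributed.parallel_state import get_world_group

world = get_world_group()
if dist.get_rank() == 0:
    world.send(torch.randn(2, 3), dst=1)
elif dist.get_rank() == 1:
    t = world.recv(size=torch.Size([2, 3]), dtype=torch.float32, src=0)
```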
- send_tensor_dict(tensor_dict: Dict[str, Union[torch.Tensor, Any]], dst: Optional[int] = None, all_gather_group: Optional[fastvideo.v1.distributed.parallel_state.GroupCoordinator] = None) Optional[Dict[str, Union[torch.Tensor, Any]]] [source]#
Send the input tensor dictionary. NOTE: dst is the local rank of the destination rank.
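A hedged sketch of passing a dictionary of tensors plus metadata between neighbouring ranks (for example, handing activations from one stage to the next); the dictionary contents are illustrative:

```python
# Sketch: send/receive a tensor dict between rank 0 and rank 1.
import torch
import torch.distributed as dist
from fastvideo.v1.distributed.parallel_state import get_world_group

world = get_world_group()
if dist.get_rank() == 0:
    world.send_tensor_dict({"hidden": torch.randn(1, 8), "step": 3}, dst=1)
elif dist.get_rank() == 1:
    received = world.recv_tensor_dict(src=0)
```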
- fastvideo.v1.distributed.parallel_state.all_reduce(tensor: torch.Tensor, group_name: str) torch.Tensor [source]#
- fastvideo.v1.distributed.parallel_state.all_reduce_fake(tensor: torch.Tensor, group_name: str) torch.Tensor [source]#
- fastvideo.v1.distributed.parallel_state.cleanup_dist_env_and_memory(shutdown_ray: bool = False)[source]#
- fastvideo.v1.distributed.parallel_state.destroy_model_parallel() None [source]#
Set the groups to none and destroy them.
- fastvideo.v1.distributed.parallel_state.ensure_model_parallel_initialized(tensor_model_parallel_size: int, sequence_model_parallel_size: int, backend: Optional[str] = None) None [source]#
Helper to initialize model parallel groups if they are not initialized, or to ensure the tensor-parallel and sequence-parallel sizes are equal to the expected values if the model parallel groups are already initialized.
- fastvideo.v1.distributed.parallel_state.get_sequence_model_parallel_rank() int [source]#
Return my rank for the sequence model parallel group.
- fastvideo.v1.distributed.parallel_state.get_sequence_model_parallel_world_size() int [source]#
Return world size for the sequence model parallel group.
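A hedged sketch of how the sequence-parallel rank and world size are typically used to shard a sequence; the tensor and sizes are illustrative and the sequence length is assumed divisible by the group size:

```python
# Sketch: split a sequence evenly across the sequence-parallel group.
import torch
from fastvideo.v1.distributed.parallel_state import (
    get_sequence_model_parallel_rank,
    get_sequence_model_parallel_world_size,
)

sp_rank = get_sequence_model_parallel_rank()
sp_size = get_sequence_model_parallel_world_size()

tokens = torch.arange(32)                      # full sequence, assumed divisible
local_tokens = tokens.chunk(sp_size)[sp_rank]  # this rank's contiguous slice
```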
- fastvideo.v1.distributed.parallel_state.get_sp_group() fastvideo.v1.distributed.parallel_state.GroupCoordinator [source]#
- fastvideo.v1.distributed.parallel_state.get_tensor_model_parallel_rank() int [source]#
Return my rank for the tensor model parallel group.
- fastvideo.v1.distributed.parallel_state.get_tensor_model_parallel_world_size() int [source]#
Return world size for the tensor model parallel group.
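A hedged sketch of using the tensor-parallel rank and world size to shard a weight matrix; the shapes and row-wise split are illustrative assumptions, not FastVideo's actual layer sharding:

```python
# Sketch: row-shard a weight matrix across the tensor-parallel group.
import torch
from fastvideo.v1.distributed.parallel_state import (
    get_tensor_model_parallel_rank,
    get_tensor_model_parallel_world_size,
)

tp_rank = get_tensor_model_parallel_rank()
tp_size = get_tensor_model_parallel_world_size()

weight = torch.randn(1024, 1024)                      # full, unsharded weight
weight_shard = weight.chunk(tp_size, dim=0)[tp_rank]  # rows owned by this rank
```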
- fastvideo.v1.distributed.parallel_state.get_tp_group() fastvideo.v1.distributed.parallel_state.GroupCoordinator [source]#
- fastvideo.v1.distributed.parallel_state.get_world_group() fastvideo.v1.distributed.parallel_state.GroupCoordinator [source]#
- fastvideo.v1.distributed.parallel_state.in_the_same_node_as(pg: Union[torch.distributed.ProcessGroup, fastvideo.v1.distributed.utils.StatelessProcessGroup], source_rank: int = 0) List[bool] [source]#
This is a collective operation that returns whether each rank is in the same node as the source rank. It tests if processes are attached to the same memory system (shared access to shared memory).
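A hedged sketch of using the result to decide whether shared-memory fast paths are available; it assumes the world GroupCoordinator exposes a Gloo-backed cpu_group attribute, since the test relies on shared memory rather than NCCL:

```python
# Sketch: check whether every rank shares a node with rank 0.
from fastvideo.v1.distributed.parallel_state import get_world_group, in_the_same_node_as

world = get_world_group()
same_node = in_the_same_node_as(world.cpu_group, source_rank=0)  # cpu_group assumed
if all(same_node):
    pass  # all ranks are on one node; shared-memory fast paths are possible
```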
- fastvideo.v1.distributed.parallel_state.init_distributed_environment(world_size: int = -1, rank: int = -1, distributed_init_method: str = 'env://', local_rank: int = -1, backend: str = 'nccl')[source]#
- fastvideo.v1.distributed.parallel_state.init_model_parallel_group(group_ranks: List[List[int]], local_rank: int, backend: str, use_message_queue_broadcaster: bool = False, group_name: Optional[str] = None) fastvideo.v1.distributed.parallel_state.GroupCoordinator [source]#
- fastvideo.v1.distributed.parallel_state.init_world_group(ranks: List[int], local_rank: int, backend: str) fastvideo.v1.distributed.parallel_state.GroupCoordinator [source]#
- fastvideo.v1.distributed.parallel_state.initialize_model_parallel(tensor_model_parallel_size: int = 1, sequence_model_parallel_size: int = 1, backend: Optional[str] = None) None [source]#
Initialize model parallel groups.
- Parameters:
tensor_model_parallel_size – number of GPUs used for tensor model parallelism.
sequence_model_parallel_size – number of GPUs used for sequence model parallelism.
- fastvideo.v1.distributed.parallel_state.initialize_sequence_parallel_group(sequence_model_parallel_size: int = 1, backend: Optional[str] = None, group_name_suffix: str = '') fastvideo.v1.distributed.parallel_state.GroupCoordinator [source]#
Initialize a sequence parallel group for a specific model.
This function creates a sequence parallel group that can be used with the patch_sequence_parallel_group context manager. It allows different models to use different sequence parallelism configurations.
- Parameters:
sequence_model_parallel_size – number of GPUs used for sequence model parallelism.
backend – communication backend to use.
group_name_suffix – optional suffix to make the group name unique.
- Returns:
A GroupCoordinator for sequence parallelism that can be used with the patch_sequence_parallel_group context manager.
Example usage:

```python
# Initialize sequence parallel group for model2
sp_group_model2 = initialize_sequence_parallel_group(
    sequence_model_parallel_size=2,
    group_name_suffix="model2")

# Use sequence parallelism for model2
with patch_sequence_parallel_group(sp_group_model2):
    # Run model2 with sequence parallelism
    output2 = model2(input2)
```
- fastvideo.v1.distributed.parallel_state.initialize_tensor_parallel_group(tensor_model_parallel_size: int = 1, backend: Optional[str] = None, group_name_suffix: str = '') fastvideo.v1.distributed.parallel_state.GroupCoordinator [source]#
Initialize a tensor parallel group for a specific model.
This function creates a tensor parallel group that can be used with the patch_tensor_parallel_group context manager. It allows different models to use different tensor parallelism configurations.
- Parameters:
tensor_model_parallel_size – number of GPUs used for tensor model parallelism.
backend – communication backend to use.
group_name_suffix – optional suffix to make the group name unique.
- Returns:
A GroupCoordinator for tensor parallelism that can be used with the patch_tensor_parallel_group context manager.
Example usage:

```python
# Initialize tensor parallel group for model1
tp_group_model1 = initialize_tensor_parallel_group(
    tensor_model_parallel_size=4,
    group_name_suffix="model1")

# Use tensor parallelism for model1
with patch_tensor_parallel_group(tp_group_model1):
    # Run model1 with tensor parallelism
    output1 = model1(input1)
```
- fastvideo.v1.distributed.parallel_state.model_parallel_is_initialized() bool [source]#
Check if tensor and sequence parallel groups are initialized.
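A hedged sketch of guarding group setup behind this check (ensure_model_parallel_initialized wraps a similar check for you); the sizes are illustrative:

```python
# Sketch: only build the parallel groups if they do not exist yet.
from fastvideo.v1.distributed.parallel_state import (
    initialize_model_parallel,
    model_parallel_is_initialized,
)

if not model_parallel_is_initialized():
    initialize_model_parallel(tensor_model_parallel_size=1,
                              sequence_model_parallel_size=1)
```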
- fastvideo.v1.distributed.parallel_state.patch_tensor_parallel_group(tp_group: fastvideo.v1.distributed.parallel_state.GroupCoordinator)[source]#
Patch the tp group temporarily until this function ends.
This method is for draft workers of speculative decoding to run the draft model with a different TP degree from that of the target model workers.
- Parameters:
tp_group (GroupCoordinator) – the tp group coordinator
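A hedged sketch of patching in a separately initialized, smaller TP group for a draft model; the TP size of 1 and the "draft" group name suffix are illustrative assumptions:

```python
# Sketch: give a draft model its own TP group and patch it in temporarily.
from fastvideo.v1.distributed.parallel_state import (
    get_tensor_model_parallel_world_size,
    initialize_tensor_parallel_group,
    patch_tensor_parallel_group,
)

draft_tp_group = initialize_tensor_parallel_group(
    tensor_model_parallel_size=1,
    group_name_suffix="draft")

with patch_tensor_parallel_group(draft_tp_group):
    # Inside the context, TP queries resolve to the patched draft group.
    assert get_tensor_model_parallel_world_size() == 1
# On exit, the original tensor-parallel group is restored.
```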