Skip to content

cuda_communicator

Classes

fastvideo.distributed.device_communicators.cuda_communicator.CudaCommunicator

CudaCommunicator(cpu_group: ProcessGroup, device: device | None = None, device_group: ProcessGroup | None = None, unique_name: str = '')

Bases: DeviceCommunicatorBase

Source code in fastvideo/distributed/device_communicators/cuda_communicator.py
def __init__(
    self,
    cpu_group: ProcessGroup,
    device: torch.device | None = None,
    device_group: ProcessGroup | None = None,
    unique_name: str = "",
):
    """Initialize the communicator and, for multi-rank groups, PyNccl.

    All arguments are forwarded unchanged to the base-class initializer.
    A ``PyNcclCommunicator`` is created only when ``self.world_size`` is
    greater than one; otherwise ``self.pynccl_comm`` stays ``None``.
    """
    super().__init__(cpu_group, device, device_group, unique_name)

    # Imported at call time rather than at module import time.
    from fastvideo.distributed.device_communicators.pynccl import (
        PyNcclCommunicator)

    # Set the attribute first so it exists even if construction below fails.
    self.pynccl_comm: PyNcclCommunicator | None = None
    if self.world_size <= 1:
        return

    self.pynccl_comm = PyNcclCommunicator(group=self.cpu_group,
                                          device=self.device)

Functions

fastvideo.distributed.device_communicators.cuda_communicator.CudaCommunicator.recv
recv(size: Size, dtype: dtype, src: int | None = None) -> Tensor

Receives a tensor from the source rank.

Source code in fastvideo/distributed/device_communicators/cuda_communicator.py
def recv(self,
         size: torch.Size,
         dtype: torch.dtype,
         src: int | None = None) -> torch.Tensor:
    """Receives a tensor from the source rank."""
    """NOTE: `src` is the local rank of the source rank."""
    if src is None:
        src = (self.rank_in_group - 1) % self.world_size

    tensor = torch.empty(size, dtype=dtype, device=self.device)
    pynccl_comm = self.pynccl_comm
    if pynccl_comm is not None and not pynccl_comm.disabled:
        pynccl_comm.recv(tensor, src)
    else:
        torch.distributed.recv(tensor, self.ranks[src], self.device_group)
    return tensor
fastvideo.distributed.device_communicators.cuda_communicator.CudaCommunicator.send
send(tensor: Tensor, dst: int | None = None) -> None

Sends a tensor to the destination rank in a non-blocking way.

Source code in fastvideo/distributed/device_communicators/cuda_communicator.py
def send(self, tensor: torch.Tensor, dst: int | None = None) -> None:
    """Sends a tensor to the destination rank in a non-blocking way"""
    """NOTE: `dst` is the local rank of the destination rank."""
    if dst is None:
        dst = (self.rank_in_group + 1) % self.world_size

    pynccl_comm = self.pynccl_comm
    if pynccl_comm is not None and not pynccl_comm.disabled:
        pynccl_comm.send(tensor, dst)
    else:
        torch.distributed.send(tensor, self.ranks[dst], self.device_group)