Skip to content

npu_communicator

Classes

fastvideo.distributed.device_communicators.npu_communicator.NpuCommunicator

NpuCommunicator(cpu_group: ProcessGroup, device: device | None = None, device_group: ProcessGroup | None = None, unique_name: str = '')

Bases: DeviceCommunicatorBase

Source code in fastvideo/distributed/device_communicators/npu_communicator.py
def __init__(self,
             cpu_group: ProcessGroup,
             device: torch.device | None = None,
             device_group: ProcessGroup | None = None,
             unique_name: str = ""):
    """Set up the communicator and, for multi-rank groups, a PyHccl backend.

    All arguments are forwarded unchanged to the base-class constructor.
    ``self.pyhccl_comm`` is left as ``None`` unless ``self.world_size`` is
    greater than one, in which case a ``PyHcclCommunicator`` is built from
    ``self.cpu_group`` and ``self.device``.
    """
    super().__init__(cpu_group, device, device_group, unique_name)

    # Imported inside the constructor rather than at module import time.
    from fastvideo.distributed.device_communicators.pyhccl import (
        PyHcclCommunicator)

    self.pyhccl_comm: PyHcclCommunicator | None = None
    if self.world_size <= 1:
        # A single-rank group keeps pyhccl_comm unset (None).
        return

    self.pyhccl_comm = PyHcclCommunicator(group=self.cpu_group,
                                          device=self.device)

Functions

fastvideo.distributed.device_communicators.npu_communicator.NpuCommunicator.recv
recv(size: Size, dtype: dtype, src: int | None = None) -> Tensor

Receives a tensor from the source rank.

Source code in fastvideo/distributed/device_communicators/npu_communicator.py
def recv(self,
         size: torch.Size,
         dtype: torch.dtype,
         src: int | None = None) -> torch.Tensor:
    """Receives a tensor from the source rank."""
    """NOTE: `src` is the local rank of the source rank."""
    if src is None:
        src = (self.rank_in_group - 1) % self.world_size

    tensor = torch.empty(size, dtype=dtype, device=self.device)
    pyhccl_comm = self.pyhccl_comm
    if pyhccl_comm is not None and not pyhccl_comm.disabled:
        pyhccl_comm.recv(tensor, src)
    else:
        torch.distributed.recv(tensor, self.ranks[src], self.device_group)
    return tensor
fastvideo.distributed.device_communicators.npu_communicator.NpuCommunicator.send
send(tensor: Tensor, dst: int | None = None) -> None

Sends a tensor to the destination rank in a non-blocking way.

Source code in fastvideo/distributed/device_communicators/npu_communicator.py
def send(self, tensor: torch.Tensor, dst: int | None = None) -> None:
    """Sends a tensor to the destination rank in a non-blocking way"""
    """NOTE: `dst` is the local rank of the destination rank."""
    if dst is None:
        dst = (self.rank_in_group + 1) % self.world_size

    pyhccl_comm = self.pyhccl_comm
    if pyhccl_comm is not None and not pyhccl_comm.disabled:
        pyhccl_comm.send(tensor, dst)
    else:
        torch.distributed.send(tensor, self.ranks[dst], self.device_group)