Skip to content

pynccl

Classes

fastvideo.distributed.device_communicators.pynccl.PyNcclCommunicator

PyNcclCommunicator(group: ProcessGroup | StatelessProcessGroup, device: int | str | device, library_path: str | None = None)

Parameters:

Name Type Description Default
group ProcessGroup | StatelessProcessGroup

the process group to work on. If None, it will use the default process group.

required
device int | str | device

the device to bind the PyNcclCommunicator to. If None, it will be bound to f"cuda:{local_rank}".

required
library_path str | None

the path to the NCCL library. If None, it will use the default library path.

None

It is the caller's responsibility to make sure each communicator is bound to a unique device.

Source code in fastvideo/distributed/device_communicators/pynccl.py
def __init__(
    self,
    group: ProcessGroup | StatelessProcessGroup,
    device: int | str | torch.device,
    library_path: str | None = None,
):
    """Set up an NCCL communicator for ``group`` bound to ``device``.

    Args:
        group: the process group whose rank and world size this
            communicator mirrors. For anything other than a
            ``StatelessProcessGroup``, ``torch.distributed`` must already
            be initialized and the group's backend must not be NCCL
            (both are asserted).
        device: the device to bind the communicator to. An ``int`` is
            interpreted as ``f"cuda:{device}"``, a ``str`` is passed to
            ``torch.device``, and a ``torch.device`` is used as is.
        library_path: forwarded unchanged to ``NCCLLibrary``.

    If the world size is 1, or ``NCCLLibrary`` raises while loading, the
    instance is marked ``available = False`` / ``disabled = True`` and
    construction stops early without creating a communicator.

    It is the caller's responsibility to make sure each communicator
    is bound to a unique device.
    """
    if isinstance(group, StatelessProcessGroup):
        self.rank = group.rank
        self.world_size = group.world_size
    else:
        assert dist.is_initialized()
        assert dist.get_backend(group) != dist.Backend.NCCL, (
            "PyNcclCommunicator should be attached to a non-NCCL group.")
        # `get_rank(group)` is the rank *within* the group, not the
        # global rank
        self.rank = dist.get_rank(group)
        self.world_size = dist.get_world_size(group)

    self.group = group

    # A single-rank group has nobody to talk to: skip communicator setup.
    if self.world_size == 1:
        self.available, self.disabled = False, True
        return

    try:
        self.nccl = NCCLLibrary(library_path)
    except Exception:
        # The NCCL library could not be loaded (e.g. a non-GPU
        # environment), so leave the communicator switched off.
        self.available, self.disabled = False, True
        return

    self.available, self.disabled = True, False

    logger.info("FastVideo is using nccl==%s", self.nccl.ncclGetVersion())

    # Only rank 0 asks NCCL for a real unique id; every other rank starts
    # from an empty one that the broadcast below fills in.
    self.unique_id = (self.nccl.ncclGetUniqueId()
                      if self.rank == 0 else ncclUniqueId())

    if isinstance(group, StatelessProcessGroup):
        self.unique_id = group.broadcast_obj(self.unique_id, src=0)
    else:
        id_bytes = torch.ByteTensor(list(self.unique_id.internal))
        # `broadcast` expects `src` as a global rank, so translate the
        # group's first member through the group's rank list
        src_global_rank = dist.get_process_group_ranks(group)[0]
        dist.broadcast(id_bytes, src=src_global_rank, group=group)
        for idx, value in enumerate(id_bytes.tolist()):
            self.unique_id.internal[idx] = value

    # Normalize `device` into a `torch.device` object.
    if isinstance(device, str):
        device = torch.device(device)
    elif isinstance(device, int):
        device = torch.device(f"cuda:{device}")
    assert isinstance(device, torch.device)
    self.device = device

    # `torch.cuda.device` is a context manager that makes `device` the
    # current CUDA device, so the communicator and the warmup below are
    # created on it.
    with torch.cuda.device(device):
        self.comm: ncclComm_t = self.nccl.ncclCommInitRank(
            self.world_size, self.unique_id, self.rank)

        warmup_stream = current_stream()
        # Run one tiny all_reduce as a warmup.
        warmup_buf = torch.zeros(1, device=device)
        self.all_reduce(warmup_buf)
        if warmup_stream is not None:
            warmup_stream.synchronize()
        del warmup_buf

Functions

Functions