Skip to content

pyhccl

Classes

fastvideo.distributed.device_communicators.pyhccl.PyHcclCommunicator

PyHcclCommunicator(group: ProcessGroup | StatelessProcessGroup, device: int | str | device, library_path: str | None = None)

Parameters:

Name Type Description Default
group ProcessGroup | StatelessProcessGroup

the process group to work on. If None, it will use the default process group.

required
device int | str | device

the device to bind the PyHcclCommunicator to. If None, it will be bound to f"npu:{local_rank}".

required
library_path str | None

the path to the HCCL library. If None, it will use the default library path.

None

It is the caller's responsibility to make sure each communicator is bound to a unique device.

Source code in fastvideo/distributed/device_communicators/pyhccl.py
def __init__(
    self,
    group: ProcessGroup | StatelessProcessGroup,
    device: int | str | torch.device,
    library_path: str | None = None,
):
    """Create an HCCL communicator spanning ``group`` and bound to ``device``.

    Args:
        group: the process group whose members form the communicator.
            For a regular ``ProcessGroup``, ``torch.distributed`` must
            already be initialized and the group's backend must not be
            HCCL. For a ``StatelessProcessGroup``, rank and world size
            are read directly from the group object.
        device: the device the communicator is bound to. An ``int`` is
            turned into ``torch.device(f"npu:{device}")``, a ``str`` is
            passed to ``torch.device``; after that conversion the value
            is asserted to be a ``torch.device``.
            NOTE(review): earlier wording said ``None`` falls back to
            f"npu:{local_rank}"; the assertion below would reject
            ``None`` -- confirm the intended contract.
        library_path: forwarded unchanged to ``HCCLLibrary``.

    The communicator marks itself unavailable (``available = False``,
    ``disabled = True``) and returns early when the group has a single
    member or when ``HCCLLibrary`` cannot be constructed.

    It is the caller's responsibility to make sure each communicator
    is bound to a unique device.
    """

    # Evaluated once; both the rank lookup and the unique-id exchange
    # below branch on the kind of group.
    stateless = isinstance(group, StatelessProcessGroup)

    if stateless:
        self.rank = group.rank
        self.world_size = group.world_size
    else:
        assert dist.is_initialized()
        assert dist.get_backend(group) != dist.Backend.HCCL, (
            "PyHcclCommunicator should be attached to a non-HCCL group.")
        # rank here means the rank *within* `group`, not the global rank
        self.rank = dist.get_rank(group)
        self.world_size = dist.get_world_size(group)

    self.group = group

    # A single-member group has nobody to talk to: skip all HCCL setup.
    if self.world_size == 1:
        self.available, self.disabled = False, True
        return

    try:
        self.hccl = HCCLLibrary(library_path)
    except Exception:
        # Deliberately best-effort: any failure to load the library
        # (e.g. running in a non-NPU environment) disables the
        # communicator instead of propagating.
        logger.warning("disable hccl because of missing HCCL library")
        self.available, self.disabled = False, True
        return

    self.available, self.disabled = True, False

    logger.info("FastVideo is using pyhccl")

    # Normalize int / str device specs into a `torch.device`.
    if isinstance(device, (int, str)):
        spec = f"npu:{device}" if isinstance(device, int) else device
        device = torch.device(spec)
    assert isinstance(device, torch.device)
    self.device = device

    if self.rank != 0:
        # Placeholder id; it is overwritten by the broadcast below.
        self.unique_id = hcclUniqueId()
    else:
        # Only rank 0 asks HCCL for the real unique id.
        with torch.npu.device(device):
            self.unique_id = self.hccl.hcclGetUniqueId()

    if stateless:
        self.unique_id = group.broadcast_obj(self.unique_id, src=0)
    else:
        id_bytes = torch.ByteTensor(list(self.unique_id.internal))
        group_ranks = dist.get_process_group_ranks(group)
        # `src` for `dist.broadcast` is a global rank, so translate
        # group-rank 0 through the group's rank list.
        dist.broadcast(id_bytes, src=group_ranks[0], group=group)
        # Copy the received bytes back into the id structure in place.
        for idx, value in enumerate(id_bytes.tolist()):
            self.unique_id.internal[idx] = value

    # `torch.npu.device` is a context manager that switches the current
    # NPU device; the communicator and the warm-up stream are both
    # created while `device` is current.
    with torch.npu.device(device):
        self.comm: hcclComm_t = self.hccl.hcclCommInitRank(
            self.world_size, self.unique_id, self.rank)

        warmup_stream = current_stream()
        # Run one tiny all_reduce as a warm-up and wait for it to finish.
        warmup_tensor = torch.zeros(1, device=device)
        self.all_reduce(warmup_tensor)
        warmup_stream.synchronize()
        del warmup_tensor

Functions

Functions