utils
Classes
fastvideo.distributed.utils.StatelessProcessGroup dataclass
StatelessProcessGroup(rank: int, world_size: int, store: Store, data_expiration_seconds: int = 3600, send_dst_counter: dict[int, int] = dict(), recv_src_counter: dict[int, int] = dict(), broadcast_send_counter: int = 0, broadcast_recv_src_counter: dict[int, int] = dict(), entries: deque[tuple[str, float]] = deque())
A dataclass that holds a metadata store together with the rank and world_size of the group. Use it only to communicate metadata between processes; for data-plane communication, create NCCL-related objects instead.
Functions¶
fastvideo.distributed.utils.StatelessProcessGroup.all_gather_obj
All-gather an object from all ranks.
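A minimal usage sketch; the store host/port and the environment-variable handshake are illustrative assumptions, not part of the documented API:

```python
import os

from fastvideo.distributed.utils import StatelessProcessGroup

rank = int(os.environ["RANK"])              # assumed launcher-provided rank
world_size = int(os.environ["WORLD_SIZE"])  # assumed launcher-provided size

# Host and port of the metadata store; illustrative values only.
pg = StatelessProcessGroup.create(host="127.0.0.1", port=29500,
                                  rank=rank, world_size=world_size)

# Every rank contributes one picklable object; each rank receives the
# gathered list back (assumed return value, one entry per rank).
gathered = pg.all_gather_obj({"rank": rank, "status": "ready"})
assert len(gathered) == world_size
```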
fastvideo.distributed.utils.StatelessProcessGroup.barrier
A barrier to synchronize all ranks.
fastvideo.distributed.utils.StatelessProcessGroup.broadcast_obj
Broadcast an object from a source rank to all other ranks. It does not clean up after all ranks have received the object, so use it sparingly, e.g., during initialization.
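A hedged sketch of a one-time broadcast during startup; the broadcast_obj signature (the object plus a src rank, with non-source ranks passing None) is an assumption based on the description above:

```python
# `pg` is assumed to be the StatelessProcessGroup from the earlier sketch.
# Rank 0 broadcasts configuration once; every rank gets the object back.
if pg.rank == 0:
    config = pg.broadcast_obj({"model": "demo", "steps": 50}, src=0)
else:
    config = pg.broadcast_obj(None, src=0)
print(config["steps"])  # 50 on every rank
```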
fastvideo.distributed.utils.StatelessProcessGroup.create staticmethod
create(host: str, port: int, rank: int, world_size: int, data_expiration_seconds: int = 3600) -> StatelessProcessGroup
A replacement for torch.distributed.init_process_group that does not pollute the global state.
If processes A and B call torch.distributed.init_process_group to form a group, and we then want to form another group with processes A, B, C, and D, PyTorch cannot do it: A and B have already formed a group, and C and D cannot join it. This function is a workaround for that limitation.
torch.distributed.init_process_group is a global call, while this function is stateless. It returns a StatelessProcessGroup object that can be used to exchange metadata. With it, processes A and B can call StatelessProcessGroup.create to form one group, and processes A, B, C, and D can call StatelessProcessGroup.create again to form another group, as the sketch below shows.
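A sketch of the two-group scenario described above; the host, ports, and placeholder ranks are illustrative assumptions:

```python
from fastvideo.distributed.utils import StatelessProcessGroup

HOST = "127.0.0.1"   # illustrative store host
rank_in_ab = 0       # placeholder: this process's rank within {A, B}
rank_in_abcd = 0     # placeholder: this process's rank within {A, B, C, D}

# Processes A and B form a 2-way group on one port...
pg_ab = StatelessProcessGroup.create(host=HOST, port=29500,
                                     rank=rank_in_ab, world_size=2)

# ...and A, B, C, and D later form a separate 4-way group on another port.
# Neither call touches torch.distributed's global state, so both coexist.
pg_abcd = StatelessProcessGroup.create(host=HOST, port=29501,
                                       rank=rank_in_abcd, world_size=4)
```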
fastvideo.distributed.utils.StatelessProcessGroup.expire_data
Expire data older than data_expiration_seconds.
fastvideo.distributed.utils.StatelessProcessGroup.recv_obj
Receive an object from a source rank.
fastvideo.distributed.utils.StatelessProcessGroup.send_obj
Send an object to a destination rank.
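A point-to-point sketch combining the two methods; the dst/src keyword names are assumptions, and pg is the group from the earlier sketch:

```python
# Metadata exchange between rank 0 and rank 1 with any picklable object.
if pg.rank == 0:
    pg.send_obj({"shape": (4, 1024)}, dst=1)
elif pg.rank == 1:
    meta = pg.recv_obj(src=0)
    print(meta["shape"])  # (4, 1024)
```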
Functions
fastvideo.distributed.utils.divide
Ensure that the numerator is divisible by the denominator and return the quotient.
fastvideo.distributed.utils.ensure_divisibility
Ensure that the numerator is divisible by the denominator.
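A quick illustration of the two helpers, assuming divide checks divisibility before returning the quotient:

```python
from fastvideo.distributed.utils import divide, ensure_divisibility

ensure_divisibility(8, 2)  # passes silently: 8 is divisible by 2
print(divide(8, 2))        # 4

# divide(8, 3) is assumed to fail the divisibility check rather than
# return a truncated quotient.
```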
fastvideo.distributed.utils.split_tensor_along_last_dim
split_tensor_along_last_dim(tensor: Tensor, num_partitions: int, contiguous_split_chunks: bool = False) -> Sequence[Tensor]
Split a tensor along its last dimension.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| tensor | Tensor | Input tensor. | required |
| num_partitions | int | Number of partitions to split the tensor into. | required |
| contiguous_split_chunks | bool | If True, make each chunk contiguous in memory. | False |
Returns:

| Type | Description |
|---|---|
| Sequence[Tensor] | A list of tensors. |
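A small sketch of the split, assuming the usual torch.split-based implementation; the shapes are arbitrary:

```python
import torch

from fastvideo.distributed.utils import split_tensor_along_last_dim

x = torch.randn(2, 6)  # last dim of size 6, split into 3 partitions of 2
chunks = split_tensor_along_last_dim(x, num_partitions=3)
print([tuple(c.shape) for c in chunks])  # [(2, 2), (2, 2), (2, 2)]

# Assuming torch.split underneath, the chunks above are non-contiguous
# views of x; request contiguous copies instead:
chunks = split_tensor_along_last_dim(x, 3, contiguous_split_chunks=True)
assert all(c.is_contiguous() for c in chunks)
```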