fastvideo.v1.distributed.utils#

Module Contents#

Classes#

StatelessProcessGroup

A dataclass that holds a metadata store together with the rank and world_size of the group. Use it only to communicate metadata between processes; for data-plane communication, create NCCL-related objects.

Functions#

divide

Ensure that the numerator is divisible by the denominator and return the quotient.

ensure_divisibility

Ensure that the numerator is divisible by the denominator.

split_tensor_along_last_dim

Split a tensor along its last dimension.

Data#

API#

class fastvideo.v1.distributed.utils.StatelessProcessGroup[source]#

A dataclass that holds a metadata store together with the rank and world_size of the group. Use it only to communicate metadata between processes; for data-plane communication, create NCCL-related objects.

all_gather_obj(obj: Any) list[Any][source]#

All gather an object from all ranks.

barrier()[source]#

A barrier to synchronize all ranks.

broadcast_obj(obj: Optional[Any], src: int) Any[source]#

Broadcast an object from a source rank to all other ranks. The broadcast does not clean up after all ranks have received the object, so use it only a limited number of times, e.g., during initialization.
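A minimal sketch of the calling convention, assuming a StatelessProcessGroup named pg already exists (see create() below). The payload dict is a placeholder, and the convention shown (the source rank passes the object, other ranks pass None) is an assumption inferred from the Optional[Any] signature:

```python
# Hedged sketch: broadcast a small settings dict from rank 0 during setup.
# `pg` is assumed to be an existing StatelessProcessGroup; the dict is a
# placeholder payload, not part of the API.
if pg.rank == 0:
    settings = pg.broadcast_obj({"seed": 42}, src=0)
else:
    settings = pg.broadcast_obj(None, src=0)
# Every rank now holds the same settings object.
```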

broadcast_recv_src_counter: Dict[int, int][source]#

‘field(…)’

broadcast_send_counter: int[source]#

0

static create(host: str, port: int, rank: int, world_size: int, data_expiration_seconds: int = 3600) fastvideo.v1.distributed.utils.StatelessProcessGroup[source]#

A replacement for torch.distributed.init_process_group that does not pollute the global state.

If process A and process B have called torch.distributed.init_process_group to form a group, and we then want to form another group with processes A, B, C, and D, this is not possible in PyTorch, because A and B have already formed a group that C and D cannot join. This function is a workaround for that issue.

torch.distributed.init_process_group is a global call, while this function is stateless: it returns a StatelessProcessGroup object that can be used for exchanging metadata. With this function, processes A and B can call StatelessProcessGroup.create to form a group, and processes A, B, C, and D can later call StatelessProcessGroup.create again to form another group.
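A hedged usage sketch of the pattern described above. The host, port, and the environment-variable wiring for rank/world_size are placeholders chosen for illustration, not requirements of the API:

```python
import os

from fastvideo.v1.distributed.utils import StatelessProcessGroup

# Run the same script once per process. RANK and WORLD_SIZE are assumed to be
# provided by the launcher; this wiring is a placeholder, not part of the API.
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])

# Form a metadata-only group without touching torch.distributed global state.
pg = StatelessProcessGroup.create(
    host="127.0.0.1", port=29500, rank=rank, world_size=world_size)

# Exchange lightweight metadata; tensor traffic should use NCCL objects instead.
infos = pg.all_gather_obj({"rank": rank})
pg.barrier()

# Because create() does not mutate global state, the same processes (plus new
# ones) can later call create() again on a different port to form another group.
```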

data_expiration_seconds: int[source]#

3600

entries: Deque[Tuple[str, float]][source]#

‘field(…)’

expire_data() None[source]#

Expire data that is older than data_expiration_seconds seconds.

rank: int[source]#

None

recv_obj(src: int) Any[source]#

Receive an object from a source rank.

recv_src_counter: Dict[int, int][source]#

‘field(…)’

send_dst_counter: Dict[int, int][source]#

‘field(…)’

send_obj(obj: Any, dst: int)[source]#

Send an object to a destination rank.
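A short point-to-point sketch pairing send_obj with recv_obj; pg is again assumed to be an existing StatelessProcessGroup, and the payload is illustrative only:

```python
# Hedged sketch: rank 0 sends a metadata dict to rank 1.
# `pg` is assumed to exist already; the payload is a placeholder.
if pg.rank == 0:
    pg.send_obj({"checkpoint_step": 1000}, dst=1)
elif pg.rank == 1:
    meta = pg.recv_obj(src=0)
```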

store: torch._C._distributed_c10d.Store[source]#

None

world_size: int[source]#

None

fastvideo.v1.distributed.utils.divide(numerator: int, denominator: int) int[source]#

Ensure that the numerator is divisible by the denominator and return the quotient.

fastvideo.v1.distributed.utils.ensure_divisibility(numerator, denominator) None[source]#

Ensure that the numerator is divisible by the denominator.
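A brief sketch of how these helpers are typically used when partitioning a model dimension across ranks; the behaviour on non-divisible inputs (an assertion-style failure) is an assumption, not documented above:

```python
from fastvideo.v1.distributed.utils import divide, ensure_divisibility

hidden_size, tp_size = 4096, 8

ensure_divisibility(hidden_size, tp_size)  # passes silently when divisible
per_rank = divide(hidden_size, tp_size)    # 512: the per-rank share

# divide(hidden_size, 3) would fail the divisibility check rather than
# returning a truncated result (assertion-style failure is an assumption).
```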

fastvideo.v1.distributed.utils.logger[source]#

‘init_logger(…)’

fastvideo.v1.distributed.utils.split_tensor_along_last_dim(tensor: torch.Tensor, num_partitions: int, contiguous_split_chunks: bool = False) Sequence[torch.Tensor][source]#

Split a tensor along its last dimension.

Parameters:
  • tensor – input tensor.

  • num_partitions – number of partitions to split the tensor into.

  • contiguous_split_chunks – If True, make each chunk contiguous in memory.

Returns:

A list of Tensors
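A minimal sketch against the documented signature; the even partitioning of the last dimension (its width divided by num_partitions) and the non-contiguity of the default chunks are assumptions being illustrated:

```python
import torch

from fastvideo.v1.distributed.utils import split_tensor_along_last_dim

x = torch.arange(16, dtype=torch.float32).reshape(2, 8)

# Split the 8-wide last dimension into 4 partitions of width 2 each.
chunks = split_tensor_along_last_dim(x, num_partitions=4)
assert len(chunks) == 4
assert chunks[0].shape == (2, 2)

# The default chunks may be non-contiguous views of the input; request
# contiguous copies when a downstream kernel needs them.
chunks = split_tensor_along_last_dim(x, 4, contiguous_split_chunks=True)
```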