worker_base

Classes

fastvideo.worker.worker_base.WorkerWrapperBase

WorkerWrapperBase(fastvideo_args: FastVideoArgs, rpc_rank: int = 0)

This class represents one process in an executor/engine. It is responsible for lazily initializing the worker and handling the worker's lifecycle. We first instantiate the WorkerWrapperBase, which remembers the worker module and class name. Then we call update_environment_variables, and the real initialization happens in init_worker.

Initialize the worker wrapper with the given fastvideo_args and rpc_rank. Note: rpc_rank is the rank of the worker within its executor. In most cases, it is also the rank of the worker in the distributed group. However, when multiple executors work together, the two can differ. For example, in SPMD-style offline inference with TP=2, users can launch 2 engines/executors, each with only 1 worker. All workers then have rpc_rank=0, but they have different ranks in the TP group.
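The SPMD case described above can be sketched with a small stand-in class. This is only an illustration, not FastVideo code: `WrapperSketch` and its `tp_rank` field are hypothetical names introduced here to make the two kinds of rank visible side by side.

```python
from dataclasses import dataclass


@dataclass
class WrapperSketch:
    """Hypothetical stand-in for WorkerWrapperBase (not the real class)."""
    rpc_rank: int = 0  # position of the worker inside its own executor
    tp_rank: int = 0   # assumed field: position inside the TP group


# TP=2 launched as two executors, each holding exactly one worker.
executors = [[WrapperSketch(rpc_rank=0, tp_rank=tp)] for tp in range(2)]

rpc_ranks = [w.rpc_rank for ex in executors for w in ex]
tp_ranks = [w.tp_rank for ex in executors for w in ex]

print(rpc_ranks)  # [0, 0]  every worker is rank 0 in its own executor
print(tp_ranks)   # [0, 1]  but the ranks in the TP group differ
```

When a single executor owns all workers, the two lists would typically be identical, which is the common case the note refers to.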

Source code in fastvideo/worker/worker_base.py
def __init__(
    self,
    fastvideo_args: FastVideoArgs,
    rpc_rank: int = 0,
) -> None:
    """
    Initialize the worker wrapper with the given fastvideo_args and rpc_rank.
    Note: rpc_rank is the rank of the worker in the executor. In most cases,
    it is also the rank of the worker in the distributed group. However,
    when multiple executors work together, they can be different.
    e.g. in the case of SPMD-style offline inference with TP=2,
    users can launch 2 engines/executors, each with only 1 worker.
    All workers have rpc_rank=0, but they have different ranks in the TP
    group.
    """
    self.rpc_rank = rpc_rank
    self.worker: Worker | None = None
    self.fastvideo_args: FastVideoArgs | None = None

Functions

fastvideo.worker.worker_base.WorkerWrapperBase.adjust_rank
adjust_rank(rank_mapping: dict[int, int]) -> None

Adjust the rpc_rank based on the given mapping. It is only used during the initialization of the executor, to adjust the rpc_rank of workers after we create all workers.
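A minimal sketch of how this remapping behaves, using a hypothetical `WrapperSketch` class that copies only the `adjust_rank` logic from the source listing. The mapping values are made up for illustration.

```python
class WrapperSketch:
    """Hypothetical stand-in that mirrors only the adjust_rank logic."""

    def __init__(self, rpc_rank: int = 0) -> None:
        self.rpc_rank = rpc_rank

    def adjust_rank(self, rank_mapping: dict[int, int]) -> None:
        # Ranks that do not appear as keys are left unchanged.
        if self.rpc_rank in rank_mapping:
            self.rpc_rank = rank_mapping[self.rpc_rank]


wrappers = [WrapperSketch(rpc_rank=i) for i in range(3)]
rank_mapping = {0: 1, 1: 0}  # illustrative mapping: swap ranks 0 and 1

for wrapper in wrappers:
    wrapper.adjust_rank(rank_mapping)

adjusted = [w.rpc_rank for w in wrappers]
print(adjusted)  # [1, 0, 2]
```

Each wrapper looks up only its own current rank, so a partial mapping is enough when only some workers need to move.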

Source code in fastvideo/worker/worker_base.py
def adjust_rank(self, rank_mapping: dict[int, int]) -> None:
    """
    Adjust the rpc_rank based on the given mapping.
    It is only used during the initialization of the executor,
    to adjust the rpc_rank of workers after we create all workers.
    """
    if self.rpc_rank in rank_mapping:
        self.rpc_rank = rank_mapping[self.rpc_rank]

fastvideo.worker.worker_base.WorkerWrapperBase.init_worker
init_worker(all_kwargs: list[dict[str, Any]]) -> None

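Inject common logic before initializing the worker. all_kwargs holds one dict of keyword arguments per worker; the entry at index rpc_rank is selected and passed to the worker class constructor. That entry must contain fastvideo_args.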
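The per-rank selection can be sketched as follows. This assumes stand-in classes: `FakeWorker`, `WrapperSketch`, the `local_rank` keyword, and the placeholder args dict are all hypothetical and exist only so the example runs without FastVideo installed.

```python
from typing import Any


class FakeWorker:
    """Hypothetical stand-in for the real Worker class."""

    def __init__(self, fastvideo_args: Any, local_rank: int = 0) -> None:
        self.fastvideo_args = fastvideo_args
        self.local_rank = local_rank  # assumed extra kwarg, for illustration


class WrapperSketch:
    """Hypothetical stand-in that mirrors only the init_worker logic."""

    def __init__(self, rpc_rank: int = 0) -> None:
        self.rpc_rank = rpc_rank
        self.worker: FakeWorker | None = None
        self.fastvideo_args: Any = None

    def init_worker(self, all_kwargs: list[dict[str, Any]]) -> None:
        # Pick this worker's kwargs by rpc_rank, then build the worker lazily.
        kwargs = all_kwargs[self.rpc_rank]
        self.fastvideo_args = kwargs.get("fastvideo_args")
        assert self.fastvideo_args is not None, (
            "fastvideo_args is required to initialize the worker")
        self.worker = FakeWorker(**kwargs)


shared_args = {"model_path": "placeholder"}  # stands in for FastVideoArgs
all_kwargs = [{"fastvideo_args": shared_args, "local_rank": r}
              for r in range(2)]

wrapper = WrapperSketch(rpc_rank=1)
wrapper.init_worker(all_kwargs)
print(wrapper.worker.local_rank)  # 1
```

Because the list is indexed by rpc_rank, it needs one entry per worker in the executor, ordered by rank.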

Source code in fastvideo/worker/worker_base.py
def init_worker(self, all_kwargs: list[dict[str, Any]]) -> None:
    """
    Here we inject some common logic before initializing the worker.
    Arguments are passed to the worker class constructor.
    """
    kwargs = all_kwargs[self.rpc_rank]
    self.fastvideo_args = kwargs.get("fastvideo_args")
    assert self.fastvideo_args is not None, (
        "fastvideo_args is required to initialize the worker")

    self.worker = Worker(**kwargs)
    assert self.worker is not None

Functions