worker

Classes

fastvideo.worker.Executor

Executor(fastvideo_args: FastVideoArgs)

Bases: ABC

Source code in fastvideo/worker/executor.py
def __init__(self, fastvideo_args: FastVideoArgs):
    self.fastvideo_args = fastvideo_args

    self._init_executor()

Functions

fastvideo.worker.Executor.collective_rpc abstractmethod
collective_rpc(method: str | Callable[..., _R], timeout: float | None = None, args: tuple = (), kwargs: dict[str, Any] | None = None) -> list[_R]

Execute an RPC call on all workers.

Parameters:

    method (str | Callable[..., _R], required)
        Name of the worker method to execute, or a callable that is
        serialized and sent to all workers to execute.

        If the method is a callable, it should accept an additional self
        argument, in addition to the arguments passed in args and kwargs.
        The self argument will be the worker object.

    timeout (float | None, default: None)
        Maximum time in seconds to wait for execution. Raises a
        TimeoutError on timeout. None means wait indefinitely.

    args (tuple, default: ())
        Positional arguments to pass to the worker method.

    kwargs (dict[str, Any] | None, default: None)
        Keyword arguments to pass to the worker method.

Returns:

    list[_R]
        A list containing the results from each worker.

Note

It is recommended to use this API to only pass control messages, and set up data-plane communication to pass data.

Source code in fastvideo/worker/executor.py
@abstractmethod
def collective_rpc(self,
                   method: str | Callable[..., _R],
                   timeout: float | None = None,
                   args: tuple = (),
                   kwargs: dict[str, Any] | None = None) -> list[_R]:
    """
    Execute an RPC call on all workers.

    Args:
        method: Name of the worker method to execute, or a callable that
            is serialized and sent to all workers to execute.

            If the method is a callable, it should accept an additional
            `self` argument, in addition to the arguments passed in `args`
            and `kwargs`. The `self` argument will be the worker object.
        timeout: Maximum time in seconds to wait for execution. Raises a
            :exc:`TimeoutError` on timeout. `None` means wait indefinitely.
        args: Positional arguments to pass to the worker method.
        kwargs: Keyword arguments to pass to the worker method.

    Returns:
        A list containing the results from each worker.

    Note:
        It is recommended to use this API to only pass control messages,
        and set up data-plane communication to pass data.
    """
    raise NotImplementedError
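
The following is a minimal usage sketch of collective_rpc. The executor variable stands in for any concrete Executor subclass instance, and the commented string-dispatch call assumes the workers expose a set_lora_adapter method; the names used are illustrative only.

def get_rank(self):
    # Called on the worker side; `self` is the worker object.
    return self.rank

def query_worker_ranks(executor):
    # Callable dispatch: the function is serialized, sent to every worker,
    # and invoked with the worker object bound to `self`.
    ranks = executor.collective_rpc(get_rank, timeout=60.0)

    # String dispatch: each worker runs its own method of that name, e.g.
    # executor.collective_rpc("set_lora_adapter",
    #                         kwargs={"lora_nickname": "style_a",
    #                                 "lora_path": "/path/to/adapter"})

    # One result per worker.
    return ranks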
fastvideo.worker.Executor.merge_lora_weights abstractmethod
merge_lora_weights() -> None

Merge the LoRA weights for the workers.

Source code in fastvideo/worker/executor.py
@abstractmethod
def merge_lora_weights(self) -> None:
    """
    Merge the LoRA weights for the workers.
    """
    raise NotImplementedError
fastvideo.worker.Executor.set_lora_adapter abstractmethod
set_lora_adapter(lora_nickname: str, lora_path: str | None = None) -> None

Set the LoRA adapter for the workers.

Source code in fastvideo/worker/executor.py
@abstractmethod
def set_lora_adapter(self,
                     lora_nickname: str,
                     lora_path: str | None = None) -> None:
    """
    Set the LoRA adapter for the workers.
    """
    raise NotImplementedError
fastvideo.worker.Executor.shutdown abstractmethod
shutdown() -> None

Shutdown the executor.

Source code in fastvideo/worker/executor.py
@abstractmethod
def shutdown(self) -> None:
    """
    Shutdown the executor.
    """
    raise NotImplementedError
fastvideo.worker.Executor.unmerge_lora_weights abstractmethod
unmerge_lora_weights() -> None

Unmerge the LoRA weights for the workers.

Source code in fastvideo/worker/executor.py
@abstractmethod
def unmerge_lora_weights(self) -> None:
    """
    Unmerge the LoRA weights for the workers.
    """
    raise NotImplementedError
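
The helpers below sketch how these LoRA hooks are intended to be combined; the nickname and path are illustrative, and how inference itself is triggered afterwards is outside this interface.

def apply_lora_adapter(executor, nickname: str, path: str) -> None:
    """Hedged sketch of the LoRA hooks above."""
    # Load the adapter on every worker under a short nickname.
    executor.set_lora_adapter(nickname, lora_path=path)
    # Fold the adapter into the base weights before running inference.
    executor.merge_lora_weights()

def remove_lora_adapter(executor) -> None:
    # Restore the original base weights once the adapter is no longer needed.
    executor.unmerge_lora_weights()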

fastvideo.worker.MultiprocExecutor

MultiprocExecutor(fastvideo_args: FastVideoArgs)

Bases: Executor

Source code in fastvideo/worker/executor.py
def __init__(self, fastvideo_args: FastVideoArgs):
    self.fastvideo_args = fastvideo_args

    self._init_executor()

Functions

fastvideo.worker.MultiprocExecutor.__del__
__del__()

Ensure cleanup on garbage collection

Source code in fastvideo/worker/multiproc_executor.py
def __del__(self):
    """Ensure cleanup on garbage collection"""
    self.shutdown()
fastvideo.worker.MultiprocExecutor.__enter__
__enter__()

Support for context manager protocol

Source code in fastvideo/worker/multiproc_executor.py
def __enter__(self):
    """Support for context manager protocol"""
    return self
fastvideo.worker.MultiprocExecutor.__exit__
__exit__(exc_type, exc_val, exc_tb)

Ensure cleanup when exiting context

Source code in fastvideo/worker/multiproc_executor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Ensure cleanup when exiting context"""
    self.shutdown()
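
A minimal sketch of the context manager protocol supported by these methods; constructing a complete FastVideoArgs instance is outside the scope of this page.

from fastvideo.worker import MultiprocExecutor

def run_with_executor(fastvideo_args) -> None:
    # __enter__ returns the executor and __exit__ always calls shutdown(),
    # so worker processes and pipes are cleaned up even if an error occurs.
    with MultiprocExecutor(fastvideo_args) as executor:
        # ... issue collective_rpc calls or run inference here ...
        pass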
fastvideo.worker.MultiprocExecutor.shutdown
shutdown() -> None

Properly shut down the executor and its workers

Source code in fastvideo/worker/multiproc_executor.py
def shutdown(self) -> None:
    """Properly shut down the executor and its workers"""
    if hasattr(self, 'shutting_down') and self.shutting_down:
        return  # Prevent multiple shutdown calls

    logger.info("Shutting down MultiprocExecutor...")
    self.shutting_down = True

    # First try gentle termination
    try:
        # Send termination message to all workers
        for worker in self.workers:
            with contextlib.suppress(Exception):
                worker.pipe.send({
                    "method": "shutdown",
                    "args": (),
                    "kwargs": {}
                })

        # Give workers some time to exit gracefully
        start_time = time.perf_counter()
        while time.perf_counter() - start_time < 5.0:  # 5 seconds timeout
            if all(not worker.proc.is_alive() for worker in self.workers):
                break
            time.sleep(0.1)

        # Force terminate any remaining workers
        for worker in self.workers:
            if worker.proc.is_alive():
                worker.proc.terminate()

        # Final timeout for terminate
        start_time = time.perf_counter()
        while time.perf_counter() - start_time < 2.0:  # 2 seconds timeout
            if all(not worker.proc.is_alive() for worker in self.workers):
                break
            time.sleep(0.1)

        # Kill if still alive
        for worker in self.workers:
            if worker.proc.is_alive():
                worker.proc.kill()
            worker.proc.join(timeout=1.0)

    except Exception as e:
        logger.error("Error during shutdown: %s", e)
        # Last resort, try to kill all workers
        for worker in self.workers:
            with contextlib.suppress(Exception):
                if worker.proc.is_alive():
                    worker.proc.kill()

    # Clean up pipes
    for worker in self.workers:
        with contextlib.suppress(Exception):
            worker.pipe.close()

    self.workers = []
    logger.info("MultiprocExecutor shutdown complete")

Functions

fastvideo.worker.initialize_ray_cluster

initialize_ray_cluster(fastvideo_args: FastVideoArgs, ray_address: str | None = None)

Initialize the distributed cluster with Ray.

It will connect to the Ray cluster and create a placement group for the workers, which includes the specification of the resources for each distributed worker.

Parameters:

    fastvideo_args (FastVideoArgs, required)
        The FastVideo configuration, including the parallel execution settings.

    ray_address (str | None, default: None)
        The address of the Ray cluster. If None, uses the default Ray cluster address.
Source code in fastvideo/worker/ray_utils.py
def initialize_ray_cluster(
    fastvideo_args: FastVideoArgs,
    ray_address: str | None = None,
):
    """Initialize the distributed cluster with Ray.

    It will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        fastvideo_args: The FastVideo configuration, including the
            parallel execution settings.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    """
    assert_ray_available()
    from fastvideo.platforms import current_platform

    if ray.is_initialized():
        logger.info("Ray is already initialized. Skipping Ray initialization.")
    elif current_platform.is_rocm() or current_platform.is_xpu():
        # Try to connect existing ray instance and create a new one if not found
        try:
            ray.init("auto")
        except ConnectionError:
            logger.warning(
                "No existing RAY instance detected. "
                "A new instance will be launched with current node resources.")
            ray.init(address=ray_address,
                     num_gpus=fastvideo_args.num_gpus,
                     runtime_env=fastvideo_args.ray_runtime_env)
    else:
        ray.init(address=ray_address,
                 runtime_env=fastvideo_args.ray_runtime_env)

    device_str = current_platform.ray_device_key
    if not device_str:
        raise ValueError(
            f"current platform {current_platform.device_name} does not "
            "support ray.")

    # Create or get the placement group for worker processes
    if fastvideo_args.ray_placement_group:
        current_placement_group = fastvideo_args.ray_placement_group
    else:
        current_placement_group = ray.util.get_current_placement_group()

    if current_placement_group:
        logger.info("Using the existing placement group")

        # We are in a placement group
        bundles = current_placement_group.bundle_specs
        # Verify that we can use the placement group.
        device_bundles = 0
        for bundle in bundles:
            bundle_devices = bundle.get(device_str, 0)
            if bundle_devices > 1:
                raise ValueError(
                    "Placement group bundle cannot have more than 1 "
                    f"{device_str}.")
            if bundle_devices:
                device_bundles += 1
        if fastvideo_args.num_gpus > device_bundles:
            raise ValueError(
                f"The number of required {device_str}s exceeds the total "
                f"number of available {device_str}s in the placement group. "
                f"Required number of devices: {fastvideo_args.num_gpus}. "
                f"Total number of devices: {device_bundles}.")
    else:
        logger.info("No current placement group found. "
                    "Creating a new placement group.")
        num_devices_in_cluster = ray.cluster_resources().get(device_str, 0)
        # Log a warning message and delay resource allocation failure response.
        # Avoid immediate rejection to allow user-initiated placement group
        # created and wait cluster to be ready
        if fastvideo_args.num_gpus > num_devices_in_cluster:
            logger.warning(
                "The number of required %ss exceeds the total "
                "number of available %ss in the placement group.", device_str,
                device_str)
        # Create a new placement group
        placement_group_specs: list[dict[str, float]] = ([{
            device_str: 1.0
        } for _ in range(fastvideo_args.num_gpus)])

        # FastVideo engine is also a worker to execute model with an accelerator,
        # so it requires to have the device in a current node. Check if
        # the current node has at least one device.
        current_ip = get_ip()
        current_node_id = ray.get_runtime_context().get_node_id()
        current_node_resource = available_resources_per_node()[current_node_id]
        if current_node_resource.get(device_str, 0) < 1:
            raise ValueError(
                f"Current node has no {device_str} available. "
                f"{current_node_resource=}. FastVideo engine cannot start without "
                f"{device_str}. Make sure you have at least 1 {device_str} "
                f"available in a node {current_node_id=} {current_ip=}.")
        # This way, at least bundle is required to be created in a current
        # node.
        placement_group_specs[0][f"node:{current_ip}"] = 0.001

        # By default, Ray packs resources as much as possible.
        current_placement_group = ray.util.placement_group(
            placement_group_specs, strategy="PACK")
        _wait_until_pg_ready(current_placement_group)

    assert current_placement_group is not None
    _verify_bundles(current_placement_group, fastvideo_args, device_str)
    # Set the placement group in the fastvideo args
    fastvideo_args.ray_placement_group = current_placement_group
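
A minimal usage sketch, assuming fastvideo_args already carries the fields the function reads (num_gpus, ray_runtime_env, and optionally ray_placement_group):

from fastvideo.worker import initialize_ray_cluster

def setup_ray(fastvideo_args, address: str | None = None):
    # Connects to an existing Ray cluster (or starts one) and stores a ready
    # placement group, with one accelerator bundle per requested GPU, on
    # fastvideo_args.ray_placement_group.
    initialize_ray_cluster(fastvideo_args, ray_address=address)
    return fastvideo_args.ray_placement_group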

Modules

fastvideo.worker.executor

Classes

fastvideo.worker.executor.Executor
Executor(fastvideo_args: FastVideoArgs)

Bases: ABC

Source code in fastvideo/worker/executor.py
def __init__(self, fastvideo_args: FastVideoArgs):
    self.fastvideo_args = fastvideo_args

    self._init_executor()
Functions
fastvideo.worker.executor.Executor.collective_rpc abstractmethod
collective_rpc(method: str | Callable[..., _R], timeout: float | None = None, args: tuple = (), kwargs: dict[str, Any] | None = None) -> list[_R]

Execute an RPC call on all workers.

Parameters:

    method (str | Callable[..., _R], required)
        Name of the worker method to execute, or a callable that is
        serialized and sent to all workers to execute.

        If the method is a callable, it should accept an additional self
        argument, in addition to the arguments passed in args and kwargs.
        The self argument will be the worker object.

    timeout (float | None, default: None)
        Maximum time in seconds to wait for execution. Raises a
        TimeoutError on timeout. None means wait indefinitely.

    args (tuple, default: ())
        Positional arguments to pass to the worker method.

    kwargs (dict[str, Any] | None, default: None)
        Keyword arguments to pass to the worker method.

Returns:

    list[_R]
        A list containing the results from each worker.

Note

It is recommended to use this API to only pass control messages, and set up data-plane communication to pass data.

Source code in fastvideo/worker/executor.py
@abstractmethod
def collective_rpc(self,
                   method: str | Callable[..., _R],
                   timeout: float | None = None,
                   args: tuple = (),
                   kwargs: dict[str, Any] | None = None) -> list[_R]:
    """
    Execute an RPC call on all workers.

    Args:
        method: Name of the worker method to execute, or a callable that
            is serialized and sent to all workers to execute.

            If the method is a callable, it should accept an additional
            `self` argument, in addition to the arguments passed in `args`
            and `kwargs`. The `self` argument will be the worker object.
        timeout: Maximum time in seconds to wait for execution. Raises a
            :exc:`TimeoutError` on timeout. `None` means wait indefinitely.
        args: Positional arguments to pass to the worker method.
        kwargs: Keyword arguments to pass to the worker method.

    Returns:
        A list containing the results from each worker.

    Note:
        It is recommended to use this API to only pass control messages,
        and set up data-plane communication to pass data.
    """
    raise NotImplementedError
fastvideo.worker.executor.Executor.merge_lora_weights abstractmethod
merge_lora_weights() -> None

Merge the LoRA weights for the workers.

Source code in fastvideo/worker/executor.py
@abstractmethod
def merge_lora_weights(self) -> None:
    """
    Merge the LoRA weights for the workers.
    """
    raise NotImplementedError
fastvideo.worker.executor.Executor.set_lora_adapter abstractmethod
set_lora_adapter(lora_nickname: str, lora_path: str | None = None) -> None

Set the LoRA adapter for the workers.

Source code in fastvideo/worker/executor.py
@abstractmethod
def set_lora_adapter(self,
                     lora_nickname: str,
                     lora_path: str | None = None) -> None:
    """
    Set the LoRA adapter for the workers.
    """
    raise NotImplementedError
fastvideo.worker.executor.Executor.shutdown abstractmethod
shutdown() -> None

Shutdown the executor.

Source code in fastvideo/worker/executor.py
@abstractmethod
def shutdown(self) -> None:
    """
    Shutdown the executor.
    """
    raise NotImplementedError
fastvideo.worker.executor.Executor.unmerge_lora_weights abstractmethod
unmerge_lora_weights() -> None

Unmerge the LoRA weights for the workers.

Source code in fastvideo/worker/executor.py
@abstractmethod
def unmerge_lora_weights(self) -> None:
    """
    Unmerge the LoRA weights for the workers.
    """
    raise NotImplementedError

Functions

fastvideo.worker.gpu_worker

Classes

fastvideo.worker.gpu_worker.Worker
Worker(fastvideo_args: FastVideoArgs, local_rank: int, rank: int, distributed_init_method: str)
Source code in fastvideo/worker/gpu_worker.py
def __init__(self, fastvideo_args: FastVideoArgs, local_rank: int,
             rank: int, distributed_init_method: str):
    self.fastvideo_args = fastvideo_args
    self.local_rank = local_rank
    self.rank = rank
    self.distributed_init_method = distributed_init_method
Functions
fastvideo.worker.gpu_worker.Worker.init_device
init_device() -> None

Initialize the device for the worker.

Source code in fastvideo/worker/gpu_worker.py
def init_device(self) -> None:
    """Initialize the device for the worker."""

    # torch.distributed.all_reduce does not free the input tensor until
    # the synchronization point. This causes the memory usage to grow
    # as the number of all_reduce calls increases. This env var disables
    # this behavior.
    # Related issue:
    # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
    os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"
    # This env var set by Ray causes exceptions with graph building.
    os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)

    # Platform-agnostic device initialization
    self.device = get_local_torch_device()

    from fastvideo.platforms import current_platform

    # _check_if_gpu_supports_dtype(self.model_config.dtype)
    if current_platform.is_cuda_alike():
        self.init_gpu_memory = torch.cuda.mem_get_info()[0]
    else:
        # For MPS, we can't get memory info the same way
        self.init_gpu_memory = 0

    if self.fastvideo_args.distributed_executor_backend == "mp":
        os.environ["LOCAL_RANK"] = str(self.local_rank)
    os.environ["RANK"] = str(self.rank)
    os.environ["WORLD_SIZE"] = str(self.fastvideo_args.num_gpus)

    # Initialize the distributed environment.
    maybe_init_distributed_environment_and_model_parallel(
        self.fastvideo_args.tp_size, self.fastvideo_args.sp_size,
        self.distributed_init_method)

    self.pipeline = build_pipeline(self.fastvideo_args)
fastvideo.worker.gpu_worker.Worker.shutdown
shutdown() -> dict[str, Any]

Gracefully shut down the worker process

Source code in fastvideo/worker/gpu_worker.py
def shutdown(self) -> dict[str, Any]:
    """Gracefully shut down the worker process"""
    logger.info("Worker %d shutting down...",
                self.rank,
                local_main_process_only=False)
    # Clean up resources
    if hasattr(self, 'pipeline') and self.pipeline is not None:
        # Clean up pipeline resources if needed
        pass

    # Destroy the distributed environment
    cleanup_dist_env_and_memory(shutdown_ray=False)

    logger.info("Worker %d shutdown complete",
                self.rank,
                local_main_process_only=False)
    return {"status": "shutdown_complete"}

Functions

fastvideo.worker.multiproc_executor

Classes

fastvideo.worker.multiproc_executor.MultiprocExecutor
MultiprocExecutor(fastvideo_args: FastVideoArgs)

Bases: Executor

Source code in fastvideo/worker/executor.py
def __init__(self, fastvideo_args: FastVideoArgs):
    self.fastvideo_args = fastvideo_args

    self._init_executor()
Functions
fastvideo.worker.multiproc_executor.MultiprocExecutor.__del__
__del__()

Ensure cleanup on garbage collection

Source code in fastvideo/worker/multiproc_executor.py
def __del__(self):
    """Ensure cleanup on garbage collection"""
    self.shutdown()
fastvideo.worker.multiproc_executor.MultiprocExecutor.__enter__
__enter__()

Support for context manager protocol

Source code in fastvideo/worker/multiproc_executor.py
def __enter__(self):
    """Support for context manager protocol"""
    return self
fastvideo.worker.multiproc_executor.MultiprocExecutor.__exit__
__exit__(exc_type, exc_val, exc_tb)

Ensure cleanup when exiting context

Source code in fastvideo/worker/multiproc_executor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Ensure cleanup when exiting context"""
    self.shutdown()
fastvideo.worker.multiproc_executor.MultiprocExecutor.shutdown
shutdown() -> None

Properly shut down the executor and its workers

Source code in fastvideo/worker/multiproc_executor.py
def shutdown(self) -> None:
    """Properly shut down the executor and its workers"""
    if hasattr(self, 'shutting_down') and self.shutting_down:
        return  # Prevent multiple shutdown calls

    logger.info("Shutting down MultiprocExecutor...")
    self.shutting_down = True

    # First try gentle termination
    try:
        # Send termination message to all workers
        for worker in self.workers:
            with contextlib.suppress(Exception):
                worker.pipe.send({
                    "method": "shutdown",
                    "args": (),
                    "kwargs": {}
                })

        # Give workers some time to exit gracefully
        start_time = time.perf_counter()
        while time.perf_counter() - start_time < 5.0:  # 5 seconds timeout
            if all(not worker.proc.is_alive() for worker in self.workers):
                break
            time.sleep(0.1)

        # Force terminate any remaining workers
        for worker in self.workers:
            if worker.proc.is_alive():
                worker.proc.terminate()

        # Final timeout for terminate
        start_time = time.perf_counter()
        while time.perf_counter() - start_time < 2.0:  # 2 seconds timeout
            if all(not worker.proc.is_alive() for worker in self.workers):
                break
            time.sleep(0.1)

        # Kill if still alive
        for worker in self.workers:
            if worker.proc.is_alive():
                worker.proc.kill()
            worker.proc.join(timeout=1.0)

    except Exception as e:
        logger.error("Error during shutdown: %s", e)
        # Last resort, try to kill all workers
        for worker in self.workers:
            with contextlib.suppress(Exception):
                if worker.proc.is_alive():
                    worker.proc.kill()

    # Clean up pipes
    for worker in self.workers:
        with contextlib.suppress(Exception):
            worker.pipe.close()

    self.workers = []
    logger.info("MultiprocExecutor shutdown complete")
fastvideo.worker.multiproc_executor.UnreadyWorkerProcHandle dataclass
UnreadyWorkerProcHandle(proc: BaseProcess, rank: int, pipe: Connection, ready_pipe: Connection)

WorkerProcess handle before READY.

fastvideo.worker.multiproc_executor.WorkerMultiprocProc
WorkerMultiprocProc(fastvideo_args: FastVideoArgs, local_rank: int, rank: int, distributed_init_method: str, pipe: Connection)

Adapter that runs one Worker in a busy loop.

Source code in fastvideo/worker/multiproc_executor.py
def __init__(
    self,
    fastvideo_args: FastVideoArgs,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    pipe: Connection,
):
    self.rank = rank
    self.pipe = pipe
    wrapper = WorkerWrapperBase(fastvideo_args=fastvideo_args,
                                rpc_rank=rank)

    all_kwargs: list[dict] = [{} for _ in range(fastvideo_args.num_gpus)]
    all_kwargs[rank] = {
        "fastvideo_args": fastvideo_args,
        "local_rank": local_rank,
        "rank": rank,
        "distributed_init_method": distributed_init_method,
    }
    wrapper.init_worker(all_kwargs)
    self.worker = wrapper

    # Initialize device
    self.worker.init_device()

    # Set process title and log prefix
    self.setup_proc_title_and_log_prefix()
Functions
fastvideo.worker.multiproc_executor.WorkerMultiprocProc.worker_busy_loop
worker_busy_loop() -> None

Main busy loop for Multiprocessing Workers

Source code in fastvideo/worker/multiproc_executor.py
def worker_busy_loop(self) -> None:
    """Main busy loop for Multiprocessing Workers"""
    while True:
        logger.info("Worker %d starting event loop...", self.rank)
        try:
            rpc_call = self.pipe.recv()
            method = rpc_call.get("method")
            args = rpc_call.get("args", ())
            kwargs = rpc_call.get("kwargs", {})

            if isinstance(method, str):
                if method == "shutdown":
                    response = self.shutdown()
                    with contextlib.suppress(Exception):
                        self.pipe.send(response)
                    break
                if method == 'execute_forward':
                    forward_batch = kwargs['forward_batch']
                    fastvideo_args = kwargs['fastvideo_args']
                    output_batch = self.worker.execute_forward(
                        forward_batch, fastvideo_args)
                    logging_info = None
                    if envs.FASTVIDEO_STAGE_LOGGING:
                        logging_info = output_batch.logging_info
                    self.pipe.send({
                        "output_batch": output_batch.output.cpu(),
                        "logging_info": logging_info
                    })
                else:
                    result = self.worker.execute_method(
                        method, *args, **kwargs)
                    self.pipe.send(result)
            else:
                result = self.worker.execute_method(method, *args, **kwargs)
                self.pipe.send(result)
        except KeyboardInterrupt:
            logger.error(
                "Worker %d in loop received KeyboardInterrupt, aborting forward pass",
                self.rank)
            try:
                self.pipe.send(
                    {"error": "Operation aborted by KeyboardInterrupt"})
                logger.info("Worker %d sent error response after interrupt",
                            self.rank)
            except Exception as e:
                logger.error("Worker %d failed to send error response: %s",
                             self.rank, str(e))
            continue
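
The sketch below illustrates, from the executor side, the message format that worker_busy_loop consumes for an execute_forward call; parent_pipe names the executor-held end of the worker's Connection and is an assumption of this example, not an attribute documented here.

def request_forward(parent_pipe, forward_batch, fastvideo_args):
    # Send an RPC message in the dict format the busy loop expects.
    parent_pipe.send({
        "method": "execute_forward",
        "args": (),
        "kwargs": {
            "forward_batch": forward_batch,
            "fastvideo_args": fastvideo_args,
        },
    })
    reply = parent_pipe.recv()
    # On success the worker returns the output tensor moved to CPU, plus
    # optional per-stage logging info when FASTVIDEO_STAGE_LOGGING is enabled.
    return reply["output_batch"], reply.get("logging_info")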
fastvideo.worker.multiproc_executor.WorkerMultiprocProc.worker_main staticmethod
worker_main(*args, **kwargs)

Worker initialization and execution loops. This runs in a background process

Source code in fastvideo/worker/multiproc_executor.py
@staticmethod
def worker_main(*args, **kwargs):
    """ Worker initialization and execution loops.
    This runs a background process """

    # Signal handler used for graceful termination.
    # SystemExit exception is only raised once to allow this and worker
    # processes to terminate without error
    shutdown_requested = False

    def signal_handler(signum, frame):
        nonlocal shutdown_requested
        if not shutdown_requested:
            shutdown_requested = True
            raise SystemExit()

    # Either SIGTERM or SIGINT will terminate the worker
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    kill_itself_when_parent_died()
    faulthandler.enable()
    parent_process = psutil.Process().parent()

    worker = None
    ready_pipe = kwargs.pop("ready_pipe")
    rank = kwargs.get("rank")

    try:
        worker = WorkerMultiprocProc(*args, **kwargs)

        # Send READY once we know everything is loaded
        ready_pipe.send({
            "status": WorkerMultiprocProc.READY_STR,
        })

        ready_pipe.close()
        ready_pipe = None

        worker.worker_busy_loop()

    except Exception:
        if ready_pipe is not None:
            logger.exception("WorkerMultiprocProc failed to start.")
        else:
            logger.exception("WorkerMultiprocProc failed.")

        # The parent sends a SIGTERM to all worker processes if
        # any worker dies. Set this value so we don't re-throw
        # SystemExit() to avoid zmq exceptions in __del__.
        shutdown_requested = True
        traceback = get_exception_traceback()
        logger.error("Worker %d hit an exception: %s", rank, traceback)
        parent_process.send_signal(signal.SIGQUIT)

    finally:
        if ready_pipe is not None:
            ready_pipe.close()
        # Clean up once worker exits busy loop
        if worker is not None:
            worker.shutdown()

Functions

fastvideo.worker.multiproc_executor.set_multiproc_executor_envs
set_multiproc_executor_envs() -> None

Set up environment variables that should be used when there are workers in a multiprocessing environment. This should be called by the parent process before worker processes are created.

Source code in fastvideo/worker/multiproc_executor.py
def set_multiproc_executor_envs() -> None:
    """ Set up environment variables that should be used when there are workers
    in a multiprocessing environment. This should be called by the parent 
    process before worker processes are created"""

    force_spawn()

fastvideo.worker.ray_distributed_executor

Classes

fastvideo.worker.ray_distributed_executor.RayDistributedExecutor
RayDistributedExecutor(fastvideo_args: FastVideoArgs)

Bases: Executor

Ray-based distributed executor

Source code in fastvideo/worker/executor.py
def __init__(self, fastvideo_args: FastVideoArgs):
    self.fastvideo_args = fastvideo_args

    self._init_executor()
fastvideo.worker.ray_distributed_executor.RayWorkerMetaData dataclass
RayWorkerMetaData(worker: ActorHandle, created_rank: int, adjusted_rank: int = -1, ip: str = '')

Metadata for a Ray worker. The order of Ray worker creation can be random, and we need to reset the rank after creating all workers.

Functions

fastvideo.worker.ray_env

Functions

fastvideo.worker.ray_env.get_env_vars_to_copy
get_env_vars_to_copy(exclude_vars: set[str] | None = None, additional_vars: set[str] | None = None, destination: str | None = None) -> set[str]

Get the environment variables to copy to downstream Ray actors.

Example use cases:

- Copy environment variables from RayDistributedExecutor to Ray workers.
- Copy environment variables from RayDPClient to Ray DPEngineCoreActor.

Parameters:

    exclude_vars (set[str] | None, default: None)
        A set of FastVideo defined environment variables to exclude from
        copying.

    additional_vars (set[str] | None, default: None)
        A set of additional environment variables to copy. If a variable is
        in both exclude_vars and additional_vars, it will be excluded.

    destination (str | None, default: None)
        The destination of the environment variables.

Returns:

    set[str]
        A set of environment variables to copy.

Source code in fastvideo/worker/ray_env.py
def get_env_vars_to_copy(
    exclude_vars: set[str] | None = None,
    additional_vars: set[str] | None = None,
    destination: str | None = None,
) -> set[str]:
    """
    Get the environment variables to copy to downstream Ray actors.

    Example use cases:
    - Copy environment variables from RayDistributedExecutor to Ray workers.
    - Copy environment variables from RayDPClient to Ray DPEngineCoreActor.

    Args:
        exclude_vars: A set of FastVideo defined environment variables to exclude
            from copying.
        additional_vars: A set of additional environment variables to copy.
            If a variable is in both exclude_vars and additional_vars, it will
            be excluded.
        destination: The destination of the environment variables.
    Returns:
        A set of environment variables to copy.
    """
    exclude_vars = exclude_vars or set()
    additional_vars = additional_vars or set()

    env_vars_to_copy = {
        v
        for v in set(envs.environment_variables).union(additional_vars)
        if v not in exclude_vars and v not in RAY_NON_CARRY_OVER_ENV_VARS
    }

    to_destination = " to " + destination if destination is not None else ""

    logger.info(
        "RAY_NON_CARRY_OVER_ENV_VARS from config: %s",
        RAY_NON_CARRY_OVER_ENV_VARS,
    )
    logger.info(
        "Copying the following environment variables%s: %s",
        to_destination,
        [v for v in env_vars_to_copy if v in os.environ],
    )
    logger.info(
        "If certain env vars should NOT be copied, add them to %s file",
        RAY_NON_CARRY_OVER_ENV_VARS_FILE,
    )

    return env_vars_to_copy
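
A short usage sketch of get_env_vars_to_copy follows; the specific variable names passed for exclusion and addition are illustrative only.

import os
from fastvideo.worker.ray_env import get_env_vars_to_copy

def build_ray_env_vars() -> dict[str, str]:
    names = get_env_vars_to_copy(
        exclude_vars={"FASTVIDEO_STAGE_LOGGING"},  # illustrative exclusion
        additional_vars={"HF_HOME"},               # extra, non-FastVideo var
        destination="Ray workers",
    )
    # Only variables actually present in this process are worth forwarding.
    return {name: os.environ[name] for name in names if name in os.environ}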

fastvideo.worker.ray_utils

Classes

fastvideo.worker.ray_utils.RayWorkerWrapper
RayWorkerWrapper(*args, **kwargs)

Bases: WorkerWrapperBase

Ray wrapper for fastvideo.worker.Worker, allowing Worker to be lazily initialized after Ray sets CUDA_VISIBLE_DEVICES.

Source code in fastvideo/worker/ray_utils.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)

Functions

fastvideo.worker.ray_utils.assert_ray_available
assert_ray_available() -> None

Raise an exception if Ray is not available.

Source code in fastvideo/worker/ray_utils.py
def assert_ray_available() -> None:
    """Raise an exception if Ray is not available."""
    if ray is None:
        raise ValueError(f"Failed to import Ray: {ray_import_err}."
                         "Please install Ray with `pip install ray`.")
fastvideo.worker.ray_utils.initialize_ray_cluster
initialize_ray_cluster(fastvideo_args: FastVideoArgs, ray_address: str | None = None)

Initialize the distributed cluster with Ray.

It will connect to the Ray cluster and create a placement group for the workers, which includes the specification of the resources for each distributed worker.

Parameters:

    fastvideo_args (FastVideoArgs, required)
        The FastVideo configuration, including the parallel execution settings.

    ray_address (str | None, default: None)
        The address of the Ray cluster. If None, uses the default Ray cluster address.
Source code in fastvideo/worker/ray_utils.py
def initialize_ray_cluster(
    fastvideo_args: FastVideoArgs,
    ray_address: str | None = None,
):
    """Initialize the distributed cluster with Ray.

    It will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        fastvideo_args: The FastVideo configuration, including the
            parallel execution settings.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    """
    assert_ray_available()
    from fastvideo.platforms import current_platform

    if ray.is_initialized():
        logger.info("Ray is already initialized. Skipping Ray initialization.")
    elif current_platform.is_rocm() or current_platform.is_xpu():
        # Try to connect existing ray instance and create a new one if not found
        try:
            ray.init("auto")
        except ConnectionError:
            logger.warning(
                "No existing RAY instance detected. "
                "A new instance will be launched with current node resources.")
            ray.init(address=ray_address,
                     num_gpus=fastvideo_args.num_gpus,
                     runtime_env=fastvideo_args.ray_runtime_env)
    else:
        ray.init(address=ray_address,
                 runtime_env=fastvideo_args.ray_runtime_env)

    device_str = current_platform.ray_device_key
    if not device_str:
        raise ValueError(
            f"current platform {current_platform.device_name} does not "
            "support ray.")

    # Create or get the placement group for worker processes
    if fastvideo_args.ray_placement_group:
        current_placement_group = fastvideo_args.ray_placement_group
    else:
        current_placement_group = ray.util.get_current_placement_group()

    if current_placement_group:
        logger.info("Using the existing placement group")

        # We are in a placement group
        bundles = current_placement_group.bundle_specs
        # Verify that we can use the placement group.
        device_bundles = 0
        for bundle in bundles:
            bundle_devices = bundle.get(device_str, 0)
            if bundle_devices > 1:
                raise ValueError(
                    "Placement group bundle cannot have more than 1 "
                    f"{device_str}.")
            if bundle_devices:
                device_bundles += 1
        if fastvideo_args.num_gpus > device_bundles:
            raise ValueError(
                f"The number of required {device_str}s exceeds the total "
                f"number of available {device_str}s in the placement group. "
                f"Required number of devices: {fastvideo_args.num_gpus}. "
                f"Total number of devices: {device_bundles}.")
    else:
        logger.info("No current placement group found. "
                    "Creating a new placement group.")
        num_devices_in_cluster = ray.cluster_resources().get(device_str, 0)
        # Log a warning message and delay resource allocation failure response.
        # Avoid immediate rejection to allow user-initiated placement group
        # created and wait cluster to be ready
        if fastvideo_args.num_gpus > num_devices_in_cluster:
            logger.warning(
                "The number of required %ss exceeds the total "
                "number of available %ss in the placement group.", device_str,
                device_str)
        # Create a new placement group
        placement_group_specs: list[dict[str, float]] = ([{
            device_str: 1.0
        } for _ in range(fastvideo_args.num_gpus)])

        # FastVideo engine is also a worker to execute model with an accelerator,
        # so it requires to have the device in a current node. Check if
        # the current node has at least one device.
        current_ip = get_ip()
        current_node_id = ray.get_runtime_context().get_node_id()
        current_node_resource = available_resources_per_node()[current_node_id]
        if current_node_resource.get(device_str, 0) < 1:
            raise ValueError(
                f"Current node has no {device_str} available. "
                f"{current_node_resource=}. FastVideo engine cannot start without "
                f"{device_str}. Make sure you have at least 1 {device_str} "
                f"available in a node {current_node_id=} {current_ip=}.")
        # This way, at least bundle is required to be created in a current
        # node.
        placement_group_specs[0][f"node:{current_ip}"] = 0.001

        # By default, Ray packs resources as much as possible.
        current_placement_group = ray.util.placement_group(
            placement_group_specs, strategy="PACK")
        _wait_until_pg_ready(current_placement_group)

    assert current_placement_group is not None
    _verify_bundles(current_placement_group, fastvideo_args, device_str)
    # Set the placement group in the fastvideo args
    fastvideo_args.ray_placement_group = current_placement_group
fastvideo.worker.ray_utils.is_in_ray_actor
is_in_ray_actor()

Check if we are in a Ray actor.

Source code in fastvideo/worker/ray_utils.py
def is_in_ray_actor():
    """Check if we are in a Ray actor."""

    try:
        import ray
        return (ray.is_initialized()
                and ray.get_runtime_context().get_actor_id() is not None)
    except ImportError:
        return False

fastvideo.worker.worker_base

Classes

fastvideo.worker.worker_base.WorkerWrapperBase
WorkerWrapperBase(fastvideo_args: FastVideoArgs, rpc_rank: int = 0)

This class represents one process in an executor/engine. It is responsible for lazily initializing the worker and handling the worker's lifecycle. We first instantiate the WorkerWrapper, which remembers the worker module and class name. Then the environment variables are updated via update_environment_variables, and the real initialization happens in init_worker.

Initialize the worker wrapper with the given fastvideo_args and rpc_rank. Note: rpc_rank is the rank of the worker in the executor. In most cases, it is also the rank of the worker in the distributed group. However, when multiple executors work together, they can be different. For example, in SPMD-style offline inference with TP=2, users can launch 2 engines/executors, each with only 1 worker; all workers have rpc_rank=0, but they have different ranks in the TP group.

Source code in fastvideo/worker/worker_base.py
def __init__(
    self,
    fastvideo_args: FastVideoArgs,
    rpc_rank: int = 0,
) -> None:
    """
    Initialize the worker wrapper with the given fastvideo_args and rpc_rank.
    Note: rpc_rank is the rank of the worker in the executor. In most cases,
    it is also the rank of the worker in the distributed group. However,
    when multiple executors work together, they can be different.
    e.g. in the case of SPMD-style offline inference with TP=2,
    users can launch 2 engines/executors, each with only 1 worker.
    All workers have rpc_rank=0, but they have different ranks in the TP
    group.
    """
    self.rpc_rank = rpc_rank
    self.worker: Worker | None = None
    self.fastvideo_args: FastVideoArgs | None = None
Functions
fastvideo.worker.worker_base.WorkerWrapperBase.adjust_rank
adjust_rank(rank_mapping: dict[int, int]) -> None

Adjust the rpc_rank based on the given mapping. It is only used during the initialization of the executor, to adjust the rpc_rank of workers after we create all workers.

Source code in fastvideo/worker/worker_base.py
def adjust_rank(self, rank_mapping: dict[int, int]) -> None:
    """
    Adjust the rpc_rank based on the given mapping.
    It is only used during the initialization of the executor,
    to adjust the rpc_rank of workers after we create all workers.
    """
    if self.rpc_rank in rank_mapping:
        self.rpc_rank = rank_mapping[self.rpc_rank]
fastvideo.worker.worker_base.WorkerWrapperBase.init_worker
init_worker(all_kwargs: list[dict[str, Any]]) -> None

Here we inject some common logic before initializing the worker. Arguments are passed to the worker class constructor.

Source code in fastvideo/worker/worker_base.py
def init_worker(self, all_kwargs: list[dict[str, Any]]) -> None:
    """
    Here we inject some common logic before initializing the worker.
    Arguments are passed to the worker class constructor.
    """
    kwargs = all_kwargs[self.rpc_rank]
    self.fastvideo_args = kwargs.get("fastvideo_args")
    assert self.fastvideo_args is not None, (
        "fastvideo_args is required to initialize the worker")

    self.worker = Worker(**kwargs)
    assert self.worker is not None

Functions