gpu_worker

Classes

fastvideo.worker.gpu_worker.Worker

Worker(fastvideo_args: FastVideoArgs, local_rank: int, rank: int, distributed_init_method: str)
Source code in fastvideo/worker/gpu_worker.py
def __init__(self, fastvideo_args: FastVideoArgs, local_rank: int,
             rank: int, distributed_init_method: str):
    self.fastvideo_args = fastvideo_args
    self.local_rank = local_rank
    self.rank = rank
    self.distributed_init_method = distributed_init_method
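
Hedged usage sketch: the constructor only records its arguments, so a worker can be built cheaply before any GPU or distributed state exists. The import path for FastVideoArgs, its argument fields, and the init-method string below are illustrative assumptions, not taken from this page.

from fastvideo.fastvideo_args import FastVideoArgs  # import path is an assumption
from fastvideo.worker.gpu_worker import Worker

# Illustrative arguments; the required FastVideoArgs fields are assumptions.
args = FastVideoArgs(model_path="path/to/model", num_gpus=1)
worker = Worker(
    fastvideo_args=args,
    local_rank=0,                                     # device index on this node
    rank=0,                                           # global rank across all nodes
    distributed_init_method="tcp://127.0.0.1:29500",  # standard torch.distributed form
)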

Functions

fastvideo.worker.gpu_worker.Worker.init_device
init_device() -> None

Initialize the device for the worker.

Source code in fastvideo/worker/gpu_worker.py
def init_device(self) -> None:
    """Initialize the device for the worker."""

    # torch.distributed.all_reduce does not free the input tensor until
    # the synchronization point. This causes the memory usage to grow
    # as the number of all_reduce calls increases. This env var disables
    # this behavior.
    # Related issue:
    # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
    os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"
    # This env var set by Ray causes exceptions with graph building.
    os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)

    # Platform-agnostic device initialization
    self.device = get_local_torch_device()

    from fastvideo.platforms import current_platform

    # _check_if_gpu_supports_dtype(self.model_config.dtype)
    if current_platform.is_cuda_alike():
        self.init_gpu_memory = torch.cuda.mem_get_info()[0]
    else:
        # For MPS, we can't get memory info the same way
        self.init_gpu_memory = 0

    if self.fastvideo_args.distributed_executor_backend == "mp":
        os.environ["LOCAL_RANK"] = str(self.local_rank)
    os.environ["RANK"] = str(self.rank)
    os.environ["WORLD_SIZE"] = str(self.fastvideo_args.num_gpus)

    # Initialize the distributed environment.
    maybe_init_distributed_environment_and_model_parallel(
        self.fastvideo_args.tp_size, self.fastvideo_args.sp_size,
        self.distributed_init_method)

    self.pipeline = build_pipeline(self.fastvideo_args)
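
A minimal sketch of how a per-GPU worker process might drive init_device() after construction, assuming one process per GPU launched by the multiprocessing ("mp") executor backend; the surrounding executor code is not shown on this page and the helper name below is hypothetical.

# Hypothetical per-process entry point; everything except Worker is an assumption.
def run_worker(args, local_rank: int, rank: int, init_method: str):
    worker = Worker(args, local_rank, rank, init_method)
    worker.init_device()  # sets RANK/WORLD_SIZE, initializes the distributed
                          # environment, and builds worker.pipeline
    return worker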

fastvideo.worker.gpu_worker.Worker.shutdown
shutdown() -> dict[str, Any]

Gracefully shut down the worker process

Source code in fastvideo/worker/gpu_worker.py
def shutdown(self) -> dict[str, Any]:
    """Gracefully shut down the worker process"""
    logger.info("Worker %d shutting down...",
                self.rank,
                local_main_process_only=False)
    # Clean up resources
    if hasattr(self, 'pipeline') and self.pipeline is not None:
        # Clean up pipeline resources if needed
        pass

    # Destroy the distributed environment
    cleanup_dist_env_and_memory(shutdown_ray=False)

    logger.info("Worker %d shutdown complete",
                self.rank,
                local_main_process_only=False)
    return {"status": "shutdown_complete"}
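
Putting the two methods together, the lifecycle implied by this page is init_device() followed eventually by shutdown(). A hedged sketch; the generation step in the middle is a placeholder, not an API documented here, and the "env://" init method is an assumption.

worker = Worker(args, local_rank=0, rank=0,
                distributed_init_method="env://")  # init method form is an assumption
try:
    worker.init_device()
    # ... submit generation work to worker.pipeline here (placeholder) ...
finally:
    result = worker.shutdown()
    assert result["status"] == "shutdown_complete"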

Functions