multiproc_executor

Classes

fastvideo.worker.multiproc_executor.MultiprocExecutor

MultiprocExecutor(fastvideo_args: FastVideoArgs)

Bases: Executor

Source code in fastvideo/worker/executor.py
def __init__(self, fastvideo_args: FastVideoArgs):
    self.fastvideo_args = fastvideo_args

    self._init_executor()
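
A minimal usage sketch. The FastVideoArgs import path and constructor fields below are assumptions for illustration; num_gpus is the only field referenced in the code shown on this page.

from fastvideo.fastvideo_args import FastVideoArgs  # import path is an assumption
from fastvideo.worker.multiproc_executor import MultiprocExecutor

args = FastVideoArgs(num_gpus=2)  # hypothetical construction

executor = MultiprocExecutor(args)  # _init_executor() spawns and readies the workers
try:
    ...  # issue work through the executor here
finally:
    executor.shutdown()  # escalates from a cooperative "shutdown" RPC to terminate()/kill()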

Functions

fastvideo.worker.multiproc_executor.MultiprocExecutor.__del__
__del__()

Ensure cleanup on garbage collection

Source code in fastvideo/worker/multiproc_executor.py
def __del__(self):
    """Ensure cleanup on garbage collection"""
    self.shutdown()
fastvideo.worker.multiproc_executor.MultiprocExecutor.__enter__
__enter__()

Support for context manager protocol

Source code in fastvideo/worker/multiproc_executor.py
def __enter__(self):
    """Support for context manager protocol"""
    return self
fastvideo.worker.multiproc_executor.MultiprocExecutor.__exit__
__exit__(exc_type, exc_val, exc_tb)

Ensure cleanup when exiting context

Source code in fastvideo/worker/multiproc_executor.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Ensure cleanup when exiting context"""
    self.shutdown()
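
The same lifecycle in context-manager form: __exit__ calls shutdown() even when the body raises, so no explicit try/finally is needed. args is the hypothetical FastVideoArgs instance from the sketch above.

with MultiprocExecutor(args) as executor:
    ...  # run work through the executor; workers are torn down on exit
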
fastvideo.worker.multiproc_executor.MultiprocExecutor.shutdown
shutdown() -> None

Properly shut down the executor and its workers

Source code in fastvideo/worker/multiproc_executor.py
def shutdown(self) -> None:
    """Properly shut down the executor and its workers"""
    if hasattr(self, 'shutting_down') and self.shutting_down:
        return  # Prevent multiple shutdown calls

    logger.info("Shutting down MultiprocExecutor...")
    self.shutting_down = True

    # First try gentle termination
    try:
        # Send termination message to all workers
        for worker in self.workers:
            with contextlib.suppress(Exception):
                worker.pipe.send({
                    "method": "shutdown",
                    "args": (),
                    "kwargs": {}
                })

        # Give workers some time to exit gracefully
        start_time = time.perf_counter()
        while time.perf_counter() - start_time < 5.0:  # 5 seconds timeout
            if all(not worker.proc.is_alive() for worker in self.workers):
                break
            time.sleep(0.1)

        # Force terminate any remaining workers
        for worker in self.workers:
            if worker.proc.is_alive():
                worker.proc.terminate()

        # Final timeout for terminate
        start_time = time.perf_counter()
        while time.perf_counter() - start_time < 2.0:  # 2 seconds timeout
            if all(not worker.proc.is_alive() for worker in self.workers):
                break
            time.sleep(0.1)

        # Kill if still alive
        for worker in self.workers:
            if worker.proc.is_alive():
                worker.proc.kill()
            worker.proc.join(timeout=1.0)

    except Exception as e:
        logger.error("Error during shutdown: %s", e)
        # Last resort, try to kill all workers
        for worker in self.workers:
            with contextlib.suppress(Exception):
                if worker.proc.is_alive():
                    worker.proc.kill()

    # Clean up pipes
    for worker in self.workers:
        with contextlib.suppress(Exception):
            worker.pipe.close()

    self.workers = []
    logger.info("MultiprocExecutor shutdown complete")

fastvideo.worker.multiproc_executor.UnreadyWorkerProcHandle dataclass

UnreadyWorkerProcHandle(proc: BaseProcess, rank: int, pipe: Connection, ready_pipe: Connection)

WorkerProcess handle before READY.
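
A sketch of how the parent side might assemble one of these handles before the worker reports READY. The spawn context, the duplex RPC pipe plus one-way ready pipe, and all argument values are assumptions based on the fields above and on worker_main below.

import multiprocessing as mp

ctx = mp.get_context("spawn")
parent_pipe, child_pipe = ctx.Pipe(duplex=True)      # RPC channel shared with the worker
parent_ready, child_ready = ctx.Pipe(duplex=False)   # one-shot READY notification (parent receives)

proc = ctx.Process(
    target=WorkerMultiprocProc.worker_main,
    kwargs=dict(
        fastvideo_args=args,                          # hypothetical FastVideoArgs from above
        local_rank=0,
        rank=0,
        distributed_init_method="tcp://127.0.0.1:29500",  # illustrative value
        pipe=child_pipe,
        ready_pipe=child_ready,
    ),
)

handle = UnreadyWorkerProcHandle(proc=proc, rank=0,
                                 pipe=parent_pipe, ready_pipe=parent_ready)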

fastvideo.worker.multiproc_executor.WorkerMultiprocProc

WorkerMultiprocProc(fastvideo_args: FastVideoArgs, local_rank: int, rank: int, distributed_init_method: str, pipe: Connection)

Adapter that runs one Worker in a busy loop.

Source code in fastvideo/worker/multiproc_executor.py
def __init__(
    self,
    fastvideo_args: FastVideoArgs,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    pipe: Connection,
):
    self.rank = rank
    self.pipe = pipe
    wrapper = WorkerWrapperBase(fastvideo_args=fastvideo_args,
                                rpc_rank=rank)

    all_kwargs: list[dict] = [{} for _ in range(fastvideo_args.num_gpus)]
    all_kwargs[rank] = {
        "fastvideo_args": fastvideo_args,
        "local_rank": local_rank,
        "rank": rank,
        "distributed_init_method": distributed_init_method,
    }
    wrapper.init_worker(all_kwargs)
    self.worker = wrapper

    # Initialize device
    self.worker.init_device()

    # Set process title and log prefix
    self.setup_proc_title_and_log_prefix()

Functions

fastvideo.worker.multiproc_executor.WorkerMultiprocProc.worker_busy_loop
worker_busy_loop() -> None

Main busy loop for Multiprocessing Workers

Source code in fastvideo/worker/multiproc_executor.py
def worker_busy_loop(self) -> None:
    """Main busy loop for Multiprocessing Workers"""
    while True:
        logger.info("Worker %d starting event loop...", self.rank)
        try:
            rpc_call = self.pipe.recv()
            method = rpc_call.get("method")
            args = rpc_call.get("args", ())
            kwargs = rpc_call.get("kwargs", {})

            if isinstance(method, str):
                if method == "shutdown":
                    response = self.shutdown()
                    with contextlib.suppress(Exception):
                        self.pipe.send(response)
                    break
                if method == 'execute_forward':
                    forward_batch = kwargs['forward_batch']
                    fastvideo_args = kwargs['fastvideo_args']
                    output_batch = self.worker.execute_forward(
                        forward_batch, fastvideo_args)
                    logging_info = None
                    if envs.FASTVIDEO_STAGE_LOGGING:
                        logging_info = output_batch.logging_info
                    self.pipe.send({
                        "output_batch": output_batch.output.cpu(),
                        "logging_info": logging_info
                    })
                else:
                    result = self.worker.execute_method(
                        method, *args, **kwargs)
                    self.pipe.send(result)
            else:
                result = self.worker.execute_method(method, *args, **kwargs)
                self.pipe.send(result)
        except KeyboardInterrupt:
            logger.error(
                "Worker %d in loop received KeyboardInterrupt, aborting forward pass",
                self.rank)
            try:
                self.pipe.send(
                    {"error": "Operation aborted by KeyboardInterrupt"})
                logger.info("Worker %d sent error response after interrupt",
                            self.rank)
            except Exception as e:
                logger.error("Worker %d failed to send error response: %s",
                             self.rank, str(e))
            continue
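
For reference, the wire format the loop above expects is a plain dict sent over the pipe; any string method other than "shutdown" and "execute_forward" is dispatched to the worker via execute_method. A parent-side sketch, assuming handle.pipe is the parent end of a started worker's RPC pipe (continuing the handle sketch above) and forward_batch is an already-prepared batch object:

# Forward pass: matches the "execute_forward" branch above
handle.pipe.send({
    "method": "execute_forward",
    "args": (),
    "kwargs": {"forward_batch": forward_batch, "fastvideo_args": args},
})
reply = handle.pipe.recv()
frames = reply["output_batch"]       # already moved to CPU by the worker
stage_logs = reply["logging_info"]   # None unless FASTVIDEO_STAGE_LOGGING is set

# Cooperative shutdown: matches the "shutdown" branch above
handle.pipe.send({"method": "shutdown", "args": (), "kwargs": {}})
handle.pipe.recv()                   # worker replies once, then exits its loop
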
fastvideo.worker.multiproc_executor.WorkerMultiprocProc.worker_main staticmethod
worker_main(*args, **kwargs)

Worker initialization and execution loop. This runs in a background process.

Source code in fastvideo/worker/multiproc_executor.py
@staticmethod
def worker_main(*args, **kwargs):
    """ Worker initialization and execution loops.
    This runs a background process """

    # Signal handler used for graceful termination.
    # SystemExit exception is only raised once to allow this and worker
    # processes to terminate without error
    shutdown_requested = False

    def signal_handler(signum, frame):
        nonlocal shutdown_requested
        if not shutdown_requested:
            shutdown_requested = True
            raise SystemExit()

    # Either SIGTERM or SIGINT will terminate the worker
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    kill_itself_when_parent_died()
    faulthandler.enable()
    parent_process = psutil.Process().parent()

    worker = None
    ready_pipe = kwargs.pop("ready_pipe")
    rank = kwargs.get("rank")

    try:
        worker = WorkerMultiprocProc(*args, **kwargs)

        # Send READY once we know everything is loaded
        ready_pipe.send({
            "status": WorkerMultiprocProc.READY_STR,
        })

        ready_pipe.close()
        ready_pipe = None

        worker.worker_busy_loop()

    except Exception:
        if ready_pipe is not None:
            logger.exception("WorkerMultiprocProc failed to start.")
        else:
            logger.exception("WorkerMultiprocProc failed.")

        # The parent sends a SIGTERM to all worker processes if
        # any worker dies. Set this value so we don't re-throw
        # SystemExit() to avoid zmq exceptions in __del__.
        shutdown_requested = True
        traceback = get_exception_traceback()
        logger.error("Worker %d hit an exception: %s", rank, traceback)
        parent_process.send_signal(signal.SIGQUIT)

    finally:
        if ready_pipe is not None:
            ready_pipe.close()
        # Clean up once worker exits busy loop
        if worker is not None:
            worker.shutdown()
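
A sketch of the parent side of the READY handshake, continuing the handle sketch above; the timeout value is illustrative.

proc.start()
child_pipe.close()     # parent keeps only its own ends of the pipes
child_ready.close()

if parent_ready.poll(timeout=600):   # illustrative timeout, in seconds
    msg = parent_ready.recv()
    assert msg["status"] == WorkerMultiprocProc.READY_STR
else:
    raise TimeoutError("worker never reported READY")
parent_ready.close()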

Functions

fastvideo.worker.multiproc_executor.set_multiproc_executor_envs

set_multiproc_executor_envs() -> None

Set up environment variables that should be used when there are workers in a multiprocessing environment. This should be called by the parent process before worker processes are created.

Source code in fastvideo/worker/multiproc_executor.py
def set_multiproc_executor_envs() -> None:
    """ Set up environment variables that should be used when there are workers
    in a multiprocessing environment. This should be called by the parent 
    process before worker processes are created"""

    force_spawn()
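
force_spawn itself is not shown here; a minimal sketch of the behaviour its name suggests, assuming it forces the "spawn" start method so that CUDA state in the parent is not inherited through fork():

import multiprocessing as mp

def force_spawn() -> None:
    # Sketch only: the real force_spawn lives elsewhere in fastvideo.
    if mp.get_start_method(allow_none=True) != "spawn":
        mp.set_start_method("spawn", force=True)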