ray_utils

Classes

fastvideo.worker.ray_utils.RayWorkerWrapper

RayWorkerWrapper(*args, **kwargs)

Bases: WorkerWrapperBase

Ray wrapper for fastvideo.worker.Worker, allowing Worker to be lazily initialized after Ray sets CUDA_VISIBLE_DEVICES.

Source code in fastvideo/worker/ray_utils.py
def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
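Because the wrapper only forwards to WorkerWrapperBase and defers Worker construction, it can be created as a Ray actor inside a GPU bundle: Ray assigns the device and sets CUDA_VISIBLE_DEVICES for the actor process before any worker code runs. The following is a minimal sketch of such scheduling; the placement-group setup and the elided constructor arguments are assumptions, not the engine's exact code.

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from fastvideo.worker.ray_utils import RayWorkerWrapper

ray.init()

# One GPU bundle; a real engine would create one bundle per worker.
pg = ray.util.placement_group([{"GPU": 1.0}], strategy="PACK")
ray.get(pg.ready())

# Schedule the wrapper into the bundle. Ray sets CUDA_VISIBLE_DEVICES for
# the actor process before __init__ runs, so the lazily created Worker sees
# the right device. WorkerWrapperBase constructor arguments are elided here.
worker = ray.remote(
    num_gpus=1,
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=0),
)(RayWorkerWrapper).remote()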

Functions

fastvideo.worker.ray_utils.assert_ray_available

assert_ray_available() -> None

Raise an exception if Ray is not available.

Source code in fastvideo/worker/ray_utils.py
def assert_ray_available() -> None:
    """Raise an exception if Ray is not available."""
    if ray is None:
        raise ValueError(f"Failed to import Ray: {ray_import_err}."
                         "Please install Ray with `pip install ray`.")

fastvideo.worker.ray_utils.initialize_ray_cluster

initialize_ray_cluster(fastvideo_args: FastVideoArgs, ray_address: str | None = None)

Initialize the distributed cluster with Ray.

It connects to the Ray cluster and creates a placement group for the workers, which specifies the resources for each distributed worker.

Parameters:

fastvideo_args (FastVideoArgs): The configurations for parallel execution. Required.

ray_address (str | None): The address of the Ray cluster. If None, uses the default Ray cluster address. Defaults to None.
Source code in fastvideo/worker/ray_utils.py
def initialize_ray_cluster(
    fastvideo_args: FastVideoArgs,
    ray_address: str | None = None,
):
    """Initialize the distributed cluster with Ray.

    It will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        fastvideo_args: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    """
    assert_ray_available()
    from fastvideo.platforms import current_platform

    if ray.is_initialized():
        logger.info("Ray is already initialized. Skipping Ray initialization.")
    elif current_platform.is_rocm() or current_platform.is_xpu():
        # Try to connect to an existing Ray instance; create a new one if not found
        try:
            ray.init("auto")
        except ConnectionError:
            logger.warning(
                "No existing RAY instance detected. "
                "A new instance will be launched with current node resources.")
            ray.init(address=ray_address,
                     num_gpus=fastvideo_args.num_gpus,
                     runtime_env=fastvideo_args.ray_runtime_env)
    else:
        ray.init(address=ray_address,
                 runtime_env=fastvideo_args.ray_runtime_env)

    device_str = current_platform.ray_device_key
    if not device_str:
        raise ValueError(
            f"current platform {current_platform.device_name} does not "
            "support ray.")

    # Create or get the placement group for worker processes
    if fastvideo_args.ray_placement_group:
        current_placement_group = fastvideo_args.ray_placement_group
    else:
        current_placement_group = ray.util.get_current_placement_group()

    if current_placement_group:
        logger.info("Using the existing placement group")

        # We are in a placement group
        bundles = current_placement_group.bundle_specs
        # Verify that we can use the placement group.
        device_bundles = 0
        for bundle in bundles:
            bundle_devices = bundle.get(device_str, 0)
            if bundle_devices > 1:
                raise ValueError(
                    "Placement group bundle cannot have more than 1 "
                    f"{device_str}.")
            if bundle_devices:
                device_bundles += 1
        if fastvideo_args.num_gpus > device_bundles:
            raise ValueError(
                f"The number of required {device_str}s exceeds the total "
                f"number of available {device_str}s in the placement group. "
                f"Required number of devices: {fastvideo_args.num_gpus}. "
                f"Total number of devices: {device_bundles}.")
    else:
        logger.info("No current placement group found. "
                    "Creating a new placement group.")
        num_devices_in_cluster = ray.cluster_resources().get(device_str, 0)
        # Log a warning and defer the resource-allocation failure instead of
        # rejecting immediately, so a user-initiated placement group can still
        # be created while the cluster becomes ready.
        if fastvideo_args.num_gpus > num_devices_in_cluster:
            logger.warning(
                "The number of required %ss exceeds the total "
                "number of available %ss in the placement group.", device_str,
                device_str)
        # Create a new placement group
        placement_group_specs: list[dict[str, float]] = ([{
            device_str: 1.0
        } for _ in range(fastvideo_args.num_gpus)])

        # The FastVideo engine is itself a worker that executes the model on an
        # accelerator, so it requires a device on the current node. Check that
        # the current node has at least one device.
        current_ip = get_ip()
        current_node_id = ray.get_runtime_context().get_node_id()
        current_node_resource = available_resources_per_node()[current_node_id]
        if current_node_resource.get(device_str, 0) < 1:
            raise ValueError(
                f"Current node has no {device_str} available. "
                f"{current_node_resource=}. FastVideo engine cannot start without "
                f"{device_str}. Make sure you have at least 1 {device_str} "
                f"available in a node {current_node_id=} {current_ip=}.")
        # This way, at least one bundle is required to be created on the
        # current node.
        placement_group_specs[0][f"node:{current_ip}"] = 0.001

        # By default, Ray packs resources as much as possible.
        current_placement_group = ray.util.placement_group(
            placement_group_specs, strategy="PACK")
        _wait_until_pg_ready(current_placement_group)

    assert current_placement_group is not None
    _verify_bundles(current_placement_group, fastvideo_args, device_str)
    # Set the placement group in the fastvideo args
    fastvideo_args.ray_placement_group = current_placement_group
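Taken together, a caller only needs a populated FastVideoArgs: the function adopts an existing placement group if one is set (or ambient), otherwise it creates one single-device bundle per requested GPU, pins the first bundle to the current node, and stores the ready group back on the args. A hedged usage sketch follows; the FastVideoArgs import path and constructor fields are assumptions.

from fastvideo.fastvideo_args import FastVideoArgs  # import path assumed
from fastvideo.worker.ray_utils import initialize_ray_cluster

args = FastVideoArgs(num_gpus=2)  # constructor signature assumed
initialize_ray_cluster(args, ray_address=None)

# The placement group is now ready and attached to the args; each bundle
# reserves one device, and the first bundle is pinned to the current node,
# e.g. [{'GPU': 1.0, 'node:10.0.0.1': 0.001}, {'GPU': 1.0}].
print(args.ray_placement_group.bundle_specs)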

fastvideo.worker.ray_utils.is_in_ray_actor

is_in_ray_actor()

Check if we are in a Ray actor.

Source code in fastvideo/worker/ray_utils.py
def is_in_ray_actor():
    """Check if we are in a Ray actor."""

    try:
        import ray
        return (ray.is_initialized()
                and ray.get_runtime_context().get_actor_id() is not None)
    except ImportError:
        return False
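A common use of this predicate is to branch between in-actor and driver behavior, for example to skip a redundant ray.init when the code is already running inside an actor. The surrounding helper below is a hypothetical illustration:

from fastvideo.worker.ray_utils import is_in_ray_actor

def ensure_ray() -> None:
    # Inside an actor, Ray is already initialized for this process,
    # so calling ray.init() again would be unnecessary.
    if is_in_ray_actor():
        return
    import ray
    ray.init()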