Initialize the distributed cluster with Ray.

This connects to the Ray cluster and creates a placement group for the workers, specifying the resources for each distributed worker.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fastvideo_args` | `FastVideoArgs` | The configurations for parallel execution. | *required* |
| `ray_address` | `str \| None` | The address of the Ray cluster. If `None`, uses the default Ray cluster address. | `None` |
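For orientation, a minimal usage sketch follows. The import path for `FastVideoArgs` and its constructor arguments are assumptions for illustration; only `initialize_ray_cluster`, `num_gpus`, and `ray_placement_group` appear in the source below.

```python
from fastvideo.fastvideo_args import FastVideoArgs  # assumed import path
from fastvideo.worker.ray_utils import initialize_ray_cluster

# Hypothetical construction; num_gpus matches the attribute read in the
# source below.
fastvideo_args = FastVideoArgs(num_gpus=4)

# Connects to (or starts) a Ray cluster and creates/validates a placement
# group with one device bundle per worker.
initialize_ray_cluster(fastvideo_args, ray_address=None)
assert fastvideo_args.ray_placement_group is not None
```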
Source code in fastvideo/worker/ray_utils.py
```python
def initialize_ray_cluster(
    fastvideo_args: FastVideoArgs,
    ray_address: str | None = None,
):
    """Initialize the distributed cluster with Ray.

    This connects to the Ray cluster and creates a placement group
    for the workers, specifying the resources for each distributed
    worker.

    Args:
        fastvideo_args: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    """
    assert_ray_available()
    from fastvideo.platforms import current_platform

    if ray.is_initialized():
        logger.info("Ray is already initialized. Skipping Ray initialization.")
    elif current_platform.is_rocm() or current_platform.is_xpu():
        # Try to connect to an existing Ray instance; create a new one
        # if none is found.
        try:
            ray.init("auto")
        except ConnectionError:
            logger.warning(
                "No existing RAY instance detected. "
                "A new instance will be launched with current node resources.")
            ray.init(address=ray_address,
                     num_gpus=fastvideo_args.num_gpus,
                     runtime_env=fastvideo_args.ray_runtime_env)
    else:
        ray.init(address=ray_address,
                 runtime_env=fastvideo_args.ray_runtime_env)

    device_str = current_platform.ray_device_key
    if not device_str:
        raise ValueError(
            f"current platform {current_platform.device_name} does not "
            "support ray.")

    # Create or get the placement group for the worker processes.
    if fastvideo_args.ray_placement_group:
        current_placement_group = fastvideo_args.ray_placement_group
    else:
        current_placement_group = ray.util.get_current_placement_group()

    if current_placement_group:
        logger.info("Using the existing placement group")

        # We are in a placement group; verify that we can use it.
        bundles = current_placement_group.bundle_specs
        device_bundles = 0
        for bundle in bundles:
            bundle_devices = bundle.get(device_str, 0)
            if bundle_devices > 1:
                raise ValueError(
                    "Placement group bundle cannot have more than 1 "
                    f"{device_str}.")
            if bundle_devices:
                device_bundles += 1
        if fastvideo_args.num_gpus > device_bundles:
            raise ValueError(
                f"The number of required {device_str}s exceeds the total "
                f"number of available {device_str}s in the placement group. "
                f"Required number of devices: {fastvideo_args.num_gpus}. "
                f"Total number of devices: {device_bundles}.")
    else:
        logger.info("No current placement group found. "
                    "Creating a new placement group.")
        num_devices_in_cluster = ray.cluster_resources().get(device_str, 0)
        # Log a warning and delay the resource-allocation failure: avoid
        # immediate rejection so a user-initiated placement group can be
        # created while waiting for the cluster to become ready.
        if fastvideo_args.num_gpus > num_devices_in_cluster:
            logger.warning(
                "The number of required %ss exceeds the total "
                "number of available %ss in the placement group.", device_str,
                device_str)
        # Create a new placement group with one device bundle per worker.
        placement_group_specs: list[dict[str, float]] = ([{
            device_str: 1.0
        } for _ in range(fastvideo_args.num_gpus)])

        # The FastVideo engine is itself a worker that executes the model on
        # an accelerator, so the current node must have at least one device.
        current_ip = get_ip()
        current_node_id = ray.get_runtime_context().get_node_id()
        current_node_resource = available_resources_per_node()[current_node_id]
        if current_node_resource.get(device_str, 0) < 1:
            raise ValueError(
                f"Current node has no {device_str} available. "
                f"{current_node_resource=}. FastVideo engine cannot start without "
                f"{device_str}. Make sure you have at least 1 {device_str} "
                f"available in a node {current_node_id=} {current_ip=}.")
        # This way, at least one bundle is required to be created on the
        # current node.
        placement_group_specs[0][f"node:{current_ip}"] = 0.001

        # By default, Ray packs resources as tightly as possible.
        current_placement_group = ray.util.placement_group(
            placement_group_specs, strategy="PACK")
        _wait_until_pg_ready(current_placement_group)

    assert current_placement_group is not None
    _verify_bundles(current_placement_group, fastvideo_args, device_str)
    # Set the placement group in the fastvideo args.
    fastvideo_args.ray_placement_group = current_placement_group
```
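Because each bundle in the placement group reserves exactly one device, callers can pin one worker actor per bundle. Below is a hedged sketch using Ray's standard scheduling API; the `RayWorker` actor is hypothetical and only illustrates the pattern, while `PlacementGroupSchedulingStrategy` is part of Ray's public API.

```python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@ray.remote(num_gpus=1)
class RayWorker:  # hypothetical worker actor, for illustration only
    def rank_info(self) -> str:
        return f"running with GPUs {ray.get_gpu_ids()}"

pg = fastvideo_args.ray_placement_group  # set by initialize_ray_cluster
workers = [
    RayWorker.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            # One worker per bundle; each bundle holds exactly one device,
            # which is what the bundle verification above enforces.
            placement_group_bundle_index=i,
        )).remote() for i in range(fastvideo_args.num_gpus)
]
print(ray.get([w.rank_info.remote() for w in workers]))
```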