Initialize the device for the worker.
Source code in fastvideo/worker/gpu_worker.py
```python
def init_device(self) -> None:
    """Initialize the device for the worker."""
    # torch.distributed.all_reduce does not free the input tensor until
    # the synchronization point. This causes the memory usage to grow
    # as the number of all_reduce calls increases. This env var disables
    # this behavior.
    # Related issue:
    # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
    os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

    # This env var set by Ray causes exceptions with graph building.
    os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)

    # Set environment variables BEFORE calling get_local_torch_device()
    # so that each worker uses the correct device
    if self.fastvideo_args.distributed_executor_backend == "mp":
        os.environ["LOCAL_RANK"] = str(self.local_rank)
        os.environ["RANK"] = str(self.rank)
        os.environ["WORLD_SIZE"] = str(self.fastvideo_args.num_gpus)

    # Platform-agnostic device initialization
    self.device = get_local_torch_device()

    from fastvideo.platforms import current_platform

    # Set the CUDA device BEFORE any CUDA calls
    if current_platform.is_cuda_alike():
        torch.cuda.set_device(self.device)
        self.init_gpu_memory = torch.cuda.mem_get_info(self.device)[0]
    else:
        # For MPS, we can't get memory info the same way
        self.init_gpu_memory = 0

    # Initialize the distributed environment.
    maybe_init_distributed_environment_and_model_parallel(
        self.fastvideo_args.tp_size, self.fastvideo_args.sp_size,
        self.distributed_init_method)

    self.pipeline = build_pipeline(self.fastvideo_args)
```
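The key constraint this method enforces is ordering: rank/world-size and NCCL environment variables must be exported before the device is selected, and the CUDA device must be bound before any distributed or allocation call. The following is a minimal, standalone sketch of that same ordering using plain PyTorch, not fastvideo's own worker API; the `_worker` helper, the single-node rank assumption, and the `MASTER_ADDR`/`MASTER_PORT` defaults are illustrative assumptions.

```python
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def _worker(local_rank: int, world_size: int) -> None:
    # Same ordering as init_device above: export rank/world-size and NCCL
    # env vars BEFORE touching any CUDA or distributed API.
    os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"
    os.environ["LOCAL_RANK"] = str(local_rank)
    os.environ["RANK"] = str(local_rank)  # single-node assumption: rank == local_rank
    os.environ["WORLD_SIZE"] = str(world_size)
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")  # illustrative defaults
    os.environ.setdefault("MASTER_PORT", "29500")

    # Bind this process to its GPU before any CUDA allocation happens.
    device = torch.device(f"cuda:{local_rank}")
    torch.cuda.set_device(device)
    free_bytes, _total = torch.cuda.mem_get_info(device)

    # Only now initialize the process group (env:// reads the vars above).
    dist.init_process_group(backend="nccl")
    print(f"rank {dist.get_rank()}: {free_bytes / 1e9:.1f} GB free on {device}")
    dist.destroy_process_group()


if __name__ == "__main__":
    n = torch.cuda.device_count()
    mp.spawn(_worker, args=(n,), nprocs=n)
```

In the real worker, this per-process setup is followed by `maybe_init_distributed_environment_and_model_parallel(...)` and `build_pipeline(...)`, so each rank ends up with its own device-bound pipeline instance.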