LayerwiseOffloadManager(model: Module, *, module_list_attr: str, num_layers: int, enabled: bool, pin_cpu_memory: bool = True, auto_initialize: bool = False)
A lightweight layerwise CPU offload manager.
Offloads per-layer parameters and buffers from GPU to CPU, and supports
asynchronous host-to-device (H2D) prefetch on a dedicated CUDA stream.
Source code in fastvideo/models/layerwise_offload.py
def __init__(
    self,
    model: torch.nn.Module,
    *,
    module_list_attr: str,
    num_layers: int,
    enabled: bool,
    pin_cpu_memory: bool = True,
    auto_initialize: bool = False,
) -> None:
    """Set up the offload bookkeeping for ``model``.

    Args:
        model: Module whose per-layer tensors are managed.
        module_list_attr: Attribute name of the layer container. A tensor
            name containing ``<module_list_attr>.<digits>`` as a dotted
            path segment is matched by the compiled layer-name pattern.
        num_layers: Number of layers to manage; coerced to ``int``.
        enabled: Requested on/off switch. The manager is only active when
            this is truthy and ``torch.cuda.is_available()`` is true.
        pin_cpu_memory: Stored as a bool flag. Presumably controls pinning
            of the CPU copies; the consumer is not in this method.
        auto_initialize: When true, ``initialize()`` is called at the end
            of construction.
    """
    self.model = model
    self.module_list_attr = module_list_attr
    self.num_layers = int(num_layers)
    self.pin_cpu_memory = bool(pin_cpu_memory)

    # Without CUDA the manager is inert: no device and no copy stream.
    self.enabled = bool(enabled and torch.cuda.is_available())
    if self.enabled:
        self.device = torch.device("cuda", torch.cuda.current_device())
        self.copy_stream = torch.cuda.Stream()
    else:
        self.device = None
        self.copy_stream = None

    # Group 2 of the pattern captures the numeric layer index.
    layer_pattern = rf"(^|\.){re.escape(module_list_attr)}\.(\d+)(\.|$)"
    self._layer_name_re = re.compile(layer_pattern)

    # Per-layer state, keyed by layer index.
    self._cpu_weights: Dict[int, Dict[str, torch.Tensor]] = {}
    self._cpu_dtypes: Dict[int, Dict[str, torch.dtype]] = {}
    self._gpu_layers: Dict[int, Set[str]] = {}

    # Per-tensor state, keyed by qualified tensor name.
    self._named_parameters: Dict[str, torch.nn.Parameter] = {}
    self._named_buffers: Dict[str, torch.Tensor] = {}
    # NOTE(review): meaning of the int in the tuple is not visible here.
    self._meta: Dict[str, Tuple[int, torch.dtype]] = {}

    if auto_initialize:
        self.initialize()
|
Functions
fastvideo.models.layerwise_offload.LayerwiseOffloadManager.initialize
Offload all matched layer tensors to CPU and prefetch layer 0 (sync).
Source code in fastvideo/models/layerwise_offload.py
| @torch.compiler.disable
def initialize(self) -> None:
"""Offload all matched layer tensors to CPU and prefetch layer 0 (sync)."""
if not self.enabled:
return
self._named_parameters = dict(self.model.named_parameters())
self._named_buffers = dict(self.model.named_buffers())
for name, param in self._named_parameters.items():
layer_idx = self._match_layer_idx(name)
if layer_idx is None or layer_idx >= self.num_layers:
continue
self._offload_tensor(name, param, layer_idx)
for name, buf in self._named_buffers.items():
layer_idx = self._match_layer_idx(name)
if layer_idx is None or layer_idx >= self.num_layers:
continue
self._offload_tensor(name, buf, layer_idx)
self.prefetch_layer(0, non_blocking=False)
if self.copy_stream is not None:
torch.cuda.current_stream().wait_stream(self.copy_stream)
|
fastvideo.models.layerwise_offload.LayerwiseOffloadManager.prefetch_layer
prefetch_layer(layer_idx: int, non_blocking: bool = True) -> None
Prefetch a layer's tensors from CPU to GPU (async on copy_stream).
Source code in fastvideo/models/layerwise_offload.py
| @torch.compiler.disable
def prefetch_layer(self, layer_idx: int, non_blocking: bool = True) -> None:
"""Prefetch a layer's tensors from CPU to GPU (async on copy_stream)."""
if not self.enabled or self.device is None or self.copy_stream is None:
return
if layer_idx < 0 or layer_idx >= self.num_layers:
return
if layer_idx in self._gpu_layers:
return
if layer_idx not in self._cpu_weights:
return
self.copy_stream.wait_stream(torch.cuda.current_stream())
param_names: Set[str] = set()
with torch.cuda.stream(self.copy_stream):
for name, cpu_weight in self._cpu_weights[layer_idx].items():
target = self._get_target(name)
gpu_weight = torch.empty(
cpu_weight.shape,
dtype=self._cpu_dtypes[layer_idx][name],
device=self.device,
)
gpu_weight.copy_(cpu_weight, non_blocking=non_blocking)
target.data = gpu_weight
param_names.add(name)
self._gpu_layers[layer_idx] = param_names
|
fastvideo.models.layerwise_offload.LayerwiseOffloadManager.release_all
Release all currently-resident layers back to placeholders.
Source code in fastvideo/models/layerwise_offload.py
| @torch.compiler.disable
def release_all(self) -> None:
"""Release all currently-resident layers back to placeholders."""
if not self.enabled or self.device is None:
return
if self.copy_stream is not None:
torch.cuda.current_stream().wait_stream(self.copy_stream)
for layer_idx in list(self._gpu_layers.keys()):
param_names = self._gpu_layers.pop(layer_idx, None)
if not param_names:
continue
for name in param_names:
target = self._get_target(name)
self._record_meta(name, target)
target.data = self._make_placeholder(name)
|
fastvideo.models.layerwise_offload.LayerwiseOffloadManager.release_layer
release_layer(layer_idx: int) -> None
Release a layer's tensors back to placeholders (free VRAM).
Source code in fastvideo/models/layerwise_offload.py
| @torch.compiler.disable
def release_layer(self, layer_idx: int) -> None:
"""Release a layer's tensors back to placeholders (free VRAM)."""
if not self.enabled or self.device is None:
return
if layer_idx < 0:
return
param_names = self._gpu_layers.pop(layer_idx, None)
if not param_names:
return
for name in param_names:
target = self._get_target(name)
# Ensure meta exists even if something unexpected happened
self._record_meta(name, target)
target.data = self._make_placeholder(name)
|