utils

Classes

fastvideo.utils.FlexibleArgumentParser

FlexibleArgumentParser(*args, **kwargs)

Bases: ArgumentParser

ArgumentParser that accepts both underscores and dashes in argument names.

Source code in fastvideo/utils.py
def __init__(self, *args, **kwargs) -> None:
    # Set the default 'formatter_class' to SortedHelpFormatter
    if 'formatter_class' not in kwargs:
        kwargs['formatter_class'] = SortedHelpFormatter
    super().__init__(*args, **kwargs)
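
For illustration, a minimal sketch of the documented behavior (an option declared with dashes can also be passed with underscores); the option name is made up:

from fastvideo.utils import FlexibleArgumentParser

parser = FlexibleArgumentParser(description="demo")
parser.add_argument("--num-gpus", type=int, default=1)

# Either spelling parses to the same destination, args.num_gpus.
args = parser.parse_args(["--num_gpus", "4"])
print(args.num_gpus)  # 4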

fastvideo.utils.SortedHelpFormatter

Bases: HelpFormatter

SortedHelpFormatter that sorts arguments by their option strings.

Functions

fastvideo.utils.align_to

align_to(value: int, alignment: int) -> int

Align a height or width value to the given alignment factor by rounding up to the nearest multiple.

Parameters:

value (int): height or width. Required.
alignment (int): target alignment factor. Required.

Returns:

int: the aligned value.

Source code in fastvideo/utils.py
def align_to(value: int, alignment: int) -> int:
    """align height, width according to alignment

    Args:
        value (int): height or width
        alignment (int): target alignment factor

    Returns:
        int: the aligned value
    """
    return int(math.ceil(value / alignment) * alignment)
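
A quick illustrative example of the rounding-up behavior:

from fastvideo.utils import align_to

# Round spatial dimensions up to a multiple of 16.
print(align_to(719, 16))   # 720
print(align_to(1280, 16))  # 1280 (already a multiple of 16)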

fastvideo.utils.cuda_is_initialized

cuda_is_initialized() -> bool

Check if CUDA is initialized.

Source code in fastvideo/utils.py
def cuda_is_initialized() -> bool:
    """Check if CUDA is initialized."""
    if not torch.cuda._is_compiled():
        return False
    return torch.cuda.is_initialized()

fastvideo.utils.current_stream

current_stream() -> Stream | None

Replacement for torch.cuda.current_stream(). It turns out that torch.cuda.current_stream() is quite expensive, as it constructs a new stream object on each call. Here we patch torch.cuda.set_stream to keep track of the current stream directly, so that we can avoid calling torch.cuda.current_stream().

The underlying hypothesis is that we do not call torch._C._cuda_setStream from C/C++ code.

Source code in fastvideo/utils.py
def current_stream() -> torch.cuda.Stream | None:
    """
    replace `torch.cuda.current_stream()` with `fastvideo.utils.current_stream()`.
    it turns out that `torch.cuda.current_stream()` is quite expensive,
    as it will construct a new stream object at each call.
    here we patch `torch.cuda.set_stream` to keep track of the current stream
    directly, so that we can avoid calling `torch.cuda.current_stream()`.

    the underlying hypothesis is that we do not call `torch._C._cuda_setStream`
    from C/C++ code.
    """
    from fastvideo.platforms import current_platform

    # For non-CUDA platforms, return None
    if not current_platform.is_cuda_alike():
        return None

    global _current_stream
    if _current_stream is None:
        # when this function is called before any stream is set,
        # we return the default stream.
        # On ROCm using the default 0 stream in combination with RCCL
        # is hurting performance. Therefore creating a dedicated stream
        # per process
        _current_stream = torch.cuda.Stream() if current_platform.is_rocm(
        ) else torch.cuda.current_stream()
    return _current_stream
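
A minimal usage sketch; on non-CUDA platforms the function returns None, so the result should be checked before use:

from fastvideo.utils import current_stream

stream = current_stream()
if stream is not None:
    # Wait for all work queued on the tracked stream to finish.
    stream.synchronize()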

fastvideo.utils.decorate_logs

decorate_logs(process_name: str | None = None) -> None

Adds a process-specific prefix to each line of output written to stdout and stderr.

Parameters:

process_name (str | None): Optional; the name of the process to use in the prefix. If not provided, the current process name from the multiprocessing context is used. Defaults to None.
Source code in fastvideo/utils.py
def decorate_logs(process_name: str | None = None) -> None:
    """
    Adds a process-specific prefix to each line of output written to stdout and
    stderr.

    Args:
        process_name: Optional; the name of the process to use in the prefix.
            If not provided, the current process name from the multiprocessing
            context is used.
    """
    if process_name is None:
        process_name = get_mp_context().current_process().name
    pid = os.getpid()
    _add_prefix(sys.stdout, process_name, pid)
    _add_prefix(sys.stderr, process_name, pid)
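
An illustrative call from a worker process; the exact prefix format is produced by the internal _add_prefix helper, which is not shown here:

from fastvideo.utils import decorate_logs

decorate_logs("worker-0")
print("loading model ...")  # emitted with a process-name/PID prefix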

fastvideo.utils.dict_to_3d_list

dict_to_3d_list(mask_strategy: dict[str, Any] | None = None, t_max: int | None = None, l_max: int | None = None, h_max: int | None = None) -> list[list[list[Tensor | None]]]

Convert a dictionary of mask indices to a 3D list of tensors.

Parameters:

mask_strategy (dict[str, Any] | None): keys are "t_l_h", values are torch.Tensor masks. Defaults to None.
t_max, l_max, h_max (int | None): if all three are provided, force the output shape to (t_max, l_max, h_max); if all three are None, infer the shape from the data. Each defaults to None.

Source code in fastvideo/utils.py
def dict_to_3d_list(
    mask_strategy: dict[str, Any] | None = None,
    t_max: int | None = None,
    l_max: int | None = None,
    h_max: int | None = None,
) -> list[list[list[torch.Tensor | None]]]:
    """
    Convert a dictionary of mask indices to a 3D list of tensors.
    Args:
        mask_strategy: keys are "t_l_h", values are torch.Tensor masks.
        t_max, l_max, h_max: if provided (all three), force the output shape to (t_max, l_max, h_max).
                            If all three are None, infer shape from the data.
    """
    # Case 1: no data, but fixed shape requested
    if mask_strategy is None:
        assert t_max is not None and l_max is not None and h_max is not None, (
            "If mask_strategy is None, you must provide t_max, l_max, and h_max"
        )
        return [[[None for _ in range(h_max)] for _ in range(l_max)]
                for _ in range(t_max)]

    # Parse all keys into integer tuples
    indices = [tuple(map(int, key.split("_"))) for key in mask_strategy]

    # Decide on dimensions
    if t_max is None and l_max is None and h_max is None:
        # fully dynamic: infer from data
        max_timesteps_idx = max(t for t, _, _ in indices) + 1
        max_layer_idx = max(l for _, l, _ in indices) + 1  # noqa: E741
        max_head_idx = max(h for _, _, h in indices) + 1
    else:
        # require all three to be provided
        assert t_max is not None and l_max is not None and h_max is not None, (
            "Either supply none of (t_max, l_max, h_max) to infer dimensions, "
            "or supply all three to fix the shape.")
        max_timesteps_idx = t_max
        max_layer_idx = l_max
        max_head_idx = h_max

    # Preallocate
    result = [[[None for _ in range(max_head_idx)]
               for _ in range(max_layer_idx)] for _ in range(max_timesteps_idx)]

    # Fill in, skipping any out-of-bounds entries
    for key, value in mask_strategy.items():
        t, l, h = map(int, key.split("_"))  # noqa: E741
        if 0 <= t < max_timesteps_idx and 0 <= l < max_layer_idx and 0 <= h < max_head_idx:
            result[t][l][h] = value
        # else: silently ignore any key that doesn't fit

    return result
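
A small sketch of both modes (inferred shape and fixed shape); the mask values are dummy tensors:

import torch

from fastvideo.utils import dict_to_3d_list

masks = {
    "0_1_2": torch.ones(4, 4),   # timestep 0, layer 1, head 2
    "1_0_0": torch.zeros(4, 4),  # timestep 1, layer 0, head 0
}

# Inferred shape: (2, 2, 3); unfilled slots stay None.
grid = dict_to_3d_list(masks)
assert grid[0][1][2] is not None and grid[1][1][2] is None

# Fixed shape: (4, 2, 4); keys outside the bounds are silently ignored.
grid_fixed = dict_to_3d_list(masks, t_max=4, l_max=2, h_max=4)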

fastvideo.utils.find_hccl_library

find_hccl_library() -> str

We either use the library file specified by the HCCL_SO_PATH environment variable, or we find the library file brought by PyTorch. After importing torch, libhccl.so can be found by ctypes automatically.

Source code in fastvideo/utils.py
def find_hccl_library() -> str:
    """
    We either use the library file specified by the `HCCL_SO_PATH`
    environment variable, or we find the library file brought by PyTorch.
    After importing `torch`, `libhccl.so` can be
    found by `ctypes` automatically.
    """
    so_file = envs.HCCL_SO_PATH

    # manually load the nccl library
    if so_file:
        logger.info("Found hccl from environment variable HCCL_SO_PATH=%s",
                    so_file)
    else:
        if torch.version.cann is not None:  # codespell:ignore cann
            so_file = "libhccl.so"
        else:
            raise ValueError("HCCL only supports Ascend NPU backends.")
        logger.info("Found hccl from library %s", so_file)
    return so_file

fastvideo.utils.find_nccl_library

find_nccl_library() -> str

We either use the library file specified by the FASTVIDEO_NCCL_SO_PATH environment variable, or we find the library file brought by PyTorch. After importing torch, libnccl.so.2 or librccl.so.1 can be found by ctypes automatically.

Source code in fastvideo/utils.py
def find_nccl_library() -> str:
    """
    We either use the library file specified by the `FASTVIDEO_NCCL_SO_PATH`
    environment variable, or we find the library file brought by PyTorch.
    After importing `torch`, `libnccl.so.2` or `librccl.so.1` can be
    found by `ctypes` automatically.
    """
    so_file = envs.FASTVIDEO_NCCL_SO_PATH

    # manually load the nccl library
    if so_file:
        logger.info(
            "Found nccl from environment variable FASTVIDEO_NCCL_SO_PATH=%s",
            so_file)
    else:
        if torch.version.cuda is not None:
            so_file = "libnccl.so.2"
        elif torch.version.hip is not None:
            so_file = "librccl.so.1"
        else:
            raise ValueError("NCCL only supports CUDA and ROCm backends.")
        logger.info("Found nccl from library %s", so_file)
    return str(so_file)
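
One plausible way to consume the returned name, following the docstring's note that ctypes can resolve the library after torch is imported; this is a sketch, not FastVideo's actual loading path:

import ctypes

import torch  # importing torch makes the bundled library visible to ctypes
from fastvideo.utils import find_nccl_library

so_file = find_nccl_library()
nccl = ctypes.CDLL(so_file)  # raises OSError if the library cannot be resolved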

fastvideo.utils.get_compute_dtype

get_compute_dtype() -> dtype

Get the current compute dtype from mixed precision policy.

Returns:

torch.dtype: The compute dtype to use; defaults to get_default_dtype() if no policy is set.

Source code in fastvideo/utils.py
def get_compute_dtype() -> torch.dtype:
    """Get the current compute dtype from mixed precision policy.

    Returns:
        torch.dtype: The compute dtype to use, defaults to get_default_dtype() if no policy set
    """
    if not hasattr(_mixed_precision_state, 'state'):
        return torch.get_default_dtype()
    else:
        state = get_mixed_precision_state()
        return state.param_dtype

fastvideo.utils.get_mixed_precision_state

get_mixed_precision_state() -> MixedPrecisionState

Get the current mixed precision state.

Source code in fastvideo/utils.py
def get_mixed_precision_state() -> MixedPrecisionState:
    """Get the current mixed precision state."""
    if not hasattr(_mixed_precision_state, 'state'):
        raise ValueError("Mixed precision state not set")
    return cast(MixedPrecisionState, _mixed_precision_state.state)

fastvideo.utils.get_mp_context

get_mp_context() -> BaseContext

Get a multiprocessing context with a particular method (spawn or fork). By default we follow the value of the FASTVIDEO_WORKER_MULTIPROC_METHOD to determine the multiprocessing method (default is fork). However, under certain conditions, we may enforce spawn and override the value of FASTVIDEO_WORKER_MULTIPROC_METHOD.

Source code in fastvideo/utils.py
def get_mp_context() -> BaseContext:
    """Get a multiprocessing context with a particular method (spawn or fork).
    By default we follow the value of the FASTVIDEO_WORKER_MULTIPROC_METHOD to
    determine the multiprocessing method (default is fork). However, under
    certain conditions, we may enforce spawn and override the value of
    FASTVIDEO_WORKER_MULTIPROC_METHOD.
    """
    force_spawn()
    mp_method = envs.FASTVIDEO_WORKER_MULTIPROC_METHOD
    return multiprocessing.get_context(mp_method)
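
A minimal sketch of spawning a worker with the returned context:

from fastvideo.utils import get_mp_context

ctx = get_mp_context()
proc = ctx.Process(target=print, args=("hello from worker",))
proc.start()
proc.join()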

fastvideo.utils.import_pynvml

import_pynvml()

Historical comments:

libnvml.so is the library behind nvidia-smi, and pynvml is a Python wrapper around it. We use it to get GPU status without initializing a CUDA context in the current process. Historically, there are two packages that provide pynvml:

- nvidia-ml-py (https://pypi.org/project/nvidia-ml-py/): The official wrapper. It is a dependency of FastVideo, and is installed when users install FastVideo. It provides a Python module named pynvml.
- pynvml (https://pypi.org/project/pynvml/): An unofficial wrapper. Prior to version 12.0, it also provided a Python module pynvml, and therefore conflicted with the official one, which is a standalone Python file. This caused errors when both of them were installed. Starting from version 12.0, it migrated to a new module named pynvml_utils to avoid the conflict.

It is so confusing that many packages in the community use the unofficial one by mistake, and we have to handle this case. For example, nvcr.io/nvidia/pytorch:24.12-py3 uses the unofficial one, and it will cause errors; see the issue https://github.com/vllm-project/vllm/issues/12847 for an example. After all these troubles, we decided to copy the official pynvml module into our codebase and use it directly.

Source code in fastvideo/utils.py
def import_pynvml():
    """
    Historical comments:

    libnvml.so is the library behind nvidia-smi, and
    pynvml is a Python wrapper around it. We use it to get GPU
    status without initializing CUDA context in the current process.
    Historically, there are two packages that provide pynvml:
    - `nvidia-ml-py` (https://pypi.org/project/nvidia-ml-py/): The official
        wrapper. It is a dependency of FastVideo, and is installed when users
        install FastVideo. It provides a Python module named `pynvml`.
    - `pynvml` (https://pypi.org/project/pynvml/): An unofficial wrapper.
        Prior to version 12.0, it also provides a Python module `pynvml`,
        and therefore conflicts with the official one which is a standalone Python file.
        This causes errors when both of them are installed.
        Starting from version 12.0, it migrates to a new module
        named `pynvml_utils` to avoid the conflict.
    It is so confusing that many packages in the community use the
    unofficial one by mistake, and we have to handle this case.
    For example, `nvcr.io/nvidia/pytorch:24.12-py3` uses the unofficial
    one, and it will cause errors, see the issue
    https://github.com/vllm-project/vllm/issues/12847 for example.
    After all the troubles, we decide to copy the official `pynvml`
    module to our codebase, and use it directly.
    """
    import fastvideo.third_party.pynvml as pynvml
    return pynvml
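
An illustrative query using the standard NVML bindings exposed by the returned module (no CUDA context is created in the current process):

from fastvideo.utils import import_pynvml

pynvml = import_pynvml()
pynvml.nvmlInit()
try:
    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
    mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
    print(mem.total, mem.used, mem.free)
finally:
    pynvml.nvmlShutdown()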

fastvideo.utils.log_torch_cuda_memory

log_torch_cuda_memory(tag: str | None = None, *, log_fn: Callable[[str], None] | None = None, log_file_path: str | PathLike[str] | None = 'memory_trace.txt') -> None

Log CUDA memory statistics via logger and append to a trace file.

Source code in fastvideo/utils.py
def log_torch_cuda_memory(
        tag: str | None = None,
        *,
        log_fn: Callable[[str], None] | None = None,
        log_file_path: str | os.PathLike[str] | None = "memory_trace.txt"
) -> None:
    """Log CUDA memory statistics via logger and append to a trace file."""

    log_fn = log_fn or logger.info
    prefix = f"[{tag}] " if tag else ""

    if not torch.cuda.is_available():
        message = f"{prefix}CUDA not available on this host."
        log_fn(message)
        _append_to_memory_trace(message, log_file_path)
        return

    try:
        device_index = torch.cuda.current_device()
        device_name = torch.cuda.get_device_name(device_index)
        allocated = torch.cuda.memory_allocated(device_index)
        reserved = torch.cuda.memory_reserved(device_index)
        max_allocated = torch.cuda.max_memory_allocated(device_index)
        max_reserved = torch.cuda.max_memory_reserved(device_index)
        free_mem, total_mem = torch.cuda.mem_get_info(device_index)
    except Exception as exc:  # noqa: BLE001
        message = f"{prefix}Unable to query CUDA memory stats: {exc}"
        log_fn(message)
        _append_to_memory_trace(message, log_file_path)
        return

    used_mem = total_mem - free_mem

    stats = [
        f"device={device_name} (index={device_index})",
        f"allocated={_format_bytes(allocated)}",
        f"reserved={_format_bytes(reserved)}",
        f"max_allocated={_format_bytes(max_allocated)}",
        f"max_reserved={_format_bytes(max_reserved)}",
        f"used={_format_bytes(used_mem)}",
        f"free={_format_bytes(free_mem)}",
        f"total={_format_bytes(total_mem)}",
    ]

    message = f"{prefix}CUDA memory stats: {' | '.join(stats)}"
    log_fn(message)
    _append_to_memory_trace(message, log_file_path)
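
A typical usage sketch bracketing a workload; the tag and file path here are arbitrary:

from fastvideo.utils import log_torch_cuda_memory

log_torch_cuda_memory(tag="before forward")
# ... run the model ...
log_torch_cuda_memory(tag="after forward", log_file_path="memory_trace.txt")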

fastvideo.utils.maybe_download_lora

maybe_download_lora(model_name_or_path: str, local_dir: str | None = None, download: bool = True) -> str

Check if the model path is a Hugging Face Hub model ID and download it if needed.

Parameters:

model_name_or_path (str): Local path or Hugging Face Hub model ID. Required.
local_dir (str | None): Local directory to save the model. Defaults to None.
download (bool): Whether to download the model from Hugging Face Hub. Defaults to True.

Returns:

str: Local path to the LoRA weight file (a .safetensors file inside the model directory).

Source code in fastvideo/utils.py
def maybe_download_lora(model_name_or_path: str,
                        local_dir: str | None = None,
                        download: bool = True) -> str:
    """
    Check if the model path is a Hugging Face Hub model ID and download it if needed.
    Args:
        model_name_or_path: Local path or Hugging Face Hub model ID
        local_dir: Local directory to save the model
        download: Whether to download the model from Hugging Face Hub

    Returns:
        Local path to the model
    """

    local_path = maybe_download_model(model_name_or_path, local_dir, download)
    weight_name = _best_guess_weight_name(model_name_or_path,
                                          file_extension=".safetensors")
    return os.path.join(local_path, weight_name)

fastvideo.utils.maybe_download_model

maybe_download_model(model_name_or_path: str, local_dir: str | None = None, download: bool = True) -> str

Check if the model path is a Hugging Face Hub model ID and download it if needed.

Parameters:

model_name_or_path (str): Local path or Hugging Face Hub model ID. Required.
local_dir (str | None): Local directory to save the model. Defaults to None.
download (bool): Whether to download the model from Hugging Face Hub. Defaults to True.

Returns:

str: Local path to the model.

Source code in fastvideo/utils.py
def maybe_download_model(model_name_or_path: str,
                         local_dir: str | None = None,
                         download: bool = True) -> str:
    """
    Check if the model path is a Hugging Face Hub model ID and download it if needed.

    Args:
        model_name_or_path: Local path or Hugging Face Hub model ID
        local_dir: Local directory to save the model
        download: Whether to download the model from Hugging Face Hub

    Returns:
        Local path to the model
    """

    # If the path exists locally, return it
    if os.path.exists(model_name_or_path):
        logger.info("Model already exists locally at %s", model_name_or_path)
        return model_name_or_path

    # Otherwise, assume it's a HF Hub model ID and try to download it
    try:
        logger.info("Downloading model snapshot from HF Hub for %s...",
                    model_name_or_path)
        with get_lock(model_name_or_path):
            local_path = snapshot_download(
                repo_id=model_name_or_path,
                ignore_patterns=["*.onnx", "*.msgpack"],
                local_dir=local_dir)
        logger.info("Downloaded model to %s", local_path)
        return str(local_path)
    except Exception as e:
        raise ValueError(
            f"Could not find model at {model_name_or_path} and failed to download from HF Hub: {e}"
        ) from e
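
An illustrative call; the repository ID below is a placeholder, not a real model:

from fastvideo.utils import maybe_download_model

# A local path is returned unchanged; a Hub ID is snapshot-downloaded first.
local_path = maybe_download_model("some-org/some-model", local_dir="./checkpoints")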

fastvideo.utils.maybe_download_model_index

maybe_download_model_index(model_name_or_path: str) -> dict[str, Any]

Download and extract just the model_index.json for a Hugging Face model.

Parameters:

model_name_or_path (str): Path or HF Hub model ID. Required.

Returns:

dict[str, Any]: The parsed model_index.json as a dictionary.

Source code in fastvideo/utils.py
def maybe_download_model_index(model_name_or_path: str) -> dict[str, Any]:
    """
    Download and extract just the model_index.json for a Hugging Face model.

    Args:
        model_name_or_path: Path or HF Hub model ID

    Returns:
        The parsed model_index.json as a dictionary
    """
    import tempfile

    from huggingface_hub import hf_hub_download

    # If it's a local path, verify it directly
    if os.path.exists(model_name_or_path):
        return verify_model_config_and_directory(model_name_or_path)

    # For remote models, download just the model_index.json
    try:
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Download just the model_index.json file
            model_index_path = hf_hub_download(repo_id=model_name_or_path,
                                               filename="model_index.json",
                                               local_dir=tmp_dir)

            # Load the model_index.json
            with open(model_index_path) as f:
                config: dict[str, Any] = json.load(f)

            # Verify it has the required fields
            if "_class_name" not in config:
                raise ValueError(
                    f"model_index.json for {model_name_or_path} does not contain _class_name field"
                )

            if "_diffusers_version" not in config:
                raise ValueError(
                    f"model_index.json for {model_name_or_path} does not contain _diffusers_version field"
                )

            # Add the pipeline name for downstream use
            config["pipeline_name"] = config["_class_name"]

            logger.info("Downloaded model_index.json for %s, pipeline: %s",
                        model_name_or_path, config["_class_name"])
            return config

    except Exception as e:
        raise ValueError(
            f"Failed to download or parse model_index.json for {model_name_or_path}: {e}"
        ) from e

fastvideo.utils.resolve_obj_by_qualname

resolve_obj_by_qualname(qualname: str) -> Any

Resolve an object by its fully qualified name.

Source code in fastvideo/utils.py
def resolve_obj_by_qualname(qualname: str) -> Any:
    """
    Resolve an object by its fully qualified name.
    """
    module_name, obj_name = qualname.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, obj_name)
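
A quick sketch using a standard-library object as the target:

from fastvideo.utils import resolve_obj_by_qualname

dumps = resolve_obj_by_qualname("json.dumps")
print(dumps({"ok": True}))  # {"ok": true}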

fastvideo.utils.run_method

run_method(obj: Any, method: str | bytes | Callable, args: tuple[Any], kwargs: dict[str, Any]) -> Any

Run a method of an object with the given arguments and keyword arguments. If the method is a string, it is resolved to a bound method using getattr. If the method is serialized bytes, it is deserialized using cloudpickle and called with the object as the first argument. If the method is a callable, it is called directly.

Source code in fastvideo/utils.py
def run_method(obj: Any, method: str | bytes | Callable, args: tuple[Any],
               kwargs: dict[str, Any]) -> Any:
    """
    Run a method of an object with the given arguments and keyword arguments.
    If the method is string, it will be converted to a method using getattr.
    If the method is serialized bytes and will be deserialized using
    cloudpickle.
    If the method is a callable, it will be called directly.
    """
    if isinstance(method, bytes):
        func = partial(cloudpickle.loads(method), obj)
    elif isinstance(method, str):
        try:
            func = getattr(obj, method)
        except AttributeError:
            raise NotImplementedError(f"Method {method!r} is not"
                                      " implemented.") from None
    else:
        func = partial(method, obj)  # type: ignore
    return func(*args, **kwargs)
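
A sketch exercising all three accepted method forms; the Worker class is made up for illustration:

import cloudpickle

from fastvideo.utils import run_method

class Worker:
    def ping(self, name: str) -> str:
        return f"pong: {name}"

w = Worker()

# 1) By name: resolved with getattr on the object.
run_method(w, "ping", ("a",), {})
# 2) As a callable: called with the object as the first argument.
run_method(w, Worker.ping, ("b",), {})
# 3) As cloudpickled bytes: deserialized, then called with the object.
run_method(w, cloudpickle.dumps(Worker.ping), ("c",), {})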

fastvideo.utils.set_mixed_precision_policy

set_mixed_precision_policy(param_dtype: dtype, reduce_dtype: dtype, output_dtype: dtype | None = None, mp_policy: MixedPrecisionPolicy | None = None)

Set mixed precision policy globally.

Parameters:

param_dtype (dtype): Parameter dtype used for training. Required.
reduce_dtype (dtype): Reduction dtype used for gradients. Required.
output_dtype (dtype | None): Optional output dtype. Defaults to None.
mp_policy (MixedPrecisionPolicy | None): Optional pre-built mixed precision policy stored alongside the dtypes. Defaults to None.
Source code in fastvideo/utils.py
def set_mixed_precision_policy(
    param_dtype: torch.dtype,
    reduce_dtype: torch.dtype,
    output_dtype: torch.dtype | None = None,
    mp_policy: MixedPrecisionPolicy | None = None,
):
    """Set mixed precision policy globally.

    Args:
        param_dtype: Parameter dtype used for training
        reduce_dtype: Reduction dtype used for gradients
        output_dtype: Optional output dtype
    """
    state = MixedPrecisionState(
        param_dtype=param_dtype,
        reduce_dtype=reduce_dtype,
        output_dtype=output_dtype,
        mp_policy=mp_policy,
    )
    _mixed_precision_state.state = state
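
A minimal sketch tying this to get_compute_dtype above, assuming a fresh process where no policy has been set yet:

import torch

from fastvideo.utils import get_compute_dtype, set_mixed_precision_policy

# Before any policy is set, the default dtype is returned.
assert get_compute_dtype() == torch.get_default_dtype()

# After setting a policy, the parameter dtype becomes the compute dtype.
set_mixed_precision_policy(param_dtype=torch.bfloat16,
                           reduce_dtype=torch.float32)
assert get_compute_dtype() == torch.bfloat16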

fastvideo.utils.verify_model_config_and_directory

verify_model_config_and_directory(model_path: str) -> dict[str, Any]

Verify that the model directory contains a valid diffusers configuration.

Parameters:

model_path (str): Path to the model directory. Required.

Returns:

dict[str, Any]: The loaded model configuration as a dictionary.

Source code in fastvideo/utils.py
def verify_model_config_and_directory(model_path: str) -> dict[str, Any]:
    """
    Verify that the model directory contains a valid diffusers configuration.

    Args:
        model_path: Path to the model directory

    Returns:
        The loaded model configuration as a dictionary
    """

    # Check for model_index.json which is required for diffusers models
    config_path = os.path.join(model_path, "model_index.json")
    if not os.path.exists(config_path):
        raise ValueError(
            f"Model directory {model_path} does not contain model_index.json. "
            "Only Hugging Face diffusers format is supported.")

    # Check for transformer and vae directories
    transformer_dir = os.path.join(model_path, "transformer")
    vae_dir = os.path.join(model_path, "vae")

    if not os.path.exists(transformer_dir):
        raise ValueError(
            f"Model directory {model_path} does not contain a transformer/ directory."
        )

    if not os.path.exists(vae_dir):
        raise ValueError(
            f"Model directory {model_path} does not contain a vae/ directory.")

    # Load the config
    with open(config_path) as f:
        config = json.load(f)

    # Verify diffusers version exists
    if "_diffusers_version" not in config:
        raise ValueError("model_index.json does not contain _diffusers_version")

    logger.info("Diffusers version: %s", config["_diffusers_version"])
    return cast(dict[str, Any], config)

fastvideo.utils.warn_for_unimplemented_methods

warn_for_unimplemented_methods(cls: type[T]) -> type[T]

A replacement for abc.ABC. When we use abc.ABC, subclasses fail to instantiate if they do not implement all abstract methods. Here, base-class methods only need to raise NotImplementedError, and a warning is logged if a method is not overridden in the subclass.

Source code in fastvideo/utils.py
def warn_for_unimplemented_methods(cls: type[T]) -> type[T]:
    """
    A replacement for `abc.ABC`.
    When we use `abc.ABC`, subclasses will fail to instantiate
    if they do not implement all abstract methods.
    Here, we only require `raise NotImplementedError` in the
    base class, and log a warning if the method is not implemented
    in the subclass.
    """

    original_init = cls.__init__

    def find_unimplemented_methods(self: object):
        unimplemented_methods = []
        for attr_name in dir(self):
            # bypass inner method
            if attr_name.startswith('_'):
                continue

            try:
                attr = getattr(self, attr_name)
                # skip non-callable attributes so attr_func is never stale
                if not callable(attr):
                    continue
                # get the underlying function of the bound method
                attr_func = attr.__func__
            except AttributeError:
                continue
            src = inspect.getsource(attr_func)
            if "NotImplementedError" in src:
                unimplemented_methods.append(attr_name)
        if unimplemented_methods:
            method_names = ','.join(unimplemented_methods)
            msg = (f"Methods {method_names} not implemented in {self}")
            logger.warning(msg)

    @wraps(original_init)
    def wrapped_init(self, *args, **kwargs) -> None:
        original_init(self, *args, **kwargs)
        find_unimplemented_methods(self)

    type.__setattr__(cls, '__init__', wrapped_init)
    return cls
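
An illustrative sketch with made-up classes; instantiation succeeds and a warning is logged instead of raising:

from fastvideo.utils import warn_for_unimplemented_methods

@warn_for_unimplemented_methods
class BaseLoader:
    def load(self) -> None:
        raise NotImplementedError

class NoopLoader(BaseLoader):
    pass

# Logs a warning along the lines of "Methods load not implemented in <NoopLoader ...>".
loader = NoopLoader()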

fastvideo.utils.xpu_is_initialized

xpu_is_initialized() -> bool

Check if XPU is initialized.

Source code in fastvideo/utils.py
def xpu_is_initialized() -> bool:
    """Check if XPU is initialized."""
    if not torch.xpu._is_compiled():
        return False
    return torch.xpu.is_initialized()