Source code for xtuner.v1.rl.utils.ray_cpu_worker

from __future__ import annotations

import os
import threading
from typing import TypeAlias

import ray
from cyclopts import Parameter
from pydantic import BaseModel, ConfigDict, Field, field_validator
from ray.actor import ActorClass
from ray.util.placement_group import (
    VALID_PLACEMENT_GROUP_STRATEGIES,
    PlacementGroup,
    placement_group,
    placement_group_table,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from typing_extensions import Annotated

from xtuner.v1.utils.logger import get_logger


PG_READY_TIMEOUT = os.getenv("XTUNER_PG_READY_TIMEOUT", 30)  # default 30 seconds
PlacementGroups: TypeAlias = PlacementGroup | list[PlacementGroup] | tuple[PlacementGroup, ...] | None
logger = get_logger()


[docs]class CPUResourcesConfig(BaseModel): """Configuration for CPU worker resources in XTuner. This class provides CPU resource options for Ray CPU workers. When used by ``AutoCPUWorkers`` the workers are launched in a CPU placement group. When used as ``cpu_resources`` by judgers or agent loops, the workers are managed by ``CPUResourceManager`` outside accelerator placement groups. Args: num_workers (int): Total number of CPU workers. Defaults to 1. num_cpus_per_worker (float): Number of CPUs to allocate per worker in the placement group or Ray actor. Defaults to 1. cpu_memory_per_worker (int): Amount of CPU memory (in bytes) to allocate for each worker. Defaults to 1 GiB. pg_pack_strategy (str): Ray placement group strategy used only when a CPU placement group is built. Defaults to "SPREAD". **Examples:** Example CPU resource configuration:: resources = CPUResourcesConfig( num_workers=4, num_cpus_per_worker=2, cpu_memory_per_worker=4 * 1024**3, ) """ model_config = ConfigDict(extra="forbid") num_workers: Annotated[int, Parameter(help="Number of workers."), Field(ge=1)] = 1 num_cpus_per_worker: Annotated[float, Parameter(help="Number of CPUs to allocate per worker."), Field(gt=0)] = 1 cpu_memory_per_worker: Annotated[ int, Parameter(help="Amount of memory (in bytes) to allocate per worker."), Field(gt=0) ] = 1024**3 # 1 GB pg_pack_strategy: Annotated[ str, Parameter(help="Placement group packing strategy, options: " + ", ".join(VALID_PLACEMENT_GROUP_STRATEGIES)), ] = "SPREAD" @field_validator("pg_pack_strategy") @classmethod def check_pg_pack_strategy(cls, v): if v not in VALID_PLACEMENT_GROUP_STRATEGIES: raise ValueError(f"pg_pack_strategy must be one of {VALID_PLACEMENT_GROUP_STRATEGIES}") return v
class CPUActorLauncher: """Low-level helper for launching CPU Ray actors. ``CPUActorLauncher`` only answers one question: given a class and explicit Ray resource/scheduling options, how do we create one or more CPU actors? It converts plain Python classes to Ray actor classes, optionally binds actors to a placement-group bundle, and forwards constructor args. It does **not** remember which placement-group bundles have already been used. If ``build_actors`` is called twice with the same placement group and no explicit ``start_bundle_idx``, both calls start from bundle 0. Example: .. code-block:: python workers_a = CPUActorLauncher.build_actors( WorkerA, cfg_a, pg=pg, start_bundle_idx=0, num_workers=2, actor_num_cpus_per_worker=1, ) workers_b = CPUActorLauncher.build_actors( WorkerB, cfg_b, pg=pg, start_bundle_idx=2, # caller must manage this offset num_workers=2, actor_num_cpus_per_worker=1, ) Use this class when the caller already knows exactly where the actor(s) should be placed, or when no placement group is involved. """ _ACTOR_CLASS_CACHE: dict[type, ActorClass] = {} @staticmethod def build_placement_group(resources_config: CPUResourcesConfig): bundles = [ { "CPU": resources_config.num_cpus_per_worker, "memory": resources_config.cpu_memory_per_worker, } ] * resources_config.num_workers pg = placement_group(bundles=bundles, strategy=resources_config.pg_pack_strategy) ray.get(pg.ready(), timeout=PG_READY_TIMEOUT) return pg @staticmethod def get_pg_options(pg: PlacementGroup, num_cpus: int | float = -1) -> dict: assert len(pg.bundle_specs) > 0, "Placement group has no bundles defined." default_cpu = pg.bundle_specs[0].get("CPU", 1) return {"num_cpus": num_cpus if num_cpus >= 0 else default_cpu} @classmethod def to_actor_class(cls, worker_cls): if hasattr(worker_cls, "remote") and hasattr(worker_cls, "options"): return worker_cls if worker_cls not in cls._ACTOR_CLASS_CACHE: cls._ACTOR_CLASS_CACHE[worker_cls] = ray.remote(worker_cls) return cls._ACTOR_CLASS_CACHE[worker_cls] @staticmethod def _get_bundle_resources(pg: PlacementGroup, bundle_idx: int) -> dict[str, float | int]: assert len(pg.bundle_specs) > bundle_idx, f"Placement group does not have bundle index {bundle_idx}." return pg.bundle_specs[bundle_idx] @classmethod def _resolve_actor_resources( cls, pg: PlacementGroup, bundle_idx: int, actor_num_cpus: int | float | None = None, actor_memory: int | None = None, ) -> tuple[float | int, int]: bundle = cls._get_bundle_resources(pg, bundle_idx) resolved_num_cpus = actor_num_cpus if actor_num_cpus is not None else bundle.get("CPU", 1) resolved_memory = actor_memory if actor_memory is not None else int(bundle.get("memory", 0)) assert bundle.get("CPU", 1) >= resolved_num_cpus, ( f"Placement group bundle {bundle_idx} does not have enough CPU resources." ) assert bundle.get("memory", 0) >= resolved_memory, ( f"Placement group bundle {bundle_idx} does not have enough memory resources." ) return resolved_num_cpus, resolved_memory @classmethod def build_actor( cls, worker_cls, *init_args, pg: PlacementGroup | None = None, bundle_idx: int = 0, actor_num_cpus: int | float | None = None, actor_memory: int | None = None, capture_child_tasks: bool = False, **init_kwargs, ): resolved_num_cpus = 1 if actor_num_cpus is None else actor_num_cpus resolved_memory = actor_memory actor_cls = cls.to_actor_class(worker_cls) actor_options = { "num_cpus": resolved_num_cpus, } if resolved_memory is not None and resolved_memory > 0: actor_options["memory"] = resolved_memory if pg is None: return actor_cls.options(**actor_options).remote(*init_args, **init_kwargs) resolved_num_cpus, resolved_memory = cls._resolve_actor_resources( pg=pg, bundle_idx=bundle_idx, actor_num_cpus=actor_num_cpus, actor_memory=actor_memory, ) actor_options["num_cpus"] = resolved_num_cpus if resolved_memory > 0: actor_options["memory"] = resolved_memory actor_options["scheduling_strategy"] = PlacementGroupSchedulingStrategy( placement_group=pg, placement_group_bundle_index=bundle_idx, placement_group_capture_child_tasks=capture_child_tasks, ) return actor_cls.options(**actor_options).remote(*init_args, **init_kwargs) @classmethod def build_actors( cls, worker_cls, *init_args, pg: PlacementGroup | None = None, start_bundle_idx: int = 0, num_workers: int = 1, actor_num_cpus_per_worker: int | float | None = None, actor_memory_per_worker: int | None = None, capture_child_tasks: bool = False, **init_kwargs, ): workers_list = [] for idx in range(num_workers): workers_list.append( cls.build_actor( worker_cls, *init_args, pg=pg, bundle_idx=start_bundle_idx + idx, actor_num_cpus=actor_num_cpus_per_worker, actor_memory=actor_memory_per_worker, capture_child_tasks=capture_child_tasks, **init_kwargs, ) ) return workers_list class AutoCPUWorkers(CPUActorLauncher): """Convenience wrapper for homogeneous CPU worker pools. ``AutoCPUWorkers`` builds on ``CPUActorLauncher`` for the common case where a CPU placement group contains one bundle per worker and each worker should consume the next available bundle. Its main extra behavior is a per-process placement-group cursor. When ``from_placement_group`` is called without ``start_bundle_idx``, it starts from the first bundle not previously consumed through ``AutoCPUWorkers`` for that placement group. This prevents repeated calls from accidentally placing different worker pools on the same bundles. Example: .. code-block:: python workers_a = AutoCPUWorkers.from_placement_group( WorkerA, cfg_a, pg, num_workers=2, ) # uses bundle 0 and 1 workers_b = AutoCPUWorkers.from_placement_group( WorkerB, cfg_b, pg, num_workers=2, ) # automatically uses bundle 2 and 3 ``from_config`` is an even higher-level shortcut: it first creates a CPU placement group from ``CPUResourcesConfig`` and then launches workers from it. Example: .. code-block:: python workers, pg = AutoCPUWorkers.from_config( MyCPUWorker, worker_config, CPUResourcesConfig(num_workers=4, num_cpus_per_worker=2), ) Use this class for homogeneous CPU worker pools backed by a CPU placement group. Use ``CPUActorLauncher`` directly when placement is explicit or when the actor should run outside a placement group. """ _PG_NEXT_BUNDLE_INDEX: dict[str, int] = {} _PG_NEXT_BUNDLE_INDEX_LOCK = threading.Lock() @staticmethod def _get_pg_key(pg: PlacementGroup) -> str: return str(pg.id) @classmethod def _reserve_bundle_range( cls, pg: PlacementGroup, num_workers: int, start_bundle_idx: int | None, ) -> tuple[int, int]: pg_key = cls._get_pg_key(pg) with cls._PG_NEXT_BUNDLE_INDEX_LOCK: current_cursor = cls._PG_NEXT_BUNDLE_INDEX.get(pg_key, 0) resolved_start_bundle_idx = current_cursor if start_bundle_idx is None else start_bundle_idx resolved_num_workers = num_workers if num_workers > 0 else pg.bundle_count - resolved_start_bundle_idx assert resolved_num_workers > 0, "At least one worker must be created from the placement group." assert resolved_start_bundle_idx >= 0, "start_bundle_idx must be non-negative." assert resolved_start_bundle_idx + resolved_num_workers <= pg.bundle_count, ( "Placement group does not have enough remaining bundles for the requested CPU workers." ) cls._PG_NEXT_BUNDLE_INDEX[pg_key] = max(current_cursor, resolved_start_bundle_idx + resolved_num_workers) return resolved_start_bundle_idx, resolved_num_workers @classmethod def from_config(cls, worker_cls, worker_config, cpu_config: CPUResourcesConfig): pg = cls.build_placement_group(cpu_config) workers_list = cls.from_placement_group(worker_cls, worker_config, pg) return workers_list, pg @classmethod def from_placement_group( cls, worker_cls, worker_config, pg: PlacementGroup, num_workers: int = -1, start_bundle_idx: int | None = None, ): start_bundle_idx, num_workers = cls._reserve_bundle_range( pg=pg, num_workers=num_workers, start_bundle_idx=start_bundle_idx ) default_cpu = cls._get_bundle_resources(pg, start_bundle_idx).get("CPU", 1) return cls.build_actors( worker_cls, worker_config, num_cpus=default_cpu, pg=pg, start_bundle_idx=start_bundle_idx, num_workers=num_workers, actor_num_cpus_per_worker=default_cpu, actor_memory_per_worker=None, ) class CPUResourceManager: """Validates and serves PG-external CPU actor allocations.""" def __init__( self, accelerator_placement_groups: PlacementGroups = None, ): self.pools: dict[str, CPUResourcesConfig] = {} self._registration_counts: dict[str, int] = {} if accelerator_placement_groups is None: self._accelerator_placement_groups: tuple[PlacementGroup, ...] = () elif isinstance(accelerator_placement_groups, PlacementGroup) or hasattr( accelerator_placement_groups, "bundle_specs" ): self._accelerator_placement_groups = (accelerator_placement_groups,) else: self._accelerator_placement_groups = tuple(accelerator_placement_groups) def get(self, name: str) -> CPUResourcesConfig: if name not in self.pools: raise KeyError(f"Unknown external CPU resource pool {name!r}. Available pools: {sorted(self.pools)}") return self.pools[name] def register(self, name: str, config: CPUResourcesConfig) -> None: registered_name = self._make_unique_registration_name(name) self.pools[registered_name] = config try: self.validate_or_raise() except Exception: del self.pools[registered_name] raise def log_initial_snapshot(self) -> None: resource_summary = self._build_resource_summary() self._log_resource_summary( resource_summary, include_registered=False, title=( "External CPU initial cluster snapshot. " "Registered external CPU pools are not shown yet and may change as components are built" ), ) def log_registered_summary(self) -> None: resource_summary = self._build_resource_summary() self._log_resource_summary( resource_summary, include_registered=True, title="External CPU resource summary before rollout/training", ) def validate_or_raise(self) -> None: resource_summary = self._build_resource_summary() cluster_cpus = resource_summary["cluster_cpus"] cluster_memory = int(resource_summary["cluster_memory"]) accelerator_cpus = resource_summary["accelerator_cpus"] accelerator_memory = int(resource_summary["accelerator_memory"]) external_cpus = resource_summary["external_capacity_cpus"] external_memory = int(resource_summary["external_capacity_memory"]) requested_cpus = resource_summary["registered_external_cpus"] requested_memory = int(resource_summary["registered_external_memory"]) max_node_external_cpus = resource_summary["max_node_external_cpus"] errors: list[str] = [] if requested_cpus > external_cpus: errors.append( f"CPU requested={requested_cpus:g}, available_outside_accelerator_pg={external_cpus:g}, " f"cluster_total={cluster_cpus:g}, accelerator_pg_reserved={accelerator_cpus:g}" ) if requested_memory > 0 and requested_memory > external_memory: errors.append( "memory requested=" f"{requested_memory}, available_outside_accelerator_pg={external_memory}, " f"cluster_total={cluster_memory}, accelerator_pg_reserved={accelerator_memory}" ) for name, pool in self.pools.items(): if pool.num_cpus_per_worker > max_node_external_cpus: errors.append( f"pool {name!r} requests {pool.num_cpus_per_worker:g} CPU per worker, " f"but the largest node has only {max_node_external_cpus:g} CPU outside accelerator PGs" ) if errors: self._log_resource_summary( resource_summary, include_registered=True, title="External CPU resource summary at validation failure", ) pool_lines = [ f" - {name}: {pool.num_workers} workers * {pool.num_cpus_per_worker:g} CPU" f", {pool.cpu_memory_per_worker} memory each" for name, pool in self.pools.items() ] raise RuntimeError( "Insufficient PG-external Ray resources for registered external CPU pools:\n" + "\n".join(errors) + "\nRegistered pools:\n" + ("\n".join(pool_lines) if pool_lines else " <none>") ) def _make_unique_registration_name(self, name: str) -> str: count = self._registration_counts.get(name, 0) + 1 self._registration_counts[name] = count if count == 1 and name not in self.pools: return name while True: candidate = f"{name}#{count}" if candidate not in self.pools: return candidate count += 1 self._registration_counts[name] = count def _accelerator_pg_resource_total(self, resource_name: str) -> float: total = 0.0 for pg in self._accelerator_placement_groups: for bundle in pg.bundle_specs: total += float(bundle.get(resource_name, 0)) return total def _build_resource_summary(self) -> dict[str, float]: cluster_resources = ray.cluster_resources() available_resources = ray.available_resources() cluster_cpus = float(cluster_resources.get("CPU", 0)) available_cpus = float(available_resources.get("CPU", 0)) cluster_memory = float(cluster_resources.get("memory", 0)) available_memory = float(available_resources.get("memory", 0)) accelerator_cpus = self._accelerator_pg_resource_total("CPU") accelerator_memory = self._accelerator_pg_resource_total("memory") registered_external_cpus = sum(pool.num_workers * pool.num_cpus_per_worker for pool in self.pools.values()) registered_external_memory = float( sum(pool.num_workers * pool.cpu_memory_per_worker for pool in self.pools.values()) ) external_capacity_cpus = cluster_cpus - accelerator_cpus external_capacity_memory = cluster_memory - accelerator_memory return { "cluster_cpus": cluster_cpus, "available_cpus": available_cpus, "accelerator_cpus": accelerator_cpus, "external_capacity_cpus": external_capacity_cpus, "ray_external_in_use_cpus": max(0.0, external_capacity_cpus - available_cpus), "registered_external_cpus": registered_external_cpus, "remaining_after_registered_cpus": external_capacity_cpus - registered_external_cpus, "cluster_memory": cluster_memory, "available_memory": available_memory, "accelerator_memory": accelerator_memory, "external_capacity_memory": external_capacity_memory, "ray_external_in_use_memory": max(0.0, external_capacity_memory - available_memory), "registered_external_memory": registered_external_memory, "remaining_after_registered_memory": external_capacity_memory - registered_external_memory, "max_node_external_cpus": self._max_node_external_resource("CPU"), } def _log_resource_summary(self, summary: dict[str, float], *, include_registered: bool, title: str) -> None: rows = [ ( "cluster_total", summary["cluster_cpus"], summary["cluster_memory"], "Ray cluster_resources total.", ), ( "accelerator_pg_reserved", summary["accelerator_cpus"], summary["accelerator_memory"], "Reserved by accelerator placement group(s).", ), ( "external_capacity", summary["external_capacity_cpus"], summary["external_capacity_memory"], "cluster_total - accelerator_pg_reserved.", ), ] if include_registered: rows.extend( [ ( "registered_external", summary["registered_external_cpus"], summary["registered_external_memory"], "External CPU pools managed by XTuner.", ), ( "remaining_after_registered", summary["remaining_after_registered_cpus"], summary["remaining_after_registered_memory"], "XTuner budget after registered pools.", ), ] ) rows.extend( [ ( f"pool:{name}", pool.num_workers * pool.num_cpus_per_worker, pool.num_workers * pool.cpu_memory_per_worker, f"{pool.num_workers} worker(s) * {pool.num_cpus_per_worker:g} CPU.", ) for name, pool in self.pools.items() ] ) logger.info(f"{title}:\n{self._format_resource_table(rows)}") @staticmethod def _format_resource_table(rows: list[tuple[str, float, float, str]]) -> str: headers = ("item", "CPU", "memory(GB)", "note") rendered_rows = [ ( item, f"{cpu:g}", "-" if memory == 0 else f"{memory / 1024**3:.2f}", note, ) for item, cpu, memory, note in rows ] widths = [max(len(str(row[idx])) for row in (headers, *rendered_rows)) for idx in range(len(headers))] def render(row: tuple[str, str, str, str]) -> str: return " | ".join(str(value).ljust(widths[idx]) for idx, value in enumerate(row)) separator = "-+-".join("-" * width for width in widths) return "\n".join([render(headers), separator, *(render(row) for row in rendered_rows), separator]) def _max_node_external_resource(self, resource_name: str) -> float: node_totals = { node["NodeID"]: float(node.get("Resources", {}).get(resource_name, 0)) for node in ray.nodes() if node.get("Alive", True) } if not node_totals: return 0.0 for pg in self._accelerator_placement_groups: table = placement_group_table(pg) bundles_to_node_id = table.get("bundles_to_node_id", {}) for bundle_idx, node_id in bundles_to_node_id.items(): try: bundle = pg.bundle_specs[int(bundle_idx)] except (IndexError, ValueError): continue if node_id in node_totals: node_totals[node_id] -= float(bundle.get(resource_name, 0)) return max(node_totals.values()) _CPU_RESOURCE_MANAGER: CPUResourceManager | None = None def format_cpu_resource_manager_uninitialized_error(owner: str) -> str: return ( f"{owner} sets cpu_resources, but CPUResourceManager is not initialized.\n" "This usually means the config is being built outside RLTrainer. In normal training, build the trainer first " "so XTuner can initialize CPUResourceManager after accelerator placement groups are created.\n" "For standalone tests or scripts, initialize it explicitly before building this component:\n" " from xtuner.v1.rl.utils import CPUResourceManager, set_cpu_resource_manager\n" " set_cpu_resource_manager(CPUResourceManager(accelerator_placement_groups=None))\n" "Note: standalone initialization does not account for accelerator placement group reservation unless you pass " "the placement group(s)." ) def set_cpu_resource_manager(manager: CPUResourceManager | None) -> None: global _CPU_RESOURCE_MANAGER _CPU_RESOURCE_MANAGER = manager def get_cpu_resource_manager() -> CPUResourceManager | None: return _CPU_RESOURCE_MANAGER def register_cpu_resources(name: str, cpu_resources: CPUResourcesConfig) -> None: if _CPU_RESOURCE_MANAGER is None: raise ValueError(format_cpu_resource_manager_uninitialized_error(name)) _CPU_RESOURCE_MANAGER.register(name=name, config=cpu_resources) def clear_cpu_resource_manager() -> None: set_cpu_resource_manager(None)