Judger#
Judger 是 XTuner RL 中负责给 rollout 结果打分的组件。它接收 RolloutState,读取模型生成结果和标签,计算 reward,并把分数写回 rollout_state.reward。后续的 advantage 计算、训练数据构建和默认评测逻辑都会依赖这个字段。
在典型流程中,AgentLoop 先调用推理引擎生成 response_ids,再把 token 解码成文本形式的 response,最后通过 run_judger() 调用 judger.judge(rollout_state) 或 judger.batch_judge(rollout_states)。
Judger 类型#
xtuner/v1/rl/judger/native.py 中定义了 Judger 的基础类型。可以先把它理解成两层:第一层是所有 Judger 统一实现的 judge() 和 batch_judge() 接口;第二层是不同运行形态,决定打分逻辑在本地执行、在 Ray actor 中执行,还是组合多个子 Judger 一起执行。
┌─────────────────────────────┐
│ Judger (ABC) │
│ async judge(RolloutState) │
│ async batch_judge(list) │
└──────────────┬──────────────┘
│
┌───────────────────────────────┼───────────────────────────────┐
│ │ │
┌───────────▼───────────┐ ┌───────────▼───────────┐ ┌───────────▼───────────┐
│ NativeJudger │ │ RemoteJudger │ │ JudgerPool │
│ │ │ │ │ │
│ 本地执行 judge() │ │ judge() 的远程代理 │ │ 多个 RemoteJudger 副本 │
│ 调用 reward_handler │ │ 调用 JudgerActor.remote │ │ 按 round-robin 分发 │
└───────────┬───────────┘ └───────────┬───────────┘ └───────────┬───────────┘
│ │ │
│ ▼ │
│ ┌───────────────────────┐ │
│ │ JudgerActor │ │
│ │ Ray worker 内部持有 │ │
│ │ NativeJudger │ │
│ └───────────┬───────────┘ │
│ │ │
└───────────────────────────────▼───────────────────────────────┘
reward_handler / HTTP endpoint
┌─────────────────────────────┐
│ ComposedJudger │
│ 多分支路由与结果合并 │
└──────────────┬──────────────┘
│
┌───────────────────────┼───────────────────────┐
│ │ │
┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐
│ branch A │ │ branch B │ │ branch ... │
│ 任意 Judger │ │ 任意 Judger │ │ 任意 Judger │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└───────────────────────┼───────────────────────┘
▼
data_source 选择 branch,merge_fn 合并 reward dict
RemoteJudger 的作用是给 Ray actor 形式的 Judger 提供一个对外统一接口。它本身不实现具体打分逻辑,而是持有 JudgerActor 的 actor proxy。judge() 或 batch_judge() 会先在 driver 侧把 RolloutState 转成轻量 payload,再调用 actor.judge_payload.remote(...),因此 Ray 序列化的是 payload,而不是完整 RolloutState。这样上层代码只需要面向 Judger 接口编程,不需要关心当前 Judger 是本地对象、Ray actor,还是多副本池中的一个远程副本。
构建时有两条入口:
JudgerConfig
├─ cpu_resources = None ──► NativeJudger
└─ cpu_resources = CPUResourcesConfig(...)
├─ num_workers = 1 ──► RemoteJudger
│ └─ JudgerActor
│ └─ NativeJudger
└─ num_workers > 1 ──► JudgerPool
├─ RemoteJudger -> JudgerActor -> NativeJudger
├─ RemoteJudger -> JudgerActor -> NativeJudger
└─ ...
ComposedJudgerConfig
└─ ComposedJudger
├─ branches["a"] -> JudgerConfig 或 ComposedJudgerConfig
├─ branches["b"] -> JudgerConfig 或 ComposedJudgerConfig
└─ RolloutState.data_source + merge_fn 控制路由和合并
普通 JudgerConfig 根据 cpu_resources 决定执行模式。cpu_resources 表示 PG 外 Ray CPU worker 的资源需求,类型为 CPUResourcesConfig:
配置 |
构建结果 |
含义 |
|---|---|---|
|
|
本地执行,不启动 Ray Judger actor |
|
|
单个 Ray actor 执行打分 |
|
|
多个 Ray actor 副本并发打分 |
CPUResourcesConfig.cpu_memory_per_worker 默认是 1024**3,通常不需要额外配置。PG 外 CPU actor 资源会注册到全局 CPUResourceManager,资源不足时会在组件构建阶段报错,避免 Ray actor 长时间 pending。
ComposedJudgerConfig 用于多分支场景:样本通过 RolloutState.data_source 路由到一个或多个 Judger。data_source 为字符串时选择一个 branch;为字典时使用字典 key 同时选择多个 branch,并用 merge_fn 合并结果。
输入输出约定#
使用预置 SingleTurnAgentLoop 和 RLTrainer._prepare_train_data() 时,Judger 的输入输出约定要放在整条数据链路里理解:
Sampler -> RolloutState
-> SingleTurnAgentLoop.generate_group()
-> RolloutController.generate()
-> Judger.judge() / Judger.batch_judge()
-> ReplayBuffer
-> RLTrainer._prepare_train_data()
Judger 本身统一暴露异步接口:
async def judge(self, rollout_state: RolloutState) -> RolloutState:
...
async def batch_judge(self, rollout_states: list[RolloutState]) -> list[RolloutState]:
...
judge() 只处理单条样本;批量打分使用 batch_judge()。内置 NativeJudger 和 CompassVerifierV2 默认不支持批量打分,会在 batch_judge() 入口直接报错。只有当前 Judger 明确实现 batch_judge() 时,才应打开 SingleTurnAgentLoopConfig(enable_batch_judge=True)。
Judger 输入#
在预置 SingleTurnAgentLoop 中,进入 Judger 前的 RolloutState 通常已经包含:
prompt_ids:prompt token,后续_prepare_train_data()会用它拼接训练输入。response_ids:模型生成的 response token,后续训练数据直接依赖它。response:模型生成的文本,NativeJudger会用它计算 reward。logprobs:response token 的 rollout logprob。如果存在,长度必须和response_ids一致。response_mask:哪些 response token 参与训练。为空时,_prepare_train_data()默认所有 response token 都参与训练。reward_model["ground_truth"]:标准答案或标签,预置 rule-based Judger 通常依赖这个字段。
NativeJudger 传给 reward_handler 的字段更窄:
input_kwargs = {
"response": rollout_state.response,
"label": rollout_state.reward_model["ground_truth"],
"extra_info": {**self.extra_info},
}
因此,如果只自定义 reward_handler,只能直接拿到 response、label 和 extra_info;如果需要读取 response_ids、extra_fields、工具轨迹或状态码,应继承 Judger。
Judger 输出#
Judger 的输出仍然是同一个 RolloutState,必须把分数写到 rollout_state.reward 这个 dict 中:
rollout_state.reward = {
"score": 1.0,
"acc": True,
}
如果这批数据后续要进入 _prepare_train_data(),必须满足:
reward不能为None。reward必须包含数值型score字段,因为_prepare_train_data()会直接读取data.reward["score"]来计算 advantage。status不能是ABORTED、FILTERED或FAILED。response和response_ids都不能为空。如果提供
response_mask,长度必须和response_ids一致。mask 为0的 token 会在训练 label 中变成-100,对应 advantage 也会置为0.0。如果提供
logprobs,长度必须和response_ids一致。
其他 reward 字段可以按任务需要扩展,例如 acc、format、tool_ok、reason 等。
如果使用 ComposedJudger 且 data_source 只选择一个 branch,输出会直接使用该 branch 的 reward。若 data_source 选择多个 branch,必须提供自定义 merge_fn,因为通用逻辑无法知道业务上应该如何聚合多个分数。例如:
rollout_state.reward = {
"correctness": 1.0,
"format": 0.5,
}
如果训练或评测需要 reward["score"],merge_fn 必须生成这个字段。
data_source 与 merge_fn#
ComposedJudgerConfig 的核心是 branches 和 merge_fn,路由固定读取 RolloutState.data_source:
from xtuner.v1.rl.judger import ComposedJudgerConfig, JudgerConfig
judger_config = ComposedJudgerConfig(
branches={
"gsm8k": JudgerConfig(judger_name="gsm8k", reward_handler=gsm8k_reward),
"format": JudgerConfig(judger_name="format", reward_handler=format_reward),
},
merge_fn=merge_rewards,
)
data_source 的含义:
str:必须等于某个 branch 名称,运行这一个 branch。dict:key 必须都是 branch 名称,运行这些 branches。None、空 dict、未知 branch、非字符串 key 或其他类型都会在ComposedJudger入口报错。
merge_fn 负责把多个子 Judger 的输出合并回一个 RolloutState:
from xtuner.v1.rl.judger import JudgerOutput
def merge_fn(
original: RolloutState | list[RolloutState],
judged: dict[str, JudgerOutput | list[JudgerOutput]],
) -> RolloutState | list[RolloutState]:
...
当前实现没有独立的 weight_fn 配置字段。若需要加权,可以在 merge_fn 中实现,或者让 merge_fn 调用用户自己定义的 weight_fn 辅助函数:
def weight_fn(branch_name: str) -> float:
return {
"correctness": 0.9,
"format": 0.1,
}[branch_name]
def merge_rewards(original, judged):
branch_scores = {
name: output["score"]
for name, output in judged.items()
}
original.reward = {
"score": sum(weight_fn(name) * score for name, score in branch_scores.items()),
**branch_scores,
}
return original
批量输入时,merge_fn 的第一个参数是 list[RolloutState],judged 中每个 branch 的值是同样长度的 list[JudgerOutput],返回值也必须是同样长度的 list[RolloutState]。框架不提供默认 merge;多 branch 场景必须显式传入 merge_fn。
自定义 Judger#
自定义 Judger 有三种常见方式。
方式一:自定义 reward_handler#
如果打分逻辑只依赖 response、ground_truth 和静态配置,推荐直接使用 JudgerConfig(reward_handler=...):
from xtuner.v1.rl.judger import JudgerConfig
def exact_match_reward(response, label, extra_info):
pred = response.strip()
gt = str(label).strip()
score = 1.0 if pred == gt else 0.0
return {
"score": score,
"acc": score > 0,
}
judger_config = JudgerConfig(
judger_name="exact_match",
reward_handler=exact_match_reward,
extra_info={},
)
reward_handler 可以是普通函数,也可以是 async 函数。返回值必须是 dict。
方式二:HTTP Judger 服务#
如果打分逻辑在外部服务中,可以把 reward_handler 配成 URL:
judger_config = JudgerConfig(
judger_name="remote_reward",
reward_handler="http://127.0.0.1:8000/reward",
request_timeout=30.0,
)
服务端需要接收:
{
"response": "...",
"label": "...",
"extra_info": {}
}
并返回 JSON object:
{
"score": 1.0,
"acc": true
}
方式三:继承 Judger#
如果打分需要读取 RolloutState.extra_fields、工具调用轨迹、多轮状态、状态码或其他字段,可以直接继承 Judger:
from xtuner.v1.data_proto.rl_data import RolloutState
from xtuner.v1.rl.judger import Judger
class ToolJudger(Judger):
async def judge(self, rollout_state: RolloutState) -> RolloutState:
tool_ok = rollout_state.extra_fields.get("tool_ok", False)
answer = (rollout_state.response or "").strip()
label = rollout_state.reward_model["ground_truth"]
rollout_state.reward = {
"score": 1.0 if tool_ok and answer == label else 0.0,
"tool_ok": tool_ok,
}
return rollout_state
如果希望它通过配置系统构建,并继续复用 Ray actor 扩展能力,可以继承 JudgerConfig 并覆盖 build_local():
from xtuner.v1.rl.judger import JudgerConfig
class ToolJudgerConfig(JudgerConfig):
judger_name: str = "tool_judger"
def build_local(self) -> ToolJudger:
return ToolJudger()
之后仍可通过 cpu_resources 控制本地或 Ray actor 模式:
from xtuner.v1.rl.utils import CPUResourcesConfig
judger_config = ToolJudgerConfig(
cpu_resources=CPUResourcesConfig(
num_workers=4,
num_cpus_per_worker=1,
),
)
judger = judger_config.build()
在训练配置中使用#
在 V1 RL 配置中,Judger 通常挂到 TaskSpecConfig.judger_config:
from xtuner.v1.rl.agent_loop import SingleTurnAgentLoopConfig
from xtuner.v1.rl.agent_loop_manager import TaskSpecConfig
from xtuner.v1.rl.judger import GSM8KJudgerConfig
from xtuner.v1.rl.utils import CPUResourcesConfig
judger_config = GSM8KJudgerConfig(
judger_name="openai/gsm8k",
cpu_resources=CPUResourcesConfig(
num_workers=1,
num_cpus_per_worker=1,
),
)
task_config = TaskSpecConfig(
task_name="train_task",
agent_loop_config=SingleTurnAgentLoopConfig(
hf_checkpoint=model_path,
sample_params=training_sample_params,
),
judger_config=judger_config,
sampler_config=sampler_config,
)
AgentLoopManagerConfig.build() 会调用 build_judger(task_cfg.judger_config)。普通 JudgerConfig 会按 cpu_resources 构建本地或 Ray 版本;ComposedJudgerConfig 会递归构建各个 branch。
预置 Judger#
XTuner 预置了一些常用 Judger 配置,可直接在训练配置中使用:
GSM8KJudgerConfigDapoMathJudgerConfigGEO3KJudgerConfigCompassVerifierV2Config
这些预置 Judger 都遵循上面的 RolloutState -> reward dict 约定;具体任务逻辑可以直接查看对应实现文件。