《Effective AI Agent》条目 5:实现可回放的 Orchestrator 以支撑调试与回归
背景动机
Agent 系统最折磨工程师的地方,不是它会不会“答”,而是它怎么走到这个答案。线上事故往往不是最终回复“看起来不对”,而是某一步工具调用抖了一下、某次检索命中了脏数据、某个路由策略突然选错了分支,最后你只看见一个结果,却找不到责任步骤。
这跟当下互联网的软件架构实践是反着来的:我们做微服务默认有 trace、有日志、有回放;出了故障能复盘,修了之后能回归验证。Agent 其实就是一种“会自我分支、会重试、会跨多工具”的工作流系统——只不过它把控制流的一部分交给了模型。Anthropic 在 2024 的工程文章里把 agentic system 明确区分为 workflow 与 agent,并强调成功落地依赖“简单、可组合的模式”,而不是把复杂性藏进黑盒。
所以这条原则很硬:每一次任务必须可记录、可复现、可重放。否则你无法稳定迭代,只能靠“玄学调参”。
原理分析
“可回放”不是一个日志开关,它是一套运行时语义:你要把一次执行变成可重建的“事件历史”。Temporal 这类工作流引擎用 Event History 记录每次执行的命令与事件,并依赖确定性约束实现一致 replay——这套机制之所以能让工程团队长期维护复杂流程,本质就在于“把执行过程变成可回放的数据”。
Agent 的麻烦在于:LLM 不确定、外部世界也不确定。因此可回放要分层:
第一层是播放式回放:不重新请求模型、不重打外部工具,直接按历史事件“播放”输入输出,用来复现现场、定位责任步骤(这是 Debug 的刚需)。
第二层是半确定回放:外部工具返回用历史快照冻结,只重跑模型或部分策略,用来做 Prompt/模型升级的回归对比(你要的是“策略变没变坏”,不是“外部世界变没变”)。
第三层才是全量重跑:连工具也重跑,更像演练/压测,前提是工具幂等、可关联、可对账,否则只会制造更多噪音。
这套分层之所以可落地,是因为它借了两条成熟的架构思想:一条是 Fowler 总结的 Event Sourcing(用事件重建状态、天然支持审计与回放),另一条是分布式可观测性(用 trace context 把跨组件的步骤串起来)。
工程实践
要实现可回放的 Orchestrator,别把它当“串工具的脚本”,要把它当一个小型工作流内核:它负责定义运行契约、事件模型、存储与回放器,并把这些能力接进你现有的观测与回归体系。
首先把“运行契约”钉死。每次 run 至少要固化:模型与版本、采样参数、system prompt 版本、工具 schema 版本、检索配置版本。OpenAI 在 2025 的 agents 工具发布里把“可观测(trace & inspect)”作为 agent 工程化能力的一部分提出来,本质就是在提醒你:没有这些版本维度,你连问题都描述不清,更别说回归对比。
然后把每一步变成不可变事件,并进入追加写存储。不要只存“最终答案”,要存:路由决策、工具入参/出参、模型原始输出、解析结果、重试原因、耗时、token 用量。LangChain 在谈 deep agents 调试时反复强调“你需要可视化每一步做了什么”,否则长链路里任何一次偏航都像大海捞针。
接着,工具必须“可回放”。工程上最常见的坑是:你以为模型不稳定,其实是工具不稳定;你以为回归变差,其实是外部数据漂移。最低要求是给每次工具调用一个可派生的 call_id(例如 run_id:step_id),并把它透传到下游系统,做到幂等与可对账。这里强烈建议直接对齐 OpenTelemetry 的语义:把一次 run 视为 trace,把每一步视为 span,把 run_id / call_id 作为 attributes。LangSmith 的 OpenTelemetry 支持与文档,本质上就是把“agent 轨迹”纳入成熟的分布式追踪范式。
最后,把回放变成回归。你要把历史 run 变成“回归集”,对每次 Prompt/模型/策略升级跑一遍半确定回放,比较三类指标:路径长度(步数、工具调用数、重试次数)、成本(token 与外部调用费用)、质量(结构化输出通过率、引用覆盖率、关键断言正确率)。Chip Huyen 在 agents 的工程写法里强调 failure modes 与 evaluation 的重要性——而可回放轨迹就是你做这些评估的“样本基建”。
示例代码
下面这段代码给的是“可回放最小骨架”:
- RunContract 固化版本维度
- 每一步写入 append-only 事件日志
- 提供 playback(复现现场)与 hybrid_replay(冻结工具结果做回归)
- 预留
trace_id字段,方便你对接 OTel/W3C Trace Context(跨服务串起来)([W3C][8])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Callable, Optional
import json, time, uuid
JSON = Dict[str, Any]
@dataclass(frozen=True)
class RunContract:
# “这次到底跑的是什么”——必须可追溯
model: str
model_version: str
temperature: float
system_prompt_version: str
tool_schema_version: str
retriever_version: str
@dataclass(frozen=True)
class StepEvent:
run_id: str
step_id: int
kind: str # router/tool/llm/parse/error...
name: str # tool_name/model_name/router_name...
ts_ms: int
trace_id: str # 便于接入 OpenTelemetry/W3C Trace Context
input: JSON
output: JSON
meta: JSON # latency, tokens, retry, etc.
class AppendOnlyLog:
def __init__(self, path: str):
self.path = path
def append(self, e: StepEvent) -> None:
with open(self.path, "a", encoding="utf-8") as f:
f.write(json.dumps(asdict(e), ensure_ascii=False) + "\n")
def read(self, run_id: str) -> List[StepEvent]:
out: List[StepEvent] = []
with open(self.path, "r", encoding="utf-8") as f:
for line in f:
obj = json.loads(line)
if obj["run_id"] == run_id:
out.append(StepEvent(**obj))
return sorted(out, key=lambda x: x.step_id)
class ReplayableOrchestrator:
def __init__(self, log: AppendOnlyLog, tools: Dict[str, Callable[[JSON], JSON]]):
self.log = log
self.tools = tools
def _now_ms(self) -> int:
return int(time.time() * 1000)
def execute(
self,
user_input: str,
*,
contract: RunContract,
llm: Callable[[str], str],
) -> str:
run_id = uuid.uuid4().hex
trace_id = uuid.uuid4().hex # 真实场景:从入口透传/生成,贯穿全链路
step = 0
def emit(kind: str, name: str, _in: JSON, _out: JSON, meta: JSON) -> None:
nonlocal step
self.log.append(StepEvent(
run_id=run_id, step_id=step, kind=kind, name=name,
ts_ms=self._now_ms(), trace_id=trace_id,
input=_in, output=_out, meta=meta,
))
step += 1
emit("contract", "run_contract", {"user_input": user_input}, {"contract": asdict(contract)}, {})
# 1) router(示例:可替换组件)
route = "search" if "查" in user_input or "search" in user_input.lower() else "none"
emit("router", "simple_router", {"text": user_input}, {"route": route}, {"router_version": "v1"})
# 2) tool(要求可回放:call_id 可对账、可幂等)
tool_snapshot: Optional[JSON] = None
if route == "search":
call_id = f"{run_id}:{step}"
t0 = self._now_ms()
tool_snapshot = self.tools["search"]({"q": user_input, "call_id": call_id})
emit("tool", "search", {"q": user_input, "call_id": call_id}, {"result": tool_snapshot},
{"latency_ms": self._now_ms() - t0, "tool_schema": contract.tool_schema_version})
# 3) llm(把“渲染后的 prompt”固化,避免模板演进导致无法回放)
prompt = f"[sys:{contract.system_prompt_version}] Q={user_input}\nCTX={tool_snapshot}\nA="
t0 = self._now_ms()
raw = llm(prompt)
emit("llm", contract.model, {"prompt": prompt}, {"raw": raw},
{"latency_ms": self._now_ms() - t0, "model_version": contract.model_version, "temperature": contract.temperature})
# 4) parse(结构化输出时,这一步的失败原因是排障关键)
answer = raw.strip()
emit("parse", "plain_parser", {"raw": raw}, {"answer": answer}, {"parser_version": "v1"})
return answer # 线上建议同时返回 run_id 供定位
def playback(self, run_id: str) -> List[StepEvent]:
# 播放式回放:完全复现场景,不发起任何外部调用
return self.log.read(run_id)
def hybrid_replay(self, run_id: str, *, new_llm: Callable[[str], str]) -> str:
# 冻结工具结果,只重跑 LLM:用于回归对比
events = self.log.read(run_id)
llm_evt = next(e for e in events if e.kind == "llm")
return new_llm(llm_evt.input["prompt"]).strip()
参考文献
- Anthropic — Building effective agents -
https://www.anthropic.com/research/building-effective-agents - OpenAI — New tools for building agents -
https://openai.com/index/new-tools-for-building-agents/ - LangChain — Debugging Deep Agents with LangSmith -
https://blog.langchain.com/debugging-deep-agents-with-langsmith/ - LangChain — Introducing OpenTelemetry support for LangSmith
https://blog.langchain.com/opentelemetry-langsmith/ - LangSmith Docs — Trace with OpenTelemetry -
https://docs.langchain.com/langsmith/trace-with-opentelemetry - Chip Huyen — Agents -
https://huyenchip.com/2025/01/07/agents.html - Temporal Docs — Temporal Workflow / Event History / deterministic replay -
https://docs.temporal.io/workflows - W3C — Trace Context - https://www.w3.org/TR/trace-context/`
- OpenTelemetry — Context propagation -
https://opentelemetry.io/docs/concepts/context-propagation/ - Martin Fowler — Event Sourcing -
https://martinfowler.com/eaaDev/EventSourcing.html