检查点与恢复
长时间运行的任务工作流可以持久化自己的进度,并在崩溃、中止或进程重启后恢复。检查点是需显式开启的,且完全运行在既有的 MemoryStore 接口之上,因此承载共享内存的那个内存、Redis、Postgres 或自定义后端,同样承载检查点——无需额外的存储层。
它覆盖编排路径(runTeam、runTasks、runFromPlan 和 restore)。单次 runAgent 调用没有可恢复的东西,不做检查点。
按调用传入 checkpoint,或通过 OrchestratorConfig.checkpoint 为每次运行设默认值。按调用的选项会覆盖配置默认值。
import { OpenMultiAgent, Team, InMemoryStore } from '@open-multi-agent/core'
const store = new InMemoryStore() // or a durable MemoryStore (Redis, SQLite, ...)
const team = new Team({ name: 'research', agents: [researcher, writer], sharedMemoryStore: store,})
const orchestrator = new OpenMultiAgent()
// Snapshots are written after each completed task.await orchestrator.runTasks(team, tasks, { checkpoint: { store } })checkpoint: true 是简写:当团队有共享内存存储时复用它,否则用一个限定在该编排器实例上的私有内存存储。
const orchestrator = new OpenMultiAgent({ checkpoint: true }) // default for all runsCheckpointOptions
Section titled “CheckpointOptions”| 字段 | 类型 | 默认值 | 用途 |
|---|---|---|---|
enabled | boolean | true | 当配置默认值为开启时,设为 false 可对单次运行禁用。 |
store | MemoryStore | 团队的共享内存存储 | 检查点记录的持久化后端。 |
runId | string | — | 逻辑运行 id;据此派生逐次运行的检查点键。 |
key | string | — | 精确的存储键。优先于 runId。 |
当团队没有共享内存存储时,必须提供
runId、key或显式的store。 实例级的回退存储在该编排器上的每次运行间共享,因此若没有一个区分性的键,两次并发运行会在默认检查点键上互相覆盖。该调用宁可抛错,也不冒静默互踩的风险。
restore() 加载最新的检查点,重建任务队列与共享内存,跳过已完成的任务,并运行其余任务。
// After a crash/restart: same team wiring, same store.const resumedTeam = new Team({ name: 'research', agents: [researcher, writer], sharedMemoryStore: store,})
const result = await orchestrator.restore(resumedTeam, { checkpoint: { store } })一次恢复的 runTeam 运行会重新执行协调器综合,因此你会得到和全新 runTeam 相同的、综合出的最终答案(位于 result.agentResults.get('coordinator')),而不只是各任务的原始输出。要重新提供你最初使用的协调器配置——检查点无法持久化一个活动的 adapter:
const result = await orchestrator.restore(resumedTeam, { checkpoint: { store }, coordinator: { provider: 'anthropic', model: 'claude-sonnet-4-6' }, // same as the original runTeam})如果综合无法运行(没有可用的协调器配置或凭据)或综合调用失败,恢复是尽力而为的:它返回各任务的原始输出,不带 'coordinator' 条目,并发出一个 onProgress 的 synthesis_failed 事件。runTasks / runFromPlan 运行从不综合。
如果找不到检查点,restore() 会回退为对你传入的任务或计划做一次正常运行——因此同一个调用对首次运行和恢复都适用:
// Fresh store → runs all tasks. Existing checkpoint → resumes, skipping done tasks.await orchestrator.restore(team, tasks, { checkpoint: { store } })await orchestrator.restore(team, plan, { checkpoint: { store } }) // PlanArtifactawait orchestrator.restore(team, { checkpoint: { store } }) // resume-only, no-op on empty store每当一个任务成功完成,编排器就写入一份 CheckpointSnapshot:
- 任务队列状态——每个任务及其状态分区(pending / in-progress / completed / failed / blocked / skipped)。
- 共享内存——回合计数器总会被记录。完整的条目快照仅在检查点存储与团队的共享内存存储不同时才嵌入。当它们是同一个存储时(
checkpoint: true的默认情形),这些条目已经在那里持久化了,因此每个任务都重新嵌入它们会造成在一次长运行中浪费约 O(N²) 的写入量;恢复时改为直接从存储读取它们。无论哪种方式,恢复都能正确地重建共享内存。 - 已完成任务的结果——每个完成任务的
taskId、assignee和result,这样被恢复的智能体能看到先前的输出。
快照以 JSON 形式存储在一个保留命名空间下:__oma_checkpoint__/<runId>/latest(未设 runId 时为 __oma_checkpoint__/latest)。__oma_checkpoint__/ 下的键是保留的——共享内存的快照 / 恢复会刻意跳过它们,使得一个存储能同时承载智能体内存和检查点。
保存是尽力而为的
Section titled “保存是尽力而为的”检查点写入绝不能拖垮它所保护的运行。如果存储拒绝(一次瞬时的 Redis/SQLite 错误),该失败会通过 onProgress 暴露出来,运行继续;下一个完成的任务会重试这次写入。
const orchestrator = new OpenMultiAgent({ onProgress(event) { if (event.type === 'error' && event.data?.kind === 'checkpoint_save_failed') { console.warn('checkpoint write failed, run continues:', event.data.error) } },})进阶:Checkpoint 类
Section titled “进阶:Checkpoint 类”为了直接检视或管理检查点,管理器与键辅助函数都已导出:
import { Checkpoint, checkpointKey, isCheckpointKey, CHECKPOINT_KEY_PREFIX, DEFAULT_CHECKPOINT_KEY,} from '@open-multi-agent/core'
const cp = new Checkpoint(store, { runId: 'nightly-2026-06-18' })const snapshot = await cp.loadLatest() // CheckpointSnapshot | nullawait cp.delete() // drop the persisted checkpoint在 MemoryStore 之上的逐次运行快照 / 恢复。它尚未做到的:
- 恢复是任务粒度的,不是任务中途的。 一个在运行中被打断的任务,在恢复时会从头重跑——运行中任务内部、进行中智能体的对话历史不会被持久化。恢复发生在任务边界处。
- 基于快照,而非事件溯源。 每个检查点覆盖前一个;没有可回放的状态转换日志。
关于上面所述共享内存优化的两点说明:
- 一个独立的持久检查点存储(共享内存在存储 X,
checkpoint: { store: Y })在每次保存时仍会嵌入完整的内存快照——这是必要的,因为 Y 不持有这些条目的任何其它副本。 - 复用存储的路径不会对共享内存做时间点回滚。默认框架仅在任务完成时写入结果(因此一个崩溃的进行中任务什么也没写),但一个在任务中途向共享内存写入的自定义工具,其那些部分写入在恢复时不会被回滚。