INF-2 操作历史/回滚 - 完整调研报告¶
内部文档.指导 INF-2 的实现设计.
执行摘要¶
本报告基于对业界操作历史和回滚机制的深入调研,针对 Flyto Agent Engine 的三大场景(编程,仓储,企业API)提供设计建议.核心观点:不存在"一个方案适应所有场景"的银弹,应采取分层混合架构:
- 编程场景:采用 Git 风格的内容寻址 + 快照(当前已有基础)
- 仓储场景:结合审计日志 + 补偿事务 + 版本向量,优先保障数据一致性和防冲突
- 企业 API:基于 Saga 补偿 + 事件流,因不可逆操作多
第一部分:业界方案深度分析¶
1. 文件操作历史¶
1.1 Git Object Model(内容寻址存储)¶
核心原理
Git 是内容寻址文件系统(Content-Addressable Filesystem): - 每个对象(blob/tree/commit/tag)由其内容的 SHA-1 哈希标识 - 三层模型:blob(文件内容) → tree(目录结构) → commit(快照) - 关键特性:相同内容自动去重(同一个 index.js 在多个 commit 中只存一份)
存储结构:
优点: - 极端高效的存储利用(内容重复自动去重) - 完全的不可篡改性(改一个字节,哈希完全不同) - 天然支持时间旅行(DAG 结构) - 跨系统兼容(只用内容标识,不依赖文件路径)
缺点: - 查询低效:找某个文件某个版本需要遍历整个 commit tree - 删除策略复杂:垃圾回收(GC)需要追踪可达对象 - 冲突解决复杂:需要手动合并
对 Flyto 的适配
早期方案 采用了简化版:hash@v{n} 命名方案,基于内容哈希创建快照文件,绑定消息 ID.建议 Flyto 沿用这个方案:
~/.flyto/history/{project-hash}/
├── {file-content-hash}@v1.bak
├── {file-content-hash}@v2.bak
└── metadata.json
1.2 Copy-on-Write(COW)快照¶
核心原理
修改时不覆盖原文件,而是创建新版本.Btrfs/ZFS 文件系统在块级别实现:
- 创建快照:瞬间操作(O(1)),仅记录时间点
- 修改时:新数据写入新块,元数据指向新块
- 原快照:保持指向旧块
优点:快照创建极快,空间高效,天然原子性 缺点:数据库不兼容(随机写导致碎片化),文件系统依赖(不跨平台)
对 Flyto 的适配:不推荐直接用.Flyto 需跨平台,但可以在应用层模拟 COW 的优点(延迟复制,差异记录).
1.3 文件方案总评¶
| 方案 | 文件历史 | 回滚 | 存储效率 | 跨平台 | 查询性能 |
|---|---|---|---|---|---|
| Git Model | ★★★★★ | ★★★★ | ★★★★★ | ★★★★★ | ★★ |
| COW 快照 | ★★★★ | ★★★★★ | ★★★★★ | ❌ | ★★★ |
| 朴素备份 | ★★ | ★★★ | ★ | ★★★★★ | ★★★ |
推荐:Git Model + 应用层 diff 记录
2. 数据库操作历史¶
数据库场景是仓储系统最关键的.库存数据改错会导致发错货--真金白银的损失.
2.1 Change Data Capture(CDC)¶
核心原理
捕获数据库的行级变更事件:
MySQL 二进制日志:
[INSERT] table=inventory, id=SKU-001, qty=100
[UPDATE] table=inventory, id=SKU-001, qty=100 → qty=95
[DELETE] table=inventory, id=SKU-002
主流工具:Debezium(业界标准),Maxwell(MySQL 专用)
优点:零业务代码改动,不漏事件,实时,多消费者 缺点: - 只捕获数据变化,不捕获"为什么改"(语义丢失) - 和 Agent 工具调用的关联性弱 - 恢复操作复杂
对 Flyto 的适配:不直接依赖 Debezium,而是在工具层做类似的事--截获 SQL 执行前后的状态,记录 (tool_call_id, sql, before_state, after_state).这等于"应用层的 CDC",带上下文.
2.2 Event Sourcing(事件溯源)¶
核心原理
不存储最终状态,存储所有事件序列.任意时刻的状态 = 重放从起始到该时刻的所有事件.
传统方式:
inventory表: SKU-001 qty=95
Event Sourcing 方式:
[1] InventoryCreated{sku="SKU-001", qty=100}
[2] StockDecremented{sku="SKU-001", qty=-5, reason="发货"}
查询当前库存:重放所有事件 → qty=95
优点:完整审计,任意时间点回滚,因果关系清晰 缺点:查询性能极差(需重放),存储膨胀,Schema 演化困难
对 Flyto 的适配:Event Sourcing 的核心思想("每个操作是一个事件,可重放")非常适合.但不需要完全采用(存储爆炸),只需记录"操作序列"和"补偿操作".
2.3 Audit Trail(审计表)- 推荐¶
核心原理
最简单,最实用:每次修改时在审计表记录 before/after 值.
CREATE TABLE inventory_audit_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
inventory_id BIGINT,
action VARCHAR(20), -- INSERT/UPDATE/DELETE
old_qty INT,
new_qty INT,
changed_by VARCHAR(50), -- Agent ID
changed_at TIMESTAMP,
reason VARCHAR(500) -- 操作原因
);
优点:实现极简,查询高效,灵活,互联网公司标配 缺点:双写复杂性,性能开销(每次修改 2 个写操作)
对 Flyto 的适配:最推荐.Flyto 内部的 OperationLog:
type OperationLog struct {
ID int64
MessageID string
TurnNumber int
ToolName string
ToolInput string // JSON
ToolOutput string // JSON
BeforeState string // 修改前状态快照
AfterState string // 修改后状态快照
UndoInfo string // 回滚指令 JSON
ChangedAt time.Time
AgentID string
Status string // success / failed / compensated
}
2.4 Temporal Tables(时序表)¶
SQL:2011 标准,PostgreSQL/SQL Server 原生支持.数据库自动维护历史版本,可查询 AS OF SYSTEM TIME.
优点:数据库原生,自动维护,查询简单 缺点:特定数据库,不记录"为什么",回滚困难
对 Flyto 的适配:不作为主要方案(需跨数据库),但 PostgreSQL 用户可选.
2.5 数据库方案总评¶
| 方案 | 审计完整性 | 查询性能 | 回滚难度 | 跨平台 |
|---|---|---|---|---|
| CDC | ★★ | ★★★★★ | ★★★ | ★★★★ |
| Event Sourcing | ★★★★★ | ❌ | ★★★★★ | ★★★★★ |
| Audit Trail | ★★★★ | ★★★★ | ★★★ | ★★★★★ |
| Temporal Tables | ★★★ | ★★★★ | ★★ | ★★ |
推荐:Audit Trail 作为基础,结合应用层的 Event Sourcing 思想
3. API 操作回滚¶
API 场景特点:第三方系统,不可信,不可逆操作多.
3.1 Saga 模式 - 推荐¶
核心原理
分布式事务中每个步骤有对应的补偿操作.Orchestration 模式(集中协调):
Saga 协调者:
① 调用 PaymentService.charge(100) → OK
② 调用 OrderService.create(orderId) → OK
③ 调用 ShippingService.dispatch(orderId) → FAIL
④ 反向补偿:
- OrderService.cancel(orderId)
- PaymentService.refund(100)
对 Flyto 的适配:Flyto Agent 天然就是"协调者",每个工具调用是一个"步骤".
// 发货单处理
message {
steps: [
{tool: "SQLQuery", input: "UPDATE inventory ...", undoInfo: "UPDATE inventory SET qty=qty+50 ..."},
{tool: "HTTPCall", input: "POST /shipment", undoInfo: "DELETE /shipment/xxx"},
{tool: "SendEmail", input: "to: customer@...", undoInfo: null}, // 不可逆
]
}
// 回滚:倒序执行 undo
for i = len(steps)-1; i >= 0; i-- {
if steps[i].undoInfo != null {
execute(steps[i].undoInfo)
} else {
log("WARNING: step %d 不可逆", i)
}
}
3.2 Compensating Transaction(补偿事务)¶
不能真正"回滚",而是执行"反向业务操作":
| 操作 | 补偿 |
|---|---|
| INSERT user | DELETE user |
| TRANSFER A→B | TRANSFER B→A |
| SendEmail | [不可补偿] |
3.3 Command Pattern(命令模式)¶
GoF 设计模式:每个操作封装为对象,包含 execute() 和 undo().
对 Flyto 的适配:Tool 接口扩展 UndoInfo,但不强制所有工具都实现.
第二部分:关键设计问题¶
问题 a)回滚粒度¶
| 粒度 | 描述 | 复杂度 | 推荐 |
|---|---|---|---|
| 按操作 | 只撤销一个工具调用 | 中 | 不推荐(可能留下不一致状态) |
| 按消息 | 撤销整条消息的所有操作 | 中 | 推荐(原子性好,用户易理解) |
| 按会话 | 撤销多条消息 | 高 | 因果链追踪困难 |
| 按检查点 | 回到标记的稳定点 | 中 | 可选扩展 |
推荐按消息回滚:语义清晰("撤销最后一条"),实现可行(倒序执行 UndoInfo).
问题 b)不可逆操作¶
处理策略:
1. 工具声明 Irreversible: true
2. 权限系统执行前警告用户
3. UndoInfo 不是反向操作,而是"人工操作清单"
4. 回滚时标记为"需要人工审查"
// SendEmail 的 UndoInfo
UndoInfo: {
action: "manual_followup_required",
instruction: "邮件已发送无法撤回。建议发送补充邮件说明。",
}
问题 c)并发冲突¶
场景:Agent A 和 Agent B 同时修改同一行库存.
推荐乐观锁 + 版本号:
-- 库存表加版本号
ALTER TABLE inventory ADD COLUMN version INT DEFAULT 1;
-- Agent A 更新(检查版本)
UPDATE inventory SET qty=50, version=version+1
WHERE sku='SKU-001' AND version=100;
-- 成功 → rows_affected=1
-- Agent B 更新(版本已变)
UPDATE inventory SET qty=80, version=version+1
WHERE sku='SKU-001' AND version=100;
-- 失败 → rows_affected=0 → 冲突检测
回滚时也检查版本号,防止覆盖其他 Agent 的修改.
问题 d)性能开销¶
优化策略: 1. 批量操作:一次 INSERT INTO backup SELECT * FROM ... WHERE id IN (...) 而非逐行 2. 快照而非逐行备份:O(1) 时间标记时间点 3. 异步审计日志:后台写,不阻塞主线程 4. 内容去重:SHA256 哈希,相同内容只存一份 5. 增量审计:只记录改动的列,不记录整行
问题 e)存储策略¶
分级保留: - 最近 7 天:全量备份,随时回滚(热数据) - 7 天 - 3 个月:每天一个快照(温数据) - 3 个月 - 5 年:每月一个快照,只保留关键表(冷数据)
垃圾回收:每周运行,扫描活跃消息引用的备份,删除孤立版本.
第三部分:Flyto 三大场景具体实现¶
场景 1:编程(文件编辑)¶
// FileEdit 执行前自动备份
func (t *FileEditTool) Execute(...) {
backup := createBackup(filePath) // SHA256 内容寻址
result := executeEdit(...)
result.UndoInfo = &UndoInfo{
ToolName: "FileRestore",
Input: map[string]interface{}{"path": filePath, "backup_id": backup.ID},
}
return result
}
场景 2:仓储(数据库库存)¶
// SQLQuery 执行前记录 before 状态
func (t *SQLQueryTool) Execute(...) {
beforeState := captureState(sql)
result := db.Exec(sql)
afterState := captureState(sql)
undoSQL := generateReverseSQL(beforeState, afterState)
// 记录审计日志
auditLog.Insert(&OperationLog{
BeforeState: beforeState,
AfterState: afterState,
UndoInfo: undoSQL,
})
result.UndoInfo = &UndoInfo{ToolName: "SQLQuery", Input: undoSQL}
return result
}
场景 3:企业 API(Saga 补偿)¶
// Engine.Rollback 倒序执行补偿
func (e *Engine) Rollback(ctx context.Context, messageID string) error {
ops := e.log.GetOperations(messageID)
for i := len(ops) - 1; i >= 0; i-- {
if ops[i].UndoInfo == nil {
// 不可逆操作 → 生成人工操作清单
e.generateManualActionChecklist(ops[i])
continue
}
// 执行补偿
tool, _ := e.tools.Get(ops[i].UndoToolName)
tool.Execute(ctx, ops[i].UndoInput, nil)
}
e.log.UpdateStatus(messageID, "compensated")
return nil
}
第四部分:实现路线图¶
Phase 1:编程场景(2 周)¶
- FileEdit/FileWrite 前自动备份
- 内容寻址存储(SHA256 去重)
- FileRestore 工具
- OperationLog 内存模型
- 单消息回滚 API
Phase 2:基础架构(3 周)¶
- OperationLog 持久化(SQLite 或文件)
- Observer 事件埋点
- Rollback API 完整实现
- 测试:文件回滚,并发修改,性能基准
Phase 3:仓储场景(4 周)¶
- SQLQuery 工具扩展(before/after 状态捕获)
- 审计表设计
- 版本号乐观锁集成
- Saga 补偿逻辑
- 并发冲突检测
Phase 4:企业 API 场景(3 周)¶
- HTTPCall 工具 UndoInfo 生成
- 不可逆操作检测与标记
- 人工操作清单生成
Phase 5:存储优化(2 周)¶
- 内容去重
- 分级保留策略
- 垃圾回收
Phase 6:生产就绪(2 周)¶
- 性能测试(大表回滚)
- 恢复测试(数据一致性)
- 文档和监控告警
第五部分:对现有代码的影响¶
Tool 接口扩展¶
type Result struct {
Output string
IsError bool
Data interface{}
UndoInfo *UndoInfo // 新增
}
type UndoInfo struct {
ToolName string
Input map[string]interface{}
Description string
Irreversible bool // 标记不可逆
}
Engine 新增 API¶
func (e *Engine) Rollback(ctx context.Context, messageID string) error
func (e *Engine) ListSnapshots(ctx context.Context) ([]*Snapshot, error)
func (e *Engine) CanRollback(ctx context.Context, messageID string) (bool, []string)
风险与缓解¶
| 风险 | 缓解 |
|---|---|
| 大表回滚性能差 | 分批执行,异步化 |
| 补偿失败无法恢复 | 人工清单,告警,重试 |
| 存储爆炸 | GC,分级保留,去重 |
| 并发冲突频繁 | 版本号,监控,重试 |
| 不可逆操作丢失 | 预先声明,强制审批 |