跳转至

INF-2 操作历史/回滚 - 完整调研报告

内部文档.指导 INF-2 的实现设计.

执行摘要

本报告基于对业界操作历史和回滚机制的深入调研,针对 Flyto Agent Engine 的三大场景(编程,仓储,企业API)提供设计建议.核心观点:不存在"一个方案适应所有场景"的银弹,应采取分层混合架构:

  • 编程场景:采用 Git 风格的内容寻址 + 快照(当前已有基础)
  • 仓储场景:结合审计日志 + 补偿事务 + 版本向量,优先保障数据一致性和防冲突
  • 企业 API:基于 Saga 补偿 + 事件流,因不可逆操作多

第一部分:业界方案深度分析

1. 文件操作历史

1.1 Git Object Model(内容寻址存储)

核心原理

Git 是内容寻址文件系统(Content-Addressable Filesystem): - 每个对象(blob/tree/commit/tag)由其内容的 SHA-1 哈希标识 - 三层模型:blob(文件内容) → tree(目录结构) → commit(快照) - 关键特性:相同内容自动去重(同一个 index.js 在多个 commit 中只存一份)

存储结构:

.git/objects/
├── ab/     # 哈希前 2 位作为目录
│   └── cdef1234...   # 剩余 38 位作为文件名
└── ...

优点: - 极端高效的存储利用(内容重复自动去重) - 完全的不可篡改性(改一个字节,哈希完全不同) - 天然支持时间旅行(DAG 结构) - 跨系统兼容(只用内容标识,不依赖文件路径)

缺点: - 查询低效:找某个文件某个版本需要遍历整个 commit tree - 删除策略复杂:垃圾回收(GC)需要追踪可达对象 - 冲突解决复杂:需要手动合并

对 Flyto 的适配

早期方案 采用了简化版:hash@v{n} 命名方案,基于内容哈希创建快照文件,绑定消息 ID.建议 Flyto 沿用这个方案:

~/.flyto/history/{project-hash}/
├── {file-content-hash}@v1.bak
├── {file-content-hash}@v2.bak
└── metadata.json

1.2 Copy-on-Write(COW)快照

核心原理

修改时不覆盖原文件,而是创建新版本.Btrfs/ZFS 文件系统在块级别实现:

  1. 创建快照:瞬间操作(O(1)),仅记录时间点
  2. 修改时:新数据写入新块,元数据指向新块
  3. 原快照:保持指向旧块

优点:快照创建极快,空间高效,天然原子性 缺点:数据库不兼容(随机写导致碎片化),文件系统依赖(不跨平台)

对 Flyto 的适配:不推荐直接用.Flyto 需跨平台,但可以在应用层模拟 COW 的优点(延迟复制,差异记录).

1.3 文件方案总评

方案 文件历史 回滚 存储效率 跨平台 查询性能
Git Model ★★★★★ ★★★★ ★★★★★ ★★★★★ ★★
COW 快照 ★★★★ ★★★★★ ★★★★★ ★★★
朴素备份 ★★ ★★★ ★★★★★ ★★★

推荐:Git Model + 应用层 diff 记录


2. 数据库操作历史

数据库场景是仓储系统最关键的.库存数据改错会导致发错货--真金白银的损失.

2.1 Change Data Capture(CDC)

核心原理

捕获数据库的行级变更事件:

MySQL 二进制日志:
  [INSERT] table=inventory, id=SKU-001, qty=100
  [UPDATE] table=inventory, id=SKU-001, qty=100 → qty=95
  [DELETE] table=inventory, id=SKU-002

主流工具:Debezium(业界标准),Maxwell(MySQL 专用)

优点:零业务代码改动,不漏事件,实时,多消费者 缺点: - 只捕获数据变化,不捕获"为什么改"(语义丢失) - 和 Agent 工具调用的关联性弱 - 恢复操作复杂

对 Flyto 的适配:不直接依赖 Debezium,而是在工具层做类似的事--截获 SQL 执行前后的状态,记录 (tool_call_id, sql, before_state, after_state).这等于"应用层的 CDC",带上下文.

2.2 Event Sourcing(事件溯源)

核心原理

不存储最终状态,存储所有事件序列.任意时刻的状态 = 重放从起始到该时刻的所有事件.

传统方式:
  inventory表: SKU-001 qty=95

Event Sourcing 方式:
  [1] InventoryCreated{sku="SKU-001", qty=100}
  [2] StockDecremented{sku="SKU-001", qty=-5, reason="发货"}
  查询当前库存:重放所有事件 → qty=95

优点:完整审计,任意时间点回滚,因果关系清晰 缺点:查询性能极差(需重放),存储膨胀,Schema 演化困难

对 Flyto 的适配:Event Sourcing 的核心思想("每个操作是一个事件,可重放")非常适合.但不需要完全采用(存储爆炸),只需记录"操作序列"和"补偿操作".

2.3 Audit Trail(审计表)- 推荐

核心原理

最简单,最实用:每次修改时在审计表记录 before/after 值.

CREATE TABLE inventory_audit_log (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  inventory_id BIGINT,
  action VARCHAR(20),           -- INSERT/UPDATE/DELETE
  old_qty INT,
  new_qty INT,
  changed_by VARCHAR(50),       -- Agent ID
  changed_at TIMESTAMP,
  reason VARCHAR(500)           -- 操作原因
);

优点:实现极简,查询高效,灵活,互联网公司标配 缺点:双写复杂性,性能开销(每次修改 2 个写操作)

对 Flyto 的适配:最推荐.Flyto 内部的 OperationLog:

type OperationLog struct {
    ID           int64
    MessageID    string
    TurnNumber   int
    ToolName     string
    ToolInput    string    // JSON
    ToolOutput   string    // JSON
    BeforeState  string    // 修改前状态快照
    AfterState   string    // 修改后状态快照
    UndoInfo     string    // 回滚指令 JSON
    ChangedAt    time.Time
    AgentID      string
    Status       string    // success / failed / compensated
}

2.4 Temporal Tables(时序表)

SQL:2011 标准,PostgreSQL/SQL Server 原生支持.数据库自动维护历史版本,可查询 AS OF SYSTEM TIME.

优点:数据库原生,自动维护,查询简单 缺点:特定数据库,不记录"为什么",回滚困难

对 Flyto 的适配:不作为主要方案(需跨数据库),但 PostgreSQL 用户可选.

2.5 数据库方案总评

方案 审计完整性 查询性能 回滚难度 跨平台
CDC ★★ ★★★★★ ★★★ ★★★★
Event Sourcing ★★★★★ ★★★★★ ★★★★★
Audit Trail ★★★★ ★★★★ ★★★ ★★★★★
Temporal Tables ★★★ ★★★★ ★★ ★★

推荐:Audit Trail 作为基础,结合应用层的 Event Sourcing 思想


3. API 操作回滚

API 场景特点:第三方系统,不可信,不可逆操作多.

3.1 Saga 模式 - 推荐

核心原理

分布式事务中每个步骤有对应的补偿操作.Orchestration 模式(集中协调):

Saga 协调者:
  ① 调用 PaymentService.charge(100) → OK
  ② 调用 OrderService.create(orderId) → OK
  ③ 调用 ShippingService.dispatch(orderId) → FAIL
  ④ 反向补偿:
     - OrderService.cancel(orderId)
     - PaymentService.refund(100)

对 Flyto 的适配:Flyto Agent 天然就是"协调者",每个工具调用是一个"步骤".

// 发货单处理
message {
  steps: [
    {tool: "SQLQuery", input: "UPDATE inventory ...", undoInfo: "UPDATE inventory SET qty=qty+50 ..."},
    {tool: "HTTPCall", input: "POST /shipment", undoInfo: "DELETE /shipment/xxx"},
    {tool: "SendEmail", input: "to: customer@...", undoInfo: null},  // 不可逆
  ]
}

// 回滚:倒序执行 undo
for i = len(steps)-1; i >= 0; i-- {
    if steps[i].undoInfo != null {
        execute(steps[i].undoInfo)
    } else {
        log("WARNING: step %d 不可逆", i)
    }
}

3.2 Compensating Transaction(补偿事务)

不能真正"回滚",而是执行"反向业务操作":

操作 补偿
INSERT user DELETE user
TRANSFER A→B TRANSFER B→A
SendEmail [不可补偿]

3.3 Command Pattern(命令模式)

GoF 设计模式:每个操作封装为对象,包含 execute() 和 undo().

对 Flyto 的适配:Tool 接口扩展 UndoInfo,但不强制所有工具都实现.


第二部分:关键设计问题

问题 a)回滚粒度

粒度 描述 复杂度 推荐
按操作 只撤销一个工具调用 不推荐(可能留下不一致状态)
按消息 撤销整条消息的所有操作 推荐(原子性好,用户易理解)
按会话 撤销多条消息 因果链追踪困难
按检查点 回到标记的稳定点 可选扩展

推荐按消息回滚:语义清晰("撤销最后一条"),实现可行(倒序执行 UndoInfo).

问题 b)不可逆操作

处理策略: 1. 工具声明 Irreversible: true 2. 权限系统执行前警告用户 3. UndoInfo 不是反向操作,而是"人工操作清单" 4. 回滚时标记为"需要人工审查"

// SendEmail 的 UndoInfo
UndoInfo: {
    action: "manual_followup_required",
    instruction: "邮件已发送无法撤回。建议发送补充邮件说明。",
}

问题 c)并发冲突

场景:Agent A 和 Agent B 同时修改同一行库存.

推荐乐观锁 + 版本号:

-- 库存表加版本号
ALTER TABLE inventory ADD COLUMN version INT DEFAULT 1;

-- Agent A 更新(检查版本)
UPDATE inventory SET qty=50, version=version+1 
WHERE sku='SKU-001' AND version=100;
-- 成功 → rows_affected=1

-- Agent B 更新(版本已变)
UPDATE inventory SET qty=80, version=version+1 
WHERE sku='SKU-001' AND version=100;
-- 失败 → rows_affected=0 → 冲突检测

回滚时也检查版本号,防止覆盖其他 Agent 的修改.

问题 d)性能开销

优化策略: 1. 批量操作:一次 INSERT INTO backup SELECT * FROM ... WHERE id IN (...) 而非逐行 2. 快照而非逐行备份:O(1) 时间标记时间点 3. 异步审计日志:后台写,不阻塞主线程 4. 内容去重:SHA256 哈希,相同内容只存一份 5. 增量审计:只记录改动的列,不记录整行

问题 e)存储策略

分级保留: - 最近 7 天:全量备份,随时回滚(热数据) - 7 天 - 3 个月:每天一个快照(温数据) - 3 个月 - 5 年:每月一个快照,只保留关键表(冷数据)

垃圾回收:每周运行,扫描活跃消息引用的备份,删除孤立版本.


第三部分:Flyto 三大场景具体实现

场景 1:编程(文件编辑)

// FileEdit 执行前自动备份
func (t *FileEditTool) Execute(...) {
    backup := createBackup(filePath)  // SHA256 内容寻址
    result := executeEdit(...)
    result.UndoInfo = &UndoInfo{
        ToolName: "FileRestore",
        Input: map[string]interface{}{"path": filePath, "backup_id": backup.ID},
    }
    return result
}

场景 2:仓储(数据库库存)

// SQLQuery 执行前记录 before 状态
func (t *SQLQueryTool) Execute(...) {
    beforeState := captureState(sql)
    result := db.Exec(sql)
    afterState := captureState(sql)
    undoSQL := generateReverseSQL(beforeState, afterState)

    // 记录审计日志
    auditLog.Insert(&OperationLog{
        BeforeState: beforeState,
        AfterState: afterState,
        UndoInfo: undoSQL,
    })

    result.UndoInfo = &UndoInfo{ToolName: "SQLQuery", Input: undoSQL}
    return result
}

场景 3:企业 API(Saga 补偿)

// Engine.Rollback 倒序执行补偿
func (e *Engine) Rollback(ctx context.Context, messageID string) error {
    ops := e.log.GetOperations(messageID)
    for i := len(ops) - 1; i >= 0; i-- {
        if ops[i].UndoInfo == nil {
            // 不可逆操作 → 生成人工操作清单
            e.generateManualActionChecklist(ops[i])
            continue
        }
        // 执行补偿
        tool, _ := e.tools.Get(ops[i].UndoToolName)
        tool.Execute(ctx, ops[i].UndoInput, nil)
    }
    e.log.UpdateStatus(messageID, "compensated")
    return nil
}

第四部分:实现路线图

Phase 1:编程场景(2 周)

  • FileEdit/FileWrite 前自动备份
  • 内容寻址存储(SHA256 去重)
  • FileRestore 工具
  • OperationLog 内存模型
  • 单消息回滚 API

Phase 2:基础架构(3 周)

  • OperationLog 持久化(SQLite 或文件)
  • Observer 事件埋点
  • Rollback API 完整实现
  • 测试:文件回滚,并发修改,性能基准

Phase 3:仓储场景(4 周)

  • SQLQuery 工具扩展(before/after 状态捕获)
  • 审计表设计
  • 版本号乐观锁集成
  • Saga 补偿逻辑
  • 并发冲突检测

Phase 4:企业 API 场景(3 周)

  • HTTPCall 工具 UndoInfo 生成
  • 不可逆操作检测与标记
  • 人工操作清单生成

Phase 5:存储优化(2 周)

  • 内容去重
  • 分级保留策略
  • 垃圾回收

Phase 6:生产就绪(2 周)

  • 性能测试(大表回滚)
  • 恢复测试(数据一致性)
  • 文档和监控告警

第五部分:对现有代码的影响

Tool 接口扩展

type Result struct {
    Output   string
    IsError  bool
    Data     interface{}
    UndoInfo *UndoInfo  // 新增
}

type UndoInfo struct {
    ToolName    string
    Input       map[string]interface{}
    Description string
    Irreversible bool  // 标记不可逆
}

Engine 新增 API

func (e *Engine) Rollback(ctx context.Context, messageID string) error
func (e *Engine) ListSnapshots(ctx context.Context) ([]*Snapshot, error)
func (e *Engine) CanRollback(ctx context.Context, messageID string) (bool, []string)

风险与缓解

风险 缓解
大表回滚性能差 分批执行,异步化
补偿失败无法恢复 人工清单,告警,重试
存储爆炸 GC,分级保留,去重
并发冲突频繁 版本号,监控,重试
不可逆操作丢失 预先声明,强制审批

参考资源