跳转至

TaskList 设计文档 (L1330 C2/C3)

状态: MVP 已交付 位置: core/pkg/tasklist/ 归属文档: 本文档是 agent_teams.md 的 shared task list 深入篇

分层架构

  ┌────────────────────────────────────────────┐
  │  TaskList (业务层, 唯一实现)                 │
  │  - Claim 状态机 (pending → claimed)         │
  │  - Complete / Fail 状态转换                  │
  │  - 调 Store.CAS 保证原子性                   │
  └─────────────────┬──────────────────────────┘
                    │ Store 接口 (4 方法)
  ┌────────────────────────────────────────────┐
  │  Store 实现 (多后端)                         │
  ├────────────────────────────────────────────┤
  │  MemoryStore      — 内存 (默认, 零依赖)     │
  │  MarkdownStore    — 文件 + flock (Anthropic 互操作) │
  │  CustomStore      — 消费层实现 (DB / Redis / WMS) │
  └────────────────────────────────────────────┘

为什么分两层?

经历了三个方案的比较:

方案 A (最初): 三个独立 TaskList 实现

每个实现 (MemoryTaskList / MarkdownTaskList / CustomTaskList) 自带完整的 Claim 状态机 + 存储逻辑. ~500 行代码, 业务逻辑重复三份, 易出现"不同 后端行为微妙不一致"的 bug.

方案 B: 一个 TaskList + config 参数

NewTaskList(Config{Backend: "markdown", Path: "..."}). API 表面简洁 但内部大 switch, 违反开闭原则, 新增 backend 要改核心代码. 配置错误 (如 Backend=markdown 但 Path 空) 运行时才报错.

方案 C (采用): 业务层 + 存储层

业务逻辑一份 (TaskList), 存储实现各自隔离 (Store). 对齐 database/sql + driver 的 Go 生态标准模式. 新增 backend 只需实现 4 个方法.

Store 接口契约

type Store interface {
    Get(ctx, id string) (Task, error)
    CAS(ctx, id string, expectedVersion int, newTask Task) error
    List(ctx) ([]Task, error)
    Close() error
}

CAS 语义

  • expectedVersion == 0: 新增模式. id 已存在返回 ErrTaskAlreadyExists, 否则写入.
  • expectedVersion > 0: 更新模式. 当前 Version 必须等于 expectedVersion, 否则返回 ErrConcurrentModification (乐观锁失败).

原子性由实现保证: - MemoryStore: sync.Mutex - MarkdownStore: syscall.Flock + 文件 read-modify-write - DB 实现示例: UPDATE SET ...v=v+1 WHERE id=? AND v=?

接入指南: 自定义 Store

参考 examples/agent_teams/tasklist_custom.

最小实现模板

type MyStore struct {
    // 你自己的字段 (DB 连接 / Redis 客户端 / ...)
}

func (s *MyStore) Get(ctx context.Context, id string) (tasklist.Task, error) {
    // SELECT * FROM my_tasks WHERE id=?
}

func (s *MyStore) CAS(ctx context.Context, id string, expectedVersion int, newTask tasklist.Task) error {
    // 伪代码:
    // tx := db.Begin()
    // current := tx.QueryRow(SELECT * FROM my_tasks WHERE id=?)
    // if current 不存在 && expectedVersion == 0 {
    //     tx.Exec(INSERT INTO my_tasks ...)
    // } else if current.Version == expectedVersion {
    //     tx.Exec(UPDATE my_tasks SET ...v=?+1 WHERE id=? AND v=?)
    // } else {
    //     return ErrConcurrentModification
    // }
    // tx.Commit()
}

func (s *MyStore) List(ctx context.Context) ([]tasklist.Task, error) {
    // SELECT * FROM my_tasks ORDER BY created_at
}

func (s *MyStore) Close() error {
    // db.Close() / redis.Close() / ...
}

测试自定义 Store

tasklist 包的测试函数可作为 "Store contract test" 模板:

func TestMyStore_Contract(t *testing.T) {
    store := NewMyStore(...)
    tl := tasklist.New(store)
    // 复制 tasklist_test.go 的测试用例, 换 Store 即可
}

跨行业 Store 选型建议

场景 推荐 Store 原因
单机开发 / CLI dogfood MemoryStore 零配置, 快
单元测试 MemoryStore 确定性, 无 I/O
编程客户需要互操作 Anthropic Claude Code MarkdownStore tasks.md 格式对齐
跨 session / 跨机器协作 (非敏感) MarkdownStore (NFS) 或 Redis Store 看网络和延迟需求
金融合规 (SOX/GDPR) PostgreSQL + 审计表 DB 事务原子性 + 审计粒度
医疗合规 (HIPAA) 加密存储 (加密 DB / Vault) 明文文件违规
仓储 WMS 集成 WMS 主表 避免系统间数据冗余
法律事务所 文档管理系统 API 审批流 / 版本控制

常见问题

Q: MarkdownStore 的文件锁够用吗? A: flock 是 advisory lock, 本进程守规矩即可. 跨进程协调 OK. 极端场景 (如 NFS) flock 语义可能有差异, 高并发跨机器建议换 Redis Store.

Q: Version 溢出了怎么办? A: int 64 位, 足以应付 > 2^62 次更新. 单个任务被改亿万次属于系统设计 问题, 不是 Version 溢出问题.

Q: 任务数变多后 MarkdownStore 性能? A: 每次 CAS 是 O(N) 任务读写. ~1000 任务以下无感知, ~10000 任务建议 换 DB Store.

Q: 如何实现任务监听 (实时通知)? A: 当前 MVP 没有 Watch 方法. 消费层可: - 轮询 List (简单场景) - 订阅 flyto.EventObservertask_created 等事件 (精准) - 等真实需求出现时引擎层加 Store.Watch() <-chan TaskEvent

Q: Anthropic Claude Code 的 tasks.md 格式会变怎么办? A: MarkdownStore 的 parseMarkdown 采用状态机式解析, 未知字段忽略, 兼容 Anthropic 未来扩展. 若格式根本改变 (如改用 YAML), 在 Flyto 增加 AnthropicV2TasksStore 实现即可, 老的 MarkdownStore 保留做旧兼容.