TaskList 设计文档 (L1330 C2/C3)¶
状态: MVP 已交付 位置:
core/pkg/tasklist/归属文档: 本文档是 agent_teams.md 的 shared task list 深入篇
分层架构¶
┌────────────────────────────────────────────┐
│ TaskList (业务层, 唯一实现) │
│ - Claim 状态机 (pending → claimed) │
│ - Complete / Fail 状态转换 │
│ - 调 Store.CAS 保证原子性 │
└─────────────────┬──────────────────────────┘
│ Store 接口 (4 方法)
▼
┌────────────────────────────────────────────┐
│ Store 实现 (多后端) │
├────────────────────────────────────────────┤
│ MemoryStore — 内存 (默认, 零依赖) │
│ MarkdownStore — 文件 + flock (Anthropic 互操作) │
│ CustomStore — 消费层实现 (DB / Redis / WMS) │
└────────────────────────────────────────────┘
为什么分两层?¶
经历了三个方案的比较:
方案 A (最初): 三个独立 TaskList 实现¶
每个实现 (MemoryTaskList / MarkdownTaskList / CustomTaskList) 自带完整的 Claim 状态机 + 存储逻辑. ~500 行代码, 业务逻辑重复三份, 易出现"不同 后端行为微妙不一致"的 bug.
方案 B: 一个 TaskList + config 参数¶
NewTaskList(Config{Backend: "markdown", Path: "..."}). API 表面简洁
但内部大 switch, 违反开闭原则, 新增 backend 要改核心代码. 配置错误
(如 Backend=markdown 但 Path 空) 运行时才报错.
方案 C (采用): 业务层 + 存储层¶
业务逻辑一份 (TaskList), 存储实现各自隔离 (Store). 对齐 database/sql +
driver 的 Go 生态标准模式. 新增 backend 只需实现 4 个方法.
Store 接口契约¶
type Store interface {
Get(ctx, id string) (Task, error)
CAS(ctx, id string, expectedVersion int, newTask Task) error
List(ctx) ([]Task, error)
Close() error
}
CAS 语义¶
expectedVersion == 0: 新增模式. id 已存在返回ErrTaskAlreadyExists, 否则写入.expectedVersion > 0: 更新模式. 当前 Version 必须等于 expectedVersion, 否则返回ErrConcurrentModification(乐观锁失败).
原子性由实现保证:
- MemoryStore: sync.Mutex
- MarkdownStore: syscall.Flock + 文件 read-modify-write
- DB 实现示例: UPDATE SET ...v=v+1 WHERE id=? AND v=?
接入指南: 自定义 Store¶
参考 examples/agent_teams/tasklist_custom.
最小实现模板¶
type MyStore struct {
// 你自己的字段 (DB 连接 / Redis 客户端 / ...)
}
func (s *MyStore) Get(ctx context.Context, id string) (tasklist.Task, error) {
// SELECT * FROM my_tasks WHERE id=?
}
func (s *MyStore) CAS(ctx context.Context, id string, expectedVersion int, newTask tasklist.Task) error {
// 伪代码:
// tx := db.Begin()
// current := tx.QueryRow(SELECT * FROM my_tasks WHERE id=?)
// if current 不存在 && expectedVersion == 0 {
// tx.Exec(INSERT INTO my_tasks ...)
// } else if current.Version == expectedVersion {
// tx.Exec(UPDATE my_tasks SET ...v=?+1 WHERE id=? AND v=?)
// } else {
// return ErrConcurrentModification
// }
// tx.Commit()
}
func (s *MyStore) List(ctx context.Context) ([]tasklist.Task, error) {
// SELECT * FROM my_tasks ORDER BY created_at
}
func (s *MyStore) Close() error {
// db.Close() / redis.Close() / ...
}
测试自定义 Store¶
tasklist 包的测试函数可作为 "Store contract test" 模板:
func TestMyStore_Contract(t *testing.T) {
store := NewMyStore(...)
tl := tasklist.New(store)
// 复制 tasklist_test.go 的测试用例, 换 Store 即可
}
跨行业 Store 选型建议¶
| 场景 | 推荐 Store | 原因 |
|---|---|---|
| 单机开发 / CLI dogfood | MemoryStore | 零配置, 快 |
| 单元测试 | MemoryStore | 确定性, 无 I/O |
| 编程客户需要互操作 Anthropic Claude Code | MarkdownStore | tasks.md 格式对齐 |
| 跨 session / 跨机器协作 (非敏感) | MarkdownStore (NFS) 或 Redis Store | 看网络和延迟需求 |
| 金融合规 (SOX/GDPR) | PostgreSQL + 审计表 | DB 事务原子性 + 审计粒度 |
| 医疗合规 (HIPAA) | 加密存储 (加密 DB / Vault) | 明文文件违规 |
| 仓储 WMS | 集成 WMS 主表 | 避免系统间数据冗余 |
| 法律事务所 | 文档管理系统 API | 审批流 / 版本控制 |
常见问题¶
Q: MarkdownStore 的文件锁够用吗? A: flock 是 advisory lock, 本进程守规矩即可. 跨进程协调 OK. 极端场景 (如 NFS) flock 语义可能有差异, 高并发跨机器建议换 Redis Store.
Q: Version 溢出了怎么办? A: int 64 位, 足以应付 > 2^62 次更新. 单个任务被改亿万次属于系统设计 问题, 不是 Version 溢出问题.
Q: 任务数变多后 MarkdownStore 性能? A: 每次 CAS 是 O(N) 任务读写. ~1000 任务以下无感知, ~10000 任务建议 换 DB Store.
Q: 如何实现任务监听 (实时通知)?
A: 当前 MVP 没有 Watch 方法. 消费层可:
- 轮询 List (简单场景)
- 订阅 flyto.EventObserver 的 task_created 等事件 (精准)
- 等真实需求出现时引擎层加 Store.Watch() <-chan TaskEvent
Q: Anthropic Claude Code 的 tasks.md 格式会变怎么办?
A: MarkdownStore 的 parseMarkdown 采用状态机式解析, 未知字段忽略, 兼容
Anthropic 未来扩展. 若格式根本改变 (如改用 YAML), 在 Flyto 增加
AnthropicV2TasksStore 实现即可, 老的 MarkdownStore 保留做旧兼容.