架构设计文档¶
Flyto Agent Engine 的内部架构,设计原则和关键决策.
目录¶
- 设计原则
- 模块依赖图
- 数据流
- 核心模块详解
- NormalizePipeline 架构
- 三层压缩降级
- SubAgent Fork 模式
- MemoryExtractor 架构
- RelevanceScorer 架构
- MemoryType 注册制
- Bash AST 解析器架构
- 模型角色系统架构
- EngineObserver 可观测性架构
- StrictMode 严格模式
- ToolResultPairing 配对修复
- 防御性编程的三层防御模式
- KAIROS/Dream 系统的架构位置
- 跨行业扩展的架构支撑
- 叠加而非替换(Composite 层)
- ToolCapability 安全协议
- FileHistory 文件历史系统
- OperationLog 统一操作日志
- 安全审计体系(INF-5)
- 数据安全(文件/DB/API 三维度)
- TokenBudgetManager 预算管理
- QueryChainTracking 查询链追踪
- 设计决策记录
设计原则¶
1. 零 UI 依赖¶
引擎不知道自己跑在终端还是 Web 服务里.所有输出通过 Event 接口和 Go channel 推送,消费层自己决定怎么渲染.
2. 流式优先¶
不是"先缓冲再输出",而是 token 一到就推送.TextDeltaEvent 每次只包含几个 token 的增量文本.消费层用 for range 读取 channel 即可实现实时流式输出.
3. 可组合¶
每个子系统都是接口,可替换:
permission.Engine-- 权限检查,可换成自定义实现memory.Store-- 记忆存储,可换成数据库后端permission.Handler-- 权限 UI,CLI 弹对话框,HTTP 走 WebSockettools.Tool-- 工具,可动态注册/注销
4. 状态显式传递¶
不用全局变量和隐式状态.核心 runLoop 的消息历史通过参数传递,会话状态集中在 Session 结构体中,统计信息通过事件推送.
5. 零外部依赖¶
go.mod 只有标准库.HTTP Server 用 Go 1.22+ 标准库路由,JSON 用 encoding/json,正则用 regexp.编译产物是单个静态二进制.
6. 防御性编程¶
永远不信任模型输出.每个模型交互点都有三层防御(指令 + 参数 + 兜底),每个失败路径都有降级策略.详见 defensive-programming.md.
7. 不硬编码模型 ID¶
业务逻辑通过 ModelRole 引用模型(RoleMain,RoleFast,RoleThinking),不直接使用模型 ID 字符串.角色到模型的映射由 ModelRegistry 管理,运行时可修改.
消费者架构(框架 vs 消费者)¶
Flyto Agent 是一个智能体框架,不是一个 CLI 工具.CLI,HTTP Server,TUI 都是它的消费者.
┌─────────────────────────────────────────────────────────────┐
│ 消费者(应用层) │
│ │
│ tui/agent-engine (独立 TUI,占位) │
│ │
│ platform/ 你的 Go 业务代码 │
│ (平台服务,占位) (直接 import pkg/engine) │
└──────────────────────────────┬──────────────────────────────┘
│ 都通过同一个接口消费
▼
┌─────────────────────────────────────────────────────────────┐
│ pkg/engine — 框架本体 │
│ │
│ engine.New(cfg) → *Engine │
│ engine.Session(id) → *Session │
│ session.Send(ctx, prompt) → <-chan Event │
│ │
│ 模型调用规范、token 计费、context window — 都是引擎的职责 │
│ TurnStartEvent 携带 ContextWindowTokens,消费者无需硬编码 │
└─────────────────────────────────────────────────────────────┘
可选 SDK 包(按需 import,引擎不强依赖):
pkg/security/AuditSink — 接口定义(引擎内)
audit-pg/ — PostgreSQL 实现(将来,独立 module)
audit-file/ — 文件实现(将来,独立 module)
tui/agent-engine(占位)¶
TUI 终端入口,当前为占位目录.平台服务由 platform/ 独立实现.
消费者接入的三种 API 形态¶
消费者接入引擎时,面对的接口按机制分三种形态,不按业务领域分:
- 订阅 (push) —
<-chan flyto.Event/flyto.EventObserver: 引擎异步通知 - 调取 (pull) —
Session.Stats()/DenialTracker.Stats()/Classifier.Classify(): 消费者同步查询 - 同步回调 (callback) —
permission.Handler/ApprovalPolicy/ElicitationHandler/AuditSink/HookHandler: 引擎阻塞等决策
完整分类 + 选型速查 + 消费者实现清单 (CLI/SaaS/测试) 见 docs/api-reference.md "API 消费形态" 章节.
模块依赖图¶
消费层(应用层):
tui/agent-engine ─────────┐ ← 终端 UI(占位)
platform/ ─────────┐ │ ← 平台服务(占位)
examples/basic ────┘ │ ← 10 行最小 demo
│
▼
pkg/engine ◄── (你的 Go 代码)
│
┌────────┬────────┬───┴────┬─────────┬──────────┬──────────┐
▼ ▼ ▼ ▼ ▼ ▼ ▼
pkg/query pkg/tools pkg/perm pkg/mem pkg/hooks pkg/plugin pkg/evolve
│ │
┌─────┴─────┐ │
▼ ▼ ▼
pkg/tools/ pkg/tools/ pkg/evolve/
builtin deferred (uses engine)
│
┌─────────────┼──────────────────┐
▼ ▼ ▼
internal/transport internal/git internal/mcp
│
▼
internal/bash internal/tokenizer internal/logger
(AST parser) (token/pricing)
pkg/config
config.go 多级配置加载
models.go ModelRegistry + ModelConfig(= flyto.ModelInfo 类型别名)+ 角色映射
pkg/context
prompts.go 静态提示词常量(9 个英文段落,对应早期方案)
prompts_zh.go 静态提示词中文翻译(9 个段落,语义与英文版一字不差)
bundle.go Section/SectionRegistry/PromptBundle/BundleRegistry/SystemPromptBlock(模块 15)
default_bundle.go claude+programming 默认 Bundle(9 静态 + 6 动态 sections)
bundle_overlay.go BundleOverlay(局部覆盖 base Bundle)/ NewBundleFromFunc(函数式工厂)
chinese_bundle.go NewChineseBundle / ChineseBundleKeys / RegisterChineseBundle(中文模型适配)
compact.go 三层压缩降级(CompactTiered + CircuitBreaker)
通过 flyto.ModelProvider 接口统一所有 Provider(不再接受 Anthropic 专有参数)
compact_persist.go 断路器状态跨进程持久化(FilePersister / CircuitBreakerPersister 接口)
composite.go CompositePolicy(多策略叠加)
policy.go CompactionPolicy 接口
grouping.go 消息按 API 往返分组
restorer.go PostCompactRestorer(压缩后上下文恢复)
context.go 上下文构建(Builder + BuildSystemPromptBlocks)
instructions.go 指令管理
图例:
pkg/ = 公开包(可被外部导入)
internal/ = 内部包(仅本模块可用)
──► = 依赖方向
包职责划分¶
| 包 | 职责 | 关键文件 |
|---|---|---|
pkg/engine |
核心引擎:Engine, Session, Events, Errors, runLoop, SubAgent(fork), NormalizePipeline, Fallback, Reminders, FlushGate | 30+ 个文件 |
pkg/tools |
工具系统:Tool 接口, Registry, Orchestrator, DeferredRegistry | 4 个文件 |
pkg/tools/builtin |
14 个内置工具实现 | 16 个文件 |
pkg/permission |
权限系统:Engine, Rules, Checker, Learning, BashSecurity, Denial, Explainer, Filesystem, CompositeHandler | 12 个文件 |
pkg/memory |
记忆系统:Store, Frontmatter, Relevance, Scanner, RelevanceScorer, CompositeScorer, ExternalScorer, MemoryExtractor, MemoryTypeRegistry, SyncAdapter, GitSyncAdapter | 9 个文件 |
pkg/hooks |
Hook 系统:Manager, Executor, Types, Integration(含 14 种 hook 类型) | 5 个文件 |
pkg/evolve |
自进化:Evolver, ToolBuilder, SkillLearner, SelfReflector, EngineIntegration | 5 个文件 |
pkg/config |
多级配置 + 模型角色系统(ModelRegistry) | 2 个文件 |
pkg/context |
系统提示构建,三层压缩降级(CompactTiered),CompositePolicy,CompactCircuitBreaker,指令管理 | 8 个文件 |
pkg/query |
查询引擎:Message(含 Metadata 扩展字段), Content 类型定义 | 1 个文件 |
pkg/plugin |
插件系统:Host, Manifest, Loader, Skill | 4 个文件 |
internal/transport |
通用 SSE 流式 HTTP 客户端:SSE 解析,Prompt Caching,Beta Headers,错误分类,预连接,流守卫.所有配置通过 ClientOption 注入(WithMessagePath/WithAPIVersion/WithRetryPolicy/WithClassifier),无 Anthropic 默认值 | 8 个文件 |
internal/syslib/bash |
Bash AST 解析器:AST 节点,解析器,信息提取,Heredoc | 4 个文件 |
internal/syslib/diff |
文件差异计算(Myers 算法) | 2 个文件 |
internal/syslib/git |
Git 操作封装:GetInfo + validateRef 注入防御,供 context 提示词组装消费 (memory 同步走 execenv.Executor DI, 刻意独立) | 1 个文件 |
internal/mcp |
MCP 协议:Client, Bridge, JSON-RPC, Manager, Types | 5 个文件 |
internal/tokenizer |
Token 估算和模型定价 | 1 个文件 |
internal/logger |
结构化日志 | 1 个文件 |
pkg/websocket |
WebSocket 传输层 | 2 个文件 |
数据流¶
单次运行数据流¶
用户输入 "修复 bug"
│
▼
Engine.Run(ctx, prompt)
│
▼ (goroutine)
runLoop()
│
├──► 1. buildSystemPromptWithContext()
│ 构建系统提示:基础提示 + 工具描述 + 环境信息 + 用户追加
│ 启用 Prompt Caching → 静态段落标记 cache_control
│
├──► 1.5 ReminderSystem.CollectReminders()
│ 检查日期变更 / 文件外部修改 / 相关记忆
│ → 注入 <system-reminder> 到本轮消息
│
├──► 2. 构建 messages(用户消息 + 历史)
│
├──► 2.5 NormalizePipeline.Run(messages)
│ 10 步规范化管道:tool_use/tool_result 配对修复(4 case)
│ → 孤立 tool_result 移除 → 错误内容剥离
│ → 孤立 thinking 过滤 → 空消息过滤 → 空白 assistant 过滤
│ → tool_use 输入规范化 → 连续同角色合并 → 图片验证
│
├──► 3. client.CreateMessageStream() ──► 模型 API
│ Beta Headers: prompt-caching, extended-thinking, effort...
│ 流式 SSE 连接
│
├──► 4. 处理流式事件
│ EventContentBlockDelta
│ ├── text → TextDeltaEvent → ch
│ ├── thinking → ThinkingDeltaEvent → ch
│ └── tool_use → 累积 JSON
│ EventContentBlockStop
│ ├── text → TextEvent → ch
│ ├── thinking → ThinkingEvent → ch
│ └── tool_use → ToolUseEvent → ch
│
├──► 5. 如果 stop_reason == "tool_use"
│ │
│ ├── 权限检查(PermissionEngine.Check)
│ │ ├── BashSecurity: AST 解析 → 命令提取 → 安全分析
│ │ └── 规则匹配 / 风险评估 / 用户决策
│ │
│ ├── orchestrator.ExecuteBatch()
│ │ ├── 分批:连续 ConcurrencySafe 工具并行
│ │ ├── 非 ConcurrencySafe 工具串行
│ │ ├── 大结果 → ResultStore 存磁盘 → 返回摘要
│ │ └── 每个结果 → ToolResultEvent → ch
│ │
│ ├── 追加 tool_result 到消息历史
│ └── → 回到步骤 3(继续调用 API)
│
├──► 6. 如果 stop_reason == "end_turn" 或 "stop_sequence"
│ → DoneEvent → ch → close(ch)
│
├──► 7. 如果 stop_reason == "max_tokens"
│ → 增大 maxTokens → 追加 "please continue" → 回到步骤 3
│
├──► 8. API 调用失败
│ → FallbackTracker.ShouldFallback(err)
│ → 有备用模型 → 切换模型 → 回到步骤 3
│ → 无备用模型 → ErrorEvent
│
└──► 每轮检查:
├── maxTurns 限制
├── maxBudget 限制(80% 时 WarningEvent)
└── maybeCompact()(MicroCompact → CompactTiered 三层降级)
Bash 工具执行数据流(含 AST 解析)¶
模型返回 tool_use: Bash {command: "sudo npm install && git push"}
│
▼
权限系统 (BashSecurity)
│
├──► 1. bash.Parse(command)
│ AST 解析(处理 heredoc / 引号 / 命令替换 / 算术展开)
│
├──► 2. bash.ExtractCommands(ast)
│ 递归遍历 AST → 提取 CommandInfo 列表
│ [sudo npm install, git push]
│
├──► 3. bash.ExtractCommandName(cmd)
│ 跳过前缀(sudo/env/nohup/...)
│ → ("npm", "install"), ("git", "push")
│
├──► 4. 命令分类 + 安全检查
│ ClassifyShellCommand → ClassGeneral
│ 检查重定向目标是否静态
│ 检查危险命令(rm -rf, git push --force)
│
└──► 5. 权限决策
├── 规则匹配 → Allow/Deny
└── 需要询问 → PermissionRequestEvent → 用户决策
权限交互流程¶
Engine runLoop
│
▼
工具需要权限
│
├──► PermissionEngine.Check(req)
│ │
│ ├── 匹配规则 → Allow/Deny(直接返回)
│ │
│ └── 需要询问 → DecisionAsk
│ │
│ ├──► ExplainPermissionRequest() → 人类可读描述
│ ├──► AssessRisk() → 风险等级
│ ├──► SuggestRules() → 建议的永久规则
│ │
│ └──► PermissionHandler(ctx, req)
│ │
│ ├── CLI: 终端弹对话框
│ ├── HTTP: PermissionRequestEvent → SSE
│ │ 等待 POST /v1/sessions/{id}/permissions/{rid}
│ └── SDK: 回调函数
│
└──► 记录决策到 LearningTracker
(达到阈值时生成 PermissionLearnEvent)
会话生命周期¶
agent.Session("id")
│
├── newSession(): 创建 Session 结构体
│ messages: []
│ pendingPermissions: map{}
│ stats: 0
│
▼
session.Send(ctx, prompt)
│
├── 快照当前消息历史
├── engine.Run(ctx, prompt, WithMessages(history))
│ 返回 rawEvents channel
│
├── goroutine: trackEvents()
│ ├── 转发事件到消费层
│ ├── 收集 assistantText
│ ├── 收集 token 统计
│ └── 完成时:
│ ├── 追加 user message 到 history
│ ├── 追加 assistant message 到 history
│ └── 更新统计 (inputTokens, outputTokens, costUSD, turnCount)
│
▼
session.Send(ctx, next_prompt) ← 第二轮自动包含第一轮历史
│
...
│
▼
session.Close()
│
└── SaveTranscript() → ~/.flyto/projects/<hash>/transcripts/<id>.json
核心模块详解¶
Engine (pkg/engine)¶
引擎是系统的中央协调器.它的职责:
- 初始化子系统 -- 创建 Registry,PermissionEngine,MemoryStore,HookManager,PluginHost,ModelRegistry,ReminderSystem,FileStateCache,ResultStore
- 管理 runLoop -- 核心查询循环(调用 API -> 处理事件 -> 执行工具 -> 循环)
- 管理 Session -- 有状态的多轮对话 + 会话持久化
- 上下文管理 -- 系统提示构建,自动压缩,系统提醒注入
- 模型降级 -- FallbackTracker 在 API 失败时自动切换备用模型
- 子 Agent 管理 -- SubAgentRegistry 跟踪所有活跃的子 Agent
关键类型:
- Engine -- 主入口,持有所有子系统引用
- Config -- 统一配置结构体(含 ModelRegistry,CompactionPolicies,PermissionHandlers,Observer,StrictMode)
- Session -- 多轮会话状态
- Event -- 事件接口(16 种具体类型)
- EngineError -- 结构化错误(15 种错误码)
- EventObserver -- 可观测性核心接口(Event + Error)
- MetricObserver -- 指标接口(可选实现,接口断言检测)
- TraceObserver -- 调用链接口(可选实现)
- StrictMode -- 严格模式配置(ToolResultPairing / CompactFailure / NormalizerError)
- NormalizePipeline -- 10 步可组合消息规范化管道
- MessageNormalizer -- 规范化步骤接口
- FallbackTracker -- 模型降级追踪器
- ReminderSystem -- 系统提醒管理
- FileStateCache -- 文件状态缓存(LRU + hash 检测外部修改)
- ResultStore -- 大结果磁盘持久化
- SkillLoader -- 技能文件加载器
- SubAgent / SubAgentRegistry -- 子 Agent 系统(Fork 模式,共享 prompt cache)
- WorktreeInfo -- Git Worktree 管理
- InputProcessor -- 输入预处理(文件引用,图片,URL)
- ToolSummaryGenerator -- 工具结果摘要生成
- FlushGate[T] -- 泛型消息排队门(压缩/重连期间暂存消息)
Query (pkg/query)¶
query.Message 结构新增 Metadata map[string]interface{} 可扩展元数据字段:
- is_attachment -- 标记附件消息
- attachment_type -- 附件类型
- is_virtual -- 虚拟消息(不发 API)
- error_source -- 错误来源映射
设计理念:不为每个场景加专用字段,避免结构体僵化.
Tools (pkg/tools)¶
工具系统由四部分组成:
- Tool 接口 -- 所有工具的统一契约(Name, Description, InputSchema, Execute)
- Registry -- 线程安全的工具注册表(支持别名,动态注册/注销)
- Orchestrator -- 并发调度器(连续 ConcurrencySafe 工具并行,非安全工具串行)
- DeferredRegistry -- 延迟加载系统(工具数超过阈值时非核心工具按需激活)
延迟加载机制:
工具总数 <= 15 → 全部活跃(无延迟加载)
工具总数 > 15 → 核心工具(Bash/Read/Edit/Write/Glob/Grep/Agent/ToolSearch)始终活跃
其余通过 ToolSearch 按需发现和激活
编排算法:
输入: [Glob, Grep, Edit, Glob, Grep]
分批: [[Glob, Grep](并行), [Edit](串行), [Glob, Grep](并行)]
执行: batch1 → 并行 → 等待 → batch2 → 串行 → batch3 → 并行 → 等待
Permission (pkg/permission)¶
权限系统的四层检查:
- 模式判断 -- bypass 直接放行,plan 直接拒绝
- 规则匹配 -- 前缀匹配(Bash 命令),路径 glob(文件操作),域名匹配(WebFetch)
- 风险评估 -- 检测危险命令(rm -rf, git push --force 等)
- 用户决策 -- 通过 Handler 接口让消费层决策
Bash 命令安全分析(pkg/permission/bash_security.go)现在基于 AST 解析而非字符串分割:
旧方式: strings.Split(cmd, " ") → 无法处理引号/heredoc/管道
新方式: bash.Parse(cmd) → AST → ExtractCommands() → 完整的命令信息
Memory (pkg/memory)¶
记忆系统使用文件系统存储,每条记忆是一个带 YAML frontmatter 的 markdown 文件.
存储路径:~/.flyto/projects/<sha256(cwd)[:16]>/memory/
记忆类型通过 MemoryTypeRegistry 分层注册制管理(详见 MemoryType 注册制).内置 4 种编程场景类型:
- User -- 用户画像(偏好和习惯)
- Feedback -- 行为指导(根据反馈调整)
- Project -- 项目上下文(结构,技术栈,约定)
- Reference -- 外部指针(链接和摘要)
相关性搜索通过 RelevanceScorer 接口实现,默认使用文本相似度评分(Jaccard + token 权重 + 子串匹配),支持 CompositeScorer 加权组合和 ExternalScorer 跨语言桥接.
记忆提取通过 MemoryExtractor 接口实现,策略与执行分离.
记忆同步(模块 10.2)¶
SyncAdapter 接口 + SyncConfig 组合实现可插拔同步,调用方通过 WithSyncAdapter 选项注入.
fileStore.List() / FindRelevant()
└── maybePull(ctx) ← 按 PullPolicy 决定是否 Pull
└── SyncAdapter.Pull(ctx, baseDir)
fileStore.Save()
└── maybePush(ctx) ← Save 成功后同步 Push,失败只记录事件
└── SyncAdapter.Push(ctx, baseDir, ConflictPolicy)
核心决策:Pull 失败 fail-open(本地缓存继续可用),Push 失败仅发 Observer 事件(Save 仍返回成功).
| 模式 | 推荐配置 |
|---|---|
| CLI 单用户 | DefaultSyncConfig() + GitSyncAdapter |
| API 高频无状态 | APISyncConfig(5*time.Minute) + HTTPSyncAdapter(P2) |
| 离线/不需要同步 | 不设置(默认 NoopSyncAdapter,IsAvailable=false,零 overhead) |
Hooks (pkg/hooks)¶
14 种 Hook 类型(模块 9.2 新增 pre/post-sampling):
| Hook 类型 | 触发时机 | 执行方式 | 可阻止 |
|---|---|---|---|
pre_sampling |
API 调用前(每轮) | 同步 | ✅ exit 非零终止本轮 |
post_sampling |
API 响应后,工具执行前 | 异步 | ❌ |
pre_tool_use |
工具执行前 | 同步 | ✅ exit 2 阻止 |
post_tool_use |
工具执行成功后 | 同步 | ❌ |
post_tool_use_failure |
工具执行失败后 | 同步 | ❌ |
session_start |
会话开始 | 异步 | ❌ |
session_end |
会话结束 | 异步 | ❌ |
permission_request |
请求权限时(可通过 JSON 输出自动批准) | 同步 | ✅ |
permission_denied |
权限被拒绝 | 同步 | ❌ |
stop |
Agent 停止 | 同步 | ✅ |
notification |
发送通知 | 异步 | ❌ |
config_change |
配置变更 | 同步 | ❌ |
task_created |
子任务创建 | 同步 | ❌ |
task_completed |
子任务完成 | 同步 | ❌ |
执行策略: - 同步 Hook 按注册顺序执行,fail-open(失败不阻塞后续 Hook) - 异步 Hook 在后台 goroutine 执行,使用独立 context - Hook 命令通过环境变量接收输入,通过 stdout JSON 返回控制指令
插件-Hook 桥接(模块 9.3)¶
HookDef.Source 字段(空字符串=全局,非空=插件名)实现单注册表多来源精准管理.
Engine.New()
├── 加载 HooksConfig → Register(hooks) // Source="" 全局
└── syncPluginHooks()
├── UnregisterAllBySource(pluginName) // 原子清除旧状态
└── Register(hooks, Source=pluginName) // 重新注册已启用插件
优先级由注册顺序自然保证(全局先注册 → 全局先执行).Enable/Disable/Load 均触发 syncPluginHooks() 完整重建,避免 hook 残留.
NormalizePipeline 架构¶
pkg/engine/normalize.go + pkg/engine/normalizer.go + pkg/engine/norm_*.go(含 norm_tool_result_pairing.go)
从硬编码 3 步升级为 10 步可组合 Pipeline.每个步骤实现 MessageNormalizer 接口,通过 Priority 排序执行.
MessageNormalizer 接口:
Name() string
Priority() int // 越小越先执行
Normalize(messages []query.Message) []query.Message
NormalizePipeline:
Add(step) 追加步骤
Remove(name) 按名称移除
Run(msgs) 按 Priority 排序后依次执行
10 步管道(按 Priority 排序)¶
| Priority | 步骤 | 职责 |
|---|---|---|
| 5 | AttachmentReorderer | 附件消息上浮(暂未启用) |
| 8 | ToolResultPairingNormalizer | 完整的 tool_use/tool_result 配对修复(4 种 case),通过 Observer 记录修复,严格模式下 panic |
| 10 | OrphanToolResultRemover | 移除无对应 tool_use 的 tool_result |
| 15 | ErrorContentStripper | 剥离错误内容中的噪音模式 |
| 18 | OrphanThinkingFilter | 过滤纯 thinking 的 assistant 消息 |
| 20 | EmptyMessageFilter | 过滤空消息 |
| 22 | WhitespaceAssistantFilter | 过滤纯空白 assistant 消息 |
| 25 | ToolUseInputNormalizer | 规范化 tool_use 输入 |
| 30 | ConsecutiveRoleMerger | 合并连续同角色消息 |
| 50 | ImageValidator | 验证图片大小(默认 20MB 上限) |
设计决策¶
- Priority 隐式排序:每个步骤只关心自己排在第几,新增步骤不需要知道其他步骤的存在
- 每次 Run 排序:允许 Add 之后,Run 之前动态调整 Priority
- 最后一道防线:即使上层 runLoop 完美,压缩,恢复等操作都可能产生畸形消息序列
- 场景可扩展:仓储场景追加
SensorDataNormalizer,法律场景追加RedactionNormalizer
pipeline := DefaultNormalizePipeline()
pipeline.Add(&SensorDataNormalizer{}) // 仓储
pipeline.Add(&RedactionNormalizer{}) // 法律
normalized := pipeline.Run(messages)
三层压缩降级¶
pkg/context/compact.go -- CompactTiered 方法
当上下文接近模型窗口时,按三层降级策略压缩:
CompactTiered(ctx, messages, policy)
│
├── 断路器检查: circuitBreaker.ShouldAttempt()
│ └── 打开 → 返回错误
│
├── 第 1 层: singleCompact(单次压缩)
│ 估算 token <= compactModelWindow × 85%
│ 完整对话 → API 生成摘要 → 摘要 + 最近消息
│ └── 成功 → 返回, PTL → 进入第 2 层
│
├── 第 2 层: truncateAndRetryCompact(砍头重试)
│ 逐步砍掉最旧的消息组(保留 preamble)
│ 最多重试 maxTruncateRetries 次
│ └── 成功 → 返回, PTL → 进入第 3 层
│
└── 第 3 层: chunkedCompact(分块压缩)
消息按 token 预算切分为多块
每块独立压缩 → 合并摘要
└── 成功/失败 → 返回
CompactCircuitBreaker¶
断路器防止反复调用失败的 API:
- 连续失败 N 次后打开(默认 3 次),拒绝后续压缩请求
- rate limit (429/529) 不计入失败次数(rate limit 通常 30 秒恢复,断路器关闭后要重启才能重试)
- 时间重置(默认 5 分钟后自动 Reset,不需要半开状态和后台定时器)
- 成功后立即 Reset
CircuitBreakerPersister(6.5 失败计数持久化)¶
pkg/context/compact_persist.go
跨进程保持断路器状态,解决 daemon 崩溃重启后每次浪费 3 次 API 调用的问题:
- 接口
CircuitBreakerPersister(Save/Load),默认实现FilePersister(JSON 文件) - per-project 隔离:文件名 = SHA-256(cwd)[:16],不同项目互不干扰
- TTL = 1 小时:超期状态加载时自动丢弃,防止历史失败永久锁死压缩
- 原子写入(write-then-rename),防止进程崩溃产生截断的 JSON
- fail-open:Load/Save 失败时静默记录 observer event,不影响压缩功能
- 注入方式:
compressor.SetPersister(p)(Setter 模式,向后兼容)
反向思考决策记录¶
- 为什么第 2 层逐步砍而不一次算好? token 估算有 20% 误差,一次性计算可能砍多或砍少.逐步砍保证不会砍过头.
- 为什么 rate limit 不计入? rate limit 通常 30 秒恢复,如果计入会触发断路器,之后要重启 Engine 才能重试.不计入的话,等 rate limit 过了自然能压缩.
- 为什么用时间重置而不是半开状态? 简单,不需要后台 goroutine.5 分钟足以让大多数临时问题恢复.
- 为什么 per-project 而非全局? 同机器多个项目,A 项目的 prompt_too_long 失败不应污染 B 项目的断路器状态.
SubAgent Fork 模式¶
pkg/engine/subagent.go
子 Agent 从独立 Engine 实例改为共享 prompt cache 的 fork 模式.
父 Engine
│
├── API Client ────────────┐
├── System Prompt ─────────┤ 共享(cache key 一致)
├── 完整工具定义列表 ───────┤
│ │
└── fork() ──► SubAgent │
│ ├── 复用 ◄──┘
│ ├── 独立消息历史
│ ├── allowedTools(运行时拦截)
│ └── 独立轮数限制
经济账¶
API 层传完整工具列表(和父 agent 一样)确保 cache key 一致,运行时用 canUseTool 函数拦截实际执行.
以 Sonnet 定价计算: - 多传 3K 没用的工具描述:多花 $0.009(3K x $3/M) - 省下 10K 系统提示的缓存命中:省 $0.027(9K x $3/M 变成 0.9K x $0.3/M) - 每个子 agent 调用净省 ~$0.018 - 一个复杂任务 10 个子 agent = 省 $0.18
安全性¶
canUseTool 在运行时拦截.子 agent 请求被禁工具会收到错误消息,模型自己换工具.安全性不降低.
MemoryExtractor 架构(模块 10.3)¶
pkg/memory/extractor.go
配方和厨房分离:Extractor 定义"提取什么",SubAgent fork 模式负责"怎么跑".
MemoryExtractor 接口:
Name() string
ShouldExtract(turnCount, lastExtractTurn) bool
BuildPrompt(existingMemories, newMessageCount) string ← 新增 newMessageCount
AllowedTools() []string
MaxTurns() int
│
▼
Engine.scheduleMemoryExtraction(ctx, messages, turnCount)
├── 单飞:extractInProgress 互斥
├── 后置补跑:extractPending stash(最新覆盖)
└── Engine.runMemoryExtraction(ctx, messages, turnCount)
├── hasMemoryWritesSince:主 agent 已写 → 跳过
├── newMessageCount = len(messages) - lastExtractMsgIdx
├── BuildPrompt(existingMemories, newMessageCount)
└── SpawnSubAgent(
HistoryMessages: messages, ← 传入完整对话历史
MemoryDirRestrict: e.mem.Dir(), ← Edit/Write 限制在记忆目录
)
DefaultCodeExtractor¶
内置编程场景提取器: - 每 5 轮对话触发一次(对应一个完整的"提问→探索→实现→验证"子任务周期) - BuildPrompt 注入 newMessageCount:"Analyze the most recent ~N messages"(精准定位) - 并行策略提示词:Turn 1 所有 Read 并行,Turn 2 所有 Write/Edit 并行(节省轮次) - 关注:项目结构,代码规范,技术决策,用户偏好 - 允许工具:Read, Grep, Glob, Edit, Write(不允许 Bash/Agent) - 最大 5 轮
自定义场景扩展¶
实现 MemoryExtractor 接口即可注入不同场景的提取策略:
type WarehouseMemoryExtractor struct{}
func (e *WarehouseMemoryExtractor) BuildPrompt(existing []*Entry, count int) string {
// 关注:SKU规则、货架布局、库存阈值、工作流偏好
return fmt.Sprintf("Analyze the most recent ~%d messages...", count)
}
SubAgent.HistoryMessages + MemoryDirRestrict¶
SubAgentConfig 新增两个字段:
| 字段 | 用途 |
|---|---|
HistoryMessages []api.RequestMessage |
预置父对话历史,SubAgent 有内容可分析 |
MemoryDirRestrict string |
非空时 Edit/Write 只允许写入此目录 |
记忆新鲜度系统(模块 10.4)¶
pkg/memory/freshness.go
记忆是点时刻观测值,随时间可能过时.新鲜度系统通过两个维度告知模型:
FreshnessConfig¶
type FreshnessConfig struct {
GlobalThreshold time.Duration // 全局阈值,0 = 总是警告
TypeOverrides map[string]time.Duration // 按类型覆盖
}
典型配置: | 场景 | GlobalThreshold | TypeOverrides | |------|----------------|---------------| | CLI 默认 | 24h | - | | 仓储 | 24h | project=2h | | 医疗 | 0(总是警告) | - |
三个输出层¶
FreshnessText → 自然语言警告("This memory is 3 days old...")
FreshnessNote → <system-reminder> 包裹,运行时注入到 CheckMemoryRelevance
IndexAnnotation → "_(last updated 3 days ago)_" 嵌入 MEMORY.md 行末
数据流¶
Config.FreshnessConfig (nil = 不启用)
│
├──→ memory.WithFreshness(cfg) → fileStore.freshnessConfig
│ └── UpdateIndex: IndexAnnotation() 替换硬编码 30 天
│
└──→ reminderSys.freshnessConfig
└── CheckMemoryRelevance: FreshnessNote() 追加到每条相关记忆
TruncateIndex 双重截断¶
MEMORY.md 写入前应用两重限制(防止超大索引压缩 context window): - 200 行上限(对应早期方案的 sliceLines 200) - 25KB 字节上限(新增防线,覆盖长描述场景) - 超出任一限制:截断 + 末尾追加 WARNING(告知模型有内容未显示)
RelevanceScorer 架构¶
pkg/memory/scorer.go
从包级函数 Score() 提升为接口,支持不同场景注入不同评分策略.
RelevanceScorer 接口:
Name() string
Score(query string, header *MemoryHeader) float64
实现:
├── TextScorer 默认文本相似度(Jaccard + token 权重 + 子串匹配)
├── CompositeScorer 多评分器加权组合(支持运行时 Add/Remove)
└── ExternalScorer 桥接外部进程(stdin/stdout JSON Lines 协议)
CompositeScorer 使用示例:
composite := NewCompositeScorer(
WeightedScorer{Scorer: &TextScorer{}, Weight: 0.7},
WeightedScorer{Scorer: warehouseScorer, Weight: 0.3},
)
ExternalScorer 跨语言桥接:
请求: {"query": "...", "name": "...", "description": "...", "type": "..."}
响应: {"score": 0.85}
失败时返回 0.0(不阻断主流程)
SelectRelevant 向后兼容¶
SelectRelevant(query, headers, limit, scorer...) 使用变参实现可选参数.旧代码 SelectRelevant(q, h, n) 继续工作,新代码 SelectRelevant(q, h, n, myScorer) 注入自定义评分器.
MemoryType 注册制¶
pkg/memory/types_registry.go
从 4 个硬编码 const 升级为分层注册制,支持跨行业扩展.
MemoryTypeRegistry:
parent *MemoryTypeRegistry // 上级注册表(只读继承)
local map[string]*MemoryTypeInfo // 本级注册
分层继承:
org 级(管理层) → team 级(运营) → local 级(加盟仓)
sla_rule exception_pattern warehouse_layout
查询: local 优先,沿 parent 链向上冒泡
注册: 只写入本级 local map,不污染上级
多格式提示词¶
FormatForPrompt(format) 根据模型自动选择格式:
- Claude 模型 → XML(训练有素)
- GPT/Gemini → Markdown
- AutoPromptFormat(modelID) 自动检测
MemoryTypeInfo 结构¶
每种类型包含:Name, Scope, Description, WhenToSave, HowToUse, BodyStructure, Examples, SortOrder.这些信息注入系统提示词,指导模型何时保存,如何使用.
Bash AST 解析器架构¶
internal/syslib/bash/ 包实现了一个宽容的 Bash 命令 AST 解析器,用于安全分析.
架构分层¶
┌──────────────────────────────────────────────────────┐
│ 调用方 │
│ pkg/permission/bash_security.go │
│ pkg/tools/builtin/bash_classify.go │
└───────────────┬──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ extract.go(信息提取层) │
│ ExtractCommands(ast) → []*CommandInfo │
│ ExtractCommandName(cmd) → (command, subcommand) │
│ GetCommandPrefixes(cmd) → []string │
│ IsStaticRedirectTarget(target) → bool │
└───────────────┬──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ parser.go(语法解析层) │
│ Parse(source) → *Node (AST root) │
│ 处理:管道 | 列表 && || ; & | 简单命令 | 复合命令 │
│ 处理:if/for/while/case | 函数 | 子 shell │
│ 处理:重定向 > >> < 2>&1 | 变量赋值 VAR=val │
│ 处理:引号 "..." '...' $'...' | 命令替换 $() `` │
│ 处理:进程替换 <() >() | 算术展开 $(()) │
│ 处理:变量展开 $VAR ${VAR} | 注释 # │
└───────────────┬──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ heredoc.go(Heredoc 预处理层) │
│ PreprocessHeredocs(source) → (processed, []info) │
│ 在主解析之前识别并提取 heredoc body │
│ 支持 <<EOF <<-EOF <<'EOF' <<"EOF" <<\EOF │
│ 正确跳过引号内和 $(()) 内的 << │
└──────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────┐
│ ast.go(AST 节点定义) │
│ 20 种节点类型:Program, Pipeline, List, ... │
│ Node 结构体:Type, Value, Children, 特殊字段 │
│ 辅助方法:CommandName(), CommandArgs() │
└──────────────────────────────────────────────────────┘
20 种 AST 节点类型¶
| 节点类型 | 对应 Bash 语法 |
|---|---|
NodeProgram |
顶层根节点 |
NodePipeline |
cmd1 \| cmd2 |
NodeList |
cmd1 && cmd2 \|\| cmd3 |
NodeSimpleCommand |
ls -la |
NodeCompoundCommand |
{ cmd; } |
NodeSubshell |
(cmd) |
NodeIf |
if/elif/else/fi |
NodeFor |
for/do/done |
NodeWhile |
while/do/done |
NodeCase |
case/esac |
NodeFunction |
function f() { ... } |
NodeRedirection |
> >> < 2>&1 |
NodeHeredoc |
<<EOF ... EOF |
NodeAssignment |
VAR=value |
NodeWord |
单词 |
NodeCommandSubstitution |
$(cmd) 或 `cmd` |
NodeProcessSubstitution |
<(cmd) 或 >(cmd) |
NodeArithmeticExpansion |
$((expr)) |
NodeVariableExpansion |
$VAR 或 ${VAR} |
NodeQuotedString |
"..." 或 '...' 或 $'...' |
NodeComment |
# ... |
宽容解析¶
解析器采用宽容模式:无法识别的语法不报错,标记为 NodeWord 节点.这保证了即使遇到非标准或超复杂的 Bash 语法,解析也不会失败,安全分析可以在不完整的 AST 上进行.
模型角色系统架构¶
┌────────────────────────────────────────────────────────────┐
│ Engine (使用角色引用模型) │
│ │
│ runLoop() → registry.GetRole(RoleMain) → 主对话 │
│ compact() → registry.GetRole(RoleFast) → 摘要压缩 │
│ toolSummary() → registry.GetRole(RoleFast) → 结果摘要 │
│ thinking() → registry.GetRole(RoleThinking) → 深度推理 │
│ costCalc() → registry.EstimateCost(modelID) → 成本估算 │
└─────────────────────────┬──────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ ModelRegistry(线程安全) │
│ │
│ roles: map[ModelRole]string │
│ (DefaultRoles 为空,引擎不预设角色映射) │
│ 所有角色 fallback 到 Config.Model │
│ │
│ models: map[string]*ModelConfig │
│ ModelConfig = flyto.ModelInfo(类型别名,统一类型) │
│ 定价数据为空(由框架层运行时获取,引擎不硬编码) │
│ │
│ API: SetRole / GetRole / Register / EstimateCost / ... │
└────────────────────────────────────────────────────────────┘
详见 model-roles.md.
EngineObserver 可观测性架构¶
pkg/engine/observer.go
结构化的事件/指标/调用链流,给监控系统,数据仓库,告警平台消费.与 internal/logger 互补:logger 是给开发者看的文本日志,Observer 是给系统消费的结构化数据.
接口层次¶
EventObserver(核心,必须实现)
├── Event(name, data) 离散事件(某件事发生了)
└── Error(err, context) 错误事件(含上下文,用于告警)
MetricObserver(可选,接口断言检测)
└── Metric(name, value, tags) 数值指标
TraceObserver(可选,接口断言检测)
├── SpanStart(name, tags) → spanID
└── SpanEnd(spanID, err)
可选接口使用 type assertion 检测,未实现的 Observer 不会收到对应调用,零开销.
实现层次¶
EventObserver 接口
│
├── NoopObserver 空实现(未配置时用,避免 nil 检查散布各处)
│
├── StderrObserver 开发调试用(MinLevel 过滤,mu 加锁)
│ MinLevel: "debug" / "info" / "warn" / "error"
│ Output: io.Writer(默认 os.Stderr)
│
├── CompositeObserver 多路复合(同时发到 DataDog + 审计日志 + stderr)
│ observers: []EventObserver
│ Metric/Span 只转发到实现了对应接口的子 observer
│
└── BufferedObserver 异步缓冲(不阻塞热路径)
inner: EventObserver(实际发送方)
buffer: chan observerEntry(缓冲区)
batchSize: 批量大小(默认 100)
interval: 刷新间隔(默认 1s)
Event 非阻塞发送(缓冲区满丢弃)
Error 优先发送(缓冲区满直接同步)
Close() 关闭并等待刷新完毕
埋点位置¶
引擎内关键埋点:
| 事件名 | 触发位置 | 用途 |
|---|---|---|
tool_result_pairing_repaired |
ToolResultPairingNormalizer | 配对修复发生时记录修复详情 |
strict_mode_would_fail |
StrictMode.Check | 非严格模式下记录"本应失败"的条件 |
api_latency_ms |
runLoop API 调用 | API 延迟指标(MetricObserver) |
token_usage |
轮次结束 | Token 用量指标(MetricObserver) |
压缩模块(pkg/context/compact.go)¶
通过 flyto.EventObserver 契约直接接入,Compressor.SetObserver() 注入 (2026-04-16 / L1326 衍生 a 重构: 原 CompactObserver 本地接口删除, context 直接消费 flyto 契约).
| 事件名 | 触发位置 | 用途 |
|---|---|---|
compact_started |
CompactWithStrategy / CompactTiered 开始 | 记录压缩触发时的 token 量和策略 |
compact_completed |
CompactWithStrategy / CompactTiered 完成 | 压缩效果评估(before/after token,耗时,恢复项数) |
compact_circuit_breaker_open |
CompactTiered 断路器触发 | 压缩连续失败告警,需要人工介入 |
compact_chunked |
chunkedCompact 分块压缩 | 记录分块数和并行度,用于 API 成本分析 |
micro_compact_triggered |
DoMicroCompact | 轻量修剪效果追踪 |
记忆模块(pkg/memory/memory.go)¶
通过 flyto.EventObserver 契约直接接入,WithObserver() functional option 注入 (2026-04-16 / L1326 重构: 原 MemoryObserver 本地接口删除, memory 直接消费 flyto 契约).
| 事件名 | 触发位置 | 用途 |
|---|---|---|
memory_saved |
Save 成功后 | 记忆增长趋势和磁盘空间监控 |
memory_deleted |
Delete 成功后 | 安全审计(谁删了什么) |
memory_search |
FindRelevant 完成后 | 搜索延迟和结果质量分析 |
memory_index_updated |
UpdateIndex 完成后 | 索引更新频率和条目数追踪 |
memory_symlink_detected |
checkSymlinkWithObserver | 安全审计(符号链接检测) |
Hook 模块(pkg/hooks/hooks.go)¶
通过 flyto.EventObserver 契约直接接入,Manager.SetObserver() 注入 (2026-04-16 / L1326 衍生 a 重构: 原 HookObserver 本地接口删除, hooks 直接消费 flyto 契约).
| 事件名 | 触发位置 | 用途 |
|---|---|---|
hook_started |
Execute 开始 | Hook 执行性能分析起点 |
hook_executed |
单个 Hook 完成 | 每个 hook 的耗时,退出码,JSON 输出 |
hook_timeout |
单个 Hook 超时 | 用户脚本挂起告警 |
hook_async_started |
ExecuteAsync 开始 | 后台 hook 任务追踪 |
自进化模块(pkg/evolve/evolve.go)¶
通过 flyto.EventObserver 契约直接接入,Config.Observer 构造函数注入 (2026-04-16 / L1326 衍生 a 重构: 原 EvolveObserver 本地接口删除, evolve 直接消费 flyto 契约).
| 事件名 | 触发位置 | 用途 |
|---|---|---|
evolution_proposed |
Propose 提案提交 | 自进化意图审计 |
evolution_approved |
Propose 审批通过 | 能力变更审计 |
evolution_rejected |
Propose 审批拒绝 | 进化策略优化分析 |
tool_created |
apply 工具创建成功 | Agent 能力扩展里程碑 |
skill_learned |
apply 技能学习成功 | 自进化有效性评估 |
reflection_saved |
apply 反思记录保存 | Agent 元认知深度指标 |
Config 集成¶
// Engine 初始化时:
observer := cfg.Observer
if observer == nil {
observer = &NoopObserver{} // 永不为 nil
}
子模块 Observer 适配¶
memory / context / hooks / evolve 四个子模块均已收敛到 flyto.EventObserver 契约 (memory: 2026-04-16 L1326; context / hooks / evolve: 2026-04-16 L1326 衍生 a).消费者一个 flyto.EventObserver 实例可喂给所有子模块 + engine 本身,不再需要鸭子类型隐式实现的隐性 coupling.
// engine.go 中的集成示例:
compressor.SetObserver(observer) // observer 是 flyto.EventObserver, 契约强制匹配
hooksMgr.SetObserver(observer) // 同上
StrictMode 严格模式¶
pkg/engine/strict.go
来自 inc-4977 事故的教训:安全评估时不能让引擎静默修改模型看到的上下文.
三维控制¶
| 字段 | 控制范围 | true 行为 | false 行为 |
|---|---|---|---|
ToolResultPairing |
消息配对异常 | panic | 修复 + Observer 记录 |
CompactFailure |
压缩失败 | panic | 降级 + Observer 记录 |
NormalizerError |
规范化异常 | panic | 跳过 + Observer 记录 |
Check 方法¶
StrictMode.Check(condition, enabled, observer, detail) 一行代码同时处理严格模式和可观测性:
- enabled=true -- panic(中断执行)
- enabled=false -- Observer.Event("strict_mode_would_fail", ...)(记录但继续)
便捷方法:CheckToolResultPairing / CheckCompactFailure / CheckNormalizerError.
使用场景¶
ToolResultPairing 配对修复¶
pkg/engine/norm_tool_result_pairing.go
从早期方案 OrphanToolResultRemover 的单一清理升级为 4 种 case 的完整修复.Priority 8,在 OrphanToolResultRemover(10) 之前执行.
4 种 case¶
Case 1: tool_use 无 tool_result
原因: 会话中断、压缩截断、异常退出
修复: 注入合成 tool_result(追加到消息末尾作为 user 消息)
合成文案: "[Tool result not available - session may have been interrupted...]"
Case 2: tool_result 无 tool_use
原因: 消息历史损坏、压缩丢失 tool_use
修复: 委托给后续 OrphanToolResultRemover(Priority 10)处理
Case 3: 重复 tool_use ID
原因: 压缩合并、消息回放
修复: 去重(保留第一个)
Case 4: 重复 tool_result ID
原因: 网络重传、SDK 重试
修复: 去重(保留第一个)
执行阶段¶
阶段 1: 收集所有 tool_use 和 tool_result 的 ID
阶段 2: 第一遍扫描——去重 tool_use (case 3) 和去重 tool_result (case 4)
阶段 3: 第二遍扫描——注入缺失的 tool_result (case 1),标记孤立 tool_result (case 2)
阶段 4: 通过 Observer 记录修复详情,检查 StrictMode
诊断快照¶
buildDiagnosticDetail 只提取消息结构信息(角色,content 类型,ID),不序列化消息内容,可安全发到外部监控.
防御性编程的三层防御模式¶
模型交互点
│
┌───────────┼───────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 指令层 │ │ 参数层 │ │ 兜底层 │
│ │ │ │ │ │
│ 提示词中 │ │ API 参数 │ │ 代码中 │
│ 约束行为 │ │ 阻止行为 │ │ 处理异常 │
└─────────┘ └─────────┘ └─────────┘
应用位置:
| 交互点 | 指令层 | 参数层 | 兜底层 |
|---|---|---|---|
| 压缩 API 调用 | "不要调用工具" | 不传 tools 参数 | tool_use 当文本处理 |
| 子代理递归 | "不要使用 Agent" | 工具列表移除 Agent | 递归深度检测 |
| 工具输入 | description 说明格式 | InputSchema 约束 | json.Unmarshal 错误返回提示 |
| 权限超时 | 无 | context.WithTimeout | 默认拒绝 |
KAIROS/Dream 系统的架构位置¶
AutoDream(记忆巩固)是 KAIROS 系统的核心组件,与自进化系统(Evolve)直接关联:
Engine
│
┌────────────┼────────────┐
▼ ▼ ▼
Memory Evolve Dream (AutoDream)
Store System Engine
│ │ │
│ │ ├── 触发条件:时间(24h) + 会话数(5次) + 文件锁
│ │ │
│ │ ├── Phase 1: Orient(扫描现状)
│ │ ├── Phase 2: Gather(收集新信号)
│ │ ├── Phase 3: Consolidate(整合)
│ │ └── Phase 4: Prune(清理索引)
│ │ │
│ │ ▼
│◄─────────────┼──────── 整理后的记忆
│ │
│ ▼
│ 自进化基于整理后的记忆:
│ - Reflect 读取记忆做反思
│ - LearnSkill 基于项目记忆学习技能
│ - CreateTool 基于使用模式创建工具
│
▼
记忆文件 (markdown + frontmatter)
~/.flyto/projects/<hash>/memory/
Dream 与 Evolve 的关系:Dream 负责"整理记忆",Evolve 负责"基于记忆自我改进".Dream 的输出是 Evolve 的输入.
当前状态:Dream 系统已完整实现(模块 16).核心文件:
- pkg/engine/dream.go - DreamEngine(门槛,锁,fork,PeriodicInterval,SessionProvider)
- pkg/engine/dream_lock.go - FileLock(flock + mtime-as-state,ReadLockMtime)
- pkg/engine/dream_prompt.go - BuildConsolidationPrompt(叙事式 4 阶段)
- pkg/engine/dream_task.go - DreamTaskStore(任务状态 + Turns 滚动窗口)
- pkg/engine/subagent.go - RunSyncWithCallback(onTurn 回调,ToolUseInfo)
mtime-as-state 设计:lock 文件的 mtime = lastConsolidatedAt,消除 crash-between-dream-and-state-write 的重复触发 bug.flock(2) 负责互斥,mtime 负责持久状态,两者互补.
SessionProvider 接口:抽象会话数据源,FileSessionProvider 是 CLI 的 JSONL 实现,SDK/API 可注入自定义实现(数据库,消息队列等).
Plan Mode 系统(模块 17)¶
UltraPlan 是复杂任务的规划工作流.模型在实际编码前先探索代码库并设计方案,用户审批后再执行.
核心组件¶
EnterPlanModeTool ──→ PlanModeManager ──→ ExitPlanModeTool
│
┌──────┼──────┐
▼ ▼ ▼
PlanStore perms ApprovalPolicy
(File/Memory) (SetMode) (Func/Noop)
关键文件¶
pkg/engine/plan.go- PlanModeManager 状态机 + EnterPlanMode/ExitPlanMode 工具 + ApprovalPolicy 接口pkg/engine/plan_store.go- PlanStore 接口 + FilePlanStore(word slug)+ MemoryPlanStore
安全设计:文件读取防注入¶
ExitPlanMode 从 PlanStore 读计划内容,而非接受 plan 参数.如果允许参数传入,模型可以注入任意内容绕过用户审查.对应早期方案 ExitPlanModeV2Tool 的同等设计.
ApprovalPolicy 接口¶
抽象审批机制,不同部署方式注入不同实现:
- NoopApprovalPolicy - 自动批准(测试/bypass 场景)
- FuncApprovalPolicy - 函数回调(SDK 嵌入最常用)
- CLI 实现终端交互,SaaS 实现 WebSocket/webhook 审批
prePlanMode 恢复¶
进入 plan 模式前记录当前权限模式(default/accept_edits/bypass),退出时恢复. 用户拒绝计划时保持 plan 模式--让模型修改后再次提交.
PlanStep 依赖图(P1)¶
PlanStep.Deps []string 引用其他步骤 ID,消费方可做拓扑排序并行调度.
引擎只暴露结构,不做调度("叠加而非替换"原则).
异步计划队列(模块 20.3)¶
UltraPlan(模块 17)是交互式规划工作流,要求用户在场审批并等待执行. 对于"超大计划要干很久而发起方要继续干别的"场景,引入异步计划队列.
架构¶
客户端(CLI/另一个 Agent)
│ submit_plan / plan_status / plan_cancel / plan_list
│ JSON over UDS: /tmp/flyto-plan-{sessionID}.sock
▼
PlanCommandServer(请求-响应 UDS 服务端)
│ dispatch
▼
FilePlanQueue(状态机 + 持久化 + 执行循环)
│ PlanExecFunc(注入的执行函数)
▼
Engine.Run()(实际 Agent 执行)
│ 状态文件: ~/.flyto/plans/{plan-id}.json
▼
文件系统(客户端可直接读取 JSON 轮询进度)
关键设计¶
- 状态文件持久化:每个计划一个 JSON 文件,daemon 崩溃重启后
RecoverPending自动恢复 running→pending(at-least-once 执行语义) - 原子写入:write-then-rename 防止截断 JSON
- 串行执行:避免并发计划操作同一批文件产生冲突(可扩展为有 file-lock 检查的并行策略)
- 依赖反转:
FilePlanQueue不导入 Engine,只持有PlanExecFunc函数指针,测试可用 mock 替换 - per-step 状态:
onStepDone回调实时更新步骤状态,客户端轮询可见精确进度 - TTL 清理:终态计划文件 24 小时后自动删除,防止
~/.flyto/plans/无限膨胀
与 SubAgent 的区别¶
| 特性 | SubAgent | PlanQueue |
|---|---|---|
| 执行模式 | 同步(Orchestrator goroutine 阻塞等待) | 异步(fire-and-forget,立即返回 planID) |
| 适用场景 | Orchestrator 需要子任务结果才能继续 | 超大计划,发起方要继续做其他事 |
| 跨进程 | 否(同进程内 goroutine) | 是(UDS 跨进程通信) |
| 崩溃恢复 | 否 | 是(RecoverPending) |
配置¶
engine.Config{
EnablePlanQueue: true, // 启用
PlanQueueDir: "~/.flyto/plans/", // 可选,默认此路径
PlanQueueSessionID: "my-session", // 可选,默认时间戳
}
启用后 FLYTO_PLAN_SOCK 环境变量会被设置为 socket 路径,子进程可通过 SendPlanCommand 提交计划.
相关文件¶
pkg/engine/plan_queue.go- FilePlanQueue 状态机 + 持久化 + 执行循环pkg/engine/plan_command_server.go- PlanCommandServer UDS 请求-响应服务端 + SendPlanCommand 客户端辅助pkg/engine/plan_queue_test.go- 完整测试(含 RecoverPending,TTL 清理,端到端)
跨行业扩展的架构支撑¶
引擎的架构设计不绑定软件工程场景,以下机制支撑跨行业扩展:
┌─────────────────────────────────────────────────────┐
│ 行业无关的核心引擎 │
│ Engine + runLoop + Events + Session │
└──────────────┬──────────────────────────────────────┘
│
┌──────────┼──────────┬──────────┬──────────┐
▼ ▼ ▼ ▼ ▼
Tool 接口 权限规则 技能系统 系统提示 MCP 协议
│ │ │ │ │
│ 任何行业 │ 通用格式 │ 不假设 │ 行业内容 │ 任何行业
│ 的工具 │ prefix/ │ 编程场景 │ 通过 │ 的服务
│ 通过接口 │ glob/ │ │ FLYTO.md │ 通过协议
│ 接入 │ domain │ │ 注入 │ 接入
▼ ▼ ▼ ▼ ▼
医疗诊断 金融合规 供应链 法律文档 数据分析
工具集 规则 工作流 提示词 服务
关键设计:
- 工具系统:Tool 接口不假设任何行业.4 个方法(Name/Description/InputSchema/Execute)足以封装任何操作
- 权限规则:prefix: / glob: / domain: 语法通用,不绑定编程工具
- 技能系统:SkillDefinition 的 Steps 是文本描述 + 工具调用,不假设编程场景
- 系统提示:行业相关内容通过 FLYTO.md 注入(pkg/context/instructions.go),不硬编码进 prompts.go
- MCP 协议:4 种传输协议(stdio/sse/http/ws),任何行业的服务都能接入
叠加而非替换(Composite 层)¶
宪法第 8 条:所有可插拔接口支持多实现共存叠加.现实使用永远是交叉的.
核心思想¶
单一策略/处理器的设计假设用户处于单一场景(纯编程或纯仓储),但现实中 Agent 经常跨场景工作.Composite 层让多个实现叠加共存,而不是互相替换.
CompositePolicy(压缩策略叠加)¶
pkg/context/composite.go
codePolicy := &DefaultCodePolicy{} // 保留: 文件路径、函数名
warehousePolicy := &WarehousePolicy{} // 保留: 订单号、SKU
combined := NewCompositePolicy(codePolicy, warehousePolicy)
compressor.SetPolicy(combined)
// 压缩时同时保留文件路径和订单号
叠加规则:
| 方法 | 叠加策略 | 原因 |
|---|---|---|
| PreserveKeywords | 合并去重 | 不丢任何场景的关键词 |
| ScoreMessageImportance | 取最高分 | 宁可多保留不可多丢弃 |
| MaxRecentRoundsToKeep | 取最大值 | 保留更多上下文 |
| PreprocessForCompaction | 依次应用 | 前一个的裁剪影响后一个 |
支持运行时动态添加/移除策略:
CompositeHandler(权限处理器叠加)¶
pkg/permission/composite_handler.go
ch := NewCompositeHandler(
NamedHandler{Name: "cli", Handler: cliHandler, IsDecisionMaker: true},
NamedHandler{Name: "audit", Handler: auditHandler, IsDecisionMaker: false},
)
engine := permission.NewEngine(permission.ModeDefault, ch.Handle)
// 权限请求同时弹框给用户 + 记录审计日志
关键设计:决策者与观察者分离.IsDecisionMaker: true 的处理器参与决策(第一个有效响应胜出),IsDecisionMaker: false 的处理器仅观察/审计,所有处理器都会被执行.
Config 向后兼容¶
pkg/engine/engine.go
CompactionPolicies []CompactionPolicy-- 多策略自动叠加为 CompositePolicyPermissionHandlers []NamedHandler-- 多处理器自动叠加为 CompositeHandler- 旧字段
PermissionHandler仍然有效(向后兼容) - 空数组时使用默认值(DefaultCodePolicy / nil Handler)
接口合规表¶
| 接口 | 叠加实现 | Name() | 动态增删 |
|---|---|---|---|
| CompactionPolicy | CompositePolicy | composite(code+warehouse) | Add / Remove |
| permission.Handler | CompositeHandler | N/A (函数类型) | Add / Remove |
QueryChainTracking 查询链追踪¶
查询链追踪(模块 7.5),用 3 个字段追踪从用户请求到所有子 agent 的完整调用链.
核心结构¶
QueryChainTracking {
ChainId string // 整条链的唯一 ID(所有子 agent 共享)
Depth int // 调用深度(0=主查询, 1=子agent, 2=子子agent)
ParentAgentId string // 父 agent 的 ID(主查询为空)
}
工作流¶
用户请求 → Engine.runLoop → NewQueryChain() → chain{id="chain_xxx", depth=0}
↓
SpawnSubAgent(cfg.Chain=chain) → chain.Fork(sa.ID)
→ child{id="chain_xxx", depth=1, parent="subagent_xxx"}
↓
再次 Fork → grandchild{depth=2}
Observer 集成¶
所有 runLoop 中的关键 Observer 事件(api_call_complete,tool_executed,session_complete)自动注入链追踪字段:
| 字段 | 说明 | 用途 |
|---|---|---|
| query_chain_id | 链唯一 ID | BigQuery 按链分组 |
| query_depth | 调用深度 | 嵌套深度统计 |
| parent_agent_id | 父 agent ID(仅子链) | 构建调用树 |
设计决策¶
- 不修改 Observer 接口:通过
observeWithChain在调用侧 merge 字段,所有现有 Observer 实现零改动. - 可选追踪:
chain为 nil 时功能不受影响,追踪可渐进式接入. - 链 ID 可读性:用
chain_<timestamp>_<random>格式,比 UUID 更适合日志追踪.
设计决策记录¶
为什么选 Go¶
- 单二进制部署 -- 不需要 Node.js 运行时,
scp一个文件即可部署 - 并发原语 -- goroutine + channel 天然适合流式事件和工具并发调度
- 编译时类型安全 -- 事件类型用
type switch分发,比 TypeScript 的 union type 更可靠 - 零外部依赖可行 -- Go 标准库足够强大(HTTP server,JSON,正则,文件系统)
- 交叉编译 --
GOOS=darwin GOARCH=arm64 go build即可编译 Mac 版
为什么零外部依赖¶
- 减少供应链攻击面
- 避免依赖版本冲突
- 编译速度快(<3 秒)
- 二进制体积小
Go 标准库覆盖了所有需求:
- HTTP 路由:Go 1.22+ net/http 支持路径参数
- JSON 处理:encoding/json
- 正则搜索:regexp
- 文件操作:os + path/filepath
- 并发控制:sync + channel
- HTML 解析:手写的轻量级解析器(WebFetch 工具内)
- Hash 计算:crypto/sha256(文件缓存)
可选 CGO 功能:HEIC 图片解码¶
HEIC/HEIF/AVIF 是 iPhone 默认相机格式,仓储,现场巡检等场景需要支持. 引擎通过 build tag 实现按需 opt-in,不影响默认构建:
| 构建方式 | HEIC 支持 | 系统依赖 |
|---|---|---|
go build ./...(默认) |
降级(原样透传) | 无 |
go build -tags libheif ./... |
完整转换为 JPEG | apt install libheif-dev libde265-dev |
CGO_ENABLED=0 go build ./... |
降级(原样透传) | 无 |
SDK 消费方(如 flysafe)按场景选择:
- 运维场景:默认构建即可
- 仓储场景:-tags libheif + 服务器安装 libheif-dev
为什么自建 Bash AST 解析器¶
- 字符串分割无法正确处理引号内的空格,heredoc 中的命令,
$(())中的<< - 安全分析需要理解命令的结构(哪些是命令名,哪些是参数,哪些是重定向)
- 不追求完整的 Bash 兼容性,宽容解析够用即可
- 零外部依赖约束排除了使用第三方解析库
为什么用 channel 而不是 callback¶
for range ch比嵌套回调更易读- 天然支持背压(channel 满了会自动阻塞生产者)
- 消费层可以用
select同时处理多个事件源 - 关闭 channel 自动通知消费层事件流结束
为什么工具编排用分批而不是依赖图¶
- 模型 API 返回的工具调用是有序列表,没有显式的依赖关系
- 分批算法简单可靠:连续的安全工具并行,不安全的串行
- 实测中模型很少一次返回复杂的工具依赖链
为什么权限系统独立于工具¶
- 工具只关心"怎么做",权限引擎关心"能不能做"
- 同一个工具在不同模式下有不同的权限策略
- 权限规则可以跨工具统一管理(用户级 + 项目级 + 会话级)
- 方便单元测试(权限逻辑不依赖工具实现)
为什么自进化需要人类审批¶
- 安全边界:Agent 不能在没有人类审批的情况下改造自己
- 可追溯性:每个进化提案都持久化,可以回溯和撤销
- 渐进信任:先从只读进化开始(技能学习),逐步放宽到工具创建
为什么使用角色系统而不是直接传模型 ID¶
- 业务逻辑与模型选择解耦:更换模型只需修改角色映射
- 成本优化:不同用途使用不同成本的模型(摘要用 Haiku,对话用 Sonnet)
- 一处修改全局生效:
registry.SetRole(RoleFast, "new-haiku")即可全局切换
ToolCapability 安全协议¶
Agent Tool Safety Protocol -- 文件/数据库/API 三种操作统一为同一套能力声明.
三个安全等级¶
| Level | 能力 | 示例 | Agent 行为 |
|---|---|---|---|
| 0 | 无安全能力 | Bash | 调用前警告 |
| 1 | Reversible(可回滚) | API 调用 | Saga 补偿模式 |
| 2 | DryRun + Reversible | FileEdit, FileWrite | 先预览再执行,最安全 |
可选接口设计¶
Tool(必须实现)
├── MetadataProvider(可选) → 声明"工具是什么"
├── CapabilityProvider(可选)→ 声明"工具能做什么安全措施"
├── DryRunnable(可选) → 模拟执行
└── Reversible(可选) → 生成撤销信息
关键决策:全部使用可选接口(type assertion 检测),不修改 Tool 接口.现有的 10+ 个工具无需任何改动.
DryRun 工作流¶
Agent 想修改文件
→ 调用 FileEdit.DryRun() 获取 diff 预览
→ 分析 diff,确认修改合理
→ 调用 FileEdit.Execute() 真正执行
→ 自动生成 UndoInfo 存入 OperationLog
FileHistory 文件历史系统¶
内容寻址备份 + 按消息回滚.
内容寻址存储¶
存储路径:~/.flyto/history/<project-hash>/<content-hash>
备份时机¶
FileEdit.Execute()开头自动调用FileHistory.BeforeEdit()FileWrite.Execute()开头自动调用FileHistory.BeforeWrite()- 同一消息内多次编辑同一文件,只保留第一次修改前的状态
回滚机制¶
Engine.Rollback(messageID)
→ FileHistory.Rollback(messageID) // 物理层:恢复文件内容
→ OperationLog.RollbackMessage() // 逻辑层:倒序执行补偿操作
- 已有文件被修改:恢复到修改前的备份内容
- 新建的文件:回滚时删除
- maxSnapshots = 100(超过时淘汰最旧的快照)
OperationLog 统一操作日志¶
记录所有工具操作,支持按消息 ID 回滚.统一文件/数据库/API 三种操作的日志格式.
Saga 补偿模式¶
回滚时倒序执行所有补偿操作(和分布式事务的 Saga 模式一样):
- 如果某个补偿失败,继续执行后续补偿(best-effort)
- 不可逆操作(Irreversible=true)跳过,记录 ManualGuide 人工处理指南
- 无 UndoInfo 的操作(只读工具)自动跳过
与 FileHistory 的协同¶
| 组件 | 职责 | 层面 |
|---|---|---|
| FileHistory | 文件内容的物理备份 | 磁盘层面 |
| OperationLog | 所有操作的逻辑记录 | 编排层面 |
文件回滚时两者协同:OperationLog 提供顺序,FileHistory 提供内容.
安全审计体系(INF-5)¶
pkg/security/ + pkg/engine/audit_*.go
核心设计:翻转默认值¶
早期方案只保护 TeamMem 同步路径("显式开启").我们翻转:默认全路径扫描,显式豁免收窄.
FileWriteTool.Execute()
│
├─ SecretGuard.Scan(path, content)
│ ├─ 豁免路径?→ 放行
│ ├─ 超过 MaxScanBytes?→ ErrContentTooLarge(放行)
│ └─ 命中规则?→ return error Result(拦截)
│
└─ 正常写入流程
组件关系¶
pkg/security/
├── SecretGuard(接口)
│ └── DefaultSecretGuard(45条规则,sync.Once惰性编译)
├── AuditSink(接口)
│ ├── NoopAuditSink
│ └── CompositeAuditSink(叠加原则)
└── AuditEntry(跨行业可扩展,Extra map[string]string)
pkg/engine/
├── LocalAuditSink(JSONL追加,~/.flyto/audit.jsonl)
└── AuditObserver(EventObserver实现,桥接事件→审计记录)
监听:operation_recorded / secret_scan_blocked
导入方向(无循环)¶
pkg/security(零依赖)
↑
pkg/tools/builtin(注入 SecretGuard)
↑
pkg/engine(默认注入 DefaultSecretGuard、LocalAuditSink、AuditObserver)
引擎默认行为¶
New(cfg) 时若 cfg.SecretGuard == nil,自动注入 DefaultSecretGuard.
nil 表示"忘了配置",NoopSecretGuard{} 表示"明确关闭".两者含义不同,不可混淆.
// 默认保护(不设置 SecretGuard 即受保护)
engine.New(&engine.Config{...})
// 明确关闭(必须显式传入)
engine.New(&engine.Config{
SecretGuard: security.NoopSecretGuard{},
})
// 自定义规则
engine.New(&engine.Config{
SecretGuard: security.NewSecretGuardWithRules(
append(security.BuiltinRules(), internalRules...),
),
})
秘密规则¶
45 条来自 gitleaks(MIT 协议)的高置信度规则,覆盖:云服务商(AWS/GCP/Azure),AI API(Anthropic/OpenAI),版本控制(GitHub/GitLab),通信(Slack/Twilio),支付(Stripe/Shopify),私钥(PEM 格式)等.
只收录有独特前缀的规则,排除"关键词上下文"类规则(误报率过高).
跨行业扩展示例¶
// 仓储场景:库存变更审计
entry := security.AuditEntry{
ToolName: "Write",
Operation: "write",
Resource: "/warehouse/inventory.csv",
Outcome: "allowed",
Extra: map[string]string{
"sku": "SKU-001",
"qty_delta": "-20",
},
}
数据安全(文件/DB/API 三维度)¶
详细设计见 data-safety.md,本节只列核心决策和与引擎的接入点.
核心原则¶
AI 负责决策,业务系统 API 负责执行.AI 不直接写业务数据库.
三个操作维度¶
| 维度 | 引擎已有支持 | 数据安全文档补充 |
|---|---|---|
| 文件操作 | FileHistory 原子写 + OperationLog Saga 补偿 | - |
| 数据库操作 | AuditSink 接口 + ToolCapability.DryRunnable | Dry-run 流程,写操作分级,影子表 |
| API 调用 | Reversible 接口 + Saga 补偿 | Dry-run 验证后执行 |
写操作风险分级¶
| 级别 | 操作 | 处理 |
|---|---|---|
| L1 追加写 | 审计日志,状态推进 | 直接提交,异步审计 |
| L2 修改已有记录 | 订单状态,数量调整 | 业务不变量检查后提交 |
| L3 高风险写 | 金额,库存,关键配置 | 必须人工审批 |
| L4 不可逆删除 | 物理删除,DDL 变更 | 默认禁止,需 override |
Dry-run 设计要点¶
业务 API dry_run=true
→ BEGIN TX → 执行 SQL → 捕获 diff → ROLLBACK
→ diff 返回给调用方
→ ML 验证(事务外,无锁,毫秒后)
→ 通过 → 短事务 + CAS 乐观锁正式写入
→ 不通过 → Agent 重新规划
ML 验证在事务外执行是关键设计:事务只持有毫秒级,验证耗时不占锁.
四层防御¶
- 输出约束(SQL 白名单 + 业务规则预校验)
- Dry-run + ML 验证
- 熔断器(连续失败自动暂停 AI 写权限)
- 人工 kill switch(
permission.Handler)
引擎接口对接点¶
tools.DryRunnable/tools.Reversible/tools.CapabilityProvider(pkg/tools/tool.go)security.AuditSink/security.AuditEntry(pkg/security/audit.go)permission.Handler(pkg/permission/permission.go)engine.Config.AuditSink(pkg/engine/config.go)
消费层实现:SQL 解析器,ML 验证器,熔断器,影子表管理.
TokenBudgetManager 预算管理¶
Token 预算精细度管理(模块 7.4),集中管理上下文窗口的 token 预算计算.
混合估算法¶
不纯估也不纯用 API 返回值:
误差范围被限制在"最近几次工具调用"的粗估量,而非整个上下文.
Sibling 回溯¶
并行工具调用时消息被拆成多条 assistant 交替排列:
通过 api_response_id 元数据追踪,回溯到同一批次的第一个 assistant.
三种 token 计算¶
| 函数 | 公式 | 用途 |
|---|---|---|
| GetTokenCountFromUsage | input + cache_creation + cache_read + output | 压缩阈值检查 |
| GetBillingTokens | 按模型实际价格计算 | 成本报告 |
| GetFinalContextTokens | input + output(不含 cache) | 服务端预算对齐 |
有效窗口多层计算¶
模型窗口 (200K)
- 摘要输出预留 (min(maxOutput, 20K))
= 有效窗口 (180K~184K)
- Thinking 预算 (可选)
= Thinking 后有效窗口
- AutoCompactBuffer (13K) = 自动压缩阈值
- WarningBuffer (20K) = 黄色警告阈值
- ErrorBuffer (7K) = 红色警告阈值
模型切换检测¶
切换模型时立刻检查当前用量是否超过新模型的阈值,通过 Observer 事件通知消费层.
API 错误分类系统¶
模块 8.1,结构化 API 错误分类.核心思路:一次分类,多处消费.
架构¶
HTTP 响应 → ErrorClassifier.Classify() → *APIError(结构化)
├─ Category(20种枚举)
├─ RetryInfo(重试建议)
├─ TokenGap(压缩跳步量)
├─ Hint(用户可操作提示)
└─ Headers/Body(原始信息)
ErrorCategory 枚举(20种)¶
| 分类 | 分析标签 | 触发条件 |
|---|---|---|
| ErrAborted | aborted | 请求取消 |
| ErrTimeout | api_timeout | 超时/408 |
| ErrRateLimit | rate_limit | 429 |
| ErrOverloaded | server_overload | 529/overloaded_error |
| ErrPromptTooLong | prompt_too_long | prompt 超长 |
| ErrMediaTooLarge | media_too_large | 图片/PDF 超限 |
| ErrRequestTooLarge | request_too_large | 413 |
| ErrAuthentication | auth_error | 401/403 |
| ErrModelNotFound | model_not_found | 404 |
| ErrBilling | billing_error | 余额不足 |
| ErrServerError | server_error | 5xx |
| ErrConnection | connection_error | 网络错误 |
| ErrSSL | ssl_cert_error | SSL/TLS 证书 |
| ErrToolMismatch | tool_use_mismatch | tool_use 无 tool_result |
| ErrInvalidModel | invalid_model | 模型名无效 |
ErrorClassifier 接口¶
type ErrorClassifier interface {
Classify(statusCode int, headers http.Header, body []byte, cause error) *APIError
}
三个实现: - DefaultClassifier - 纯 HTTP 状态码,不解析供应商 header - AnthropicClassifier - 增强 Anthropic header(ratelimit/retry/reset)+ 诊断提示 - CompositeClassifier - 叠加多个分类器,第一个非 Unknown 胜出
DiagnosticHinter 接口¶
为 SSL/连接错误提供用户可操作的诊断提示: - 19 个 SSL 错误码 → 中文可操作提示 - DNS/超时/拒绝/重置 → 具体检查建议 - CompositeHinter 支持叠加
重试策略系统¶
模块 8.2,可组合的重试决策框架.
架构¶
APIError → RetryPolicy.ShouldRetry() → RetryDecision
├─ ForegroundOnly (后台 529 不重试)
├─ ServerDirective (服务端 x-should-retry)
├─ SubscriptionAwareRetry (Anthropic: 订阅 429 不重试)
├─ FastModeCooldown (Anthropic: 快速模式降级)
├─ ConsecutiveLimit (连续 N 次放弃)
├─ ModelFallback (Anthropic: 过载→模型降级)
└─ ExponentialBackoff (兜底退避)
RetryPolicy 接口¶
type RetryPolicy interface {
ShouldRetry(err *APIError, attempt int, ctx *RetryContext) *RetryDecision
}
返回 nil = "不表态",非 nil = 明确决策.CompositeRetryPolicy 按顺序评估,第一个非 nil 胜出.
Retryer 执行器¶
retryer := &retry.Retryer{
Policy: retry.NewAnthropicRetryPolicy(opts),
OverflowHandler: retry.DefaultOverflowHandler(),
OnRetry: func(err, attempt, delay, reason) { observer.OnEvent(...) },
}
err := retryer.Do(ctx, rctx, func(attempt int, rctx *RetryContext) error {
return client.CreateMessageStream(...)
})
通用策略 vs Anthropic 子集¶
| 策略 | 类型 | 说明 |
|---|---|---|
| ExponentialBackoff | 跨行业 | 500ms*2^n + 25%抖动,上限 32s |
| ForegroundOnly | 跨行业 | 后台过载直接失败,防级联放大 |
| ConsecutiveLimit | 跨行业 | 连续 N 次同类错误放弃 |
| ServerDirective | 跨行业 | 尊重 x-should-retry |
| ContextOverflowHandler | 跨行业 | max_tokens 溢出自动修正 |
| SubscriptionAwareRetry | Anthropic | 订阅用户 429 不重试(窗口级) |
| FastModeCooldown | Anthropic | 快速模式 429/529 降级 |
| ModelFallback | Anthropic | 连续过载 → 模型降级信号 |
API 预连接¶
模块 8.3,TCP+TLS 握手预热 + HTTP Transport 优化.
连接预热¶
transport := api.NewTransport(nil) // 默认配置:MaxIdlePerHost=2, HTTP/2
client := api.NewClient(key, url, api.WithTransport(transport))
preconn := api.NewPreconnector(url, transport) // 共享同一个 Transport
preconn.Warmup(ctx) // fire-and-forget HEAD,100-400ms 握手与初始化并行
DNS 预解析¶
cache := api.NewDNSCache(5 * time.Minute)
cache.Prefetch(ctx, "api.anthropic.com") // fire-and-forget
SSE 流守卫¶
模块 8.4,SSE 流边界情况检测 + 空闲看门狗.
架构¶
HTTP Body → parseSSE(裸解析)→ rawCh → StreamGuard.Watch() → guardedCh → 消费者
├─ 状态追踪(message_start/stop_reason/block_count)
├─ 空闲看门狗(90s 无事件 → 中止流)
├─ 停顿检测(>30s 间隔 → OnStall 回调)
├─ 空响应检测(无 message_start → EventError)
└─ 部分流检测(无 stop_reason → EventError)
检测的边界情况¶
| 情况 | 检测方式 | 行为 |
|---|---|---|
| 空响应(代理返回 HTML) | !HasMessageStart |
追加 EventError |
| 部分流(网络中断) | HasMessageStart && !HasStopReason |
追加 EventError |
| 空闲挂起(静默断开) | 看门狗计时器 90s | OnIdleTimeout + 追加 EventError |
| 停顿(网络抖动) | 事件间隔 > 30s | OnStall 回调(纯诊断) |
| Scanner 错误 | scanner.Err() |
追加 EventError |
| 合法空响应 | 有 stop_reason | 不误报 |
配置¶
client := api.NewClient(key, url, api.WithStreamGuard(&api.StreamGuardConfig{
IdleTimeout: 90 * time.Second,
StallThreshold: 30 * time.Second,
OnIdleTimeout: func() { observer.OnEvent("stream_idle_timeout", nil) },
OnStall: func(gap, count, total) { observer.OnEvent("stream_stall", ...) },
OnStreamEnd: func(stats) { log.Info("stream stats", stats) },
}))
版本兼容基础设施(INF-6)¶
pkg/engine/migrate.go + pkg/memory/migrate.go
设计原则¶
当前没有历史债(无 v1/v2 格式差异),INF-6 的价值是: 1. 防患于未然--为未来 breaking change 建立规范路径 2. 防降级数据损坏--旧引擎读到新格式文件时明确报错,而非静默损坏
两层版本¶
| 字段 | 类型 | 用途 |
|---|---|---|
FormatVersion int |
int 单调递增 | 格式迁移:LoadTranscript 决定是否迁移 |
EngineVersion string |
应用版本号 | 仅 audit,不参与迁移逻辑 |
早期方案 SerializedMessage.version 只存应用版本(如 "1.2.3"),两层混用无法区分"格式变了"和"版本升了".
MaxSupportedVersion 保护¶
LoadTranscript / ScanMemoryDir:
if version > maxSupportedVersion → 明确报错 / 跳过文件
if version == 0 (旧文件缺字段) → 规范化为 1
if version < current → 运行迁移函数(当前空表,no-op)
注册式迁移骨架¶
// 当前为空——无历史债
var transcriptMigrations = map[int]MigrateFunc{}
// 未来 v1→v2 迁移示例(在 migrate.go 中注册):
func init() {
transcriptMigrations[1] = func(t *Transcript) error {
// 就地修改字段,FormatVersion 由 migrateTranscript() 统一递增
return nil
}
}
Memory frontmatter 的 migrate.go 结构完全对称.
文件格式变化¶
旧 Transcript(无 format_version):
{"session_id": "x", "version": ..., ...} ← "version" 已弃用(只有旧文件有)
新 Transcript(INF-6 后):
{"format_version": 1, "engine_version": "1.0.0", "session_id": "x", ...}
记忆文件 frontmatter(INF-6 后):
---
name: xxx
description: xxx
type: user
version: 1 ← 新增,旧文件无此行(读取时规范化为 1)
---
多模型适配系统(PromptBundle,模块 15)¶
整体架构¶
Config.Scenario + Config.ModelFamily
↓
resolveBundleKey() ← 引擎每次 Run 时调用
↓
BundleRegistry.Resolve(key) ← 精确匹配,回退到 DefaultBundleKey
↓
PromptBundle
├── StaticSections() → SectionRegistry 缓存 → ephemeral cache block
└── DynamicSections() → SectionRegistry 缓存 → ephemeral / no-cache block
↓
Builder.BuildSystemPromptBlocks(ctx)
↓
[]SystemPromptBlock → buildAPIRequest → Anthropic API
BundleRegistry 查找策略¶
- 精确匹配:
BundleRegistry.Resolve({ModelFamily, Scenario}) - 回退默认:未找到时使用
DefaultBundleKey({claude, programming}) - 安全保证:永远不返回 nil(但 registry 为空时返回 nil,需检查)
内置 Bundle¶
| Bundle | Key | 静态段落 | 动态段落 |
|---|---|---|---|
DefaultBundle |
{claude, programming} |
9 个英文段落 | 6 个(env/tools/instructions 等) |
ChineseBundle |
{qwen, programming} 等 5 个 |
9 个中文段落 | 继承 DefaultBundle |
BundleOverlay 组合关系¶
DefaultBundle (base)
↑ 继承
BundleOverlay
├── overrides["intro"] = 自定义 Section
├── overrides["env_info"] = 自定义 Section
└── 其他 section → 透传到 base.StaticSections() / base.DynamicSections()
覆盖逻辑(applyOverrides):
- 遍历 base 的 section 列表,按 Name 查找 overrides 表
- 命中 → 替换为 override Section
- 未命中 → 透传 base Section(零分配路径:无覆盖时直接返回 base slice)
- override 中存在但 base 中不存在的 Name → 静默忽略(防止拼写错误引入幽灵 section)
per-request 切换(WithBundleKey RunOption)¶
Run(ctx, prompt, WithBundleKey(key))
↓
runConfig.bundleKey = &key
↓
buildSystemPromptWithContext(ctx, model, &key)
↓ overrideBundleKey != nil
builder.SetBundleKey(*key) ← 跳过 resolveBundleKey()
同一个 Engine 实例可以在不同请求间使用不同 Bundle, 无需为每个场景创建独立 Engine(节省资源,适合 SaaS 多场景部署).
Section 缓存生命周期¶
SectionRegistry(会话级)
├── 首次 Compute → 计算 + 写入缓存
├── 命中缓存 → 直接返回(零 IO)
├── CacheBreak=true → 每轮强制重算,绕过缓存
└── Reset() → 在 /clear 或 /compact 后清空,让下轮重新渲染
注意:BundleOverlay 的 override Section 与 base Section 共享同一个 SectionRegistry,
同名 section 的缓存条目由最后写入者决定(先英文后中文 → 中文覆盖).
多语言 Bundle 并存于同一 Engine 时,建议使用带前缀的 Name(如 "zh.intro").
权限动态注册(P0修复)¶
架构¶
tools.Metadata.PermissionClass = "file"
↓
tools.Registry.Register(tool)
↓
permission.RegisterToolClass(toolName, permClass)
↓
toolClassRegistry map[string]PermissionClass
↓
permission.CheckToolPermission(toolName, ...)
├── lookupToolClass(toolName) → 动态注册表查找(优先)
└── 未注册 → 旧版硬编码 switch(向后兼容)
PermissionClass 常量¶
| 常量 | 值 | 安全行为 |
|---|---|---|
PermClassBash |
"bash" |
AST 解析 + 风险评估 + 用户确认 |
PermClassFile |
"file" |
路径安全检查 + 沙箱验证 |
PermClassWebFetch |
"webfetch" |
URL 安全检查 |
PermClassReadOnly |
"readonly" |
直接通过,无需用户确认 |
PermClassGeneric |
"generic" |
要求用户批准 |
空字符串(未声明)→ 回退到旧版按工具名的硬编码逻辑(向后兼容).
压缩器依赖注入(P0修复)¶
问题根源¶
旧版 compact.go 用两个包级全局变量持有回调:
var contextWindowProvider func(string) int // ❌ 全局变量
var compactModelProvider func() string // ❌ 全局变量
多个 Engine 实例(多租户)共享这两个变量,最后注册的 Engine 会覆盖前者的配置.
修复方案¶
将回调迁移到 Compressor 实例字段,Engine 创建后注入:
Compressor struct {
contextWindowFn ContextWindowFunc // 实例级,互不干扰
compactModelFn CompactModelFunc // 实例级,互不干扰
httpClient CompactHTTPClient // 实例级,独立 HTTP 客户端
}
effectiveXxx() helpers 实现两级回退:
1. 实例字段非 nil → 使用实例配置
2. 实例字段为 nil → 使用全局默认(向后兼容单 Engine 场景)