配置指南 / Configuration Guide¶
Flyto Agent Engine 的多级配置系统,模型角色,权限规则,Hook 和 MCP 服务器配置.
目录¶
- 多级配置系统
- settings.json 完整字段
- 模型角色配置
- Prompt Caching 配置
- Extended Thinking 配置
- Effort / Fast Mode 配置
- 权限规则
- Hook 配置
- MCP 服务器配置
- Observer 可观测性配置
- StrictMode 严格模式配置
- NormalizePipeline 配置
- MemoryType 注册制配置
- RelevanceScorer 配置
- Dream 配置
- 环境变量
- 配置热更新
- 叠加而非替换(Composite 配置)
- FileHistory 文件历史配置
多级配置系统¶
配置按以下顺序加载,后面的覆盖前面的:
1. 内置默认值 (代码中硬编码)
│
▼
2. 用户级配置 ~/.flyto/settings.json
│
▼
3. 项目级配置 <project>/.flyto/settings.json
│
▼
4. 本地级配置 <project>/.flyto/settings.local.json
│
▼
5. 环境变量覆盖 FLYTO_MODEL, FLYTO_API_BASE_URL, ...
│
▼
6. CLI 参数覆盖 --model, --max-turns, ...
最佳实践:
- 用户级(
~/.flyto/settings.json):个人偏好,如默认模型,自定义指令 - 项目级(
<project>/.flyto/settings.json):团队共享配置,提交到版本控制 - 本地级(
<project>/.flyto/settings.local.json):个人覆盖,加入.gitignore
settings.json 完整字段¶
{
"model": "claude-sonnet-4-6",
"api_base_url": "https://api.anthropic.com",
"max_turns": 0,
"max_budget_usd": 0,
"verbose": false,
"custom_instructions": "",
"permissions": {
"default_mode": "default",
"allowed_tools": [],
"denied_tools": []
},
"mcp_servers": {},
"hooks": {}
}
字段说明¶
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
model |
string | "" |
默认模型 ID(必填,所有角色 fallback 到此值).通过 Provider 接口调用 |
api_base_url |
string | "" |
已废弃 - API 端点由 Provider 配置管理,不再通过 Config 设置 |
max_turns |
int | 0 |
最大对话轮次.0 表示无限制 |
max_budget_usd |
float | 0 |
最大花费(美元).0 表示无限制 |
verbose |
bool | false |
详细日志模式 |
custom_instructions |
string | "" |
自定义指令,追加到系统提示词之后 |
permissions |
object | 权限设置(见下方) | |
mcp_servers |
object | {} |
MCP 服务器配置(见下方) |
hooks |
object | {} |
Hook 定义(见下方) |
合并规则¶
- 标量字段(model, api_base_url 等):非零值覆盖
- 列表字段(allowed_tools, denied_tools):整体替换(不是追加)
- Map 字段(mcp_servers, hooks):按 key 合并,同 key 覆盖
模型角色配置¶
ModelRegistry¶
Flyto Agent Engine 通过角色系统管理不同用途的模型选择.业务逻辑引用角色而非模型 ID.
四种角色:
| 角色 | 常量 | 用途 | 默认模型 |
|---|---|---|---|
| Main | RoleMain |
主对话(用户交互,工具调用) | (无默认,fallback 到 Config.Model) |
| Fast | RoleFast |
快速任务(摘要,压缩,分类) | (无默认,fallback 到 Config.Model) |
| Thinking | RoleThinking |
深度推理(复杂分析) | (无默认,fallback 到 Config.Model) |
| Embed | RoleEmbed |
嵌入模型(语义搜索,预留) | (无默认) |
Phase 2 变更:
DefaultRoles已清空,引擎不再预设角色映射.所有角色解析通过ModelForRole统一 fallback 到Config.Model.消费者需自行配置角色映射或直接使用Config.Model.
通过代码配置¶
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/config"
registry := config.NewModelRegistry()
// 切换主模型到 Opus
registry.SetRole(config.RoleMain, "claude-opus-4-6")
// 切换快速模型
registry.SetRole(config.RoleFast, "claude-haiku-4-5")
// 注册第三方模型(ModelConfig = flyto.ModelInfo 类型别名)
registry.Register("my-model", &config.ModelConfig{
ID: "my-model",
ContextWindow: 128000,
MaxOutputTokens: 8192,
InputPricePer1M: 1.0,
OutputPricePer1M: 5.0,
CacheReadPricePer1M: 0.1,
CacheWritePricePer1M: 1.25,
SupportsCaching: false,
SupportsThinking: false,
SupportsImages: false,
})
registry.SetRole(config.RoleMain, "my-model")
// 传给 Engine
agent, _ := engine.New(&engine.Config{
Models: registry,
// ...
})
预置模型定价¶
Phase 2 变更:引擎
DefaultModels已清空,不再硬编码任何模型定价.定价数据由各 Provider 的静态模型表提供,引擎通过flyto.ModelProvider.Models()获取.EstimateCost对未知模型返回 0.
flyto.ModelInfo新增CacheReadPricePer1M/CacheWritePricePer1M字段(原config.ModelConfig现为flyto.ModelInfo的类型别名).
详见 model-roles.md.
Prompt Caching 配置¶
Prompt Caching 通过缓存重复的系统提示和工具描述来节省成本.多轮对话中,静态内容只需首次付费写入缓存,后续读取只需 10% 原价.
启用方式¶
通过 API 请求的 BetaFeatures 配置:
req := &api.MessageRequest{
Beta: &api.BetaFeatures{
PromptCaching: true, // 启用 prompt caching
PromptCachingScope: "global", // 缓存范围:"global" 或 "org"
},
}
缓存标记¶
在系统提示的 content block 上添加 cache_control:
blocks := []api.SystemContentBlock{
{
Type: "text",
Text: "...静态系统提示词...",
CacheControl: &api.CacheControl{Type: "ephemeral"},
},
{
Type: "text",
Text: "...动态环境信息...",
// 不设置 CacheControl → 不缓存
},
}
自动缓存边界¶
引擎自动将系统提示分为静态和动态两部分:
- 静态(标记缓存):角色定义,行为准则,工具使用指南,Git 协议,代码质量准则
- 动态(不缓存):环境信息(cwd, platform, git status),技能列表,用户追加提示
Extended Thinking 配置¶
Extended Thinking 让模型在回答前进行更深入的推理.
req := &api.MessageRequest{
Thinking: &api.ThinkingConfig{
Type: "enabled", // "enabled" 或 "disabled"
BudgetTokens: 10000, // thinking token 预算(0=不限制)
},
Beta: &api.BetaFeatures{
ExtendedThinking: true,
},
}
支持 Extended Thinking 的模型:
| 模型 | SupportsThinking |
|---|---|
| claude-opus-4-6 | Yes |
| claude-sonnet-4-6 | Yes |
| claude-haiku-4-5 | No |
当 SupportsThinking 为 false 时,引擎不会发送 thinking 配置.
Thinking 与 Token 预算的关系: 当设置 BudgetTokens 时,TokenBudgetManager 会从有效上下文窗口中扣除该预算,确保压缩阈值不会过高导致 thinking 和内容争抢空间.
// 不设 thinking: 有效窗口 = 200K - 16K = 184K
// 设 thinking 10K: 有效窗口 = 200K - 16K - 10K = 174K
engine.TokenBudget().EffectiveContextWindowWithThinking("claude-sonnet-4-6", 10000)
Token 预算管理¶
TokenBudgetManager 集中管理上下文窗口的 token 预算计算.通过 Engine.TokenBudget() 访问.
常量¶
| 常量 | 值 | 说明 |
|---|---|---|
| AutoCompactBufferTokens | 13,000 | 自动压缩缓冲 |
| WarningThresholdBuffer | 20,000 | 黄色警告缓冲 |
| ErrorThresholdBuffer | 20,000 | 红色警告缓冲 |
| ManualCompactBuffer | 3,000 | 手动压缩缓冲 |
| MaxOutputTokensForSummary | 20,000 | 摘要输出预留 |
查询方法¶
tb := engine.TokenBudget()
// 有效窗口(扣除摘要预留)
effective := tb.EffectiveContextWindow("claude-sonnet-4-6") // 183616
// 自动压缩阈值
threshold := tb.AutoCompactThreshold("claude-sonnet-4-6") // 170616
// 含 thinking 的阈值
thresholdT := tb.AutoCompactThresholdWithThinking("claude-sonnet-4-6", 10000) // 160616
// 警告状态
state := tb.CalculateWarningState(currentTokens, "claude-sonnet-4-6")
// state.PercentLeft, state.IsAboveWarningThreshold, ...
// 模型切换检测
state := tb.OnModelSwitch("claude-opus-4-6", "small-model", currentTokens)
Effort / Fast Mode 配置¶
控制模型的推理深度和响应速度.
Effort 级别¶
| 级别 | 说明 | 适用场景 |
|---|---|---|
low |
减少推理深度,快速响应 | 简单问答,命令补全 |
medium |
适中推理 | 日常编程任务 |
high |
最深推理,最准确但最慢 | 复杂架构决策 |
Fast Mode¶
比 effort: low 更激进的加速模式:
Beta Headers 完整列表¶
| 功能 | Header 值 | BetaFeatures 字段 |
|---|---|---|
| Prompt Caching | prompt-caching-2024-07-31 |
PromptCaching |
| Caching Scope | prompt-caching-scope-2025-02-19 |
PromptCachingScope |
| Extended Thinking | extended-thinking-2025-01-24 |
ExtendedThinking |
| Fast Mode | fast-mode-2025-04-01 |
FastMode |
| Effort | effort-2025-04-01 |
Effort |
| Context Management | context-management-2025-03-01 |
ContextManagement |
| Structured Output | structured-output-2025-03-01 |
StructuredOutput |
| Task Budgets | task-budgets-2025-04-01 |
TaskBudgets |
多个 beta header 用逗号分隔,通过 anthropic-beta HTTP header 发送.
权限规则¶
权限模式¶
通过 permissions.default_mode 或 CLI --mode 设置:
| 模式 | 说明 |
|---|---|
default |
所有有副作用的操作询问用户 |
accept_edits |
自动接受文件编辑(Edit, Write),其他操作仍询问 |
bypass |
绕过所有权限检查(危险,仅限受信任环境) |
plan |
计划模式 -- 暂停所有工具执行,只生成计划 |
规则语法¶
规则字符串格式:ToolName 或 ToolName(content)
"Bash" 匹配 Bash 工具的所有调用
"Bash(prefix:npm)" 匹配以 "npm" 开头的 Bash 命令
"Bash(prefix:go test)" 匹配以 "go test" 开头的 Bash 命令
"Edit(/src/**)" 匹配 /src/ 目录下的文件编辑
"Edit(/home/user/**)" 匹配用户主目录下的文件编辑
"Read" 匹配 Read 工具的所有调用
"WebFetch(domain:docs.anthropic.com)" 匹配特定域名
"*" 匹配所有工具
内容类型自动推断:
| 内容格式 | 推断类型 | 说明 |
|---|---|---|
prefix:xxx |
前缀匹配 | Bash 命令前缀 |
domain:xxx |
域名匹配 | WebFetch 域名 |
/path/** |
路径 glob | 以 / 或 . 开头,或包含 * |
| 其他 | 前缀匹配 | 回退到前缀匹配 |
配置示例¶
{
"permissions": {
"default_mode": "default",
"allowed_tools": [
"Glob",
"Grep",
"Read",
"Bash(prefix:npm)",
"Bash(prefix:go test)",
"Bash(prefix:go build)",
"Bash(prefix:git status)",
"Bash(prefix:git diff)",
"Bash(prefix:git log)",
"Edit(/home/admin/projects/myapp/src/**)",
"WebFetch(domain:pkg.go.dev)"
],
"denied_tools": [
"Bash(prefix:rm -rf /)",
"Bash(prefix:sudo)",
"Bash(prefix:curl -X POST)",
"WebFetch(domain:internal.corp.com)"
]
}
}
规则优先级¶
规则来源有优先级,高优先级覆盖低优先级:
同一来源内,denied_tools 优先于 allowed_tools.
权限学习¶
当用户反复批准同类操作时,权限系统会通过 PermissionLearnEvent 建议添加永久规则.消费层可以展示建议提示,让用户一键添加到配置文件.
Hook 配置¶
Hook 定义格式¶
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
command |
string | (必需) | 要执行的 shell 命令 |
timeout |
int | 30 |
超时秒数 |
async |
bool | false |
是否异步执行(不阻塞主流程) |
14 种 Hook 类型¶
| Hook 类型 | 触发时机 | 执行方式 | 可阻止 | 环境变量 |
|---|---|---|---|---|
pre_sampling |
API 调用前(每轮) | 同步 | ✅ exit 2 终止本轮 | MODEL, TURN, MESSAGE_COUNT |
post_sampling |
API 响应后,工具执行前 | 异步 | ❌ | MODEL, TURN, INPUT_TOKENS, OUTPUT_TOKENS, STOP_REASON, RESPONSE_PREVIEW |
pre_tool_use |
工具执行前 | 同步 | ✅ exit 2 阻止 | TOOL_NAME, TOOL_INPUT |
post_tool_use |
工具执行成功后 | 同步 | ❌ | TOOL_NAME, TOOL_OUTPUT |
post_tool_use_failure |
工具执行失败后 | 同步 | ❌ | TOOL_NAME, TOOL_ERROR |
session_start |
会话开始 | 异步 | ❌ | SESSION_ID |
session_end |
会话结束 | 异步 | ❌ | SESSION_ID, TURN_COUNT, COST_USD |
permission_request |
请求权限时 | 同步 | ✅ JSON决策 | TOOL_NAME, TOOL_INPUT, MESSAGE |
permission_denied |
权限被拒绝 | 同步 | ❌ | TOOL_NAME, REASON |
stop |
Agent 停止 | 同步 | ✅ exit 1 | REASON |
notification |
发送通知 | 异步 | ❌ | MESSAGE |
config_change |
配置变更 | 同步 | ❌ | CHANGE_TYPE |
task_created |
子任务创建 | 同步 | ❌ | TASK_ID, TASK_TITLE |
task_completed |
子任务完成 | 同步 | ❌ | TASK_ID, TASK_STATUS |
所有类型均额外注入基础变量:
PROJECT_ROOT,PLATFORM,ARCH,CWD.
pre_sampling / post_sampling 详解¶
pre_sampling(模块 9.2)¶
在每轮 API 调用前同步触发,可通过 exit 2 阻止本轮 API 调用.
# 配置示例:限制每日 API 调用次数
{
"hooks": {
"pre_sampling": [
{
"command": "/usr/local/bin/check-quota.sh",
"timeout": 5
}
]
}
}
脚本可通过退出码控制行为:
- exit 0:放行,API 调用正常发起
- exit 2:阻止,本轮 API 调用终止,Agent Run() 返回(推送 WarningEvent{Code: "pre_sampling_blocked"})
- exit 1(或其他非零):同 exit 2 效果(任何非零退出都视为阻止,与 stop hook 一致)
注意:重试(网络失败导致的重试)只触发第一次 attempt 的 pre-sampling,不重复触发.
post_sampling(模块 9.2)¶
在 API 响应收集完毕,assistant blocks 写入消息历史之后异步 fire-and-forget 触发,不影响控制流.
# 配置示例:记录每次 AI 响应到审计日志
{
"hooks": {
"post_sampling": [
{
"command": "echo \"[$(date)] model=$MODEL turn=$TURN in=$INPUT_TOKENS out=$OUTPUT_TOKENS\" >> /var/log/ai-audit.log",
"async": true,
"timeout": 3
}
]
}
}
RESPONSE_PREVIEW:助手回复文本的前 500 字节(截断,防止超大响应撑爆环境变量).完整响应需使用 SDK 模式的 CallbackHandler.
Hook 执行策略¶
- 同步 Hook:按注册顺序执行,每个都会执行(即使前一个失败,fail-open)
- 异步 Hook:在引擎
rootCtx下后台执行,引擎Close()时自动终止(无孤儿 goroutine) - JSON 输出:如果 Hook 的 stdout 是有效 JSON,会被解析并可用于控制流.例如
permission_requestHook 返回{"decision": "allow"}可自动批准
配置示例¶
{
"hooks": {
"pre_tool_use": [
{
"command": "echo \"[$(date)] Tool: $TOOL_NAME\" >> /tmp/agent_tools.log",
"timeout": 5
}
],
"post_tool_use": [
{
"command": "if [ \"$TOOL_NAME\" = \"Bash\" ]; then echo $TOOL_OUTPUT | head -1; fi",
"timeout": 10
}
],
"session_end": [
{
"command": "notify-send 'Agent Session' \"Done: $TURN_COUNT turns, \\$$COST_USD\"",
"async": true,
"timeout": 5
}
],
"permission_request": [
{
"command": "if echo $TOOL_INPUT | grep -q 'npm install'; then echo '{\"decision\":\"allow\"}'; fi",
"timeout": 5
}
]
}
}
MCP 服务器配置¶
MCP (Model Context Protocol) 允许 Agent 连接外部工具服务器.
配置格式¶
{
"mcp_servers": {
"<server_name>": {
"name": "server_name",
"transport": "stdio",
"command": "command_to_start_server",
"args": ["arg1", "arg2"],
"url": "http://...",
"env": {
"KEY": "value"
}
}
}
}
| 字段 | 类型 | 说明 |
|---|---|---|
name |
string | 服务器名称(唯一标识) |
transport |
string | 传输协议:stdio / sse / http / ws |
command |
string | 启动命令(stdio 模式必需) |
args |
[]string | 命令参数 |
url |
string | 服务器 URL(sse / http / ws 模式必需) |
env |
map | 环境变量(传递给子进程或 HTTP headers) |
传输协议¶
| 协议 | 说明 |
|---|---|
stdio |
子进程模式.引擎启动 MCP 服务器进程,通过 stdin/stdout 交换 JSON-RPC |
sse |
Server-Sent Events 模式.通过 HTTP SSE 连接接收事件 |
http |
HTTP 模式.通过 HTTP POST 发送请求 |
ws |
WebSocket 模式.通过 WebSocket 双向通信 |
配置示例¶
{
"mcp_servers": {
"github": {
"name": "github",
"transport": "stdio",
"command": "mcp-server-github",
"args": [],
"env": {
"GITHUB_TOKEN": "ghp_xxxxxxxxxx"
}
},
"postgres": {
"name": "postgres",
"transport": "stdio",
"command": "mcp-server-postgres",
"args": ["--connection-string", "postgresql://localhost/mydb"]
},
"remote-tools": {
"name": "remote-tools",
"transport": "sse",
"url": "https://mcp.example.com/sse"
}
}
}
MCP 服务器提供的工具会自动注册到 Engine 的工具注册表中,和内置工具一样使用.如果注册后工具总数超过延迟加载阈值(15),MCP 工具可能被延迟加载,需通过 ToolSearch 发现.
Dream 配置¶
AutoDream(记忆巩固)系统已完整实现(模块 16).通过 DreamConfig 配置:
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
MinHours |
float64 | 24 |
两次 Dream 之间的最小间隔时间(小时) |
MinSess |
int | 5 |
触发 Dream 需要的最小会话数 |
SessionProvider |
SessionProvider |
nil | 提供会话列表(CLI 用 FileSessionProvider) |
TranscriptDir |
string | "" |
Transcript 目录路径(空 = 不提示 grep 示例) |
PeriodicInterval |
time.Duration | 0 |
定时触发间隔(SDK/API 长驻用;CLI 不需要设) |
触发条件:时间间隔 >= MinHours 且 会话数 >= MinSess 且 获得文件锁(flock).
扫描节流:10 分钟内不重复检查(内存变量,进程重启后重置).
mtime-as-state:lock 文件的 mtime = lastConsolidatedAt,进程 crash 后重启也能正确判断时间门槛.
Dream 四阶段(SubAgent 使用 Edit/Write/Bash 工具直接操作): 1. Orient - ls 记忆目录 + 读 MEMORY.md,了解现状 2. Gather - grep transcript 文件发现新信号(TranscriptDir 非空时) 3. Consolidate - 合并新信号到现有话题,转换相对日期,修正矛盾 4. Prune - 更新 MEMORY.md 索引,保持索引 <200行/<25KB
Plan Mode 配置(模块 17)¶
EnterPlanMode / ExitPlanMode 工具通过 PlanModeManager 注入 Engine.
注册方式¶
store := &engine.FilePlanStore{Dir: "~/.flyto/plans"} // CLI 场景
// 或
store := engine.NewMemoryPlanStore("") // SDK/API 场景
approval := engine.FuncApprovalPolicy{
Fn: func(ctx context.Context, event engine.PlanApprovalEvent) (bool, string, error) {
// 展示计划给用户,等待确认
fmt.Println(event.Plan)
// ... 等待用户输入 ...
return true, "", nil
},
}
manager := engine.NewPlanModeManager(store, approval, permEngine)
manager.SetSessionID(sessionID)
cfg := &engine.Config{
// 通过 Tools 字段注册 plan 模式工具(如需):
// 或在 registerBuiltinTools 中调用 NewEnterPlanModeTool/NewExitPlanModeTool
}
PlanStore 选项¶
| 实现 | 适用场景 | 说明 |
|---|---|---|
FilePlanStore |
CLI / 本地 SDK | 文件存储,word slug 命名,路径遍历防护 |
MemoryPlanStore |
SDK/API 嵌入 / 测试 | 内存存储,无文件系统依赖 |
ApprovalPolicy 选项¶
| 实现 | 适用场景 | 说明 |
|---|---|---|
NoopApprovalPolicy |
测试 / bypass | 自动批准所有计划 |
FuncApprovalPolicy |
SDK 嵌入 | 函数回调,最灵活 |
| 自定义实现 | SaaS / 多人审批 | 实现 ApprovalPolicy 接口 |
环境变量¶
| 变量 | 说明 | 对应配置字段 |
|---|---|---|
ANTHROPIC_API_KEY |
Anthropic API 密钥(仅 Anthropic Provider 使用,由 Provider 自行读取) | Provider.Config.APIKey |
FLYTO_MODEL |
覆盖默认模型 | model |
FLYTO_CUSTOM_INSTRUCTIONS |
自定义指令 | custom_instructions |
Phase 2 变更:
Config.APIKey,Config.BaseURL,Config.BearerAuth已删除.API 密钥和端点由各 Provider 在自己的 Config 中管理.FLYTO_API_BASE_URL不再生效.
环境变量在第 5 步覆盖配置文件的值,但会被 CLI 参数(第 6 步)覆盖.
配置热更新¶
Engine 支持运行时配置热更新,通过 config.Watcher 实现:
watcher := config.NewWatcher(cwd, 2*time.Second, func(settings *config.Settings) {
// 配置发生变化,更新引擎
fmt.Printf("Configuration updated: %+v\n", settings)
})
watcher.Start()
defer watcher.Stop()
监听的文件:
- ~/.flyto/settings.json
- <project>/.flyto/settings.json
- <project>/.flyto/settings.local.json
轮询间隔默认 2 秒,通过比较文件修改时间检测变化.配置变化时重新加载并合并所有级别的配置.
保存配置¶
// 保存到用户级配置
config.SaveSettings(config.ScopeUser, cwd, settings)
// 保存到项目级配置
config.SaveSettings(config.ScopeProject, cwd, settings)
// 保存到本地级配置
config.SaveSettings(config.ScopeLocal, cwd, settings)
不能保存到 default scope(内置默认值).
Observer 可观测性配置¶
Config.Observer 字段接受 EventObserver 接口实现.未设置时使用 NoopObserver(零开销空实现).
StderrObserver(开发调试)¶
agent, _ := engine.New(&engine.Config{
Observer: &engine.StderrObserver{
MinLevel: "debug", // "debug" / "info" / "warn" / "error",默认 "info"
// Output: os.Stderr, // 输出目标,默认 os.Stderr
},
})
输出格式:
2026-04-01T12:00:00Z [OBSERVE] event=tool_result_pairing_repaired repairs=[synthetic_tool_result:tu_001] repair_count=1
2026-04-01T12:00:00Z [OBSERVE:ERROR] err="API timeout" endpoint=messages
2026-04-01T12:00:00Z [OBSERVE:METRIC] api_latency_ms=1234.0000 model=claude-sonnet-4-6
CompositeObserver(多路复合)¶
同时发到多个后端(如 DataDog + 审计日志 + stderr):
observer := engine.NewCompositeObserver(
&engine.StderrObserver{MinLevel: "info"}, // 开发调试
&DataDogObserver{apiKey: "..."}, // 实时监控
&AuditLogObserver{path: "/var/log/audit"}, // 合规审计
)
agent, _ := engine.New(&engine.Config{Observer: observer})
CompositeObserver 的 Metric/Span 方法只转发到实现了 MetricObserver/TraceObserver 接口的子 observer,其他子 observer 静默跳过.
BufferedObserver(异步批量)¶
热路径上 Observer 调用不阻塞(适用于网络发送型后端):
// inner: 实际的发送方(如 DataDogObserver)
// batchSize: 批量大小(0 默认 100)
// interval: 刷新间隔(0 默认 1s)
// bufferSize: channel 缓冲区大小(0 默认 1000)
buffered := engine.NewBufferedObserver(inner, 100, time.Second, 1000)
defer buffered.Close() // 关闭时等待缓冲区刷新完毕
agent, _ := engine.New(&engine.Config{Observer: buffered})
背压策略:
- Event -- 非阻塞发送,缓冲区满丢弃(热路径不阻塞)
- Error -- 优先发送,缓冲区满时直接同步发到 inner(错误不能丢)
自定义 Observer 实现¶
实现 EventObserver 接口即可:
type MyObserver struct{}
func (o *MyObserver) Event(name string, data map[string]interface{}) {
// 发到你的监控系统
}
func (o *MyObserver) Error(err error, context map[string]interface{}) {
// 发到你的告警系统
}
// 可选:实现 MetricObserver 接收数值指标
func (o *MyObserver) Metric(name string, value float64, tags map[string]string) {
// 发到你的指标系统
}
StrictMode 严格模式配置¶
Config.StrictMode 字段控制引擎在异常情况下的行为.
安全评估环境¶
agent, _ := engine.New(&engine.Config{
StrictMode: &engine.StrictMode{
ToolResultPairing: true, // 配对异常 → panic
CompactFailure: true, // 压缩失败 → panic
NormalizerError: true, // 规范化异常 → panic
},
})
生产环境¶
字段说明¶
| 字段 | 控制范围 | true | false |
|---|---|---|---|
ToolResultPairing |
tool_use/tool_result 配对异常 | panic | 注入合成 tool_result + Observer 记录 |
CompactFailure |
上下文压缩失败 | panic | 降级到 MicroCompact + Observer 记录 |
NormalizerError |
消息规范化异常 | panic | 跳过异常步骤 + Observer 记录 |
StrictMode 与 Observer 协同:非严格模式下,每次"本应失败"的条件都通过 Observer.Event("strict_mode_would_fail", ...) 记录,便于事后分析.
NormalizePipeline 配置¶
消息规范化管道可以自定义步骤.
自定义步骤¶
实现 MessageNormalizer 接口并添加到管道:
type MessageNormalizer interface {
Name() string
Priority() int // 越小越先执行
Normalize(messages []query.Message) []query.Message
}
// 获取默认管道
pipeline := engine.DefaultNormalizePipeline()
// 追加自定义步骤
pipeline.Add(&SensorDataNormalizer{Priority: 35})
// 移除不需要的步骤
pipeline.Remove("ImageValidator")
内置 10 步(Priority 排序)¶
| Priority | 步骤 | 职责 |
|---|---|---|
| 5 | AttachmentReorderer | 附件消息上浮(暂未启用) |
| 8 | ToolResultPairingNormalizer | 完整的 tool_use/tool_result 配对修复(4 种 case),通过 Observer 记录 |
| 10 | OrphanToolResultRemover | 移除孤立 tool_result |
| 15 | ErrorContentStripper | 剥离错误内容噪音 |
| 18 | OrphanThinkingFilter | 过滤纯 thinking assistant 消息 |
| 20 | EmptyMessageFilter | 过滤空消息 |
| 22 | WhitespaceAssistantFilter | 过滤纯空白 assistant 消息 |
| 25 | ToolUseInputNormalizer | 规范化 tool_use 输入 |
| 30 | ConsecutiveRoleMerger | 合并连续同角色消息 |
| 50 | ImageValidator | 验证图片大小(默认 20MB) |
MemoryType 注册制配置¶
记忆类型从硬编码 4 种升级为分层注册制.
注册自定义类型¶
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/memory"
// 继承默认注册表(内置 user/feedback/project/reference)
registry := memory.NewTypeRegistry(memory.WithParent(memory.DefaultTypeRegistry))
// 注册仓储场景类型
registry.Register(&memory.MemoryTypeInfo{
Name: "inventory_rule",
Scope: "team",
Description: "仓储管理规则和库存策略",
WhenToSave: "当了解到库存管理规则、补货策略、存储位置约定时",
HowToUse: "处理仓储相关任务时参考这些规则",
BodyStructure: "规则 → Why → How to apply",
SortOrder: 10,
})
分层继承¶
// 管理层(org 级)
orgReg := memory.NewTypeRegistry()
orgReg.Register(&memory.MemoryTypeInfo{Name: "sla_rule", Scope: "org", ...})
// 运营团队继承 org
teamReg := memory.NewTypeRegistry(memory.WithParent(orgReg))
teamReg.Register(&memory.MemoryTypeInfo{Name: "exception_pattern", Scope: "team", ...})
// 加盟仓继承全部
localReg := memory.NewTypeRegistry(memory.WithParent(teamReg))
localReg.Register(&memory.MemoryTypeInfo{Name: "warehouse_layout", Scope: "local", ...})
查询时 local 优先,沿 parent 链向上冒泡.注册只写入本级 local map,不污染上级.
多格式提示词¶
// 根据模型自动选择格式
format := memory.AutoPromptFormat("claude-sonnet-4-6") // → FormatXML
prompt := registry.FormatForPrompt(format)
// 手动指定格式
prompt := registry.FormatForPrompt(memory.FormatMarkdown)
RelevanceScorer 配置¶
记忆相关性评分器从包级函数升级为可插拔接口.
使用 CompositeScorer¶
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/memory"
// 编程 + 仓储评分器加权组合
scorer := memory.NewCompositeScorer(
memory.WeightedScorer{Scorer: &memory.TextScorer{}, Weight: 0.7},
memory.WeightedScorer{Scorer: &WarehouseScorer{}, Weight: 0.3},
)
// 传入 SelectRelevant
results := memory.SelectRelevant(query, headers, 5, scorer)
使用 ExternalScorer(跨语言桥接)¶
// 启动外部 Python 评分进程 (M1 严格 DI: Executor 必填)
scorer, err := memory.NewExternalScorer(ctx, memory.ExternalScorerOptions{
Name: "warehouse",
Command: "python3",
Args: []string{"warehouse_scorer.py"},
Executor: execenv.DefaultExecutor{}, // 本地 CLI, 云端走 sandbox.Backend
})
if err != nil { ... }
defer scorer.Close()
// 加入 CompositeScorer
composite.Add(memory.WeightedScorer{Scorer: scorer, Weight: 0.3})
外部进程通信协议(JSON Lines):
- 请求:{"query": "...", "name": "...", "description": "...", "type": "..."}
- 响应:{"score": 0.85}
叠加而非替换(Composite 配置)¶
宪法第 8 条:所有可插拔接口支持多实现共存叠加.
压缩策略叠加¶
Config.CompactionPolicies 字段接受多个压缩策略,自动叠加为 CompositePolicy:
cfg := &engine.Config{
CompactionPolicies: []context.CompactionPolicy{
&context.DefaultCodePolicy{}, // 编程场景
&myWarehousePolicy{}, // 仓储场景
},
// ... 其他配置
}
如果不设置 CompactionPolicies,默认使用 DefaultCodePolicy(向后兼容).
CompactionPolicies 叠加规则:
| 方法 | 叠加策略 | 原因 |
|---|---|---|
| PreserveKeywords | 合并去重 | 不丢任何场景的关键词 |
| ScoreMessageImportance | 取最高分 | 宁可多保留不可多丢弃 |
| MaxRecentRoundsToKeep | 取最大值 | 保留更多上下文 |
| PreprocessForCompaction | 依次应用 | 前一个的裁剪影响后一个 |
支持运行时动态增删策略:
权限处理器叠加¶
Config.PermissionHandlers 字段接受多个权限处理器,自动叠加为 CompositeHandler:
cfg := &engine.Config{
PermissionHandlers: []permission.NamedHandler{
{
Name: "cli",
Handler: cliPermHandler,
IsDecisionMaker: true, // 参与决策
},
{
Name: "audit",
Handler: auditLogHandler,
IsDecisionMaker: false, // 仅观察/审计
},
},
// ... 其他配置
}
如果不设置 PermissionHandlers,回退到旧的 PermissionHandler 字段(向后兼容).
混合场景示例(编程 + 仓储)¶
// 仓储策略保留订单号和 SKU
type WarehousePolicy struct{}
func (p *WarehousePolicy) Name() string { return "warehouse" }
func (p *WarehousePolicy) PreserveKeywords() string {
return "order numbers, SKU codes, tracking IDs, delivery dates"
}
func (p *WarehousePolicy) ScoreMessageImportance(role, content string) float64 {
if containsOrderNumber(content) { return 0.9 }
return 0.3
}
func (p *WarehousePolicy) MaxRecentRoundsToKeep() int { return 8 }
func (p *WarehousePolicy) PreprocessForCompaction(msgs []context.CompactMessage) []context.CompactMessage {
return msgs // 仓储场景无需特殊预处理
}
// 叠加结果:
// 关键词: "file paths, function names, ..., order numbers, SKU codes, ..."
// 评分: max(编程评分, 仓储评分) -- 含订单号的消息不会因编程评分低而被压缩
// 轮数: max(5, 8) = 8 -- 仓储需要更多上下文
FileHistory 文件历史配置¶
文件历史系统在 Engine 初始化时自动启用,无需额外配置.
备份存储路径:~/.flyto/history/<project-hash>/<content-hash>
| 配置项 | 默认值 | 说明 |
|---|---|---|
| 最大快照数 | 100 | 超过时淘汰最旧的快照(FIFO) |
| 内容寻址 | 启用 | 相同内容不重复存储 |
| 备份路径 | ~/.flyto/history/ |
基于 cwd 的 SHA256 前 12 字符 |
回滚 API¶
// 按消息 ID 回滚所有文件修改
err := engine.Rollback(ctx, "turn-3")
// 查询是否可回滚 (推荐: 使用 FileHistoryView 窄接口)
can, files := engine.FileHistoryView().CanRollback("turn-3")
// 查询快照总数 (监控 / 诊断场景)
count := engine.FileHistoryView().SnapshotCount()
FileHistoryView vs FileHistoryRef 的选择:
Engine.FileHistoryView() FileHistoryView- 推荐,返回只读查询接口 (CanRollback/SnapshotCount),便于消费者 mock 测试且 API 稳定性强Engine.FileHistoryRef() *FileHistory- 保留用于向后兼容,暴露完整的*FileHistory具体类型.新代码应用FileHistoryView()
FileHistoryView 接口定义在 pkg/engine/file_history.go (L1186 修复, 2026-04-13).*FileHistory 通过 Go 结构化 typing 自动实现该接口,消费者无需任何 adapter 类型.
ToolCapability 安全协议¶
工具能力声明通过可选接口实现,无需全局配置.自定义工具可通过实现以下接口声明安全能力:
// 声明能力
func (t *MyTool) Capability() tools.ToolCapability {
return tools.ToolCapability{
DryRun: true, Reversible: true,
UndoMethod: "tool", UndoToolName: "MyUndoTool",
AffectedResources: []string{"database"},
}
}
// 模拟执行
func (t *MyTool) DryRun(ctx context.Context, input json.RawMessage) (*tools.DryRunResult, error) { ... }
// 生成撤销信息
func (t *MyTool) GenerateUndo(ctx context.Context, input json.RawMessage, result *tools.Result) (*tools.UndoInfo, error) { ... }
Query Chain Tracking 查询链追踪¶
查询链追踪自动为 Engine 的每次 Run 生成唯一链 ID,子 agent 通过 SubAgentConfig.Chain 继承父链.所有 Observer 事件自动注入 query_chain_id,query_depth,parent_agent_id 字段,无需额外配置.
// 自动工作——Engine.Run 内部创建 chain,SubAgent 自动 fork
events := engine.Run(ctx, "用户请求")
// 手动使用(高级场景)
chain := engine.NewQueryChain()
child := chain.Fork("parent_agent_id")
fields := child.EventFields() // {"query_chain_id": "chain_xxx", "query_depth": 1, "parent_agent_id": "parent_agent_id"}
API 错误分类配置¶
api.NewClient 默认使用 AnthropicClassifier(含 SSL 诊断).自定义供应商需替换分类器:
// 使用自定义分类器(如 OpenAI 适配器)
client := api.NewClient(key, url,
api.WithClassifier(&MyOpenAIClassifier{}),
)
// 叠加多个分类器(第一个给出明确分类的胜出)
comp := api.NewCompositeClassifier(
&MyCustomClassifier{}, // 优先
&api.AnthropicClassifier{ // 回退
Hinter: &api.DefaultHinter{},
},
)
client := api.NewClient(key, url, api.WithClassifier(comp))
消费者通过 errors.As 提取结构化错误:
var apiErr *api.APIError
if errors.As(err, &apiErr) {
tag := apiErr.AnalyticsTag() // "rate_limit", "ssl_cert_error", ...
if apiErr.IsRetryable() {
time.Sleep(apiErr.RetryDelay())
}
}
重试策略配置¶
使用 Anthropic 工厂创建优化策略:
policy := retry.NewAnthropicRetryPolicy(retry.AnthropicRetryOpts{
MaxRetries: 10,
IsSubscriber: func() bool { return isProUser },
IsEnterprise: func() bool { return isEnterprise },
IsFastMode: func() bool { return fastModeOn },
FallbackThreshold: 3, // 连续 3 次 529 触发模型降级
})
retryer := &retry.Retryer{
Policy: policy,
OverflowHandler: retry.DefaultOverflowHandler(),
}
自定义场景(如仓储)可组合通用策略:
policy := retry.NewCompositeRetryPolicy(
&retry.ForegroundOnly{},
&retry.ConsecutiveLimit{Category: api.ErrConnection, Limit: 5},
&retry.ExponentialBackoff{BaseDelay: 2*time.Second, MaxDelay: 60*time.Second, MaxRetries: 20},
)
Scratchpad 配置¶
模式选择¶
Config.ScratchpadDir 控制 Scratchpad 后端:
| 值 | 后端 | 适用场景 |
|---|---|---|
"" (默认) |
in-memory *Scratchpad |
单进程 Agent,生命周期绑定 Engine |
| 非空目录路径 | *FileScratchpad |
多 Worker 跨进程共享暂存区 |
in-memory 模式(默认)¶
数据存在内存中,Engine 关闭后自动释放.
文件持久化模式¶
- 数据持久化到目录,进程重启后仍然可读
- 多个 Worker 指向同一目录即可共享暂存区
- 每个条目存为
<SHA-256(key)>.json,原子写入(temp file + os.Rename) - TTL 支持:条目过期后惰性删除(Get/Keys 时触发)
工具接口¶
Agent 通过三个内置工具访问 Scratchpad:
| 工具 | 功能 |
|---|---|
scratchpad_write |
写入键值对,可选 TTL(秒) |
scratchpad_read |
读取指定 key |
scratchpad_list |
列出所有未过期 key |
ScratchpadStore 接口¶
自定义后端只需实现: