HTTP API 参考 / HTTP API Reference¶
Flyto Agent Engine HTTP Server 的完整 API 文档.
目录¶
概览¶
- 基础 URL:
http://{host}:{port} - 默认地址:
http://localhost:8080 - 所有请求和响应使用 JSON 格式(
Content-Type: application/json) - 流式端点使用 SSE(
Content-Type: text/event-stream) - 每个响应包含
X-Request-ID头 - 请求体最大 10MB
- Prompt 最大 100,000 字符
API 消费形态 / API Consumption Patterns¶
Flyto exposes three API shapes to consumers; different interfaces are grouped by mechanism, not by domain. Identify the shape you need, then read the corresponding interface sections below.
Flyto 对消费者暴露三种 API 形态, 不同接口按机制归类 (不按业务领域). 接入 前先判断你要的是哪种形态, 再找下面对应的接口章节.
形态一: 订阅 (push) — 引擎单向通知消费者¶
机制: 消费者预注册接口或打开流, 引擎按事件异步发送, 消费者不需要回应.
典型接口:
- HTTP SSE 事件流:
POST /v1/agent/run,POST /v1/sessions/{id}/messages返回text/event-stream, 事件类型见下 SSE 事件格式 章节. - Go SDK
flyto.EventObserver接口 (章节 Observer): 消费者实现Event(name, data) / Error / Metric三方法, 经engine.Config.Observer注入, 引擎内部 (compact / checkpoint / subagent / token_warning / permission 决策 / session_cost_threshold_crossed 等) 触发点 自动回调. - Go SDK
<-chan flyto.Event:Engine.Run/Session.Send返回的强类型 事件 channel, 消费者for evt := range ch+ type-switch 处理*TextEvent / *ToolUseEvent / *CheckpointSuggestedEvent / *InboxMessageEvent / *TeammateMessageReceivedEvent / *TurnStartEvent / *TurnEndEvent / *SubAgentStartEvent / *SubAgentEvent / *SubAgentEndEvent等. 子 agent 事件由父引擎工具派发时 ctx-embedded EventEmitter 透传到父 Run channel (详见下面 SSE 事件格式章节的 subagent_* 三个事件).
Breaking (2026-04-20): 直接消费 SubAgent.Run() 返回的 channel
(power-user 路径, 非 Engine.Run) 的 SDK 用户要注意: 该 channel 现
在发送裸类型事件 (*TextEvent 等), 不再包裹 *SubAgentEvent.
type switch case *SubAgentEvent: 现在在 sa.Run() channel 上拿
不到任何事件. SubAgentID 归属现在走父引擎 Engine.Run() 的
*SubAgentEvent 包装 (同步/worktree 模式) 或 observer 路径
(后台模式), 不在 sa.Run() 自己 channel 上重复暴露. 迁移: 消费
sa.Run() 的代码改为 type-switch 裸事件类型即可, 或迁到
Engine.Run() 主 channel 获得带 SubAgentID 的 wrapper shape.
何时用: 要实时知道引擎发生了什么 (UI 渲染 / 监控告警 / 审计流水). push 能告诉你"事件什么时候发生", pull 做不到.
形态二: 调取 (pull) — 消费者主动查询当前状态¶
机制: 消费者调用方法, 引擎同步返回当前数据快照.
典型接口:
- HTTP REST:
GET /v1/health,GET /v1/tools,GET /v1/sessions/{id}等. - Go SDK 快照方法:
Session.Stats() SessionStats— 会话 token/cost/turn/message 累计快照DenialTracker.Stats() DenialStats— 权限拒绝追踪器快照Classifier.Classify(ctx, req) (*ClassifyResult, error)— 权限 AI 分类器 返回含 Decision / Reason / Thinking / Stage / Usage / DurationMs 的结果FileStateCache.Stats() CacheStats— 文件缓存命中率统计pkg/git.Info(cwd) git.Info— git 仓库状态 (Branch / Dirty / Root 等)
何时用: 按需拿当前值 (渲染状态面板 / CLI 退出前打印摘要 / 做业务决策前 查询). pull 给你"现在是什么", push 给不了"现在的完整快照".
形态三: 同步回调 (callback) — 引擎阻塞等消费者决策¶
机制: 引擎把决策请求 (通常带完整 context struct) 交给消费者实现的 handler / policy 接口, 同步阻塞等 handler 返回结果, 再继续运行. 区别于 push 的是 引擎等你的答案; 区别于 pull 的是主动方是引擎而非消费者.
典型接口:
permission.Handler(经engine.Config.PermissionHandler注入): 工具 调用前决定 Allow / Deny / Ask, 消费者返回*Response含 Decision + 可选 UpdatedInput 改写.engine.ApprovalPolicy(经engine.Config.ApprovalPolicy注入):ExitPlanMode工具触发计划审批,PlanApprovalEvent含Approve(editedPlan)/Reject(reason)两个回调函数字段, policy 实现方通过调用字段回复决策.engine.ElicitationHandler(经engine.Config.ElicitationHandler注入): MCP 服务器运行时问用户动态输入,ElicitationRequest含字段定义, handler 返回 accept / decline / cancel 三选一.security.AuditSink.Write(entry): 每次需要审计的操作前引擎调, sink 实现方持久化AuditEntry到日志 / DB / 远端.pkg/hooksHook 执行器: PreToolUse / PostToolUse / PreCompact 等 14 种 钩子, 消费者在HookDef里注册 shell command 或 Go 回调.
何时用: 引擎要在继续前拿到决策 (安全把关 / 审批 / 动态输入 / 审计写穿). callback 是决策通道, 不是旁观接口.
选型速查 / Quick Chooser¶
| 需求 | 形态 | 推荐入口 |
|---|---|---|
| 实时看引擎在干啥 | 订阅 (push) | SSE / <-chan flyto.Event / EventObserver |
| 查询当前状态 | 调取 (pull) | REST GET / *.Stats() / Classify() / git.Info() |
| 拦截决策点 | 同步回调 (callback) | Handler / Policy / Sink / Hook 接口 |
为什么三者不互相替代¶
- push 做不到 "当前快照" — 它是时间序列; 如果你中途接入, 错过的事件丢了. 要快照仍得走 pull.
- pull 做不到 "首次触发" — 消费者不知道状态什么时候改变, 只能轮询 (浪费 资源 + 有延迟). 要"成本第一次跨 $1"这类语义只能靠 push.
- callback 做不到 "旁观" — 它是决策通道, 必须返回答案, 不适合只读监听 场景; 要看日志用 observer/SSE.
这三种形态在 Flyto 代码库和 SDK 里并存而不相互取代. 举个具体例子: 会话
统计信息 (SessionStats) 既有 Session.Stats() pull API (拿当前快照) 也有
session_cost_threshold_crossed observer 事件 (首次跨过 $1/$5/$10/$50/$100
时 push), 两者语义正交不重复 — 要状态调 Stats, 要告警订阅事件.
消费者实现清单对照¶
每个外部 API 表面对应的典型消费端实现位置 (仅示意, 具体以你的接入层为准):
| 接口 | CLI 消费端 | SaaS 消费端 | 测试 |
|---|---|---|---|
EventObserver |
stderr 打印 / TUI 渲染 | OpenTelemetry / 日志服务 | MockObserver |
<-chan flyto.Event |
终端着色输出 | WebSocket 推送前端 | drain 验事件序列 |
permission.Handler |
终端 y/n prompt | WebSocket 等前端点击 | 返回 Allow |
ApprovalPolicy |
终端审批 UI | 企业审批工作流 | NoopApprovalPolicy |
ElicitationHandler |
终端表单 | Web 表单 | NoopElicitationHandler |
AuditSink |
LocalAuditSink (JSONL) |
DB / 远端日志 | NoopAuditSink |
认证¶
通过 --bearer-token 启动参数启用 Bearer token 认证.启用后,所有请求(除 /v1/health 和 OPTIONS)必须携带:
# 启动时设置 token
flyto-server --bearer-token my-secret-token
# 请求时携带
curl -H "Authorization: Bearer my-secret-token" \
http://localhost:8080/v1/tools
认证错误响应:
// 401 - 缺少 Authorization 头
{"error": {"type": "authentication_error", "message": "missing Authorization header"}}
// 403 - token 无效
{"error": {"type": "permission_error", "message": "invalid bearer token"}}
错误格式¶
所有错误使用统一的 JSON 格式:
错误类型(error.type):
| type | HTTP 状态码 | 说明 |
|---|---|---|
invalid_request_error |
400 | 请求格式或参数错误 |
authentication_error |
401 | 缺少或无效的认证信息 |
permission_error |
403 | 权限不足 |
not_found_error |
404 | 资源不存在 |
conflict_error |
409 | 资源冲突(如会话已存在) |
rate_limit_error |
429 | 请求频率超限 |
internal_error |
500 | 服务器内部错误 |
结构化错误分类(Go SDK)¶
internal/transport 包(原 internal/api)提供结构化的 *APIError(实现 error 接口),支持 errors.As 提取:
var apiErr *api.APIError
if errors.As(err, &apiErr) {
switch apiErr.Category {
case api.ErrRateLimit:
delay := apiErr.RetryDelay()
// 等待后重试
case api.ErrPromptTooLong:
gap := apiErr.TokenGap // 溢出 token 数,用于压缩跳步
case api.ErrSSL:
hint := apiErr.Hint // "检查企业代理的 SSL 证书"
}
}
自定义错误分类器:
SSE 事件格式¶
流式端点(/v1/agent/run 和 /v1/sessions/{id}/messages)返回 SSE 格式:
每 15 秒发送心跳保持连接:
事件类型¶
text_delta -- 流式文本增量¶
text -- 完整文本块¶
thinking_delta -- 流式思考增量¶
thinking -- 完整思考块¶
tool_use -- 模型请求调用工具¶
tool_result -- 工具执行结果¶
event: tool_result
data: {"id": "toolu_01ABC", "tool_name": "Bash", "output": "total 28\ndrwxr-xr-x ...", "is_error": false}
tool_progress -- 工具执行进度¶
event: tool_progress
data: {"id": "toolu_01ABC", "tool_name": "Grep", "progress": 0.5, "detail": "scanning files..."}
permission_request -- 需要用户批准¶
event: permission_request
data: {"id": "perm_01XYZ", "tool_name": "Bash", "input": {"command": "npm install"}, "message": "Bash 请求执行: npm install"}
收到此事件后,客户端应通过 POST /v1/sessions/{id}/permissions/{perm_id} 回复.
turn_start -- 轮次开始¶
turn_end -- 轮次结束¶
event: turn_end
data: {"turn_number": 1, "input_tokens": 1234, "output_tokens": 567, "cost_usd": 0.0089}
done -- 运行正常结束¶
event: done
data: {"total_input_tokens": 5678, "total_output_tokens": 1234, "total_tokens": 6912, "cost_usd": 0.0234, "turn_count": 3}
stop_reason 可能为 "end_turn"(正常结束)或 "stop_sequence"(命中停止序列).
error -- 运行出错¶
subagent_start -- 子 agent 启动¶
event: subagent_start
data: {"subagent_id": "sa-1", "description": "explore-files", "cwd": "/tmp/worktree/abc123", "model": "claude-sonnet-4-6", "start_time_ms": 1712000000000}
父 agent 经 Agent / Skill / Team / Dream 工具 spawn 子 agent 时 emit,
让消费端 (TUI 树形视图 / 审计 sink / 成本统计) 知道子 agent 何时开工
以及领了什么任务. description / cwd / model 空串时 omitempty
省略.
subagent_event -- 子 agent 业务事件包装 (扁平合并)¶
子 agent 内部产生的 text / tool_use / tool_result / turn_start
/ turn_end / error 等事件由父引擎 ctx-embedded emitter 透传到父
Run channel, JSON payload 扁平合并 subagent_id + event_type +
内层事件的业务字段 (例如 tool_use 的 id / name / input). 消费
端一次 JSON.parse 就能读到子 agent 归属和业务字段, 无嵌套 unwrap.
subagent_end -- 子 agent 结束¶
event: subagent_end
data: {"subagent_id": "sa-1", "duration_ms": 3250, "status": "completed", "result": "找到 3 个文件..."}
子 agent 从任何退出路径 (正常完成 / 取消 / 错误 / 达到 maxTurns) 都
emit, 带最终 status (running / completed / failed /
cancelled). result 字段超过 2KB 截断并加 "..." 后缀避免 SSE
payload 撑爆. error 字段成功路径 omitempty 省略.
端点详情¶
GET /v1/health¶
健康检查端点.不需要认证,不受速率限制.
Response 200:
curl 示例:
POST /v1/agent/run¶
单次运行 Agent,返回 SSE 流式事件.无状态,不创建会话.
Request Body:
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
prompt |
string | 是 | 用户输入(最大 100,000 字符) |
model |
string | 否 | 覆盖默认模型 |
system_prompt |
string | 否 | 自定义系统提示词 |
max_turns |
integer | 否 | 最大轮次限制 |
stop_sequences |
[]string | 否 | 自定义停止序列(模型生成到此序列时立即停止) |
Response: SSE Stream
依次推送 turn_start → text_delta/tool_use/tool_result → turn_end → ... → done 事件.
当设置 stop_sequences 且模型输出匹配到停止序列时,done 事件的 stop_reason 为 "stop_sequence".
curl 示例:
curl -N -X POST http://localhost:8080/v1/agent/run \
-H "Content-Type: application/json" \
-d '{
"prompt": "查看当前目录下所有 Go 文件",
"max_turns": 5
}'
错误响应:
| 状态码 | 场景 |
|---|---|
| 400 | prompt 为空或超长 |
| 400 | JSON 解析失败 |
| 401 | 未认证 |
| 429 | 速率限制 |
POST /v1/sessions¶
创建新的对话会话.
Request Body:
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
session_id |
string | 否 | 自定义会话 ID(不提供则自动生成) |
Response 201:
curl 示例:
# 自动生成 ID
curl -X POST http://localhost:8080/v1/sessions
# 指定 ID
curl -X POST http://localhost:8080/v1/sessions \
-H "Content-Type: application/json" \
-d '{"session_id": "my-session"}'
错误响应:
| 状态码 | 场景 |
|---|---|
| 409 | session_id 已存在 |
GET /v1/sessions/{id}¶
获取会话信息.
Path Parameters:
| 参数 | 说明 |
|---|---|
id |
会话 ID |
Response 200:
curl 示例:
错误响应:
| 状态码 | 场景 |
|---|---|
| 404 | 会话不存在 |
DELETE /v1/sessions/{id}¶
关闭并删除会话.
Path Parameters:
| 参数 | 说明 |
|---|---|
id |
会话 ID |
Response 200:
curl 示例:
错误响应:
| 状态码 | 场景 |
|---|---|
| 404 | 会话不存在 |
POST /v1/sessions/{id}/messages¶
在会话中发送消息,返回 SSE 流式事件.自动携带会话历史上下文.
Path Parameters:
| 参数 | 说明 |
|---|---|
id |
会话 ID |
Request Body:
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
prompt |
string | 是 | 用户消息(最大 100,000 字符) |
model |
string | 否 | 覆盖默认模型 |
Response: SSE Stream
与 /v1/agent/run 相同的事件格式.
curl 示例:
# 第一轮
curl -N -X POST http://localhost:8080/v1/sessions/my-session/messages \
-H "Content-Type: application/json" \
-d '{"prompt": "查看项目结构"}'
# 第二轮(自动包含第一轮上下文)
curl -N -X POST http://localhost:8080/v1/sessions/my-session/messages \
-H "Content-Type: application/json" \
-d '{"prompt": "修复 main.go 中的编译错误"}'
错误响应:
| 状态码 | 场景 |
|---|---|
| 400 | prompt 为空或超长 |
| 404 | 会话不存在 |
POST /v1/sessions/{id}/permissions/{request_id}¶
回复权限请求.当 SSE 流推送 permission_request 事件时,客户端通过此端点告知引擎用户的决策.
Path Parameters:
| 参数 | 说明 |
|---|---|
id |
会话 ID |
request_id |
权限请求 ID(从 permission_request 事件获取) |
Request Body:
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
allow |
boolean | 是 | true=允许, false=拒绝 |
reason |
string | 否 | 决策原因 |
Response 200:
curl 示例:
curl -X POST http://localhost:8080/v1/sessions/my-session/permissions/perm_01XYZ \
-H "Content-Type: application/json" \
-d '{"allow": true, "reason": "user approved"}'
错误响应:
| 状态码 | 场景 |
|---|---|
| 404 | 权限请求不存在或已超时 |
| 409 | 权限请求已被回复 |
GET /v1/tools¶
列出所有可用工具.
Response 200:
{
"tools": [
{
"name": "Bash",
"description": "Executes a given bash command and returns its output...",
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute"
},
"timeout": {
"type": "integer",
"description": "Timeout in milliseconds (default 120000, max 600000)"
}
},
"required": ["command"]
}
}
],
"count": 12
}
curl 示例:
权限交互流程¶
当 Agent 需要执行需要用户批准的操作时,HTTP Server 通过 SSE + REST 实现异步权限决策:
Client Server Engine
│ │ │
│ POST /v1/sessions/s/messages │ │
│ ─────────────────────────────►│ engine.Run() │
│ │─────────────────────────────►│
│ │ │
│ SSE: turn_start │ │
│ ◄─────────────────────────────│ │
│ │ │
│ SSE: text_delta │ │
│ ◄─────────────────────────────│ │
│ │ │
│ │ PermissionHandler 被调用 │
│ │◄─────────────────────────────│
│ SSE: permission_request │ 创建 permCh[request_id] │
│ ◄─────────────────────────────│ 阻塞等待... │
│ │ │
│ (用户在前端审查并决策) │ │
│ │ │
│ POST /v1/.../permissions/rid │ │
│ {"allow": true} │ │
│ ─────────────────────────────►│ permCh[rid] <- reply │
│ 200 {"status": "ok"} │ ─────────────────────────────│
│ ◄─────────────────────────────│ Handler 返回 Allow │
│ │ ─────────────────────────────►
│ │ │
│ SSE: tool_result │ 工具执行完成 │
│ ◄─────────────────────────────│◄─────────────────────────────│
│ │ │
│ SSE: done │ │
│ ◄─────────────────────────────│ │
权限请求超时为 5 分钟,超时后自动拒绝.
Engine Config 新增字段(Go SDK)¶
HTTP Server 底层使用 engine.Config 初始化引擎.以下新增字段影响引擎运行时行为,HTTP 层间接受益.
Observer(可观测性)¶
type Config struct {
// ... 其他字段 ...
// Observer 可观测性接口(可选)。
// 结构化事件/指标/调用链流,给监控系统消费。
// nil 时使用 NoopObserver(零开销空实现)。
Observer EventObserver
// StrictMode 严格模式(可选)。
// 测试/安全评估环境下将静默修复升级为 panic。
// nil 表示关闭(生产环境推荐)。
StrictMode *StrictMode
}
EventObserver 接口:
| 方法 | 说明 |
|---|---|
Event(name, data) |
记录离散事件(配对修复,规范化异常等) |
Error(err, context) |
记录错误(含上下文,用于告警) |
可选接口(通过 type assertion 检测):
| 接口 | 方法 | 说明 |
|---|---|---|
MetricObserver |
Metric(name, value, tags) |
数值指标(延迟,token 用量等) |
TraceObserver |
SpanStart/SpanEnd |
调用链(分布式追踪) |
内置实现:NoopObserver(空操作),StderrObserver(stderr 输出),CompositeObserver(多路复合),BufferedObserver(异步缓冲).
StrictMode(严格模式)¶
type StrictMode struct {
ToolResultPairing bool // 消息配对异常:true=panic false=修复
CompactFailure bool // 压缩失败:true=panic false=降级
NormalizerError bool // 规范化异常:true=panic false=跳过
}
生产环境 StrictMode: nil(关闭),测试/安全评估环境按需开启.
中间件¶
HTTP Server 使用以下中间件链(从外到内):
- Panic Recovery -- 捕获 handler panic,返回 500 而非断开连接
- Request Logging -- 记录请求日志,生成
X-Request-ID - CORS -- 跨域支持(通过
--cors参数配置) - Rate Limiting -- 基于 IP 的速率限制(默认 60 次/分钟,通过
Retry-After头告知重试时间) - Bearer Auth -- Token 认证(通过
--bearer-token参数启用)
速率限制¶
- 默认每 IP 每分钟 60 次请求
- 使用滑动窗口算法(内存实现)
/v1/health和OPTIONS请求不受限- 超限时返回 429 和
Retry-After头
CORS¶
# 允许特定 origin
flyto-server --cors "http://localhost:3000,http://localhost:5173"
# 允许所有 origin
flyto-server --cors "*"
设置的 CORS 头:
- Access-Control-Allow-Origin
- Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS
- Access-Control-Allow-Headers: Content-Type, Authorization
- Access-Control-Expose-Headers: X-Request-ID
- Access-Control-Max-Age: 3600
安全审计 API(pkg/security)¶
SecretGuard 接口¶
type SecretGuard interface {
Scan(path, content string) ([]SecretMatch, error)
Redact(content string) string
}
内置实现:
// 默认(45 条内置规则,全路径扫描)
guard := security.NewDefaultSecretGuard()
// 自定义规则(内置 + 行业特有)
guard := security.NewSecretGuardWithRules(
append(security.BuiltinRules(), myRules...),
)
// 带豁免路径
guard := &security.DefaultSecretGuard{
ExemptPaths: []string{"testdata/", "/fixtures/"},
}
// 空实现(关闭扫描,必须显式传入)
guard := security.NoopSecretGuard{}
注入到文件工具:
tool := builtin.NewFileWriteToolWithGuard(history, guard)
tool := builtin.NewFileEditToolWithGuard(cache, history, cwd, guard)
AuditSink 接口¶
内置实现:
// 本地 JSONL 文件
path, _ := engine.DefaultAuditPath() // ~/.flyto/audit.jsonl
sink, err := engine.NewLocalAuditSink(path)
// 多路复合(叠加原则)
composite := security.NewCompositeAuditSink(sink1, sink2)
// 空实现
sink := security.NoopAuditSink{}
AuditObserver¶
将 EventObserver 事件桥接到 AuditSink:
auditObs := engine.NewAuditObserver(sink, sessionID)
obs := engine.NewCompositeObserver(mainObserver, auditObs)
监听事件:
- operation_recorded(status=success/failed)→ allowed/blocked
- secret_scan_blocked → blocked + reason=secret_detected:count=N(不记录规则名,防止攻击者枚举检测盲区)
AuditEntry 结构¶
type AuditEntry struct {
SessionID string // 会话 ID
TurnNumber int // 对话轮次
Timestamp time.Time // UTC 时间
ToolName string // "Write"/"Edit"/"Bash" 等
Operation string // "write"/"edit"/"read"/"execute"/"invoke"
Resource string // 文件路径、URL 等
ActorID string // 操作者(多用户场景)
Outcome string // "allowed"/"blocked"/"error"
Reason string // 拦截原因
Extra map[string]string // 跨行业扩展字段
}
Hook 系统 API(pkg/hooks)¶
HookType 枚举(14 种)¶
const (
// 采样生命周期(模块 9.2)
HookPreSampling HookType = "pre_sampling" // API 调用前,同步,可阻止(exit 非零)
HookPostSampling HookType = "post_sampling" // API 响应后,异步 fire-and-forget
// 工具生命周期
HookPreToolUse HookType = "pre_tool_use"
HookPostToolUse HookType = "post_tool_use"
HookPostToolUseFailure HookType = "post_tool_use_failure"
// 会话生命周期
HookSessionStart HookType = "session_start"
HookSessionEnd HookType = "session_end"
// 权限
HookPermission HookType = "permission_request"
HookPermissionDenied HookType = "permission_denied"
// 控制流
HookStop HookType = "stop"
// 通知 / 配置
HookNotification HookType = "notification"
HookConfigChange HookType = "config_change"
// 任务
HookTaskCreated HookType = "task_created"
HookTaskCompleted HookType = "task_completed"
)
HookDef 结构¶
type HookDef struct {
Command string // Shell 命令(CLI 模式)
Handler HookHandler // Go 回调(SDK 模式,不序列化)
WebhookURL string // HTTP 回调 URL(预留,API 模式)
Timeout int // 超时秒数,默认 30
Async bool // true = 异步执行,不阻塞主流程
Source string // 来源标识(空=全局,非空=插件名,模块 9.3 新增)
}
env builder 函数¶
| 函数 | Hook 类型 | 注入的 key |
|---|---|---|
BuildPreSamplingEnv(model, turn, msgCount, root) |
pre_sampling |
MODEL, TURN, MESSAGE_COUNT |
BuildPostSamplingEnv(model, turn, in, out, stopReason, preview, root) |
post_sampling |
MODEL, TURN, INPUT_TOKENS, OUTPUT_TOKENS, STOP_REASON, RESPONSE_PREVIEW(≤500B) |
BuildPreToolEnv(toolName, input, root) |
pre_tool_use |
TOOL_NAME, TOOL_INPUT |
BuildPostToolEnv(toolName, input, output, isError, root) |
post_tool_use[_failure] |
TOOL_NAME, TOOL_OUTPUT, TOOL_IS_ERROR |
BuildSessionEnv(sessionID, root, hookType) |
session_start/end |
SESSION_ID |
BuildPermissionEnv(toolName, reason, root, hookType) |
permission_* |
TOOL_NAME, PERMISSION_REASON |
BuildNotificationEnv(msg, level, root) |
notification |
NOTIFICATION_MESSAGE, NOTIFICATION_LEVEL |
BuildTaskEnv(taskID, desc, root, hookType) |
task_* |
TASK_ID, TASK_DESCRIPTION |
所有函数额外注入:
HOOK_TYPE,PROJECT_ROOT,PLATFORM,ARCH,CWD
退出码语义¶
| 退出码 | pre_tool_use / pre_sampling | stop |
|---|---|---|
0 |
放行 | 继续 |
1 |
放行(脚本自身出错,fail-open) | 阻止继续 |
2 |
阻止(有意拦截) | 阻止继续 |
pre_samplingexit 1 也会触发阻止(复用ParseStopHookResponse语义:任何非零退出 = 阻止).
Manager 插件级注册/注销(模块 9.3)¶
// 按来源移除指定 hook 类型的 hooks(source="" 移除全局 hooks)
func (m *Manager) UnregisterBySource(hookType HookType, source string)
// 移除某来源在所有 hook 类型中注册的全部 hooks
func (m *Manager) UnregisterAllBySource(source string)
HookDef.Source 约定
| Source 值 | 含义 |
|---|---|
"" |
全局 hook(来自 HooksConfig 或直接 Register 调用) |
| 非空字符串 | 插件名(来自 plugin.Host,值为 plugin.Plugin.Name) |
优先级由注册顺序保证:Engine.New() 先注册全局 hooks,后调用 syncPluginHooks(),
因此全局 hooks 始终先于插件 hooks 执行.
Engine 插件管理方法(模块 9.3)¶
// 加载插件(不自动启用)
func (e *Engine) LoadPlugin(p plugin.Plugin) error
// 启用已加载的插件,并同步其 hooks 到 hooksMgr
func (e *Engine) EnablePlugin(name string) error
// 禁用插件,并从 hooksMgr 移除其 hooks
func (e *Engine) DisablePlugin(name string) error
LoadPlugin调用pluginMgr.Load(),不注册 hooks(插件处于停用状态).EnablePlugin/DisablePlugin会触发syncPluginHooks()原子重建所有插件 hooks.syncPluginHooks()先UnregisterAllBySource清除旧状态,再重新注册已启用插件的 hooks, 防止 Enable/Disable 并发时出现幽灵 hooks.
plugin.HookDef → hooks.HookDef 字段映射
| plugin.HookDef 字段 | hooks.HookDef 字段 |
|---|---|
Type |
(作为 hookType 参数传入 Register) |
Command |
Command |
Timeout |
Timeout |
Async |
Async |
| plugin 名 | Source |
记忆同步 API(pkg/memory - 模块 10.2)¶
SyncAdapter 接口¶
type SyncAdapter interface {
Pull(ctx context.Context, localDir string) (pulled int, err error)
Push(ctx context.Context, localDir string, policy ConflictPolicy) (pushed int, err error)
IsAvailable() bool
}
ConflictPolicy¶
| 常量 | 值 | 适用场景 |
|---|---|---|
ConflictLocalWins |
0 | CLI 单用户,本地修改优先(默认) |
ConflictServerWins |
1 | API 无状态模式,服务器版本优先 |
ConflictMerge |
2 | Git 后端三路合并,冲突产生标记 |
ConflictFail |
3 | 冲突时返回 ErrSyncConflict,调用方处理 |
PullPolicy¶
| 常量 | 适用场景 |
|---|---|
PullOnSessionStart |
CLI 模式,session 生命周期内只 Pull 一次(默认) |
PullWithTTL |
SDK/API 模式,TTL 缓存,避免每次读都 Pull |
PullAlways |
强一致性场景,每次读前都 Pull |
PullNever |
离线/不需要同步 |
SyncConfig 工厂¶
// CLI 默认:session 开始 Pull 一次,本地胜
cfg := memory.DefaultSyncConfig()
// API 模式:TTL=5min,服务器胜
cfg := memory.APISyncConfig(5 * time.Minute)
WithSyncAdapter 选项¶
GitSyncAdapter¶
adapter := memory.NewGitSyncAdapter(memory.GitSyncOptions{
Mode: memory.GitModeStandalone, // 默认:独立 git repo
Remote: "origin",
Branch: "main",
CommitAuthorName: "Flyto Agent",
CommitAuthorEmail: "agent@flyto.local",
// M1 方案 β: Executor 必填, 本地 CLI 用 DefaultExecutor{},
// 云端 SaaS 由 platform 层传 sandbox.Backend.
Executor: execenv.DefaultExecutor{},
})
// 首次初始化(一次性)
adapter.InitRepo(ctx, localDir, "git@github.com:team/memory.git")
ConflictPolicy 映射(GitSyncAdapter)
| ConflictPolicy | git 操作 |
|---|---|
LocalWins |
git push --force-with-lease |
ServerWins |
git fetch && git reset --hard origin/branch |
Merge |
git pull --no-rebase(三路合并) |
Fail |
先检查 diverge,diverged 时返回 ErrSyncConflict |
NoopSyncAdapter(默认)¶
不设置 WithSyncAdapter 时,fileStore 使用内置 NoopSyncAdapter:
IsAvailable() 返回 false,所有同步逻辑完全跳过,零 overhead,向后兼容.
记忆自动提取 API(pkg/memory + pkg/engine - 模块 10.3)¶
MemoryExtractor 接口¶
type MemoryExtractor interface {
Name() string
ShouldExtract(turnCount, lastExtractTurn int) bool
// newMessageCount:自上次提取以来的新消息数,用于精准定位分析范围
BuildPrompt(existingMemories []*Entry, newMessageCount int) string
AllowedTools() []string
MaxTurns() int
}
DefaultCodeExtractor¶
内置编程场景提取器,通过 Config.MemoryExtractor 注入:
Store.Dir()¶
SubAgentConfig 新字段¶
| 字段 | 类型 | 说明 |
|---|---|---|
HistoryMessages |
[]api.RequestMessage |
预置父对话历史(提取 SubAgent 分析用) |
MemoryDirRestrict |
string |
非空时 Edit/Write 只允许写入此目录 |
提取生命周期¶
会话结束 → ShouldExtract 检查 → scheduleMemoryExtraction
├── 单飞:inProgress=true 时存入 pending(最新覆盖)
└── runMemoryExtraction
├── hasMemoryWritesSince → 主 agent 已写则跳过
├── BuildPrompt(existing, newMessageCount)
└── SpawnSubAgent(HistoryMessages=messages, MemoryDirRestrict=memDir)
NewFileStoreWithBaseDir(测试辅助)¶
store := memory.NewFileStoreWithBaseDir("/custom/path")
// 适合测试场景,绕过 ~/.flyto/projects/<hash>/memory/ 路径计算
记忆新鲜度 API(pkg/memory - 模块 10.4)¶
FreshnessConfig¶
type FreshnessConfig struct {
GlobalThreshold time.Duration // 超过此时长触发警告;0 = 总是警告
TypeOverrides map[string]time.Duration // 按记忆类型覆盖阈值
}
func DefaultFreshnessConfig() FreshnessConfig // GlobalThreshold = 24h
func (c FreshnessConfig) ThresholdFor(t Type) time.Duration
注入到 Engine¶
cfg := &engine.Config{
FreshnessConfig: func() *memory.FreshnessConfig {
fc := memory.DefaultFreshnessConfig()
return &fc
}(),
// 仓储场景:fc.TypeOverrides["project"] = 2 * time.Hour
}
工具函数¶
| 函数 | 用途 |
|---|---|
ShouldWarn(modTime, threshold) |
判断是否需要警告 |
FreshnessText(modTime, threshold) |
返回自然语言警告文本 |
FreshnessNote(modTime, threshold) |
返回 <system-reminder> 包裹的警告,空字符串表示不需要 |
IndexAnnotation(modTime, threshold) |
返回 MEMORY.md 行末注记(含前导空格),空字符串表示不需要 |
TruncateIndex(content) |
MEMORY.md 双重截断(200行/25KB),超出时追加 WARNING |
WithFreshness(FileStoreOption)¶
store := memory.NewFileStoreWithOptions(cwd,
memory.WithSecretGuard(guard),
memory.WithFreshness(memory.DefaultFreshnessConfig()),
)
设置后 UpdateIndex 自动使用 FreshnessConfig.ThresholdFor(type) 替代早期方案 30 天硬编码,
并在写入前调用 TruncateIndex.
多模型适配系统(PromptBundle)¶
PromptBundle 系统让引擎在不同模型族和业务场景下使用专门调优的系统提示词.
框架层(pkg/context)已内置英文编程 Bundle(claude+programming),
调用方通过注册自定义 Bundle 或使用中文 Bundle 适配国产模型.
核心概念¶
BundleKey{ModelFamily, Scenario}
↓
BundleRegistry.Resolve(key) -- 精确匹配,没有就回退到默认 Bundle
↓
PromptBundle
├── StaticSections() -- 静态文字,全局可缓存(角色定义、行为准则等)
└── DynamicSections() -- 动态计算,会话级缓存(环境信息、工具列表等)
BundleKey¶
// 按(模型族, 场景)唯一标识一个 Bundle
type BundleKey struct {
ModelFamily string // "claude", "qwen", "deepseek", "gpt", "gemini", "local"...
Scenario string // "programming", "warehouse", "medical", "general"...
}
// 便捷构造函数(engine 包导出,避免调用方 import pkg/context)
key := engine.BundleKeyFor("qwen", "programming")
ModelFamily 为空时引擎自动从模型 ID 推断:
- claude-sonnet-4-6 → "claude"
- gpt-4o → "gpt"
- gemini-1.5-pro → "gemini"
- 未知前缀 → "claude"(使用内置默认 Bundle)
注册自定义 Bundle¶
eng := engine.New(cfg)
// 1. 注册(引擎启动后任何时候都可以注册,支持热更新)
eng.RegisterPromptBundle(
engine.BundleKeyFor("qwen", "programming"),
myQwenBundle,
)
// 2. 全局生效:通过 Config 设置默认场景和模型族
cfg := &engine.Config{
Model: "qwen2.5-72b-instruct",
ModelFamily: "qwen", // 显式指定,或留空让引擎推断
Scenario: "programming",
}
eng := engine.New(cfg)
// 3. per-request 切换(同一 Engine 实例服务多场景)
for ev := range eng.Run(ctx, prompt,
engine.WithBundleKey(engine.BundleKeyFor("qwen", "programming")),
) {
// ...
}
内置中文 Bundle¶
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/context"
// 预置 5 个中文模型族(qwen/deepseek/ernie/glm/hunyuan)+ programming 场景
// 静态段落:英文版提示词的完整中文翻译,语义一字不差
// 动态段落:继承 DefaultBundle(env_info、tool_descs、instructions 等)
// 方式一:一键批量注册
context.RegisterChineseBundle(eng.BundleRegistryRef())
// 方式二:逐个注册(可选择部分模型族)
eng.RegisterPromptBundle(
engine.BundleKeyFor("qwen", "programming"),
context.NewChineseBundle(),
)
// 查看预置的所有 key
keys := context.ChineseBundleKeys()
// [{qwen programming} {deepseek programming} {ernie programming} {glm programming} {hunyuan programming}]
BundleOverlay - 局部覆盖¶
接入新模型时,通常只需改 1-3 个段落,其余继承默认内容:
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/context"
// 基于中文 Bundle,只改角色定义和环境描述
myBundle := context.NewBundleOverlay(context.NewChineseBundle()).
OverrideStatic("intro",
"你是企业智能助手,专注于仓储物流场景下的任务分析与执行。").
OverrideStatic("doing_tasks",
"用户主要请求你处理仓储调度、库存管理、入出库流程等物流任务...").
OverrideDynamic("env_info", func(ctx context.Context) string {
cwd := context.CwdFromCtx(ctx)
return fmt.Sprintf("# 运行环境\n工作目录:%s\n业务场景:仓储物流", cwd)
})
eng.RegisterPromptBundle(
engine.BundleKeyFor("qwen", "warehouse"),
myBundle,
)
四种覆盖方式:
| 方法 | 适用场景 | 说明 |
|---|---|---|
OverrideStatic(name, text) |
固定文字,不需要运行时计算 | 最常用;内容在引擎生命周期内不变 |
OverrideDynamic(name, fn) |
需要读取 cwd,model ID 等运行时值 | 会话级缓存;每次 /clear 或 /compact 后重算 |
OverrideVolatile(name, fn, reason) |
内容在会话中途可能变化 | 每轮强制重算;慎用,会打断 prompt 缓存 |
OverrideSection(section) |
需要精确控制 CacheBreak 等字段 | 最大灵活度 |
链式调用,支持任意多个覆盖:
overlay := context.NewBundleOverlay(base).
OverrideStatic("intro", "...").
OverrideStatic("doing_tasks", "...").
OverrideDynamic("env_info", myEnvFn).
OverrideVolatile("tool_status", myStatusFn, "MCP 连接状态每轮变化")
注意: 覆盖 base 中不存在的 section Name 会被静默忽略(不引入新段落).
若需要添加全新段落,请直接实现 PromptBundle 接口或使用 NewBundleFromFunc.
NewBundleFromFunc - 完全自定义¶
// 不继承任何 base Bundle,从头定义静态和动态段落
bundle := context.NewBundleFromFunc(
// 静态段落(不变内容)
func() []*context.Section {
return []*context.Section{
context.StaticSection("intro", "你是 X 系统的专属助手。"),
context.StaticSection("rules", "..."),
}
},
// 动态段落(运行时计算)
func() []*context.Section {
return []*context.Section{
context.DynamicSection("env", func(ctx context.Context) string {
return fmt.Sprintf("当前模型:%s", context.ModelIDFromCtx(ctx))
}),
}
},
)
WithBundleKey RunOption - per-request 切换¶
HTTP API 场景下,同一个 Engine 实例可以按请求切换 Bundle, 无需为每个场景创建独立的 Engine(节省内存和启动开销):
http.HandleFunc("/api/run", func(w http.ResponseWriter, r *http.Request) {
// 从请求头或路径读取场景
scenario := r.Header.Get("X-Scenario") // "programming" or "warehouse"
modelFamily := r.Header.Get("X-Model-Family") // "qwen" or "claude"
opts := []engine.RunOption{}
if scenario != "" && modelFamily != "" {
opts = append(opts, engine.WithBundleKey(
engine.BundleKeyFor(modelFamily, scenario),
))
}
for ev := range eng.Run(r.Context(), prompt, opts...) {
// 流式输出给客户端
}
})
若指定的 Bundle key 未注册,BundleRegistry.Resolve 自动回退到默认 Bundle(claude+programming),行为安全,不会 panic.
Context 注入值(DynamicSection 内可读取)¶
动态段落的 ComputeFn 接收 context.Context,可以读取以下注入值:
| 函数 | 说明 | 注入时机 |
|---|---|---|
context.CwdFromCtx(ctx) |
工作目录 | 每次 Run 前 |
context.ModelIDFromCtx(ctx) |
当前模型 ID | 每次 Run 前 |
context.ToolDescriptionsFromCtx(ctx) |
已注册工具列表 | 每次 Run 前 |
context.AppendPromptFromCtx(ctx) |
用户追加提示 | 每次 Run 前 |
context.EvolveFragmentFromCtx(ctx) |
进化能力提示片段 | 每次 Run 前(可选) |
context.PromptLanguageFromCtx(ctx) |
语言偏好(P2 预留) | 调用方手动注入 |
权限系统扩展(P0修复)¶
PermissionClass - 动态工具安全分类¶
第三方工具只需声明一个字符串字段,即可参与引擎的安全检查体系.
五个内置分类:
| 常量 | 值 | 行为 |
|---|---|---|
permission.PermClassBash |
"bash" |
走 Bash 命令安全分析(AST 解析 + 风险评估) |
permission.PermClassFile |
"file" |
走文件路径安全检查(沙箱目录 / 白名单) |
permission.PermClassWebFetch |
"webfetch" |
走 URL 安全检查 |
permission.PermClassReadOnly |
"readonly" |
只读操作,直接通过(无需弹窗) |
permission.PermClassGeneric |
"generic" |
通用检查(要求用户批准) |
在工具的 Metadata() 方法中声明:
func (t *MyTool) Metadata() tools.Metadata {
return tools.Metadata{
ConcurrencySafe: true,
ReadOnly: false,
Destructive: false,
SearchHint: "...",
// 声明安全分类,tools.Registry.Register() 会自动调用 permission.RegisterToolClass()
PermissionClass: permission.PermClassFile,
}
}
注册工具时自动生效:
registry := tools.NewRegistry()
registry.Register(myTool)
// ↑ 内部自动调用 permission.RegisterToolClass("MyTool", "file")
// 之后 permission.CheckToolPermission("MyTool", ...) 会走文件安全检查
手动注册(不走 tools.Registry 的场景):
// 注册
permission.RegisterToolClass("MySpecialTool", permission.PermClassBash)
// 注销(工具被卸载时)
permission.UnregisterToolClass("MySpecialTool")
压缩器配置¶
pkg/context.Compressor 通过 flyto.ModelProvider 接口统一所有 Provider,不再接受 Anthropic 专有的 apiKey/baseURL/bearerAuth 参数.
创建压缩器¶
// 通过 ModelProvider 创建(Provider 路径,推荐)
compressor := agentctx.NewCompressor(threshold, provider)
// nil provider:仅支持微压缩(MicroCompact),摘要生成不可用
compressor := agentctx.NewCompressor(threshold, nil)
SetContextWindowFn - 上下文窗口大小¶
// 注入:从 ModelRegistry 动态获取上下文窗口大小
reg := cfg.ModelRegistry()
compressor.SetContextWindowFn(func(modelID string) int {
return reg.ContextWindow(modelID)
})
// 若不注入,回退到全局函数(默认返回 DefaultContextWindow=200000)
SetCompactModelFn - 压缩使用的模型¶
// 注入:从配置动态获取 fast 模型(用于摘要压缩)
compressor.SetCompactModelFn(func() string {
return cfg.ModelForRole(config.RoleFast)
})
// 若不注入,回退到全局函数 agentctx.getCompactModel()
SetHTTPClient - HTTP 客户端注入¶
// 注入:使用带超时的独立客户端,与引擎主 API 客户端隔离
compressor.SetHTTPClient(&http.Client{
Timeout: 90 * time.Second,
})
// 若不注入,回退到 http.DefaultClient(不推荐在生产环境使用)
// 也可以注入带重试逻辑的客户端
compressor.SetHTTPClient(myRetryableHTTPClient)
CompactHTTPClient 接口:
// 任何实现了 Do 方法的类型都可以注入
type CompactHTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
// *http.Client 自动满足此接口
// 自定义重试客户端示例:
type RetryClient struct{ inner *http.Client }
func (r *RetryClient) Do(req *http.Request) (*http.Response, error) {
// 重试逻辑...
}
AnthropicAPIVersion 常量(避免多处维护):
完整的 Engine 初始化示例¶
// engine.go 内部自动完成以下配置,外部调用方无需手动操作
// 此示例仅供理解内部原理
// 构造时注入 Provider(代替旧的 apiKey/baseURL/bearerAuth)
compressor := agentctx.NewCompressor(compactThreshold, cfg.Provider)
// 实例级上下文窗口 + 压缩模型(多 Engine 实例互不覆盖)
reg := cfg.ModelRegistry()
compressor.SetContextWindowFn(func(m string) int { return reg.ContextWindow(m) })
compressor.SetCompactModelFn(func() string { return cfg.ModelForRole(config.RoleFast) })
凭据注入 API(SetSecret / WithSecret)¶
凭据(API Key,数据库密码,Token)注册后,引擎自动完成两件事:
1. 注入 Bash 工具子进程 env(工具脚本通过 $NAME 引用)
2. 对所有工具输出进行 value-level 脱敏([SECRET:NAME]),防止明文进入事件流和模型上下文
引擎级注册(所有 Run 共享)¶
eng, _ := engine.New(cfg)
// 注册凭据(name 建议大写,value 必须 ≥8 字节)
eng.SetSecret("DB_PASSWORD", os.Getenv("DB_PASSWORD"))
eng.SetSecret("MY_API_KEY", os.Getenv("MY_API_KEY"))
// 后续所有 Run 自动脱敏 + env 注入
for ev := range eng.Run(ctx, "查询数据库并返回结果") {
// ToolResultEvent.Output 中的 DB_PASSWORD 值已被替换为 [SECRET:DB_PASSWORD]
}
请求级注册(per-request 隔离,适合 HTTP API 多租户)¶
http.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
tenantToken := r.Header.Get("X-Tenant-Token")
for ev := range eng.Run(ctx, prompt,
engine.WithSecret("TENANT_TOKEN", tenantToken), // 仅对本次 Run 有效
) {
// ...
}
})
Bash 工具中使用凭据¶
# 工具脚本通过 $NAME 引用凭据,不需要在命令中硬编码密码
curl -H "Authorization: Bearer $MY_API_KEY" https://api.example.com/data
SecretStore 错误处理¶
if err := eng.SetSecret("SHORT", "abc"); err != nil {
// error: secret "SHORT" value too short (min 8 bytes): got 3
log.Fatal(err)
}
不可逆操作确认 API(CheckpointEvent / WithCheckpointHandler)¶
工具声明 RequiresCheckpoint: true 后,引擎在执行前自动暂停,通过 CheckpointHandlerFn 等待确认.
工具声明层¶
func (t *SendEmailTool) Metadata() tools.Metadata {
return tools.Metadata{
Name: "send_email",
RequiresCheckpoint: true, // 执行前必须经过 handler 确认
}
}
SDK 嵌入(同步确认)¶
for ev := range eng.Run(ctx, prompt,
engine.WithCheckpointHandler(func(evt engine.CheckpointEvent) bool {
fmt.Printf("⚠️ 工具 %q 请求执行,参数:%v\n继续?[y/N] ", evt.ToolName, evt.Input)
var s string
fmt.Scan(&s)
return s == "y" || s == "Y"
}),
) {
// ...
}
HTTP API(通过 CheckpointEvent 通知前端)¶
// CheckpointEvent 通过 EventChannel 推送,HTTP SSE 适配层可以:
// 1. 将 checkpoint 事件序列化为 SSE 发送给浏览器
// 2. 等待浏览器发来 ACK 请求(通过另一个 HTTP 端点或 WebSocket)
// 3. 调用 handler 回调(handler 内部等待 ACK 信号量)
ackCh := make(chan bool, 1)
for ev := range eng.Run(ctx, prompt,
engine.WithCheckpointHandler(func(evt engine.CheckpointEvent) bool {
sendSSEToClient(evt) // 推送给浏览器
return <-ackCh // 阻塞等待浏览器 ACK
}),
) {
if chkEvt, ok := ev.(*engine.CheckpointEvent); ok {
// 可以在这里额外处理,如记录审计日志
_ = chkEvt
}
}
默认行为(未注册 handler)¶
未注册 WithCheckpointHandler 时,所有 RequiresCheckpoint=true 的工具调用自动被拒绝,
模型收到 "error: tool execution denied at checkpoint" 并可以在后续轮次中调整策略.
这是 deny-safe 原则:宁可误拒绝,不可在用户不知情的情况下执行不可逆操作.
CheckpointEvent 字段说明¶
type CheckpointEvent struct {
ToolCallID string // 工具调用 ID(对应 ToolUseEvent.ID)
ToolName string // 工具名称
Input map[string]interface{} // 工具输入参数(已脱敏)
Message string // 人类可读的确认描述
}
模型提供商(pkg/providers)¶
引擎通过 flyto.ModelProvider 接口隔离底层 API,当前内置 7 个 provider.
提供商一览¶
| Provider | 包路径 | 协议 | 说明 |
|---|---|---|---|
anthropic |
pkg/providers/anthropic |
Anthropic SSE | Claude 系列,支持 thinking / caching / batch |
openai |
pkg/providers/openai |
OpenAI Chat | GPT 系列,Chat Completions 流式 |
openrouter |
pkg/providers/openrouter |
OpenAI 兼容 | 聚合路由,透传底层模型 |
minimax |
pkg/providers/minimax |
MiniMax 原生 | MiniMax-M2 系列,支持 thinking / caching |
gemini |
pkg/providers/gemini |
Gemini SSE | Google Gemini 2.5+,AI Studio + Vertex AI 双模式 |
ollama |
pkg/providers/ollama |
OpenAI 兼容 | 本地部署,零网络依赖 |
lmstudio |
pkg/providers/lmstudio |
OpenAI 兼容 | LM Studio 本地部署 |
Gemini Provider(pkg/providers/gemini)¶
Google Gemini 是第三种 SSE 协议,与 Anthropic / OpenAI 均不同:每块是完整 GenerateContentResponse,thinking 用 thought: true 标记,functionCall 完整到达无需拼接 arguments.
Config 字段:
type Config struct {
APIKey string // Google AI Studio API Key(aistudio.google.com)
BearerToken string // Vertex AI GCP OAuth2 Bearer token(非空时切换 Vertex AI 模式)
BaseURL string // 覆盖 API 端点(可选)
ThinkingBudget int // 扩展思考预算,Gemini 2.5+ 支持;0 = 禁用
HTTPClient *http.Client // 自定义 HTTP 客户端(代理、超时等)
ModelOverrides []flyto.ModelInfo // 覆盖静态模型表(新模型热更新)
}
两种接入模式:
| 模式 | BearerToken | APIKey | 适用场景 |
|---|---|---|---|
| Google AI Studio | 空 | 必填 | 开发测试,免费额度慷慨 |
| Vertex AI | 必填 | 忽略 | 企业级 SLA,VPC Service Controls |
使用示例:
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/providers/gemini"
// Google AI Studio 模式
provider := gemini.New(gemini.Config{
APIKey: os.Getenv("GEMINI_API_KEY"),
ThinkingBudget: 8000, // 启用扩展思考
})
// Vertex AI 模式
provider := gemini.New(gemini.Config{
BearerToken: os.Getenv("GCP_BEARER_TOKEN"),
BaseURL: "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google",
})
eng := engine.New(&engine.Config{
Provider: provider,
Model: "gemini-2.5-pro",
})
内置模型(2026-04 更新,USD / 1M tokens):
| 模型 ID | 上下文窗口 | 输入价格 | 输出价格 | Thinking | Vision |
|---|---|---|---|---|---|
gemini-2.5-pro |
1M | $1.25 | $10.00 | ✓ | ✓ |
gemini-2.5-flash |
1M | $0.15 | $0.60 | ✓ | ✓ |
gemini-2.5-flash-thinking-exp |
1M | $0.15 | $0.60 | ✓ | ✓ |
注意:Vertex AI 定价与 Google AI Studio 可能不同,以 GCP 控制台为准.
Anthropic Provider(pkg/providers/anthropic)¶
Config 字段(关键项):
type Config struct {
APIKey string
BaseURL string // 默认 https://api.anthropic.com
ThinkingBudget int // 全局 thinking 预算;Request.NeedsThinking 为 per-request 开关
HTTPClient *http.Client
ModelOverrides []flyto.ModelInfo
}
支持 prompt caching(SupportsCaching=true),引擎自动在系统提示超过阈值时注入 cache_control.
OpenAI / OpenRouter / Ollama / LM Studio Provider¶
这四个 provider 均基于 OpenAI Chat Completions 协议(internal/wire.OpenAIClient),
Config 字段一致:
type Config struct {
APIKey string
BaseURL string // OpenAI: https://api.openai.com/v1;Ollama: http://localhost:11434/v1
HTTPClient *http.Client
ModelOverrides []flyto.ModelInfo
}
快速构建本地推理:
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/providers/ollama"
// Ollama 本地(零网络依赖)
provider := ollama.New(ollama.Config{
BaseURL: "http://localhost:11434/v1",
})
eng := engine.New(&engine.Config{
Provider: provider,
Model: "llama3.3:latest",
})
MiniMax Provider(pkg/providers/minimax)¶
MiniMax 使用原生协议(非 OpenAI 兼容),支持 thinking 和 prompt caching.
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/pkg/providers/minimax"
provider := minimax.New(minimax.Config{
APIKey: os.Getenv("MINIMAX_API_KEY"),
ThinkingBudget: 4096,
})
eng := engine.New(&engine.Config{
Provider: provider,
Model: "MiniMax-M2.7",
})
自定义 Provider¶
实现 flyto.ModelProvider 接口即可接入任意模型服务:
type MyProvider struct{}
func (p *MyProvider) Name() string { return "my-provider" }
func (p *MyProvider) Stream(ctx context.Context, req *flyto.Request) (<-chan flyto.Event, error) {
ch := make(chan flyto.Event, 32)
go func() {
defer close(ch)
// 调用 API,将响应转换为 flyto.Event 发送到 ch
// 支持:TextDeltaEvent / TextEvent / ThinkingDeltaEvent / ThinkingEvent
// ToolUseEvent / ToolResultEvent / TurnStartEvent / TurnEndEvent / ErrorEvent
}()
return ch, nil
}
func (p *MyProvider) Models(ctx context.Context) ([]flyto.ModelInfo, error) {
return []flyto.ModelInfo{
{ID: "my-model", DisplayName: "My Model", Provider: "my-provider",
ContextWindow: 128_000, MaxOutputTokens: 8_192},
}, nil
}
Transport 客户端(internal/transport)¶
原
internal/api已重命名为internal/transport,反映其通用 SSE 流式 HTTP 客户端定位.
ClientOption¶
transport.Client 不再有任何 Anthropic 默认值,所有配置通过 ClientOption 函数注入:
| Option | 说明 |
|---|---|
WithMessagePath(path) |
API 消息路径(如 "/v1/messages"),空字符串表示 baseURL 已包含完整路径 |
WithAPIVersion(ver) |
API 版本 header 值(如 "2023-06-01"),空字符串时不发此 header |
WithRetryPolicy(policy) |
自定义重试策略,nil 表示不重试 |
WithClassifier(c) |
自定义错误分类器 |
WithHTTPClient(hc) |
自定义 HTTP 客户端(代理,超时等) |
WithBearerAuth() |
切换为 "Authorization: Bearer" 鉴权格式 |
WithStreamGuard(cfg) |
自定义 SSE 流守卫配置 |
WithOverflowHandler(h) |
max_tokens 溢出修正器 |
WithTransport(t) |
共享 Transport(预连接 + 正式请求共享连接池) |
import "git.flytoex.net/yuanwei/git.flytoex.net/yuanwei/flyto-agent/internal/transport"
client := transport.NewClient(apiKey, baseURL,
transport.WithMessagePath("/v1/messages"),
transport.WithAPIVersion("2023-06-01"),
transport.WithRetryPolicy(myRetryPolicy),
transport.WithClassifier(&MyClassifier{}),
)
flyto.ModelInfo 字段(pkg/flyto)¶
flyto.ModelInfo 是模型元数据的规范类型,config.ModelConfig 是其类型别名.
| 字段 | 类型 | 说明 |
|---|---|---|
ID |
string | API 使用的模型 ID |
DisplayName |
string | 展示名称 |
Provider |
string | provider 标识 |
ContextWindow |
int | 上下文窗口大小(token 数) |
MaxOutputTokens |
int | 最大输出 token 数 |
InputPricePer1M |
float64 | 输入价格(USD / 百万 token) |
OutputPricePer1M |
float64 | 输出价格(USD / 百万 token) |
CacheReadPricePer1M |
float64 | 缓存读取价格(USD / 百万 token) |
CacheWritePricePer1M |
float64 | 缓存写入价格(USD / 百万 token) |
SupportsCaching |
bool | 是否支持 Prompt Caching |
SupportsThinking |
bool | 是否支持扩展思考 |
SupportsImages |
bool | 是否支持图片输入 |
flyto.Block 变更¶
flyto.Block 的 ThinkingSignature 字段已替换为 ProviderMetadata map[string]string:
ProviderMetadata存储 Provider 特有的不透明数据(如签名,ID 等)- 引擎不解读此字段,原样回传给 Provider
- 各 Provider 自行定义 key 约定(如 Anthropic 用
"signature"key)