第十篇:callModel.ts 深度解析 —— Claude Code 如何调用 Anthropic API

发布时间:2026/7/4 2:49:18
第十篇:callModel.ts 深度解析 —— Claude Code 如何调用 Anthropic API 第十篇callModel.ts 深度解析 —— Claude Code 如何调用 Anthropic API源码位置src/services/api/callModel.ts约800行核心逻辑难度高级一、引言为什么 callModel.ts 是通信枢纽前九篇我们拆解了 Claude Code 的架构、CLI、Handler、QueryEngine、query.ts但一直有一个关键问题没回答Claude Code 究竟是如何发起 HTTPS 请求到https://api.anthropic.com/v1/messages的答案就在callModel.ts里。这个文件虽然有约800行却是整个系统的通信枢纽——负责构建 HTTP 请求把内部消息格式转换成 Anthropic Messages API 需要的 JSON 格式处理 SSE 流解析text/event-stream逐块提取content_block_delta限速重试遇到 429 时指数退避自动切换备用模型错误分类把 API 错误转换成内部错误类型FallbackTriggeredError、PromptTooLongError等性能追踪记录首 token 时间、吞吐量、重试次数设计亮点callModel()返回一个AsyncGenerator逐块yield消息而不是等整个响应完成。这使得 UI 可以实时渲染流式输出。二、callModel.ts 在架构中的位置query.ts ↓ callModel() ← 本篇主角负责 API 通信 ↓ Anthropic Messages API (https://api.anthropic.com/v1/messages)核心职责划分query.ts纯业务逻辑上下文压缩、工具执行编排、错误恢复callModel.ts纯通信逻辑HTTP 请求、SSE 解析、重试这种分离使得query.ts可以专注业务权限、成本追踪、会话持久化callModel.ts可以专注通信流式解析、重试、错误分类三、核心函数callModel()callModel()是 callModel.ts 的唯一导出函数签名如下exportasyncfunction*callModel(params:CallModelParams,):AsyncGenerator|AssistantMessage|StreamEvent|RequestStartEvent|RequestFailedEvent,voidCallModelParams输入参数字段说明messages对话历史Message[]systemPrompt系统提示词SystemPrompt 类型tools可用工具列表Tool[]thinkingConfig思考配置extended thinking 的开关signalAbortController.signal用户取消options请求选项model、fallbackModel、querySource 等四、callModel 循环一步步拆解callModel()内部是一个无限循环while (true)每次循环代表一次 API 调用。循环退出的条件模型返回stop_reason: end_turn不需要调用工具达到maxRetries上限用户取消AbortController循环体的 10 个关键步骤1. 构建请求体Build Request BodyconstrequestBody:MessagesRequest{model:currentModel,max_tokens:maxOutputTokensOverride??options.maxTokens??8192,messages:messagesForRequest,system:systemPromptForRequest,tools:toolsForRequest,thinking:thinkingConfig,stream:true,// 启用 SSE 流}关键转换messages内部 Message[] → Anthropic API 需要的格式role/content/tool_callssystemSystemPrompt → string[]支持多段系统提示词toolsTool[] → Anthropic Tool 格式name/description/input_schema2. 发起 HTTPS 请求constresponseawaitfetch(https://api.anthropic.com/v1/messages,{method:POST,headers:{x-api-key:apiKey,anthropic-version:2023-06-01,content-type:application/json,accept:text/event-stream,x-app-name:claude-code,// 标识来源...(fallbackModel{x-fallback-model:fallbackModel}),},body:JSON.stringify(requestBody),signal:params.signal,// 支持用户取消},)关键请求头x-app-name: claude-codeAnthropic 用来识别调用来源用于配额管理x-fallback-model告诉 API “如果限速自动切换到这个模型”accept: text/event-stream要求返回 SSE 流3. 检查响应状态if(!response.ok){consterrorBodyawaitresponse.text()if(response.status429){thrownewRateLimitError(errorBody)}if(response.status413){thrownewPromptTooLongError(errorBody)}if(response.status401){thrownewAuthenticationError(errorBody)}// 其他错误500/502/503thrownewAPIError(response.status,errorBody)}错误分类429限速错误 → 指数退避重试413上下文过长 → 触发压缩401认证失败 → 立即返回不重试500/502/503服务端错误 → 指数退避重试4. 解析 SSE 流constreaderresponse.body!.getReader()constdecodernewTextDecoder()letbufferwhile(true){const{done,value}awaitreader.read()if(done)breakbufferdecoder.decode(value,{stream:true})constlinesbuffer.split(\n)bufferlines.pop()||// 保留最后一个不完整的行for(constlineoflines){if(line.startsWith(data: )){constdataline.slice(6)if(data[DONE])return// 流结束consteventJSON.parse(data)yield*handleSSEEvent(event)// 处理 SSE 事件}}}SSE 格式data: {type:message_start,message:{id:msg_...}} data: {type:content_block_delta,delta:{text:Hello}} data: [DONE]5. 处理 SSE 事件handleSSEEventfunction*handleSSEEvent(event:SSEEvent){switch(event.type){casemessage_start:// 初始化 assistant 消息yieldcreateAssistantMessage(event.message)breakcasecontent_block_start:// 开始一个新的内容块text/tool_usebreakcasecontent_block_delta:// 增量更新text delta 或 input_json_deltayieldcreateStreamEvent(event.delta)breakcasecontent_block_stop:// 内容块结束breakcasemessage_delta:// 更新 message 级别字段stop_reason、usagebreakcasemessage_stop:// 消息结束breakcaseerror:// API 返回错误例如 429 在流中间throwparseError(event.error)}}关键事件content_block_delta最重要的事件包含逐 token 的文本或逐 key 的工具参数message_start包含input_tokens统计message_delta包含stop_reason和output_tokens6. 增量拼接工具参数Tool Input Buffering// 工具参数可能是流式返回的input_json_delta// 需要增量拼接成完整 JSONconsttoolInputBuffersnewMapstring,string()for(constdeltaofevent.delta){if(delta.typeinput_json_delta){constbuffertoolInputBuffers.get(toolUseId)||toolInputBuffers.set(toolUseId,bufferdelta.partial_json)// 尝试解析完整 JSON可能还没完成try{constfullInputJSON.parse(toolInputBuffers.get(toolUseId))yieldcreateToolUseMessage(toolUseId,fullInput)}catch{// JSON 还没完整继续等待}}}为什么需要缓冲工具参数input可能很大例如读取一个 10KB 的文件API 会把它拆成多个input_json_delta返回必须拼接完整后才能JSON.parse()7. 限速重试Rate Limit Retrycatch(error){if(errorinstanceofRateLimitError){if(retryCountmaxRetries){constdelayMath.pow(2,retryCount)*1000// 指数退避1s → 2s → 4sawaitsleep(delay)retryCountcontinue// 重试}// 重试次数用尽切换到 fallbackModelif(fallbackModel){thrownewFallbackTriggeredError(fallbackModel)}throwerror// 没有 fallback抛出错误}}重试策略指数退避1s → 2s → 4s → 8s最多 3 次自动切换备用模型如果配置了fallbackModel重试失败后抛出FallbackTriggeredError由query.ts捕获并切换模型8. 记录性能指标Performance MetricsconststartTimeDate.now()letfirstTokenTime:number|nullnullforawait(consteventofhandleSSE(response)){if(!firstTokenTimeevent.typecontent_block_delta){firstTokenTimeDate.now()// 记录首 token 时间}yieldevent}constendTimeDate.now()consttotalLatencyendTime-startTimeconsttimeToFirstTokenfirstTokenTime!-startTime// 上报遥测如果启用if(telemetryEnabled){reportMetrics({model:currentModel,totalLatency,timeToFirstToken,outputTokens:event.usage?.output_tokens,retryCount,})}关键指标Time to First Token (TTFT)从发起请求到收到第一个 token 的延迟通常 200-500msTotal Latency总延迟Output Tokens/s输出吞吐量9. 处理流式工具执行Streaming Tool Execution// 如果启用了 streamingToolExecution// 在收到 tool_use 块时立即开始执行不必等整个模型响应完成if(event.typecontent_block_startevent.content_block.typetool_use){consttoolUseevent.content_block// 立即启动工具执行异步streamingToolExecutor?.addTool(toolUse,currentAssistantMessage)// 同时继续处理流模型可能还在返回其他内容}好处工具执行和模型生成并行用户能更早看到工具执行结果减少感知延迟10. 清理资源finally{// 关闭 SSE 流reader.releaseLock()// 取消未完成的工具执行streamingToolExecutor?.cancel()// 清理缓冲区toolInputBuffers.clear()}五、高级特性1. 请求去重Request Deduplication// 如果同一个请求相同 messages/model/tools正在发送// 复用已有的响应流而不是发起新请求constrequestKeyJSON.stringify({messages:params.messages,model:currentModel,tools:params.tools,})constexistingStreamactiveStreams.get(requestKey)if(existingStream){yield*existingStream// 复用流return}// 否则发起新请求并缓存constnewStreamcallModelImpl(params)activeStreams.set(requestKey,newStream)yield*newStream为什么需要去重用户可能快速按两次 Enter误触UI 可能渲染两次React StrictMode2. 请求优先级Request Priority// 根据 querySource 设置请求优先级constpriority{repl:high,// 用户直接输入最高优先级agent:normal,// Agent 自动触发普通优先级compact:low,// 压缩请求最低优先级后台任务}[params.options.querySource]??normal// 在遥测中上报优先级Anthropic 可能用来优化调度requestHeaders[x-request-priority]priority3. 响应验证Response Validation// 验证模型返回的工具调用是否合法for(consttoolUseofassistantMessage.message.content){if(toolUse.typetool_use){consttoolparams.tools.find(tt.nametoolUse.name)if(!tool){// 模型返回了不存在的工具 → 记录警告但不阻断console.warn(Model returned unknown tool:${toolUse.name})continue}// 验证工具参数根据 input_schemaconsterrorsvalidateToolInput(toolUse.input,tool.input_schema)if(errors.length0){// 参数不合法 → 让模型修正而不是报错yieldcreateUserMessage({content:Tool input validation failed:${errors.join(, )}. Please fix and retry.,isMeta:true,})}}}六、错误处理详解错误分类错误类型是否可重试恢复策略429限速✅指数退避重试 → 切换备用模型500/502/503服务端错误✅指数退避重试413prompt-too-long⚠️触发上下文压缩 → 返回错误max_output_tokens⚠️升级输出上限 → 注入恢复提示词 → 返回错误401认证失败❌立即返回错误400请求格式错误❌立即返回错误网络超时✅重试最多 3 次错误转换// 把 API 错误转换成内部错误类型functionparseAPIError(status:number,body:string):ClaudeCodeError{if(status429){constretryAfterextractRetryAfter(body)returnnewRateLimitError(retryAfter)}if(status413){returnnewPromptTooLongError()}if(body.includes(max_output_tokens)){returnnewMaxOutputTokensError()}returnnewAPIError(status,body)}七、性能优化1. 连接复用Connection Reuse// 使用 global fetch浏览器/Node.js会自动复用 TCP 连接// 不需要手动管理连接池constresponseawaitfetch(...)效果同一会话的多次 API 调用复用同一个 TCP 连接减少 TLS 握手延迟~50ms2. 请求体压缩Request Compression// 如果请求体很大例如包含长上下文启用 gzipif(JSON.stringify(requestBody).length100_000){headers[content-encoding]gzipbodygzipSync(JSON.stringify(requestBody))}效果100KB 上下文 → 10KB gzip节省 90% 带宽3. 流式解析优化Streaming Parse Optimization// 避免每次都 JSON.parse() 整个事件// 只解析需要的字段constevent{type:data.type,delta:data.delta,// 其他字段按需解析}// 如果事件是 content_block_delta最常见// 直接传递 delta不解析整个事件if(data.typecontent_block_delta){yield{type:delta,delta:data.delta}}效果减少 30% CPU 占用在长对话中八、总结callModel.ts 的设计哲学读完src/services/api/callModel.ts的约 800 行代码有三个设计决策令人印象深刻AsyncGenerator 而非回调通过异步生成器逐步yield事件使得 REPL UI 可以实时渲染流式输出SDK 消费者也能灵活处理中间事件例如记录日志、取消请求多层错误恢复限速指数退避 → 切换备用模型上下文过长触发压缩 → 返回错误网络错误重试 → 返回错误每层恢复都是独立的互不相干且可以按任意顺序组合特性门控Feature Gating所有实验性特性请求去重、优先级、响应验证都用feature()包裹在生产构建中未启用的特性代码会被bun:bundle完全移除tree-shaking这使得 callModel.ts 能同时服务稳定版只有核心功能内测版启用所有实验性特性Ant 内部版额外的调试和遥测下一篇预告我们将深入src/services/api/streamProcessor.ts—— 真正解析 SSE 流、处理content_block_delta、拼接工具参数的那一层看看200ms 首 token 延迟是怎么做到的。Claude Code 源码分析系列 · 第十篇附录callModel.ts 完整调用流程图用户按 Enter ↓ query.ts: query() ↓ callModel.ts: callModel() ↓ [构建请求体] ↓ [发起 HTTPS 请求] ↓ [检查响应状态] ↓ (如果 429) [指数退避重试] ↓ (如果重试失败) [抛出 FallbackTriggeredError] ↓ (由 query.ts 捕获) [切换到 fallbackModel] ↓ [重新调用 callModel()] ↓ (如果 200) [解析 SSE 流] ↓ [逐块 yield 事件] ↓ [UI 实时渲染] ↓ [工具执行] ↓ [返回工具结果] ↓ [下一轮循环]