流式输出SSE实操
流式输出SSE实操
😊 面试官您好,关于 SSE 流式输出在 AI 场景的原理与实操,我从核心本质、Java 落地、AI 链路、生产踩坑四个方面来回答。
📌 SSE 核心本质
SSE 全称 Server-Sent Events,是基于 HTTP 的服务端单向推送技术,也是目前 AI 对话流式输出的主流方案。
- 底层就是一个长连接 HTTP 响应,响应头声明
Content-Type: text/event-stream - 数据按固定格式逐段发送,每段以
data: 内容\n\n作为分隔符 - 浏览器原生支持
EventSource对象,自带断线自动重连,开发成本极低
🆚 和 WebSocket 的核心区别
很多人会拿它和 WebSocket 比,我整理了核心差异:
| 对比维度 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 服务端→客户端 单向 | 双向全双工 |
| 协议 | 复用 HTTP/HTTPS | 独立 ws/wss 协议 |
| 自动重连 | 原生支持 | 需手动实现 |
| 开发成本 | 极低,和普通接口一致 | 较高,需处理握手、心跳 |
| AI 场景适配 | 完美匹配(单向输出 token) | 能力冗余,过重 |
💻 Java 实操代码(Spring Boot)
我在项目里用过两种实现方式,都是生产验证过的:
方式一:Spring MVC + SseEmitter(最常用)
适合绝大多数传统 Spring Boot 项目:
@RestController
@RequestMapping("/ai")
public class AiStreamController {
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(String prompt) {
// 0L表示不设置超时,由业务生命周期控制
SseEmitter emitter = new SseEmitter(0L);
// 异步调用大模型,流式转发
CompletableFuture.runAsync(() -> {
try {
// 模拟大模型逐token生成返回
List<String> tokens = aiService.streamCall(prompt);
for (String token : tokens) {
emitter.send(SseEmitter.event().data(token));
}
// 约定结束标识,通知前端关闭连接
emitter.send("[DONE]");
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}前端调用(原生 JS)
const sse = new EventSource(`/ai/chat?prompt=介绍一下Java`);
sse.onmessage = (e) => {
if (e.data === '[DONE]') {
sse.close();
return;
}
// 追加内容,实现打字机效果
document.getElementById('answer').innerText += e.data;
};
sse.onerror = () => sse.close();🤖 AI 场景完整调用链路
实际生产中,Java 后端是作为前端和大模型的中转代理层,完整流程如下:
✅ 为什么 AI 都用 SSE?
大模型是边推理边输出的增量模式,SSE 可以完美实现打字机效果,大幅降低用户等待感知;而且基于 HTTP,对网关、防火墙、代理兼容性拉满,基础设施几乎不用改造。
⚠️ 生产环境必踩的坑
这几个都是我线上踩过的点,面试提出来很加分:
- 网关超时:Nginx / 网关默认读超时一般 30s,必须调大
proxy_read_timeout,否则长连接会被中间节点断开 - 连接数限制:HTTP/1.1 同域名最多 6 个 SSE 并发,HTTP/2 无此限制,生产建议开启 HTTP/2
- 连接泄漏:一定要注册
onTimeout、onError、onCompletion回调,异常时主动 complete,避免 SseEmitter 对象泄漏 - 心跳保活:长时间无数据时,定期发注释行
: heartbeat\n\n保活,防止连接被运营商 / 网关回收 - 中文乱码:响应头必须指定
charset=UTF-8,否则部分浏览器会出现乱码
以上就是我对 SSE AI流式输出的全部理解和实操经验。
真实面试模拟
真实面试模拟
面试官 👨💼:
“你好,我看你项目里用到了大模型对话,能说一下为什么需要流式输出吗?不流式会怎样?”
我 🙋♂️:
面试官好。核心原因是大模型生成 token 是一个一个出的,从收到 prompt 到整段话生成完,可能要 5~10 秒。
如果等全部生成完再返回,用户就会看到页面卡住好几秒,体验非常差。
流式输出就是:生成一个 token,就往前端推一个 token,让对话像打字一样实时出来,用户无等待感。😊
面试官 👨💼:
“实现这种效果,你技术选型是SSE 还是 WebSocket?为什么?”
我 🙋♂️:
我选 SSE,因为 AI 对话场景下它比 WebSocket 更合适。我简单对比一下:
| 对比点 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 服务器→客户端单向流 | 双向全双工 |
| 协议 | HTTP/1.1 或 HTTP/2 | 独立 ws:// 协议 |
| 部署复杂度 | ✅ 低,Nginx 无需特殊配置 | ❌ 需升级连接,代理配置复杂 |
| 浏览器 API | EventSource,自动重连 | 需额外库,重连需自己实现 |
| 场景匹配 | AI 流式对话、通知 | 即时聊天、协作编辑 |
AI 对话里客户端只需要发一次 prompt,后面全是服务器推送 token,几乎是纯粹的单向推送。
用 SSE 部署省心,还能复用 HTTP 生态的负载均衡、鉴权等,所以选了它。✅
面试官 👨💼:
“好,那在 Spring Boot 里怎么落地?给我一段你实际写的核心代码看看。”
我 🙋♂️:
最常用的是 Spring 提供的 SseEmitter。我写一段关键代码(演示用):
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter chatStream(@RequestParam String prompt) {
SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时
// 必须异步,避免阻塞 Tomcat 线程
Executors.newSingleThreadExecutor().execute(() -> {
try {
for (String token : mockAIModel(prompt)) {
emitter.send(SseEmitter.event()
.name("message") // 前端可以按事件监听
.data(token));
TimeUnit.MILLISECONDS.sleep(20); // 模拟生成延时
}
emitter.complete(); // 正常结束
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}🔍 这里面几个要点:
- 必须设超时,不然连接会挂死。
- 异步执行,不占主线程,否则并发一高服务就崩了。
- 事件命名
.name("message"),前端可以addEventListener("message", ...)拿到数据。
面试官 👨💼:
“这是模拟的,如果接的是真实的 AI 模型,比如对接 OpenAI 的流式接口,你怎么处理?”
我 🙋♂️:
真实模型流式接口一般也返回 text/event-stream,我会在 Java 里透传这个流,直接用 HttpServletResponse 来写,更灵活:
@GetMapping(value = "/ai/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void proxyAIStream(@RequestParam String prompt, HttpServletResponse response) throws IOException {
// 关闭 Nginx 缓冲,强制刷新
response.setHeader("Cache-Control", "no-cache");
response.setHeader("X-Accel-Buffering", "no"); // ⚠️ 关键
response.setContentType("text/event-stream");
PrintWriter writer = response.getWriter();
// 请求 AI 模型流式接口
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpPost post = new HttpPost("https://api.openai.com/v1/chat/completions");
// ... 设置 body、apiKey
post.setEntity(new StringEntity(buildJsonBody(prompt), StandardCharsets.UTF_8));
try (CloseableHttpResponse aiResponse = client.execute(post)) {
BufferedReader reader = new BufferedReader(
new InputStreamReader(aiResponse.getEntity().getContent()));
String line;
while ((line = reader.readLine()) != null) {
writer.write(line + "\n");
writer.flush(); // 逐行刷给前端
}
}
}
}用 SseEmitter 有时候超时不够灵活,直接操作 Writer 可以长时间保持流。
面试官 👨💼:
“提到 Nginx 缓冲,这确实是生产里容易踩坑的地方,你能展开说说遇到过哪些坑吗?”
我 🙋♂️:
当然,主要有三个坑:
① Nginx 代理缓冲导致“假流式” ⚡
默认 Nginx 会缓冲上游响应,到了一定量才发给客户端,用户看到的就是卡住然后突然全出来。
解决方案就是我代码里加的 X-Accel-Buffering: no 响应头,或者在 Nginx 配置里 proxy_buffering off;。
② 长连接占用线程
如果用同步阻塞方式,每个 SSE 连接会占用一个 Tomcat 工作线程,几百个连接就能拖垮服务。
所以必须异步 + 流式 I/O,用 Callable、DeferredResult 或上面的 execute() 线程池。
③ 前端 EventSource 不能自定义请求头
如果接口需要放 token 认证,EventSource 发不了 Authorization 头。
👉 方案:查询参数里短暂传 token,或者用 fetch 手动解析流(可以带头):
const res = await fetch('/ai/stream', {
headers: { 'Authorization': 'Bearer xxx' }
});
const reader = res.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
let text = decoder.decode(value, { stream: true });
// 解析 SSE 格式 "data: ..."
}面试官 👨💼:
“如果前端断线重连了,你的服务需要重新生成一次回答吗?怎么优化?”
我 🙋♂️:
好问题。SSE 的 EventSource 断线会自动重连,但服务端默认确实要从头开始生成,很浪费。
实践中可以:
- 客户端在 localStorage 里记录已接收的最后一个 token 序号。
- 重连时把序号带在请求参数里(比如
lastId=32)。 - 后端解析后,如果模型支持,就跳过前面 32 个 token,从第 33 个开始推送;如果不支持,就缓存之前生成的内容,直接续传。
不过很多模型 API 不支持“跳过”,我们会自建一层缓存层,用 sessionId 绑定生成结果,重连后直接读缓存推送剩余部分。
面试官 👨💼:
“整体流程能不能用图简单表示一下?”
我 🙋♂️:
可以,整个交互流是下面这样 🌀:
面试官 👨💼:
“最后,如果让你用一句话总结 AI 场景下 SSE 的核心要点,你会怎么说?”
我 🙋♂️:
AI 流式输出首选 SSE,因为它基于 HTTP 单向推送,部署简单、浏览器原生支持;Java 实现用 SseEmitter 或直接写 HttpServletResponse,但务必解决 Nginx 缓冲、异步化、鉴权穿透这三个生产问题,才能真正扛得住并发。
面试官 👨💼:
“好的,条理很清晰,看到你对流式输出了解比较深入,可以了。”
