SpringAI 实现流式调用效果, 最小与最佳实践

in 工作记录 with 0 comment

本文专注于最佳流式调用实现,所以与模型对话采用一问一答的方式。

开们见山

@RestController
@AllArgsConstructor
@RequestMapping("/simple")
public class SimpleController {

    private final StreamingChatClient streamingChatClient;

    @GetMapping(value = "/stream")
    public Flux<String> stream(String message) {
        return streamingChatClient.stream(message);
    }# 
}

这是一个实现流式调用的最小例子, 直接返回 Flux, SpringMVC 目前是支持这样的写法,完全的异步。

查看下响应头:

image.png

可以看到 content-type 为 text/plain,

但没有关系,你依然可以采用流式读取,在前端实现打字机效果。

const response = await fetch(url, options);
if (response.ok) {
    const reader = response.body.getReader();
    let receivedLength = 0;
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
          console.log('Data fully received');
          break;
      }
      const chunk = new TextDecoder("utf-8").decode(value);
      console.log("Chunk received: " + chunk);
      receivedLength += value.length;
    }
} 

但是这样的写法并不是太好,content-type 让人看上去的第一眼下意识认为是同步的请求,也没有指定字符集和更多的信息。

用 WebSocket 来实现,又显得过于复杂,只能来参考 SSE 实现了。

server-sent_events

https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html

首先设置 正确的响应头,声明这是个流式返回

    private final HttpServletResponse response;

    /**
     * 给当前请求设置响应头
     */
    public void setResponseHeader() {
        // Set the content type to text/event-stream
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE + "; charset=UTF-8");
        response.addHeader("cache-control", "no-cache");
        response.addHeader("connection", "keep-alive");
    }

然后 Spring 会认为这是个 SEE 响应, 会按照 SSE 标准报文去封装你的响应。

这样:

data: message\n\n

image.png

SSE 格式的报文,对于实现流式会话来说,似乎有些臃肿,因为会话模型基本一次只返回一个词,并且报文格式对于对话来说并未卵用。

并且,SSE 协议规定只能用 GET 请求,但是想实现会话效果的话,很明显,需要 Body 传递报文信息。

但.

POST 请求难道不能流式返回吗?

当然可以!

第一个例子中 content-type 为 text/plain,你依然可以流式读取响应,只不过没有对返回数据的格式进行声明,这总是不太好的。

POST + 流式返回,已经没有完全遵守 SSE 协议了,再加个自定义报文格式又如何?

想要完全异步的,再 SpringMVC 中实现自定义响应内容, 你可以使用 ResponseBodyEmitter 。

  @PostMapping(value = "/async/stream")
    public ResponseBodyEmitter stream2(@RequestBody MessageHistory messageHistory) {

        sessionService.setResponseHeader();

        final ResponseBodyEmitter emitter = new ResponseBodyEmitter();

        // 模型名称, 按需处理
        String model = messageHistory.getModel();

        Message[] messages = sessionService.getMessages(messageHistory);
        streamingChatClient.stream(messages)
                .subscribe(
                        message -> {
                            try {
                                // emitter.send("data:" + message + "\n\n"); 这是遵守 SSE 规范的写法.这里不遵守。
                                emitter.send(message);
                            } catch (IOException e) {
                                emitter.completeWithError(e);
                            }
                        },
                        error -> emitter.completeWithError(error),
                        () -> emitter.complete()
                );

        return emitter;
    }

简单又直接的返回报文... 我觉得这很好。

image.png

这里是完整的示例代码

https://github.com/qq418745/spring-ai-example