实时通讯方案

性能优化

# 一、需求分析

原先 AI 生成题目的场景响应较慢,如果题目数过多,容易产生请求超时;并且界面上没有响应,用户体验不佳。

需要 流式化改造 AI 生成题目接口,一道一道地实时返回已生成题目给前端,而不是让前端请求一直阻塞等待,最后一起返回,提升用户体验且避免请求超时。

首先智谱 AI 为我们提供了流式响应的支持,数据已经可以一点一点地返回给后端了,那么我们要思考的问题是如何让后端接收到的一点一点的内容实时返回给前端?

需要进行一些调研,来了解前后端实时通讯的方案。

# 二、前后端实时通讯方案

几种主流的实现方案:

  1. 1)轮询(前端主动去要)

    前端间隔一定时间就调用后端提供的结果接口,比如 200ms 一次,后端处理一些结果就累加放置在缓存中。

  2. 2)SSE(后端推送给前端)

    前端发送请求并和后端建立连接后,后端可以实时推送数据给前端,无需前端自主轮询。

  3. 3)WebSocket

    全双工协议,前端能实时推送数据给后端(或者从后端缓存拿数据),后端也可以实时推送数据给前端。

如果对 SSE 技术比较陌生,下面重点讲解。

# 三、SSE 技术

# 基本概念

服务器发送事件(Server-Sent Events)是一种用于从服务器到客户端的 单向、实时 数据传输技术,基于 HTTP协议实现。

它有几个重要的特点:

  1. 单向通信:SSE 只支持服务器向客户端的单向通信,客户端不能向服务器发送数据。
  2. 文本格式:SSE 使用 纯文本格式 传输数据,使用 HTTP 响应的 text/event-stream MIME 类型。
  3. 保持连接:SSE 通过保持一个持久的 HTTP 连接,实现服务器向客户端推送更新,而不需要客户端频繁轮询。
  4. 自动重连:如果连接中断,浏览器会自动尝试重新连接,确保数据流的连续性。

# SSE 数据格式

SSE 数据流的格式非常简单,每个事件使用 data 字段,事件以两个换行符结束。还可以使用 id 字段来标识事件,并且 retry 字段可以设置重新连接的时间间隔。

示例格式如下:

data: First message\n
\n
data: Second message\n
\n
data: Third message\n
id: 3\n
\n
retry: 10000\n
data: Fourth message\n
\n
1
2
3
4
5
6
7
8
9
10

# 自主实现 SSE

实现 SSE 非常简单,无论是 Java 服务端还是前端 HTML5 都支持了 SSE,以下内容仅作了解。

1)服务器端

需要生成符合 SSE 格式的响应,并设置合适的 HTTP 头。使用 Servlet 来实现 SSE 示例:

import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/sse")
public class SseServlet extends HttpServlet {
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("UTF-8");
        PrintWriter writer = response.getWriter();
        for (int i = 0; i < 10; i++) {
            writer.write("data: Message " + i + "\n\n");
            writer.flush();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        writer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

当然,如果是 Spring 项目,还有更简单的实现方式,直接给请求返回 SseEmitter 对象即可。

注意,一定要使用 Get 请求!!!

示例代码:

@GetMapping("/sse")
public SseEmitter testSSE() {
    // 建立 SSE 连接对象,0 表示不超时
    SseEmitter emitter = new SseEmitter(0L);   
    ... 业务逻辑处理
    return emitter;
}
1
2
3
4
5
6
7

2)Web 前端

可以使用 JavaScript 的 EventSource 对象来连接和处理服务器发送的事件。示例代码:

// 创建 SSE 请求
const eventSource = new EventSource(
  "http://localhost:8080/sse"
);
// 接收消息
eventSource.onmessage = function (event) {
  console.log(event.data);
};
// 生成结束,关闭连接
eventSource.onerror = function (event) {
  if (event.eventPhase === EventSource.CLOSED) {
    eventSource.close();
  }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 应用场景

由于现代浏览器普遍支持 SSE,所以它的应用场景非常广泛,AI 对话就是 SSE 的一个典型的应用场景。

再举一些例子:

  1. 实时更新:股票价格、体育比赛比分、新闻更新等需要实时推送的应用。
  2. 日志监控:实时监控服务器日志或应用状态。
  3. 通知系统:向客户端推送系统通知或消息。

# 四、方案设计

# 方案对比

熟悉了 SSE 技术后,对比上述前后端实时通讯方案。

1)主动轮询其实是一种伪实时,比如每 3 秒轮询请求一次,结果后端在 0.1 秒就返回了数据,还要再等 2.9 秒,存在延迟。

2)WebSocket 和 SSE 虽然都能实现服务端推送,但 Websocket 会更复杂些,且是二进制协议,调试不方便。AI 对话只需要服务器单向推送即可,不需要使用双向通信,所以选择文本格式的 SSE。

# 最终方案

回归到本项目,具体实现方案如下:

1)前端向后端发送普通 HTTP 请求

2)后端创建 SSE 连接对象,为后续的推送做准备

3)后端流式调用智谱 AI,获取到数据流,使用 RxJava 订阅数据流

4)以 SSE 的方式响应前端,至此接口主流程已执行完成

5)异步:基于 RxJava 实时获取到智谱 AI 的数据,并持续将数据拼接为字符串,当拼接出一道完整题目时,通过 SSE 推送给前端。

6)前端每获取一道题目,立刻插入到表单项中

明确方案后,下面进行开发。

# 五、后端开发

# 1、封装通用的流式调用 AI 接口

跟之前的请求方法不同的是:

  • 将请求的 stream 参数为 true,表示开始流式调用。
  • 返回结果为 Flowable 类型,为流式结果。

代码如下:

/**
 * 通用流式请求(简化消息传递)
 *
 * @param systemMessage
 * @param userMessage
 * @param temperature
 * @return
 */
public Flowable<ModelData> doStreamRequest(String systemMessage, String userMessage, Float temperature) {
    // 构造请求
    List<ChatMessage> messages = new ArrayList<>();
    ChatMessage systemChatMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), systemMessage);
    ChatMessage userChatMessage = new ChatMessage(ChatMessageRole.USER.value(), userMessage);
    messages.add(systemChatMessage);
    messages.add(userChatMessage);
    return doStreamRequest(messages, temperature);
}

/**
 * 通用流式请求
 *
 * @param messages
 * @param temperature
 * @return
 */
public Flowable<ModelData> doStreamRequest(List<ChatMessage> messages, Float temperature) {
    // 构造请求
    ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
    .model(Constants.ModelChatGLM4)
    .stream(Boolean.TRUE)
    .invokeMethod(Constants.invokeMethod)
    .temperature(temperature)
    .messages(messages)
    .build();
    ModelApiResponse invokeModelApiResp = clientV4.invokeModelApi(chatCompletionRequest);
    return invokeModelApiResp.getFlowable();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 2、新建 AI 生成题目的 SSE 接口

1)首先定义接口,注意 SSE 必须是 get 请求:

@GetMapping("/ai_generate/sse")
public SseEmitter aiGenerateQuestionSSE(
    AiGenerateQuestionRequest aiGenerateQuestionRequest) {
    ...
}
1
2
3
4
5

2)建立 SSE 连接对象,并返回:

// 建立 SSE 连接对象,0 表示不超时
SseEmitter emitter = new SseEmitter(0L);
return emitter;
1
2
3

3)调用 AI 获取数据流:1671423090896154626_0.4811694112607443

// AI 生成,sse 流式返回
Flowable<ModelData> modelDataFlowable = aiManager.doStreamRequest(GENERATE_QUESTION_SYSTEM_MESSAGE, userMessage, null);
1
2

4)异步对流进行解析和转换,转为单个字符,便于处理:

modelDataFlowable
// 异步线程池执行
.observeOn(Schedulers.io())
.map(chunk -> chunk.getChoices().get(0).getDelta().getContent())
.map(message -> message.replaceAll("\\s", ""))
.filter(StrUtil::isNotBlank)
.flatMap(message -> {
    // 将字符串转换为 List<Character>
    List<Character> charList = new ArrayList<>();
    for (char c : message.toCharArray()) {
        charList.add(c);
    }
    return Flowable.fromIterable(charList);
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14

5)异步拼接 JSON 单题数据,并利用 SSE 推送至前端

括号匹配算法:

https://leetcode.cn/problems/valid-parentheses/description/1671423090896154626_0.530474413034121

// 左括号计数器,除了默认值外,当回归为 0 时,表示左括号等于右括号,可以截取
AtomicInteger counter = new AtomicInteger(0);
// 拼接完整题目
StringBuilder stringBuilder = new StringBuilder();

flowable.doOnNext(c -> {
    {
        // 识别第一个 [ 表示开始 AI 传输 json 数据,打开 flag 开始拼接 json 数组
        if (c == '{') {
            flag.addAndGet(1);
        }
        if (flag.get() > 0) {
            contentBuilder.append(c);
        }
        if (c == '}') {
            flag.addAndGet(-1);
            if (flag.get() == 0) {
                // 累积单套题目满足 json 格式后,sse 推送至前端
                // sse 需要压缩成当行 json,sse 无法识别换行
                emitter.send(JSONUtil.toJsonStr(contentBuilder.toString()));
                // 清空 StringBuilder
                contentBuilder.setLength(0);
            }
        }
    }
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

6)监听 flowable 完成事件,并开启订阅:

flowable
.doOnComplete(emitter::complete)
.subscribe()
1
2
3

完整代码如下:

@GetMapping("/ai_generate/sse")
public SseEmitter aiGenerateQuestionSSE(AiGenerateQuestionRequest aiGenerateQuestionRequest) {
    ThrowUtils.throwIf(aiGenerateQuestionRequest == null, ErrorCode.PARAMS_ERROR);
    // 获取参数
    Long appId = aiGenerateQuestionRequest.getAppId();
    int questionNumber = aiGenerateQuestionRequest.getQuestionNumber();
    int optionNumber = aiGenerateQuestionRequest.getOptionNumber();
    // 获取应用信息
    App app = appService.getById(appId);
    ThrowUtils.throwIf(app == null, ErrorCode.NOT_FOUND_ERROR);

    // 封装 Prompt
    String userMessage = getGenerateQuestionUserMessage(app, questionNumber, optionNumber);
    // 建立 SSE 连接对象,0 表示不超时
    SseEmitter emitter = new SseEmitter(0L);
    // AI 生成,sse 流式返回
    Flowable<ModelData> modelDataFlowable = aiManager.doStreamRequest(GENERATE_QUESTION_SYSTEM_MESSAGE, userMessage, null);
    StringBuilder contentBuilder = new StringBuilder();
    AtomicInteger flag = new AtomicInteger(0);
    modelDataFlowable
    // 异步线程池执行
    .observeOn(Schedulers.io())
    .map(chunk -> chunk.getChoices().get(0).getDelta().getContent())
    .map(message -> message.replaceAll("\\s", ""))
    .filter(StrUtil::isNotBlank)
    .flatMap(message -> {
        // 将字符串转换为 List<Character>
        List<Character> charList = new ArrayList<>();
        for (char c : message.toCharArray()) {
            charList.add(c);
        }
        return Flowable.fromIterable(charList);
    })
    .doOnNext(c -> {
        {
            // 识别第一个 [ 表示开始 AI 传输 json 数据,打开 flag 开始拼接 json 数组
            if (c == '{') {
                flag.addAndGet(1);
            }
            if (flag.get() > 0) {
                contentBuilder.append(c);
            }
            if (c == '}') {
                flag.addAndGet(-1);
                if (flag.get() == 0) {
                    // 累积单套题目满足 json 格式后,sse 推送至前端
                    // sse 需要压缩成当行 json,sse 无法识别换行
                    emitter.send(JSONUtil.toJsonStr(contentBuilder.toString()));
                    // 清空 StringBuilder
                    contentBuilder.setLength(0);
                }
            }
        }
    }).doOnComplete(emitter::complete).subscribe();
    return emitter;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

# 六、前端开发

1)在 AI 生成抽屉表单中补充一个 AI 生成按钮,绑定提交事件。

<a-form-item>
  <a-space>
    <a-button
      :loading="submitting"
    type="primary"
    html-type="submit"
    style="width: 120px"
    >
    {{ submitting ? "生成中" : "一键生成" }}
  </a-button>
  <a-button
:loading="submitting"
style="width: 120px"
@click="doSSESubmit"
  >
  {{ submitting ? "生成中" : "实时生成" }}
</a-button>
</a-space>
  </a-form-item>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

2)编写提交函数,遵循前面提到的 SSE 前端实现方法,先能够打印出 SSE 推送的消息、生成开始、生成结束的信息。1671423090896154626_0.4346634254510635

需要注意这里不能调用生成的 api,因为 axios 默认不支持 SSE。

3)当有内容生成时,向父页面插入题目(利用属性传递实现)

最近修改于: 2024/8/25 00:39:31
和宇宙温柔的关联
房东的猫