腾网站建设,网站设计 模板,网页无法打开如何解决,成都建站模板公司一、前言 假如您跟随我的脚步#xff0c;学习到上一篇的内容#xff0c;到这里#xff0c;相信细心的您#xff0c;已经发现了#xff0c;在上一篇中遗留的问题。那就是IM服务过载的时候#xff0c;如何进行水平扩容#xff1f; 因为在每个IM服务中#xff0c;我们用JV…一、前言 假如您跟随我的脚步学习到上一篇的内容到这里相信细心的您已经发现了在上一篇中遗留的问题。那就是IM服务过载的时候如何进行水平扩容 因为在每个IM服务中我们用JVM缓存了用户与WS的通道的绑定关系并且使用Redis队列进行解耦。那扩展了IM服务实例之后如何确保Redis队列的消息能正常消费即如何能找回对应的用户通道别着急接下来我将给您做详细的解释。 二、术语
2.1.水平扩容 是指通过增加系统中的资源实例数量来提高系统的处理能力和吞吐量。在计算机领域水平扩容通常用于应对系统负载的增加或需要处理更多请求的情况。
2.2.无状态 无状态stateless是指系统或组件在处理请求时不依赖于之前的请求或会话信息。换句话说每个请求都是独立的系统不会在不同的请求之间保存任何状态或上下文信息。 在无状态系统中每个请求被视为一个独立的事件系统只关注当前请求所包含的信息和参数而不依赖于之前的请求历史。这使得系统更加简单、可伸缩和易于管理。 三、前置条件
3.1. 已经完成前两篇的学习 四、技术实现
4.1、实现思路 首先IM服务是有状态的AI服务是无状态每个实例中会缓存用户与WebSocket通道之间的信息。那是否可以采用中间共享存储的方式将状态信息保存至Redis或外部存储中答案是不行。WebScoket的通道信息无法进行序列化。 要实现IM服务水平扩容的方式有多种但目前我们采用以下的方案 每个IM服务保存对应的用户和WS通道的关系 每个IM服务对应唯一一个redis队列 前置的SLBApp入口能根据用户标识哈希将请求转发至指定的IM服务 当某一IM服务出现故障的时候由App端发起重连重新建立WebSocket连接。
4.2、调整配置文件
# 每个IM服务实例指定全局唯一的ID例如下面指定的node0
ws:server:node: 0
PS具体参数可以在外部指定作为JVM的运行参数传入
4.3、调整业务逻辑处理类
# 将原有Redis的单一队列名改为拼接上全局唯一ID的方式 # Redis中缓存的数据如下 4.4、调整任务处理类
# 将原有Redis的单一队列名改为拼接上全局唯一ID的方式 五、测试
# 这次换一下测试方式用离线页面的方式进行测试
5.1. 建立连接 5.2. 业务初始化 5.3. 业务对话 六、附带说明
6.1. BusinessHandler完整代码
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;/*** Description: 处理消息的handler*/
Slf4j
ChannelHandler.Sharable
Component
public class BusinessHandler extends AbstractBusinessLogicHandlerTextWebSocketFrame {public static final String LINE_UP_QUEUE_NAME AI-REQ-QUEUE;private static final String LINE_UP_LOCK_NAME AI-REQ-LOCK;private static final int MAX_QUEUE_SIZE 100;// Autowired
// private TaskUtils taskExecuteUtils;Autowiredprivate RedisUtils redisUtils;Autowiredprivate RedissonClient redissonClient;Autowiredprivate NettyConfig nettyConfig;Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {String channelId ctx.channel().id().asShortText();log.info(add client,channelId{}, channelId);}Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {String channelId ctx.channel().id().asShortText();log.info(remove client,channelId{}, channelId);}Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)throws Exception {// 获取客户端传输过来的消息String content textWebSocketFrame.text();// 兼容在线测试if (StringUtils.equals(content, PING)) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents(心跳测试很高兴收到你的心跳包).build());return;}log.info(接收到客户端发送的信息: {}, content);Long userIdForReq;String msgType ;String contents ;try {ApiReqMessage apiReqMessage JSON.parseObject(content, ApiReqMessage.class);msgType apiReqMessage.getMsgType();contents apiReqMessage.getContents();userIdForReq apiReqMessage.getUserId();// 用户身份标识校验if (null userIdForReq || (long) userIdForReq 10000) {ApiRespMessage apiRespMessage ApiRespMessage.builder().code(String.valueOf(StatusCode.SYSTEM_ERROR.getCode())).respTime(String.valueOf(System.currentTimeMillis())).contents(用户身份标识有误!).msgType(String.valueOf(MsgType.SYSTEM.getCode())).build();buildResponseAndClose(channelHandlerContext, apiRespMessage);return;}if (StringUtils.equals(msgType, String.valueOf(MsgType.CHAT.getCode()))) {// 对用户输入的内容进行自定义违规词检测// 对用户输入的内容进行第三方在线违规词检测// 对用户输入的内容进行组装成Prompt// 对Prompt根据业务进行增强完善prompt的内容// 对history进行裁剪或总结检测history是否操作模型支持的上下文长度例如qwen-7b支持的上下文长度为8192// ...// 通过线程池来处理
// String messageId apiReqMessage.getMessageId();
// ListChatContext history apiReqMessage.getHistory();
// AITaskReqMessage aiTaskReqMessage AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();
// taskExecuteUtils.execute(aiTaskReqMessage);// 通过队列来缓冲boolean flag true;RLock lock redissonClient.getLock(LINE_UP_LOCK_NAME);String queueName LINE_UP_QUEUE_NAME-nettyConfig.getNode();//尝试获取锁最多等待3秒锁的自动释放时间为10秒if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {try {if (redisUtils.queueSize(queueName) MAX_QUEUE_SIZE) {redisUtils.queueAdd(queueName, content);log.info(当前线程为{} 添加请求至redis队列,Thread.currentThread().getName());} else {flag false;}} catch (Throwable e) {log.error(系统处理异常, e);} finally {lock.unlock();}} else {flag false;}if (!flag) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.SYSTEM.getCode())).contents(当前排队人数较多请稍后再重试).build());}} else if (StringUtils.equals(msgType, String.valueOf(MsgType.INIT.getCode()))) {//一、业务黑名单检测多次违规永久锁定//二、账户锁定检测临时锁定//三、多设备登录检测//四、剩余对话次数检测//检测通过绑定用户与channel之间关系addChannel(channelHandlerContext, userIdForReq);String respMessage 用户标识: userIdForReq 登录成功;buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.INIT.getCode())).contents(respMessage).build());} else if (StringUtils.equals(msgType, String.valueOf(MsgType.HEARTBEAT.getCode()))) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents(心跳测试很高兴收到你的心跳包).build());} else {log.info(用户标识: {}, 消息类型有误不支持类型: {}, userIdForReq, msgType);}} catch (Exception e) {log.warn(【BusinessHandler】接收到请求内容{}异常信息{}, content, e.getMessage(), e);// 异常返回return;}}}6.2. TaskUtils完整代码
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;Component
Slf4j
public class TaskUtils implements ApplicationRunner {private static ExecutorService executorService Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);Autowiredprivate AIChatUtils aiChatUtils;Autowiredprivate RedisUtils redisUtils;Autowiredprivate NettyConfig nettyConfig;Overridepublic void run(ApplicationArguments args) throws Exception {while(true){String queueName BusinessHandler.LINE_UP_QUEUE_NAME-nettyConfig.getNode();
// 执行定时任务的逻辑String content redisUtils.queuePoll(queueName);if(StringUtils.isNotEmpty(content) StringUtils.isNoneBlank(content)){try{ApiReqMessage apiReqMessage JSON.parseObject(content, ApiReqMessage.class);String messageId apiReqMessage.getMessageId();String contents apiReqMessage.getContents();Long userIdForReq apiReqMessage.getUserId();ListChatContext history apiReqMessage.getHistory();AITaskReqMessage aiTaskReqMessage AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();execute(aiTaskReqMessage);}catch (Throwable e){log.error(处理消息出现异常,e);//将请求再次返回去队列//将请求丢弃//其他处理}}else{TimeUnit.SECONDS.sleep(1);}}}public void execute(AITaskReqMessage aiTaskReqMessage) {executorService.execute(() - {Long userId aiTaskReqMessage.getUserId();if (null userId || (long) userId 10000) {log.warn(用户身份标识有误!);return;}ChannelHandlerContext channelHandlerContext BusinessHandler.getContextByUserId(userId);if (channelHandlerContext ! null) {try {aiChatUtils.chatStream(aiTaskReqMessage);} catch (Throwable exception) {exception.printStackTrace();}}});}public static void destory() {executorService.shutdownNow();executorService null;}}