建设网站学习,怎么做阿里巴巴国际网站,宠物网站素材,wordpress精美网站目录 前言阅读对象阅读导航前置知识课程内容一、使用Netty实现一个通信框架需要考虑什么问题二、通信框架功能设计2.1 功能描述2.2 通信模型2.3 消息体定义2.4 心跳机制2.5 重连机制*2.6 Handler的组织顺序2.7 交互式调试 三、代码实现#xff1a;非必要。感兴趣的自行查看3.1… 目录 前言阅读对象阅读导航前置知识课程内容一、使用Netty实现一个通信框架需要考虑什么问题二、通信框架功能设计2.1 功能描述2.2 通信模型2.3 消息体定义2.4 心跳机制2.5 重连机制*2.6 Handler的组织顺序2.7 交互式调试 三、代码实现非必要。感兴趣的自行查看3.1 最外层的通信入口3.1.1 NettyRpcServer服务端通信入口3.3 NettyRpcClient客户端通信入口3.3 NettyRpcClient交互式调试 3.2 server包下3.2.1 ServerInitializer服务端的Handler链化3.2.2 handler包下所有的handler3.2.3 helper包工具包3.2.4 async包异步处理类 3.3 client包下3.3.1 ClientInitializer客户端的Handler链化3.3.2 handler包下所有的handler 3.4 common包下一些公用的定义3.4.1 NettyConstant一些公用常量3.4.2 helper包工具包3.4.3 codec包编解码反序列化工具 3.4 biz包下业务模拟 四、业务流程图 学习总结感谢 前言
阅读对象
有一定网络编程基础了解Netty常用API
阅读导航
系列上一篇文章《【Netty专题】Netty实战与核心组件详解》
前置知识
长连接 长连接也叫持久连接是指在TCP层握手成功后不立即断开连接并在此连接的基础上进行多次消息包括心跳交互直至连接的任意一方客户端OR服务端主动断开连接此过程称为一次完整的长连接。 SOCKET 连接后不管是否使用都保持连接的一种连接
短连接 短连接顾名思义与长连接的区别就是客户端收到服务端的响应后立刻发送FIN消息主动释放连接。也有服务端主动断连的情况凡是在一次消息交互发请求-收响应之后立刻断开连接的情况都称为短连接。短连接是建立在TCP协议上的有完整的握手挥手流程区别于UDP协议。SOCKET 连接发送数据接收完数据后马上断开连接的一种连接
课程内容
一、使用Netty实现一个通信框架需要考虑什么问题
这里已经假设我们确定使用Netty作为我们的网络编程框架了。但是我想跟楼主一样的初学者还是会有疑惑那我想使用Netty开发一个简单的通信交互程序该如何做到呢 讲真的 这个问题确实难倒了我。因为不在那个层次我甚至连【需要关心什么问题】都搞不懂。思来想去我只能以【没见过猪跑但吃过猪肉】的视角出发 大概说一下自己的想法了。
序列化和反序列化问题 Q1什么是序列化、反序列化 答我们知道数据在网络中传输不可能是原文传输的人家机器设备只认得二进制01串。所以把原文转换为字节流这个过程就是序列化反之则叫做反序列化。 Q2序列化有什么作用 答主要目的有【网络传输】及【对象持久化保存】。持久化保存知道啥意思吧就是存到各种数据库中。 编解码问题 Q3什么是编解码问题 答编解码就是将一个格式转变为另一个格式的过程。比如MP3格式转为MP4JSON转为XML。在我们这里就是将字节流反序列化后的数据如何转变为我们Java应用程序识别的数据结构 心跳保活机制 摘抄自【百度文心一言】。 心跳保活机制是一种维持网络连接长连接的机制它通过定时发送心跳包来检测双方是否存活。如果没有特意的设置某些选项或者实现应用层心跳包TCP空闲的时候是不会发送任何数据包。也就是说当一个TCP的socket客户端与服务端谁也不发送数据会一直保持着连接。这其中如果有一方异常掉线例如死机、路由被破坏、防火墙切断连接等另一端如果没有发送数据永远也不可能知道。这对于一些服务型的程序来说是灾难性的后果将会导致服务端socket资源耗尽。 因此需要心跳保活机制来维持连接的有效性及时有效地检测到一方的非正常断开保证连接的资源被有效的利用。心跳保活机制可以应用在TCP协议层实现例如使用TCP Keepalive也可以在应用层实现例如使用心跳包。在应用层实现心跳保活机制时通常由客户端向服务端发送自定义消息命令服务端收到消息后回复自定义的消息给客户端。如果服务端未收到消息则表示连接失败如果失败的次数达到指定上限后则重新发起连接。 公共消息体定义 这个无需多说哪怕是我们在做web开发期间也会定义一个统一口径 特别提醒 个人在学习的时候发现网上对于【序列化】和【编解码】这两个概念多少有点混淆或者说边界模糊。确实他们是有些关联的但区别也有。以下是摘抄自【百度文心一言】 编码器和解码器常用于处理数据在不同格式之间的转换 而序列化是将数据结构或对象状态转换为可以存储或传输的格式的过程。 因此编码器和解码器主要关注的是数据的表示和转换而序列化和反序列化主要关注的是对象状态的转换。 二、通信框架功能设计
2.1 功能描述
我在后面的编码中将围绕以下功能来实现一个简单的长连接通信框架。功能如下
基于 Netty 的 NIO 通信框架提供消息的编解码框架可以实现 POJO 的序列化和反序列化【编解码】与【序列化】一块消息内容防篡改机制就跟我们web开发的鉴权一样在处理之前先校验一下内容合法性提供基于 IP 地址的白名单接入认证机制断线重连机制
2.2 通信模型 模型解读如下 1客户端发送应用握手请求消息携带节点 ID 等有效身份认证信息 2服务端对应用握手请求消息进行合法性校验包括节点 ID 有效性校验、节点重复登录校验和 IP 地址合法性校验校验通过后返回登录成功的应用握手应答消息 3链路建立成功之后客户端发送业务消息 4链路成功之后服务端发送心跳消息 5链路建立成功之后客户端发送心跳消息 6链路建立成功之后服务端发送业务消息 7服务端退出时服务端关闭连接客户端感知对方关闭连接后被动关闭客户端连接 PS协议通信双方链路建立成功之后双方可以进行全双工通信无论客户端还是服务端都可以主动发送请求消息给对方所以通信方式有如下两种 TWO-WAY即需要响应的请求。如请求登录ONE-WAY即无需响应的请求。如日志记录 双方之间的心跳采用 Ping-Pong 机制当链路处于空闲状态时客户端主动发送Ping 消息给服务端服务端接收到 Ping 消息后发送应答消息 Pong 给客户端如果客户端连续发送 N 条 Ping 消息都没有接收到服务端返回的 Pong 消息说明链路已经挂死或者对方处于异常状态客户端主动关闭连接间隔周期 T 后发起重连操作直到重连成功。 2.3 消息体定义
在我的设计中把消息定义分为了两个部分消息头、消息体。代码如下
Getter
Setter
ToString
public class CommonMessage {/*** 消息头*/private CommonMessageHeader header;/*** 0-失败1-成功*/private Byte result;/*** 消息体*/private Object body;
}Getter
Setter
ToString
public final class CommonMessageHeader {/*** 消息体的MD5摘要用来做简单校验*/private String md5;/*** 服务标识*/private int severId;/*** 消息id*/private long msgID;/*** 消息类型枚举值。见MessageType*/private byte type;
}然后是消息类型
public enum MessageType {/*** 业务请求消息*/SERVICE_REQ((byte) 0),/*** 业务应答消息*/SERVICE_RESP((byte) 1),/*** 无需应答的业务请求消息*/SERVICE_REQ_ONE_WAY((byte) 2),/*** 心跳请求消息*/HEARTBEAT_REQ((byte) 99),/*** 心跳应答消息*/HEARTBEAT_RESP((byte) 100),;private byte value;MessageType(byte value) {this.value value;}public byte value() {return this.value;}
}2.4 心跳机制
心跳机制我估计大家多少能理解这个名字就起的很形象。当读或者写心跳消息发生 I/O 异常的时候说明已经中断此时需要立即关闭连接如果是客户端需要重新发起连接如果是服务端需要清空缓存的半包信息等到客户端重连。 是的两边都需要心跳检测毕竟是【全双工】 但是心跳机制的设计也是有点说法的。比如什么时候需要传心跳包过去发什么包过去。
先说发什么包过去。这个就比较简单了正常来说发一个空包就行了除非你有什么特别的要求。比如我就在消息定义中新增了一个类型简单标记一下而已。 再说什么时候发。
方案一最粗暴 最粗暴的当然是TCP握手完成之后开始启动一个心跳任务然后以固定的频率发送不管三次二十一我就要在存续期间一直发。这当然可以实现目标但是这【合李】吗
方案二小改进 很简单的道理啊如果我们互相之间本身就正在进行业务上的通信咱俩都正在【说话】呢你还发个心跳过来问我【你死没死】啊你礼貌吗这所以我们可以使用Netty提供的一个【写空闲检测】机制来完成。直接上源码给你们看
// IdleStateHandler。一个实现了InBound和OutBound的Handler
public IdleStateHandler(int readerIdleTimeSeconds,int writerIdleTimeSeconds,int allIdleTimeSeconds) {this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,TimeUnit.SECONDS);}参数解读
readerIdleTimeSeconds当在指定的时间段内没有执行读操作时将触发 IdleState.READER_IDLE。0表示禁用writerIdleTimeSeconds当在指定的时间段内没有执行写操作时将触发IdleState.WRITER_IDLE。0表示禁用allIdleTimeSeconds当在指定的时间段内没有进行读写操作时将触发IdleState.ALL_IDLE。0表示禁用
PS检测空闲连接以及超时对于及时释放资源来说是至关重要的。这就是心跳机制做的事情。因为很重要所以Netty也给我们预提供了这些Handler就是上面说的IdleStateHandler。
2.5 重连机制
如果链路中断等到INTERVAL时间后由客户端发起重连操作如果重连失败间隔周期INTERVAL后再次发起重连直到重连成功。 为了保持服务端能够有充足的时间释放句柄资源在首次断连时客户端需要等待INTERVAL时间之后再发起重连而不是失败后立即重连。 为了保证句柄资源能够及时释放无论什么场景下重连失败客户端必须保证自身的资源被及时释放包括但不限于SocketChannel、Socket 等。 重连失败后可以打印异常堆栈信息方便后续的问题定位。
*2.6 Handler的组织顺序
大家还记得吗Handler出入境可以是无序的但是同是入境、出境的Handler之间是局部有序的。这不难理解就跟JDK8的Stream一样前面对流的操作会影响后面的结果。所以顺序很重要。这边大概的模型如下 我想大家应该能理解为什么我的顺序是这样组织的吧…
写空闲监控或者性能监控放前面没毛病正常来说这个业务不会去操作原始报文粘包半包处理。开始对包数据做拆分了这一步肯定要在所有需要操作【业务报文】的前面做。为啥我都没拆包给你呢你咋知道这个就是你要的序列化反序列化。正常【2】之后拿到的就是【字节流业务报文】这个时候需要先【序列化/反序列化】再【编解码】这两步我合在一起做了【读空闲】即心跳机制。 【心跳】跟【认证申请/检查】需要分客户端跟服务端。客户端都没有登录呢没必要开启【心跳】对吧所以客户端的【心跳】在【认证申请】之后服务端我就不多解释了吧【心跳请求/应答】也是心跳机制里面的东西业务处理 大家好好思考下理论上5/6是可以随意调换顺序的毕竟【心跳包】是一种特殊的业务 我估计有很多人不理解【读空闲】跟【心跳请求/应答】的关系大家可以再看看【2.4 心跳机制】的【方案二】。我说他们都是心跳机制里面的怎么理解
读空闲其实就是一种事件监听机制。监听Channel上的【读事件】。当事件发生的时候触发对应事件并且往管道中传输说到这里了写空闲也知道了吧心跳请求/应答对上面说的事件的响应
2.7 交互式调试
写了通信的客户端、服务端怎么调试呢咱也没有可视化界面所以我就搞了一个最原始的通过Scanner输入命令的交互式调试方案。像这样 你懂我意思吧
三、代码实现非必要。感兴趣的自行查看
头疼。代码貌似也挺多的本来想打包压缩包上来然后只贴核心代码。但是我的电脑有加密软件打包上来的代码可能会有问题。代码包结构如下 红框内的pojo已经给过了咱就不重复上代码了。如果大家要本地运行的话可以跟我一样先创建好对应的包然后一个一个复制上去吧。真不是我不想给源码压缩包而是加密问题。给了你们打开也是乱码
3.1 最外层的通信入口 这三个类。
3.1.1 NettyRpcServer服务端通信入口
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.server.ServerInitializer;/*** RPC Server服务端** author zhangshen* date 2023/10/25 9:16* slogan 编码即学习注释断语义**/
Slf4j
public class NettyRpcServer {public static void main(String[] args) {try {start();} catch (InterruptedException e) {e.printStackTrace();log.error(【Netty服务器】启动失败);}}private static void start() throws InterruptedException {EventLoopGroup acceptLoopGroup new NioEventLoopGroup();EventLoopGroup reactorLoopGroup new NioEventLoopGroup();ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.group(acceptLoopGroup, reactorLoopGroup).channel(NioServerSocketChannel.class).localAddress(NettyConstant.PORT).childHandler(new ServerInitializer());serverBootstrap.bind().sync();log.info(【Netty服务器】启动成功);}
}上面这个很简单啦跟上一篇文章的使用示例如出一辙。不同的是在Pipeline中链化Handler的逻辑我单独抽出来写成一个ServerInitializer。后面会讲
3.3 NettyRpcClient客户端通信入口
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.client.ClientInitializer;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.MessageType;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** RPC Client客户端** author zhangshen* date 2023/10/25 9:16* slogan 编码即学习注释断语义**/
Slf4j
public class NettyRpcClient {private Channel channel;private EventLoopGroup eventLoopGroup new NioEventLoopGroup();private volatile boolean userClose false;/*** 定时线程池用于断线重连*/private ScheduledExecutorService executor Executors.newScheduledThreadPool(1);public void connect() throws InterruptedException {try {Bootstrap bootstrap new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ClientInitializer());ChannelFuture sync bootstrap.connect(new InetSocketAddress(NettyConstant.SERVER_IP, NettyConstant.PORT)).sync();log.info(【Netty客户端】启动成功);channel sync.channel();channel.closeFuture().sync();} finally {if (userClose) {channel null;eventLoopGroup.shutdownGracefully().sync();} else {// 断线重连reconect();}}}/*** 断线重连*/private void reconect() {log.info(【Netty客户端】开始断线重连);executor.execute(() - {try {// 给操作系统足够的时间去释放相关的资源TimeUnit.SECONDS.sleep(1);connect();} catch (InterruptedException e) {e.printStackTrace();}});}public void sendMessage(Object msg) {if (channel null || !channel.isActive()) {throw new IllegalStateException(和服务器还未未建立起有效连接请稍后再试);}CommonMessage message MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ.value(), msg);log.info(【Netty客户端】发送消息。CommonMessage{}, message);channel.writeAndFlush(message);}public void sendOneWay(Object msg) {if (channel null || !channel.isActive()) {throw new IllegalStateException(和服务器还未未建立起有效连接请稍后再试);}CommonMessage message MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ_ONE_WAY.value(), msg);log.info(【Netty客户端】发送消息。CommonMessage{}, message);channel.writeAndFlush(message);}public void close() {userClose true;channel.close();}
}这个相对于服务端以及前面的使用示例的客户端确实稍微复杂一点。主要的变化是【断线重连】机制引起的
引入了【定时线程池】定时调用reconnect方法新增了reconnect()方法处理断线重连【断线重连】还得看是不是自己主动发起的【关闭】如果是自己主动发起的关闭肯定不能重连啊
3.3 NettyRpcClient交互式调试
import java.util.Scanner;/*** author zhangshen* date 2023/10/25 12:31* slogan 编码即学习注释断语义**/
public class ScannerCmdClient {public static void main(String[] args) throws InterruptedException {// 新建客户端NettyRpcClient client new NettyRpcClient();// 显示菜单栏showMenu();Scanner scanner new Scanner(System.in);while (true) {int cmd scanner.nextInt();switch (cmd) {case 1:client.connect();Thread.sleep(3000);break;case 2:client.sendMessage(客户端发送双端信息);break;case 3:client.sendOneWay(客户端发送ONE-WAY信息);break;case 4:client.close();case 5:showMenu();default:client.close();}}}/*** 展示菜单*/private static void showMenu() {System.out.println(请选择以下功能);System.out.println(【1】与服务端建立连接);System.out.println(【2】发送一个有响应的消息);System.out.println(【3】发送一个无响应的消息);System.out.println(【4】关闭连接);System.out.println(【5】显示菜单栏);}
}前面介绍过作用了不多说了
3.2 server包下
3.2.1 ServerInitializer服务端的Handler链化
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.codec.KryoDecodeHandler;
import org.tuling.io.rpc.common.codec.KryoEncodeHandler;
import org.tuling.io.rpc.server.handler.ServerHeartBeatHandler;
import org.tuling.io.rpc.server.handler.ServerLoginHandler;
import org.tuling.io.rpc.server.handler.ServerOrderHandler;/*** 服务端通道初始化器** author zhangshen* date 2023/10/25 9:44* slogan 编码即学习注释断语义**/
public class ServerInitializer extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel ch) throws Exception {final ChannelPipeline pipeline ch.pipeline();// 添加【粘包/分包】处理器。 由Netty预备提供的pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0,2));pipeline.addLast(new LengthFieldPrepender(2));// 添加【序列化/反序列化】处理器开源序列化工具pipeline.addLast(new KryoDecodeHandler());pipeline.addLast(new KryoEncodeHandler());// 添加【心跳】处理器Netty预备提供的pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY));// 添加业务处理器pipeline.addLast(new ServerLoginHandler());pipeline.addLast(new ServerHeartBeatHandler());pipeline.addLast(new ServerOrderHandler());}
}3.2.2 handler包下所有的handler 我就不一一说代码了只贴。没啥难度的最重要的还是顺序在【2.6 Handler的顺序组织】已经解释了一波了。
ServerHeartBeatHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.helper.SecurityCenterHelper;/*** 服务器心跳处理** author zhangshen* date 2023/10/25 10:33* slogan 编码即学习注释断语义**/
Slf4j
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message (CommonMessage) msg;CommonMessageHeader header message.getHeader();if (header null) {log.error(【Netty服务器】非法消息);ctx.writeAndFlush(非法消息);ctx.close();ReferenceCountUtil.release(msg);return;}if (header.getType() ! MessageType.HEARTBEAT_REQ.value()) {ctx.fireChannelRead(msg);return;}// 处理心跳业务log.info(【Netty服务器】心跳应答);CommonMessage heartBeatResponse MessageGenerateHelper.success(-1, MessageType.HEARTBEAT_RESP.value(), null);ctx.writeAndFlush(heartBeatResponse);ReferenceCountUtil.release(msg);}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof ReadTimeoutException){log.debug(【Netty服务器】客户端长时间未通信可能已经宕机关闭链路);SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}super.exceptionCaught(ctx, cause);}Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug(【Netty服务器】客户端已关闭连接);super.channelInactive(ctx);}
}ServerLoginHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.helper.SecurityCenterHelper;import java.net.InetSocketAddress;/*** 登录服务器处理器** author zhangshen* date 2023/10/25 10:05* slogan 编码即学习注释断语义**/
Slf4j
public class ServerLoginHandler extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message (CommonMessage) msg;CommonMessageHeader header message.getHeader();if (header null) {log.error(【Netty服务器】非法消息);ctx.writeAndFlush(非法消息);ctx.close();ReferenceCountUtil.release(msg);return;}if (header.getSeverId() ! NettyConstant.LOGIN_SERVER_ID) {ctx.fireChannelRead(msg);return;}// 处理登录业务this.checkLogin(ctx, msg);ReferenceCountUtil.release(msg);}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 删除缓存SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}private void checkLogin(ChannelHandlerContext ctx, Object msg) {log.info(【Netty服务器】登录消息CommonMessage{}, msg);InetSocketAddress socketAddress (InetSocketAddress) ctx.channel().remoteAddress();String userLoginIP socketAddress.getAddress().getHostAddress();// 白名单校验boolean whiteIP SecurityCenterHelper.isWhiteIP(userLoginIP);if (!whiteIP) {String errorMessage 不在白名单内;log.error(【Netty服务器】{}, errorMessage);CommonMessage message MessageGenerateHelper.fail(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage);ctx.writeAndFlush(message);ctx.close();ReferenceCountUtil.release(msg);return;}// 重复登录校验String userInfo ctx.channel().remoteAddress().toString();boolean repeatLogin SecurityCenterHelper.isRepeatLogin(userInfo);if (repeatLogin) {String errorMessage 重复登录;log.error(【Netty服务器】{}, errorMessage);CommonMessage message MessageGenerateHelper.fail(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage);ctx.writeAndFlush(message);ctx.close();ReferenceCountUtil.release(msg);return;}// 通过校验记录SecurityCenterHelper.addLoginUser(userInfo);String successMessage 登录成功;log.info(【Netty服务器】{}, successMessage);CommonMessage message MessageGenerateHelper.success(NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_RESP.value(), successMessage);ctx.writeAndFlush(message);}
}ServerOrderHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.biz.OrderInfo;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.EncryptHelper;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;
import org.tuling.io.rpc.server.async.AsyncBusiProcessor;import java.math.BigDecimal;/*** 订单业务处理类** author zhangshen* date 2023/10/25 10:39* slogan 编码即学习注释断语义**/
Slf4j
public class ServerOrderHandler extends SimpleChannelInboundHandlerCommonMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception {// 检查MD5final CommonMessageHeader header msg.getHeader();if (header null) {log.error(【Netty服务器】非法消息);ctx.writeAndFlush(非法消息);ctx.close();ReferenceCountUtil.release(msg);return;}if (header.getSeverId() ! NettyConstant.ORDER_SERVER_ID) {ctx.fireChannelRead(msg);return;}log.info(【Netty服务器】CommonMessage{}, msg);String headMd5 header.getMd5();String calcMd5 EncryptHelper.encryptObj(msg.getBody());if (!headMd5.equals(calcMd5)) {log.error(【Netty服务器】报文md5检查不通过 headMd5 vs calcMd5 关闭连接);CommonMessage message MessageGenerateHelper.fail(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(),报文md5检查不通过关闭连接);ctx.writeAndFlush(message);ctx.close();}log.info(msg.toString());if (header.getType() MessageType.SERVICE_REQ_ONE_WAY.value()) {log.debug(【Netty服务器】ONE_WAY类型消息异步处理);AsyncBusiProcessor.submitTask(() - {log.info(【Netty服务器】模仿异步ONE_WEY业务处理);});} else {log.debug(【Netty服务器】TWO_WAY类型消息应答);OrderInfo orderInfo new OrderInfo();orderInfo.setOrderId(123456);orderInfo.setProductCount(2);orderInfo.setAmount(BigDecimal.valueOf(1499.99));CommonMessage message MessageGenerateHelper.success(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), orderInfo);ctx.writeAndFlush(message);}}
}
3.2.3 helper包工具包
helper包就是utils工具包。我喜欢叫做helper而已。里面只有一个类SecurityCenterHelper。用来实现【白名单】还有【重复登录校验】机制。
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;/*** 登录安全校验助手** author zhangshen* date 2023/10/25 10:14* slogan 编码即学习注释断语义**/
public class SecurityCenterHelper {/*** 用以检查用户是否重复登录的缓存*/private static MapString, Boolean nodeCheck new ConcurrentHashMapString, Boolean();/*** 用户登录的白名单*/private static SetString whiteList new CopyOnWriteArraySet();static {whiteList.add(127.0.0.1);}/*** 是否白名单内*/public static boolean isWhiteIP(String ip) {return whiteList.contains(ip);}/*** 给定用户信息是否重复登录*/public static boolean isRepeatLogin(String usrInfo) {return nodeCheck.containsKey(usrInfo);}/*** 添加登录用户信息*/public static void addLoginUser(String usrInfo) {nodeCheck.put(usrInfo, true);}/*** 移除登录用户信息*/public static void removeLoginUser(String usrInfo) {nodeCheck.remove(usrInfo, true);}}3.2.4 async包异步处理类
里面只有一个类AsyncBusiProcessor用来处理需要【异步】的任务。
import io.netty.util.NettyRuntime;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** 异步业务处理器。某些消息可以异步处理比如ONE_WAY类型消息** author zhangshen* date 2023/10/25 10:56* slogan 编码即学习注释断语义**/
Slf4j
public class AsyncBusiProcessor {private static BlockingQueueRunnable taskQueue new ArrayBlockingQueueRunnable(3000);private static final ExecutorService executorService;static {int cores NettyRuntime.availableProcessors();executorService new ThreadPoolExecutor(1,cores,60,TimeUnit.SECONDS,taskQueue);}/*** 提交异步执行的任务** param task 任务*/public static void submitTask(Runnable task) {executorService.execute(task);}
}3.3 client包下
Client包下跟Server包下的东西其实差不多大家自行理解
3.3.1 ClientInitializer客户端的Handler链化
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.tuling.io.rpc.client.handler.CheckWriteIdleHandler;
import org.tuling.io.rpc.client.handler.ClientHeartBeatHandler;
import org.tuling.io.rpc.client.handler.ClientLoginHandler;
import org.tuling.io.rpc.client.handler.ClientOrderHandler;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.codec.KryoDecodeHandler;
import org.tuling.io.rpc.common.codec.KryoEncodeHandler;/*** 客户端通道初始化器** author zhangshen* date 2023/10/25 9:44* slogan 编码即学习注释断语义**/
public class ClientInitializer extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel ch) throws Exception {final ChannelPipeline pipeline ch.pipeline();// 写空闲自己检测pipeline.addLast(new CheckWriteIdleHandler());// 添加【粘包/分包】处理器。 由Netty预备提供的pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0,2));pipeline.addLast(new LengthFieldPrepender(2));// 添加【序列化/反序列化】处理器开源序列化工具pipeline.addLast(new KryoDecodeHandler());pipeline.addLast(new KryoEncodeHandler());// 添加登录处理器// 登录处理器需放在心跳前面pipeline.addLast(new ClientLoginHandler());// 添加【心跳】处理器Netty预备提供的pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY));pipeline.addLast(new ClientHeartBeatHandler());pipeline.addLast(new ClientOrderHandler());}
}3.3.2 handler包下所有的handler
CheckWriteIdleHandler客户端写空闲检测
import io.netty.handler.timeout.IdleStateHandler;/*** 客户端检测自己的写空闲** author zhangshen* date 2023/10/25 11:08* slogan 编码即学习注释断语义**/
public class CheckWriteIdleHandler extends IdleStateHandler {public CheckWriteIdleHandler() {super(0, 8, 0);}
}
ClientHeartBeatHandler客户端心跳处理
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;/*** 客户端在长久未向服务器业务请求时发出心跳请求报文** author zhangshen* date 2023/10/25 11:33* slogan 编码即学习注释断语义**/
Slf4j
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {CommonMessage request MessageGenerateHelper.request(-1, MessageType.HEARTBEAT_REQ.value(), null);log.debug(【Netty客户端】写空闲发出心跳报文维持连接 request);ctx.writeAndFlush(request);}super.userEventTriggered(ctx, evt);}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message (CommonMessage) msg;CommonMessageHeader header message.getHeader();if (header ! null header.getType() MessageType.HEARTBEAT_RESP.value()) {log.debug(【Netty客户端】收到服务器心跳应答服务器正常);ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {log.debug(【Netty客户端】服务器长时间未应答关闭链路);}super.exceptionCaught(ctx, cause);}
}ClientLoginHandler客户端登录请求。TCP三次握手成功之后就请求
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.helper.MessageGenerateHelper;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;
import org.tuling.io.rpc.common.pojo.MessageType;/*** 客户端发起登录请求** author zhangshen* date 2023/10/25 11:11* slogan 编码即学习注释断语义**/
Slf4j
public class ClientLoginHandler extends ChannelInboundHandlerAdapter {public void channelActive(ChannelHandlerContext ctx) throws Exception {// TCP三次握手完成发出认证请求CommonMessage message MessageGenerateHelper.request(NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_REQ.value(), null);log.info(【Netty客户端】请求服务器认证 : message);ctx.writeAndFlush(message);}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CommonMessage message (CommonMessage) msg;CommonMessageHeader header message.getHeader();if (header ! null header.getType() MessageType.SERVICE_RESP.value() header.getSeverId() NettyConstant.LOGIN_SERVER_ID) {log.info(【Netty客户端】收到认证应答报文服务器是否验证通过);byte loginResult message.getResult();if (loginResult ! 1) {// 握手失败关闭连接log.debug(【Netty客户端】未通过认证关闭连接: message);ctx.close();} else {log.info(【Netty客户端】通过认证移除本处理器进入业务通信 : message);ctx.pipeline().remove(this);ReferenceCountUtil.release(msg);}} else {ctx.fireChannelRead(msg);}}
}ClientOrderHandler瞎写的一个业务拓展类目前只是打印。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.tuling.io.rpc.common.pojo.CommonMessage;/*** author Mark老师* 类说明接收业务应答消息并处理*/
Slf4j
public class ClientOrderHandler extends SimpleChannelInboundHandlerCommonMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception {log.info(【Netty客户端】业务应答消息 msg.toString());ReferenceCountUtil.release(msg);}
}3.4 common包下一些公用的定义
pojo的我就不贴了看【2.3 消息体定义】
3.4.1 NettyConstant一些公用常量
/*** 常量定义** author zhangshen* date 2023/10/25 9:41* slogan 编码即学习注释断语义**/
public interface NettyConstant {/*** 程序绑定端口*/int PORT 8585;/*** 程序ip地址*/String SERVER_IP 127.0.0.1;/*** 成功*/byte SUCCESS 1;/*** 失败*/byte FAIL 0;/*** 心跳检测频率* 单位秒*/int HEARBEAT_FREQUENCY 15;/*** 登录服务器标识*/int LOGIN_SERVER_ID 1;/*** 订单服务器标识*/int ORDER_SERVER_ID 2;
}3.4.2 helper包工具包
EncryptHelper防篡改的加密摘要
import org.tuling.io.rpc.common.codec.KryoSerializer;import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;/*** 摘要的工具类** author zhangshen* date 2023/10/25 10:47* slogan 编码即学习注释断语义**/
public class EncryptHelper {/*** 加密信息** param strSrc 需要被摘要的字符串* param encName 摘要方式有 MD5、SHA-1和SHA-256 这三种缺省为MD5* return 返回摘要字符串*/private static String EncryptStr(String strSrc, String encName) {MessageDigest md null;String strDes null;byte[] bt strSrc.getBytes();try {if (encName null || encName.equals()) {encName MD5;}md MessageDigest.getInstance(encName);md.update(bt);strDes bytes2Hex(md.digest()); // to HexString} catch (NoSuchAlgorithmException e) {System.out.println(Invalid algorithm.);return null;}return strDes;}/*** MD5摘要** param str 需要被摘要的字符串* return 对字符串str进行MD5摘要后将摘要字符串返回*/public static String EncryptByMD5(String str) {return EncryptStr(str, MD5);}/*** SHA1摘要** param str 需要被摘要的字符串* return 对字符串str进行SHA-1摘要后将摘要字符串返回*/public static String EncryptBySHA1(String str) {return EncryptStr(str, SHA-1);}/*** SHA256摘要** param str 需要被摘要的字符串* return 对字符串str进行SHA-256摘要后将摘要字符串返回*/public static String EncryptBySHA256(String str) {return EncryptStr(str, SHA-256);}/*** 字节转十六进制结果以字符串形式呈现*/private static String bytes2Hex(byte[] bts) {String des ;String tmp null;for (int i 0; i bts.length; i) {tmp (Integer.toHexString(bts[i] 0xFF));if (tmp.length() 1) {des 0;}des tmp;}return des;}/*** 对字符串进行MD5加盐摘要* 先将str进行一次MD5摘要摘要后再取摘要后的字符串的第1、3、5个字符追加到摘要串* 再拿这个摘要串再次进行摘要*/public static String encrypt(String str) {String encryptStr EncryptByMD5(str);if (encryptStr ! null) {encryptStr encryptStr encryptStr.charAt(0) encryptStr.charAt(2) encryptStr.charAt(4);encryptStr EncryptByMD5(encryptStr);}return encryptStr;}/*** 对对象进行MD5摘要先对对象进行序列化转为byte数组* 再将byte数组转为字符串然后进行MD5加盐摘要*/public static String encryptObj(Object o) {return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));}
}
MessageGenerateHelper消息生成工具消除代码重复用的
import org.tuling.io.rpc.common.NettyConstant;
import org.tuling.io.rpc.common.pojo.CommonMessage;
import org.tuling.io.rpc.common.pojo.CommonMessageHeader;import java.util.concurrent.atomic.AtomicLong;/*** 消息生成助手** author zhangshen* date 2023/10/25 11:21* slogan 编码即学习注释断语义**/
public class MessageGenerateHelper {private static AtomicLong msgId new AtomicLong(1);public static long getID() {return msgId.getAndIncrement();}/*** 构建成功的业务消息*/public static CommonMessage success(int serverId, byte type, Object msg) {CommonMessage message new CommonMessage();CommonMessageHeader header getHeader(serverId, type);message.setHeader(header);message.setResult(NettyConstant.SUCCESS);message.setBody(msg);return message;}/*** 构建失败的业务消息*/public static CommonMessage fail(int serverId, byte type, Object msg) {CommonMessage message new CommonMessage();CommonMessageHeader header getHeader(serverId, type);message.setHeader(header);message.setResult(NettyConstant.FAIL);message.setBody(msg);return message;}/*** 构建请求业务消息*/public static CommonMessage request(int serverId, byte type, Object msg) {CommonMessage message new CommonMessage();CommonMessageHeader header getHeader(serverId, type);message.setHeader(header);message.setBody(msg);return message;}/*** 构建请求业务消息*/public static CommonMessage requestWithMsgId(int serverId, byte type, Object msg) {CommonMessage message new CommonMessage();CommonMessageHeader header getHeader(serverId, type);header.setMsgID(getID());header.setMd5(EncryptHelper.encryptObj(msg));message.setHeader(header);message.setBody(msg);return message;}private static CommonMessageHeader getHeader(int serverId, byte type) {CommonMessageHeader header new CommonMessageHeader();header.setSeverId(serverId);header.setType(type);return header;}
}3.4.3 codec包编解码反序列化工具
这里面的是一个基于Kryo编解码序列化API实现的。pom.xml如下 dependencygroupIdde.javakaffee/groupIdartifactIdkryo-serializers/artifactIdversion0.42/version/dependency当然大家可以使用其他的API我这里是抄的。
KryoFactoryKryo实例API要求的
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;/*** Kryo的工厂,拿到Kryo的实例** author zhanghuitong* date 2023/10/25 20:12* slogan 编码即学习注释断语义**/
public class KryoFactory {public static Kryo createKryo() {Kryo kryo new Kryo();kryo.setRegistrationRequired(false);kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());kryo.register(InvocationHandler.class, new JdkProxySerializer());kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());kryo.register(Pattern.class, new RegexSerializer());kryo.register(BitSet.class, new BitSetSerializer());kryo.register(URI.class, new URISerializer());kryo.register(UUID.class, new UUIDSerializer());UnmodifiableCollectionsSerializer.registerSerializers(kryo);SynchronizedCollectionsSerializer.registerSerializers(kryo);kryo.register(HashMap.class);kryo.register(ArrayList.class);kryo.register(LinkedList.class);kryo.register(HashSet.class);kryo.register(TreeSet.class);kryo.register(Hashtable.class);kryo.register(Date.class);kryo.register(Calendar.class);kryo.register(ConcurrentHashMap.class);kryo.register(SimpleDateFormat.class);kryo.register(GregorianCalendar.class);kryo.register(Vector.class);kryo.register(BitSet.class);kryo.register(StringBuffer.class);kryo.register(StringBuilder.class);kryo.register(Object.class);kryo.register(Object[].class);kryo.register(String[].class);kryo.register(byte[].class);kryo.register(char[].class);kryo.register(int[].class);kryo.register(float[].class);kryo.register(double[].class);return kryo;}
}KryoSerializer序列化工具
/*** Kryo的序列化器负责序列化和反序列化** author zhanghuitong* date 2023/10/25 20:12* slogan 编码即学习注释断语义**/
public class KryoSerializer {private static Kryo kryo KryoFactory.createKryo();/*序列化*/public static void serialize(Object object, ByteBuf out) {ByteArrayOutputStream baos new ByteArrayOutputStream();Output output new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}out.writeBytes(b);}/*序列化为一个字节数组主要用在消息摘要上*/public static byte[] obj2Bytes(Object object) {ByteArrayOutputStream baos new ByteArrayOutputStream();Output output new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}return b;}/*反序列化*/public static Object deserialize(ByteBuf out) {if (out null) {return null;}Input input new Input(new ByteBufInputStream(out));return kryo.readClassAndObject(input);}
}KryoEncodeHandler编码处理器实现了NettyMessageToByteEncoder接口的Handler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.tuling.io.rpc.common.pojo.CommonMessage;/*** 序列化的Handler** author zhangshen* date 2023/10/25 9:54* slogan 编码即学习注释断语义**/
public class KryoEncodeHandler extends MessageToByteEncoderCommonMessage {Overrideprotected void encode(ChannelHandlerContext ctx, CommonMessage message, ByteBuf out) throws Exception {KryoSerializer.serialize(message, out);ctx.flush();}
}KryoDecodeHandler解码处理器实现了NettyMessageToByteEncoder接口的Handler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;/*** 反序列化的Handler** author zhangshen* date 2023/10/25 9:54* slogan 编码即学习注释断语义**/
public class KryoDecodeHandler extends ByteToMessageDecoder {Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {Object obj KryoSerializer.deserialize(in);out.add(obj);}
}3.4 biz包下业务模拟
下面只有一个类OrderInfo
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;import java.math.BigDecimal;/*** 订单信息** author zhangshen* date 2023/10/25 10:44* slogan 编码即学习注释断语义**/
Getter
Setter
ToString
public class OrderInfo {private String orderId;private Integer productCount;private BigDecimal amount;
}
四、业务流程图 学习总结
使用Netty写了一个简单的通信示例
感谢
感谢【百度文心一言】