郑州网站建设选微锐,网页免费制作网站,之江汇学校网站建设,网站建设关键字优化文章目录说明分享内置编码器和解码器解码器编码器代码实现创建核心类消息实体类自定义编码类自定义解码类服务端ServerHandler入口类客户端ClientHandler入口类测试参考总结说明 netty是java重要的企业级NIO#xff0c;使用它可以快速实现很多功能通信功能如#xff1a;http、…
文章目录说明分享内置编码器和解码器解码器编码器代码实现创建核心类消息实体类自定义编码类自定义解码类服务端ServerHandler入口类客户端ClientHandler入口类测试参考总结说明 netty是java重要的企业级NIO使用它可以快速实现很多功能通信功能如http、ftp、socket、websocket、udp等。 本站使用自定义网包实现网络通信。 分享
大数据博客列表开发记录汇总个人java工具库 项目https://gitee.com/wangzonghui/object-tool 包含json、string、集合、excel、zip压缩、pdf、bytes、http等多种工具欢迎使用。
内置编码器和解码器
解码器
名称说明ByteToMessageDecoder如果想实现自己的半包解码器实现该类MessageToMessageDecoder一般作为二次解码器当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候我们可能还需要将这个对象进行二次解码成其他对象我们就可以继承这个类LineBasedFrameDecoder通过在包尾添加回车换行符 \r\n 来区分整包消息StringDecoder字符串解码器DelimiterBasedFrameDecoder特殊字符作为分隔符来区分整包消息FixedLengthFrameDecoder报文大小固定长度不够空格补全ProtoBufVarint32FrameDecoder通过 Protobuf 解码器来区分整包消息ProtobufDecoderProtobuf 解码器LengthFieldBasedFrameDecoder指定长度来标识整包消息通过在包头指定整包长度来约定包长。
编码器
名称说明ProtobufEncoderProtobuf 编码器MessageToByteEncoder将 Java 对象编码成 ByteBufMessageToMessageEncoder如果不想将 Java 对象编码成 ByteBuf而是自定义类就继承这个LengthFieldPrependerLengthFieldPrepender 是一个非常实用的工具类如果我们在发送消息的时候采用的是消息长度字段原始消息的形式那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度二进制字节长度写到 ByteBuf 的前两个字节。
代码实现
创建核心类
消息实体类
public class MyMessage {private int len;//发送内容的长度private byte[] content;//发送的内容public int getLen() {return len;}public void setLen(int len) {this.len len;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content content;}
}自定义编码类
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class MyMessageEncoder extends MessageToByteEncoderMyMessage {Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {byteBuf.writeInt(myMessage.getLen());byteBuf.writeBytes(myMessage.getContent());}}自定义解码类
import java.util.List;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;public class MyMessageDecoder extends ByteToMessageDecoder {int length0;Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, ListObject list) throws Exception {//将二进制字节码转为对象if(byteBuf.readableBytes()4){if(length0){lengthbyteBuf.readInt();}if(byteBuf.readableBytes()length){// System.out.println(可读数据不够继续等待);return;}byte[] contentnew byte[length];byteBuf.readBytes(content);MyMessage messagenew MyMessage();message.setLen(length);message.setContent(content);list.add(message);//传递给下一个handlerlength0;}}}
服务端
ServerHandler
import com.netty.cn.rpc.selfmessage.core.MyMessage;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class MyServerHandler extends SimpleChannelInboundHandlerMyMessage {private int count;/*** 读取客户端的数据* throws Exception*/Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage myMessage) throws Exception {System.out.println(服务端收到消息);System.out.println(长度myMessage.getLen());System.out.println(内容: new String(myMessage.getContent(),CharsetUtil.UTF_8));System.out.println(收到消息数量(count));String msg服务端收到请求;MyMessage messagenew MyMessage();message.setContent(msg.getBytes(CharsetUtil.UTF_8));message.setLen(msg.getBytes(CharsetUtil.UTF_8).length);ctx.writeAndFlush(message);}Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {super.channelReadComplete(ctx);// 客户端连接进入 FIN_WAIT1 状态// ctx.channel().close();}
}入口类
import com.netty.cn.rpc.selfmessage.core.MyMessageDecoder;
import com.netty.cn.rpc.selfmessage.core.MyMessageEncoder;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class MyServer {public static void main(String[] args) {int port8080;EventLoopGroup bossGroupnew NioEventLoopGroup(1);//处理连接请求EventLoopGroup workerGroupnew NioEventLoopGroup();//默认线程数量为cpu核数的两倍处理业务try {ServerBootstrap bootstrapnew ServerBootstrap();//创建服务器端的启动对象bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,port).childHandler(new ChannelInitializerSocketChannel() {protected void initChannel(SocketChannel socketChannel) {ChannelPipeline channelPipelinesocketChannel.pipeline();channelPipeline.addLast(new MyMessageDecoder());//加解码器channelPipeline.addLast(new MyMessageEncoder());channelPipeline.addLast(new MyServerHandler());}});System.out.println(netty server start);//启动服务器绑定端口bind是异步操作sync是等待ChannelFuture cfbootstrap.bind(port).sync();cf.channel().closeFuture().sync();}catch(Exception e){
// log.error(e.toString(),e);System.out.println(e.toString());}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}客户端
ClientHandler
import com.netty.cn.rpc.selfmessage.core.MyMessage;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class MyClientHandler extends SimpleChannelInboundHandlerMyMessage {/*** 当客户端连接到服务端是触发* param ctx* throws Exception*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(连接服务端 ctx.channel().remoteAddress() 成功);String msg你好我是张asdfasdfsadfwerwerwerwerewrewrewrewr三。;for (int i0;i20;i){MyMessage messagenew MyMessage();message.setContent(msg.getBytes(CharsetUtil.UTF_8));message.setLen(msg.getBytes(CharsetUtil.UTF_8).length);ctx.writeAndFlush(message);}}Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessage myMessage) throws Exception {System.out.println(client 接收到信息:new String(myMessage.getContent()).toString());}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}入口类
import com.netty.cn.rpc.selfmessage.core.MyMessageDecoder;
import com.netty.cn.rpc.selfmessage.core.MyMessageEncoder;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class MyClient {public static void main(String[] args) {int port8080;EventLoopGroup groupnew NioEventLoopGroup();try {Bootstrap bootstrapnew Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializerSocketChannel() {protected void initChannel(SocketChannel socketChannel) {ChannelPipeline channelPipelinesocketChannel.pipeline();channelPipeline.addLast(new MyMessageDecoder());//加解码器channelPipeline.addLast(new MyMessageEncoder());channelPipeline.addLast(new MyClientHandler());}});//System.out.println(netty client start);//启动客户端连接服务器ChannelFuture cf bootstrap.connect(127.0.0.1,port).sync();//关闭通道进行监听cf.channel().closeFuture().sync();System.out.println(启动客户端port);} catch(Exception e){
// log.error(e.toString(),e);System.out.println(e.toString());}finally {group.shutdownGracefully();}}
}测试
先启动 MyServer再启动 MyClient可以看到控制台打印如下内容Server
netty server start
服务端收到消息
长度60
内容: 你好我是张asdfasdfsadfwerwerwerwerewrewrewrewr三。
收到消息数量1Client
连接服务端 /127.0.0.1:8080 成功
client 接收到信息:服务端收到请求参考
博客
总结
该方式定义了数据传输结构传输过程中由编码器ByteBuf 完成数据处理。由于内容是二进制格式可以存储数据如json字符串、protobuf二次处理后数据提升了数据传输灵活性。