永久短网址生成,博客关键词优化,我国中小企业名单,房产中介公司网站源码本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频 学习netty之前要先打好NIO的基础#xff0c;可以先去看我的另一篇文章
一、概述 不想看的可以直接跳过 Netty 的地位
Netty 在 Java 网络应用框架中的地位就好比#xff1a;Spring 框架在 JavaEE 开发中的地位… 本笔记是看了黑马的Netty进行总结的。想要更详细的可以去看视频 学习netty之前要先打好NIO的基础可以先去看我的另一篇文章
一、概述 不想看的可以直接跳过 Netty 的地位
Netty 在 Java 网络应用框架中的地位就好比Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty因为它们有网络通信需求 Cassandra - nosql 数据库 Spark - 大数据分布式计算框架 Hadoop - 大数据分布式存储框架 RocketMQ - ali 开源的消息队列 ElasticSearch - 搜索引擎 gRPC - rpc 框架 Dubbo - rpc 框架 Spring 5.x - flux api 完全抛弃了 tomcat 使用 netty 作为服务器端 Zookeeper - 分布式协调框架
Netty 的优势 Netty vs NIO工作量大bug 多 需要自己构建协议 解决 TCP 传输问题如粘包、半包 epoll 空轮询导致 CPU 100% 对 API 进行增强使之更易用如 FastThreadLocal ThreadLocalByteBuf ByteBuffer Netty vs 其它网络应用框架 Mina 由 apache 维护将来 3.x 版本可能会有较大重构破坏 API 向下兼容性Netty 的开发迭代更迅速API 更简洁、文档更优秀 久经考验16年Netty 版本 2.x 2004 3.x 2008 4.x 2013 5.x 已废弃没有明显的性能提升维护成本高 二、入门案例 首次看netty的代码会比较乱不要慌多看看多学学就会很熟悉的。 最重要的是要理解每一步的作用 开发一个简单的服务器端和客户端 客户端向服务器端发送 hello, world 服务器仅接收不返回
依赖
dependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.39.Final/version
/dependency
服务端
new ServerBootstrap().group(new NioEventLoopGroup()) // 1.channel(NioServerSocketChannel.class) // 2.childHandler(new ChannelInitializerNioSocketChannel() { // 3protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new StringDecoder()); // 5ch.pipeline().addLast(new SimpleChannelInboundHandlerString() { // 6Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println(msg);}});}}).bind(8080); // 4 1 处创建 NioEventLoopGroup可以简单理解为 线程池 Selector 后面会详细展开 2 处选择服务 Scoket 实现类其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现其它实现还有 3 处为啥方法叫 childHandler是接下来添加的处理器都是给 SocketChannel 用的而不是给 ServerSocketChannel。ChannelInitializer 处理器仅执行一次它的作用是待客户端 SocketChannel 建立连接后执行 initChannel 以便添加更多的处理器 4 处ServerSocketChannel 绑定的监听端口 5 处SocketChannel 的处理器解码 ByteBuf String 6 处SocketChannel 的业务处理器使用上一个处理器的处理结果
客户端
new Bootstrap().group(new NioEventLoopGroup()) // 1.channel(NioSocketChannel.class) // 2.handler(new ChannelInitializerChannel() { // 3Overrideprotected void initChannel(Channel ch) {ch.pipeline().addLast(new StringEncoder()); // 8}}).connect(127.0.0.1, 8080) // 4.sync() // 5.channel() // 6.writeAndFlush(new Date() : hello world!); // 7 1 处创建 NioEventLoopGroup同 Server 2 处选择客户 Socket 实现类NioSocketChannel 表示基于 NIO 的客户端实现其它实现还有 3 处添加 SocketChannel 的处理器ChannelInitializer 处理器仅执行一次它的作用是待客户端 SocketChannel 建立连接后执行 initChannel 以便添加更多的处理器 4 处指定要连接的服务器和端口 5 处Netty 中很多方法都是异步的如 connect这时需要使用 sync 方法等待 connect 建立连接完毕 6 处获取 channel 对象它即为通道抽象可以进行数据读写操作 7 处写入消息并清空缓冲区 8 处消息会经过通道 handler 处理这里是将 String ByteBuf 发出 数据经过网络传输到达服务器端服务器端 5 和 6 处的 handler 先后被触发走完一个流程 流程分析 其实黑马使用链式编程对初学者来说并不友好我下面对代码进行拆分
乍一看怎么全是ServerBootstrap其实这些操作都是围绕着这个类在转的。
下一章组件 将会对每一个组件进行详细的分析到时候就没有这么乱了
可以说明的是NioEventLoopGroup 是一个EventLoop的集合EventLoop相当于NIO里面处理读写时间的工作者都可以单开线程的。类似于NIO里面的多线程优化。 ServerBootstrap serverBootstrap new ServerBootstrap();//得到启动类NioEventLoopGroup group new NioEventLoopGroup();// EventLoop的集合ServerBootstrap serverBootstrap1 serverBootstrap.group(group);// 将EventLoopGroup交给启动类ServerBootstrap serverBootstrap2 serverBootstrap1.channel(NioServerSocketChannel.class);// 指定channel的类型ServerBootstrap serverBootstrap3 serverBootstrap2.childHandler(new ChannelInitializerNioSocketChannel() {// 设置子通道Channel的处理器ChannelHandler的protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new StringDecoder()); // 添加处理器ch.pipeline().addLast(new SimpleChannelInboundHandlerString() { // 添加自定义处理器Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println(msg);}});}});ChannelFuture channelFuture serverBootstrap1.bind(8080);// 绑定端口 三、组件
1、EventLoop 我在前面提过这就相当于处理读写事件的工作者。维护着自己的Selector。 我们之前提到过一个Selector能监听多个channel。一个服务端有多个Selector 事件循环对象
EventLoop 本质是一个单线程执行器同时维护了一个 Selector里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法这也就让他有处理定时任务的能力 另一条线是继承自 netty 自己的 OrderedEventExecutor 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop 提供了 parent 方法来看看自己属于哪个 EventLoopGroup 事件循环组
EventLoopGroup 是一组 EventLoopChannel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop后续这个 Channel 上的 io 事件都由此 EventLoop 来处理保证了 io 事件处理时的线程安全 继承自 netty 自己的 EventExecutorGroup 实现了 Iterable 接口提供遍历 EventLoop 的能力 另有 next 方法获取集合中下一个 EventLoop 可以自己指定group的大小但是没有必要
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
DefaultEventLoopGroup group new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
结果
io.netty.channel.DefaultEventLoop60f82f98
io.netty.channel.DefaultEventLoop35f983a6
io.netty.channel.DefaultEventLoop60f82f98 优雅关闭
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入然后在任务队列的任务都处理完成后停止线程的运行。从而确保整体应用是在正常有序的状态下退出的 演示 NioEventLoop 处理 io 事件 下面主要演示的是 工人是轮流工作的但是对于同一个channel多次来进行读写为他服务的是同一个工作也就是EventLoop 服务器端两个 nio worker 工人
new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf msg instanceof ByteBuf ? ((ByteBuf) msg) : null;if (byteBuf ! null) {byte[] buf new byte[16];ByteBuf len byteBuf.readBytes(buf, 0, byteBuf.readableBytes());log.debug(new String(buf));}}});}}).bind(8080).sync();
客户端启动三次分别修改发送字符串为 zhangsan第一次lisi第二次wangwu第三次
public static void main(String[] args) throws InterruptedException {Channel channel new Bootstrap().group(new NioEventLoopGroup(1)).handler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.println(init...);ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));}}).channel(NioSocketChannel.class).connect(localhost, 8080).sync().channel();channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(wangwu.getBytes()));Thread.sleep(2000);channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(wangwu.getBytes()));
最后输出
22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到两个工人轮流处理 channel但工人与 channel 之间进行了绑定 2、channelFuture 3、future和promise 4、bytebuf 工具类用于调试 展示bytebuf的内容
private static void log(ByteBuf buffer) {int length buffer.readableBytes();int rows length / 16 (length % 15 0 ? 0 : 1) 4;StringBuilder buf new StringBuilder(rows * 80 * 2).append(read index:).append(buffer.readerIndex()).append( write index:).append(buffer.writerIndex()).append( capacity:).append(buffer.capacity()).append(NEWLINE);appendPrettyHexDump(buf, buffer);System.out.println(buf.toString());
} 1、内存模式和池化
堆内存vs直接内存 堆内存 分配效率高但读写效率低。直接内存反之。
使用ByteBuf buffer ByteBufAllocator.DEFAULT.buffer(); 默认方式创建出来的是使用的直接内存 可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer ByteBufAllocator.DEFAULT.directBuffer(10); 直接内存创建和销毁的代价昂贵但读写性能高少一次内存复制适合配合池化功能一起用 直接内存对 GC 压力小因为这部分内存不受 JVM 垃圾回收的管理但也要注意及时主动释放
池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf优点有 没有池化则每次都得创建新的 ByteBuf 实例这个操作对直接内存代价昂贵就算是堆内存也会增加 GC 压力 有了池化则可以重用池中 ByteBuf 实例并且采用了与 jemalloc 类似的内存分配算法提升分配效率 高并发时池化功能更节约内存减少内存溢出的可能
池化功能是否开启可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type{unpooled|pooled} 4.1 以后非 Android 平台默认启用池化实现Android 平台启用非池化实现 4.1 之前池化功能还不成熟默认是非池化实现 2、组成、写入、读取 相比于ByteBuffer的优点是①可扩容 ②使用读写指针不用切换逻辑更简单 读过的内存就废除 。
如果没有指定初始大小默认是256个字节
扩容
再写入一个 int 整数时容量不够了初始容量是 10这时会引发扩容
buffer.writeInt(6);
log(buffer);
扩容规则是 如何写入后数据大小未超过 512则选择下一个 16 的整数倍例如写入后大小为 12 则扩容后 capacity 是 16 如果写入后数据大小超过 512则选择下一个 2^n例如写入后大小为 513则扩容后 capacity 是 2^1010242^9512 已经不够了 扩容不能超过 max capacity 会报错 使用mark配合reset实现读取多次其实就是做个标记然后跳回标记的地方。 public static void main(String[] args) throws ExecutionException, InterruptedException {ByteBuf buffer ByteBufAllocator.DEFAULT.buffer(10);buffer.writeBytes(new byte[]{1,2,3,4,5});log(buffer);buffer.markReaderIndex();System.out.println(buffer.readByte());buffer.resetReaderIndex();System.out.println(buffer.readByte());log(buffer);} 3、slice 和composite
【零拷贝】的体现之一对原始 ByteBuf 进行切片成多个 ByteBuf切片后的 ByteBuf 并没有发生内存复制还是使用原始 ByteBuf 的内存切片后的 ByteBuf 维护独立的 readwrite 指针
例原始 ByteBuf 进行一些初始操作
ByteBuf origin ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
输出 -------------------------------------------------| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
-------------------------------------------------------------------------
|00000000| 02 03 04 |... |
------------------------------------------------------------------------- 切片的正确使用
就是每个切片要执行retain防止内存被释放。等到自己用完之后再执行release composite合并 public static void main(String[] args) {ByteBuf buffer1 ByteBufAllocator.DEFAULT.buffer();buffer1.writeBytes(new byte[]{1,2,3});ByteBuf buffer2 ByteBufAllocator.DEFAULT.buffer();buffer2.writeBytes(new byte[]{4,5,6});CompositeByteBuf compositeBuffer ByteBufAllocator.DEFAULT.compositeBuffer();compositeBuffer.addComponents(true,buffer1,buffer2);log(compositeBuffer);} 5、实现一个简单的回响功能 客户端发送什么服务端就回复什么 这是 服务端的代码
package cn.itcast.mytest.homework;import com.sun.corba.se.internal.CosNaming.BootstrapServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;Slf4j
//回声服务器 返回客户端发送的消息
public class Server {public static void main(String[] args) throws InterruptedException {log.debug(启动中。。。);new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringDecoder());nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug(收到消息{},msg);nioSocketChannel.writeAndFlush(msg);}});nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter(){Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug(发送消息{},msg);super.write(ctx, msg, promise);}});}}).bind(8080);}
}这是客户端 package cn.itcast.mytest.homework;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.java.Log;
import lombok.extern.slf4j.Slf4j;import java.util.Scanner;Slf4j
public class Client {public static void main(String[] args) throws InterruptedException {ChannelFuture channelFuture new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new StringEncoder());nioSocketChannel.pipeline().addLast(new StringDecoder());nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug(收到消息{},msg);}});}}).connect(localhost,8080).sync();Scanner scnew Scanner(System.in);while(true){System.out.println(请输入要发送的消息(输入q退出));String s sc.nextLine();if(!q.equals(s)){channelFuture.channel().writeAndFlush(s);}else{break;}}log.debug(结束对话);}
}四、黏包和半包
1、黏包现象 所谓黏包现象就是发送方在短时间内发送多条数据接收方无法准确分辨出每个独立数据包的边界。这种情况常见于基于流的传输协议如TCP因为TCP是面向字节流的协议数据在网络中以流的形式发送而不是分包的形式。 就是多个数据包黏在一起了 channel建立成功之后会触发active事件。 案例 客户端连续发送10次16字节的数据服务方接收数据之后打印出来我们会发现服务端试将这10条数据当成1条数据了。
接收方服务端 Slf4j
public class HelloWorldServer {static final Logger log LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss new NioEventLoopGroup(1);NioEventLoopGroup worker new NioEventLoopGroup();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(connected {}, ctx.channel());super.channelActive(ctx);}Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug(disconnect {}, ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture serverBootstrap.bind(8080);log.debug({} binding..., channelFuture.channel());channelFuture.sync();log.debug({} bound..., channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(server error, e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug(stoped);}}public static void main(String[] args) {new HelloWorldServer().start();}
}
发送方客户端
Slf4j
public class HelloWorldClient {static final Logger log LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug(connetted...);ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(sending...);Random r new Random();char c a;for (int i 0; i 10; i) {ByteBuf buffer ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);}}});}});ChannelFuture channelFuture bootstrap.connect(127.0.0.1, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}}
} 2、半包现象 半包现象指的是在网络通信中一个逻辑上的数据包被拆分成多个部分进行接收。这种现象通常发生在基于流的协议如 TCP中由于 TCP 是面向字节流的协议数据在传输时并不会被强制分包而是以流的方式发送和接收。 案例 客户端一次发送1600字节的数据但是服务端一次只能接受1024要将一份数据分成两份出现了半包的现象。 Slf4j
public class HelloWorldServer {static final Logger log LoggerFactory.getLogger(HelloWorldServer.class);void start() {NioEventLoopGroup boss new NioEventLoopGroup(1);NioEventLoopGroup worker new NioEventLoopGroup();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(connected {}, ctx.channel());super.channelActive(ctx);}Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.debug(disconnect {}, ctx.channel());super.channelInactive(ctx);}});}});ChannelFuture channelFuture serverBootstrap.bind(8080);log.debug({} binding..., channelFuture.channel());channelFuture.sync();log.debug({} bound..., channelFuture.channel());channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(server error, e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();log.debug(stoped);}}public static void main(String[] args) {new HelloWorldServer().start();}
} Slf4j
public class HelloWorldClient {static final Logger log LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug(connetted...);ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(sending...);ByteBuf buffer ctx.alloc().buffer();for (int i 0; i 100; i) {buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture bootstrap.connect(127.0.0.1, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}}
} 出现原因
本质是因为 TCP 是流式协议消息无边界
滑动窗口 TCP 以一个段segment为单位每发送一个段就需要进行一次确认应答ack处理但如果这么做缺点是包的往返时间越长性能就越差
为了解决此问题引入了窗口概念窗口大小即决定了无需等待应答而可以继续发送的数据最大值 窗口实际就起到一个缓冲区的作用同时也能起到流量控制的作用 图中深色的部分即要发送的数据高亮的部分即窗口 窗口内的数据才允许被发送当应答未到达前窗口必须停止滑动 如果 1001~2000 这个段的数据 ack 回来了窗口就可以向前滑动 接收方也会维护一个窗口只有落在窗口内的数据才能允许接收 3、解决方案
1短连接 每一次接收完就断开连接分10次发送 public class HelloWorldClient {static final Logger log LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {for (int i 0; i 10; i) {send();}}private static void send() {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug(connetted...);ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(sending...);ByteBuf buffer ctx.alloc().buffer();buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);ctx.close();}});}});ChannelFuture channelFuture bootstrap.connect(127.0.0.1, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}}
} 半包用这种办法还是不好解决因为接收方的缓冲区大小是有限的 2定长解码器 本质就是发送一系列消息以最长的那个消息为固定值。然后将发送的消息都以这个固定的长度为准如果长度不够就进行填充。 前提是服务端和客户端要约定好长度。 在服务端代码中要加上 将收到的数据进行解码
ch.pipeline().addLast(new FixedLengthFrameDecoder(8)); 客户端代码
Slf4j
public class HelloWorldClient {static final Logger log LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug(connetted...);ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(sending...);// 发送内容随机的数据包Random r new Random();char c a;ByteBuf buffer ctx.alloc().buffer();for (int i 0; i 10; i) {byte[] bytes new byte[8];for (int j 0; j r.nextInt(8); j) {bytes[j] (byte) c;}c;buffer.writeBytes(bytes);}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture bootstrap.connect(localhost, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}}
} 服务端接收定长的数据 3行解码器 消息之间以换行符作为分隔符 LineBasedFrameDecoder 参数是指定最大长度要是超过这个最大长度还没找到分隔符就报错 服务端加上
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
客户端代码
package cn.itcast.mytest;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Random;
Slf4j
public class HelloWorldClient {static final Logger log LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug(connetted...);ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(sending...);// 发送内容随机的数据包ByteBuf buffer ctx.alloc().buffer();char c0;Random rnew Random();StringBuilder sbnew StringBuilder();for (int i 0; i 10; i) {sb.setLength(0);for(int j0;jr.nextInt(256);j){sb.append(c);}sb.append(\n);buffer.writeBytes(sb.toString().getBytes());c;}ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture bootstrap.connect(localhost, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}}} 这种方法性能低用得比较少。 4LTC解码器 lengthFieldOffset 长度字段偏移量 lengthFieldLength 长度字段 的长度 lengthAdjustment 长度字段跳到正式内容的偏移值 initialBytesToStrip 消息剖离 例子1:
长度字段偏移量是0因为一开始就是长度字段 0 长度字段占两个字节 2 长度字段之后跳0字节就是正式内容 0 消息不剖离 0 例子2
消息头可能携带其他信息比如版本号、协议。比如下面的hdr1、hdr2
长度字段偏移量是1 从开始到长度字段有1个字节 1 长度字段占两个字节 2 长度字段之后跳0字节就是正式内容 hdr2占1个字节 1 消息不剖离 3 从头到第3个字节的部分不要。 服务端加上 第一个参数是消息的最大长度
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0));
客户端代码
public class HelloWorldClient {static final Logger log LoggerFactory.getLogger(HelloWorldClient.class);public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug(connetted...);ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug(sending...);// 发送内容随机的数据包ByteBuf buffer ctx.alloc().buffer();String contenthello world;//将内容加入到缓冲区buffer.writeInt(content.length());//先写入长度buffer.writeBytes(content.getBytes());String content2hi;//将内容加入到缓冲区buffer.writeInt(content2.length());//先写入长度buffer.writeBytes(content2.getBytes());ctx.writeAndFlush(buffer);}});}});ChannelFuture channelFuture bootstrap.connect(localhost, 8080).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}}} 注意如果buffer中的消息长度不一致的时候参数要以最长的消息制定
假如我将上面代码的长度字段改成2也就是意味着长度字段所占的字节数只有2 hello world消息中 长度字段是00 00 00 0b这样他只读取到00消息长度才0 第二次读取到 00 0b 才会去真正读hello world 这是错误的 int 占4字节 五、协议设计与解析 1、redis
其实我们可以用过netty发送消息给redis比如set name zhangsan 解析出来就是 下面这图 使用下面的代码可以向redis发送消息并得到响应
Slf4j
public class testRedis {public static void main(String[] args) {NioEventLoopGroup worker new NioEventLoopGroup();byte[] LINE {13, 10};try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后会触发 active 事件Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf ctx.alloc().buffer();buf.writeBytes(*2.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(get.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(aaa.getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf ctx.alloc().buffer();buf.writeBytes(*3.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(set.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(aaa.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(bbb.getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture bootstrap.connect(localhost, 6379).sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error(client error, e);} finally {worker.shutdownGracefully();}}
}2、http http协议相对来说比较复杂好在netty已经帮我们封装了编解码器 我们只需要加上ch.pipeline().addLast(new HttpServerCodec());即可 首先我们先打印出消息的类型 浏览器访问http://localhost:8080/index.html之后index.html可以换成其他的 11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.DefaultHttpRequest 11:29:19 [DEBUG] [nioEventLoopGroup-2-4] c.i.m.NettyServer - class io.netty.handler.codec.http.LastHttpContent$1
我们会发现消息有两个一个是请求头、请求行 另一个是请求体。这是http编解码器解析的
我们可以使用 if (msg instanceof HttpRequest) { // 请求行请求头 } else if (msg instanceof HttpContent) { //请求体 }
进行判断。 或者我们可以使用 SimpleChannelInboundHandler只处理我们关心的消息类型 public static void main(String[] args) throws InterruptedException {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandlerHttpRequest() {Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {log.debug({},msg.uri());DefaultFullHttpResponse responsenew DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);byte[] mh1Hello World/h1.getBytes();response.content().writeBytes(m);response.headers().setInt(CONTENT_LENGTH,m.length);//消息头写入 正文的长度这样浏览器就不用一直空转ctx.writeAndFlush(response);}});/* ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug({},msg.getClass());}});*/}}).bind(8080).sync().channel();} 3、自定义
要素 魔数用来在第一时间判定是否是无效数据包 版本号可以支持协议的升级 序列化算法消息正文到底采用哪种序列化反序列化方式可以由此扩展例如json、protobuf、hessian、jdk 指令类型是登录、注册、单聊、群聊... 跟业务相关 请求序号为了双工通信提供异步能力 正文长度 消息正文
编解码器 根据上面的要素设计一个登录请求消息和登录响应消息并使用 Netty 完成收发 Slf4j
public class MessageCodec extends ByteToMessageCodecMessage {Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos new ByteArrayOutputStream();ObjectOutputStream oos new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {int magicNum in.readInt();byte version in.readByte();byte serializerType in.readByte();byte messageType in.readByte();int sequenceId in.readInt();in.readByte();int length in.readInt();byte[] bytes new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois new ObjectInputStream(new ByteArrayInputStream(bytes));Message message (Message) ois.readObject();log.debug({}, {}, {}, {}, {}, {}, magicNum, version, serializerType, messageType, sequenceId, length);log.debug({}, message);out.add(message);}
} 注意
channel能不能共享Handler。看源码有没有Sharable注解否则有线程安全问题
ByteToMessageCodec的子类不能标注为Sharable 想要使用这个注解就得继承MessageToMessageCodec这个类默认是消息转化为消息不会出现黏包和半包的情况。所以要使用LengthFieldBasedFrameDecoder来确保不会出现黏包半包的情况
ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodecByteBuf, Message 自定义类没有保存状态就是线程安全的 六、聊天业务 1、登录功能 使用最简单的控制台输入还有固定数据 注意全是采用我们之前自定义的编解码器
MessageCodecSharable
相关资料我上传上去了服务端代码
添加后处理器 在连接建立之后执行校验用户名、密码的功能 Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss new NioEventLoopGroup();NioEventLoopGroup worker new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器ch.pipeline().addLast(new SimpleChannelInboundHandlerLoginRequestMessage() {Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username msg.getUsername();String password msg.getPassword();boolean b UserServiceFactory.getUserService().login(username, password);LoginResponseMessage result null;if(b){//登录成功result new LoginResponseMessage(true, 登录成功);}else{result new LoginResponseMessage(false, 用户名或密码错误);}ctx.writeAndFlush(result);}});}});Channel channel serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error(server error, e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
} 客户端代码
连接建立之后控制台输入用户名、密码。发送给客服端 Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG); //打印日志MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();//自定义消息协议 编解码try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new Thread(()-{Scanner scnew Scanner(System.in);System.out.println(请输入用户名);String username sc.nextLine();System.out.println(请输入密码);String password sc.nextLine();LoginRequestMessage loginRequestMessage new LoginRequestMessage(username, password);ctx.writeAndFlush(loginRequestMessage);}).start();}});}});Channel channel bootstrap.connect(localhost, 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error(client error, e);} finally {group.shutdownGracefully();}}
}线程通信
我们使用CountDownLatch来处理 登陆成功和登陆失败的情况实现
两个线程在channelRead和channelActive之间的通信 Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG); //打印日志MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();//自定义消息协议 编解码CountDownLatch countDownLatchnew CountDownLatch(1);//到0表示可以继续运行AtomicBoolean bnew AtomicBoolean(false);//响应是成功还是失败try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new Thread(()-{Scanner scnew Scanner(System.in);System.out.println(请输入用户名);String username sc.nextLine();System.out.println(请输入密码);String password sc.nextLine();LoginRequestMessage loginRequestMessage new LoginRequestMessage(username, password);ctx.writeAndFlush(loginRequestMessage);try {countDownLatch.await();//线程阻塞在这里} catch (InterruptedException e) {throw new RuntimeException(e);}if(!b.get()){//如果登录失败。关闭连接ctx.channel().close();}log.debug(菜单);}).start();}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {super.channelRead(ctx,msg);if (msg instanceof LoginResponseMessage) {LoginResponseMessage responseMessage (LoginResponseMessage) msg;if (responseMessage.isSuccess()) {//登录成功b.set(true);}else{b.set(false);}countDownLatch.countDown();}}});}});Channel channel bootstrap.connect(localhost, 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error(client error, e);} finally {group.shutdownGracefully();}}
}2、业务消息发送 没什么看看就行 while(true){System.out.println();System.out.println(send [username] [content]);System.out.println(gsend [group name] [content]);System.out.println(gcreate [group name] [m1,m2,m3...]);System.out.println(gmembers [group name]);System.out.println(gjoin [group name]);System.out.println(gquit [group name]);System.out.println(quit);System.out.println();String choice sc.nextLine();String[] s choice.split( );switch (s[0]){case send: ctx.writeAndFlush(new ChatRequestMessage(username,s[1],s[2]));break;case gsend: ctx.writeAndFlush(new GroupChatRequestMessage(username,s[1],s[2]));break;case gcreate:String[] members s[2].split(,);SetString setnew HashSet( Arrays.asList(members));ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],set));break;case gmembers:ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));break;case gjoin:ctx.writeAndFlush(new GroupJoinRequestMessage(username,s[1]));break;case gquit:ctx.writeAndFlush(new GroupQuitRequestMessage(username,s[1]));break;case quit:ctx.channel().close();return ;}} 3、单聊消息处理
为了让我们的代码更清晰
我们先对匿名内部类转化为内部类 再转化为外部类 第一步
模仿上面的结构编写自定义ChatRequestMessageHandler我们写在单独的类下
然后使用addLast加入到channel package cn.itcast.server.Handler;import cn.itcast.message.ChatRequestMessage;
import cn.itcast.message.ChatResponseMessage;
import cn.itcast.server.service.UserServiceFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;//处理单聊的业务
ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandlerChatRequestMessage {Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage msg) throws Exception {String from msg.getFrom();String to msg.getTo();Channel channel SessionFactory.getSession().getChannel(to);//查看对方是否在线if(channel!null){//在线 发送给接收方channel.writeAndFlush(new ChatResponseMessage(from,msg.getContent()));}else{//发送给 发送方channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,用户不存在或者用户不在线));}}
}这是服务端的代码 Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss new NioEventLoopGroup();NioEventLoopGroup worker new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC new MessageCodecSharable();LoginRequestMessageHandler LOGIN_HANDLER new LoginRequestMessageHandler();ChatRequestMessageHandler CHAT_HANDLER new ChatRequestMessageHandler();try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//自定义协议 编解码器ch.pipeline().addLast(LOGIN_HANDLER);ch.pipeline().addLast(CHAT_HANDLER);}});Channel channel serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error(server error, e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}}4、群聊创建
在服务端代码中加上这个handler这里就不做演示了。
package cn.itcast.server.Handler;import cn.itcast.message.GroupCreateRequestMessage;
import cn.itcast.message.GroupCreateResponseMessage;
import cn.itcast.server.session.Group;
import cn.itcast.server.session.GroupSession;
import cn.itcast.server.session.GroupSessionFactory;
import cn.itcast.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.List;
import java.util.Set;ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandlerGroupCreateRequestMessage {Overrideprotected void channelRead0(ChannelHandlerContext ctx,GroupCreateRequestMessage msg) throws Exception {String groupName msg.getGroupName();SetString members msg.getMembers();GroupSession groupSession GroupSessionFactory.getGroupSession();Group group groupSession.createGroup(groupName, members);//不存在的话创建并返回nullif(groupnull){ctx.writeAndFlush(new GroupCreateResponseMessage(true,群聊创检查成功));//创建成功ListChannel membersChannel groupSession.getMembersChannel(groupName);for (Channel channel : membersChannel) {channel.writeAndFlush(new GroupCreateResponseMessage(true,您已被拉入groupName));}}else{//创建失败ctx.writeAndFlush(new GroupCreateResponseMessage(false,群聊已存在));}}
}这里有一个缺陷就是群主并没有进去群聊。所以要在客户端加上
set.add(username); 其他业务就不一一写了都是编写自定义的Handler。 5、空闲检测、心跳 连接假死可能因为网络的原因连接已经断开了。 我们可以做空闲检测就是服务端和客户端定时检测是否还有消息传输 如果超过5秒就判断连接假死但是这样是不合理的客户端可能没有及时发送消息导致连接被断开。 我们可以在客户端进行定时发送心跳如果心跳没有传输到服务端可以证明连接确实出现问题。如果能传输到服务端也不会断开连接 这里要介绍netty提供的IdleStateHandler他有三个参数
参数1读空闲时间 参数2写空闲时间 参数3读写空闲时间时间到了之后会触发响应的事件
我们可以使用ChannelDuplexHandler的userEventTriggered方法来处理各种事件
客户端加上ch.pipeline().addLast(new IdleStateHandler(0,3,0));ch.pipeline().addLast(new ChannelDuplexHandler(){Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event (IdleStateEvent) evt;if(event.state() IdleState.WRITER_IDLE){log.debug(心跳);ctx.writeAndFlush(new PingMessage());}}});服务端加上//参数1读空闲时间 参数2写空闲时间 参数3读写空闲时间ch.pipeline().addLast(new IdleStateHandler(5,0,0));ch.pipeline().addLast(new ChannelDuplexHandler(){Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event (IdleStateEvent) evt;if(eventIdleStateEvent.READER_IDLE_STATE_EVENT){log.debug(5秒没有读事件);ctx.channel().close();}}});
如果客户端还是断开了是因为这里有个bug等下一章才学习到。