商务网站建设实验报告,三层别墅设计图片大全 效果图,做网站的费用记哪个科目,网站的面包屑怎么做的Netty高级应用之#xff1a;编解码器与群聊天室开发 文章目录Netty高级应用之#xff1a;编解码器与群聊天室开发Netty编解码器Java的编解码Netty编解码器概念解码器(Decoder)编码器(Encoder)编码解码器CodecNetty案例-群聊天室聊天室服务端编写聊天室客户端编写Netty编解码器…Netty高级应用之编解码器与群聊天室开发 文章目录Netty高级应用之编解码器与群聊天室开发Netty编解码器Java的编解码Netty编解码器概念解码器(Decoder)编码器(Encoder)编码解码器CodecNetty案例-群聊天室聊天室服务端编写聊天室客户端编写Netty编解码器
Java的编解码
编码Encode称为序列化 它将对象序列化为字节数组用于网络传输、数据持久化或者其它用途。解码Decode称为反序列化它把从网络、磁盘等读取的字节数组还原成原始对象通常是原始对象的拷贝以方便后续的业务逻辑操作。 java序列化对象只需要实现java.io.Serializable接口并生成序列化ID这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
Java序列化目的1.网络传输。2.对象持久化。
Java序列化缺点1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低
Java序列化仅仅是Java编解码技术的一种由于它的种种缺陷衍生出了多种编解码技术和框架这些编解码框架实现消息的高效序列化。
Netty编解码器
概念
在网络应用中需要实现某种编解码器将原始字节数据与自定义的消息对象进行互相转换。网络中都是以字节码的数据形式来传输数据的服务器编码数据后发送到客户端客户端需要对数据进行解码。
对于Netty而言编解码器由两部分组成编码器、解码器。
解码器负责将消息从字节或其他序列形式转成指定的消息对象。编码器将消息对象转成字节或其他序列形式在网络上传输。
Netty 的编解码器实现了 ChannelHandlerAdapter也是一种特殊的 ChannelHandler所以依赖于 ChannelPipeline可以将多个编解码器链接在一起以实现复杂的转换逻辑。
Netty里面的编解码
解码器负责处理“入站 InboundHandler”数据。编码器负责“出站OutboundHandler” 数据。
解码器(Decoder)
解码器负责解码“入站”数据从一种格式到另一种格式解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder 抽象解码器
ByteToMessageDecoder: 用于将字节转为消息需要检查缓冲区是否有足够的字节ReplayingDecoder: 继承ByteToMessageDecoder不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder否则使用ByteToMessageDecoderMessageToMessageDecoder: 用于从一种消息解码为另外一种消息例如POJO到POJO
核心方法
decode(ChannelHandlerContext ctx, ByteBuf msg, ListObject out)代码实现
解码器:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;import java.util.List;/*** 消息解码器*/
public class MessageDecoder extends MessageToMessageDecoder {Overrideprotected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println(正在进行消息解码....);ByteBuf byteBuf (ByteBuf) msg;out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler}
}通道读取方法 /*** 通道读取事件** param ctx* param msg* throws Exception*/Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(客户端发送过来的消息: msg);}启动类 protected void initChannel(SocketChannel ch) throws Exception {//8. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new MessageDecoder());//添加解码器ch.pipeline().addLast(new NettyServerHandler());}编码器(Encoder)
与ByteToMessageDecoder和MessageToMessageDecoder相对应Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder二者都实现ChannelOutboundHandler接口。 抽象编码器
MessageToByteEncoder: 将消息转化成字节MessageToMessageEncoder: 用于从一种消息编码为另外一种消息例如POJO到POJO
核心方法
encode(ChannelHandlerContext ctx, String msg, ListObject out)代码实现
编码器:
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;import java.util.List;/*** 消息的编码器*/
public class MessageEncoder extends MessageToMessageEncoder {Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println(消息正在进行编码....);String str (String) msg;out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));}
}消息发送
/*** 通道就绪事件** param ctx* throws Exception*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ChannelFuture future ctx.writeAndFlush(你好呀.我是Netty客户端);future.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println(数据发送成功!);} else {System.out.println(数据发送失败!);}}});}启动类:
Override
protected void initChannel(SocketChannel ch) throws Exception {//6. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new MessageDecoder());//添加解码器ch.pipeline().addLast(new MessageEncoder());//添加编码器ch.pipeline().addLast(new NettyClientHandler());
}编码解码器Codec
编码解码器: 同时具有编码与解码功能特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口因此在数据输入和输出时都能进行处理。 Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec ,MessageToMessageCodec都继承与此类.
代码实现
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.CharsetUtil;import java.util.List;/*** 消息编解码器*/
public class MessageCodec extends MessageToMessageCodec {/*** 编码** param ctx* param msg* param out* throws Exception*/Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println(消息正在进行编码....);String str (String) msg;out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));}/*** 解码** param ctx* param msg* param out* throws Exception*/Overrideprotected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {System.out.println(正在进行消息解码....);ByteBuf byteBuf (ByteBuf) msg;out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler}
}启动类
protected void initChannel(SocketChannel ch) throws Exception {//8. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new MessageCoder());//添加编解码器ch.pipeline().addLast(new NettyServerHandler());
}Netty案例-群聊天室
案例要求:
编写一个 Netty 群聊系统实现服务器端和客户端之间的数据简单通讯实现多人群聊服务器端可以监测用户上线离线并实现消息转发功能客户端可以发送消息给其它所有用户同时可以接受其它用户发送的消息
聊天室服务端编写
NettyChatServer
import com.lagou.demo.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/*** 聊天室服务端*/
public class NettyChatServer {//端口号private int port;public NettyChatServer(int port) {this.port port;}public void run() throws InterruptedException {//1. 创建bossGroup线程组: 处理网络事件--连接事件EventLoopGroup bossGroup null;//2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数EventLoopGroup workerGroup null;try {bossGroup new NioEventLoopGroup(1);workerGroup new NioEventLoopGroup();//3. 创建服务端启动助手ServerBootstrap serverBootstrap new ServerBootstrap();//4. 设置bossGroup线程组和workerGroup线程组serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO.option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置.childHandler(new ChannelInitializerSocketChannel() { //7. 创建一个通道初始化对象Overrideprotected void initChannel(SocketChannel ch) throws Exception {//8. 向pipeline中添加自定义业务处理handler//添加编解码器ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());// todoch.pipeline().addLast(new NettyChatServerHandler());}});//9. 启动服务端并绑定端口,同时将异步改为同步ChannelFuture future serverBootstrap.bind(port);future.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println(端口绑定成功!);} else {System.out.println(端口绑定失败!);}}});System.out.println(聊天室服务端启动成功.);//10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new NettyChatServer(9998).run();}
}NettyChatServerHandle
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.ArrayList;
import java.util.List;/*** 聊天室业务处理类*/
public class NettyChatServerHandler extends SimpleChannelInboundHandlerString {public static ListChannel channelList new ArrayList();/*** 通道就绪事件** param ctx* throws Exception*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel ctx.channel();//当有新的客户端连接的时候, 将通道放入集合channelList.add(channel);System.out.println([Server]: channel.remoteAddress().toString().substring(1) 在线.);}/*** 通道未就绪--channel下线** param ctx* throws Exception*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel ctx.channel();//当有客户端断开连接的时候,就移除对应的通道channelList.remove(channel);System.out.println([Server]: channel.remoteAddress().toString().substring(1) 下线.);}/*** 通道读取事件** param ctx* param msg* throws Exception*/Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {//当前发送消息的通道, 当前发送的客户端连接Channel channel ctx.channel();for (Channel channel1 : channelList) {//排除自身通道if (channel ! channel1) {channel1.writeAndFlush([ channel.remoteAddress().toString().substring(1) ]说: msg);}}}/*** 异常处理事件** param ctx* param cause* throws Exception*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();Channel channel ctx.channel();//移除集合channelList.remove(channel);System.out.println([Server]: channel.remoteAddress().toString().substring(1) 异常.);}
}聊天室客户端编写
NettyChatClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;/*** 聊天室的客户端*/
public class NettyChatClient {private String ip;//服务端IPprivate int port;//服务端端口号public NettyChatClient(String ip, int port) {this.ip ip;this.port port;}public void run() throws InterruptedException {//1. 创建线程组EventLoopGroup group null;try {group new NioEventLoopGroup();//2. 创建客户端启动助手Bootstrap bootstrap new Bootstrap();//3. 设置线程组bootstrap.group(group).channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO.handler(new ChannelInitializerSocketChannel() { //5. 创建一个通道初始化对象Overrideprotected void initChannel(SocketChannel ch) throws Exception {//6. 向pipeline中添加自定义业务处理handler//添加编解码器ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());//添加客户端的处理类ch.pipeline().addLast(new NettyChatClientHandler());}});//7. 启动客户端,等待连接服务端,同时将异步改为同步ChannelFuture channelFuture bootstrap.connect(ip, port).sync();Channel channel channelFuture.channel();System.out.println(------- channel.localAddress().toString().substring(1) --------);Scanner scanner new Scanner(System.in);while (scanner.hasNextLine()) {String msg scanner.nextLine();//向服务端发送消息channel.writeAndFlush(msg);}//8. 关闭通道和关闭连接池channelFuture.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}public static void main(String[] args) throws InterruptedException {new NettyChatClient(127.0.0.1, 9998).run();}
}NettyChatClientHandle
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** 聊天室处理类*/
public class NettyChatClientHandler extends SimpleChannelInboundHandlerString {/*** 通道读取就绪事件** param ctx* param msg* throws Exception*/Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);}
}