seo免费自学的网站,wordpress网站网速慢,吴中网站建设,wordpress8小时前背景 因为安装了正向隔离网闸#xff0c;导致数据传输的时候仅支持TCP协议和UDP协议#xff0c;因此需要开发TCP Client和Server服务来将数据透传#xff0c;当前环境是获取的数据并将数据转发到kafka 1.引入依赖 dependencygroupIdio.netty/groupId导致数据传输的时候仅支持TCP协议和UDP协议因此需要开发TCP Client和Server服务来将数据透传当前环境是获取的数据并将数据转发到kafka 1.引入依赖 dependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.84.Final/version
/dependency 2.编写TCP Server端 TCP Server代码 本代码已经解决TCP的粘包和半包问题需要通过固定的$符号进行数据分割使得数据不会错出现粘包和半包问题可以根据数据大小制定一个不会超过发送消息长度的值 package com.huanyu.forward.tcp.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;Slf4j
Service(tcpServer)
ConditionalOnExpression(#{${spring.tcp-server.port:}.length()0})
public class TcpNettyServer {Value(${spring.tcp-server.port:22222})private Integer port;public static void main(String[] args) throws Exception {new TcpNettyServer().server(22222);}PostConstruct()public void initTcpServer() {try {log.info(start tcp server......);server(port);} catch (Exception e) {log.error(tcp server start failed);}}public void server(int port) throws Exception {//bossGroup就是parentGroup是负责处理TCP/IP连接的EventLoopGroup bossGroup new NioEventLoopGroup();//workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件EventLoopGroup workerGroup new NioEventLoopGroup();ByteBuf buffer ByteBufAllocator.DEFAULT.buffer(1, 1);buffer.writeByte($);ServerBootstrap sb new ServerBootstrap();//初始化服务端可连接队列,指定了队列的大小500sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)//保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true)// 绑定客户端连接时候触发操作.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel sh) throws Exception {//handler是按顺序执行的ChannelPipeline pipeline sh.pipeline();//业务编码 -解决 数据粘包和半包问题-pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024 * 10, buffer));
// pipeline.addLast(new LoggingHandler(LogLevel.WARN));pipeline.addLast(new TcpBizFlagHandler());//业务编码//使用DataHandler类来处理接收到的消息pipeline.addLast(new TcpDataHandler());}});//绑定监听端口调用sync同步阻塞方法等待绑定操作完ChannelFuture future sb.bind(port).sync();if (future.isSuccess()) {log.info(tcp server is listening on :{}, port);} else {log.error(tcp server is failed , future.cause());//关闭线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}//成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
// future.channel().closeFuture().await();}
} 数据标志位接收代码 package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;
import java.util.List;Slf4j
public class TcpBizFlagHandler extends ByteToMessageDecoder {public static final String BIZ_FLAG bizFlag;private static final String FLAG_PRE {;private static final String FLAG_SUF }##;private static final byte[] FLAG_PREFIX FLAG_PRE.getBytes(StandardCharsets.UTF_8);private static final byte[] FLAG_SUFFIX FLAG_SUF.getBytes(StandardCharsets.UTF_8);Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {if (in.readableBytes() FLAG_PREFIX.length FLAG_SUFFIX.length) {log.warn(数据长度不够);text(in);return;}int prefixIndex in.readerIndex();if (!startsWith(in)) {text(in);// 忽略非标志位开头的数据in.skipBytes(in.readableBytes());log.warn(数据不包含指定的前缀);return;}int suffixIndex indexOf(in);if (suffixIndex -1) {log.warn(数据不包含指定的某字符);text(in);return;}int flagLength suffixIndex - prefixIndex FLAG_SUFFIX.length;byte[] flagBytes new byte[flagLength];in.readBytes(flagBytes); // 读取标志位// 保留标志位的对象结构-以{开头以}##结尾形如{k:v}##{k:v}$,和##之间的数据为补充的对象参数JSON,$为换行符号String flag new String(flagBytes, FLAG_PRE.length() - 1, flagBytes.length - FLAG_PREFIX.length - FLAG_SUFFIX.length 2, StandardCharsets.UTF_8);// 保存标志位到 Channel 属性中供后续使用ctx.channel().attr(AttributeKey.valueOf(BIZ_FLAG)).set(flag);// 剩余数据继续传递给下一个 Handler 处理透传out.add(in.readRetainedSlice(in.readableBytes()));}private static void text(ByteBuf in) {byte[] msgByte new byte[in.readableBytes()];in.readBytes(msgByte);log.warn(数据{}, new String(msgByte, StandardCharsets.UTF_8));}private boolean startsWith(ByteBuf buf) {for (int i 0; i TcpBizFlagHandler.FLAG_PREFIX.length; i) {if (buf.getByte(buf.readerIndex() i) ! TcpBizFlagHandler.FLAG_PREFIX[i]) {return false;}}return true;}private int indexOf(ByteBuf buf) {int readerIndex buf.readerIndex();int readableBytes buf.readableBytes();for (int i 0; i readableBytes - TcpBizFlagHandler.FLAG_SUFFIX.length; i) {boolean match true;for (int j 0; j TcpBizFlagHandler.FLAG_SUFFIX.length; j) {if (buf.getByte(readerIndex i j) ! TcpBizFlagHandler.FLAG_SUFFIX[j]) {match false;break;}}if (match) {return readerIndex i;}}return -1;}
}业务转发/解析代码 package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;import static com.aimsphm.forward.tcp.server.TcpBizFlagHandler.BIZ_FLAG;Slf4j
Service
public class TcpDataHandler extends ChannelInboundHandlerAdapter {// Resourceprivate KafkaTemplateString, Object template;//接受client发送的消息Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Channel channel ctx.channel();// 获取标志位String flag (String) channel.attr(AttributeKey.valueOf(BIZ_FLAG)).get();if (ObjectUtils.isEmpty(flag)) {log.warn(没有业务标识);return;}ByteBuf buf (ByteBuf) msg;byte[] msgByte new byte[buf.readableBytes()];buf.readBytes(msgByte);
// template.send(haha.haha.ha, gbk.getBytes());log.info(bizFag:{},data: {}, flag, new String(msgByte));}//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}//读操作时捕获到异常时调用Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}//客户端去和服务端连接成功时触发Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.copiedBuffer(hello client [你好客户端].getBytes()));log.info(client 连接成功 {}, ctx.channel());}
}3.编写客户端代码 TCP Client 代码 package com.huanyu.forward.tcp.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import java.util.stream.IntStream;Getter
Slf4j
public class TcpNettyClient {public static void main(String[] args) {extracted();}private static void extracted() {try {TcpNettyClient client new TcpNettyClient(localhost, 4444);Channel channel client.getChannel();IntStream.range(0, 1000).parallel().forEach(i - {ByteBuf buf ByteBufAllocator.DEFAULT.buffer();buf.writeBytes(({\cell-topic (i 1) \:true}##{01#.01#\:\data1\}).getBytes());buf.writeByte($);channel.writeAndFlush(buf);});} catch (Exception e) {log.error(出现异常, e);}}private Channel channel;//连接服务端的端口号地址和端口号public TcpNettyClient(String host, int port) {tcpClient(host, port);}public void tcpClient(String host, int port) {try {final EventLoopGroup group new NioEventLoopGroup();Bootstrap b new Bootstrap();b.group(group).channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类.handler(new ChannelInitializerSocketChannel() { // 绑定连接初始化器Overridepublic void initChannel(SocketChannel ch) throws Exception {System.out.println(正在连接中...);ChannelPipeline pipeline ch.pipeline();pipeline.addLast(new TcpClientHandler()); //客户端处理类}});//发起异步连接请求绑定连接端口和host信息final ChannelFuture future b.connect(host, port).sync();future.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture arg0) throws Exception {if (future.isSuccess()) {log.info(连接服务器成功:);} else {log.warn(连接服务器失败:);System.out.println(连接服务器失败);group.shutdownGracefully(); //关闭线程组}}});this.channel future.channel();} catch (InterruptedException e) {log.error(TCP服务端启动异常, e);}}} 客户端数据解析代码 package com.huanyu.forward.tcp.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.Map;public class TcpClientHandler extends SimpleChannelInboundHandlerMapString, ByteBuf {//处理服务端返回的数据Overrideprotected void channelRead0(ChannelHandlerContext ctx, MapString, ByteBuf data) throws Exception {ByteBuf msg data.get(topic);byte[] msgByte new byte[msg.readableBytes()];msg.readBytes(msgByte);System.out.println(接受到server响应数据: new String(msgByte));}Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.copiedBuffer(hello server 你好.getBytes()));super.channelActive(ctx);}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}备注 1. 为了尽可能的降低性能消耗数据以字节数组的形式发送 2. 业务字段通过{key:value}##作为消息的头部用数据标志位处理器进行处理 3. 真实要传送的数据并不解析出来并以$结尾解决粘包和半包问题 记录备查