当前位置: 首页 > news >正文

网站开发后端需要哪些技术网站模板大全

网站开发后端需要哪些技术,网站模板大全,清丰网站建设费用,步骤的英文目录 一、介绍 二、依赖引入 三、公共部分实现 四、server端实现 五、client端实现 六、测试 一、介绍 本片文章将实现请求响应同步#xff0c;什么是请求响应同步呢#xff1f;就是当我们发起一个请求时#xff0c;希望能够在一定时间内同步#xff08;线程阻塞什么是请求响应同步呢就是当我们发起一个请求时希望能够在一定时间内同步线程阻塞等待响应结果。 我们通过netty实现rpc调用时由于客户端和服务端保持连接在此期间客户端会有无数的接口调用并发而此时每次发送的请求需要能够及时响应获取调用结果服务端一次次返回调用结果客户端在处理响应结果时需要与请求建立联系确保每一次的请求能够正确获取到对应的调用结果。 由于在一个应用中客户端与服务端的channel只有一条所有线程都通过该channel进行rpc调用所以在接下来客户端设计中每个线程发送的请求将会分配一个id当请求发送完毕之后该线程会进行阻塞状态等待channel收到请求id对应返回的响应消息时唤醒或超时唤醒。在接下来服务端设计中服务端收到客户端的rpc调用请求对该请求进行处理将该请求的id和处理结果写入响应类中进行返回。 二、依赖引入 dependenciesdependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.101.Final/version/dependencydependencygroupIdio.protostuff/groupIdartifactIdprotostuff-core/artifactIdversion1.8.0/version/dependencydependencygroupIdio.protostuff/groupIdartifactIdprotostuff-runtime/artifactIdversion1.8.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.30/version/dependency/dependencies 三、公共部分实现 1、结构 2、Message类所有Request和Response类的父类最关键的字段就是messageType子类继承之后进行赋值该值与类的类型进行绑定用于byte字节数组反序列化时能够获取到需要反序列化的类型。 Data public abstract class Message {protected Byte messageType;}RpcRequest用于客户端向服务端发起调用的消息通信类 Data ToString public class RpcRequest extends Message{private String id;private String param;public RpcRequest() {this.id UUID.randomUUID().toString();super.messageType MessageConstant.rpcRequest;}} RpcResponse用于服务端向客户端返回结构的消息通信类 Data ToString public class RpcResponse extends Message{private String id;private String result;public RpcResponse() {super.messageType MessageConstant.rpcResponse;}} 3、MessageConstant通过数值常量messageType绑定消息类型在序列化对象时会在数据中记录对象的messageType在反序列化对象时会从数据包中拿到messageType将其转化为对应的消息类型进行处理 public class MessageConstant {public final static Byte rpcRequest 1;public final static Byte rpcResponse 2;public static MapByte, Class? extends Message messageTypeMap new ConcurrentHashMap();static {messageTypeMap.put(rpcRequest, RpcRequest.class);messageTypeMap.put(rpcResponse, RpcResponse.class);}public static Class? extends Message getMessageClass(Byte messageType){return messageTypeMap.get(messageType);}} 4、序列化工具用于将类对象序列化为字节数组以及将字节数组反序列化为对象 public class SerializationUtil {private final static MapClass?, Schema? schemaCache new ConcurrentHashMap();/*** 序列化*/public static T byte[] serialize(T object){LinkedBuffer buffer LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);try {ClassT cls (ClassT) object.getClass();SchemaT schema getSchema(cls);return ProtostuffIOUtil.toByteArray(object, schema, buffer);} catch (Exception e) {throw e;} finally {buffer.clear();}}/*** 反序列化*/public static T T deserialize(ClassT cls, byte[] data) {SchemaT schema getSchema(cls);T message schema.newMessage();ProtostuffIOUtil.mergeFrom(data, message, schema);return message;}public static T SchemaT getSchema(ClassT cls) {SchemaT schema (SchemaT) schemaCache.get(cls);if(schema null) {schema RuntimeSchema.getSchema(cls);schemaCache.put(cls, schema);}return schema;}}5、MesasgeEncode和MessageDecode实现 MessageEncode用于将消息对象序列化为字节数组 字节数组主要包括三部分 ·有效数组长度占4个字节长度不包括自己用于半包黏包判断 ·消息的类型占1个字节用于反序列选择类型使用 ·消息对象占n个字节 public class MessageEncode extends MessageToByteEncoderMessage {Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {// 将对象进行序列化byte[] data SerializationUtil.serialize(message);// 写数据长度前4个字节用于记录数据总长度对象 类型1个字节byteBuf.writeInt(data.length 1);// 写记录消息类型用于反序列选择类的类型byteBuf.writeByte(message.getMessageType());// 写对象byteBuf.writeBytes(data);}} MesageDecode用于将字节数组反序列化为消息对象 反序列时会进行判断数据是否足够读取足够的话就会读取到符合长度的字节数组进行序列化否则的话等到下一个数据包到来再进行重新判断处理解决半包黏包方案 public class MessageDecode extends ByteToMessageDecoder {Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, ListObject list) throws Exception {// 由于数据包的前4个字节用于记录总数据大小如果数据不够4个字节不进行读if(byteBuf.readableBytes() 4) {return;}// 标记开始读的位置byteBuf.markReaderIndex();// 前四个字节记录了数据大小int dataSize byteBuf.readInt();// 查看剩余可读字节是否足够如果不是重置读取位置等待下一次解析if(byteBuf.readableBytes() dataSize) {byteBuf.resetReaderIndex();return;}// 读取消息类型byte messageType byteBuf.readByte();// 读取数据, 数组大小需要剔除1个字节的消息类型byte[] data new byte[dataSize -1];byteBuf.readBytes(data);Message message SerializationUtil.deserialize(MessageConstant.getMessageClass(messageType), data);list.add(message);}} 四、server端实现 1、结构 2、RpcRequestHandler用于处理客户端rpc请求 public class RpcRequestHandler extends SimpleChannelInboundHandlerRpcRequest {private final static EventLoopGroup worker new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors() 1);Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {// 为避免占用网络io此处异步进行处理worker.submit(() - {System.out.println([RpcRequestHandler] Thread.currentThread().getName() 处理请求msg msg);// 模拟处理耗时try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}RpcResponse rpcResponse new RpcResponse();rpcResponse.setId(msg.getId());rpcResponse.setResult(处理 msg.getParam());ctx.writeAndFlush(rpcResponse);});}}3、ServerChannelInitializer该类用于初始化Server与Client通信的Channel需要将我们前面写的编解码器以及RequestHandler添加进pipeline public class ServerChannelInitializer extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline socketChannel.pipeline();pipeline.addLast(new MessageEncode());pipeline.addLast(new MessageDecode());pipeline.addLast(new RpcRequestHandler());}} 4、RpcServer用于启动一个Netty Server服务 public class RpcServer {public void bind(Integer port) {EventLoopGroup parent new NioEventLoopGroup();EventLoopGroup child new NioEventLoopGroup();Channel channel null;try{ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.group(parent, child).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ServerChannelInitializer());ChannelFuture channelFuture serverBootstrap.bind(port).sync();System.out.println(server启动);// 非阻塞等待关闭channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {System.out.println(server关闭);parent.shutdownGracefully();child.shutdownGracefully();}});channel channelFuture.channel();} catch (Exception e) {e.printStackTrace();if(channel null || !channel.isActive()) {System.out.println(server关闭);parent.shutdownGracefully();child.shutdownGracefully();} else {channel.close();}}}} 五、client端实现 1、结构 2、SyncPromise用于Netty客户端的工作线程与外部发起RpcRequest的线程通信的类通过该类可以阻塞与唤醒外部发起RpcRequest的线程以及设置线程之间通信的内容功能有点像Netty提供的Promise不过此处我加了超时机制 此处使用CountDownLatch来阻塞与唤醒线程有以下好处 1、能够通过await(long timeout, TimeUnit unit)返回值true/false进行判断线程等待返回结果是否超时。因为线程进入阻塞时CountDownLatch的值为1当netty客户端的工作线程调用countDown()唤醒线程时CountDownLatch值减为0await(long timeout, TimeUnit unit)返回true意味着线程等待响应结果时没有超时。当netty客户端的工作线程没有来得及调用countDown()唤醒线程时。也就是说服务端返回结果超时CountDownLatch值为1线程超时唤醒await(long timeout, TimeUnit unit)返回false。 综上所述以await(long timeout, TimeUnit unit)返回值进行判断线程是否超时唤醒。此处给一个对比就是有人认为为什么不使用LockSupport进行线程的阻塞与唤醒原因如下虽然LockSupport提供了超时唤醒的方法但是该方法既没有返回值也没有抛出异常线程唤醒时我们没有办法判断该线程是否超时了。 2、在我们实现的流程中我们先发送了请求才进行线程阻塞。那么存在一种情况如果结果在我们线程阻塞之前就返回了那么当线程进入阻塞时就再也没有唤醒线程的时机了导致线程每次调用接口都是超时的。 CountDownLatch的await(long timeout, TimeUnit unit)方法很好的规避了上诉问题如果netty客户端的工作线程调用countDown()唤醒线程那么此时CountDownLatch值减为0线程需要调用await进入阻塞此时由于CountDownLatch为0线程将不会进入阻塞方法返回true我们线程也能够正常的拿到请求的响应结果。 具体妙处需要大家仔细感受一开始可能不太能理解但把流程仔细梳理一下就能够有更好的体验。 public class SyncPromise {// 用于接收结果private RpcResponse rpcResponse;private final CountDownLatch countDownLatch new CountDownLatch(1);// 用于判断是否超时private boolean isTimeout false;/*** 同步等待返回结果*/public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException {// 等待阻塞超时时间内countDownLatch减到0将提前唤醒以此作为是否超时判断boolean earlyWakeUp countDownLatch.await(timeout, unit);if(earlyWakeUp) {// 超时时间内countDownLatch减到0提前唤醒说明已有结果return rpcResponse;} else {// 超时时间内countDownLatch没有减到0自动唤醒说明超时时间内没有等到结果isTimeout true;return null;}}public void wake() {countDownLatch.countDown();}public RpcResponse getRpcResponse() {return rpcResponse;}public void setRpcResponse(RpcResponse rpcResponse) {this.rpcResponse rpcResponse;}public boolean isTimeout() {return isTimeout;}} 3、RpcUtil封装的请求发送工具类需要调用rpc发送的请求的线程将通过该工具的send方法进行远程调用不能简单的通过channel.writeAndFlush()进行客户端与服务端的通信 syncPromiseMap的作用记录请求对应的SyncPromise对象一次请求对应一个SyncPromise对象由于外部线程与netty客户端的工作线程是通过SyncPromise进行通信的我们需要通过请求的id与SyncPromise建立关系确保netty客户端在处理RpcResopnse时能够根据其中的请求id属性值找到对应SyncPromise对象为其设置响应值以及唤醒等待结果的线程。 public class RpcUtil {private final static MapString, SyncPromise syncPromiseMap new ConcurrentHashMap();private final static Channel channel;static{channel new RpcClient().connect(127.0.0.1, 8888);}public static RpcResponse send(RpcRequest rpcRequest, long timeout, TimeUnit unit) throws Exception{if(channel null) {throw new NullPointerException(channel);}if(rpcRequest null) {throw new NullPointerException(rpcRequest);}if(timeout 0) {throw new IllegalArgumentException(timeout must greater than 0);}// 创造一个容器用于存放当前线程与rpcClient中的线程交互SyncPromise syncPromise new SyncPromise();syncPromiseMap.put(rpcRequest.getId(), syncPromise);// 发送消息此处如果发送玩消息并且在get之前返回了结果下一行的get将不会进入阻塞也可以顺利拿到结果channel.writeAndFlush(rpcRequest);// 等待获取结果RpcResponse rpcResponse syncPromise.get(timeout, unit);if(rpcResponse null) {if(syncPromise.isTimeout()) {throw new TimeoutException(等待响应结果超时);} else{throw new Exception(其他异常);}}// 移除容器syncPromiseMap.remove(rpcRequest.getId());return rpcResponse;}public static MapString, SyncPromise getSyncPromiseMap(){return syncPromiseMap;}} 4、RpcResponseHandler处理返回的调用结果在该处理器中将唤醒等待返回结果的线程 public class RpcResponseHandler extends SimpleChannelInboundHandlerRpcResponse {Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {// 根据请求id在集合中找到与外部线程通信的SyncPromise对象SyncPromise syncPromise RpcUtil.getSyncPromiseMap().get(msg.getId());if(syncPromise ! null) {// 设置响应结果syncPromise.setRpcResponse(msg);// 唤醒外部线程syncPromise.wake();}}}5、ClientChannelInitializer该类用于初始化Server与Client通信的Channel需要将我们前面写的编解码器以及ResponseHandler添加进pipeline public class ClientChannelInitializer extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline socketChannel.pipeline();pipeline.addLast(new MessageEncode());pipeline.addLast(new MessageDecode());pipeline.addLast(new RpcResponseHandler());}}6、RpcClient实现用于启动客户端 public class RpcClient {public Channel connect(String host, Integer port) {EventLoopGroup worker new NioEventLoopGroup();Channel channel null;try {Bootstrap bootstrap new Bootstrap();bootstrap.group(worker).channel(NioSocketChannel.class).option(ChannelOption.AUTO_READ, true).handler(new ClientChannelInitializer());ChannelFuture channelFuture bootstrap.connect(host, port).sync();System.out.println(客户端启动);channel channelFuture.channel();// 添加关闭监听器channel.closeFuture().addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {System.out.println(关闭客户端);worker.shutdownGracefully();}});} catch (Exception e) {e.printStackTrace();if(channel null || !channel.isActive()) {worker.shutdownGracefully();} else {channel.close();}}return channel;}} 六、测试 1、启动服务端 public static void main(String[] args) {new RpcServer().bind(8888); } 启动结果如下 server启动 2、启动客户端并且通过两个异步线程发送请求 public static void main(String[] args) throws Exception{//Channel channel new RpcClient().connect(127.0.0.1, 8888);Thread thread1 new Thread(new Runnable() {Overridepublic void run() {RpcRequest rpcRequest new RpcRequest();rpcRequest.setParam(参数1);try {System.out.println(thread1发送请求);RpcResponse rpcResponse RpcUtil.send(rpcRequest, 5, TimeUnit.SECONDS);System.out.println(thread1处理结果 rpcResponse);} catch (Exception e) {throw new RuntimeException(e);}}});Thread thread2 new Thread(new Runnable() {Overridepublic void run() {RpcRequest rpcRequest2 new RpcRequest();rpcRequest2.setParam(参数2);try {System.out.println(thread2发送请求);RpcResponse rpcResponse RpcUtil.send(rpcRequest2, 5, TimeUnit.SECONDS);System.out.println(thread2处理结果 rpcResponse);} catch (Exception e) {throw new RuntimeException(e);}}});// 休眠一下等待客户端与服务端进行连接Thread.sleep(1000);thread1.start();thread2.start();} 服务端结果 [RpcRequestHandler] defaultEventLoopGroup-4-3 处理请求msg RpcRequest(idade6af01-2bcf-4a4c-a42a-381731010027, param参数1) [RpcRequestHandler] defaultEventLoopGroup-4-4 处理请求msg RpcRequest(iddb57bf9a-3220-44ca-8e4f-d74237a3d5b2, param参数2) 客户端结果 thread1发送请求 thread2发送请求 thread1处理结果RpcResponse(idade6af01-2bcf-4a4c-a42a-381731010027, result处理参数1) thread2处理结果RpcResponse(iddb57bf9a-3220-44ca-8e4f-d74237a3d5b2, result处理参数2) 以上由于我们在RpcRequestHandler中模拟处理请求为3秒而线程等待结果超时为5秒所以接下来将线程调用rpc请求的的超时时间设置为2秒重启客户端客户端结果如下 thread1发送请求 thread2发送请求 Exception in thread Thread-1 Exception in thread Thread-0 java.lang.RuntimeException: java.util.concurrent.TimeoutException: 等待响应结果超时at org.ricardo.sync.client.RpcClientTest$1.run(RpcClientTest.java:32)at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.TimeoutException: 等待响应结果超时at org.ricardo.sync.client.rpc.RpcUtil.send(RpcUtil.java:56)at org.ricardo.sync.client.RpcClientTest$1.run(RpcClientTest.java:29)... 1 more java.lang.RuntimeException: java.util.concurrent.TimeoutException: 等待响应结果超时at org.ricardo.sync.client.RpcClientTest$2.run(RpcClientTest.java:48)at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.TimeoutException: 等待响应结果超时at org.ricardo.sync.client.rpc.RpcUtil.send(RpcUtil.java:56)at org.ricardo.sync.client.RpcClientTest$2.run(RpcClientTest.java:45)... 1 more
http://www.dnsts.com.cn/news/187520.html

相关文章:

  • 软路由系统如何做网站莱芜金点子招聘网
  • 怎么让百度收录我的网站网站开发网校
  • 如何把网站点击连接到百度商桥云服务器一年多少钱
  • 武义县建设局网站做网站还是做阿里
  • 网站做广告的好处网站怎么办
  • 做淘客网站怎么建要购买数据库吗wordpress如何改字体大小
  • 携程特牌 的同时做别的网站支付宝小程序开发者工具
  • 自做网站需要多少钱简单的网页制作软件
  • 网站源码查询湖北住房和城乡建设厅网站
  • 网站 百度 关键字优化用手机能建网站吗
  • 崇明区建设镇网站用wordpress建公司网站步骤
  • icp网站备案wordpress用户注册地址
  • 聊城网站建设设计实力公司wordpress天气接口
  • 中国做网站正邦做vi的网站
  • 怎么看一家网站是谁做的东营网新闻
  • 网页制作与网站建设实战大全光盘电商网站功能模块图
  • 开鲁网站seo转接手机代理网址
  • 国外网站流量查询给个网站免费的
  • 重庆智能建站模板上海城乡住房建设部网站
  • 商城站时刻表网站开发贴吧
  • 网站项目中的工作流程营销推广方案
  • 网站县区分站点建设做网站 没内容
  • 网站建设开题报告ppt模板美妆网站建设环境分析
  • 重庆建设厂网站建设银行网站需要什么浏览器
  • 网站框架类型wordpress短信
  • 网站开发人员属于什么软件WordPress开VPN访问快
  • 外国出名的设计网站如何安装wordpress主题
  • 智慧校园信息门户网站建设自己做网站的给微信取个什么名字好
  • 购物网站英文介绍专业的免费建站
  • 哈尔滨做设计和网站的公司优化二十条措施建议