宿迁建设局网站拆除备案,当当网电子商务网站建设特点,建设网站二级子页打不开,宣传片制作报价单文章目录1、心跳机制简介2、心跳机制实现方式3、客户端4 、服务端5、代码实现5.1 KeepAlive.java5.2 MyClient.java5.3 MyServer5.4 测试结果1、心跳机制简介
在分布式系统中#xff0c;分布在不同主机上的节点需要检测其他节点的状态#xff0c;如服务器节点需要检测从节点…
文章目录1、心跳机制简介2、心跳机制实现方式3、客户端4 、服务端5、代码实现5.1 KeepAlive.java5.2 MyClient.java5.3 MyServer5.4 测试结果1、心跳机制简介
在分布式系统中分布在不同主机上的节点需要检测其他节点的状态如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性每隔固定时间就发送一个固定信息给对方对方回复一个固定信息如果长时间没有收到对方的回复则断开与对方的连接。
发包方既可以是服务端也可以是客户端这要看具体实现。因为是每隔固定时间发送一次类似心跳所以发送的固定信息称为心跳包。心跳包一般为比较小的包可根据具体实现。心跳包主要应用于长连接的保持与短线链接。
一般而言应该客户端主动向服务器发送心跳包因为服务器向客户端发送心跳包会影响服务器的性能。
实现原理长连接的维持是要客户端程序定时向服务端程序发送一个维持连接包的。如果长时间未发送维持连接包服务端程序将断开连接。
2、心跳机制实现方式
心跳机制有两种实现方式一种基于TCP自带的心跳包TCP的SO_KEEPALIVE选项可以系统默认的默认跳帧频率为2小时超过2小时后本地的TCP 实现会发送一个数据包给远程的 Socket. 如果远程Socket 没有发回响应, TCP实现就会持续尝试 11 分钟, 直到接收到响应为止。 否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态默认2小时的空闲时间对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。
另一种在应用层自己进行实现基本步骤如下
1、Client使用定时器不断发送心跳2、Server收到心跳后回复一个包3、Server为每个Client启动超时定时器如果在指定时间内没有收到Client的心跳包则Client失效。
3、客户端
Client通过持有Socket的对象可以随时使用sendObject方法发送Massage Object(消息)给服务端。
如果keepAliveDelay毫秒程序中是2秒内未发送任何数据则自动发送一个KeepAlive Object(心跳)给服务端用于维持连接。
由于我们向服务端可以发送很多不同的消息对象服务端也可以返回不同的对象。所以对于返回对象的处理要编写具体的ObjectAction实现类进行处理。通过Client.addActionMap方法进行添加。这样程序会回调处理
4 、服务端
由于客户端会定时keepAliveDelay毫秒发送维持连接的信息过来所以服务端要有一个检测机制。
即当服务端receiveTimeDelay毫秒程序中是3秒内未接收任何数据则自动断开与客户端的连接。
ActionMapping的原理与客户端相似相同。
通过添加相应的ObjectAction实现类可以实现不同对象的响应、应答过程。
5、代码实现
5.1 KeepAlive.java
package com.zyz.mynative.demo02;/*** author zyz* version 1.0* data 2023/2/15 11:05* Description:*/
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;/**** 维持连接的消息对象心跳对象* author zyz* version 1.0* data 2023/2/15 11:05* Description:*/
public class KeepAlive implements Serializable{private static final long serialVersionUID -2813120366138988480L;/* 覆盖该方法仅用于测试使用。* see java.lang.Object#toString()*/Overridepublic String toString() {return new SimpleDateFormat(yyyy-MM-dd HH:mm:ss).format(new Date())\t维持连接包;}}5.2 MyClient.java
package com.zyz.mynative.demo02;/*** author zyz* version 1.0* data 2023/2/15 11:06* Description:*/import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;/*** C/S架构的客户端对象持有该对象可以随时向服务端发送消息。** author zyz* version 1.0* data 2023/2/15 11:06* Description:*/
public class MyClient {/*** 处理服务端发回的对象可实现该接口。*/public static interface ObjectAction {void doAction(Object obj, MyClient client);}public static final class DefaultObjectAction implements ObjectAction {Overridepublic void doAction(Object obj, MyClient client) {System.out.println(处理\t obj.toString());}}private String serverIp;private int port;private Socket socket;//连接状态private boolean running false;/*** 最后一次发送数据的时间*/private long lastSendTime;/*** 用于保存接收消息对象类型及该类型消息处理的对象*/private ConcurrentHashMapClass, ObjectAction actionMapping new ConcurrentHashMapClass, ObjectAction();public MyClient(String serverIp, int port) {this.serverIp serverIp;this.port port;}public void start() throws UnknownHostException, IOException {if (running) {return;}socket new Socket(serverIp, port);System.out.println(本地端口 socket.getLocalPort());lastSendTime System.currentTimeMillis();running true;//保持长连接的线程每隔2秒项服务器发一个一个保持连接的心跳消息new Thread(new KeepAliveWatchDog()).start();//接受消息的线程处理消息new Thread(new ReceiveWatchDog()).start();}public void stop() {if (running) {running false;};}/*** 添加接收对象的处理对象。** param cls 待处理的对象其所属的类。* param action 处理过程对象。*/public void addActionMap(ClassObject cls, ObjectAction action) {actionMapping.put(cls, action);}public void sendObject(Object obj) throws IOException {ObjectOutputStream oos new ObjectOutputStream(socket.getOutputStream());oos.writeObject(obj);System.out.println(发送\t obj);oos.flush();}class KeepAliveWatchDog implements Runnable {long checkDelay 10;//两秒钟检测一次long keepAliveDelay 2000;Overridepublic void run() {while (running) {if (System.currentTimeMillis() - lastSendTime keepAliveDelay) {try {MyClient.this.sendObject(new KeepAlive());} catch (IOException e) {e.printStackTrace();MyClient.this.stop();}lastSendTime System.currentTimeMillis();} else {try {Thread.sleep(checkDelay);} catch (InterruptedException e) {e.printStackTrace();MyClient.this.stop();}}}}}class ReceiveWatchDog implements Runnable {Overridepublic void run() {while (running) {try {InputStream in socket.getInputStream();if (in.available() 0) {ObjectInputStream ois new ObjectInputStream(in);Object obj ois.readObject();System.out.println(接收\t obj);ObjectAction oa actionMapping.get(obj.getClass());oa oa null ? new DefaultObjectAction() : oa;oa.doAction(obj, MyClient.this);} else {Thread.sleep(10);}} catch (Exception e) {e.printStackTrace();MyClient.this.stop();}}}}public static void main(String[] args) throws UnknownHostException, IOException {String serverIp 127.0.0.1;int port 65432;MyClient client new MyClient(serverIp, port);client.start();}
}
5.3 MyServer
package com.zyz.mynative.demo02;import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;/*** C/S架构的服务端对象。** author zyz* version 1.0* data 2023/2/15 11:08* Description:*/
public class MyServer {/*** 要处理客户端发来的对象并返回一个对象可实现该接口。*/public interface ObjectAction {Object doAction(Object rev, MyServer server);}public static final class DefaultObjectAction implements ObjectAction {Overridepublic Object doAction(Object rev, MyServer server) {System.out.println(处理并返回 rev);return rev;}}private int port;private volatile boolean running false;private long receiveTimeDelay 3000;/*** ConcurrentHashMap是线程安全的ConcurrentHashMap并非锁住整个方法* 而是通过原子操作和局部加锁的方法保证了多线程的线程安全且尽可能减少了性能损耗。*/private ConcurrentHashMapClass, ObjectAction actionMapping new ConcurrentHashMapClass, ObjectAction();private Thread connWatchDog;public MyServer(int port) {this.port port;}/*** 通过继承Runnable ,开启线程*/public void start() {if (running) {return;};running true;connWatchDog new Thread(new ConnWatchDog());connWatchDog.start();}SuppressWarnings(deprecation)public void stop() {if (running) {running false;};if (connWatchDog ! null) {connWatchDog.stop();};}public void addActionMap(ClassObject cls, ObjectAction action) {actionMapping.put(cls, action);}/*** 继承Runnable 重写run方法、实现线程*/class ConnWatchDog implements Runnable {Overridepublic void run() {try {ServerSocket ss new ServerSocket(port, 5);while (running) {Socket s ss.accept();new Thread(new SocketAction(s)).start();}} catch (IOException e) {e.printStackTrace();MyServer.this.stop();}}}/*** 继承Runnable 重写run方法、实现线程通过匿名内部类*/class SocketAction implements Runnable {Socket s;boolean run true;long lastReceiveTime System.currentTimeMillis();public SocketAction(Socket s) {this.s s;}Overridepublic void run() {while (running run) {if (System.currentTimeMillis() - lastReceiveTime receiveTimeDelay) {overThis();} else {try {InputStream in s.getInputStream();if (in.available() 0) {ObjectInputStream ois new ObjectInputStream(in);Object obj ois.readObject();lastReceiveTime System.currentTimeMillis();System.out.println(接收\t obj);ObjectAction oa actionMapping.get(obj.getClass());oa oa null ? new DefaultObjectAction() : oa;Object out oa.doAction(obj, MyServer.this);if (out ! null) {ObjectOutputStream oos new ObjectOutputStream(s.getOutputStream());oos.writeObject(out);oos.flush();}} else {Thread.sleep(10);}} catch (Exception e) {e.printStackTrace();overThis();}}}}private void overThis() {if (run) {run false;};if (s ! null) {try {s.close();} catch (IOException e) {e.printStackTrace();}}System.out.println(关闭 s.getRemoteSocketAddress());}}public static void main(String[] args) {int port 65432;MyServer server new MyServer(port);server.start();}}
5.4 测试结果
先开启服务端在开启客户端 参考资料
1、java实现心跳机制2、java实现长连接