dw网站制作素材,opencart 构建电子商务网站,自己怎么建设购物网站,中小企业做网站贷款写在最前：
常用的http协议是无状态的，且不能主动响应到客户端。最初想实现状态动态跟踪只能用轮询或者其他效率低下的方式，所以引入了websocket协议，允许服务端主动向客户端推送数据。在WebSocket API中，浏览器和服务…写在最前
常用的http协议是无状态的，且不能主动响应到客户端。最初想实现状态动态跟踪，只能用轮询或者其他效率低下的方式，所以引入了websocket协议，允许服务端主动向客户端推送数据。在WebSocket API中，浏览器和服务器只需要完成一次握手，两者之间就可以直接创建持久性的连接，并进行双向数据传输。简单来说，两个或多个客户端之间不能直接相互交流，要想实现类似一对一聊天的功能，实质上就是A客户端发送信息到socket服务器，再由socket服务器主动推送到B客户端或者多个客户端，实现两个或多个客户端之间的信息传递。
吐槽：t-io是个很优秀的socket框架，但是文档很少，作者写的文档也不明不白的，对新手很不友好（花钱除外），其他人写的文档不是要钱就是写得巨烂，这技术环境真心垃圾。
一、导包（导入TIO的两个依赖，其他必要依赖不赘述）
<dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-websocket-spring-boot-starter</artifactId>
    <version>3.6.0.v20200315-RELEASE</version>
</dependency>
<dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-core-spring-boot-starter</artifactId>
    <version>3.6.0.v20200315-RELEASE</version>
</dependency>
二、yml配置
server:
  port: 8652
tio:
  websocket:
    server:
      port: 8078
      heartbeat-timeout: 12000
    cluster:
      enabled: false
  customPort: 4768 # 自定义socket服务端监听端口，其实也可以用上面server.port做监听端口
三、配置参数
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.tio.utils.time.Time;/*** Author 955* Date 2023-07-26 17:25* Description*/
Component
public class CaseServerConfig {/*** 协议名字(可以随便取主要用于开发人员辨识)*/public static final String PROTOCOL_NAME xxxxxxx;public static final String CHARSET utf-8;/*** 监听的ip*/public static final String SERVER_IP null;//null表示监听所有并不指定ip/*** 监听端口*/public static int PORT;/*** 心跳超时时间单位毫秒*/public static final int HEARTBEAT_TIMEOUT 1000 * 60;/*** 服务器地址*/public static final String SERVER 127.0.0.1;/*** ip数据监控统计时间段** author tanyaowu*/public static interface IpStatDuration {public static final Long DURATION_1 Time.MINUTE_1 * 5;public static final Long[] IPSTAT_DURATIONS new Long[]{DURATION_1};}/*** 用于群聊的group id(自定义)*/public static final String GROUP_ID showcase-websocket;Value(${tio.customPort})public void setPort(int port) {PORT port;}}四、实现一些监听类
1.ServerAioListener监听
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioListener;/*** Author 955* Date 2023-07-26 17:24* Description*/
public class ServerAioListenerImpl implements ServerAioListener {Overridepublic void onAfterConnected(ChannelContext channelContext, boolean b, boolean b1) throws Exception {}Overridepublic void onAfterDecoded(ChannelContext channelContext, Packet packet, int i) throws Exception {}Overridepublic void onAfterReceivedBytes(ChannelContext channelContext, int i) throws Exception {}Overridepublic void onAfterSent(ChannelContext channelContext, Packet packet, boolean b) throws Exception {}Overridepublic void onAfterHandled(ChannelContext channelContext, Packet packet, long l) throws Exception {}Overridepublic void onBeforeClose(ChannelContext channelContext, Throwable throwable, String s, boolean b) throws Exception {}Overridepublic boolean onHeartbeatTimeout(ChannelContext channelContext, Long aLong, int i) {return false;}}2.IpStatListener监听(这个可选) import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.stat.IpStat;
import org.tio.core.stat.IpStatListener;/*** Author 955* Date 2023-07-27 12:03* Description*/
public class ShowcaseIpStatListener implements IpStatListener {SuppressWarnings(unused)private static Logger log LoggerFactory.getLogger(ShowcaseIpStatListener.class);public static final ShowcaseIpStatListener me new ShowcaseIpStatListener();/****/private ShowcaseIpStatListener() {}Overridepublic void onExpired(TioConfig tioConfig, IpStat ipStat) {//在这里把统计数据入库中或日志
// if (log.isInfoEnabled()) {
// log.info(可以把统计数据入库\r\n{}, Json.toFormatedJson(ipStat));
// }}Overridepublic void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect, IpStat ipStat) throws Exception {
// if (log.isInfoEnabled()) {
// log.info(onAfterConnected\r\n{}, Json.toFormatedJson(ipStat));
// }}Overridepublic void onDecodeError(ChannelContext channelContext, IpStat ipStat) {
// if (log.isInfoEnabled()) {
// log.info(onDecodeError\r\n{}, Json.toFormatedJson(ipStat));
// }}Overridepublic void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess, IpStat ipStat) throws Exception {
// if (log.isInfoEnabled()) {
// log.info(onAfterSent\r\n{}\r\n{}, packet.logstr(), Json.toFormatedJson(ipStat));
// }}Overridepublic void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize, IpStat ipStat) throws Exception {
// if (log.isInfoEnabled()) {
// log.info(onAfterDecoded\r\n{}\r\n{}, packet.logstr(), Json.toFormatedJson(ipStat));
// }}Overridepublic void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes, IpStat ipStat) throws Exception {
// if (log.isInfoEnabled()) {
// log.info(onAfterReceivedBytes\r\n{}, Json.toFormatedJson(ipStat));
// }}Overridepublic void onAfterHandled(ChannelContext channelContext, Packet packet, IpStat ipStat, long cost) throws Exception {
// if (log.isInfoEnabled()) {
// log.info(onAfterHandled\r\n{}\r\n{}, packet.logstr(), Json.toFormatedJson(ipStat));
// }}}3.WsServerAioListener监听 import com.wlj.config.CaseServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.WsServerAioListener;/*** Author 955* Date 2023-07-27 12:01* Description*/
public class ShowcaseServerAioListener extends WsServerAioListener {private static Logger log LoggerFactory.getLogger(ShowcaseServerAioListener.class);public static final ShowcaseServerAioListener me new ShowcaseServerAioListener();private ShowcaseServerAioListener() {}Overridepublic void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {super.onAfterConnected(channelContext, isConnected, isReconnect);if (log.isInfoEnabled()) {log.info(onAfterConnected\r\n{}, channelContext);}}Overridepublic void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {super.onAfterSent(channelContext, packet, isSentSuccess);if (log.isInfoEnabled()) {log.info(onAfterSent\r\n{}\r\n{}, packet.logstr(), channelContext);}}Overridepublic void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {super.onBeforeClose(channelContext, throwable, remark, isRemove);if (log.isInfoEnabled()) {log.info(onBeforeClose\r\n{}, channelContext);}WsSessionContext wsSessionContext (WsSessionContext) channelContext.getAttribute();if (wsSessionContext ! 
null wsSessionContext.isHandshaked()) {int count Tio.getAllChannelContexts(channelContext.tioConfig).getObj().size();String msg channelContext.getClientNode().toString() 离开了现在共有【 count 】人在线;//用tio-websocket服务器发送到客户端的Packet都是WsResponseWsResponse wsResponse WsResponse.fromText(msg, CaseServerConfig.CHARSET);//群发Tio.sendToGroup(channelContext.tioConfig, CaseServerConfig.GROUP_ID, wsResponse);}}Overridepublic void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {super.onAfterDecoded(channelContext, packet, packetSize);if (log.isInfoEnabled()) {log.info(onAfterDecoded\r\n{}\r\n{}, packet.logstr(), channelContext);}}Overridepublic void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {super.onAfterReceivedBytes(channelContext, receivedBytes);if (log.isInfoEnabled()) {log.info(onAfterReceivedBytes\r\n{}, channelContext);}}Overridepublic void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {super.onAfterHandled(channelContext, packet, cost);if (log.isInfoEnabled()) {log.info(onAfterHandled\r\n{}\r\n{}, packet.logstr(), channelContext);}}}4.IWsMsgHandler拦截里面逻辑根据具体业务但是必须实现这个不然启动报错
package com.wlj.im;import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;/*** Author 955* Date 2023-07-31 18:26* Description*/
Slf4j
Component
public class WebSocketMessageHandler implements IWsMsgHandler {/*** TIO-WEBSOCKET 配置信息*/public static TioConfig serverTioConfig;Overridepublic HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {serverTioConfig channelContext.tioConfig;return httpResponse;}Overridepublic void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {// 拿到用户idString id httpRequest.getParam(id);// 绑定用户Tio.bindUser(channelContext, id);// 绑定业务类型(根据业务类型判定处理相关业务)String bsId httpRequest.getParam(bsId);if (StringUtils.isNotBlank(bsId)) {Tio.bindBsId(channelContext, bsId);}// 给用户发送消息WsResponse wsResponse WsResponse.fromText(您已成功连接 WebSocket 服务器, UTF-8);Tio.sendToUser(channelContext.tioConfig, id, wsResponse);}Overridepublic Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {return null;}Overridepublic Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {// 关闭连接Tio.remove(channelContext, WebSocket Close);return null;}Overridepublic Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {WsResponse wsResponse WsResponse.fromText(服务器已收到消息 s, UTF-8);Tio.sendToUser(channelContext.tioConfig, userid, wsResponse);return null;}
}
五、一些消息体根据业务需求
import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import org.tio.core.intf.Packet;import java.util.List;/*** Author 955* Date 2023-07-26 17:26* Description 消息体*/
Setter
Getter
public class MindPackage extends Packet {private static final long serialVersionUID -172060606924066412L;public static final String CHARSET utf-8;private ListJSONObject body;}import com.alibaba.fastjson.JSONObject;
import lombok.Getter;
import lombok.Setter;
import org.tio.core.intf.Packet;import java.io.Serializable;/*** Author 955* Date 2023-07-26 17:27* Description 响应消息体*/
Getter
Setter
public class ResponsePackage extends Packet {private static final long serialVersionUID -172060606924066412L;public static final String CHARSET utf-8;//响应具体内容private JSONObject body;//电话号码private String phoneNum;// 下发指令类型private Integer type;
}六、一些vo根据实际业务来
import lombok.Data;

import java.io.Serializable;

/**
 * Directive types exchanged with clients.
 *
 * @author 955
 * @since 2023-07-26 17:28
 */
@Data
public class ClientDirectivesVo implements Serializable {

    /** End-of-reporting directive. */
    public static final int END_REPORT_RESPONSE = 0;

    /** Heartbeat-check directive (identifier spelling kept as published so callers still compile). */
    public static final int HEART_BEET_REQUEST = 1;

    /** Start-GPS-reporting directive. */
    public static final int GPS_START_REPORT_RESPONSE = 2;

    /** Data push to a client. */
    public static final int DATA_DISTRIBUTION = 3;

    /** 0: end reporting, 1: heartbeat check, 2: start GPS reporting, 3: data push to a client. */
    private Integer type;
}
import lombok.Data;

import java.io.Serializable;

/**
 * Business entity VO; adapt it to your own business.
 *
 * @author 955
 * @since 2023-07-26 17:29
 */
@Data
public class PositioningDataReportVo implements Serializable {

    private String userId;

    private String name;

    private String phone;

    private String type;
}
import lombok.Data;

import java.io.Serializable;

/**
 * VO for the receipt (acknowledgement) method.
 *
 * @author 955
 * @since 2023-07-26 17:30
 */
@Data
public class ReceiptDataVo implements Serializable {

    /** Id of the owning user. */
    private String userId;

    /** Phone number of the owning user. */
    private String phone;

    /** Placeholder for a business-specific field. */
    private String yl;
}
import lombok.Data;

import java.io.Serializable;

/**
 * Response VO.
 *
 * @author 955
 * @since 2023-07-26 17:31
 */
Data
public class ResponseVo implements Serializable {//响应类型private Integer type;//响应值private Integer value;}七、具体业务方法 import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wlj.tcp.MindPackage;
import com.wlj.tcp.ResponsePackage;
import com.wlj.vo.ClientDirectivesVo;
import com.wlj.vo.PositioningDataReportVo;
import com.wlj.vo.ReceiptDataVo;
import com.wlj.vo.ResponseVo;
import jodd.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;
import org.tio.utils.hutool.CollUtil;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;/*** Author 955* Date 2023-07-26 17:27* Description 具体业务方法*/
Slf4j
public class ServerAioHandlerImpl implements ServerAioHandler {private static AtomicInteger counter new AtomicInteger(0);private MapString, ChannelContext channelMaps new ConcurrentHashMap();private QueueResponsePackage respQueue new LinkedBlockingQueue();private QueueResponsePackage heartQueue new LinkedBlockingQueue();public boolean offer2SendQueue(ResponsePackage respPacket) {return respQueue.offer(respPacket);}public QueueResponsePackage getRespQueue() {return respQueue;}public boolean offer2HeartQueue(ResponsePackage respPacket) {return heartQueue.offer(respPacket);}public MapString, ChannelContext getChannelMaps() {return channelMaps;}/*** 解码把接收到的ByteBuffer解码成应用可以识别的业务消息包* 总的消息结构消息体* 消息体结构 对象的json串的16进制字符串*/Overridepublic MindPackage decode(ByteBuffer buffer, int i, int i1, int i2, ChannelContext channelContext) throws AioDecodeException {MindPackage imPacket new MindPackage();try {ListJSONObject msgList new ArrayList();//Charset charset Charset.forName(UTF-8);//这里使用UTF-8收中文时会报错Charset charset Charset.forName(GBK);CharsetDecoder decoder charset.newDecoder();CharBuffer charBuffer decoder.decode(buffer);String str charBuffer.toString();if (str.indexOf({) ! 
0) {str str.substring(str.indexOf({));}if (str.indexOf(}{) -1) {String[] split str.split(});ListString list Arrays.asList(split);list.forEach(item - {item };msgList.add(JSON.parseObject(item));});} else {msgList.add(JSON.parseObject(str));}log.info(收到 msgList.size() 条消息);imPacket.setBody(msgList);return imPacket;} catch (Exception e) {return imPacket;}}/*** 编码把业务消息包编码为可以发送的ByteBuffer*/Overridepublic ByteBuffer encode(Packet packet, TioConfig groupContext, ChannelContext channelContext) {ResponsePackage helloPacket (ResponsePackage) packet;JSONObject body helloPacket.getBody();//写入消息体try {return ByteBuffer.wrap(body.toJSONString().getBytes(GB2312));} catch (UnsupportedEncodingException e) {}return null;}/*** 处理消息(最核心的方法)*/Overridepublic void handler(Packet packet, ChannelContext channelContext) throws Exception {MindPackage helloPacket (MindPackage) packet;ListJSONObject msgList helloPacket.getBody();if (CollectionUtil.isNotEmpty(msgList)) {msgList.forEach(body - {if (body ! null) {log.info(收到设备上报信息 body);// 获取指令Integer type body.getInteger(type);if (type ! 
null) {channelContext.set(type, type);String phoneNum body.getString(phoneNum);String content body.getString(content);Tio.bindToken(channelContext, phoneNum);ResponsePackage respPacket new ResponsePackage();switch (type) {// 接收下线指令case ClientDirectivesVo.END_REPORT_RESPONSE://保存连接channelMaps.put(phoneNum, channelContext);//TODO 更改客户端状态为下线状态log.info(收到{}客户端下线通知, phoneNum);Tio.unbindUser(channelContext.tioConfig, phoneNum);respPacket.setPhoneNum(您已下线);// 回执方法receiptHandler(respPacket, phoneNum, ClientDirectivesVo.END_REPORT_RESPONSE);break;case ClientDirectivesVo.HEART_BEET_REQUEST: //接收心跳检查指令//保存连接channelMaps.put(phoneNum, channelContext);Tio.bindUser(channelContext, phoneNum);log.info(收到{}客户端心跳检查指令, phoneNum);// 回执方法receiptHandler(respPacket, phoneNum, ClientDirectivesVo.HEART_BEET_REQUEST);break;case ClientDirectivesVo.GPS_START_REPORT_RESPONSE: //开始上报GPS指令//保存连接channelMaps.put(phoneNum, channelContext);// PositioningDataReportVo vo JSONObject.toJavaObject(body, PositioningDataReportVo.class);log.info(收到{}客户端上报GPS指令上报数据{}, phoneNum, vo);// 回执方法receiptHandler(respPacket, phoneNum, ClientDirectivesVo.GPS_START_REPORT_RESPONSE);break;case ClientDirectivesVo.DATA_DISTRIBUTION: //开始下发数据指令//保存连接channelMaps.put(phoneNum, channelContext);log.info(收到{}客户端下发数据指令, phoneNum);SetWithLockChannelContext obj Tio.getByUserid(channelContext.tioConfig, phoneNum);if (ObjectUtil.isEmpty(obj)) {// 回执方法respPacket.setBody(JSONObject.parseObject({\type\:\该用户不在线\}));receiptHandler(respPacket, phoneNum, ClientDirectivesVo.GPS_START_REPORT_RESPONSE);} else {// 回执方法DataDistributionReportVo data new DataDistributionReportVo();data.setPhone(phoneNum);data.setServiceInfo(content);// 回复时的设备标志必填respPacket.setPhoneNum(phoneNum);respPacket.setBody((JSONObject) JSON.toJSON(data));respPacket.setType(ClientDirectivesVo.DATA_DISTRIBUTION);Tio.sendToUser(channelContext.tioConfig, phoneNum, respPacket);}break;}}}});}return;}/*** 回执信息方法** Author: laohuang* Date: 2022/11/24 13:53*/public void 
receiptHandler(ResponsePackage respPacket, String phoneNum, Integer clientDirectives) {// 回执信息//ResponseVo callVo new ResponseVo();//callVo.setType(clientDirectives);// 响应结果 1成功 0失败//callVo.setValue(1);// 回复时的设备标志必填respPacket.setPhoneNum(phoneNum);//respPacket.setBody((JSONObject) JSON.toJSON(callVo));respPacket.setType(clientDirectives);offer2SendQueue(respPacket);}private Object locker new Object();public ServerAioHandlerImpl() {try {new Thread(() - {while (true) {try {ResponsePackage respPacket respQueue.poll();if (respPacket ! null) {synchronized (locker) {String phoneNum respPacket.getPhoneNum();ChannelContext channelContext channelMaps.get(phoneNum);if (channelContext ! null) {Boolean send Tio.send(channelContext, respPacket);String s JSON.toJSONString(respPacket);System.err.println(发送数据 s);System.err.println(数据长度 s.getBytes().length);log.info(下发设备指令 设备ip channelContext 设备[ respPacket.getPhoneNum() ] (send ? 成功 : 失败) 消息: JSON.toJSONString(respPacket.getBody()));}}}} catch (Exception e) {log.error(e.getMessage());} finally {log.debug(发送队列大小 respQueue.size());ThreadUtil.sleep(10);}}}).start();} catch (Exception e) {e.printStackTrace();}}/*** 确保只有一个呼叫器响应后修改呼叫记录** param recordId 记录id* param resCallSn 响应的呼叫器sn*/public synchronized void updateCallRecordAndStopResponse(Long recordId, String resCallSn, String sn) {}
}八、启动类(加上EnableTioWebSocketServer表明作为Socket服务端)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.tio.websocket.starter.EnableTioWebSocketServer;SpringBootApplication
EnableTioWebSocketServer
public class PartApplication {public static void main(String[] args) {SpringApplication.run(PartApplication.class, args);}
}九、使用NetAssist测试工具测试效果0积分下载即可 https://download.csdn.net/download/m0_49605579/88106789?spm1001.2014.3001.5503
注：这里远程主机端口为yml内配置的tio.customPort，即项目启动时控制台打印的监听端口。连接上就可以发送数据到服务器，工具可以打开多个，模拟多个客户端。
写在最后
这里说一下主要业务，也就是这个handler的逻辑：
第一步：A用户发送{"type":1,"phoneNum":"用户A"}，对应type:HEART_BEET_REQUEST，使用Tio.bindUser(channelContext, userId);绑定该用户。B用户按上述同样操作，发送{"type":1,"phoneNum":"用户B"}。
第二步：A用户发送{"type":3,"content":"发送消息到用户B","phoneNum":"用户B"}，对应type:DATA_DISTRIBUTION，通过服务器下发指令。服务器这里先判断用户B是否在线，如果在线，就把A用户发的消息推送给手机号是用户B的B用户，此时B用户实时收到消息。
效果图：type分别控制用户发送消息、上线、下线。