最低的成本做网站,孝感网页设计,做cpa用什么类型的网站好,网站seo推广营销1.软件的选型
1.1.使用免费版EMQX
1.1.1.下载
百度搜索的目前是会打开官网#xff0c;这里提供下免费版的使用链接EMQX使用手册
文档很详细#xff0c;这里不再记录了。 1.2.使用rabbitmq
rabbitmq一般做消息队列用#xff0c;作为mqtt用我没有找到详细资料#xff0c…1.软件的选型
1.1.使用免费版EMQX
1.1.1.下载
百度搜索的目前是会打开官网这里提供下免费版的使用链接EMQX使用手册
文档很详细这里不再记录了。 1.2.使用rabbitmq
rabbitmq一般做消息队列用作为mqtt用我没有找到详细资料这里总结下使用方法
1.window安装rabbitmq 首先安装rabbitmq得依赖也就是opt_win64_24.0.exe,然后傻瓜式安装接可 安装完毕进入安装目录下sbin文件夹
1.浏览器查看插件 执行命令rabbitmq-plugins enable rabbitmq_management 回车浏览器输入http://127.0.0.1:15672/#/看到此页面及安装成功默认账号密码均是 guest 2.注意如果做mqtt使用的话需安装mqtt插件 安装命令rabbitmq-plugins enable rabbitmq_mqtt 执行完命令在浏览器上查看 mqtt及其端口号出现了的话就证明安装成功下面就可以开始整合了 2.linux安装rabbitmq
以前公司都是用window服务器没用过linux折腾了好久安装 erlang与rabbitmq不对应 不是最新 等等一系列问题最后看了一个视频 用 dock安装 根据官网docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management 一句话就可以安装 如果后期需要安装插件docker exec 容器id rabbitmq-plugins enable rabbitmq_mqtt ps:查看容器id 方法1.使用docker ps -aqf “namecontainername” -------简短容器id 2.docker inspect --format{{.Id}} container_name -------详情容器id
带密码启动dockdocker run -it --rm --name rabbitmq -e RABBITMQ_DEFAULT_USERadmin -e RABBITMQ_DEFAULT_PASS密码 -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbitmq:management15672 是rabbitmq management管理界面默认访问端口 5672 是amqp默认端口 1883 是mqtt tcp协议默认端口 15675 是web_mqtt ws协议默认端口 2.springboot集成mqtt
2.1yml文件集成配置
iot:mqtt:clientId: mqttClientOutputIdsendTopic: ktcotrl/dy/#topics:- /ktcotrl/#- gateway/# default:topic: /ktcotrl/dy/*****qos: 1receive:enable: trueserverClientId: mqttClientInputIdservers: tcp://ip:1883username: usernamepassword: password
2.2主要代码 Slf4j
Configuration
public class IotMqttSubscriberConfig {Autowiredprivate MqttConfig mqttConfig;/*** MQTT连接器选项* **/Bean(value getMqttConnectOptions)public MqttConnectOptions getMqttConnectOptions1() {MqttConnectOptions mqttConnectOptions new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录这里设置为true表示每次连接到服务器都以新的身份连接mqttConnectOptions.setCleanSession(true);// 设置超时时间 单位为秒mqttConnectOptions.setConnectionTimeout(10);mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(mqttConfig.getUsername());mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(10);// 设置“遗嘱”消息的话题若客户端与服务器之间的连接意外中断服务器将发布客户端的“遗嘱”消息。//mqttConnectOptions.setWill(willTopic, WILL_DATA, 2, false);return mqttConnectOptions;}Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions1());return factory;}Beanpublic MessageChannel iotMqttInputChannel() {return new DirectChannel();}Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),mqttClientFactory(),mqttConfig.getTopics().toArray(new String[0]));
// mqttConfig.getDefaultTopic());adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(2);adapter.setOutputChannel(iotMqttInputChannel());return adapter;}BeanServiceActivator(inputChannel iotMqttInputChannel)public MessageHandler handler() {return new MessageHandler() {Overridepublic void handleMessage(Message? message) throws MessagingException {String topic (String) message.getHeaders().get(mqtt_receivedTopic);
// msgid message.getHeaders().get(id);String messageContents message.getPayload().toString();//操作}};}Beanpublic MessageChannel defaultMqttInputChannel() {return new DirectChannel();}Value(${iot.mqtt.default.topic})private String defaultTopic;/*** 说明* ConditionalOnProperty(value driver.mqtt.default.receive.enable)* 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑** return*/
// Bean
// ConditionalOnProperty(value iot.mqtt.default.receive.enable)
// public MessageProducer defaultInbound() {
// MqttPahoMessageDrivenChannelAdapter adapter
// new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),
// mqttClientFactory(),
// defaultTopic);
// adapter.setCompletionTimeout(5000);
// adapter.setConverter(new DefaultPahoMessageConverter());
// adapter.setQos(2);
// adapter.setOutputChannel(defaultMqttInputChannel());
// return adapter;
// }/*** 说明* ConditionalOnProperty(value iot.mqtt.default.receive.enable)* 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑** return*/
// Bean
// ServiceActivator(inputChannel defaultMqttInputChannel)
// ConditionalOnProperty(value iot.mqtt.default.receive.enable)
// public MessageHandler defaultHandler() {
//
// return message - {
// log.info(
// defaultTopicReceiver\nheader:{},\npayload:{},
// JSON.toJSONString(message.getHeaders(), true),
// JSON.toJSONString(message.getPayload(), true)
// );
// };
// }
}
Getter
Setter
Component
IntegrationComponentScan
ConfigurationProperties(prefix iot.mqtt)
public class MqttConfig {/*** 服务地址*/private String servers;/*** 客户端id*/private String clientId;
/*
** 服务端id*/private String serverClientId;
/*
** 默认主题*/private String[] defaultTopic;private String sendTopic;/*** 用户名和密码*/private String username;private String password;private ListString topics;
}
Configuration
IntegrationComponentScan
EnableIntegration
public class IotMqttSendConfig {Autowiredprivate MqttConfig mqttConfig;/*** 将channel绑定到MqttClientFactory上* ServiceActivator 表明当前方法用于处理Mqtt消息inputChannel用于接收消息的通道*/BeanServiceActivator(inputChannel mqttOutboundChannel)public MessageHandler mqttOutbound() {DefaultMqttPahoClientFactory factory new DefaultMqttPahoClientFactory();MqttConnectOptions mqttConnectOptionsnew MqttConnectOptions();mqttConnectOptions.setUserName(mqttConfig.getUsername());mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});mqttConnectOptions.setKeepAliveInterval(2);factory.setConnectionOptions(mqttConnectOptions);MqttPahoMessageHandler messageHandler new MqttPahoMessageHandler(mqttConfig.getServerClientId(), factory);messageHandler.setAsync(true);messageHandler.setDefaultRetained(false);messageHandler.setDefaultTopic(mqttConfig.getSendTopic());return messageHandler;}/* 发布者 */Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}
}
RestController
RequestMapping(/path)
Slf4j
public class WkqController {Autowiredprivate IotMqttGateway mqttGateway;RequestMapping(/test)ResponseBodypublic void test() {//topic:主题mqttGateway.sendMessage2MqttHex( topic,1, sendStr);}
/*** description rabbitmq mqtt协议网关接口*/
MessagingGateway(defaultRequestChannel mqttOutboundChannel)
public interface IotMqttGateway {void sendMessage2Mqtt(String data);void sendMessage2Mqtt(String data, Header(MqttHeaders.TOPIC) String topic);void sendMessage2Mqtt(Header(MqttHeaders.TOPIC) String topic,Header(MqttHeaders.QOS) int qos, String payload);void sendMessage2MqttHex(Header(MqttHeaders.TOPIC) String topic,Header(MqttHeaders.QOS) int qos, byte[] payload);void sendMessage3Mqtt(Header(MqttHeaders.TOPIC) String topic,Header(MqttHeaders.RECEIVED_TOPIC)String revicetopic,Header(MqttHeaders.QOS) int qos, String payload);
}