公司网站的具体步骤,四川住房和城乡建设厅网站,企业网站剖析,外贸网站赚钱系统为centos#xff0c;基于emqx搭建broker#xff0c;流程参考官方。
安装好后#xff0c;用ssl加密。
进入/etc/emqx/certs,可以看到 分别为
cacert.pem CA 文件cert.pem 服务端证书key.pem 服务端keyclient-cert.pem 客户端证书client-key.pem 客户端key 编辑emqx配…系统为centos基于emqx搭建broker流程参考官方。
安装好后用ssl加密。
进入/etc/emqx/certs,可以看到 分别为
cacert.pem CA 文件cert.pem 服务端证书key.pem 服务端keyclient-cert.pem 客户端证书client-key.pem 客户端key 编辑emqx配置vim /etc/emqx/emqx.conf,添加ssl配置
listeners.ssl.default {# 端口bind 0.0.0.0:8883ssl_options {cacertfile /etc/emqx/certs/cacert.pem #CA文件certfile /etc/emqx/certs/cert.pem #服务端证书keyfile /etc/emqx/certs/key.pem #服务端keyverify verify_peer # 双向认证fail_if_no_peer_cert true}
}
再在客户端如MQTTX配置连接信息
Springboot订阅MQTTS
添加依赖 !--mqtt--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-integration/artifactId/dependencydependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-stream/artifactId/dependencydependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-mqtt/artifactId/dependency!--ssl--dependencygroupIdorg.bouncycastle/groupIdartifactIdbcpkix-jdk15on/artifactIdversion1.64/version/dependency
配置
spring:mqtt:provider:#MQTTS服务地址端口号默认8883如果有多个用逗号隔开url: ssl://192.168.1.xx:8883#用户名username: xx#密码password: xxxx#客户端id(不能重复)client:id: provider-id#MQTT默认的消息推送主题实际可在调用接口是指定default:topic: topicconsumer:url: ssl://192.168.1.xx:8883#用户名username: xx#密码password: xxxx#客户端id不能重复client:id: consumer-id#MQTT默认的消息推送主题实际可在调用接口时指定default:topic: topicMqtt配置
Configuration
public class MqttConsumerConfig {Value(${spring.mqtt.consumer.username})private String username;Value(${spring.mqtt.consumer.password})private String password;Value(${spring.mqtt.consumer.url})private String hostUrl;Value(${spring.mqtt.consumer.client.id})private String clientId;Value(${spring.mqtt.consumer.default.topic})private String defaultTopic;// 把证书文件放在在resource的ssl_certs目录下String caFilePath /ssl_certs/cacert.pem;String clientCrtFilePath /ssl_certs/client-cert.pem;String clientKeyFilePath /ssl_certs/client-key.pem;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/PostConstructpublic void init() {connect();}/*** 客户端连接服务端*/SneakyThrowspublic void connect() {
// try {//创建MQTT客户端对象client new MqttClient(hostUrl, clientId, new MemoryPersistence());//连接设置MqttConnectOptions options new MqttConnectOptions();SSLSocketFactory socketFactory getSocketFactory(caFilePath,clientCrtFilePath, clientKeyFilePath, );options.setSocketFactory(socketFactory);/*允许所有host连接*/options.setSSLHostnameVerifier((s, sslSession) - true);/*不配置可能出现报错java.security.cert.CertificateException: No subject alternative names present*/options.setHttpsHostnameVerificationEnabled(false);//是否清空session设置为false表示服务器会保留客户端的连接记录客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接到服务端都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题若客户端和服务器之间的连接意外断开服务器将发布客户端的遗嘱信息options.setWill(willTopic, (clientId 与服务器断开连接).getBytes(), 0, false);options.setAutomaticReconnect(true);//设置回调client.setCallback(new MqttCallbackImpl());client.connect(options);System.out.println( 客户端连接成功 );//订阅主题//消息等级和主题数组一一对应服务端将按照指定等级给订阅了主题的客户端推送消息int[] qos {1, 1};//主题String[] topics {topicX#, topicY#};//订阅主题
// client.subscribe(topicX,1);client.subscribe(topics);
// } catch (Exception e) {
// e.printStackTrace();
// }}/*** 断开连接*/public void disConnect() {try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅主题*/public void subscribe(String topic, int qos) {try {client.subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}private static SSLSocketFactory getSocketFactory(final String caCrtFile,final String crtFile, final String keyFile, final String password)throws Exception {ClassPathResource caCrtFileRes new ClassPathResource(caCrtFile);ClassPathResource crtFileRes new ClassPathResource(crtFile);ClassPathResource keyFileRes new ClassPathResource(keyFile);// add BouncyCastle providerSecurity.addProvider(new BouncyCastleProvider());// load CA certificateX509Certificate caCert null;BufferedInputStream bis new BufferedInputStream(caCrtFileRes.getStream());CertificateFactory cf CertificateFactory.getInstance(X.509);while (bis.available() 0) {caCert (X509Certificate) cf.generateCertificate(bis);// System.out.println(caCert.toString());}// load client certificatebis new BufferedInputStream(crtFileRes.getStream());X509Certificate cert null;while (bis.available() 0) {cert (X509Certificate) cf.generateCertificate(bis);// System.out.println(caCert.toString());}// load client private keyPEMParser pemParser new PEMParser(new InputStreamReader(keyFileRes.getStream()));Object object pemParser.readObject();PEMDecryptorProvider decProv new JcePEMDecryptorProviderBuilder().build(password.toCharArray());JcaPEMKeyConverter converter new JcaPEMKeyConverter().setProvider(BC);KeyPair key;if (object instanceof PEMEncryptedKeyPair) {System.out.println(Encrypted key - we will use provided password);key converter.getKeyPair(((PEMEncryptedKeyPair) object).decryptKeyPair(decProv));} else {System.out.println(Unencrypted key - no password needed);key converter.getKeyPair((PEMKeyPair) object);}pemParser.close();// CA certificate is used to authenticate serverKeyStore caKs KeyStore.getInstance(KeyStore.getDefaultType());caKs.load(null, null);caKs.setCertificateEntry(ca-certificate, caCert);TrustManagerFactory tmf TrustManagerFactory.getInstance(X509);tmf.init(caKs);// client key and certificates are sent to server so it can authenticate// usKeyStore ks KeyStore.getInstance(KeyStore.getDefaultType());ks.load(null, null);ks.setCertificateEntry(certificate, cert);ks.setKeyEntry(private-key, key.getPrivate(), password.toCharArray(),new java.security.cert.Certificate[]{cert});KeyManagerFactory kmf KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());kmf.init(ks, password.toCharArray());// finally, create SSL socket factorySSLContext context SSLContext.getInstance(TLSv1.2);context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);return context.getSocketFactory();}}
Slf4j
Component
public class MqttCallbackImpl implements MqttCallback {Overridepublic void connectionLost(Throwable throwable) {log.info([MQTT] 连接断开);}Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String msg new String(message.getPayload());log.info(String.format(接收消息主题 : %s, topic));log.info(String.format(接收消息Qos : %d, message.getQos()));log.info(String.format(接收消息内容 : %s, msg));log.info(String.format(接收消息retained : %b, message.isRetained()));}Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info(发送消息成功);}
}