Certificate generation
To generate the certificates and the JKS keystores, see the following article: https://blog.csdn.net/qq_41527073/article/details/121148600
Certificate conversion: JKS to PEM
The JKS keystores must first be converted to PKCS#12 (p12), as shown below, before the result can be used with the confluent_kafka package. Converting JKS directly to PEM produces errors and the output is unusable. For details, see the following article: https://www.ngui.cc/zz/1104321.html?actiononClick

    keytool -importkeystore -srckeystore server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
    openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
    keytool -importkeystore -srckeystore server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
    openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
    openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem

(Conversion process)
Directory structure of the generated certificates
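As a rough sanity check after the conversion, the three PEM files named in the commands above can be inspected from Python. The sketch below only looks at the `-----BEGIN ...-----` markers and assumes the files sit together in one directory; the helper names `pem_block_labels` and `check_pem_dir` are made up for illustration and are not part of any library.

```python
from pathlib import Path
from typing import Dict, List

# File names taken from the conversion commands; the expected PEM block
# type for each one is an assumption about what openssl writes out.
EXPECTED = {
    "server.cer.pem": "CERTIFICATE",
    "client.cer.pem": "CERTIFICATE",
    "client.key.pem": "PRIVATE KEY",
}


def pem_block_labels(pem_text: str) -> List[str]:
    """Return the label of every '-----BEGIN <label>-----' line."""
    prefix, suffix = "-----BEGIN ", "-----"
    labels = []
    for line in pem_text.splitlines():
        line = line.strip()
        if line.startswith(prefix) and line.endswith(suffix):
            labels.append(line[len(prefix):-len(suffix)])
    return labels


def check_pem_dir(directory: str) -> Dict[str, bool]:
    """Map each expected file name to True if it exists and holds a
    PEM block of the expected kind (hypothetical helper)."""
    result = {}
    for name, wanted in EXPECTED.items():
        path = Path(directory) / name
        if not path.is_file():
            result[name] = False
            continue
        labels = pem_block_labels(path.read_text(errors="replace"))
        # Substring match so that "RSA PRIVATE KEY" also counts.
        result[name] = any(wanted in label for label in labels)
    return result


if __name__ == "__main__":
    # Directory taken from the client code in this post.
    for name, ok in check_pem_dir("/Kafka/config/ssl").items():
        print(name, "ok" if ok else "missing or unexpected content")
```

This does not validate the certificates themselves; it only catches the common case where a file is missing or a key and a certificate were written to the wrong output.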
Python client example code
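import json
import ssl
import sys
import time

import rsa
import confluent_kafka  # imported in the original post; unused in this script
from kafka import KafkaProducer


def encrypt(msg):
    """Encrypt a string with the RSA public key stored in public.pem."""
    with open('public.pem', 'rb') as publickfile:
        p = publickfile.read()
    pubkey = rsa.PublicKey.load_pkcs1(p)
    original_text = msg.encode('utf8')
    crypt_text = rsa.encrypt(original_text, pubkey)
    return crypt_text


def decrypt(data):
    """Decrypt bytes with the RSA private key stored in private.pem."""
    with open('private.pem', 'rb') as privatefile:
        p = privatefile.read()
    privkey = rsa.PrivateKey.load_pkcs1(p)
    crypt_text = data
    original_text = rsa.decrypt(crypt_text, privkey)
    return original_text.decode('utf8')


def produce_message():
    producer: KafkaProducer = None
    success = 0
    conn_error = 0
    msg = {
        'type': 'webclone',
        'api': 'delete',
        'state': True,
        'nodename': 'node-1',
        'uuid': 'asjdkjrh',
    }
    # Certificate and hostname verification are disabled here, as in the
    # original post. kafka-python ignores the other ssl_* options when
    # ssl_context is supplied.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    context.set_ciphers('TLSv1:TLSv1.2')
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    while success < 5:
        try:
            # On the first pass producer is None, so this raises and the
            # handler below connects. The value must be bytes because no
            # value_serializer is configured.
            producer.send('message_push', value=json.dumps(msg).encode('utf-8'))
            producer.flush()
            success += 1
        except KeyboardInterrupt:
            break
        except Exception:
            # (Re)connect, retrying once per second until it succeeds.
            while True:
                try:
                    producer = KafkaProducer(
                        bootstrap_servers=['172.XX.X.XX:9093'],
                        acks=1,
                        security_protocol='SASL_SSL',
                        ssl_cafile='/Kafka/config/ssl/server.cer.pem',
                        ssl_certfile='/Kafka/config/ssl/client.cer.pem',
                        ssl_keyfile='/Kafka/config/ssl/client.key.pem',
                        ssl_context=context,
                        sasl_mechanism='PLAIN',
                        sasl_plain_username='kafka',
                        sasl_plain_password='XXXXX',
                        api_version=(2, 0),
                    )
                    break
                except Exception as e:
                    producer = None
                    print(e)
                    conn_error += 1
                    time.sleep(1)
    print(f'connect error: {conn_error}')


def consume_message():
    # The consumer is referenced but not included in the original post.
    raise NotImplementedError('consume_message is not shown in this post')


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('Usage: python3 test.py produce|consume')
        sys.exit(1)
    start_time = time.time()
    if sys.argv[1] == 'produce':
        produce_message()
    elif sys.argv[1] == 'consume':
        consume_message()
    end_time = time.time()
    print(f'start at: {start_time}, end at: {end_time}, cost: {end_time - start_time} seconds')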
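The client above uses kafka-python, while the conversion step was motivated by confluent_kafka. For comparison, here is a minimal sketch of how the same PEM files might be passed to a confluent_kafka producer. The property names are librdkafka configuration keys; the helper names `build_confluent_config` and `send_once`, the 10-second flush timeout, and the assumption that all three files live in one directory are illustrative and not from the original post.

```python
import json
from typing import Any, Dict


def build_confluent_config(bootstrap: str, ssl_dir: str,
                           username: str, password: str) -> Dict[str, Any]:
    """Build a librdkafka-style config dict for SASL_SSL with PEM files.

    Hypothetical helper: file names follow the conversion commands in
    this post and are assumed to live in ssl_dir.
    """
    return {
        "bootstrap.servers": bootstrap,
        "security.protocol": "SASL_SSL",
        "ssl.ca.location": f"{ssl_dir}/server.cer.pem",
        "ssl.certificate.location": f"{ssl_dir}/client.cer.pem",
        "ssl.key.location": f"{ssl_dir}/client.key.pem",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": username,
        "sasl.password": password,
    }


def send_once(config: Dict[str, Any], topic: str, payload: dict) -> None:
    """Send one JSON message and wait for delivery (sketch only)."""
    # Imported lazily so the config helper works without the package.
    from confluent_kafka import Producer

    producer = Producer(config)
    producer.produce(topic, value=json.dumps(payload).encode("utf-8"))
    producer.flush(10)  # wait up to 10 seconds for outstanding messages


if __name__ == "__main__":
    cfg = build_confluent_config("172.XX.X.XX:9093", "/Kafka/config/ssl",
                                 "kafka", "XXXXX")
    send_once(cfg, "message_push", {"type": "webclone", "api": "delete"})
```

Unlike the kafka-python example, this sketch leaves certificate verification at the library defaults, so a self-signed or IP-addressed broker certificate may need extra settings.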