h5用什么网站来做,北京市建设厅门户网站6,郑州现在可以正常出入吗,wordpress创建文档系统1.环境准备#xff1a;
使用如下3台主机搭建zookeeper集群#xff0c;由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内#xff0c;故端口改为了8093。 172.2.1.69:8093 172.2.1.70:8093 172.2.1.71:8093
2.下载地址
去官网下载#xff0c;或者使用如…1.环境准备
使用如下3台主机搭建zookeeper集群由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内故端口改为了8093。 172.2.1.69:8093 172.2.1.70:8093 172.2.1.71:8093
2.下载地址
去官网下载或者使用如下仓库地址下载本次使用的时kafka_2.13-3.6.1.tgz 即3.6.1版本前面的2.13是scala版本该版本是较新的版本可以使用zookeeper也可以不使用zookeeper搭建集群本次记录使用了zkzk集群的部署可以参考上一篇记录。
# 软件包下载地址可以切到/kafka/路径选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz3.软件包下载解压
在上面3台服务器上分别执行wget下载或者本地下载后上传本次使用的环境为堡垒机接入如果使用的是宿主机账密登陆可以下载配置一台其余使用SCP命令拷贝过去即可。
cd /usr/local/wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgztar -zxvf kafka_2.13-3.6.1.tgzmv kafka_2.13-3.6.1 kafka4.修改配置
需要修改logs路径的话可以在/kafka路径下新建logs路径并配置到server.properties中这里使用默认的/tmp/kafka-logs路径。
cd kafka
vim conf/server.properties修改内容如下
# 每个节点唯一的id这里.69、.70、.71服务器分别设置为了1、2、3
broker.id1# 默认为9092云服务器开放端口问题改为了8093
port8093# 上一篇记录博客搭建的zk集群地址
zookeeper.connect172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092# 配置监听访问、绑定地址这里都是PLAINTEXT协议不需要认证相当于内网访问
listenersPLAINTEXT://0.0.0.0:8093
advertised.listenersPLAINTEXT://172.2.1.71:8093# 日志路径
log.dirs/tmp/kafka-logs
5.启动kafka集群
分别在每个节点的bin路径下执行启动脚本
# 在3个节点分别执行如下命令-daemon表示后台启动不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties使用jps命令或者去kafka启动日志查看kafka是否启动成功。
6.创建topic
# 创建topic在任一节点执行都可以。./bin/kafka-topics.sh --bootstrap-server172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions3 --replication-factor3
# 查看topic是否创建成功在任一节点执行
./bin/kafka-topics.sh --bootstrap-server172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list7.模拟生产消费消息
需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的如 sh ./bin/kafka-topics.sh --zookeeperzk集群地址可能出现命令无法识别zookeeper is not a recognized option 需要替换为sh ./bin/kafka-topics.sh --bootstrap-serverkafka集群地址注意–bootstrap-server后面跟的是kafka集群地址不是zookeeper地址。
# 在2个节点启动消费者模拟客户端接收消息在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo# 在第3个节点启动生产者客户端模拟发送消息hello
./bin/kafka-console-producer.sh --bootstrap-server172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
生产者客户端发送hello 此时可以看到2个消费者模拟客户端都受到了消息hello
8.集成springboot
坐标如下
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependencykafka配置类
package com.example.kafka.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;Configuration
EnableKafka
//RefreshScope
public class KafkaConfig {Value(${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093})private String kafkaServers;public MapString, Object producerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}Beanpublic ProducerFactoryString, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());}public MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);//props.put(ConsumerConfig.GROUP_ID_CONFIG, 0);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}Beanpublic ConsumerFactoryString, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}BeanConcurrentKafkaListenerContainerFactoryString, String kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());return factory;}Beanpublic KafkaTemplateString, String kafkaTemplate() {KafkaTemplateString, String kafkaTemplate new KafkaTemplate(producerFactory());return kafkaTemplate;}
}消费者客户端 KafkaListener(topics {topic-demo},groupId test1,properties {auto-offset-reset:latest, enable.auto.commit:true})public void listen(ConsumerRecordString, String consumerRecord) {log.info(consumer Received: consumerRecord);}生产者发送消息
RestController
RequiredArgsConstructor
RequestMapping(producer)
public class ProducerController {private final KafkaTemplateString, String kafkaTemplate;PostMapping(path /sendCommonMsg)public String sendCommonMsg(String topic, String msg) {ListenableFutureSendResultString, String hello_kafka this.kafkaTemplate.send(topic, hello kafka);SendResultString, String sendResult hello_kafka.completable().join();System.out.println(sendResult);return send topic: topic , msg: msg;}
}发送测试
消费者可以接收到消息
consumer Received: ConsumerRecord(topic topic-demo, partition 0, leaderEpoch 0, offset 2, CreateTime 1721720091071, serialized key size -1, serialized value size 5, headers RecordHeaders(headers [], isReadOnly false), key null, value hello)
9.IDEA客户端工具
可以使用kafkalytic工具本地开发环境可视化操作kafka服务器如查看topic创建topic