精美手机网站模板,安卓优化大师官网,企业网站里面的qq咨询怎么做,进入网上商城目录 一、安装JDK1.81、检查服务器是否已安装JDK2、若已安装JDK#xff0c;进行卸载3、更新yum源4、搜索JDK1.8安装包5、安装JDK1.86、查看是否安装成功7、配置环境变量 二、安装Kafka1、下载并解压kafka部署包至/usr/local/目录2、修改server.properties3、修改/etc/profile4… 目录 一、安装JDK1.81、检查服务器是否已安装JDK2、若已安装JDK进行卸载3、更新yum源4、搜索JDK1.8安装包5、安装JDK1.86、查看是否安装成功7、配置环境变量 二、安装Kafka1、下载并解压kafka部署包至/usr/local/目录2、修改server.properties3、修改/etc/profile4、执行/etc/profile5、启动kafka6、topic管理7、生产者管理8、消费者管理9、查看group-id 三、python连接kafka1、安装kafka-python2、消费消息3、生产数据 Kafka的安装需要依赖于jdk和zookeeper。kafka 2.11-1.1.0版本才与JDK1.7兼容更高版本需要JDK1.8 2.8之前版本的Kafka需要单独下载zookeeper2.8及之后的Kafka已经内置了一个zookeeper环境无需单独下载 一、安装JDK1.8
1、检查服务器是否已安装JDK
rpm -qa |grep java
rpm -qa |grep jdk
rpm -qa |grep gcj2、若已安装JDK进行卸载
rpm -qa | grep java | xargs rpm -e --nodeps3、更新yum源
yum update -y4、搜索JDK1.8安装包
yum list java-1.8*5、安装JDK1.8
yum install java-1.8.0-openjdk* -y6、查看是否安装成功
java -version7、配置环境变量
export JAVA_HOME/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64
export JRE_HOME${JAVA_HOME}/jre
export CLASSPATH$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/lib
export PATH${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH二、安装Kafka
1、下载并解压kafka部署包至/usr/local/目录
tar -zxvf kafka_2.12-3.1.1.tgz -C /usr/local/2、修改server.properties
vim /usr/local/kafka_2.12-3.1.1/config/server.properties修改以下内容
listenersPLAINTEXT://192.168.15.128:9092
advertised.listenersPLAINTEXT://192.168.15.128:9092
log.dirs/data/kafka/logs
zookeeper.connectlocalhost:2181local改成192.168.15.128会报错[2024-12-03 11:17:06,427] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)3、修改/etc/profile
vim /etc/profile新增
export KAFKA_HOME/usr/local/kafka_2.12-3.1.1
export PATH$KAFKA_HOME/bin:$PATH4、执行/etc/profile
source /etc/profile5、启动kafka
先启动zookeeper
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties查看是否启动
netstat -tuln | grep 2181再启动kafka
/usr/local/kafka_2.12-3.1.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.1.1/config/server.properties查看是否启动
netstat -tuln | grep 9092
jps #有kafka则为启动后台启动
/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/zookeeper.properties/usr/local/kafka_2.12-3.1.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-3.1.1/config/server.properties6、topic管理
1. 创建topic
# replication-factor指定副本因子。注意指定副本因子的时候不能大于broker实例个数否则报错
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest # 旧版本创建方式新版本只有--bootstrap-server 一种创建topic的方式
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --create --bootstrap-server 192.168.15.128:9092 --replication-factor 1 --partitions 1 --topic demo2. 查询topic详情
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --describe --bootstrap-server 192.168.15.128:9092 --topic demo3. 查询所有topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --list4. 修改topic参数配置
# 注意partition个数count只能增加不能减少
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --alter --topic mytest --parti-tions count5. 删除topic
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics.sh --bootstrap-server 192.168.15.128:9092 --delete --topic mytest5.1 如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enabletrue那么此时的删除并不是真正的删除而是把topic标记为marked for deletion
输入如下命令查看
/usr/local/kafka_2.12-3.1.1/bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic此时你若想真正删除它可以如下操作
1登录zookeeper客户端命令./bin/zookeeper-client
2找到topic所在的目录ls /brokers/topics
3找到要删除的topic执行命令rm -r /brokers/topics/【topic name】即可此时topic被彻底删除。另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得ls /admin/delete_topics/【topic name】如果你删除了此处的topic那么marked for deletion 标记消失
7、生产者管理
# 新起一个终端进入kafka解压目录后输入如下命令。在执行完毕后会进入的编辑器页面此时任意编辑一个消息之后消费者那边的终端可以看到终端中已经打印出了我们刚才发送的消息
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.128:9092 --topic demo8、消费者管理
1.创建消费者有非必须参数分区与consumer之间的关系一个分区不能分给两个consumer,但是两个分区可以分给一个consumer
# 下面的命令可以创建一个用于消费topic为mytest的消费者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --from-beginning --group testgroup2.从尾部开始取数据必需要指定分区指定分区
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 03.从尾部开始取数据必需要指定分区取指定个数
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.128:9092 --topic demo --offset latest --partition 0 --max-messages 1 9、查看group-id
kafka-consumer-groups.sh --bootstrap-server 192.168.15.128:9092 --list 三、python连接kafka
1、安装kafka-python
pip3 install kafka-python-ng2、消费消息
from kafka import KafkaConsumerkafka_broker 192.168.15.128:9092 # 替换为虚拟机的IP和端口# 创建Kafka消费者
consumer KafkaConsumer(demo, bootstrap_servers[kafka_broker])for message in consumer:print(message.value)3、生产数据
import json
from kafka import KafkaProducer# 指定Kafka代理地址格式为host:port
kafka_broker 192.168.15.128:9092 # 替换为虚拟机的IP和端口# 创建Kafka生产者
producer KafkaProducer(bootstrap_servers[kafka_broker])
# 发送10条消息
for i in range(10):# 创建一个字典然后使用json.dumps()将其转换为JSON格式的字符串并编码为字节串message json.dumps({name: kafka, index: i}).encode(utf-8)producer.send(demo, message)# 如果你需要打印消息内容可以解码字节串并打印print(message.decode(utf-8))# 确保所有消息都已发送
producer.flush()