三创大赛网站建设,wordpress翻页按钮颜色,推广普通话的广告语,装饰设计网站大全Kafka简介集群部署配置Kafka测试Kafka1.Kafka简介 数据缓冲队列。同时提高了可扩展性。具有峰值处理能力#xff0c;使用消息队列能够使关键组件顶住突发的访问压力#xff0c;而不会因为突发的超负荷的请求而完全崩溃。 Kafka是一个分布式、支持分区的#xff08;partition…Kafka简介集群部署配置Kafka测试Kafka1.Kafka简介 数据缓冲队列。同时提高了可扩展性。具有峰值处理能力使用消息队列能够使关键组件顶住突发的访问压力而不会因为突发的超负荷的请求而完全崩溃。 Kafka是一个分布式、支持分区的partition、多副本的replica基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景比如基于hadoop的批处理系统、低延迟的实时系统、web/nginx日志、访问日志消息服务等等用scala语言编写Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 特性: 高吞吐量kafka每秒可以处理几十万条消息。 可扩展性kafka集群支持热扩展- 持久性、 可靠性消息被持久化到本地磁盘并且支持数据备份防止数据丢失 容错性允许集群中节点失败若副本数量为n,则允许n-1个节点失败 高并发支持数千个客户端同时读写 它主要包括以下组件 话题Topic是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别即 kafka 是面向 topic 的。) 生产者Producer是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务). 消费者Consumer可以订阅一个或多个话题从而消费这些已发布的消息。 服务代理Broker已发布的消息保存在一组服务器中它们被称为代理Broker或Kafka集群。 partition区每个 topic 包含一个或多个 partition。 replicationpartition 的副本保障 partition 的高可用。 leaderreplica 中的一个角色 producer 和 consumer 只跟 leader 交互。 followerreplica 中的一个角色从 leader 中复制数据。 zookeeperkafka 通过 zookeeper 来存储集群的信息。 Zookeeper: ZooKeeper是一个分布式协调服务它的主要作用是为分布式系统提供一致性服务提供的功能包括配置维护、分布式同步等。Kafka的运行依赖ZooKeeper。 也是java微服务里面使用的一个注册中心服务 ZooKeeper主要用来协调Kafka的各个broker不仅可以实现broker的负载均衡而且当增加了broker或者某个broker故障了ZooKeeper将会通知生产者和消费者这样可以保证整个系统正常运转。 在Kafka中,一个topic会被分成多个区并被分到多个broker上分区的信息以及broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中。 2.集群部署 2.1环境 系统Centos-Stream7 节点 192.168.26.166 es01 192.168.26.170 es02 192.168.26.171 es03 软件版本kafka_2.12-3.0.2.tgz 2.2 安装配置jdk8 #yum install -y java-1.8.0-openjdk 2.3 安装配置zookeeper 在配置中要注意每个配置项后面不要有空格否则会导致zookeeper启动不起来 Kafka运行依赖ZKKafka官网提供的tar包中已经包含了ZK这里不再额外下载ZK程序。 配置相互解析---三台机器(在es集群上安装的kafka) # vim /etc/hosts 192.168.26.166 es01 192.168.26.170 es02 192.168.26.171 es03 安装Kafka # wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.2/kafka_2.12-3.0.2.tgz # tar xzvf kafka_2.12-2.8.0.tgz -C /usr/local/ # mv /usr/local/kafka_2.12-2.8.0/ /usr/local/kafka/ 配置zookeeper 在es01节点中 # sed -i s/^[^#]/#/ /usr/local/kafka/config/zookeeper.properties # vim /usr/local/kafka/config/zookeeper.properties #添加如下配置 dataDir/opt/data/zookeeper/data # 需要创建所有节点一致 dataLogDir/opt/data/zookeeper/logs # 需要创建所有节点一致 clientPort2181 tickTime2000 initLimit20 syncLimit10 # 以下 IP 信息根据自己服务器的 IP 进行修改 server.1192.168.19.20:2888:3888 //kafka集群IP:Port server.2192.168.19.21:2888:3888 server.3192.168.19.22:2888:3888 #创建data、log目录 # mkdir -p /opt/data/zookeeper/{data,logs} #创建myid文件 # echo 1 /opt/data/zookeeper/data/myid #myid号按顺序排 在es02节点中 # sed -i s/^[^#]/#/ /usr/local/kafka/config/zookeeper.properties # vim /usr/local/kafka/config/zookeeper.properties dataDir/opt/data/zookeeper/data dataLogDir/opt/data/zookeeper/logs clientPort2181 tickTime2000 initLimit20 syncLimit10 server.1192.168.19.20:2888:3888 server.2192.168.19.21:2888:3888 server.3192.168.19.22:2888:3888 #创建data、log目录 # mkdir -p /opt/data/zookeeper/{data,logs} #创建myid文件 # echo 2 /opt/data/zookeeper/data/myid 在es03节点中 # sed -i s/^[^#]/#/ /usr/local/kafka/config/zookeeper.properties # vim /usr/local/kafka/config/zookeeper.properties dataDir/opt/data/zookeeper/data dataLogDir/opt/data/zookeeper/logs clientPort2181 tickTime2000 initLimit20 syncLimit10 server.1192.168.19.20:2888:3888 server.2192.168.19.21:2888:3888 server.3192.168.19.22:2888:3888 #创建data、log目录 # mkdir -p /opt/data/zookeeper/{data,logs} #创建myid文件 # echo 3 /opt/data/zookeeper/data/myid 配置项含义 dataDir ZK数据存放目录。 dataLogDir ZK日志存放目录。 clientPort 客户端连接ZK服务的端口。 tickTime ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。 initLimit 允许follower连接并同步到Leader的初始化连接时间当初始化连接时间超过该值则表示连接失败。 syncLimit Leader与Follower之间发送消息时如果follower在设置时间内不能与leader通信那么此follower将会被丢弃。 server.1192.168.19.20:2888:3888 2888是follower与leader交换信息的端口3888是当leader挂了时用来执行选举时服务器相互通信的端口。 3.配置Kafka 3.1 配置 # sed -i s/^[^#]/#/ /usr/local/kafka/config/server.properties # vim /usr/local/kafka/config/server.properties #在最后添加 broker.id1 #改 listenersPLAINTEXT://192.168.19.20:9092 #改 num.network.threads3 num.io.threads8 socket.send.buffer.bytes102400 socket.receive.buffer.bytes102400 socket.request.max.bytes104857600 log.dirs/opt/data/kafka/logs num.partitions6 num.recovery.threads.per.data.dir1 offsets.topic.replication.factor2 transaction.state.log.replication.factor1 transaction.state.log.min.isr1 log.retention.hours168 log.segment.bytes536870912 log.retention.check.interval.ms300000 zookeeper.connect192.168.19.20:2181,192.168.19.21:2181,192.168.19.22:2181 zookeeper.connection.timeout.ms6000 group.initial.rebalance.delay.ms0 [rootes01 ~]# mkdir -p /opt/data/kafka/logs 3.2 其他节点配置 只需把配置好的安装包直接分发到其他节点修改 Kafka的broker.id和 listeners就可以了。 3.3 配置项含义 broker.id 每一个broker在集群中的唯一标识要求是正数。在改变IP地址不改变broker.id的时不会影响consumers listenersPLAINTEXT://192.168.19.22:9092 监听地址 num.network.threads broker 处理消息的最大线程数一般情况下不需要去修改 num.io.threads broker处理磁盘IO 的线程数 数值应该大于你的硬盘数 socket.send.buffer.bytes socket的发送缓冲区 socket.receive.buffer.bytes socket的接收缓冲区 socket.request.max.bytes socket请求的最大数值防止serverOOMmessage.max.bytes必然要小于socket.request.max.bytes会被topic创建时的指定参数覆盖 log.dirs 日志文件目录 num.partitions num.recovery.threads.per.data.dir 每个数据目录(数据目录即指的是上述log.dirs配置的目录路径)用于日志恢复启动和关闭时的线程数量。 offsets.topic.replication.factor transaction state log replication factor 事务主题的复制因子设置更高以确保可用性。 内部主题创建将失败直到群集大小满足此复制因素要求 log.cleanup.policy delete 日志清理策略 选择有delete和compact 主要针对过期数据的处理或是日志文件达到限制的额度会被 topic创建时的指定参数覆盖 log.cleanup.interval.mins1 指定日志每隔多久检查看是否可以被删除默认1分钟 log.retention.hours 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据也就是消费端能够多久去消费数据。log.retention.bytes和log.retention.minutes或者log.retention.hours任意一个达到要求都会执行删除会被topic创建时的指定参数覆盖 log.segment.bytes topic的分区是以一堆segment文件存储的这个控制每个segment的大小会被topic创建时的指定参数覆盖 log.retention.check.interval.ms 文件大小检查的周期时间是否触发 log.cleanup.policy中设置的策略 zookeeper.connect ZK主机地址如果zookeeper是集群则以逗号隔开。 zookeeper.connection.timeout.ms 连接到Zookeeper的超时时间。 4.测试Kafka 4.1 启动zookeeper集群 在三个节点依次执行 # cd /usr/local/kafka # nohup bin/zookeeper-server-start.sh config/zookeeper.properties 查看端口 # netstat -lntp | grep 2181 4.2 启动Kafka 在三个节点依次执行 # cd /usr/local/kafka # nohup bin/kafka-server-start.sh config/server.properties 4.3 测验 验证 在192.168.26.166上创建topic # bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic 参数解释 –zookeeper指定zookeeper的地址和端口 –partitions指定partition的数量 –replication-factor指定数据副本的数量 在26.170上面查询192.168.26.166上的topic [rootes03 kafka]# bin/kafka-topics.sh --zookeeper 192.168.26.166:2181 --list testtopic 二模拟消息生产和消费 发送消息到192.168.26.166 [rootes01 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.19.20:9092 --topic testtopic hello 你好呀 从192.168.26.171接受消息 [rootes02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.21:9092 --topic testtopic --from-beginning