当当网网站系统建设的意义,自己做购物网站推广,昆山做网站怎么做,邢台网拓信息技术服务有限公司一、Kafka相关概念
1、关于Kafka的描述
Kafka是由Apache开源#xff0c;具有分布式、分区的、多副本的、多订阅者#xff0c;基于Zookeeper协调的分布式处理平台#xff0c;由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据#xff0c;并高速的处…一、Kafka相关概念
1、关于Kafka的描述
Kafka是由Apache开源具有分布式、分区的、多副本的、多订阅者基于Zookeeper协调的分布式处理平台由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据并高速的处理。日志类的数据需要高吞吐量的性能要求对于像Hadoop一样的日志数据和离线分析系统但又要求实时处理的限制这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理也是为了通过集群来提供实时的消息。
2、关于Kafka的功能特点
通过磁盘数据结构提供消息的持久化消息存储也能够保持长时间稳定性;高吞吐量即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;支持通过Kafka服务器和消费机集群来分区消息;支持Hadoop并行数据加载;API包封装的非常好简单易用上手快 ;分布式消息队列。Kafka对消息保存时根据Topic主题进行归类发送消息者称为Producer生产者消息接受者称为Consumer消费者;
3、Kafka消息功能
如下图所示Kafka作为一个中间服务代表一个broker经纪人角色负责接收APP的消费与推送消息给其他相关APP。这里APP可分为ProducerConsumer。 消息的消费模式
点对点模式点对点模式通常是一个基于拉取或者轮询的消息传递模型消费者主动拉取数据消息收到后从队列移除消息这种模型不是将消息推送到客户端而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理即使有多个消费者监听队列也是如此。
发布订阅模式订阅模式是一个基于推送的消费传送模型消息产生后Kafka会推送给所有订阅相关Topic的订阅者。发布订阅模型可以有多种不同的订阅者临时订阅者只在主动监听主题时才接收消息而持久订阅者则监听主题的所有消息即使当前订阅者不可用处于离线状态。 4、Kafka消息队列的作用
应用程序之间解耦生产者与消费者相互独立各自异步执行。消息数据持久化存储直到所有消息都被消费规避消息数据丢失的风险。流量削峰使用Kafka消息队列可以帮助server承接访问压力尽可能避免应用程序崩溃。降低进程间的耦合度系统部分应用组件发生崩溃时不会影响到整体系统的运行。保证消息顺序执行解决特定场景业务需求。
5、Kafka相关术语介绍
Broker 一台kafka服务器就是一个broker经纪人。一个集群由多个broker组成。一个broker可以容纳多个topic消息主题。
Producer 消息生产者就是向kafka broker发消息的APP客户端。
Consumer 消息消费者向kafka broker取消息的APP客户端。
Topic 每条发布到Kafka集群的消息都有一个类别这个类别被称为Topic可以理解为一个队列。
Consumer Group 每个Consumer属于一个特定的Consumer Group可为每个Consumer指定group name若不指定group name则属于默认的分组。
Partition
一个庞大大的topic可以分布到多个broker上一个topic可以分为多个partition每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer不保证一个topic的整体的顺序。Partition是物理上的概念方便在集群中扩展提高并发。
二、liunx系统下搭建Kafka环境 --新建kafka应用目录。并下载到当前目录下
cd /usr/localmkdir kafkacd kafka
--下载wget https://downloads.apache.org/kafka/3.7.0/kafka-3.7.0-src.tgz--解压tar -zxvf kafka-3.7.0-src.tgz--启动服务cd kafka-3.7.0./bin/kafka-server-start.sh config/server.properties--查看服务ps -aux |grep kafka--开放kafka地址端口vim server.properties--添加下面注释advertised.listenersPLAINTEXT://10.98.3.22:9092 三、Springboot2整合Kafka 服务
1、导入基础依赖
!-- SpringBoot依赖 --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId
/dependency
!-- kafka 依赖 --
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.2.4.RELEASE/version
/dependency
2、项目目录结构 3、生产者与消费者yml文件配置
#消费者配置
spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: test-consumer-group#生产者配置spring:kafka:bootstrap-servers: 127.0.0.1:9092
4、生成消息
RestController
RequestMapping(/kafka)
public class ProducerController {Resourceprivate KafkaTemplateString, String kafkaTemplate;RequestMapping(/send)public HttpResult sendMsg () {MsgLog msgLog new MsgLog(1,消息生成,1,消息日志,new Date()) ;String msg JSON.toJSONString(msgLog) ;// 这里Topic如果不存在会自动创建kafkaTemplate.send(cicada-topic, msg);return HttpResult.create(HttpStatus.SUCCESS,msg);}
}
Component
public class ConsumerMsg {private static Logger LOGGER LoggerFactory.getLogger(ConsumerMsg.class);//此注解是监听主题为cicada-topic的消息队列KafkaListener(topics cicada-topic)public void listenMsg (ConsumerRecord?,String record) {String value record.value();LOGGER.info(ConsumerMsgvalue);}
}