dw php网站建设视频教程,企业网站pv是什么,茂名本土网站建设公司,船舶cms是什么意思kafka基本概念 producer#xff1a; 生产者#xff0c;负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view#xff0c;或者是服务器日志#xff0c;系统CPU、memory等。
consumer#xff1a; 消费者#xff0c;每个consumer属于一个特定的c…kafka基本概念 producer 生产者负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view或者是服务器日志系统CPU、memory等。
consumer 消费者每个consumer属于一个特定的consuer group可为每个consumer指定group name若不指定group name则属于默认的group。创建消费者时要指定消费者接受的消息的topic该消费者只会接受该topic的消息。
topic 每条发布到Kafka集群的消息都有一个类别这个类别被称为topic。物理上不同topic的消息分开存储逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处。
broker kafka集群包含一个或多个服务器这些服务器就叫做broker。
本机安装kafka测试
安装kafkamac下
kafka下载 从官网下载 kafka_2.13-2.7.0.tgz直接解压即可。
本机测试kafka
1、进入到kafka的解压目录输入命令启动zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties
复制 打开另一个终端输入命令启动kafka
./bin/kafka-server-start.sh config/server.properties
复制
2、服务启起来后可以创建生产者和消费者了。 再打开另一个终端输入命令创建生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
复制
broker-list: 参数指定生产者所使用的broker localhost 9092 参数表示broker这个broker为本机(127.0.0.1)且使用的端口是kafka的默认端口号是9092 topic: 参数表示生产者生产的消息的topic 为 “test_topic”
最后再打开另一个终端创建消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning
复制
bootstrap-server: 是指定consumer从哪里(broker)取出消息 topic: 指定消费者consumer取出的 topic 为“test_topic”的消息。 from-beginning Kafka实际环境有可能会出现Consumer全部宕机虽然基于Kafka的高可用特性消费者群组中的消费者可以实现再均衡所有Consumer不处理数据的情况很少但是还是有可能会出现此时就要求Consumer重启的时候能够读取在宕机期间Producer发送的数据。基于消费者订阅模式默认是无法实现的因为只能订阅最新发送的数据。通过消费者命令行可以实现只要在命令行中加上–from-beginning即可
3、都创建完了可以通过生产者输入消息消费者来接收并显示消息效果图如下 springboot整合kafkaIDEA
注意 kafka要是部署在服务器的话本机就 要和服务器之间能ping通。
1、创建springboot项目 2、创建两个类分别为生产者和消费者 项目目录结构 配置文件application.yml(一般项目自动生成的是applicaiton.properties但为了书写简便改成yml)
spring:kafka:bootstrap-servers: 127.0.0.1:9092 #服务器的ip及端口可以写多个服务器之间用“”间隔producer: #生产者配置key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: #消费者配置group-id: test #设置消费者的组idenable-auto-commit: true
# auto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制
springboot启动类入口KafkaStudyApplication.java
package com.study.kafka.kafka_study;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class KafkaStudyApplication { public static void main(String[] args) { SpringApplication.run(KafkaStudyApplication.class, args);}}
复制
TestKafkaProducerController.java:(生产者)
package com.study.kafka.kafka_study;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
RestController //定义这是一个控制器可以通过浏览器访问
RequestMapping(/kafka)
public class TestKafkaProducerController { Autowired
private KafkaTemplateString, String kafkaTemplate;
//当在浏览器上输入http://localhost:8080/kafka/send?msgabc就会发送abc到服务器上去让消费者接收msg对应下面的String msg
RequestMapping(/producerSend)
public String send(String msg){ kafkaTemplate.send(test_topic, msg); //使用kafka模板发送信息
return success;
}
}
复制
TestConsumer.java:(消费者)
package com.study.kafka.kafka_study;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
Component
public class TestConsumer { /** * 定义此消费者接收topic为“test_topic”的消息,监听服务器上的kafka是否有相关的消息发过来 * param record record变量代表消息本身可以通过ConsumerRecord?,?类型的record变量来打印接收的消息的各种信息 * */
KafkaListener(topics test_topic)
public void listen (ConsumerRecord?, ? record) throws Exception { System.out.printf(topic %s, offset %d, value %s \n, record.topic(), record.offset(), record.value());
}
}
复制
测试
1、运行KafkaStudyApplication.java之后终端上输入消息时不仅终端上服务器运行的测试消费者能收到IDEA上的程序也能收到。 2、在浏览器上输入http://localhost:8080/kafka/producerSend?msgweb world31231不仅IDEA上的消费者能收到在终端(服务器)上运行的测试消费者也能收到其中8080是tomcat服务器的端口springboot默认下带的是tomcat