企业网站404页面设计,静态网站可以做留言板,如何让自己的网站排在前面,装修在线设计平台Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息 Apache Kafka 是一个流处理系统#xff0c;可让您在进程、应用程序和服务器之间发送消息。在本文中#xff0c;我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。
为了了解如何创建 …Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息 Apache Kafka 是一个流处理系统可让您在进程、应用程序和服务器之间发送消息。在本文中我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。
为了了解如何创建 Spring Boot 项目请参阅本文。
工作步骤
步骤 1
转到Spring 初始化程序并创建具有以下依赖项的启动项目 Spring for Apache Kafka
步骤 2
在 IDE 中打开项目并同步依赖项。在本文中我们将创建一个学生模型我们将在其中发布学生详细信息。因此创建一个模型类Student。添加数据成员并创建构造函数并重写toString方法以查看 JSON 格式的消息。以下是学生类的实现
学生模型 // Java program to implement a // student class // Creating a student class public class Student { // Data members of the class int id; String firstName; String lastName; // Constructor of the student // Class public Student() { } // Parameterized constructor of // the student class public Student(int id, String firstName, String lastName) { this.id id; this.firstName firstName; this.lastName lastName; } Override public String toString() { return Student{ id id , firstName firstName , lastName lastName }; } }
步骤 3
创建一个新的类Config并添加注释Configuration和EnableKafka。现在使用 Student 类对象创建 Bean ConsumerFactory和ConcurrentKafkaListenerContainerFactory 。
配置类 EnableKafka Configuration public class Config { // Function to establish a connection // between Spring application // and Kafka server Bean public ConsumerFactoryString, Student studentConsumer() { // HashMap to store the configurations MapString, Object map new HashMap(); // put the host IP in the map map.put(ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092); // put the group ID of consumer in the map map.put(ConsumerConfig .GROUP_ID_CONFIG, id); map.put(ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); map.put(ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // return message in JSON formate return new DefaultKafkaConsumerFactory( map, new StringDeserializer(), new JsonDeserializer(Student.class)); } Bean public ConcurrentKafkaListenerContainerFactoryString, Student studentListner() { ConcurrentKafkaListenerContainerFactoryString, Student factory new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(studentConsumer()); return factory; } } 步骤 4
创建一个带有Service注释的KafkaService类。此类将包含用于在控制台上发布消息的侦听器方法。
KafkaService 类 Service public class KafkaService { // Annotation required to listen // the message from Kafka server KafkaListener(topics JsonTopic, groupId id, containerFactory studentListner) public void publish(Student student) { System.out.println(New Entry: student); } }
步骤 5
启动 zookeeper 和 Kafka 服务器。现在我们需要创建一个名为JsonTopic的新主题。为此打开一个新的命令提示符窗口并将目录更改为 Kafka 目录。
步骤6
现在使用下面给出的命令创建一个新主题
bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 mac 和 linux .\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 windows
步骤 7
现在运行 Kafka 生产者控制台使用以下命令
bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // 适用于 mac 和 linux .\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // 适用于 windows
步骤 8
运行应用程序并在 Kafka 生产器上输入消息并按回车键。