网站微信认证费用多少钱,wordpress主题图,备案域名一定要建好网站吗,小语种建站目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者#xff08;一#xff09;Kafka生产的消息是JSON 字符串1、方式一2、方式二#xff1a;需要直接访问消息元数据 #xff08;二#xff09;Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者… 目录 一、引入依赖二、添加Kafka配置三、创建 Kafka 消费者一Kafka生产的消息是JSON 字符串1、方式一2、方式二需要直接访问消息元数据 二Kafka生产的消息是对象Order 四、创建 启动类五、配置 Kafka 生产者可选一消息类型为json串二消息类型为对象Order 六、启动 Kafka 服务七、测试 Kafka 消费者九、测试和调试十、 结语 一、引入依赖
你需要在 pom.xml 中添加 spring-kafka 相关依赖
dependencies!-- Spring Boot Web --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Spring Kafka --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- Spring Boot Starter for Logging (optional but useful for debugging) --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-logging/artifactId/dependency!-- Spring Boot Starter for Testing --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency
/dependencies
二、添加Kafka配置
在 application.yml 或 application.properties 文件中配置 Kafka 连接属性
application.yml 示例
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服务器地址consumer:group-id: my-consumer-group # 消费者组IDauto-offset-reset: earliest # 消费者从头开始读取如果没有已提交的偏移量key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串listener:missing-topics-fatal: false # 如果主题不存在不抛出致命错误
application.properties 示例
spring.kafka.bootstrap-serverslocalhost:9092
spring.kafka.consumer.group-idmy-consumer-group
spring.kafka.consumer.auto-offset-resetearliest
spring.kafka.listener.missing-topics-fatalfalse
spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器
spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串
注意spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer # 设置key的反序列化器 spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer # 设置value的反序列化器为字符串 以上配置说明Kafka生产的数据是json字符串那么消费接收的数据默认也是json字符串如果接收消息想用对象接受需要自定义序列化器比如以下配置
spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 对 Key 使用 StringSerializervalue-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer # 对 Value 使用 ErrorHandlingSerializerproperties:spring.json.value.default.type: com.example.Order # 默认的 JSON 反序列化目标类型为 Order
三、创建 Kafka 消费者
创建一个 Kafka 消费者类来处理消息。你可以使用 KafkaListener 注解来监听 Kafka 中的消息
一Kafka生产的消息是JSON 字符串
1、方式一
如果消息是 JSON 字符串你可以使用 StringDeserializer 获取消息后再使用 ObjectMapper 将其转换为 Java 对象如 Order。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;Service
EnableKafka // 启用 Kafka 消费者
public class KafkaConsumer {private final ObjectMapper objectMapper new ObjectMapper();// 监听 Kafka 中的 order-topic 主题KafkaListener(topics order-topic, groupId order-consumer-group)public void consumeOrder(String message) {try {// 将 JSON 字符串反序列化为 Order 对象Order order objectMapper.readValue(message, Order.class);System.out.println(Received order: order);} catch (Exception e) {e.printStackTrace();}}}
说明
KafkaListener(topics “my-topic”, groupId “my-consumer-group”): topics 表示监听的 Kafka 主题groupId 表示消费者所属的消费者组。listen(String message): 该方法会被调用来处理收到的每条消息。在此示例中我们打印出消息内容。
2、方式二需要直接访问消息元数据
可以通过 ConsumerRecord 来接收 Kafka 消息。这种方式适用于需要直接访问消息元数据如 topic、partition、offset的场景也适合手动管理消息消费和偏移量提交的情况。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题KafkaListener(topics order-topic, groupId order-consumer-group)public void consumeOrder(ConsumerRecordString, String record) {// 获取消息的详细信息String key record.key(); // 获取消息的 keyString value record.value(); // 获取消息的 valueString topic record.topic(); // 获取消息的 topicint partition record.partition(); // 获取消息的分区long offset record.offset(); // 获取消息的偏移量long timestamp record.timestamp(); // 获取消息的时间戳// 处理消息这里我们只是打印消息System.out.println(Consumed record: );System.out.println(Key: key);System.out.println(Value: value);System.out.println(Topic: topic);System.out.println(Partition: partition);System.out.println(Offset: offset);System.out.println(Timestamp: timestamp);}
}
二Kafka生产的消息是对象Order
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;Service
public class KafkaConsumer {// 监听 Kafka 中的 order-topic 主题KafkaListener(topics order-topic, groupId order-consumer-group)public void consumeOrder(ConsumerRecordString, Order record) {// 获取消息的详细信息String key record.key(); // 获取消息的 keyOrder value record.value(); // 获取消息的 valueString topic record.topic(); // 获取消息的 topicint partition record.partition(); // 获取消息的分区long offset record.offset(); // 获取消息的偏移量long timestamp record.timestamp(); // 获取消息的时间戳// 处理消息这里我们只是打印消息System.out.println(Consumed record: );System.out.println(Key: key);System.out.println(Value: value);System.out.println(Topic: topic);System.out.println(Partition: partition);System.out.println(Offset: offset);System.out.println(Timestamp: timestamp);}
}四、创建 启动类
确保你的 Spring Boot 启动类正确配置了 Spring Boot 应用程序启动。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class KafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(KafkaConsumerApplication.class, args);}}
五、配置 Kafka 生产者可选
一消息类型为json串
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;Service
EnableKafka
public class KafkaProducer {Autowiredprivate KafkaTemplateString, String kafkaTemplate; // 发送的是 String 类型消息private ObjectMapper objectMapper new ObjectMapper(); // Jackson ObjectMapper 用于序列化// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {try {// 将 Order 对象转换为 JSON 字符串String orderJson objectMapper.writeValueAsString(order);// 发送 JSON 字符串到 KafkakafkaTemplate.send(topic, orderJson); // 发送字符串消息System.out.println(Order JSON sent to Kafka: orderJson);} catch (Exception e) {e.printStackTrace();}}
}
二消息类型为对象Order
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;Service
EnableKafka
public class KafkaProducer {Autowiredprivate KafkaTemplateString, Order kafkaTemplate;// 发送订单到 Kafkapublic void sendOrder(String topic, Order order) {kafkaTemplate.send(topic, order); // 发送订单对象Spring Kafka 会自动将 Order 转换为 JSON}
}
六、启动 Kafka 服务
启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
七、测试 Kafka 消费者
你可以通过向 Kafka 发送消息来测试消费者是否工作正常。假设你已经在 Kafka 中创建了一个名为 my-topic 的主题可以使用 KafkaProducer 来发送消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;RestController
public class KafkaController {Autowiredprivate KafkaProducer kafkaProducer;GetMapping(/sendOrder)public String sendOrder() {Order order new Order();order.setOrderId(1L);order.setUserId(123L);order.setProduct(Laptop);order.setQuantity(2);order.setStatus(Created);kafkaProducer.sendOrder(order-topic, order);return Order sent!;}
}
当你访问 /sendOrder端点时KafkaProducer 会将消息发送到 KafkaKafkaConsumer 会接收到这条消息并打印出来。
九、测试和调试
你可以通过查看 Kafka 消费者日志确保消息已经被成功消费。你还可以使用 KafkaTemplate 发送消息并确保 Kafka 生产者和消费者之间的连接正常。
十、 结语
至此你已经在 Spring Boot 中成功配置并实现了 Kafka 消费者和生产者。你可以根据需要扩展功能例如处理更复杂的消息类型、批量消费等。