天津免费做网站,山东seo优化,wordpress免费问答模板,wordpress客户中心文章目录 异步发送普通异步发送异步发送流程Code 带回调函数的异步发送带回调函数的异步发送流程Code 同步发送API 异步发送
普通异步发送
需求#xff1a;创建Kafka生产者#xff0c;采用异步的方式发送到Kafka broker
异步发送流程 Code
!-- https://mvnrepository… 文章目录 异步发送普通异步发送异步发送流程Code 带回调函数的异步发送带回调函数的异步发送流程Code 同步发送API 异步发送
普通异步发送
需求创建Kafka生产者采用异步的方式发送到Kafka broker
异步发送流程 Code
!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.6.0/version
/dependency
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** author 小工匠* version 1.0* mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.126.170:9092);// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用send方法,发送消息for (int i 0; i 10; i) {RecordMetadata art kafkaProducer.send(new ProducerRecord(art, kafka-msg- i)).get();System.out.println(art.offset());System.out.println(over - i);}// 5. 关闭资源kafkaProducer.close();}}
输出
31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9
忽略我这个offset … 我都发了好多次了…
看控制台的吧 带回调函数的异步发送
回调函数callback会在producer收到ack时调用为异步调用。
该方法有两个参数分别是RecordMetadata元数据信息和Exception异常信息。
如果Exception为null说明消息发送成功如果Exception不为null说明消息发送失败
带回调函数的异步发送流程 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。
Code
package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** author 小工匠* version 1.0* mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.126.170:9092);// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用send方法,发送消息for (int i 0; i 10; i) {// 添加回调// 该方法在Producer收到ack时调用为异步调用kafkaProducer.send(new ProducerRecord(art, kafka-msg-callback- i), (recordMetadata, e) - {// 没有异常,输出信息到控制台System.out.println(主题 recordMetadata.topic() , 分区 recordMetadata.partition() , 偏移量 recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}} 控制台 同步发送API
同步发送的意思就是一条消息发送之后会阻塞当前线程直至返回ack。 由于send方法返回的是一个Future对象根据Futrue对象的特点我们也可以实现同步发送的效果只需在调用Future对象的get方发即可。 package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** author 小工匠* version 1.0* mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.126.170:9092);// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);// 4. 调用send方法,发送消息for (int i 0; i 10; i) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord(art, kafka-msg-get- i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}