当前位置: 首页 > news >正文

完整网站模板下载wordpress中国网站模板

完整网站模板下载,wordpress中国网站模板,安徽省建设厅网站职称申报,网络推广工具大全1. kafka的客户端 Kafka提供了两套客户端API#xff0c;HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节#xff0c;使用起来比较简单#xff0c;是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节#xff0c;Pa…1. kafka的客户端 Kafka提供了两套客户端APIHighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节使用起来比较简单是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节PartitionOffset这些数据都由客户端自行管理。这层API功能更灵活但是使用起来非常复杂也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。 2. 基础客户端的使用 Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.13/artifactIdversion3.4.0/version/dependency 2.1 如何发消息 现在 我们使用Kafka提供的Producer类如何发送消息。 2.1.1 单项发送消息 代码 public class MyProducerTest {private static final String BOOTSTRAP_SERVERS 192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092;private static final String TOPIC disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,com.roy.kfk.basic.MyInterceptor);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);ProducerString,String producer new KafkaProducer(props);for(int i 0; i 5; i) {//Part2:构建消息ProducerRecordString, String record new ProducerRecord(TOPIC, Integer.toString(i), MyProducer i);//Part3:发送消息//单向发送不关心服务端的应答。producer.send(record);System.out.println(message i sended);}//消息处理完才停止发送者。producer.close();} }执行结果 2.1.2 同步发送 代码 public class MyProducerTest {private static final String BOOTSTRAP_SERVERS 192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092;private static final String TOPIC disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,com.roy.kfk.basic.MyInterceptor);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);ProducerString,String producer new KafkaProducer(props);for(int i 0; i 5; i) {//Part2:构建消息ProducerRecordString, String record new ProducerRecord(TOPIC, Integer.toString(i), MyProducer i);//Part3:发送消息//同步发送获取服务端应答消息前会阻塞当前线程。RecordMetadata recordMetadata producer.send(record).get();String topic recordMetadata.topic();int partition recordMetadata.partition();long offset recordMetadata.offset();String message recordMetadata.toString();System.out.println(message:[ message] sended with topic:topic; partition:partition ;offset:offset);}//消息处理完才停止发送者。producer.close();} } 执行结果 2.1.2 异步发送  代码 public class MyProducerTest {private static final String BOOTSTRAP_SERVERS 192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092;private static final String TOPIC disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,com.roy.kfk.basic.MyInterceptor);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);ProducerString,String producer new KafkaProducer(props);CountDownLatch latch new CountDownLatch(5);for(int i 0; i 5; i) {//Part2:构建消息ProducerRecordString, String record new ProducerRecord(TOPIC, Integer.toString(i), MyProducer i);//Part3:发送消息//异步发送消息发送后不阻塞服务端有应答后会触发回调函数producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null ! e){System.out.println(消息发送失败,e.getMessage());e.printStackTrace();}else{String topic recordMetadata.topic();long offset recordMetadata.offset();String message recordMetadata.toString();System.out.println(message:[ message] sended with topic:topic;offset:offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();//消息处理完才停止发送者。producer.close();} } 执行结果 2.1.3 总结  ​ 从上述示例中我们可以总结出构建Producer分为三个步骤 设置Producer核心属性 Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中对于大部分比较重要的属性都配置了对应的DOC属性进行描述。构建消息Kafka的消息是一个Key-Value结构的消息。其中key和value都可以是任意对象类型。其中key主要是用来进行Partition分区的业务上更关心的是value。使用Producer发送消息通常用到的就是单向发送、同步发送和异步发送者三种发送方式。 2.2 如何消费消息 接下来可以使用Kafka提供的Consumer类快速消费消息。 2.2.1 消费消息 代码 public class MyConsumerTest {private static final String BOOTSTRAP_SERVERS 192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092;private static final String TOPIC disTopic;public static void main(String[] args) {//PART1:设置发送者相关属性Properties props new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, test);//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);ConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecordsString, String records consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecordString, String record : records) {System.out.println(partition record.partition()offset record.offset() ;key record.key() ; value record.value());}//提交offset消息就不会重复推送。consumer.commitSync(); //同步提交表示必须等到offset提交完毕再去消费下一批数据。 // consumer.commitAsync(); //异步提交表示发送完提交offset请求后就开始消费下一批数据了。不用等到Broker的确认。}} } 2.2.2 总结 ​ 整体来说Consumer同样是分为三个步骤 设置Consumer核心属性 可选的属性都可以由ConsumerConfig类管理。在这个类中同样对于大部分比较重要的属性都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。拉取消息Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。处理消息提交位点消费者将消息拉取完成后就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交OffsetBroker会认为消费者端消息处理失败了还会重复进行推送。 3. 客户端核心参数与客户端机制 3.1 消费者分组消费机制 3.2 生产者拦截器机制 3.3 消息序列化机制 3.4 消息分区路由机制 3.5 生产者消息缓存机制 3.6 发送应答机制 3.7 生产者消息幂等性 3.8 生产者消息事务 内容更新中
http://www.dnsts.com.cn/news/141626.html

相关文章:

  • 茶山网站仿做湖州网站建设
  • 上海网站建设哪里便宜通信管理局网站备案
  • 重庆网站设计中心网页版qq在线登录
  • 企业网站和信息化建设制度免费开放服务器
  • 宝塔自助建站源码企业网站推广策略
  • 郴州网站seo招聘类网站如何做
  • 广州h5网站制作公司dede手机网站模板哦
  • 网站搭建的流程wordpress采集文章内容
  • 快速网站开发课程wordpress小鹅通
  • 营销型网站的现状学生免费舆情监测平台官网
  • 做h5网站pc加手机版要多少钱哪些品牌网站做的好
  • html语言大型网站开发网站改版收费
  • 关于加强网站建设的建议上海公司注销流程步骤
  • 站长统计幸福宝宝官方网站建成后 再添加小功能麻烦吗
  • 网站 例网站开发前端和后端
  • 地方门户网站开发方案51我们一起做网站
  • 校体育网站建设的好处群晖非插件搭建wordpress
  • 打开网站不要出现 index.html酷家乐装修设计软件
  • 网站外链要怎么做wordpress如何添加商桥
  • 漫画驿站网页设计图纸尺寸图做哪个网站零售最好
  • 网站建设实训结论和体会海珠区
  • 口碑好的盐城网站建设江门网站制作案例
  • 音乐网站排名wordpress产品分类插件
  • 服务网站运营方案wordpress 回复 慢
  • 怎么做网站框架注册个免费网站
  • 工商网站查询企业品牌百度网站建设
  • 社交网站建设码世界杯32强排名
  • vps除了做网站还能做什么自己的网站怎么制作
  • 网站开发交流吧教做饮品的网站
  • php做彩票网站个人管理系统