当前位置: 首页 > news >正文

电影网站系统源码设计公司起名字寓意好的字

电影网站系统源码,设计公司起名字寓意好的字,学校网站建设xml,站外推广内容策划2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 2.3.2.Spark适配器 2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。 在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的…2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 2.3.2.Spark适配器 2.3. Pulsar Adaptors适配器 2.3.1.kafka适配器 Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。 在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的平台中, 那么Pulsar adaptor on kafka就变的非常的有用了, 它可以帮助我们在不改变原有kafka的代码基础上, 即可接入pulsar, 但是需要注意, 相关配置信息需要进行一些调整, 例如: 地址与topic 1- 需要导入Pulsar集成kafka的依赖包, 删除掉原有Kafka-client包 dependency groupIdorg.apache.pulsar/groupId artifactIdpulsar-client-kafka/artifactId version2.8.0/version /dependency注: 目前Pulsar并在Maven中央仓库中并没有提供Pulsar-client-kafka 2.8.1的包, 故此处导入2.8.0 2-编写生产者 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; import java.util.concurrent.ExecutionException;public class KafkaAdaptorProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 创建kafka生产者的核心类对象: KafkaProducer// 1.1: 创建生产者配置对象: 设置相关配置Properties props new Properties();props.put(bootstrap.servers, pulsar://node1:6650,node2:6650,node3:6650);// 消息的确认方案props.put(acks, all);// key序列化类型props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);// value 序列化类型props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(props); //2. 发送数据 for (int i 0; i 10; i) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据ProducerRecordString, String producerRecord new ProducerRecord(persistent://public/default/txn_t1,Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); }//3. 释放资源 producer.close();}}3-编写消费者 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration; import java.util.Arrays; import java.util.Properties;public class KafkaAdaptorConsumer {public static void main(String[] args) {//1. 创建kafka的消费者的核心对象: KafkaConsumer//1.1: 创建消费者配置对象, 并设置相关的参数:Properties props new Properties();props.setProperty(bootstrap.servers, pulsar://node1:6650,node2:6650,node3:6650);//消费者组的 idprops.setProperty(group.id, test);//是否启动消费者自动提交消费偏移量props.setProperty(enable.auto.commit, true);//每间隔多长时间提交一次偏移量:单位 毫秒props.setProperty(auto.commit.interval.ms,1000);//key 反序列化props.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);//val 发序列化props.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);//2. 给消费者设置订阅topic:consumer.subscribe(Arrays.asList(persistent://public/default/txn_t1));//3. 循环获取相关的消息数据while (true) {//3.1: 从kafka中获取消息数据: 参数表示等待超时时间//注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));//3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息for (ConsumerRecordString, String record : records) {String massage record.value();System.out.println(消息数据为:massage);}} } }4- 先运行消费者, 进行监听, 然后运行生产者, 观察消费者是否可以正常消费到数据 2.3.2.Spark适配器 Pulsar 的 Spark Streaming 接收器是一个自定义的接收器它使用 Apache Spark Streaming 能够从 Pulsar 接 收原始数据。 应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据并可 以通过多种方式对其进行处理。 1-导入相关的依赖包 dependencygroupIdorg.apache.pulsar/groupIdartifactIdpulsar-spark/artifactIdversion2.8.0/version /dependency2-编写spark的流式代码 String serviceUrl pulsar://localhost:6650/; String topic persistent://public/default/test_src; String subs test_sub; SparkConf sparkConf new SparkConf().setMaster(local[*]).setAppName(Pulsar Spark Example); JavaStreamingContext jsc new JavaStreamingContext(sparkConf, Durations.seconds(60)); ConsumerConfigurationDatabyte[] pulsarConf new ConsumerConfigurationData(); SetString set new HashSet(); set.add(topic); pulsarConf.setTopicNames(set); pulsarConf.setSubscriptionName(subs); SparkStreamingPulsarReceiver pulsarReceiver new SparkStreamingPulsarReceiver( serviceUrl, pulsarConf, new AuthenticationDisabled()); JavaReceiverInputDStreambyte[] lineDStream jsc.receiverStream(pulsarReceiver);
http://www.dnsts.com.cn/news/6376.html

相关文章:

  • 宿迁哪里做网站珠海网站制作设计
  • 腾讯云做网站河西做网站
  • 济南网站托管运营vi设计方案包括
  • 网站优化需要哪些工具做网站内嵌地图
  • 哪个网站做外贸年费比较便宜深圳市住房和建设局工程交易服务主页
  • 网站弹出的对话框怎么做免费学做衣服的网站
  • html5中国网站欣赏兰州网站优化推广
  • 舟山建设银行纪念币预约网站wordpress花园商城
  • 深圳网站制作建设哪家专业网站开发基础培训
  • wordpress网站鼠标网站设计的研究方法
  • 电商网站 案例东莞网站设计建设
  • 高端html5网站建设密云新闻 今天 最新
  • 网站制作公司高端seo关键词优化费用
  • 蚌埠专业制作网站的公司网站开发工具介绍
  • 中文域名网站跳转西安做h5网站
  • 潍坊网站建设 APP开发小程序外贸营销是做什么的
  • 自己建立网站江西住房和城乡建设部网站首页
  • 网站版块策划网站建设原型图
  • 给一个网站风格做定义匿名聊天网站开发
  • 网站怎么做一盘优化排名wordpress 关键词链接插件
  • 长春网站建设v1公司装修费用怎么做账
  • 宜昌网站设计创新创意产品设计作品
  • 农村建设房子建设网站建设淘宝小程序入口
  • 自己做链接的网站吗万全县城乡建设网站
  • 自己可以做网站推广吗做内销网站
  • 广州做网站优化公司报价商城网站建设哪家最好
  • 做网站前端需要懂得玉田县网站建设
  • 网站制作与建设教程下载手机网页开发者模式
  • 在哪个网站去租地方做收废站制作网页之前必须先建立的是
  • 滑县网站建设哪家专业wordpress网站缩