当前位置: 首页 > news >正文

地宝网 网站建设淘宝网网站开发

地宝网 网站建设,淘宝网网站开发,深圳网站建设 设计贝尔,吉利汽车网站开发环境分析文章目录 Kafka Source1. 使用方法2. Topic / Partition 订阅3. 消息解析4. 起始消费位点5. 有界 / 无界模式6. 其他属性7. 动态分区检查8. 事件时间和水印9. 空闲10. 消费位点提交11. 监控12. 安全 Apache Kafka 连接器 Flink 提供了 Apache Kafka 连接器使用精确一次#xf… 文章目录 Kafka Source1. 使用方法2. Topic / Partition 订阅3. 消息解析4. 起始消费位点5. 有界 / 无界模式6. 其他属性7. 动态分区检查8. 事件时间和水印9. 空闲10. 消费位点提交11. 监控12. 安全 Apache Kafka 连接器 Flink 提供了 Apache Kafka 连接器使用精确一次Exactly-once的语义在 Kafka topic 中读取和写入数据。依赖 !-- flink kakfa --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.12/artifactIdversion${flink.version}/version/dependencyKafka Source 1. 使用方法 Kafka Source 提供了构建类来创建 KafkaSource 的实例。以下代码片段展示了如何构建 KafkaSource 来消费 “input-topic” 最早位点的数据 使用消费组 “my-group”并且将 Kafka 消息体反序列化为字符串 KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(input-topic).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source);2. Topic / Partition 订阅 以下属性在构建 KafkaSource 时是必须指定的 Bootstrap server通过 setBootstrapServers(String) 方法配置消费者组 ID通过 setGroupId(String) 配置要订阅的 Topic / Partition请参阅 Topic / Partition 订阅一节用于解析 Kafka 消息的反序列化器Deserializer请参阅消息解析一节 Kafka Source 提供了 3 种 Topic / Partition 的订阅方式 Topic 列表订阅 Topic 列表中所有 Partition 的消息 KafkaSource.builder().setTopics(topic-a, topic-b);正则表达式匹配订阅与正则表达式所匹配的 Topic 下的所有 Partition KafkaSource.builder().setTopicPattern(topic.*);Partition 列表订阅指定的 Partition final HashSetTopicPartition partitionSet new HashSet(Arrays.asList(new TopicPartition(topic-a, 0), // Partition 0 of topic topic-anew TopicPartition(topic-b, 5))); // Partition 5 of topic topic-b KafkaSource.builder().setPartitions(partitionSet);3. 消息解析 代码中需要提供一个反序列化器Deserializer来对 Kafka 的消息进行解析。 反序列化器通过 setDeserializer(KafkaRecordDeserializationSchema) 来指定其中 KafkaRecordDeserializationSchema 定义了如何解析 Kafka 的 ConsumerRecord。 如果只需要 Kafka 消息中的消息体value部分的数据可以使用 KafkaSource 构建类中的 setValueOnlyDeserializer(DeserializationSchema) 方法其中 DeserializationSchema 定义了如何解析 Kafka 消息体中的二进制数据。 也可使用 Kafka 提供的解析器 来解析 Kafka 消息体。例如使用 StringDeserializer 来将 Kafka 消息体解析成字符串 import org.apache.kafka.common.serialization.StringDeserializer;KafkaSource.Stringbuilder().setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));4. 起始消费位点 Kafka source 能够通过位点初始化器OffsetsInitializer来指定从不同的偏移量开始消费 。内置的位点初始化器包括 KafkaSource.builder()// 从消费组提交的位点开始消费不指定位点重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())// 从消费组提交的位点开始消费如果提交位点不存在使用最早位点.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 从时间戳大于等于指定时间戳毫秒的数据开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 从最早位点开始消费.setStartingOffsets(OffsetsInitializer.earliest())// 从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest());如果内置的初始化器不能满足需求也可以实现自定义的位点初始化器OffsetsInitializer,如果未指定位点初始化器将默认使用 OffsetsInitializer.earliest(); 5. 有界 / 无界模式 Kafka Source 支持流式和批式两种运行模式。默认情况下KafkaSource 设置为以流模式运行因此作业永远不会停止直到 Flink 作业失败或被取消。 可以使用 setBounded(OffsetsInitializer) 指定停止偏移量使 Kafka Source 以批处理模式运行。当所有分区都达到其停止偏移量时Kafka Source 会退出运行。 流模式下运行通过使用 setUnbounded(OffsetsInitializer) 也可以指定停止消费位点当所有分区达到其指定的停止偏移量时Kafka Source 会退出运行。 6. 其他属性 除了上述属性之外您还可以使用 setProperties(Properties) 和 setProperty(String, String) 为 Kafka Source 和 Kafka Consumer 设置任意属性。KafkaSource 有以下配置项 client.id.prefix指定用于 Kafka Consumer 的客户端 ID 前缀partition.discovery.interval.ms定义 Kafka Source 检查新分区的时间间隔。register.consumer.metrics 指定是否在 Flink 中注册 Kafka Consumer 的指标commit.offsets.on.checkpoint 指定是否在进行 checkpoint 时将消费位点提交至 Kafka broker 请注意即使指定了以下配置项构建器也会将其覆盖 key.deserializer 始终设置为 ByteArrayDeserializervalue.deserializer 始终设置为 ByteArrayDeserializerauto.offset.reset.strategy 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆盖partition.discovery.interval.ms 会在批模式下被覆盖为 -1 7. 动态分区检查 为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。要启用动态分区检查请将 partition.discovery.interval.ms 设置为非负值 KafkaSource.builder().setProperty(partition.discovery.interval.ms, 10000); // 每 10 秒检查一次新分区分区检查功能默认不开启。需要显式地设置分区检查间隔才能启用此功能。 8. 事件时间和水印 默认情况下Kafka Source 使用 Kafka 消息中的时间戳作为事件时间。您可以定义自己的水印策略Watermark Strategy 以从消息中提取事件时间并向下游发送水印 env.fromSource(kafkaSource, new CustomWatermarkStrategy(), Kafka Source With Custom Watermark Strategy);9. 空闲 如果并行度高于分区数Kafka Source 不会自动进入空闲状态。您将需要降低并行度或向水印策略添加空闲超时。如果在这段时间内没有记录在流的分区中流动则该分区被视为“空闲”并且不会阻止下游操作符中水印的进度。 10. 消费位点提交 Kafka source 在 checkpoint 完成时提交当前的消费位点 以保证 Flink 的 checkpoint 状态和 Kafka broker 上的提交位点一致。如果未开启 checkpointKafka source 依赖于 Kafka consumer 内部的位点定时自动提交逻辑自动提交功能由 enable.auto.commit 和 auto.commit.interval.ms 两个 Kafka consumer 配置项进行配置。 注意Kafka source 不依赖于 broker 上提交的位点来恢复失败的作业。提交位点只是为了上报 Kafka consumer 和消费组的消费进度以在 broker 端进行监控。 11. 监控 Kafka source 会在不同的中汇报下列指标。 该指标反映了最后一条数据的瞬时值。之所以提供瞬时值是因为统计延迟直方图会消耗更多资源瞬时值通常足以很好地反映延迟 指标监控参考 12. 安全 要启用加密和认证相关的安全配置只需将安全配置作为其他属性配置在 Kafka source 上即可。下面的代码片段展示了如何配置 Kafka source 以使用 PLAIN 作为 SASL 机制并提供 JAAS 配置 KafkaSource.builder().setProperty(security.protocol, SASL_PLAINTEXT).setProperty(sasl.mechanism, PLAIN).setProperty(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\username\ password\password\;);另一个更复杂的例子使用 SASL_SSL 作为安全协议并使用 SCRAM-SHA-256 作为 SASL 机制 KafkaSource.builder().setProperty(security.protocol, SASL_SSL)// SSL 配置// 配置服务端提供的 truststore (CA 证书) 的路径.setProperty(ssl.truststore.location, /path/to/kafka.client.truststore.jks).setProperty(ssl.truststore.password, test1234)// 如果要求客户端认证则需要配置 keystore (私钥) 的路径.setProperty(ssl.keystore.location, /path/to/kafka.client.keystore.jks).setProperty(ssl.keystore.password, test1234)// SASL 配置// 将 SASL 机制配置为 as SCRAM-SHA-256.setProperty(sasl.mechanism, SCRAM-SHA-256)// 配置 JAAS.setProperty(sasl.jaas.config, org.apache.kafka.common.security.scram.ScramLoginModule required username\username\ password\password\;);如果在作业 JAR 中 Kafka 客户端依赖的类路径被重置了relocate class登录模块login module的类路径可能会不同因此请根据登录模块在 JAR 中实际的类路径来改写以上配置。
http://www.dnsts.com.cn/news/145725.html

相关文章:

  • 精美旅游网站案例面试drupal网站开发岗位
  • 个人做动漫资源网站有哪些上海企业查询系统
  • 做视频网站赚钱吗网站备案填写要求吗
  • 做网站需要考虑哪些问题现在宁波做网站
  • 如何免费创建网站平台一般公司网址都怎么写
  • 长春工程建设信息网站中国建设学会查询网站
  • 合肥网站推广优化济南快速网站制作公司
  • 企业综合型网站建设方案宁波网站推广报价
  • 网站建设流程王晴儿html网页开发工具
  • 东莞个人免费建网站网站结构合理
  • 黑龙江建设监理协会网站中国建设银行网上银行网站特点
  • 网站建设 文档下载wordpress 加载jquery
  • 网站目录结构凡客建站官网登录入口
  • 电商网站建设教程怎么创建手机网站
  • 南通仿站定制模板建站织梦网站需要优化
  • 在那个网站做直播好赚钱吗自己注册的公司怎么报税
  • 株洲做网站 省心磐石网络阿里云多网站建设
  • 汕头模版网站建设网站建设怎么在png上写文字
  • 找公司做网站源代码给客户吗wordpress免刷新插件
  • 建设局工程网站百度题库
  • 白城网站建设用帝国cms系统怎么做网站
  • 常熟制作网站的地方在线网页代理极光
  • 做同性恋网站犯法吗商城版免费网站
  • 北京网站优化方法萍乡企业网站建设
  • 北京专业网站的建设网站开发流程人物
  • 扬州专业做网站wordpress ip排行榜
  • 做做网站2023w98免费服务器
  • 怎么按照屏幕比例做网站适应深圳市建设工程造价信息
  • 椒江网站建设公司前端软件开发工程师是什么
  • 网站内容策略建设信用卡银行商城网站