协和医院网站建设目标,忆达城市建设游戏登录网站,网站建设后期需要做什么,深圳网站搭建专业公司当前我们的业务场景#xff0c;是基于dataStream代码#xff0c; 维表数据量很大#xff0c; 实时性要求很高#xff0c;所以采用预加载分区维表模式#xff0c; kafka广播流实时更新配置。
实现方案 1#xff1a;job初始化时 每个分区open 只加载自己那部分的配置…当前我们的业务场景是基于dataStream代码 维表数据量很大 实时性要求很高所以采用预加载分区维表模式 kafka广播流实时更新配置。
实现方案 1job初始化时 每个分区open 只加载自己那部分的配置 不用每个分区都全量加载。 2 配置实时更新 采用kafka topic传到flink job广播流中使用ConfigBroadcastProcessFunction更新分区内的配置信息。
衡量指标
总体来讲关联维表有三个基础的方式实时数据库查找关联Per-Record Reference Data Lookup、预加载维表关联Pre-Loading of Reference Data和维表变更日志关联Reference Data Change Stream而根据实现上的优化可以衍生出多种关联方式且这些优化还可以灵活组合产生不同效果不过为了简单性这里不讨论同时应用多种优化的实现方式。对于不同的关联方式我们可以从以下 7 个关键指标来衡量每个指标的得分将以 1-5 五档来表示:
实现简单性: 设计是否足够简单易于迭代和维护。
吞吐量: 性能是否足够好。
维表数据的实时性: 维度表的更新是否可以立刻对作业可见。
数据库的负载: 是否对外部数据库造成较大的负载负载越低分越高。
内存资源占用: 是否需要大量内存来缓存维表数据内存占用越少分越高。
可拓展性: 在更大规模的数据下会不会出现瓶颈。
结果确定性: 在数据延迟或者数据重放情况下是否可以得到一致的结果。
启动预加载分区维表 对于维表比较大的情况可以启动预加载维表基础之上增加分区功能。简单来说就是将数据流按字段进行分区然后每个 Subtask 只需要加在对应分区范围的维表数据。值得注意的是这里的分区方式并不是用 keyby 这种通用的 hash 分区而是需要根据业务数据定制化分区策略然后调用 DataStream#partitionCustom。比如按照 userId 等区间划分0-999 划分到 subtask 11000-1999 划分到 subtask 2以此类推。而在 open() 方法中我们再根据 subtask 的 id 和总并行度来计算应该加载的维表数据范围。 启动预加载分区维表介绍 通过这种分区方式维表的大小上限理论上可以线性拓展解决了维表大小受限于单个 TaskManager 内存的问题现在是取决于所有 TaskManager 的内存总量但同时给带来设计和维护分区策略的复杂性。
缓存方式 之前业务场景是采用的第一种 但是配置数据量越来越大已经不能支撑业务所以模拟调研第三种方式设计和维护分区策略
代码实验 Flink设置4个并行度 2个taskmanager
-m yarn-cluster -p 4 -yjm 1024m -ytm 2048m -ynm $application_name -ys 2 采用自定义Partition设计和维护分区策略数据流和维表connect
.filter(_.nonEmpty)
.map(_.get)
.partitionCustom(new CustomPartitioner(),data {s${data.datas.controlPlanId}
})
.connect(indicatorConfigBroadcastStream)
.process(new FdcIndicatorConfigBroadcastProcessFunction)
.name(FdcGenerateIndicator)
.uid(FdcGenerateIndicator)自定义Partition分区类
import org.apache.flink.api.common.functions.Partitioner
import org.slf4j.{Logger, LoggerFactory}class CustomPartitioner extends Partitioner[String]{lazy private val logger: Logger LoggerFactory.getLogger(classOf[CustomPartitioner])override def partition(key: String, numPartitions: Int): Int {logger.warn(分区总数numPartitions)return (key.hashCode % numPartitions).abs}
}BroadcastProcessFunction
class ConfigBroadcastProcessFunctionextends BroadcastProcessFunction[fdcWindowData, JsonNode,(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])] {lazy private val logger: Logger LoggerFactory.getLogger(classOf[FdcIndicatorConfigBroadcastProcessFunction])// 初始化override def open(parameters: Configuration): Unit {logger.warn(sgetIndexOfThisSubtask: ${getRuntimeContext.getIndexOfThisSubtask})logger.warn(sgetNumberOfParallelSubtasks: ${getRuntimeContext.getNumberOfParallelSubtasks})super.open(parameters)// 获取全局变量val p getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]ProjectConfig.getConfig(p)}// 数据流override def processElement(windowData: fdcWindowData, ctx: BroadcastProcessFunction[fdcWindowData,JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#ReadOnlyContext,out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit {logger.warn(s${getRuntimeContext.getIndexOfThisSubtask})}// 广播流override def processBroadcastElement(value: JsonNode, ctx: BroadcastProcessFunction[fdcWindowData, JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#Context,out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit {}
}打印结果 taskmanager1 open的时候打印信息 taskmanager2 open的时候打印信息 当数据流来时 processElement中的打印信息 参考 https://blog.csdn.net/weixin_44904816/article/details/104305824 https://codeantenna.com/a/IcVVHYGUVi
https://www.jianshu.com/p/66b014dd2e36
https://blog.csdn.net/cloudbigdata/article/details/125013545