建电影网站教程,做网站订房网站,软件代理商招募,wordpress免费图床插件涤生大数据实战#xff1a;基于FlinkODPS历史累计计算项目分析与优化#xff08;一#xff09;
1.前置知识
ODPS#xff08;Open Data Platform and Service#xff09;是阿里云自研的一体化大数据计算平台和数据仓库产品#xff0c;在集团内部离线作为离线数据处理和存…涤生大数据实战基于FlinkODPS历史累计计算项目分析与优化一
1.前置知识
ODPSOpen Data Platform and Service是阿里云自研的一体化大数据计算平台和数据仓库产品在集团内部离线作为离线数据处理和存储的产品。离线计算任务节点叫做Odps节点存储的离线表叫做Odps表
Flink: 实时计算引擎本文代码开发和测试均基于集团内部实时计算平台代码细节可能会和Flink 官方社区文档有些许不同假如用于生产环境测试参考Apache Flink 官方文档为准但是技术方案是通用的哈
https://flink.apache.org/posts/
2.项目背景
现有业务需求是 “根据用户注册以来的累计跑步里程给用户发放勋章”需要实时的计算出用户【历史此时刻】的累计跑步数据。
比如说某个用户20210101首次上传跑步记录之后又多次上传跑步记录我们需要实时的计算出在20210101当前时刻 期间该用户累计跑了多少公里累计跑了多少次等指标。上述指标的计算涉及用户历史至今的所有数据(2018~至今该用户所有数据)考虑使用批流结合的方式进行统计。参考批流结合的常用 lambda 方案 我们将其拆分到“实时离线”两条链路分别计算离线链路计算用户历史至昨日的累计数据data1实时链路计算当日实时累计数据data2。然后在对两条链路的数据进行汇总data1data2即为用户历史至今日此时刻的累计数据。 这里离线链路使用odps来做实时计算使用Flink来做数据存储涉及 hbase、odps所用消息中间件是MQ。
3.解决方案
3.1 方案描述
离线链路设计
离线链路计算目的为了计算出全量用户【历史至昨日】的累计数据。
任务初始化时先将历史的存量数据全量计算一次得到存量累计值以后每日计算用户昨日的新增数据即新增累计值 两者相加即为用户历史至昨日的累计数据循环往复即可每日更新历史累计数据。
对应的数据链路应该长这样 离线链路计算流程如下
step1用户历史数据初始化。假设该计算任务发布的时间为20231010首先要对用户 历史20231009 期间的历史数据进行汇总得到一个 历史存量累计数据 history_data
step2从20231010起对用户每日的增量跑步数据进行汇总得到该日的增量累计数据 day_data
step3将每日的增量累计数据day_data 与 历史存量累计数据history_data 进行求和作为新的历史存量累计数据 history_data(T-1) day_data(T-1) history_data(T-2)
step4重复 step2 和step3 每日更新历史存量累计数据 history_data 。
该方案的优点是历史全量数据只用计算一次每日只需计算增量部分后再与存量合并即可节省计算资源。
实时链路设计
实时链路计算目的实时计算出用户【当日零点至此刻】的累计数据
实时链路的计算逻辑比较简单对应的计算链路示意图如下 实时链路计算流程如下
step1用户新增的跑步记录通过MQ发送给Flink任务
step2Flink节点1对数据去重
step3Flink节点2对实时汇总统计 当日零点至此刻 用户的跑步累计数据step4将计算结果输出给下游。
实时离线链路融合
实时离线链路融合目的实时得到用户历史至此时刻的汇总数据
从上述的离线、实时链路中我们分别得到了用户【历史昨日】累计数据和【当日凌晨此刻】累计数据只需将两者相加即可实时得到用户【历史此刻】的累计数据 ODPS 计算出用户 [非当日的历史累计数据]为使用方便会每天更新全量用户历史累计数据 使用Flink节点1 实时计算用户当日上传的跑步累计数据 使用 Flink节点2 实时的将离线数据和实时数据汇总起来 将汇总结果写入Hbase结果表同时发送个MQ消息给下游业务方。 这里需要有两点需要注意
1、根据业务特点这里将离线计算结果作为维表使用
Flink任务的下游业务方更关注当日上传过跑步记录的用户的数据更新情况ODPS结果表作为维表用Flink任务只对当日上传跑步记录的用户进行查询得到“非当日历史统计数据”在与“当日新增跑步数据”相加即可得到该历史至今的最终的统计数据更新hbase结果表符合需求
我们的跑步用户中大部分的用户不会每天都上传跑步记录这些人的结果数据不会发生改变。若将ODPS表作为源表则依旧会为这些用户更新数据浪费计算资源。
【优化】odps表作为维表不适合大数据量的情况大数据量使用hbase表作为维表比较合适。这里将odps表数据同步到hbase表中再拿该hbase表作为维表。
2、初始化下游结果表在整个任务跑起来前需要先使用ODPS表的bizdate分区数据初始化hbase结果表然后再由实时任务对结果表进行更新
最终的方案示意图如下 3.2 存在的问题
上面的lambda方案有个问题每日凌晨零点过后实时任务已开始计算新的一天数据而离线任务计算尚未结束这时会出现一个离线数据缺失的窗口期。重点分析一下框图中“实时数据离线数据”的部分 正常情况
当一个用户在T日实时上传了自己的跑步记录Flink节点1会计算出其 [当日0点起至此刻] 的跑步累计数据data1Flink节点2会根据该用户id取hbase维表里查询其 [历史T-1日] 的累计数据 data2 (hbase表里数据由odps每日更新即T-1日的存量累计汇总数据)将data1和data2二者汇总就可得到 用户历史至此时刻的汇总数据
异常情况
在凌晨(比如说在00:0000:30ODPS正在计算最新分区数据(T-1日的数据)的期间新的分区还没生成完或者ODPS计算已经完成但odps表同步base表同步任务还未完成此时若发生了查询会发生什么
会使用老分区的数据(T-2日的数据而不是期望的T-1日数据)导致数据不准。
【问题描述】
在凌晨时分ODPS计算T-1日数据期间如果发生了对T-1日的数据查询则无法获取到期望的T-1日数据会继续使用T-2日的数据
这里“无法获取正确数据”的时间长度 ODPS计算时间 ODPS同步数据到Hbase的时间
【原因】
Flink查询维表时 使用维表当前的数据快照本次查询完成后再发生的维表更新不会对已有查询造成影响。
【举例】
case1ODPS计算未完成
27号Flink任务计算27号当天的用户累计数据同时查询odps维表的 26号分区 中该用户的历史累计数据两者相加得到27号的实时累计结果
28号凌晨ODPS正在计算27号分区的数据任务还未结束27号分区数据尚不可用而Flink任务已经开始计算28号当天的用户累计数据此刻发生了一次维表查询期望从维表中查到该用户27号统计的历史累计数据然而由于27号数据未准备好则维表会返回26号的历史累计数据这会导致数据计算错误相当于丢失了该用户27号的数据。
case2ODPS计算完成但odps表同步habse表任务未完成
28号凌晨ODPS的计算已完成odps表正在同步数据到hbase表期间如果Flink发生了查询期望获取用户27号的最新数据但由于还没有更新完成还是会用26号的数据会造成类似的错误结果。
上面所述问题是批流融合的 lambda 框架常会遇到的问题因此必须思考优化方案来解决上述问题。优化方案将在下一篇文章展现敬请期待