一键网站制作app,wordpress 搜索伪静态,光明新区城市建设局网站,杭州做网站比较出名的公司有哪些目录
一、背景
二、总体架构
三、ETL实践
3.1 批量数据的导入
3.2 实时数据接入
3.3 数据加工
3.4 BI 查询
四、实时需求响应
五、其他经验
5.1 Doris BE内存溢出
5.2 SQL任务超时
5.3 删除语句不支持表达式
5.4 Drop 表闪回
六、未来展望 原文大佬的这篇Doris数…目录
一、背景
二、总体架构
三、ETL实践
3.1 批量数据的导入
3.2 实时数据接入
3.3 数据加工
3.4 BI 查询
四、实时需求响应
五、其他经验
5.1 Doris BE内存溢出
5.2 SQL任务超时
5.3 删除语句不支持表达式
5.4 Drop 表闪回
六、未来展望 原文大佬的这篇Doris数仓建设案例有借鉴意义这里摘抄下来用作学习和知识沉淀。
一、背景 特步集团有限公司是中国领先的体育用品企业之一为了提高特步零售 BI 主题数据分析的准确性和时效性2020 年11月特步集团首次引入了 Doris 进行数据仓库搭建试点。在项目实践过程中遇到了很多困难也解决了很多问题这里总结出来分享给大家。
二、总体架构 在特步零售数据仓库的项目中我们大胆的抛弃了传统的Hive离线数据处理模式基于Apache Doris 集群完成接口数据的接入、数仓层的建模和加工、以及 BI 报表的即时查询。 先展开说明一下这样设计的原因。在前期的项目经历中我们既有过基于 HiveGreenplum 搭建卡宾零售 BI 项目的经验也有基于 GreenplumMySQL 搭建斐乐 BI 项目的经验还有基于 HiveDoris 的安踏户外 BI 项目经验得到的结论有 ①MPP架构开发效率高查询和跑批速度远高于 Hive 数仓②MPP 架构支持有限的Delete和Update开发的灵活度更好③项目交付两套环境运维难度很大④ Doris 在架构设计上比 Greenplum 更为领先对 OLAP 支持更好查询性能也更高。基于以上原因加上 Hive 数据处理和查询的时效性无法满足业务需求所以我们坚定的选择选择了 Apache Doris 作为特步零售数据仓库的唯一大数据平台。 确定项目选型以后我们讨论了数据仓库的分层设计。项目最先启动的是特步儿童 BI 项目考虑到系统业务数据存在多个来源复杂的业务指标口径以及来源相同的不同品牌需要进行拆分我们在数据仓库层采用了 DWD、DWB、DWS 三层加工。 数据仓库分层逻辑如下
1DWD 模型层关联维度数据的加工和明细数据的简答整理包括商品拆箱处理、命名统一、数据粒度统一等。DWD层的销售库存明细数据均按照系统加工每个系统的加工逻辑创建一张视图结果对应一张物理表。DWD层大部分采用Duplicate Key 模型部分能明确主键的采用Unique Key 模型。
2DWB 基础层保留共性维度汇总数据到业务日期、店铺、分公司、SKC 粒度。销售模块 DWB 层合并了不同系统来源的电商数据、线下销售数据、库存明细数据还关联了维度信息增加数据过滤条件加工了分公司维度确保 DWS 层可以直接使用。DWB层较多采用 Duplicate Key 模型便于按照 Key 删除数据也有部分采用Aggregate Key 模型。
3DWS 汇总层将DWB加工结果宽表化并按照业务需求加工本日、本月、本周、本年、昨日、上月、上周、上年及每个标签对应的同期数据。DWS层较多采用Duplicate Key 模型也有部分采用Aggregate Key 模型。DWS层完成了指标的汇总和维度的拓展为报表提供了统一的数据来源。
三、ETL实践 本次项目采用了自研的一站式DataOps大数据管理平台完成系统数据的抽取、加载和转换以及定时任务的执行等。在数据分层标准之下关于 ETL 实践我们主要完成了一些内容
3.1 批量数据的导入 批量数据导入我们采用的是目前最主流的开源组件 DataX。自研的DataOps大数据管理平台在开源DataX的基础上做了很多封装我们只需要创建数据同步任务选择数据来源和数据目标表即可自动生成字段映射和 DataX规范的 Json 配置文件。 图 2 - 启数道平台 在项目初期Doris 未发布 DataX 插件仅通过原始的 JDBC 插入数据达不到性能要求。产品团队开发了 DataX 加速功能先将对应数据抽取到本地文件然后通过 Stream Load 方式加载入库可以极大的提升数据抽取速度。数据读取到本地文件取决于网络宽带和本地读写性能数据加载达到了 2000 千万数据 12.2G 仅需 5 分钟的效果。 图 3 - DataX 加速 此外DataX 数据同步还支持读取自定义 SQL 的方式通过自定义 SQL 可以处理 SQL SERVER 这种数据库比较难解决的字符转换问题和偶尔出现的乱码字符问题。批量数据同步还支持增量模式通过抽取最近 7 天的数据配合 Doris 的主键模型可以轻松解决大部分业务场景下的增量数据抽取。
3.2 实时数据接入 在实时数据接入方面由于接入的实时数据都来自于阿里云的 DRDS所以我们采用的是 CanalKafkaRoutine Load 模式。详细的配置就不展开了环境搭建完成以后只需要取Canal里面配置拦截策略将表对应的流数据映射成Kafka Topic然后去Doris创建 Routine Load 就 OK 了这里举一个 Routine Load 的案例。 ALTER TABLE DS_ORDER_INFO ENABLE FEATURE BATCH_DELETE
CREATE ROUTINE LOAD t02_e3_zy.ds_order_info ON DS_ORDER_INFO
WITH MERGE
COLUMNS(order_id, order_sn, deal_code, ori_deal_code, ori_order_sn,crt_time now(), cdc_op),
DELETE ON cdc_opDELETEPROPERTIES(desired_concurrent_number1,max_batch_interval 20,max_batch_rows 200000,max_batch_size 104857600,strict_mode false,strip_outer_array true,format json,json_root $.data,jsonpaths [\$.order_id\,\$.order_sn\,\$.deal_code\,\$.ori_deal_code\,\$.ori_order_sn\,\$.type\ ])
FROM KAFKA(kafka_broker_list 192.168.87.107:9092,192.168.87.108:9092,192.168.87.109:9092,kafka_topic e3_order_info,kafka_partitions 0,kafka_offsets OFFSET_BEGINNING,property.group.id ds_order_info,property.client.id doris
);
3.3 数据加工 本次项目的数据加工我们是通过Doris视图来完成的。利用Doris优秀的索引能力加上完善的SQL语法支持即使再复杂的逻辑也可以通过视图来实现。用视图加工数据减少了代码发布的流程避免了编译错误的问题比Hive的脚本开发更加高效。 在完成模型设计以后我们会确定模型表的命名、Key 类型等信息。完成表的创建以后我们会创建表名 _v 的视图用于处理该表数据的逻辑加工。在大多数情况下我们都是先清空目标表然后从视图读取数据写入目标表的所以我们的调度任务都比较简单例如
truncate table xtep_dw.dim_shop_info;
insert into xtep_dw.dim_shop_info
select * from xtep_dw.dim_shop_info_v; 对于数据量特别大的读写任务则需要分步写入。例如
truncate table xtep_dw.dwd_god_allocation_detail_drp;insert into xtep_dw.dwd_god_allocation_detail_drp
select * from xtep_dw.dwd_god_allocation_detail_drp_v
where UPDATE_DATE BETWEEN 2020-01-01 and 2020-06-30; insert into xtep_dw.dwd_god_allocation_detail_drp
select * from xtep_dw.dwd_god_allocation_detail_drp_v
where UPDATE_DATE BETWEEN 2020-07-01 and 2020-12-31; insert into xtep_dw.dwd_god_allocation_detail_drp
select * from xtep_dw.dwd_god_allocation_detail_drp_v
where UPDATE_DATE BETWEEN 2021-01-01 and 2021-06-30; insert into xtep_dw.dwd_god_allocation_detail_drp
select * from xtep_dw.dwd_god_allocation_detail_drp_v
where UPDATE_DATE BETWEEN 2021-07-01 and 2021-12-31; insert into xtep_dw.dwd_god_allocation_detail_drp
select * from xtep_dw.dwd_god_allocation_detail_drp_v
where UPDATE_DATE BETWEEN 2022-01-01 and 2022-06-30; insert into xtep_dw.dwd_god_allocation_detail_drp
select * from xtep_dw.dwd_god_allocation_detail_drp_v
where UPDATE_DATE BETWEEN 2022-07-01 and 2022-12-31; 但是视图开发的模式也是有一定的弊端的比如不能做版本管理也不便于备份。为此我们承受了一次惨痛教训在项目测试阶段有同事在 dbwaver 客户端误操作 Drop 掉了 xtep_dw 数据库导致我们花费了 3-4 天时间才恢复程序。因此我们紧急开发了 Python 备份程序 #!/usr/bin/python
# -*- coding: UTF-8 -*-
import pymysql
import json
import sys
import os
import time
if sys.getdefaultencoding() ! utf-8:reload(sys)sys.setdefaultencoding(utf-8)BACKUP_DIR/data/cron_shell/database_backup
BD_LIST[ods_add,ods_cdp,ods_drp,ods_e3_fx,ods_e3_jv,ods_e3_pld,ods_e3_rds1,ods_e3_rds2,ods_e3_zy,ods_hana,ods_rpa,ods_temp,ods_xgs,ods_xt1,t01_hana,xtep_cfg,xtep_dm,xtep_dw,xtep_fr,xtep_rpa] if __name__ __main__:basepath os.path.join(BACKUP_DIR,time.strftime(%Y%m%d-%H%M%S, time.localtime())) print basepath if not os.path.exists(basepath):# 如果不存在则创建目录 os.makedirs(basepath) # 连接databaseconn pymysql.connect(host192.168.xx.xx,port9030,userroot,password****,databaseinformation_schema,charsetutf8)# 得到一个可以执行SQL语句的光标对象cursor conn.cursor() # 执行完毕返回的结果集默认以元组显示for dbname in BD_LIST:##生成数据库的文件夹dbpath os.path.join(basepath ,dbname)print(dbpath)if not os.path.exists(dbpath):# 如果不存在则创建目录 os.makedirs(dbpath) sql1 select TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE from information_schema.tables where TABLE_SCHEMA %s;%(dbname)print(sql1)# 执行SQL语句cursor.execute(sql1) for row in cursor.fetchall(): tbname row[1]filepath os.path.join(dbpath ,tbname .sql)print(u%s表结构将备份到路径%s%(tbname,filepath))sql2 show create table %s.%s%(dbname,tbname) print(sql2)cursor.execute(sql2)for row in cursor.fetchall(): create_sql row[1].encode(GB18030)with open(filepath, w) as fp:fp.write(create_sql)# 关闭光标对象cursor.close()# 关闭数据库连接conn.close() 然后配合 Linux 的 Crontab 定时任务每天三次备份代码。
#备份数据库每天执行三次8、12、18点各一次
0 8,12,18 * * * python /data/cron_shell/backup_doris_schema.py /data/cron_shell/logs/backup_doris_schema.log
3.4 BI 查询 本次项目采用的前端工具是某国产 BI 软件和定制化开发的 E-Charts 大屏。该 BI 软件是基于数据集为中心去构建报表并且支持灵活的数据权限管理。大多数据情况下我们基于SQL查询创建数据集可以有效过滤数据。本次项目基于该BI 平台构建了 PC端报表通过 App 适配手机也可以直接访问并且新增了自助分析报表同步开发了几张 E-Charts 大屏实现大屏、中屏、小屏的统一。 数据查询对 Doris 来说是很 Easy 的了基本上建好表以后设置好索引利用好 Bitmap 性能就不会差。这里需要说明的上在性能压力不大的情况下合理使用视图来关联多个结果集可以减少跑批的任务和数据处理层级有利于报表数据的快速刷新。在这方面我们也是尽可能减少 DWS 和 ADS 层的聚合模型减少大数据量的读写尽可能复用逻辑代码和模型表减少跑批时间加强系统稳定性。
四、实时需求响应 在实时的需求方面我们分别尝试了 Lambda 架构和 Kappa 架构最后走出来项目特色的第三条线路——相同的视图逻辑用不同的调度任务刷新不同范围的数据实现流批代码复用。 项目早期我们是按照 Lambda 架构构建的任务系统数据分为批处理和流处理两条线路随着批处理的稳定流处理数据的不准确性就逐步暴露。究其原因业务系统存在数据物理删除和更新的情况双流Join之后的数据准确性得不到保障。再有就是同时维护两套代码实时逻辑的更新滞后变更逻辑的代价也比较大。 项目后期基于业务要求我们也尝试了把所有的零售逻辑搬迁到流处理平台以实现流批一体但是发现无法处理报表上常规要求本同期对比、维度数据变更和复杂条件过滤导致搬迁工作半途而废。 最后我们结合项目的实际情况采用批处理和微批处理结合的方式一套代码两种跑批模式。T1的链路执行最近6个月或者全量数据的刷新微批处理流程刷新当日数据或者本周数据实现数据的快速迭代更新。以 DWB 层为例
--批处理任务每日夜间执行一次
truncate table xtep_dw.dwb_ret_sales;
insert into xtep_dw.dwb_ret_sales
select * from xtep_dw.dwb_ret_sales_v;
--微批任务白天8-24点每20分钟执行一次
delete from xtep_dw.dwb_ret_sales where report_date${curdate};
insert into xtep_dw.dwb_ret_sales
select * from xtep_dw.dwb_ret_sales_v
where report_date${curdate}; 而 DWS 和 ADS 层的情况更为复杂由于跑批频率太高为了避免出现用户查看报表时刚好数据被删除的情况我们采用分区替换的方式来实现数据的无缝切换。
--批处理任务每日夜间执行一次
truncate table xtep_dw.dws_ret_sales_xt_swap;insert into xtep_dw.dws_ret_sales_xt_swap
select * from xtep_dw.dws_ret_sales_xt_v
where date_tag in (本日,本周,本月,本年);insert into xtep_dw.dws_ret_sales_xt_swap
select * from xtep_dw.dws_ret_sales_xt_v
where date_tag in (昨日,上周,上月,上年);ALTER TABLE xtep_dw.dws_ret_sales_xt REPLACE WITH TABLE dws_ret_sales_xt_swap;--微批任务白天8-24点每20分钟执行一次
-- 分区替换的方式来实现数据的无缝切换
truncate table xtep_dw.dws_ret_sales_xt_swap;insert into xtep_dw.dws_ret_sales_xt_swap
select * from xtep_dw.dws_ret_sales_xt_v
where date_tag in (本日,本周);ALTER TABLE xtep_dw.dws_ret_sales_xt ADD TEMPORARY PARTITION tp_curr1 VALUES IN (本日,本周);insert into xtep_dw.dws_ret_sales_xt TEMPORARY PARTITION (tp_curr1) SELECT * from xtep_dw.dws_ret_sales_xt_swap;ALTER TABLE xtep_dw.dws_ret_sales_xt REPLACE PARTITION (p_curr1) WITH TEMPORARY PARTITION (tp_curr1); 希望Doris 的分区替换还需要再完善一下未来可以支持类似于 ClickHouse 的语法即ALTER TABLE table2 REPLACE PARTITION partition_expr FROM table1; 同时由于我们设计了良好的分层架构对于实时性要求特别高的数据例如双十一大屏我们可以直接从ODS层汇总数据导报表层可以实现秒级的实时查询对于实时性较高的业务例如移动端实时日报我们从DWD 或者 DWB 往上汇总数据可以实现分钟级的实时对于普通的自助分析或者固定报表则按照灵活的频率更新数据兼顾了二者的时效性和准确性。
五、其他经验 在项目过程中我们还遇到一些其它问题这里简单总结一下。
5.1 Doris BE内存溢出 查询任务耗用的内存过大导致 Doris BE 挂掉的情况我们也出现过。我们采取的方法是所有表都创建 3 副本然后给 Doris 进程配置 Supervisord 自启动进程失败的任务通过调度平台的重试功能一般都可以在 3 次重试机会以内跑过。
5.2 SQL任务超时
批处理过程中确实会有一些复杂的任务或者写入数据太多的任务会超时除了调大 timeout 参数目前设置为 10 分钟以外我们还把任务做了切分。前面的调度任务案例已经可以看到有些写入的 SQL 我们是按照分区字段或者日期区间来分批计算和写入的。
5.3 删除语句不支持表达式 删除语句不支持表达式我认为是 Doris 后续需要优化的一个功能点。在 Doris 无法实现的情况下我们通过改造调度平台的参数功能先计算好参数值然后传入变量的方式实现了动态条件删除数据。前文的调度任务代码也有案例。
5.4 Drop 表闪回 误删除重要的表是数据仓库开发过程中比较常见的情况表结构我们可以通过 Python 做好备份但是表数据实在没有更好的办法。这里 Doris 提供了一个很好的功能——Recover 功能推荐给大家。误删除的表在 1 天以内可以支持闪回。
六、未来展望 目前 Apache Doris 在特步集团的应用已经得到了用户的认可今年 2 月底又对 Doris 集群进行了硬件升级接下来会基于现有的接口数据拓展到特步品牌 BI 应用并且迁移更多的 HANA 数仓应用到 Doris 平台。 随着应用的深入我们需要加强对 Doris 集群、Routine Load 和 Flink 任务的监控及时发出异常预警缩短故障恢复时间。同时随着向量化引擎的逐步成熟和查询优化器的进一步完善我们需要调整一些 SQL 写法降低批处理对系统资源的占用让集群更好的同时服务批处理和查询需求。当然也期待社区在资源隔离方面可以有更进一步的完善。 参考文章
应用实践 | 特步集团基于 Apache Doris 的零售数据仓库项目实践 - ApacheDoris的个人空间 - OSCHINA - 中文开源技术交流社区