当前位置: 首页 > news >正文

落伍者论坛 做网站华为云云速建站教程

落伍者论坛 做网站,华为云云速建站教程,wordpress 登录显示,中华建设网官网背景 项目中有很多ods层#xff08;mysql 通过cannal#xff09;kafka#xff0c;需要对这些ods kakfa做一些etl操作后写入下一层的kafka#xff08;dwd层#xff09;。 一开始采用的是executeSql方式来执行每个ods→dwd层操作#xff0c;即类似#xff1a; def main(…背景 项目中有很多ods层mysql 通过cannalkafka需要对这些ods kakfa做一些etl操作后写入下一层的kafkadwd层。 一开始采用的是executeSql方式来执行每个ods→dwd层操作即类似 def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment StreamTableEnvironment.create(env)val configuration: Configuration tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction(etl_handle, classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... } 当有多个ods-dwd操作放在同一个flink作业中时发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源特别是这些处理都是比较轻量级的最好是能融合在同一个DAG中共享资源。 解决方案 查看flink文档INSERT 语句 | Apache Flink 因此可以采用statementset的方式将不同insert sql进行分组执行每组的insert sql会先被缓存到 StatementSet 中并在StatementSet.execute() 方法被调用时同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager减少资源浪费即类似 def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment StreamTableEnvironment.create(env)val configuration: Configuration tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction(etl_handle, classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute() } 其他 如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。
http://www.dnsts.com.cn/news/226850.html

相关文章:

  • 物流网站购买物流单号apache做网站
  • 如何建立温州seo顾问
  • 企业网站开发实训总结seo一个月赚多少钱
  • 哪一些网站可以开户做百度广告做网站开通手机验证功能
  • 海阳有没有做企业网站的discuz论坛和网站同步登录
  • 石河建设技校网站网上购物网站建设
  • 公司网页网站建搞笑证书图片在线制作
  • 福州p2p网站建设公司外贸获客软件排名前十名
  • 给小公司做网站赚钱吗注册公司100万意味着什么
  • 做淘客需要用的网站iis怎么做IP网站
  • 网站开发处理大量用户请求网络营销课程的心得体会
  • 网站开发合作合同范本德州金航网站建设
  • 电商网站开发教程漳州 外贸网站建设 SEO
  • 网站的外链是什么石油化工建设工程网站
  • 网站介绍模版丹阳网站建设公司
  • wordpress 几天前搜索引擎优化技术
  • 可以做视频的网站销售培训主要培训内容
  • 河南省住房和城乡建设厅查询网站首页电商网站开发的代价
  • html5炫酷网站推广甘肃省
  • 对网站建设培训的建议wordpress图片切换插件
  • 企业模式网站列表管理器网站注册 英文
  • 手机网站快速建设百度云网盘
  • 大气企业网站源码卖淘宝店铺平台有哪些
  • 自己怎样建设网站wordpress 导入demo
  • 中国五码一级做爰网站电商开源
  • 装修公司怎么做免费网站手机如何打开wordpress
  • 网站app的区别是什么意思二手闲置平台网站怎么做
  • 深圳网站建设中心工信部网站备案查询官网
  • 自己做qq头像静态的网站制作公司网页软件
  • 爱淘宝网页网站建设注册城乡规划师报考条件2022