落伍者论坛 做网站,华为云云速建站教程,wordpress 登录显示,中华建设网官网背景
项目中有很多ods层#xff08;mysql 通过cannal#xff09;kafka#xff0c;需要对这些ods kakfa做一些etl操作后写入下一层的kafka#xff08;dwd层#xff09;。
一开始采用的是executeSql方式来执行每个ods→dwd层操作#xff0c;即类似#xff1a; def main(…背景
项目中有很多ods层mysql 通过cannalkafka需要对这些ods kakfa做一些etl操作后写入下一层的kafkadwd层。
一开始采用的是executeSql方式来执行每个ods→dwd层操作即类似 def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment StreamTableEnvironment.create(env)val configuration: Configuration tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction(etl_handle, classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)...
}
当有多个ods-dwd操作放在同一个flink作业中时发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源特别是这些处理都是比较轻量级的最好是能融合在同一个DAG中共享资源。 解决方案
查看flink文档INSERT 语句 | Apache Flink 因此可以采用statementset的方式将不同insert sql进行分组执行每组的insert sql会先被缓存到 StatementSet 中并在StatementSet.execute() 方法被调用时同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager减少资源浪费即类似
def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment StreamTableEnvironment.create(env)val configuration: Configuration tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction(etl_handle, classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
} 其他
如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。