Video: 尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

Contents
Chapter 7 Data Warehouse Development: ODS Layer
P015
Chapter 8 Data Warehouse Development: DIM Layer
P016
P017
P018
P019
01. Linux commands on node001
02. KafkaUtil.java
03. DimSinkApp.java
P020
P021
P022
P023

Chapter 7 Data Warehouse Development: ODS Layer
P015

The data collected into the Kafka topics topic_log and topic_db constitutes the ODS layer of the real-time data warehouse. This layer presents the raw data as-is and serves as a backup.

8.2.2 Dynamically Splitting the Dimension Tables

Maxwell writes data from every table into a single topic, which is clearly inconvenient for downstream processing, so the individual dimension tables have to be split apart. In real-time computing, dimension data is generally written to a store that supports efficient primary-key lookups, such as HBase, Redis, or MySQL.

This table-to-sink mapping should not live in a configuration file: as the business evolves, every newly added dimension table would require a config change and a restart of the streaming job. What is needed instead is a dynamic configuration scheme that persists the mapping and lets the running job pick up changes automatically. There are three candidate designs:

1. Store the configuration in ZooKeeper and detect changes with a Watch.
2. Store it in a MySQL table and sync it periodically.
3. Store it in a MySQL table and distribute it as a broadcast stream.

The third option is chosen here, mainly because MySQL is convenient for initializing and maintaining the configuration data. Flink CDC reads the configuration table, and the resulting config stream is connected to the main stream as a broadcast stream; a minimal sketch of that pattern follows.
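Before the project code is built up step by step below, here is a minimal, self-contained sketch of Flink's broadcast-state pattern, the mechanism option 3 relies on. The class and stream names (BroadcastPatternSketch, mainStream, configStream) are placeholders, not project code:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastPatternSketch {
    // mainStream: business records; configStream: config-table rows read by Flink CDC
    public static DataStream<String> route(DataStream<String> mainStream, DataStream<String> configStream) {
        MapStateDescriptor<String, String> descriptor =
                new MapStateDescriptor<>("config-state", String.class, String.class);
        // Broadcast the config rows to every parallel subtask
        BroadcastStream<String> broadcastConfig = configStream.broadcast(descriptor);
        return mainStream.connect(broadcastConfig)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String record, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Main-stream side: read-only view of the broadcast state
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(descriptor);
                        // Route the record according to the configuration here
                        out.collect(record);
                    }

                    @Override
                    public void processBroadcastElement(String config, Context ctx, Collector<String> out) throws Exception {
                        // Broadcast side: every change to the config table updates the state
                        ctx.getBroadcastState(descriptor).put("config", config);
                    }
                });
    }
}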
Chapter 8 Data Warehouse Development: DIM Layer

P016

8.1.1 Flink CDC

Real-time data synchronization based on Flink SQL CDC: https://github.com/ververica/flink-cdc-connectors

P017

8.2 Main Tasks

package com.atguigu.edu.realtime.app.dim;

import com.atguigu.edu.realtime.util.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DimSinkApp {
    public static void main(String[] args) {
        // TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        // env.fromSource();

        // TODO 3 ETL the main-stream data
        // TODO 4 Read the configuration table with Flink CDC
        // TODO 5 Turn the config-table data into a broadcast stream
        // TODO 6 Connect the main stream with the broadcast stream
        // TODO 7 Process the two sides of the connected stream separately
        // TODO 8 Fetch the dimension data and write it out to Phoenix
        // TODO 9 Execute the Flink job
    }
}
package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvUtil {
    /**
     * Prepare the environment, configure the state backend, and return the environment.
     *
     * @param parallelism parallelism of the Flink program
     * @return the Flink stream execution environment
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Integer parallelism) {
        // TODO 1 Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(parallelism);

        // TODO 2 Configure the state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // Minimum pause between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://node001:8020/edu/ck");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        return env;
    }
}
P018
package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;

public class KafkaUtil {
    public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {
        return KafkaSource.<String>builder()
                // Required parameters
                // .setBootstrapServers(EduConfig.KAFKA_BOOTSTRAPS) // "node001:9092"
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new DeserializationSchema<String>() {
                    @Override
                    public String deserialize(byte[] message) throws IOException {
                        if (message != null && message.length != 0) {
                            return new String(message);
                        }
                        return null;
                    }

                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                // Optional parameter: where to start reading when the offsets are reset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();
    }
}
P019

01. Linux commands on node001
[atguigu@node001 bin]$ jpsall
node001
4803 QuorumPeerMain
5236 Kafka
7941 Maxwell
5350 Application
6726 ConsoleConsumer
4458 NodeManager
8810 Jps
4043 DataNode
3869 NameNode
4654 JobHistoryServer
node002
3505 ResourceManager
4066 QuorumPeerMain
4490 Kafka
5179 Jps
3660 NodeManager
3263 DataNode
node003
3505 SecondaryNameNode
5777 Jps
4369 Application
4279 Kafka
4569 Application
3354 DataNode
3851 QuorumPeerMain
3659 NodeManager
[atguigu@node001 bin]$

Start Hadoop, Maxwell, and Kafka, then consume topic_db and bootstrap the historical dimension data:

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_db
[atguigu@node001 ~]$ cd ~/bin
[atguigu@node001 bin]$ mysql_to_kafka_init.sh all

{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":1,"video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":0}}
The same record, formatted:

{
  "database": "edu",
  "table": "video_info",
  "type": "bootstrap-insert",
  "ts": 1645429973,
  "data": {
    "id": 5410,
    "video_name": "day20_11复习_总结.avi",
    "during_sec": 900,
    "video_status": 1,
    "video_size": 12003100,
    "video_url": "file://xxx/xxx",
    "video_source_id": null,
    "version_id": 1,
    "chapter_id": 26305,
    "course_id": 39,
    "publisher_id": 99,
    "create_time": "2021-11-14 04:15:01",
    "update_time": null,
    "deleted": 0
  }
}
02. KafkaUtil.java
package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;

public class KafkaUtil {
    public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {
        return KafkaSource.<String>builder()
                // Required parameters
                .setBootstrapServers("node001:9092")
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new DeserializationSchema<String>() {
                    @Override
                    public String deserialize(byte[] message) throws IOException {
                        if (message != null && message.length != 0) {
                            return new String(message);
                        }
                        return null;
                    }

                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                // Optional parameter: where to start reading when the offsets are reset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();
    }
}
03. DimSinkApp.java
package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main-stream data
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });
        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Keep only the records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to convert the record to JSON...");
                }
            }
        });
        jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC
        // TODO 5 Turn the config-table data into a broadcast stream
        // TODO 6 Connect the main stream with the broadcast stream
        // TODO 7 Process the two sides of the connected stream separately
        // TODO 8 Fetch the dimension data and write it out to Phoenix
        // TODO 9 Execute the Flink job
        env.execute();
    }
}
P020

Flink CDC monitors the MySQL binlog. A record read from the configuration table table_process looks like this:

{"before":null,"after":{"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1695262804254,"transaction":null}
{
  "before": null,       # data before the change
  "after": {            # data after the change
    "source_table": "base_category_info",
    "sink_table": "dim_base_category_info",
    "sink_columns": "id,category_name,create_time,update_time,deleted",
    "sink_pk": "id",
    "sink_extend": null
  },
  "source": {           # where the record came from
    "version": "1.6.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 0,
    "snapshot": "false",
    "db": "edu_config",
    "sequence": null,
    "table": "table_process",
    "server_id": 0,
    "gtid": null,
    "file": "",
    "pos": 0,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "r",            # operation type: r = read (snapshot), c = create, u = update, d = delete
  "ts_ms": 1695262804254,
  "transaction": null
}
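Parsing these records is left to later TODOs in this section. As a hedged illustration, here is one way a single Flink CDC record could be turned into the DimTableProcess bean defined below in P021, relying on fastjson's lenient key matching (source_table maps onto sourceTable, just as the open() method in P023 relies on). The class name CdcRecordParseSketch is a placeholder:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;

public class CdcRecordParseSketch {
    // value: one Flink CDC record in the Debezium JSON format shown above
    public static DimTableProcess parse(String value) {
        JSONObject record = JSON.parseObject(value);
        String op = record.getString("op"); // r = snapshot read, c = create, u = update, d = delete
        if ("d".equals(op)) {
            // For deletes, the last row state is in "before"
            return record.getObject("before", DimTableProcess.class);
        }
        // For r / c / u, the current row state is in "after"
        return record.getObject("after", DimTableProcess.class);
    }
}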
package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main-stream data
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });
        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Keep only the records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to convert the record to JSON...");
                }
            }
        });
//        jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC
        // 4.1 Flink CDC source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .databaseList("edu_config") // set captured database
                .tableList("edu_config.table_process") // set captured table
                .username("root")
                .password("123456")
                // Format of the emitted records
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                // Startup mode
                .startupOptions(StartupOptions.initial())
                .build();

        // 4.2 Wrap the source into a stream
        DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        configDS.print();

        // TODO 5 Turn the config-table data into a broadcast stream
        // TODO 6 Connect the main stream with the broadcast stream
        // TODO 7 Process the two sides of the connected stream separately
        // TODO 8 Fetch the dimension data and write it out to Phoenix
        // TODO 9 Execute the Flink job
        env.execute();
    }
}
P021
package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main-stream data
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });
        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Keep only the records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to convert the record to JSON...");
                }
            }
        });
//        jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC
        // 4.1 Flink CDC source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .databaseList("edu_config") // set captured database
                .tableList("edu_config.table_process") // set captured table
                .username("root")
                .password("123456")
                // Format of the emitted records
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                // Startup mode
                .startupOptions(StartupOptions.initial())
                .build();

        // 4.2 Wrap the source into a stream
        DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        configDS.print();

        // TODO 5 Turn the config-table data into a broadcast stream
        // key: dimension table name; value: one MySQL config row, as a JavaBean
        MapStateDescriptor<String, DimTableProcess> tableProcessState =
                new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);

        // TODO 6 Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);

        // TODO 7 Process the two sides of the connected stream separately
        connectCS.process(new BroadcastProcessFunction<JSONObject, String, Object>() {
            // Process the main stream
            @Override
            public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, Object>.ReadOnlyContext readOnlyContext, Collector<Object> collector) throws Exception {
            }

            // Process the broadcast stream
            @Override
            public void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, Object>.Context context, Collector<Object> collector) throws Exception {
            }
        });

        // TODO 8 Fetch the dimension data and write it out to Phoenix
        // TODO 9 Execute the Flink job
        env.execute();
    }
}
package com.atguigu.edu.realtime.bean;

import lombok.Data;

@Data
public class DimTableProcess {
    // Source table
    String sourceTable;
    // Sink table
    String sinkTable;
    // Sink columns
    String sinkColumns;
    // Primary key column
    String sinkPk;
    // Table-creation extension clause
    String sinkExtend;
}
P022

8.3.2 Splitting the Stream Dynamically Based on the MySQL Configuration Table

(7) Custom function DimBroadcastProcessFunction

package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main-stream data
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });
        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Keep only the records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to convert the record to JSON...");
                }
            }
        });
//        jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC
        // 4.1 Flink CDC source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .databaseList("edu_config") // set captured database
                .tableList("edu_config.table_process") // set captured table
                .username("root")
                .password("123456")
                // Format of the emitted records
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                // Startup mode
                .startupOptions(StartupOptions.initial())
                .build();

        // 4.2 Wrap the source into a stream
        DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        configDS.print();

        // TODO 5 Turn the config-table data into a broadcast stream
        // key: dimension table name; value: one MySQL config row, as a JavaBean
        MapStateDescriptor<String, DimTableProcess> tableProcessState =
                new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);

        // TODO 6 Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);

        // TODO 7 Process the two sides of the connected stream separately
        connectCS.process(new DimBroadcastProcessFunction(tableProcessState));

        // TODO 8 Fetch the dimension data and write it out to Phoenix
        // TODO 9 Execute the Flink job
        env.execute();
    }
}
package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Initial copy of the configuration-table data
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    /**
     * @param value JSON string emitted directly by Flink CDC
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Parse the config-table record
        // TODO 2 Check whether the table exists in Phoenix; create it if it does not
        // TODO 3 Write the config into the state and broadcast it
    }

    /**
     * @param value Maxwell-generated JSON from Kafka
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Get the broadcast configuration
        // TODO 2 Filter down to the needed dimension columns
        // TODO 3 Add the output fields
    }
}
P023
package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Initial copy of the configuration-table data
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Preload the configuration table via JDBC, so that main-stream records arriving
        // before the broadcast state is populated can still be matched
        Connection connection = DriverManager.getConnection(
                "jdbc:mysql://node001:3306/edu_config?" +
                        "user=root&password=123456&useUnicode=true&" +
                        "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");
        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value JSON string emitted directly by Flink CDC
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Parse the config-table record
        // TODO 2 Check whether the table exists in Phoenix; create it if it does not
        // TODO 3 Write the config into the state and broadcast it
    }

    /**
     * @param value Maxwell-generated JSON from Kafka
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Get the broadcast configuration
        // TODO 2 Filter down to the needed dimension columns
        // TODO 3 Add the output fields
    }
}
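The two process methods are still left as TODOs at this point in the course. As a rough, hedged sketch of how they might be completed (reusing the fields and imports of the class above; the Phoenix DDL step is only indicated, since PhoenixUtil is not shown in this section and checkTable is a hypothetical helper):

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Parse the config-table record (Debezium JSON from Flink CDC)
        JSONObject jsonObject = JSON.parseObject(value);
        String op = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(op)) {
            // Row deleted from table_process: drop it from the state and the preloaded map
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            // TODO 2 Make sure the sink table exists in Phoenix, creating it if necessary, e.g.
            // checkTable(after.getSinkTable(), after.getSinkColumns(), after.getSinkPk(), after.getSinkExtend());
            // TODO 3 Write the config into the broadcast state
            tableConfigState.put(after.getSourceTable(), after);
        }
    }

    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Look up the config for this record's table; fall back to the map preloaded
        //        in open() in case the main stream arrives before the broadcast state is filled
        ReadOnlyBroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        String table = value.getString("table");
        DimTableProcess tableProcess = tableConfigState.get(table);
        if (tableProcess == null) {
            tableProcess = configMap.get(table);
        }
        if (tableProcess != null) {
            // TODO 2 Keep only the configured sink columns
            JSONObject data = value.getJSONObject("data");
            List<String> columns = Arrays.asList(tableProcess.getSinkColumns().split(","));
            data.keySet().removeIf(key -> !columns.contains(key));
            // TODO 3 Add the output metadata and emit the record downstream
            data.put("sink_table", tableProcess.getSinkTable());
            out.collect(data);
        }
    }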