Video: 尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

Contents

Chapter 7: ODS Layer Development
  P015
Chapter 8: DIM Layer Development
  P016
  P017
  P018
  P019
    01. Linux commands on node001
    02. KafkaUtil.java
    03. DimSinkApp.java
  P020
  P021
  P022
  P023

Chapter 7: ODS Layer Development

P015

The data collected into the Kafka topics topic_log and topic_db forms the ODS layer of the real-time data warehouse. The job of this layer is to present the raw data unchanged and to serve as a backup.

8.2.2 Dynamically Splitting Out the Dimension Tables

Maxwell writes all captured data into a single topic, which is clearly inconvenient for later processing, so the individual dimension tables need to be split out and handled separately.

In real-time computing, dimension data is generally written to a storage container, usually a database that supports primary-key lookups, such as HBase, Redis, or MySQL.

The table-to-sink mapping should not live in a static configuration file: every time the business adds a dimension table, the configuration would have to be edited and the streaming job restarted. What is needed instead is a dynamic configuration scheme that persists the mapping and lets the running job pick up changes automatically. There are three candidate designs:

1. Store the configuration in ZooKeeper and detect changes through a Watch.
2. Store it in a MySQL table and synchronize it periodically.
3. Store it in a MySQL table and distribute it as a broadcast stream.

Option 3 is chosen here, mainly because MySQL is convenient for initializing and maintaining the configuration data: Flink CDC reads the configuration table, and the resulting config stream is broadcast and connected with the main stream, as sketched below.
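For orientation, a minimal sketch of the broadcast-state pattern behind option 3 (the real implementation is built up step by step in P017–P022; here mainStream is assumed to be a DataStream<JSONObject> of Maxwell records and configStream a DataStream<String> of table_process rows serialized as JSON):

// Sketch only: broadcast the config stream and join it with the main stream.
MapStateDescriptor<String, String> configDescriptor =
        new MapStateDescriptor<>("config_state", String.class, String.class);

BroadcastStream<String> configBroadcast = configStream.broadcast(configDescriptor);

mainStream.connect(configBroadcast)
        .process(new BroadcastProcessFunction<JSONObject, String, JSONObject>() {
            @Override
            public void processBroadcastElement(String config, Context ctx,
                                                Collector<JSONObject> out) throws Exception {
                // Every config change updates the broadcast state on all parallel subtasks.
                JSONObject conf = JSON.parseObject(config);
                ctx.getBroadcastState(configDescriptor).put(conf.getString("source_table"), config);
            }

            @Override
            public void processElement(JSONObject record, ReadOnlyContext ctx,
                                        Collector<JSONObject> out) throws Exception {
                // Main-stream records consult the latest config; registering a new
                // dimension table no longer requires a job restart.
                if (ctx.getBroadcastState(configDescriptor).get(record.getString("table")) != null) {
                    out.collect(record);
                }
            }
        });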
Chapter 8: DIM Layer Development

P016

8.1.1 Flink CDC

基于 Flink SQL CDC 的实时数据同步方案: https://github.com/ververica/flink-cdc-connectors

P017

8.2 Main Tasks

DimSinkApp.java, the skeleton of the DIM application:

package com.atguigu.edu.realtime.app.dim;

import com.atguigu.edu.realtime.util.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DimSinkApp {
    public static void main(String[] args) {
        // TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        //env.fromSource();

        // TODO 3 ETL the main stream

        // TODO 4 Read the configuration table with Flink CDC

        // TODO 5 Turn the configuration stream into a broadcast stream

        // TODO 6 Connect the main stream and the broadcast stream

        // TODO 7 Process the two sides of the connected stream separately

        // TODO 8 Write the extracted dimension data to Phoenix

        // TODO 9 Execute the Flink job
    }
}

EnvUtil.java, which prepares the execution environment:

package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvUtil {
    /**
     * Prepare the execution environment and configure the state backend.
     *
     * @param parallelism parallelism of the Flink job
     * @return the stream execution environment
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Integer parallelism) {
        // TODO 1 Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(parallelism);

        // TODO 2 Configure checkpointing and the state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // Minimum pause between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://node001:8020/edu/ck");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        return env;
    }
}

P018

KafkaUtil.java, a helper that builds a KafkaSource:

package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;

public class KafkaUtil {
    public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {
        return KafkaSource.<String>builder()
                // Required parameters
//                .setBootstrapServers(EduConfig.KAFKA_BOOTSTRAPS) // "node001:9092"
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new DeserializationSchema<String>() {
                    @Override
                    public String deserialize(byte[] message) throws IOException {
                        if (message != null && message.length != 0) {
                            return new String(message);
                        }
                        return null;
                    }

                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                // Optional parameter: where to start reading when offsets are reset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();
    }
}
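A side note on the anonymous DeserializationSchema above: it maps null or empty payloads to null instead of throwing, presumably to survive Kafka tombstone records; Flink's built-in SimpleStringSchema boils down to new String(message) and would raise a NullPointerException there. A minimal, self-contained illustration of the guard:

public class DeserializerNullGuard {
    public static void main(String[] args) {
        byte[] tombstone = null; // a Kafka tombstone record carries a null value
        // Same guard as in KafkaUtil: null/empty payloads become null instead of throwing.
        String safe = (tombstone != null && tombstone.length != 0) ? new String(tombstone) : null;
        System.out.println(safe); // prints "null"
        // new SimpleStringSchema().deserialize(tombstone); // would throw NullPointerException
    }
}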
P019

01. Linux commands on node001

Start Hadoop, Maxwell, and Kafka, then check the processes:

[atguigu@node001 bin]$ jpsall
node001
4803 QuorumPeerMain
5236 Kafka
7941 Maxwell
5350 Application
6726 ConsoleConsumer
4458 NodeManager
8810 Jps
4043 DataNode
3869 NameNode
4654 JobHistoryServer
node002
3505 ResourceManager
4066 QuorumPeerMain
4490 Kafka
5179 Jps
3660 NodeManager
3263 DataNode
node003
3505 SecondaryNameNode
5777 Jps
4369 Application
4279 Kafka
4569 Application
3354 DataNode
3851 QuorumPeerMain
3659 NodeManager
[atguigu@node001 bin]$

Consume topic_db and trigger a Maxwell bootstrap of the dimension tables:

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_db
[atguigu@node001 ~]$ cd ~/bin
[atguigu@node001 bin]$ mysql_to_kafka_init.sh all

A bootstrap-insert record as it appears on the console:

{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":1,"video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":0}}

The same record, formatted:

{
  "database": "edu",
  "table": "video_info",
  "type": "bootstrap-insert",
  "ts": 1645429973,
  "data": {
    "id": 5410,
    "video_name": "day20_11复习_总结.avi",
    "during_sec": 900,
    "video_status": 1,
    "video_size": 12003100,
    "video_url": "file://xxx/xxx",
    "video_source_id": null,
    "version_id": 1,
    "chapter_id": 26305,
    "course_id": 39,
    "publisher_id": 99,
    "create_time": "2021-11-14 04:15:01",
    "update_time": null,
    "deleted": 0
  }
}

02. KafkaUtil.java

Identical to the P018 version except that the required bootstrap-servers setting is no longer commented out:

                // Only change from P018:
                .setBootstrapServers("node001:9092")

03. DimSinkApp.java

TODO 2 and TODO 3 are now implemented:

package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main stream
        // First attempt, a map + filter chain; replaced by the flatMap below, which
        // both filters and tolerates dirty records in a single operator.
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });
        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(
                new FlatMapFunction<String, JSONObject>() {
                    @Override
                    public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                        try {
                            JSONObject jsonObject = JSON.parseObject(value);
                            String type = jsonObject.getString("type");
                            if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                                // Keep everything except the bootstrap start/complete markers
                                out.collect(jsonObject);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            System.out.println("JSON conversion failed for record...");
                        }
                    }
                });
        jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC

        // TODO 5 Turn the configuration stream into a broadcast stream

        // TODO 6 Connect the main stream and the broadcast stream

        // TODO 7 Process the two sides of the connected stream separately

        // TODO 8 Write the extracted dimension data to Phoenix

        // TODO 9 Execute the Flink job
        env.execute();
    }
}

P020

Flink CDC monitors the MySQL binlog. Printing configDS (see the TODO 4 code below) yields one record per row of the configuration table:

{"before":null,"after":{"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1695262804254,"transaction":null}

Annotated:

{
  "before": null,               # the row before the change
  "after": {                    # the row after the change
    "source_table": "base_category_info",
    "sink_table": "dim_base_category_info",
    "sink_columns": "id,category_name,create_time,update_time,deleted",
    "sink_pk": "id",
    "sink_extend": null
  },
  "source": {                   # provenance of the record
    "version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source",
    "ts_ms": 0, "snapshot": "false", "db": "edu_config", "sequence": null,
    "table": "table_process", "server_id": 0, "gtid": null, "file": "",
    "pos": 0, "row": 0, "thread": null, "query": null
  },
  "op": "r",                    # operation type; "r" = read, i.e. a snapshot record
  "ts_ms": 1695262804254,
  "transaction": null
}
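Each of these records will later be unpacked into the DimTableProcess bean (defined under P021). A hedged sketch of that step, relying on fastjson's smart key matching to map snake_case keys such as source_table onto the bean's camelCase fields (value is the CDC JSON string):

// Sketch: one Flink CDC record (a JSON string like the one above) -> config bean.
// For deletes ("op":"d") the payload sits in "before"; otherwise it is in "after".
JSONObject record = JSON.parseObject(value);
boolean isDelete = "d".equals(record.getString("op"));
DimTableProcess config = record.getObject(isDelete ? "before" : "after", DimTableProcess.class);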
DimSinkApp.java after P020: TODO 1–3 are unchanged from P019 (jsonDS.print() is now commented out), TODO 4 is filled in, and TODO 5–9 remain stubs ending in env.execute(). The new imports:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

And the new code:

        // TODO 4 Read the configuration table with Flink CDC
        // 4.1 Build a Flink CDC source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .databaseList("edu_config")             // set captured database
                .tableList("edu_config.table_process")  // set captured table
                .username("root")
                .password("123456")
                // output format: convert each SourceRecord to a JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                // startup mode: full snapshot first, then switch to the binlog
                .startupOptions(StartupOptions.initial())
                .build();

        // 4.2 Wrap the source as a stream
        DataStreamSource<String> configDS = env.fromSource(
                mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        configDS.print();
P021

DimSinkApp.java now implements TODO 5–7, with the matching imports (MapStateDescriptor, BroadcastStream, BroadcastConnectedStream, BroadcastProcessFunction); everything above is unchanged:

        // TODO 5 Turn the configuration stream into a broadcast stream
        // key: dimension table name; value: one MySQL config row as a JavaBean
        MapStateDescriptor<String, DimTableProcess> tableProcessState =
                new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);

        // TODO 6 Connect the main stream and the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);

        // TODO 7 Process the two sides of the connected stream separately
        connectCS.process(new BroadcastProcessFunction<JSONObject, String, Object>() {
            // Main-stream side
            @Override
            public void processElement(JSONObject jsonObject, ReadOnlyContext readOnlyContext,
                                       Collector<Object> collector) throws Exception {
            }

            // Broadcast side
            @Override
            public void processBroadcastElement(String s, Context context,
                                                Collector<Object> collector) throws Exception {
            }
        });

        // TODO 8 Write the extracted dimension data to Phoenix

        // TODO 9 Execute the Flink job
        env.execute();
The configuration bean, DimTableProcess.java:

package com.atguigu.edu.realtime.bean;

import lombok.Data;

@Data
public class DimTableProcess {
    // source table name
    String sourceTable;
    // sink table name
    String sinkTable;
    // columns to keep in the sink
    String sinkColumns;
    // primary-key column of the sink table
    String sinkPk;
    // extension clause for table creation
    String sinkExtend;
}

P022

8.3.2 Splitting the Stream Dynamically Based on the MySQL Configuration Table

7) The custom function DimBroadcastProcessFunction

In DimSinkApp.java, TODO 7 now delegates to the custom function instead of the anonymous class (plus the import of com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction; everything else is unchanged):

        // TODO 7 Process the two sides of the connected stream separately
        connectCS.process(new DimBroadcastProcessFunction(tableProcessState));
DimBroadcastProcessFunction.java, for now just a skeleton:

package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Pre-loaded copy of the configuration table
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    /**
     * @param value JSON emitted directly by Flink CDC
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Parse the configuration record
        // TODO 2 Create the table in Phoenix if it does not exist yet
        // TODO 3 Write the config into the broadcast state
    }

    /**
     * @param value Maxwell-generated JSON from Kafka
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Fetch the broadcast configuration
        // TODO 2 Keep only the configured dimension columns
        // TODO 3 Add the output metadata
    }
}
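The three TODOs in processBroadcastElement are implemented in later episodes; a hedged sketch of where they are headed, reusing the parsing step shown under P020. PhoenixUtil.createDimTable is a hypothetical helper name standing in for the project's table-creation logic:

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Parse the configuration record ("before" for deletes, "after" otherwise)
        JSONObject record = JSON.parseObject(value);
        boolean isDelete = "d".equals(record.getString("op"));
        DimTableProcess config = record.getObject(isDelete ? "before" : "after", DimTableProcess.class);

        BroadcastState<String, DimTableProcess> state = ctx.getBroadcastState(tableProcessState);
        if (isDelete) {
            // A removed config row stops the corresponding dimension from being extracted.
            state.remove(config.getSourceTable());
            configMap.remove(config.getSourceTable());
            return;
        }

        // TODO 2 Create the table in Phoenix if it does not exist yet
        // (hypothetical helper; the real code builds a CREATE TABLE IF NOT EXISTS
        //  statement from sinkTable / sinkColumns / sinkPk / sinkExtend)
        PhoenixUtil.createDimTable(config.getSinkTable(), config.getSinkColumns(),
                config.getSinkPk(), config.getSinkExtend());

        // TODO 3 Write the config into the broadcast state, keyed by source table name
        state.put(config.getSourceTable(), config);
    }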
P023

DimBroadcastProcessFunction.java gains an open() method that pre-loads the configuration table over plain JDBC; the rest of the class is unchanged. Pre-loading matters because the main stream may deliver records before the first broadcast element arrives, and configMap gives processElement a fallback for those early records.

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Connection connection = DriverManager.getConnection(
                "jdbc:mysql://node001:3306/edu_config?"
                        + "user=root&password=123456&useUnicode=true"
                        + "&characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");
        PreparedStatement preparedStatement =
                connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            // fastjson's smart matching maps snake_case columns onto the camelCase bean fields
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }
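For completeness, a hedged sketch of where processElement is headed under the same assumptions (the field name sink_table used to carry the target table is an assumption; the real implementation follows in later episodes):

    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // TODO 1 Fetch the broadcast configuration; fall back to the pre-loaded map
        //        for records that arrive before the broadcast state is populated.
        String table = value.getString("table");
        DimTableProcess config = ctx.getBroadcastState(tableProcessState).get(table);
        if (config == null) {
            config = configMap.get(table);
        }
        if (config == null) {
            return; // not a configured dimension table, ignore
        }

        // TODO 2 Keep only the configured dimension columns
        JSONObject data = value.getJSONObject("data");
        List<String> columns = Arrays.asList(config.getSinkColumns().split(","));
        data.keySet().removeIf(key -> !columns.contains(key));

        // TODO 3 Add the output metadata so the sink knows the target Phoenix table
        data.put("sink_table", config.getSinkTable());
        out.collect(data);
    }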