当前位置: 首页 > news >正文

花生壳做的网站网站利润分析

花生壳做的网站,网站利润分析,大学生网站设计作品成品代码,网站设计 网站建设使用Apache Flink实现实时数据同步与清洗#xff1a;MySQL和Oracle到目标MySQL的ETL流程 实现数据同步的ETL#xff08;抽取、转换、加载#xff09;过程通常涉及从源系统#xff08;如数据库、消息队列或文件#xff09;中抽取数据#xff0c;进行必要的转换#xff0…使用Apache Flink实现实时数据同步与清洗MySQL和Oracle到目标MySQL的ETL流程 实现数据同步的ETL抽取、转换、加载过程通常涉及从源系统如数据库、消息队列或文件中抽取数据进行必要的转换然后将数据加载到目标系统如另一个数据库或数据仓库。在这里我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。 1. 从源数据库(MySQL和Oracle)实时抽取数据。2. 对数据进行清洗和转换。3. 将转换后的数据写入目标数据库(MySQL)。 我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力适合处理实时数据同步和转换任务。 环境准备 确保MySQL和Oracle数据库运行**并创建相应的表。创建Spring Boot项目并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。 第一步创建源和目标数据库表 假设我们有以下三个表 source_mysql_tableMySQL中的源表source_oracle_tableOracle中的源表target_table目标MySQL表 MySQL源表 CREATE DATABASE source_mysql_db; USE source_mysql_db;CREATE TABLE source_mysql_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL );Oracle源表 CREATE TABLE source_oracle_table (id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,user_id VARCHAR2(255) NOT NULL,action VARCHAR2(255) NOT NULL,timestamp VARCHAR2(255) NOT NULL,PRIMARY KEY (id) );目标MySQL表 CREATE DATABASE target_db; USE target_db;CREATE TABLE target_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL );第二步添加项目依赖 在pom.xml中添加Flink、MySQL和Oracle相关的依赖 dependencies!-- Spring Boot dependencies --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependency!-- Apache Flink dependencies --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion1.14.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion1.14.0/version/dependency!-- MySQL JDBC driver --dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.23/version/dependency!-- Oracle JDBC driver --dependencygroupIdcom.oracle.database.jdbc/groupIdartifactIdojdbc8/artifactIdversion19.8.0.0/version/dependency /dependencies第三步编写Flink ETL任务 创建一个Flink任务类来实现ETL逻辑。 创建一个POJO类表示数据结构 package com.example.flink;public class UserAction {private int id;private String userId;private String action;private String timestamp;// Getters and setterspublic int getId() {return id;}public void setId(int id) {this.id id;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId userId;}public String getAction() {return action;}public void setAction(String action) {this.action action;}public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp timestamp;} }编写Flink任务类 package com.example.flink;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component;import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet;Component public class FlinkETLJob implements CommandLineRunner {Overridepublic void run(String... args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 从MySQL读取数据DataStreamUserAction mysqlDataStream env.addSource(new MySQLSource());// 从Oracle读取数据DataStreamUserAction oracleDataStream env.addSource(new OracleSource());// 合并两个数据流DataStreamUserAction mergedStream mysqlDataStream.union(oracleDataStream);// 清洗和转换数据DataStreamUserAction transformedStream mergedStream.map(new MapFunctionUserAction, UserAction() {Overridepublic UserAction map(UserAction value) throws Exception {// 进行清洗和转换value.setAction(value.getAction().toUpperCase());return value;}});// 将数据写入目标MySQL数据库transformedStream.addSink(new MySQLSink());// 执行任务env.execute(Flink ETL Job);}public static class MySQLSource implements SourceFunctionUserAction {private static final String JDBC_URL jdbc:mysql://localhost:3306/source_mysql_db;private static final String JDBC_USER source_user;private static final String JDBC_PASSWORD source_password;private volatile boolean isRunning true;Overridepublic void run(SourceContextUserAction ctx) throws Exception {try (Connection connection DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql SELECT * FROM source_mysql_table;try (PreparedStatement statement connection.prepareStatement(sql);ResultSet resultSet statement.executeQuery()) {while (resultSet.next()) {UserAction userAction new UserAction();userAction.setId(resultSet.getInt(id));userAction.setUserId(resultSet.getString(user_id));userAction.setAction(resultSet.getString(action));userAction.setTimestamp(resultSet.getString(timestamp));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流每5秒查询一次}}}Overridepublic void cancel() {isRunning false;}}public static class OracleSource implements SourceFunctionUserAction {private static final String JDBC_URL jdbc:oracle:thin:localhost:1521:orcl;private static final String JDBC_USER source_user;private static final String JDBC_PASSWORD source_password;private volatile boolean isRunning true;Overridepublic void run(SourceContextUserAction ctx) throws Exception {try (Connection connection DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql SELECT * FROM source_oracle_table;try (PreparedStatement statement connection.prepareStatement(sql);ResultSet resultSet statement.executeQuery()) {while (resultSet.next()) {UserAction userAction new UserAction();userAction.setId(resultSet.getInt(id));userAction.setUserId(resultSet.getString(user_id));userAction.setAction(resultSet.getString(action));userAction.setTimestamp(resultSet.getString(timestamp));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流每5秒查询一次}}}Overridepublic void cancel() {isRunning false;}}public static class MySQLSink extends RichFlatMapFunctionUserAction, Void {private static final String JDBC_URL jdbc:mysql://localhost:3306/target_db;private static final String JDBC_USER target_user;private static final String JDBC_PASSWORD target_password;private transient Connection connection;private transient PreparedStatement statement;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);String sql INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?);statement connection.prepareStatement(sql);}Overridepublic void flatMap(UserAction value, CollectorVoid out) throws Exception {statement.setString(1, value.getUserId());statement.setString(2, value.getAction());statement.setString(3, value.getTimestamp());statement.executeUpdate();}Overridepublic void close() throws Exception {super.close();if (statement ! null) {statement.close();}if (connection ! null) {connection.close();}}} }第四步配置Spring Boot 在application.properties中添加必要的配置 # Spring Boot configuration server.port8080第五步运行和测试 启动MySQL和Oracle数据库确保你的源和目标数据库已经运行并且创建了相应的数据库和表。启动Spring Boot应用启动Spring Boot应用程序会自动运行Flink ETL任务。测试Flink ETL任务插入一些数据到源数据库的表中验证数据是否同步到目标数据库的表中。 总结 通过上述步骤你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据进行数据清洗和转换并将结果加载到目标MySQL数据库中。根据你的具体需求你可以扩展和修改这个示例处理更复杂的数据转换和加载逻辑。
http://www.dnsts.com.cn/news/149010.html

相关文章:

  • 建设厅官方网站职称互联网推广服务
  • 电商做网站如何做网站霸屏
  • 网站建设小企业案例做动漫主题的网站
  • wordpress企业站烟台网站开发多少钱
  • 四川建设数据共享平台青海seo技术培训
  • 学校的网站如何建设最火爆的网页游戏
  • 金华建设网站备案查询
  • 网站开发计划和预算百度 网站 说明
  • app推广团队在线优化工具
  • 网站后台看不到部分内容在哪个网站可以一对一做汉教
  • 安陆网站建设wordpress用旧的编辑器
  • 手机模板网站下载岱岳区网站设计
  • 中国建设银行官网站汽车卡wordpress 采集微信公众号
  • 网站登录页面模板下载ftp上传不了wordpress
  • 可以做兼职笔译的网站如何做超市的网站
  • wordpress边栏添加标签云seo的含义是什么意思
  • 专业图库网站 西安杨凌网站开发
  • xp系统建设网站网页设计和网站开发有什么区别
  • 网站信息内容建设找谁做网站比较好
  • 上海行业网站建设钢材料 网站建设 中企动力
  • 正规网站建设咨询电话wordpress右侧广告
  • 做移门配件的网站安徽专业网站建设
  • 商务网站设计与制作wordpress tipton
  • 做网站和微信公众号需要多少钱宁波营销网站建设
  • php企业网站demo如何写wordpress插件
  • 药品在哪些网站做推广免费手游推广代理平台渠道
  • 最大的设计公司湖南关键词优化推荐
  • 河北省建设工程招标投标协会网站网页设计与制作教程第三版答案
  • 网站 在线支付功能用asp做的一个网站实例源代码
  • 网页设计制作音乐网站潮阳网站建设公司