Using Apache Flink for Real-Time Data Synchronization and Cleansing: An ETL Pipeline from MySQL and Oracle to a Target MySQL Database
An ETL (Extract, Transform, Load) process for data synchronization typically extracts data from source systems (such as databases, message queues, or files), applies the necessary transformations, and loads the result into a target system (such as another database or a data warehouse). Here we show how to use Apache Flink to build an ETL pipeline that extracts data from MySQL and Oracle databases and synchronizes it into another MySQL database.
The pipeline consists of three steps:
1. Extract data in real time from the source databases (MySQL and Oracle).
2. Cleanse and transform the data.
3. Write the transformed data to the target database (MySQL).
We will use Apache Flink to implement this pipeline. Flink's stream-processing capabilities make it well suited to real-time data synchronization and transformation tasks.
Environment Setup
Make sure the MySQL and Oracle databases are running and that the corresponding tables have been created. Create a Spring Boot project and add dependencies for Flink, the MySQL JDBC driver, and the Oracle JDBC driver.
Step 1: Create the Source and Target Database Tables
Suppose we have the following three tables:
source_mysql_table: the source table in MySQL
source_oracle_table: the source table in Oracle
target_table: the target table in MySQL
MySQL source table:
CREATE DATABASE source_mysql_db;
USE source_mysql_db;

CREATE TABLE source_mysql_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp VARCHAR(255) NOT NULL
);

Oracle source table:
CREATE TABLE source_oracle_table (
    id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,
    user_id VARCHAR2(255) NOT NULL,
    action VARCHAR2(255) NOT NULL,
    timestamp VARCHAR2(255) NOT NULL,
    PRIMARY KEY (id)
);

Target MySQL table:
CREATE DATABASE target_db;
USE target_db;

CREATE TABLE target_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    action VARCHAR(255) NOT NULL,
    timestamp VARCHAR(255) NOT NULL
);

Step 2: Add Project Dependencies
Add the Flink, MySQL, and Oracle dependencies to pom.xml:
<dependencies>
    <!-- Spring Boot dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>

    <!-- Oracle JDBC driver -->
    <dependency>
        <groupId>com.oracle.database.jdbc</groupId>
        <artifactId>ojdbc8</artifactId>
        <version>19.8.0.0</version>
    </dependency>
</dependencies>

Step 3: Write the Flink ETL Job
Create a Flink job class that implements the ETL logic.
First, define a POJO class that represents the data structure:
package com.example.flink;

public class UserAction {

    private int id;
    private String userId;
    private String action;
    private String timestamp;

    // Getters and setters
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
}

Next, write the Flink job class:
package com.example.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

@Component
public class FlinkETLJob implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from MySQL
        DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());

        // Read data from Oracle
        DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());

        // Merge the two streams
        DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);

        // Cleanse and transform the data
        DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {
            @Override
            public UserAction map(UserAction value) throws Exception {
                // Example cleansing rule: normalize the action field to upper case
                value.setAction(value.getAction().toUpperCase());
                return value;
            }
        });

        // Write the transformed data to the target MySQL database
        transformedStream.addSink(new MySQLSink());

        // Execute the job
        env.execute("Flink ETL Job");
    }

    public static class MySQLSource implements SourceFunction<UserAction> {

        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
        private static final String JDBC_USER = "source_user";
        private static final String JDBC_PASSWORD = "source_password";

        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                while (isRunning) {
                    String sql = "SELECT * FROM source_mysql_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getString("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // Simulate a real-time stream by polling every 5 seconds
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static class OracleSource implements SourceFunction<UserAction> {

        private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";
        private static final String JDBC_USER = "source_user";
        private static final String JDBC_PASSWORD = "source_password";

        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<UserAction> ctx) throws Exception {
            try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                while (isRunning) {
                    String sql = "SELECT * FROM source_oracle_table";
                    try (PreparedStatement statement = connection.prepareStatement(sql);
                         ResultSet resultSet = statement.executeQuery()) {
                        while (resultSet.next()) {
                            UserAction userAction = new UserAction();
                            userAction.setId(resultSet.getInt("id"));
                            userAction.setUserId(resultSet.getString("user_id"));
                            userAction.setAction(resultSet.getString("action"));
                            userAction.setTimestamp(resultSet.getString("timestamp"));
                            ctx.collect(userAction);
                        }
                    }
                    Thread.sleep(5000); // Simulate a real-time stream by polling every 5 seconds
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static class MySQLSink extends RichSinkFunction<UserAction> {

        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";
        private static final String JDBC_USER = "target_user";
        private static final String JDBC_PASSWORD = "target_password";

        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
            statement = connection.prepareStatement(sql);
        }

        @Override
        public void invoke(UserAction value, Context context) throws Exception {
            statement.setString(1, value.getUserId());
            statement.setString(2, value.getAction());
            statement.setString(3, value.getTimestamp());
            statement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}
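Note that the polling sources above re-read the entire source table on every cycle, so the same rows are emitted (and re-inserted into the target) again and again. A common refinement, sketched below under the assumption that id grows monotonically, is to remember the highest id seen so far and fetch only newer rows; the class name IncrementalMySQLSource and the 5-second interval are illustrative and not part of the original example. For genuine change capture (including updates and deletes), the Flink CDC connectors are the usual choice.

// Illustrative variant of MySQLSource that only emits rows it has not seen yet.
// Drop it into FlinkETLJob next to the other nested classes; it uses the same imports.
public static class IncrementalMySQLSource implements SourceFunction<UserAction> {

    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
    private static final String JDBC_USER = "source_user";
    private static final String JDBC_PASSWORD = "source_password";

    private volatile boolean isRunning = true;
    private long lastMaxId = 0; // highest id emitted so far

    @Override
    public void run(SourceContext<UserAction> ctx) throws Exception {
        try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
             PreparedStatement statement = connection.prepareStatement(
                     "SELECT * FROM source_mysql_table WHERE id > ? ORDER BY id")) {
            while (isRunning) {
                statement.setLong(1, lastMaxId);
                try (ResultSet resultSet = statement.executeQuery()) {
                    while (resultSet.next()) {
                        UserAction userAction = new UserAction();
                        userAction.setId(resultSet.getInt("id"));
                        userAction.setUserId(resultSet.getString("user_id"));
                        userAction.setAction(resultSet.getString("action"));
                        userAction.setTimestamp(resultSet.getString("timestamp"));
                        lastMaxId = Math.max(lastMaxId, userAction.getId());
                        ctx.collect(userAction);
                    }
                }
                Thread.sleep(5000); // poll for newly inserted rows every 5 seconds
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}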
Step 4: Configure Spring Boot
Add the necessary configuration in application.properties:
# Spring Boot configuration
server.port=8080
Step 5: Run and Test

1. Start the MySQL and Oracle databases: make sure the source and target databases are running and that the databases and tables above have been created.
2. Start the Spring Boot application: the Flink ETL job runs automatically on startup.
3. Test the ETL job: insert a few rows into the source tables and verify that they are synchronized to the target table, as sketched below.
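For example, the following statements (the values are made up for illustration and are not from the original article) seed one row into each source table and then check the target table; after the next polling cycle the rows should appear in target_table with their action values upper-cased:

-- Run against the MySQL source database (sample values)
INSERT INTO source_mysql_table (user_id, action, timestamp)
VALUES ('u001', 'login', '2024-01-01 10:00:00');

-- Run against the Oracle source database (sample values)
INSERT INTO source_oracle_table (user_id, action, timestamp)
VALUES ('u002', 'checkout', '2024-01-01 10:05:00');
COMMIT;

-- Run against the target MySQL database a few seconds later;
-- the action column should show LOGIN and CHECKOUT
SELECT * FROM target_table;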
Summary
With the steps above you can integrate Flink into a Spring Boot project and implement a real-time data synchronization and ETL pipeline. This example shows how to extract data in real time from MySQL and Oracle source databases, cleanse and transform it, and load the result into a target MySQL database. Depending on your requirements, you can extend and modify the example to handle more complex transformation and loading logic.
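As one illustration of such an extension, the single map() cleansing step could be replaced with a flatMap that also filters out unusable records. This is a hypothetical sketch, not part of the original job; it additionally requires the FlatMapFunction and Collector imports noted in the comments.

// Hypothetical richer cleansing step: drop records without a user_id and
// normalize the remaining fields before they reach the sink.
// Requires: import org.apache.flink.api.common.functions.FlatMapFunction;
//           import org.apache.flink.util.Collector;
DataStream<UserAction> cleanedStream = mergedStream.flatMap(new FlatMapFunction<UserAction, UserAction>() {
    @Override
    public void flatMap(UserAction value, Collector<UserAction> out) {
        if (value.getUserId() == null || value.getUserId().trim().isEmpty()) {
            return; // discard rows that cannot be attributed to a user
        }
        value.setUserId(value.getUserId().trim());
        value.setAction(value.getAction() == null ? "UNKNOWN" : value.getAction().trim().toUpperCase());
        out.collect(value);
    }
});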