Introduction: There are plenty of Flink articles out there, but most focus on one particular aspect; very few start from a simple example and walk through the whole flow of coding, building, and deployment. This post does exactly that, partly as a record for myself and partly to share with others. It uses a simple MySQL CDC job as the running example.
Environment: MySQL 5.7; Flink 1.14.0; Hadoop 3.0.0; OS: CentOS 7.6; JDK 1.8.0_401.
1. MySQL
1.1 Create the database and test data
Database script:
CREATE DATABASE `flinktest`;
USE `flinktest`;
CREATE TABLE `products` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `description` varchar(512) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;

insert into `products`(`id`,`name`,`description`) values
(1,'aaa','aaaa'),
(2,'ccc','ccc'),
(3,'dd','ddd'),
(4,'eeee','eee'),
(5,'ffff','ffff'),
(6,'hhhh','hhhh'),
(7,'iiii','iiii'),
(8,'jjjj','jjjj');

The root account is sufficient here.
1.2 Enable binlog
Reference: https://core815.blog.csdn.net/article/details/144233298
Pitfall: during testing, MySQL 9.0 never delivered the updated rows; in the end MySQL 5.7 was used.
2. Coding
2.1 Main implementation
package com.zl;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class MysqlExample {
    public static void main(String[] args) throws Exception {
        List<String> SYNC_TABLES = Arrays.asList("flinktest.products");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("10.86.37.169")
                .port(3306)
                .databaseList("flinktest")
                .tableList(String.join(",", SYNC_TABLES))
                .username("root")
                .password("pwd")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        /// Flink Web UI configuration - begin
        /*
        Configuration config = new Configuration();
        // Enable the Web UI, reachable at http://ip:port
        config.setBoolean("web.ui.enabled", true);
        config.setString(RestOptions.BIND_PORT, "8081");
        // This works when running the jar directly, but fails when the job is
        // submitted to YARN; use getExecutionEnvironment() there instead.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        */
        /// Flink Web UI configuration - end

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        /// Checkpoint storage - begin (comment out if not needed)
        // Hadoop deployment: https://core815.blog.csdn.net/article/details/144022938
        // For the HDFS address, see /home/hadoop-3.3.3/etc/hadoop/core-site.xml
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000" + "/flinktest/");
        env.getCheckpointConfig().setCheckpointInterval(3000);
        /// Checkpoint storage - end

        // If the MySQL binlog cannot be read:
        // 1) binlog may not be enabled, or the MySQL version may be unsupported
        //    (this works fine against MySQL 5.7.20);
        // 2) the database ip, port, username, or password may be wrong.
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1)
                .print();

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
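The JsonDebeziumDeserializationSchema used above emits each change event as a Debezium-style JSON string whose top-level "op" field marks the operation type ("r" for snapshot read, "c" insert, "u" update, "d" delete). As a rough, self-contained sketch of what a downstream step could do with those strings (the sample record below is hand-written for illustration, not captured from a real run), the operation type can be pulled out like this:

```java
// Sketch: extract the Debezium "op" field from a change-event JSON string.
// The sample event in main() is hand-written for illustration only.
public class DebeziumOpSketch {

    /** Returns the value of the last "op" field in the string, or null if absent. */
    public static String extractOp(String json) {
        String key = "\"op\":\"";
        int i = json.lastIndexOf(key);   // "op" sits near the end of a Debezium event
        if (i < 0) return null;
        int start = i + key.length();
        int end = json.indexOf('"', start);
        return end < 0 ? null : json.substring(start, end);
    }

    public static void main(String[] args) {
        String sample = "{\"before\":null,"
                + "\"after\":{\"id\":1,\"name\":\"aaa\",\"description\":\"aaaa\"},"
                + "\"op\":\"r\"}";
        System.out.println("op = " + extractOp(sample)); // op = r
    }
}
```

In a real job this kind of logic would live in a map or process function instead of naive string search; a proper JSON parser (e.g. Jackson, which Flink already ships) is the safer choice once fields can nest.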
2.2 Dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zl.flinkcdc</groupId>
    <artifactId>FlickCDC</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>FlickCDC</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.14.0</flink-version>
        <flink-cdc-version>2.4.0</flink-cdc-version>
        <hadoop.version>3.0.0</hadoop.version>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>18.0-13.0</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <!-- Hadoop-related dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId></exclusion>
                <exclusion><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId></exclusion>
                <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId></exclusion>
                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId></exclusion>
                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion><groupId>org.ow2.asm</groupId><artifactId>asm</artifactId></exclusion>
                <exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion>
                <exclusion><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId></exclusion>
                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
                <exclusion><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId></exclusion>
                <exclusion><groupId>commons-io</groupId><artifactId>commons-io</artifactId></exclusion>
                <exclusion><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></exclusion>
                <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
                <exclusion><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId></exclusion>
                <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
                <exclusion><groupId>javax.xml.bind</groupId><artifactId>jaxb-api</artifactId></exclusion>
                <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
                <exclusion><groupId>com.nimbusds</groupId><artifactId>nimbus-jose-jwt</artifactId></exclusion>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
                <exclusion><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId></exclusion>
                <exclusion><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId></exclusion>
                <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
                <exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.5.0</version>
        </dependency>
        <!-- Install this jar into the local repository first:
        mvn install:install-file -Dfile=D:/maven/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar -DgroupId=org.apache.flink -DartifactId=flink-shaded-hadoop-3 -Dversion=3.1.1.7.2.9.0-173-9.0 -Dpackaging=jar
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-3</artifactId>
            <version>3.1.1.7.2.9.0-173-9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.zl.MysqlExample</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The full source is available at: https://gitee.com/core815/flink-cdc-mysql
3. Packaging
Maven version: 3.5.4.
Go to the directory containing pom.xml and run "mvn package". Packaging output: (screenshot omitted)

4. Running the jar directly
java -jar FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

5. Running on Flink with YARN
For the Hadoop, Flink, and YARN setup, see: https://core815.blog.csdn.net/article/details/144022938
Copy FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar to the /home directory.
Run the following command:
flink run-application -t yarn-application \
  -Dparallelism.default=1 \
  -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
  -Dtaskmanager.memory.process.size=1g \
  -Dyarn.application.name="FlinkCdcMysql" \
  -Dtaskmanager.numberOfTaskSlots=1 \
  -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

The console then shows output like this: (screenshot omitted)
The YARN management page: (screenshot omitted)
Steps to find the run logs, after which the full log is visible: (screenshots omitted)

6. Common problems
6.1 Problem 1
Error in the log:
The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
Fix:
Edit the my.cnf file:
[mysqld]
default-time-zone='Asia/Shanghai'
Then restart the MySQL service. (Alternatively, the MySqlSource builder's serverTimeZone option can be set to match the server's timezone.)
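The error arises because Asia/Shanghai is UTC+8, while the server reported an offset of 0 seconds from UTC. A standalone java.time check (illustration only, unrelated to the Flink job itself) confirms the offset the connector expects:

```java
import java.time.Instant;
import java.time.ZoneId;

public class TimezoneCheck {
    // Asia/Shanghai has a fixed +08:00 offset (no daylight saving),
    // i.e. 28800 seconds ahead of UTC.
    static int offsetSeconds() {
        return ZoneId.of("Asia/Shanghai").getRules().getOffset(Instant.now()).getTotalSeconds();
    }

    public static void main(String[] args) {
        // A MySQL server reporting "0 seconds ahead of UTC" cannot match this zone.
        System.out.println("Asia/Shanghai offset seconds: " + offsetSeconds()); // 28800
    }
}
```

So either the server's timezone is changed to match (the my.cnf fix above), or the connector is told the server's actual timezone.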
6.2 Problem 2: HDFS
Error in the log:
Permission denied: user=PC2023, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
Fix:
A temporary workaround (note that this opens the HDFS root to all users):
hadoop fs -chmod -R 777 /

6.3 Problem 3: conflict between guava 30 and guava 18
Analysis:
The combination of Flink 1.13 and CDC 2.3 is prone to this problem.
Fix:
Reference: https://developer.aliyun.com/ask/574901
Use Flink 1.14.0 with CDC 2.4.0.
6.4 Problem 4
Error in the log:
/user/root/.flink/application_1733492706887_0002/log4j.properties could only be written to 0 of the 1 minReplication nodes
Fix:
https://www.pianshen.com/article/1554707968/