阿里云服务器如何配置多网站,建设工程分包合同,用html做简单网站,财经大学网站建设手把手教学#xff0c;flink connector打通clickhouse大数据库#xff0c;通过下发flink sql#xff0c;来使用ck。
组件版本jdk1.8flink1.17.2clickhouse23.12.2.59
1.背景
flink官方不支持clickhouse连接器#xff0c;工作中难免会用到。
2.方案
利用GitHub大佬提供…手把手教学flink connector打通clickhouse大数据库通过下发flink sql来使用ck。
组件版本jdk1.8flink1.17.2clickhouse23.12.2.59
1.背景
flink官方不支持clickhouse连接器工作中难免会用到。
2.方案
利用GitHub大佬提供的源代码我用的是release-1.16https://github.com/itinycheng/flink-connector-clickhouse/tree/release-1.16
3.编译
导入IDEAmaven编译即可生成flink-connector-clickhouse-1.16.0-SNAPSHOT.jar 4.将此依赖包导入flink工程
spring boot工程
4.1pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersion
!-- parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.13/versionrelativePath/ lt;!ndash; lookup parent from repository ndash;gt;/parent--parentgroupIdcom.mit.microgrid/groupIdartifactIdmit-microgrid/artifactIdversion${project.build.version}/version/parentartifactIdmit-microgrid-flink/artifactIdnamemit-microgrid-flink/namedescriptionflink connector clickhouse/descriptionpropertiesjava.version1.8/java.versionflink.version1.17.2/flink.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId!-- 排除SpringBoot自带的日志依赖 --exclusionsexclusiongroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-logging/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-config/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--flink--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scopeexclusionsexclusiongroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactId/exclusion/exclusions/dependency!--flink connector--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion${flink.version}/version/dependency!--flink connector clickhouse--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-clickhouse/artifactIdversion1.16.0-SNAPSHOT/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactId
!-- artifactIdflink-clients_2.12/artifactId--version${flink.version}/version
!-- scopeprovided/scope--/dependency!-- flink sql --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-loader/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java/artifactIdversion${flink.version}/version/dependency!-- Flink JDBC Connector --
!-- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.12/artifactIdversion1.14.6/version lt;!ndash; 与您的Flink版本匹配 ndash;gt;/dependency--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.1.2-1.17/version/dependency!-- ClickHouse JDBC Driver --dependencygroupIdru.yandex.clickhouse/groupIdartifactIdclickhouse-jdbc/artifactIdversion0.3.2/version !-- 请根据实际情况选择最新稳定版本 --/dependency!-- 添加clickhouse-maven依赖--dependencygroupIdru.ivi.opensource/groupIdartifactIdflink-clickhouse-sink/artifactIdversion1.2.0/version/dependency!--module--dependencygroupIdcom.mit.microgrid/groupIdartifactIdmit-microgrid-common-core/artifactIdversion${project.build.version}/version/dependencydependencygroupIdcom.mit.microgrid/groupIdartifactIdmit-microgrid-api-history/artifactIdversion${project.build.version}/version/dependency!--sql parse--dependencygroupIdorg.apache.calcite/groupIdartifactIdcalcite-core/artifactIdversion1.37.0/version/dependency
!-- dependencygroupIdorg.apache.calcite/groupIdartifactIdcalcite-server/artifactIdversion1.37.0/version/dependency--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-sql-parser/artifactIdversion${flink.version}/version/dependency!--mysql--dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.30/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-uber/artifactIdversion1.17.2/version/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-text/artifactIdversion1.12.0/version/dependency/dependenciesbuild!--pluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configurationexecutionsexecutiongoalsgoalrepackage/goal/goals/execution/executions/plugin--
!-- plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.4/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludelog4j:*/exclude/excludes/artifactSetfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformers combine.childrenappendtransformerimplementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer/transformers/configuration/execution/executions/plugin--
!-- /plugins--finalName${project.artifactId}/finalNamepluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion2.7.3/versionconfigurationmainClasscom.mit.microgrid.flink.MitMicrogridFlinkApplication/mainClassforktrue/forklayoutZIP/layoutincludeSystemScopetrue/includeSystemScope/configurationexecutionsexecutiongoalsgoalrepackage/goal/goalsconfigurationclassifier-with-dependencies/classifier/configuration/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-jar-plugin/artifactIdversion3.3.0/versionconfigurationarchiveaddMavenDescriptorfalse/addMavenDescriptormanifestmainClasscom.mit.microgrid.flink.MitMicrogridFlinkApplication/mainClassaddClasspathtrue/addClasspathclasspathPrefixlib//classpathPrefix/manifest/archive/configuration/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.3.0/versionconfigurationdescriptorsdescriptorsrc/main/resources/assembly/assembly.xml/descriptor/descriptorsoutputDirectory./../out/outputDirectory/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build/project4.2)核心方法 /*** multiple sql execute** param sqlList*/public static JobClient flinkSqlJobClientMultiple(ListString sqlList) {log.info(参数sqlList: {}, sqlList);
// StreamExecutionEnvironment sEnv StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tEnv StreamTableEnvironment.create(sEnv);EnvironmentSettings setting EnvironmentSettings.newInstance().inBatchMode().build();TableEnvironment tEnv TableEnvironment.create(setting);if (CollectionUtil.isNullOrEmpty(sqlList)) {log.warn(sqlList参数为空);return null;}for (String s : sqlList) {TableResult tableResult tEnv.executeSql(s);OptionalJobClient jobClientOptional tableResult.getJobClient();if (jobClientOptional.isPresent()) {JobClient jobClient jobClientOptional.get();log.info(jobClient: jobClient);return jobClient;}}log.error(没有可执行的job);return null;}
5.源码地址
https://github.com/genghongsheng0/mit-microgrid-flink