Contents

1. Sample 1: read a CSV file and generate a CSV file
2. Sample 2: read from StarRocks and write to StarRocks
3. Sample 3: process with DataSet and Table SQL, then write to StarRocks
4. Pitfalls

Maven dependencies used by the samples:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<!-- Table/SQL API support for the DataStream/DataSet APIs in Java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<!-- Table program planner and runtime -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
```

1. Sample 1: read a CSV file and generate a CSV file

Reference: [3] Flink学习 - Table API & SQL编程

```java
import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment; batch jobs use ExecutionEnvironment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);

//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );
        // Register the data set
//        tEnv.registerDataSet("WordCount", input, "word, frequency");

        // 2. Load the data source into a DataSet
        DataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv")
                .ignoreFirstLine()
                .pojoType(Student.class, "name", "age");

        // 3. Convert the DataSet into a Table
        Table students = bTableEnv.fromDataSet(csv);
        // 4. Register the "student" table
        bTableEnv.registerTable("student", students);

        Table result = bTableEnv.sqlQuery("select name,age from student");
        result.printSchema();

        DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);
        System.out.println("count--" + dset.count());
        dset.print();

        // 5. Sink: write the result to a CSV file
        CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
        String[] fieldNames = {"name", "age"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);
        result.insertInto("CsvOutPutTable");

        env.execute("SQL-Batch");
    }

    @Data
    public static class Student {
        private String name;
        private int age;
    }
}
```

Prepare the test file data.csv:

```text
name,age
zhangsan,23
lisi,43
wangwu,12
```

After the program runs, the file D:\tmp\result.csv is generated.
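Given the three data rows above, result.csv should look roughly like the following (the CsvTableSink writes the selected name,age rows comma-separated, with no header line, since the query keeps every row):

```text
zhangsan,23
lisi,43
wangwu,12
```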
2. Sample 2: read from StarRocks and write to StarRocks

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Option 1
        DataSource s = env.createInput(jdbcInputFormat);
        s.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish()
        );

        // Option 2
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );

        env.execute("SQL-Batch");
    }
}
```

Data preparation:

```sql
CREATE TABLE student (
    name STRING,
    age INT
) ENGINE = OLAP
DUPLICATE KEY(name)
DISTRIBUTED BY RANDOM
PROPERTIES (
    "compression" = "LZ4",
    "fast_schema_evolution" = "false",
    "replicated_storage" = "true",
    "replication_num" = "1"
);

insert into student values('zhangsan', 23);
```

References: flink 读取mysql源 JDBCInputFormat、自定义数据源; flink1.10中三种数据处理方式的连接器说明; flink读写MySQL的两种方式
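One detail worth noting before Sample 3: JDBCInputFormat produces generic Row records, so the columns carry no names, only positions (which is why the Table API later exposes them as f0, f1). A minimal sketch of positional field access, assuming the env and jdbcInputFormat built in Sample 2 and the two-column student schema above:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;

// Each record is an org.apache.flink.types.Row; the "name"/"age" labels are gone,
// so the schema is effectively (f0: STRING, f1: INT) and access is by index.
DataSet<Row> rows = env.createInput(jdbcInputFormat);
rows.map(new MapFunction<Row, String>() {
    @Override
    public String map(Row row) {
        String name = (String) row.getField(0);  // column 0: name
        Integer age = (Integer) row.getField(1); // column 1: age
        return name + " is " + age;
    }
}).print();
```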
Note: if running `java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount` fails with:

```text
Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}
```

see "解决报错Flink Could not resolve substitution to a value: ${akka.stream.materializer}". The fix is to merge the Akka reference.conf files with an AppendingTransformer in the maven-shade-plugin:

```xml
<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>flink.KafkaDemo1</mainClass>
                            </transformer>-->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

3. Sample 3: process with DataSet and Table SQL, then write to StarRocks

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);

        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
        dataSource.print();

        Table students = bTableEnv.fromDataSet(dataSource);
        bTableEnv.registerTable("student", students);

        Table result = bTableEnv.sqlQuery(
                "select name, age from (select f0 as name, f1 as age from student) group by name, age");
        result.printSchema();

        DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);
        dset.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish()
        );

        env.execute("SQL-Batch");
    }
}
```

4. Pitfalls

Pitfall 1: Bang equal '!=' is not allowed under the current SQL conformance level.
Fix: change `!=` in the SQL to `<>`.

Pitfall 2: java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to execute(), count(), collect(), or print().
Explanation: when the final env.execute() runs, no new data sink has been defined since the previous execution. In a Flink batch job, the earlier result.print() already triggered execution and produced the output, so calling env.execute() afterwards is redundant, which raises the exception above.
Fix: remove the final env.execute(); line.
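A minimal sketch of pitfall 2, trimmed to the two relevant calls: print() on a DataSet eagerly executes the batch job, so a trailing env.execute() finds no fresh sink and throws.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SinkPitfall {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("zhangsan", "lisi", "wangwu");

        data.print(); // eagerly triggers execution and prints the elements

        // env.execute("SQL-Batch"); // would throw "No new data sinks have been defined
        //                           // since the last execution" -- print() already ran the job
    }
}
```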