当前位置: 首页 > news >正文

上海优化网站排名河津网站制作

上海优化网站排名,河津网站制作,旅游网站开发背景意义,济南企业网站设计公司文章目录 一、样例一#xff1a;读 csv 文件生成 csv 文件二、样例二#xff1a;读 starrocks 写 starrocks三、样例三#xff1a;DataSet、Table Sql 处理后写入 StarRocks四、遇到的坑 dependencygroupIdorg.apache.flink/groupIdartifactId读 csv 文件生成 csv 文件二、样例二读 starrocks 写 starrocks三、样例三DataSet、Table Sql 处理后写入 StarRocks四、遇到的坑 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.9.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.11/artifactIdversion1.9.1/version/dependency!--使用Java编程语言支持DataStream / DataSet API的TableSQL API--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_2.11/artifactIdversion1.9.1/version!--scopeprovided/scope--/dependency!--表程序规划器和运行时--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_2.11/artifactIdversion1.9.1/version!--scopeprovided/scope--/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-jdbc_2.11/artifactIdversion1.9.1/version!--scopeprovided/scope--/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.16.18/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.49/version/dependency一、样例一读 csv 文件生成 csv 文件 参考3Flink学习- Table API SQL编程 import lombok.Data; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.table.sinks.CsvTableSink;public class SQLWordCount {public static void main(String[] args) throws Exception {// 1、获取执行环境 ExecutionEnvironment 批处理用这个对象ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv BatchTableEnvironment.create(env); // DataSetWC input env.fromElements( // WC.of(hello, 1), // WC.of(hqs, 1), // WC.of(world, 1), // WC.of(hello, 1) // );// 注册数据集 // tEnv.registerDataSet(WordCount, input, word, frequency);// 2、加载数据源到 DataSetDataSetStudent csv env.readCsvFile(D:\\tmp\\data.csv).ignoreFirstLine().pojoType(Student.class, name, age);// 3、将DataSet装换为TableTable students bTableEnv.fromDataSet(csv);bTableEnv.registerTable(student, students);// 4、注册student表Table result bTableEnv.sqlQuery(select name,age from student);result.printSchema();DataSetStudent dset bTableEnv.toDataSet(result, Student.class);System.out.println(count-- dset.count());dset.print();// 5、sink输出CsvTableSink sink1 new CsvTableSink(D:\\tmp\\result.csv, ,, 1, FileSystem.WriteMode.OVERWRITE);String[] fieldNames {name, age};TypeInformation[] fieldTypes {Types.STRING, Types.INT};bTableEnv.registerTableSink(CsvOutPutTable, fieldNames, fieldTypes, sink1);result.insertInto(CsvOutPutTable);env.execute(SQL-Batch);}Datapublic static class Student {private String name;private int age;} }准备测试文件 data.csv name,age zhangsan,23 lisi,43 wangwu,12运行程序后会生成 D:\\tmp\\result.csv 文件。 二、样例二读 starrocks 写 starrocks import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat JDBCInputFormat.buildJDBCInputFormat().setDrivername(com.mysql.jdbc.Driver).setDBUrl(jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncodingutf8).setUsername(root).setPassword().setQuery(select * from student).setRowTypeInfo(rowTypeInfo).finish();final ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 方式一DataSource s env.createInput(jdbcInputFormat);s.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername(com.mysql.jdbc.Driver).setDBUrl(jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncodingutf8).setUsername(root).setPassword().setQuery(insert into student values(?, ?)).finish());// 方式二 // DataSetRow dataSource env.createInput(jdbcInputFormat); // // dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat() // .setDrivername(com.mysql.jdbc.Driver) // .setDBUrl(jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncodingutf8) // .setUsername(root).setPassword() // .setQuery(insert into student values(?, ?)) // .finish() // );env.execute(SQL-Batch);} }数据准备 CREATE TABLE student (name STRING,age INT ) ENGINEOLAP DUPLICATE KEY(name) DISTRIBUTED BY RANDOM PROPERTIES ( compression LZ4, fast_schema_evolution false, replicated_storage true, replication_num 1 );insert into student values(zhangsan, 23);参考 flink 读取mysql源 JDBCInputFormat、自定义数据源 flink1.10中三种数据处理方式的连接器说明 flink读写MySQL的两种方式 注意如果运行 java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount 时报错Exception in thread main com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer} 解决报错Flink Could not resolve substitution to a value: ${akka.stream.materializer} buildplugins!-- Java Compiler --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.1/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion2.3/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationtransformers!--transformer implementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClassflink.KafkaDemo1/mainClass/transformer--transformer implementationorg.apache.maven.plugins.shade.resource.AppendingTransformerresourcereference.conf/resource/transformer/transformers/configuration/execution/executions/plugin/plugins/build三、样例三DataSet、Table Sql 处理后写入 StarRocks import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat JDBCInputFormat.buildJDBCInputFormat().setDrivername(com.mysql.jdbc.Driver).setDBUrl(jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncodingutf8).setUsername(root).setPassword().setQuery(select * from student).setRowTypeInfo(rowTypeInfo).finish();ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv BatchTableEnvironment.create(env);DataSetRow dataSource env.createInput(jdbcInputFormat);dataSource.print();Table students bTableEnv.fromDataSet(dataSource);bTableEnv.registerTable(student, students);Table result bTableEnv.sqlQuery(select name, age from (select f0 as name, f1 as age from student) group by name, age);result.printSchema();DataSetRow dset bTableEnv.toDataSet(result, Row.class);dset.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername(com.mysql.jdbc.Driver).setDBUrl(jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncodingutf8).setUsername(root).setPassword().setQuery(insert into student values(?, ?)).finish());env.execute(SQL-Batch);} }四、遇到的坑 坑1Bang equal ! is not allowed under the current SQL conformance level   解决将 sql 中的 ! 修改为 坑2java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to execute(), count(), collect(), or print().   解释在最后一行代码 env.execute() 执行的时候没有新的数据接收器被定义对于 Flink 批处理而前一行代码 result.print() 已经触发了代码的执行和输出所以再执行 env.execute()就是多余的了因此报了上面的异常。   解决方法去掉最后一行代码 env.execute(); 就可以了。
http://www.dnsts.com.cn/news/144105.html

相关文章:

  • wordpress手机端发布软件seo哪家公司好
  • 给网站添加百度地图服装网站建设竞争对手调查分析
  • 鱼台网站建设学院评估 网站建设整改
  • 哪个网站可以做平面兼职电商网站开发工具
  • 株洲网站开发公司电话北京视频网站建设
  • 自己电脑做网站服务器小工具如何设计一个网页步骤
  • 简速做网站工作室代销网站源码
  • dw建设网站教案企业展厅设计设计公司
  • 网站后台不能上传图片公司变更名字需要什么手续
  • php电子商务网站模板公司网站后台登陆
  • 免费建网站广告语本机运行wordpress
  • 广州企业网站seo上海门户网站的亮点
  • 衡阳做网站的公司博山信息港
  • 如何推广自己的外贸网站wordpress 当前页码
  • 浙江乐清新闻今天网络公司优化关键词
  • 在线a视频网站一级a做爰片平台推广计划
  • 深圳响应式网站免费网页模板源代码
  • 网站模板目录扫描成都网页设计美工培训
  • 什么网站管理系统好百度云盘官网登录入口
  • 模板生成网站页面设计的对称方法包括哪几种形式
  • 免费发布推广信息网站会用wordpress建站
  • 代做毕业设计网站徐州做网站的培训机构
  • 公司有些网站打不开江西萍乡做网站公司
  • 宝塔建设网站wordpress 去掉meta
  • 网站建设陷阱网站建设的常用软件有哪些
  • 中国做网站正邦沈阳市官网
  • 北京seo网站开发wordpress降低sql查询
  • jsp网站开发期末大作业怎么做一个网站
  • 济南网站优化技术厂家建设app下载官网
  • 路桥网站设计php图书管理系统网站开发