当前位置: 首页 > news >正文

青岛网站制作公司设计类电子书网站

青岛网站制作公司,设计类电子书网站,做学校网站的内容,装修办公室装修设计系列文章目录 Hudi第一章#xff1a;编译安装 Hudi第二章#xff1a;集成Spark Hudi第二章#xff1a;集成Spark(二) Hudi第三章#xff1a;集成Flink 文章目录 系列文章目录前言一、环境准备1.上传并解压2.修改配置文件3.拷贝jar包4.启动sql-client1.启动hadoop2.启动ses…系列文章目录 Hudi第一章编译安装 Hudi第二章集成Spark Hudi第二章集成Spark(二) Hudi第三章集成Flink 文章目录 系列文章目录前言一、环境准备1.上传并解压2.修改配置文件3.拷贝jar包4.启动sql-client1.启动hadoop2.启动session3.启动sql-client 二、sql-client编码1.创建表2.插入数据3.查询数据4.更新数据5.流式插入 三、IDEA编码1.编写pom.xml2.编写demo 总结 前言 之前的两次博客学习了hudi和spark的集成现在我们来学习hudi和flink的集成。 一、环境准备 1.上传并解压 2.修改配置文件 vim /opt/module/flink-1.13.6/conf/flink-conf.yaml 直接在最后追加即可。 classloader.check-leaked-classloader: false taskmanager.numberOfTaskSlots: 4state.backend: rocksdb execution.checkpointing.interval: 30000 state.checkpoints.dir: hdfs://hadoop102:8020/ckps state.backend.incremental: truesudo vim /etc/profile.d/my_env.sh export HADOOP_CLASSPATHhadoop classpath export HADOOP_CONF_DIR$HADOOP_HOME/etc/hadoopsource /etc/profile.d/my_env.sh 3.拷贝jar包 cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/module/flink-1.13.6/lib/ cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/ cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/4.启动sql-client 1.启动hadoop 2.启动session /opt/module/flink-1.13.6/bin/yarn-session.sh -d3.启动sql-client bin/sql-client.sh embedded -s yarn-session启动成功后可以在web端看一下。 也可以跳转到flink的webui。 现在我们就可以在终端写代码了。 二、sql-client编码 1.创建表 CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),partition VARCHAR(20) ) PARTITIONED BY (partition) WITH (connector hudi,path hdfs://hadoop102:8020/tmp/hudi_flink/t1,table.type MERGE_ON_READ );2.插入数据 INSERT INTO t1 VALUES(id1,Danny,23,TIMESTAMP 1970-01-01 00:00:01,par1),(id2,Stephen,33,TIMESTAMP 1970-01-01 00:00:02,par1),(id3,Julian,53,TIMESTAMP 1970-01-01 00:00:03,par2),(id4,Fabian,31,TIMESTAMP 1970-01-01 00:00:04,par2),(id5,Sophia,18,TIMESTAMP 1970-01-01 00:00:05,par3),(id6,Emma,20,TIMESTAMP 1970-01-01 00:00:06,par3),(id7,Bob,44,TIMESTAMP 1970-01-01 00:00:07,par4),(id8,Han,56,TIMESTAMP 1970-01-01 00:00:08,par4);3.查询数据 我们先更改一下表格式默认的看得可能不习惯。 set sql-client.execution.result-modetableau; select * from t1;4.更新数据 前面说过hudi的更新操作就是插入一条主键相同的新数据由更新的ts来覆盖旧的。 insert into t1 values(id1,Danny,27,TIMESTAMP 1970-01-02 00:00:01,par1);可以看到数据已经完成了更新。 5.流式插入 flink最常用的还是流式数据的处理。 CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),partition varchar(20) ) WITH (connector datagen,rows-per-second 1 );create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),partition varchar(20) ) with (connector hudi,path /tmp/hudi_flink/t2,table.type MERGE_ON_READ );我们创建两张表第一张的连接器是datagen可以用来流式的生产数据。第二张表是正常的hudi表。 insert into t2 select * from sourceT;我们可以在webui看一下。 因为是流式处理所以这个进程是不会停止的。 select * from t2 limit 10;再查看一次 我们会发现是不断有数据产生。 三、IDEA编码 我们需要将编译好的一个包拉到本地。 然后将他倒入maven仓库 mvn install:install-file -DgroupIdorg.apache.hudi -DartifactIdhudi-flink_2.12 -Dversion0.12.0 -Dpackagingjar -Dfile./hudi-flink1.13-bundle-0.12.0.jar1.编写pom.xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.hudi/groupIdartifactIdflink-hudi-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.13.6/flink.versionhudi.version0.12.0/hudi.versionjava.version1.8/java.versionscala.binary.version2.12/scala.binary.versionslf4j.version1.7.30/slf4j.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope !--不会打包到依赖中只参与编译不参与运行 --/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!--idea运行时也有webui--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/versionscopeprovided/scope/dependency!--手动install到本地maven仓库--dependencygroupIdorg.apache.hudi/groupIdartifactIdhudi-flink_2.12/artifactIdversion${hudi.version}/versionscopeprovided/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.4/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludelog4j:*/excludeexcludeorg.apache.hadoop:*/exclude/excludes/artifactSetfiltersfilter!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --artifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformers combine.childrenappendtransformer implementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer/transformers/configuration/execution/executions/plugin/plugins/build /project2.编写demo HudiDemo.java 一个简单的流式数据处理和刚刚一样。 package com.atguigu.hudi.flink;import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiDemo {public static void main(String[] args) {System.setProperty(HADOOP_USER_NAME, atguigu);StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());// 设置状态后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setDbStoragePath(/home/chaoge/Downloads/hudi);embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/ckps);checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);StreamTableEnvironment sTableEnv StreamTableEnvironment.create(env);sTableEnv.executeSql(CREATE TABLE sourceT (\n uuid varchar(20),\n name varchar(10),\n age int,\n ts timestamp(3),\n partition varchar(20)\n ) WITH (\n connector datagen,\n rows-per-second 1\n ));sTableEnv.executeSql(create table t2(\n uuid varchar(20),\n name varchar(10),\n age int,\n ts timestamp(3),\n partition varchar(20)\n )\n with (\n connector hudi,\n path hdfs://hadoop102:8020/tmp/hudi_idea/t2,\n table.type MERGE_ON_READ\n ));sTableEnv.executeSql(insert into t2 select * from sourceT);} }当我们运行的时候可以再本地webui查看。 127.0.0.1:8081/ 也可以在hdfs路径看一下。 总结 flink第一次就先写到这里剩下的还要在写一次。
http://www.dnsts.com.cn/news/88972.html

相关文章:

  • 网站开通支付宝收款海南百度推广开户费用
  • 网站建设客户需求分析外贸网站解决方案
  • 网站建设佰金手指科杰十八如何做国外网站的镜像
  • 网站搭建多少钱徐州百都网络非常好天律网站建设
  • 网站备案与icp备案WordPress页面支持文件上传
  • 哪个网站做调查赚钱多成都到西安飞机
  • 东莞网站优化seo哪个网站做国内销海外的
  • 深圳哪个做网站好优化钓鱼网站在线生成器
  • 淘客网站要怎么做崇左市住房和城乡建设局网站
  • 莱特币做空国外网站旅游app界面设计
  • 网站的流量是怎么算的做网站小程序多少钱
  • 网站的用户体验主要有那些类型整站seoseo优化
  • 宝塔面板怎么搭建网站网站 php连接mysql 代码
  • 免费获取源码的网站后台风格网站
  • 有免费建站的网站吗优质手机网站建设推荐
  • 接工程网站潍坊做网站潍坊做网站
  • 网站建设需要多久才能学会免费海报背景素材
  • 江门好的建站网站网站版权信息的正确写法
  • 中小企业服务中心网站建设怎样在手机做自己的网站6
  • 网站开发的技术要求昨天军事新闻最新消息
  • 网站是不是每年都要续费绵阳做网站
  • 网站cn域名注册orchid wordpress
  • 国家建设标准网站公司网站后台维护
  • 怎样更新网站国内优秀网页设计网站
  • 建个企业网站需要多少钱合肥做的比较好的网站有那几家
  • 梅州市网站制作中国电信网上营业厅
  • 网站管理强化阵地建设网站建设 技术服务
  • 中国建设网站什么是网站推广?
  • 网站宣传策略网站建设策划书前言
  • 做pc端网站服务saas建站 cms