当前位置: 首页 > news >正文

四川网站开发国家出台建设工程政策的网站

四川网站开发,国家出台建设工程政策的网站,网站管理员权限怎么设置,比较出名的文创产品​ 介绍Flink的安装、启动以及如何进行Flink程序的开发#xff0c;如何运行部署Flink程序等 2.1 Flink的安装和启动 本地安装指的是单机模式 0、前期准备 java8或者java11#xff08;官方推荐11#xff09;下载Flink安装包 https://flink.apache.org/zh/downloads/hadoop如何运行部署Flink程序等 2.1 Flink的安装和启动 本地安装指的是单机模式 0、前期准备 java8或者java11官方推荐11下载Flink安装包 https://flink.apache.org/zh/downloads/hadoop后面Flink on Yarn部署模式需要服务器我是使用虚拟机创建了三个centos的实例hadoop102、hadoop103、Hadoop104 1、本地安装单机 第一步解压 [roothadoop102 software]# tar -zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt/module/第二步启动 [roothadoop102 bin]# cd /opt/module/flink-1.17.1/bin [roothadoop102 bin]# ./start-cluster.sh Starting cluster. Starting standalonesession daemon on host hadoop102. Starting taskexecutor daemon on host hadoop102.查看进程 [roothadoop102 ~]# jps 24817 StandaloneSessionClusterEntrypoint 25330 Jps 25117 TaskManagerRunner有StandaloneSessionClusterEntrypoint和TaskManagerRunner就说明成功启动。 第三步提交作业 # 命令 ./flink run xxx.jarFlink提供了一些示例程序已经打成了jar包可直接运行 # 运行一个统计单词数量的Flink示例程序 [roothadoop102 bin]# ./flink run ../examples/streaming/WordCount.jar# 查看输出 [roothadoop102 bin]# tail ../log/flink-*-taskexecutor-*.out (nymph,1) (in,3) (thy,1) (orisons,1) (be,4) (all,2) (my,1) (sins,1) (remember,1) (d,4)第四步停止集群 [roothadoop102 bin]# ./stop-cluster.sh★★★ 在企业中单机模式无法支撑业务所以都是以集群的方式安装故后续内容都是以集群展开。 2、集群安装 0集群角色 为了更好理解安装配置过程这里先提一下Flink集群的几个关键组件 三个关键组件 客户端JobClient接收用户的代码并做一些转换会生成一个执行计划这个执行计划我们也叫数据流data flow然后发送给JobManager去进行下一步的执行执行完成后客户端会将结果返回给用户。客户端并不是Flink程序执行的内部组成部分但它是执行的起点。 JobManager主进程Flink集群的“管事人”对作业进行中央调度管理主要职责包括计划任务、管理检查点、故障恢复等。获取到要执行的作业后会做进一步的转换然后分发给众多的TaskManager。 TaskManager真正干活的人,数据的处理操作都是他们来做的。 1集群规划 节点服务器hadoop102hadoop103hadoop104角色JobManagerTaskManagerTaskManagerTaskManager 2集群安装及启动 第一步下载解压见本地安装 下载jar上传到hadoop102上然后解压。如果本地安装已经操作则无需操作。 第二步修改集群配置 进入conf目录 /opt/module/flink-1.17.1/conf a.进入conf目录修改flink-conf.yaml文件 [roothadoop102 conf]# vim flink-conf.yaml以下几个地方需要修改 # JobManager节点地址. jobmanager.rpc.address: hadoop102 jobmanager.bind-host: 0.0.0.0 rest.address: hadoop102 rest.bind-address: 0.0.0.0 # TaskManager节点地址.需要配置为当前机器名 taskmanager.bind-host: 0.0.0.0 taskmanager.host: hadoop102b.workers指定hadoop102、hadoop103和hadoop104为TaskManager [roothadoop102 conf]# vim workers 修改为 hadoop102 hadoop103 hadoop104c.修改masters文件指定hadoop102为JobManager [roothadoop102 conf]# vim masters hadoop102:8081在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置可先自行了解下 主要配置项如下 jobmanager.memory.process.sizetaskmanager.memory.process.sizetaskmanager.numberOfTaskSlotsparallelism.default 第三步发送到其它所有服务器hadoop103、Hadoop04 [roothadoop102 module]# scp -r flink-1.17.1 roothadoop103:/opt/module/ [roothadoop102 module]# scp -r flink-1.17.1 roothadoop104:/opt/module/hadoop103、hadoop104配置修改 taskmanager.host [roothadoop103 conf]# vim flink-conf.yaml taskmanager.host: hadoop103[roothadoop104 conf]# vim flink-conf.yaml taskmanager.host: hadoop104第四步启动集群 hadoop102上执行start-cluster.sh [roothadoop102 bin]# ./start-cluster.sh Starting cluster. Starting standalonesession daemon on host hadoop102. Starting taskexecutor daemon on host hadoop102. Starting taskexecutor daemon on host hadoop103. Starting taskexecutor daemon on host hadoop104.查看进程 [roothadoop102 bin]# jps 28656 TaskManagerRunner 28788 Jps 28297 StandaloneSessionClusterEntrypoint[roothadoop103 conf]# jps 4678 TaskManagerRunner 4750 Jps[roothadoop104 ~]# jps 6593 TaskManagerRunner 6668 JpsStandaloneSessionClusterEntrypointJobManager进程 TaskManagerRunnerTaskManager进程 访问WEB UIhttp://hadoop102:8081/ 第五步停止集群 [roothadoop102 bin]# ./stop-cluster.sh2.2 Flink应用开发 开发工具IDEA 0、创建项目 1创建工程 1打开IntelliJ IDEA创建一个Maven工程。 2填写项目信息 (3) 添加项目依赖 pom.xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.zlin/groupIdartifactIdflink-study/artifactIdpackagingpom/packagingversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.17.0/flink.versionslf4j.version2.0.5/slf4j.version/propertiesdependencies!-- Flink相关依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependency!-- 日志管理相关依赖 --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.19.0/version/dependency/dependencies/project1、代码编写WordCount 在开发中如果我们有很多子项目则可以创建一个个Module。相当于一个个子项目这样结构清晰而且所有子项目都拥有父项目pom文件中的依赖。 需求统计一段文字中每个单词出现的频次。 环境准备在src/main/java目录下新建一个包命名为com.atguigu.wc 这里也给出了批处理的代码可以和流处理做下对比。 1.批处理 1数据准备 工程目录下创建一个目录 input 目录下创建一个文件文件名随意写一些单词 1.txt hello udian hello flink test test2代码编写 创建package com.zlin.wc 创建类BatchWordCount package com.zlin.wc;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;/*** 单词统计批处理* author ZLin* since 2022/12/17*/ public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据按行读取DataSourceString lineDs env.readTextFile(input/);// 3. 转换数据格式FlatMapOperatorString, Tuple2String, Long wordAndOnes lineDs.flatMap((String line, CollectorTuple2String, Long out) - {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 按照word(下标为0)进行分组UnsortedGroupingTuple2String, Long wordAndOneUg wordAndOnes.groupBy(0);// 5. 分组内聚合统计AggregateOperatorTuple2String, Long sum wordAndOneUg.sum(1);// 6. 打印结果sum.print();} }3输出 (java,1) (flink,1) (test,2) (hello,2)2.流处理 a.从文件读取 package com.zlin.wc;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.util.Collector;import java.util.Arrays;/*** 有界流* author ZLin* since 2022/12/19*/ public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 配置数据源FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path(input/)).build();// 3. 从数据源中读取数据DataStreamSourceString lineDss env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),file-source);// 4.转换格式 (word, 1L)SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDss.flatMap((String line, CollectorString words) - Arrays.stream(line.split( )).forEach(words::collect)).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));// 5. 按单词分组KeyedStreamTuple2String, Long, String wordAndOneKs wordAndOne.keyBy(t - t.f0);// 6. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKs.sum(1);// 7. 打印result.print();// 8. 执行env.execute(单词统计有界流);} }输出 (java,1) (flink,1) (test,2) (hello,2)b.从socket读取 package com.zlin.wc;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;import java.util.Arrays;/*** 单词统计无界流* author ZLin* since 2022/12/20*/ public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//2. 从socket读取文本流DataStreamSourceString lineDss env.socketTextStream(hadoop102, 7777);//3. 转换数据格式SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDss.flatMap((String line, CollectorString words) - Arrays.stream(line.split( )).forEach(words::collect)).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4. 分组KeyedStreamTuple2String, Long, String wordAndOneKs wordAndOne.keyBy(t - t.f0);//5. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKs.sum(1);//6. 打印result.print();//7. 执行env.execute(单词统计无界流);} }测试-在hadoop102中用 netcat 命令进行发送测试 nc -lk 7777 注意这里要先在hadoop102上先执行nc -lk 7777把端口打开再在IDEA中运行程序否则连不上端口会报错。 输出 4 (hello,1) 2 (java,1) 4 (hello,2) 10 (flink,1) 7 (test,1) 7 (test,2)2.3 Flink应用提交到集群 在IDEA中我们开发完了项目后我们需要把我们的项目部署到集群中。 首先将程序打包 1pom.xml文件添加打包插件的配置 buildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.1.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins /build点击Maven-你的moudle-package 进行打包显示如下即打包成功。 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 15.263 s [INFO] Finished at: 2022-12-22T00:53:5008:00 [INFO] ------------------------------------------------------------------------Process finished with exit code 0 可在target目录下看到打包成功的jar包 -with-dependencies是带依赖的另一个是不带依赖的。 如果运行的环境中已经有程序所要运行的依赖则直接使用不带依赖的。 1. Web UI 点击Add New上传我们的jar包然后填写配置最后点击提交 注意: 由于我们的程序是统计Hadoop102:7777这个端口发送过来的数据所以我们需要先开启这个端口。不然程序提交会报错。 [roothadoop102 bin]# nc -lk 7777 之后我们再submit我们的任务。 我们发送一些数据测试一下 [roothadoop102 bin]# nc -lk 7777 heelo 222 ppp fff hello world how are you hello flink2. 命令行方式 确认flink集群已经启动 第一步将jar包上传到服务器上 第二步开启hadoop102:7777端口 [roothadoop102 bin]# nc -lk 7777第三步提交作业 [roothadoop102 jars]# flink run -m hadoop102:8081 -c com.zlin.wc.StreamWordCount ./chapter2-1.0-SNAPSHOT.jar Job has been submitted with JobID f00421ad4c893deb17068047263a4e9e发送一些数据 [roothadoop102 bin]# nc -lk 7777 666 777 888
http://www.dnsts.com.cn/news/136490.html

相关文章:

  • 无锡网站建设制作公司毕业设计做网站简单吗
  • 怎么做网站游戏阿里云网站建设方案书模板
  • 济南网页制作公司宁波seo教程网
  • 网站优化主旨广东省城乡和建设厅网站
  • 干果坚果网站建设电子商务网站平台建设目标
  • 网站提交网址淘宝店铺交易
  • 网站建设案例教程视频教程全网营销策划公司
  • 医院建设网站虹口 教育 网站建设
  • 百度做网站按点击量收费吗济南建设局网站
  • 买域名和服务器做自己的网站快速建设网站工具
  • 高校网站建设及管理制度网站建设费用大全
  • 网站的推广城乡建设网站人力资源
  • 有哪些做网站好的公司好青羊建站报价
  • 资源下载站wordpress主题网站效果图设计思路
  • 上海建设网站是多少做房产中介需要有内部网站吗
  • 付网站建设费用会计分录闽侯做网站
  • 腾虎广州网站建设wordpress整站打包
  • 西安做网站公司哪家好个人做企业 网站
  • 江苏省建设工程质量监督站网站直播平台有哪些
  • 常熟建设网站素材下载网站
  • 威海建设局网站如何做kindle电子书下载网站
  • 一个公司网站开发多少钱西安网站建设公司西安网络公司
  • 宁波seo网站排名优化公司网站行业
  • 郓城网站建设外贸推广公司哪家好
  • 佛山大型网站建设建设银行金牛支行网站
  • 网站中qq跳转怎么做的网站的静态页面谁做
  • seo网站推广软件 快排网络推广专员好做吗
  • 招聘做网站鹤壁做网站推广
  • 内销网站要怎么做wordpress 企业主题下载
  • 陕西建设集团招聘信息网站深圳精准网络营销推广