当前位置: 首页 > news >正文

四川网站开发云空间的网站如何做

四川网站开发,云空间的网站如何做,wordpress多站点用户互通,wordpress4.8 php版本​ 介绍Flink的安装、启动以及如何进行Flink程序的开发#xff0c;如何运行部署Flink程序等 2.1 Flink的安装和启动 本地安装指的是单机模式 0、前期准备 java8或者java11#xff08;官方推荐11#xff09;下载Flink安装包 https://flink.apache.org/zh/downloads/hadoop如何运行部署Flink程序等 2.1 Flink的安装和启动 本地安装指的是单机模式 0、前期准备 java8或者java11官方推荐11下载Flink安装包 https://flink.apache.org/zh/downloads/hadoop后面Flink on Yarn部署模式需要服务器我是使用虚拟机创建了三个centos的实例hadoop102、hadoop103、Hadoop104 1、本地安装单机 第一步解压 [roothadoop102 software]# tar -zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt/module/第二步启动 [roothadoop102 bin]# cd /opt/module/flink-1.17.1/bin [roothadoop102 bin]# ./start-cluster.sh Starting cluster. Starting standalonesession daemon on host hadoop102. Starting taskexecutor daemon on host hadoop102.查看进程 [roothadoop102 ~]# jps 24817 StandaloneSessionClusterEntrypoint 25330 Jps 25117 TaskManagerRunner有StandaloneSessionClusterEntrypoint和TaskManagerRunner就说明成功启动。 第三步提交作业 # 命令 ./flink run xxx.jarFlink提供了一些示例程序已经打成了jar包可直接运行 # 运行一个统计单词数量的Flink示例程序 [roothadoop102 bin]# ./flink run ../examples/streaming/WordCount.jar# 查看输出 [roothadoop102 bin]# tail ../log/flink-*-taskexecutor-*.out (nymph,1) (in,3) (thy,1) (orisons,1) (be,4) (all,2) (my,1) (sins,1) (remember,1) (d,4)第四步停止集群 [roothadoop102 bin]# ./stop-cluster.sh★★★ 在企业中单机模式无法支撑业务所以都是以集群的方式安装故后续内容都是以集群展开。 2、集群安装 0集群角色 为了更好理解安装配置过程这里先提一下Flink集群的几个关键组件 三个关键组件 客户端JobClient接收用户的代码并做一些转换会生成一个执行计划这个执行计划我们也叫数据流data flow然后发送给JobManager去进行下一步的执行执行完成后客户端会将结果返回给用户。客户端并不是Flink程序执行的内部组成部分但它是执行的起点。 JobManager主进程Flink集群的“管事人”对作业进行中央调度管理主要职责包括计划任务、管理检查点、故障恢复等。获取到要执行的作业后会做进一步的转换然后分发给众多的TaskManager。 TaskManager真正干活的人,数据的处理操作都是他们来做的。 1集群规划 节点服务器hadoop102hadoop103hadoop104角色JobManagerTaskManagerTaskManagerTaskManager 2集群安装及启动 第一步下载解压见本地安装 下载jar上传到hadoop102上然后解压。如果本地安装已经操作则无需操作。 第二步修改集群配置 进入conf目录 /opt/module/flink-1.17.1/conf a.进入conf目录修改flink-conf.yaml文件 [roothadoop102 conf]# vim flink-conf.yaml以下几个地方需要修改 # JobManager节点地址. jobmanager.rpc.address: hadoop102 jobmanager.bind-host: 0.0.0.0 rest.address: hadoop102 rest.bind-address: 0.0.0.0 # TaskManager节点地址.需要配置为当前机器名 taskmanager.bind-host: 0.0.0.0 taskmanager.host: hadoop102b.workers指定hadoop102、hadoop103和hadoop104为TaskManager [roothadoop102 conf]# vim workers 修改为 hadoop102 hadoop103 hadoop104c.修改masters文件指定hadoop102为JobManager [roothadoop102 conf]# vim masters hadoop102:8081在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置可先自行了解下 主要配置项如下 jobmanager.memory.process.sizetaskmanager.memory.process.sizetaskmanager.numberOfTaskSlotsparallelism.default 第三步发送到其它所有服务器hadoop103、Hadoop04 [roothadoop102 module]# scp -r flink-1.17.1 roothadoop103:/opt/module/ [roothadoop102 module]# scp -r flink-1.17.1 roothadoop104:/opt/module/hadoop103、hadoop104配置修改 taskmanager.host [roothadoop103 conf]# vim flink-conf.yaml taskmanager.host: hadoop103[roothadoop104 conf]# vim flink-conf.yaml taskmanager.host: hadoop104第四步启动集群 hadoop102上执行start-cluster.sh [roothadoop102 bin]# ./start-cluster.sh Starting cluster. Starting standalonesession daemon on host hadoop102. Starting taskexecutor daemon on host hadoop102. Starting taskexecutor daemon on host hadoop103. Starting taskexecutor daemon on host hadoop104.查看进程 [roothadoop102 bin]# jps 28656 TaskManagerRunner 28788 Jps 28297 StandaloneSessionClusterEntrypoint[roothadoop103 conf]# jps 4678 TaskManagerRunner 4750 Jps[roothadoop104 ~]# jps 6593 TaskManagerRunner 6668 JpsStandaloneSessionClusterEntrypointJobManager进程 TaskManagerRunnerTaskManager进程 访问WEB UIhttp://hadoop102:8081/ 第五步停止集群 [roothadoop102 bin]# ./stop-cluster.sh2.2 Flink应用开发 开发工具IDEA 0、创建项目 1创建工程 1打开IntelliJ IDEA创建一个Maven工程。 2填写项目信息 (3) 添加项目依赖 pom.xml ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.zlin/groupIdartifactIdflink-study/artifactIdpackagingpom/packagingversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.17.0/flink.versionslf4j.version2.0.5/slf4j.version/propertiesdependencies!-- Flink相关依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependency!-- 日志管理相关依赖 --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.19.0/version/dependency/dependencies/project1、代码编写WordCount 在开发中如果我们有很多子项目则可以创建一个个Module。相当于一个个子项目这样结构清晰而且所有子项目都拥有父项目pom文件中的依赖。 需求统计一段文字中每个单词出现的频次。 环境准备在src/main/java目录下新建一个包命名为com.atguigu.wc 这里也给出了批处理的代码可以和流处理做下对比。 1.批处理 1数据准备 工程目录下创建一个目录 input 目录下创建一个文件文件名随意写一些单词 1.txt hello udian hello flink test test2代码编写 创建package com.zlin.wc 创建类BatchWordCount package com.zlin.wc;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;/*** 单词统计批处理* author ZLin* since 2022/12/17*/ public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据按行读取DataSourceString lineDs env.readTextFile(input/);// 3. 转换数据格式FlatMapOperatorString, Tuple2String, Long wordAndOnes lineDs.flatMap((String line, CollectorTuple2String, Long out) - {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 按照word(下标为0)进行分组UnsortedGroupingTuple2String, Long wordAndOneUg wordAndOnes.groupBy(0);// 5. 分组内聚合统计AggregateOperatorTuple2String, Long sum wordAndOneUg.sum(1);// 6. 打印结果sum.print();} }3输出 (java,1) (flink,1) (test,2) (hello,2)2.流处理 a.从文件读取 package com.zlin.wc;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.util.Collector;import java.util.Arrays;/*** 有界流* author ZLin* since 2022/12/19*/ public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 配置数据源FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path(input/)).build();// 3. 从数据源中读取数据DataStreamSourceString lineDss env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),file-source);// 4.转换格式 (word, 1L)SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDss.flatMap((String line, CollectorString words) - Arrays.stream(line.split( )).forEach(words::collect)).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));// 5. 按单词分组KeyedStreamTuple2String, Long, String wordAndOneKs wordAndOne.keyBy(t - t.f0);// 6. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKs.sum(1);// 7. 打印result.print();// 8. 执行env.execute(单词统计有界流);} }输出 (java,1) (flink,1) (test,2) (hello,2)b.从socket读取 package com.zlin.wc;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;import java.util.Arrays;/*** 单词统计无界流* author ZLin* since 2022/12/20*/ public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//2. 从socket读取文本流DataStreamSourceString lineDss env.socketTextStream(hadoop102, 7777);//3. 转换数据格式SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDss.flatMap((String line, CollectorString words) - Arrays.stream(line.split( )).forEach(words::collect)).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4. 分组KeyedStreamTuple2String, Long, String wordAndOneKs wordAndOne.keyBy(t - t.f0);//5. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKs.sum(1);//6. 打印result.print();//7. 执行env.execute(单词统计无界流);} }测试-在hadoop102中用 netcat 命令进行发送测试 nc -lk 7777 注意这里要先在hadoop102上先执行nc -lk 7777把端口打开再在IDEA中运行程序否则连不上端口会报错。 输出 4 (hello,1) 2 (java,1) 4 (hello,2) 10 (flink,1) 7 (test,1) 7 (test,2)2.3 Flink应用提交到集群 在IDEA中我们开发完了项目后我们需要把我们的项目部署到集群中。 首先将程序打包 1pom.xml文件添加打包插件的配置 buildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.1.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins /build点击Maven-你的moudle-package 进行打包显示如下即打包成功。 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 15.263 s [INFO] Finished at: 2022-12-22T00:53:5008:00 [INFO] ------------------------------------------------------------------------Process finished with exit code 0 可在target目录下看到打包成功的jar包 -with-dependencies是带依赖的另一个是不带依赖的。 如果运行的环境中已经有程序所要运行的依赖则直接使用不带依赖的。 1. Web UI 点击Add New上传我们的jar包然后填写配置最后点击提交 注意: 由于我们的程序是统计Hadoop102:7777这个端口发送过来的数据所以我们需要先开启这个端口。不然程序提交会报错。 [roothadoop102 bin]# nc -lk 7777 之后我们再submit我们的任务。 我们发送一些数据测试一下 [roothadoop102 bin]# nc -lk 7777 heelo 222 ppp fff hello world how are you hello flink2. 命令行方式 确认flink集群已经启动 第一步将jar包上传到服务器上 第二步开启hadoop102:7777端口 [roothadoop102 bin]# nc -lk 7777第三步提交作业 [roothadoop102 jars]# flink run -m hadoop102:8081 -c com.zlin.wc.StreamWordCount ./chapter2-1.0-SNAPSHOT.jar Job has been submitted with JobID f00421ad4c893deb17068047263a4e9e发送一些数据 [roothadoop102 bin]# nc -lk 7777 666 777 888
http://www.dnsts.com.cn/news/190746.html

相关文章:

  • 北京网站开发哪家好薇私人做网站需要多少钱
  • 荆州做网站网页建站要多久
  • 一个网站同时做百度和360 百度商桥都可以接收客户信息吗邢台做移动网站公司电话号码
  • 做买家秀的网站北京网站建设 app
  • 网站上的分享wordpress付费订阅插件
  • 广州网站建设信科便宜已经备案的网站新增ip怎么做
  • 泌阳县网站建设中原城市领先指数
  • 从化做网站建设免费网站软件app大全
  • 太原网站建设与维护网站模版 蓝色
  • 公司主营业务网站建设吴江网页制作
  • 深圳商城网站设计电话软件开发工程师英文
  • 给公司做企业网站建设门户网站培训通知
  • 网站建设350元做网站一般多少钱
  • 做药的常用网站有哪些论坛网站免费建设模板下载安装
  • 公司网站改版方案盛世平面设计网站中文
  • 美工素材网站有哪些有域名了网站怎么做
  • 地理云门户网站建设庆阳官网贴吧
  • 青岛做网站建设的公司排名企业营销策划书
  • 张店低价网站建设湖北广盛建设集团网站
  • c 网站开发框架做商城型网站
  • 商务网站开发实验电商平台代运营
  • 新手做网站看什么书4a网站建设公司
  • 平台型网站建设方案wordpress外网访问没模版
  • 百度收录最快的网站泰安集团网站建设地点
  • 青海市住房和城乡建设厅网站2018年主流网站开发语言
  • 网站正在建设中色免费开网店怎么开
  • 公众号开发者葫岛百度seo
  • 青岛网站推广招商是普通网站地图好还是rss地图好一点
  • 青锐成长计划网站开发人员杭州企业自助建站系统
  • 华为等五家公司南昌关键词优化平台