四川网站开发,云空间的网站如何做,wordpress多站点用户互通,wordpress4.8 php版本 介绍Flink的安装、启动以及如何进行Flink程序的开发#xff0c;如何运行部署Flink程序等 2.1 Flink的安装和启动 本地安装指的是单机模式 0、前期准备
java8或者java11#xff08;官方推荐11#xff09;下载Flink安装包 https://flink.apache.org/zh/downloads/hadoop如何运行部署Flink程序等 2.1 Flink的安装和启动 本地安装指的是单机模式 0、前期准备
java8或者java11官方推荐11下载Flink安装包 https://flink.apache.org/zh/downloads/hadoop后面Flink on Yarn部署模式需要服务器我是使用虚拟机创建了三个centos的实例hadoop102、hadoop103、Hadoop104
1、本地安装单机
第一步解压
[roothadoop102 software]# tar -zxvf flink-1.17.1-bin-scala_2.12.tgz -C /opt/module/第二步启动
[roothadoop102 bin]# cd /opt/module/flink-1.17.1/bin
[roothadoop102 bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.查看进程
[roothadoop102 ~]# jps
24817 StandaloneSessionClusterEntrypoint
25330 Jps
25117 TaskManagerRunner有StandaloneSessionClusterEntrypoint和TaskManagerRunner就说明成功启动。
第三步提交作业
# 命令
./flink run xxx.jarFlink提供了一些示例程序已经打成了jar包可直接运行 # 运行一个统计单词数量的Flink示例程序
[roothadoop102 bin]# ./flink run ../examples/streaming/WordCount.jar# 查看输出
[roothadoop102 bin]# tail ../log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)第四步停止集群
[roothadoop102 bin]# ./stop-cluster.sh★★★ 在企业中单机模式无法支撑业务所以都是以集群的方式安装故后续内容都是以集群展开。
2、集群安装
0集群角色 为了更好理解安装配置过程这里先提一下Flink集群的几个关键组件 三个关键组件 客户端JobClient接收用户的代码并做一些转换会生成一个执行计划这个执行计划我们也叫数据流data flow然后发送给JobManager去进行下一步的执行执行完成后客户端会将结果返回给用户。客户端并不是Flink程序执行的内部组成部分但它是执行的起点。 JobManager主进程Flink集群的“管事人”对作业进行中央调度管理主要职责包括计划任务、管理检查点、故障恢复等。获取到要执行的作业后会做进一步的转换然后分发给众多的TaskManager。 TaskManager真正干活的人,数据的处理操作都是他们来做的。
1集群规划
节点服务器hadoop102hadoop103hadoop104角色JobManagerTaskManagerTaskManagerTaskManager
2集群安装及启动
第一步下载解压见本地安装
下载jar上传到hadoop102上然后解压。如果本地安装已经操作则无需操作。
第二步修改集群配置
进入conf目录
/opt/module/flink-1.17.1/conf
a.进入conf目录修改flink-conf.yaml文件
[roothadoop102 conf]# vim flink-conf.yaml以下几个地方需要修改
# JobManager节点地址.
jobmanager.rpc.address: hadoop102
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop102
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop102b.workers指定hadoop102、hadoop103和hadoop104为TaskManager
[roothadoop102 conf]# vim workers
修改为
hadoop102
hadoop103
hadoop104c.修改masters文件指定hadoop102为JobManager
[roothadoop102 conf]# vim masters
hadoop102:8081在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置可先自行了解下 主要配置项如下
jobmanager.memory.process.sizetaskmanager.memory.process.sizetaskmanager.numberOfTaskSlotsparallelism.default
第三步发送到其它所有服务器hadoop103、Hadoop04
[roothadoop102 module]# scp -r flink-1.17.1 roothadoop103:/opt/module/
[roothadoop102 module]# scp -r flink-1.17.1 roothadoop104:/opt/module/hadoop103、hadoop104配置修改 taskmanager.host
[roothadoop103 conf]# vim flink-conf.yaml
taskmanager.host: hadoop103[roothadoop104 conf]# vim flink-conf.yaml
taskmanager.host: hadoop104第四步启动集群
hadoop102上执行start-cluster.sh
[roothadoop102 bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop103.
Starting taskexecutor daemon on host hadoop104.查看进程
[roothadoop102 bin]# jps
28656 TaskManagerRunner
28788 Jps
28297 StandaloneSessionClusterEntrypoint[roothadoop103 conf]# jps
4678 TaskManagerRunner
4750 Jps[roothadoop104 ~]# jps
6593 TaskManagerRunner
6668 JpsStandaloneSessionClusterEntrypointJobManager进程
TaskManagerRunnerTaskManager进程
访问WEB UIhttp://hadoop102:8081/ 第五步停止集群
[roothadoop102 bin]# ./stop-cluster.sh2.2 Flink应用开发 开发工具IDEA 0、创建项目
1创建工程
1打开IntelliJ IDEA创建一个Maven工程。 2填写项目信息 (3) 添加项目依赖
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.zlin/groupIdartifactIdflink-study/artifactIdpackagingpom/packagingversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.17.0/flink.versionslf4j.version2.0.5/slf4j.version/propertiesdependencies!-- Flink相关依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependency!-- 日志管理相关依赖 --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.19.0/version/dependency/dependencies/project1、代码编写WordCount 在开发中如果我们有很多子项目则可以创建一个个Module。相当于一个个子项目这样结构清晰而且所有子项目都拥有父项目pom文件中的依赖。 需求统计一段文字中每个单词出现的频次。 环境准备在src/main/java目录下新建一个包命名为com.atguigu.wc 这里也给出了批处理的代码可以和流处理做下对比。 1.批处理
1数据准备
工程目录下创建一个目录 input 目录下创建一个文件文件名随意写一些单词
1.txt
hello udian hello flink
test test2代码编写
创建package com.zlin.wc 创建类BatchWordCount
package com.zlin.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** 单词统计批处理* author ZLin* since 2022/12/17*/
public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据按行读取DataSourceString lineDs env.readTextFile(input/);// 3. 转换数据格式FlatMapOperatorString, Tuple2String, Long wordAndOnes lineDs.flatMap((String line, CollectorTuple2String, Long out) - {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 按照word(下标为0)进行分组UnsortedGroupingTuple2String, Long wordAndOneUg wordAndOnes.groupBy(0);// 5. 分组内聚合统计AggregateOperatorTuple2String, Long sum wordAndOneUg.sum(1);// 6. 打印结果sum.print();}
}3输出
(java,1)
(flink,1)
(test,2)
(hello,2)2.流处理
a.从文件读取
package com.zlin.wc;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.util.Collector;import java.util.Arrays;/*** 有界流* author ZLin* since 2022/12/19*/
public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 配置数据源FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path(input/)).build();// 3. 从数据源中读取数据DataStreamSourceString lineDss env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),file-source);// 4.转换格式 (word, 1L)SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDss.flatMap((String line, CollectorString words) - Arrays.stream(line.split( )).forEach(words::collect)).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));// 5. 按单词分组KeyedStreamTuple2String, Long, String wordAndOneKs wordAndOne.keyBy(t - t.f0);// 6. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKs.sum(1);// 7. 打印result.print();// 8. 执行env.execute(单词统计有界流);}
}输出
(java,1)
(flink,1)
(test,2)
(hello,2)b.从socket读取
package com.zlin.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;/*** 单词统计无界流* author ZLin* since 2022/12/20*/
public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//2. 从socket读取文本流DataStreamSourceString lineDss env.socketTextStream(hadoop102, 7777);//3. 转换数据格式SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDss.flatMap((String line, CollectorString words) - Arrays.stream(line.split( )).forEach(words::collect)).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4. 分组KeyedStreamTuple2String, Long, String wordAndOneKs wordAndOne.keyBy(t - t.f0);//5. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKs.sum(1);//6. 打印result.print();//7. 执行env.execute(单词统计无界流);}
}测试-在hadoop102中用 netcat 命令进行发送测试
nc -lk 7777 注意这里要先在hadoop102上先执行nc -lk 7777把端口打开再在IDEA中运行程序否则连不上端口会报错。 输出
4 (hello,1)
2 (java,1)
4 (hello,2)
10 (flink,1)
7 (test,1)
7 (test,2)2.3 Flink应用提交到集群 在IDEA中我们开发完了项目后我们需要把我们的项目部署到集群中。 首先将程序打包
1pom.xml文件添加打包插件的配置
buildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.1.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins
/build点击Maven-你的moudle-package 进行打包显示如下即打包成功。
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 15.263 s
[INFO] Finished at: 2022-12-22T00:53:5008:00
[INFO] ------------------------------------------------------------------------Process finished with exit code 0
可在target目录下看到打包成功的jar包 -with-dependencies是带依赖的另一个是不带依赖的。
如果运行的环境中已经有程序所要运行的依赖则直接使用不带依赖的。
1. Web UI
点击Add New上传我们的jar包然后填写配置最后点击提交 注意: 由于我们的程序是统计Hadoop102:7777这个端口发送过来的数据所以我们需要先开启这个端口。不然程序提交会报错。 [roothadoop102 bin]# nc -lk 7777
之后我们再submit我们的任务。 我们发送一些数据测试一下
[roothadoop102 bin]# nc -lk 7777
heelo 222
ppp
fff
hello world
how are you
hello flink2. 命令行方式 确认flink集群已经启动 第一步将jar包上传到服务器上
第二步开启hadoop102:7777端口
[roothadoop102 bin]# nc -lk 7777第三步提交作业
[roothadoop102 jars]# flink run -m hadoop102:8081 -c com.zlin.wc.StreamWordCount ./chapter2-1.0-SNAPSHOT.jar
Job has been submitted with JobID f00421ad4c893deb17068047263a4e9e发送一些数据
[roothadoop102 bin]# nc -lk 7777
666
777
888