郑州高端定制网站,大良营销网站建设讯息,网站改版 301跳转,雅思培训班价格一览表Flink集群搭建 Flink集群搭建集群规划下载并解压安装包修改集群配置分发安装目录启动集群访问Web UI Flink集群HA高可用概述集群规划配置flink配置master、workers配置ZK分发安装目录启动HA集群测试 Flink参数配置配置历史服务器概述配置启动、停止历史服务器提交一个Job任务查… Flink集群搭建 Flink集群搭建集群规划下载并解压安装包修改集群配置分发安装目录启动集群访问Web UI Flink集群HA高可用概述集群规划配置flink配置master、workers配置ZK分发安装目录启动HA集群测试 Flink参数配置配置历史服务器概述配置启动、停止历史服务器提交一个Job任务查看历史Job信息 Flink集群搭建
集群规划
节点node01node02node03角色JobManager TaskManagerTaskManagerTaskManager
下载并解压安装包
wget https://repo.huaweicloud.com/apache/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz在node01节点下载flink安装包同时解压、重命名。 tar -zxvf flink-1.17.0-bin-scala_2.12.tgz mv flink-1.17.0 flink修改集群配置 进入flink的conf目录修改集群配置 vim /usr/local/program/flink/conf/flink-conf.yaml1.修改flink-conf.yaml文件 JobManager节点配置 # jobmanager.rpc.address: localhost
# jobmanager.bind-host: localhost
jobmanager.rpc.address: node01
jobmanager.bind-host: 0.0.0.0# rest.address: localhost
# rest.bind-address: localhost
rest.address: node01
rest.bind-address: 0.0.0.0TaskManager节点配置 # taskmanager.host: localhost
# taskmanager.bind-host: localhosttaskmanager.host: node01
taskmanager.bind-host: 0.0.0.0注意需要在/etc/hosts文件中配置各个节点信息
172.29.234.1 node01 node01
172.29.234.2 node02 node02
172.29.234.3 node03 node032.修改workers文件 指定node01、node02、node03等节点为TaskManager # localhost
node01
node02
node033.修改masters文件
# localhost:8081
node01:8081分发安装目录 在node01节点安装、配置好后将Flink安装目录分发给另外两个节点服务器。 [rootnode01 program]# pwd
/usr/local/program
[rootnode01 program]# ls
flink jdk8[rootnode01 program]# scp -r flink node02:/usr/local/program/flink[rootnode01 program]# scp -r flink node03:/usr/local/program/flink在node02、node03节点修改flink-conf.yaml 配置
1.node02节点
# taskmanager.host: localhosttaskmanager.host: node022.node03节点
# taskmanager.host: localhosttaskmanager.host: node03启动集群 Flink附带了相关的bash脚本可以用于启动、停止集群。 # 启动集群
./bin/start-cluster.sh# 停止集群
./bin/stop-cluster.sh在node01节点服务器上执行start-cluster.sh脚本以启动Flink集群
[rootnode01 bin]# cd /usr/local/program/flink/bin[rootnode01 bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.查看进程情况
[rootnode01 bin]# jps
6788 StandaloneSessionClusterEntrypoint
7256 Jps
7116 TaskManagerRunner[rootnode02 conf]# jps
16884 TaskManagerRunner
16959 Jps[rootnode03 conf]# jps
17139 TaskManagerRunner
17214 Jps访问Web UI
当如上所示一样后代表启动成功此时可以访问http://node01:8081对flink集群和任务进行监控管理。 注意关闭防火墙否则可能无法访问或者集群的TaskManager数量、Slot数量显示异常
systemctl stop firewalld提交任务
[rootnode01 bin]# flink run ../examples/streaming/WordCount.jar查看运行结果
[rootnode01 bin]# tail flink-*-taskexecutor-*.out也可以通过Flink的 Web UI来监视集群的状态和正在运行的作业
Flink集群HA高可用
概述 集群实际上只有一个JobManager是存在单点故障的官方提供了Standalone Cluster HA模式来实现集群高可用。 集群可以有多个JobManager但只有一个处于active状态其余的则处于备用状态Flink使用 ZooKeeper来选举出Active JobManager并依赖其来提供一致性协调服务所以需要预先安装 ZooKeeper 。 Flink本身提供了内置ZooKeeper插件可以直接修改conf/zoo.cfg并且使用 /bin/start-zookeeper-quorum.sh直接启动。 集群规划
节点node01node02node03角色JobManager TaskManagerJobManager TaskManagerTaskManager
配置flink
基于Flink集群的node01节点配置的情况下修改conf/flink-conf.yaml文件增加如下配置
# 配置使用zookeeper来开启高可用模式
high-availability.type: zookeeper# 配置zookeeper的地址采用zookeeper集群时可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink# 集群id 放置集群的所有必需协调数据
high-availability.cluster-id: /cluster_one# 持久化存储JobManager元数据的地址zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://node01:9000/flink/recovery配置master、workers
修改conf/masters文件配置master节点
node01:8081
node02:8081修改conf/workers文件配置worker节点
node01
node02
node03配置ZK
编辑vim zoo.cfg文件
server.1node01:2888:3888
server.2node02:2888:3888
server.3node03:2888:3888分发安装目录 在node01节点安装、配置好后将Flink安装目录分发给另外两个节点服务器。 [rootnode01 program]# pwd
/usr/local/program
[rootnode01 program]# ls
flink jdk8[rootnode01 program]# scp -r flink node02:/usr/local/program/flink[rootnode01 program]# scp -r flink node03:/usr/local/program/flink在node02、node03节点修改flink-conf.yaml 配置
1.node02节点
jobmanager.rpc.address: node02taskmanager.host: node022.node03节点
taskmanager.host: node03启动HA集群
分发Flink相关配置到其他节点然后确保Hadoop和ZooKeeper已经启动后使用以下命令来启动集群
[rootnode01 flink]# bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node01.
Starting standalonesession daemon on host node02.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.访问http://node01:8081 访问http://node02:8081
测试
查看ZKJobManager节点信息 kill node01节点上的JobManager进程
[rootnode01 flink]# jps
2564 DataNode
3508 NodeManager
18741 Jps
7784 QuorumPeerMain
16666 TaskManagerRunner
2363 NameNode
16300 StandaloneSessionClusterEntrypoint
3117 ResourceManager
[rootnode01 flink]# kill -9 16300查看Active JobManager是否变化
Flink参数配置 flink-conf.yaml文件中有大量的配置参数基本常见参数如下 # jobmanager地址
jobmanager.rpc.address: node01# JobManager 的 JVM 堆内存大小默认为 1024m
jobmanager.heap.size: 1024m# rpc通信端口
jobmanager.rpc.port: 6123# 进程使用的全部内存大小,可以根据集群规模进行适当调整
jobmanager.memory.process.size1600m# Taskmanager 的 JVM 堆内存大小默认为 1024m
taskmanager.heap.size: 1024m# 进程使用的全部内存大小,可以根据集群规模进行适当调整
taskmanager.memory.process.size: 1728m# 每个TaskManager能够分配的Slot数量进行配置默认为1
# 通常设置为 CPU 核心的数量或其一半
# Slot就是TaskManager中具体运行一个任务所分配的计算资源
taskmanager.numberOfTaskSlots: 1# flink任务执行的并行度默认为1
# 优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量
parallelism.default: 1# 重启策略
jobmanager.execution.failover-strategy: region# 存储临时文件的路径如果没有配置则默认采用服务器的临时目录如 LInux 的 /tmp 目录
io.tmp.dirs: /tmp参考Flink的官方手册更多配置
配置历史服务器
概述 运行Flink job的集群一旦停止只能去yarn或本地磁盘上查看日志对于Job任务信息的查看、异常问题的排查非常不友好。 Flink提供了历史服务器用来在相应的Flink集群关闭后查询已完成作业的统计信息。通过History Server可以查询这些已完成作业的统计信息无论是正常退出还是异常退出。 Flink任务停止后JobManager会将已经完成任务的统计信息进行存档History Server进程则在任务停止后可以对任务统计信息进行查询。 配置
创建存储目录
[rootnode01 flink]# hadoop fs -mkdir -p /logs/flink-job在flink-config.yaml中添加如下配置
#
# HistoryServer
## The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
jobmanager.archive.fs.dir: hdfs://node01:9000/logs/flink-job# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0
historyserver.web.address: node01# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082
historyserver.web.port: 8082# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.dir: hdfs://node01:9000/logs/flink-job# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000
historyserver.archive.fs.refresh-interval: 5000启动、停止历史服务器
启动历史服务器
[rootnode01 flink]# bin/historyserver.sh start
Starting historyserver daemon on host node01.停止历史服务器
[rootnode01 flink]# bin/historyserver.sh stop
Stopping historyserver daemon (pid: 30749) on host node01.提交一个Job任务
[rootnode01 flink]# bin/flink run -t yarn-per-job -c com.atguigu.wc.WordCountStreamUnboundedDemo /root/FlinkTutorial-1.17-1.0-SNAPSHOT.jar2023-06-12 23:41:00,719 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted false, remoteHostTrusted false
2023-06-12 23:41:00,742 INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted false, remoteHostTrusted false
2023-06-12 23:41:00,761 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:41:00,766 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1686577483648_0012
2023-06-12 23:41:00,792 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1686577483648_0012
2023-06-12 23:41:00,792 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2023-06-12 23:41:00,793 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:41:04,565 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2023-06-12 23:41:04,565 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node02:38887 of application application_1686577483648_0012.
Job has been submitted with JobID cd41d983c93d8eb906c9aa899dcdefd0访问http://node01:8088/cluster查看Hadoop 访问Web UI查看提交任务信息
查看历史Job信息
在浏览器地址栏输入http://node01:8082 查看已经停止的 job 的统计信息 停止提交任务
[rootnode01 flink]# bin/flink cancel -t yarn-per-job -Dyarn.application.idapplication_1686577483648_0012 cd41d983c93d8eb906c9aa899dcdefd0访问http://node01:9870/explorer.html#/logs/flink-job查看HDFS中的归档文件 等一段时间几分钟后查看历史服务器 查看Job具体信息