公司网站建设内容,wordpress小工具分享,服装设计师必看的网站,网络营销成功案例任务提交流程
Flink 的提交流程随着部署模式、资源管理平台的不同#xff0c;会有不同的变化。这里做进一步的抽象#xff0c;形成一个大概高视角的任务执行流程图#xff0c;如下#xff1a;
Flink按照集群和资源管理的划分运行模式有#xff1a;Standalone、Flink On…任务提交流程
Flink 的提交流程随着部署模式、资源管理平台的不同会有不同的变化。这里做进一步的抽象形成一个大概高视角的任务执行流程图如下
Flink按照集群和资源管理的划分运行模式有Standalone、Flink On Yarn、K8S等。
Standalone
Standalone为独立模式独立运行不依赖外部资源调度管理框架。如果资源不足或出现故障没有自动扩展和重分配的机制需要手动处理。一般适合开发测试和作业较少的场景。支持的部署模式有会话部署模式、应用部署模式。不支持PerJob单作业部署模式。
会话模式
首先启动集群然后Web访问JobManager的8081端口提交任务或命令提交提交任务如下
cd ${FLINK_HOME}
bin/start-cluster.sh # 启动集群根据配置文件TM的slot划分成静态的
bin/flink run -m master:8081 -c pers.xxm.flink.MyFlink /tmp/mytask.jar
bin/flink cancel app_id # id可通过flink list或UI查看
bin/stop-cluster.sh # 停止集群再次提交第二个Job时JobManager和TaskManager还是原来的进程在JobManager内部会重新启动JobMaster线程类似Spark的Driver。新的任务继续占用TaskManager的插槽如果插槽不足任务提交失败。
应用模式
该模式不用启动集群。提交任务如下
cd ${FLINK_HOME}
mv /tmp/mytask.jar lib/ # 将jar包放入lib目录
bin/standalone-job.sh start --job-classname pers.xxm.flink.MyFlink # JobManager机器执行
bin/taskmanager.sh start # 在所有需要跑TaskManager的机器执行
bin/taskmanager.sh stop # 停止集群同时作业停止集群销毁
bin/standalone-job.sh stop # 停止集群该模式在8081端口也可以看到集群和任务运行。此时如果通过UI取消作业运行集群也会销毁。
Flink On Yarn
Flink集群安装在Hadoop集群上或者下载Flink依赖的Jar包建议安装在Hadoop集群上参照环境配置如下
# 配置环境变量所有机器
vim /etc/profile.d/my_env.sh # 内容如下4行
HADOOP_HOME/usr/local/hadoop
export PATH$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATHhadoop classpath
source /etc/profile.d/my_env.sh
# 启动YARN集群master运行当前节点启动RM在slaves配置的节点启动NM
start-yarn.sh
start-dfs.sh # 如果需要也可启动HDFSYARN模式根据并行度除以每个TM插槽数向上取整动态申请TM每个TM的slot参考Flink配置文件。该运行模式下支持的部署包括会话、单作业、应用模式。
会话模式
首先需要申请YARN会话Yarn Session然后启动Flink集群。启动会话应用如下
cd $FLINK_HOME
# 执行后在YARN的8088端口UI查看生成了一个应用
bin/yarn-session.sh -nm my_app此时在flink-conf.yaml的配置被覆盖即无效。启动后YARN会自动分配JM的机器和端口在终端日志中会打印JM Web Interface地址也可通过YARN的界面找到Tracking UI进入JM这时使用YARN代理的方式进入。
未提交作业时TM个数为0因为Flink会根据运行在JM的作业所需slot动态分配TM。可提交多个作业。Job取消或结束后占用的slot和tm会显示为可用但过一会会被回收显示总数和可用都为0。也可通过命令行提交任务到YARN会话如下
# 提交时查找/tmp/.yarn-properties-username文件根据该文件对应YARN提交任务
bin/flink run -c pers.xxm.flink.MyFlink /tmp/mytask.jar
# 关闭YARN会话集群该命令在启动上述会话时已经打印在标准输出中
echo stop | bin/yarn-session.sh -id app_id单作业模式
每个作业占用一个YARN应用即YARN集群提交方式如下
# 加上参数-d会推出占用模式在后台运行CTRLC退出不会影响集群的运行
bin/flink run -t yarn-per-job -c pers.xxm.flink.MyFlink /tmp/mytask.jar
bin/flink list -t yarn-per-job -Dyarn.application.idappid # 查看集群中作业ID
bin/flink cancel -t yarn-per-job -Dyarn.application.idappid job_id # 关闭上面的作业ID在Flink的UI界面CANCEL任务后YARN的应用状态变为FINISHED这种关闭和上面命令行是一样的。
应用模式
Flink-1.11之后加入应用模式和上个YARN单作业提交区别是此时提交的客户端不做代码解析等操作这也是推荐的模式。提交方式如下
bin/flink run-application -t yarn-application -c pers.xxm.flink.MyFlink /tmp/mytask.jar
bin/flink list -t yarn-application -Dyarn.application.idappid # 查看集群中作业ID
bin/flink cancel -t yarn-application -Dyarn.application.idappid job_id # 关闭作业IDYARN模式优化
YARN的工作原理是每次执行任务时都需要将Flink和用户的Jar包上传到HDFS上所以在YARN的单作业和应用部署模式下可以将依赖JAR包先上传到HDFS然后指定路径此时每次提交任务不会再次上传jar包到HDFS。
hadoop fs -mkdir /yarn/flink/dist # 创建目录
hadoop fs -mkdir /yarn/flink/jobs # 创建目录
hadoop fs -put lib/ /yarn/flink/dist # 将lib目录上传到dist目录下
hadoop fs -put plugins/ /yarn/flink/dist
hadoop fs -put /tmp/mytask.jar /yarn/flink/jobs # 自己的任务jar包放到jobs目录
# 此时以应用部署模式为例提交任务如下
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirshdfs://master:8020/yarn/flink/dist -c pers.xxm.flink.MyFlink hdfs://master:8020/yarn/flink/jobs/mytask.jar