当前位置: 首页 > news >正文

定制做网站设计哈尔滨城乡建设网站

定制做网站设计,哈尔滨城乡建设网站,网站建设管理报告,优秀的网络搜索引擎营销案例Flink源码解析之#xff1a;Flink On Yarn模式任务提交部署过程解析 一、Flink on Yarn部署模式概述 Apache Hadoop YARN 在许多数据处理框架中都很流行。 Flink 服务提交给 YARN 的 ResourceManager#xff0c;后者会在 YARN NodeManagers 管理的机器上生成容器。 Flink 将…Flink源码解析之Flink On Yarn模式任务提交部署过程解析 一、Flink on Yarn部署模式概述 Apache Hadoop YARN 在许多数据处理框架中都很流行。 Flink 服务提交给 YARN 的 ResourceManager后者会在 YARN NodeManagers 管理的机器上生成容器。 Flink 将其 JobManager 和 TaskManager 实例部署到这些容器中。 Flink 可根据在 JobManager 上运行的作业所需的处理插槽数量动态分配和取消分配 TaskManager 资源。 Flink on Yarn的部署模式包括三种方式Application Mode、Per-Job Mode、Session Mode。对于生成环境来说更推荐使用Application Mode或Per-Job Mode因为这两种模式能够提供更好的应用隔离性。 Application Mode Application Mode模式将在 YARN 上启动一个 Flink 集群应用程序 jar 的 main() 方法将在 YARN 的 JobManager 上执行。 应用程序一旦完成群集就会关闭。 该种方式相比Per-Job模式来说将应用main()方法的执行StreamGraph、JobGraph的生成放在了Flink集群侧来实现。Per-Job Mode Per-job 模式将在 YARN 上启动一个 Flink 集群在客户端生成StreamGraph、JobGraph并上传依赖项。最后将 JobGraph 提交给 YARN 上的 JobManager。 如果通过—detached参数配置了分离模式则客户端将在提交被接受后立即停止。Session Mode Session部署模式会在YARN上部署一个长期运行的Flink集群会话该会话可以接受并执行多个Flink作业。 Session部署模式包含两种操作模式 attach modedefault执行yarn-session.sh文件在Yarn上启动Flink集群启动后客户端会一致运行来追踪/监听集群状态。一旦集群异常客户端会获取异常信息并展示。如果客户端异常终止了则会发送signal到Flink集群此时Flink集群同样也会终止。detach mode 使用-d or --detached参数设置。在这种模式下当执行yarn-session.sh文件在Yarn上启动Flink集群后客户端会直接返回。要停止 Flink 群集需要再次调用客户端或 YARN 工具。 三种提交模式的对比 由bin/flink.sh脚本可知客户端提交过程统一由org.apache.flink.client.cli.CliFronted入口类触发。Per-Job模式和Session模式下Flink应用main方法都会在客户端执行。客户端解析生成JobGraph后会将依赖项和JobGraph序列化后的二进制数据一起发往集群上。当客户端机器上有大量作业提交时需要大量的网络带宽下载依赖项并将二进制文件发送到集群会造成客户端消耗大量的资源。尤其在大量用户共享客户端时问题更加突出。为解决该问题社区提出了Application模式将Flink应用main方法触发过程后置到了JobManager生成过程中以此将带宽压力分散到集群各个节点上。 鉴于Application部署模式的优势本文会以Application部署模式的源码来进行解析探究Flink以Application模式提交任务到Yarn集群中所经过的大致流程为我们理解Flink On Yarn的部署有一个更深入和清晰的认识。 二、Flink Application部署模式源码解析 一CliFronted入口类 本节以Application部署模式为例介绍Flink On Yarn的客户端提交源码流程。正如上文说的由bin/flink.sh脚本可知客户端提交过程统一由org.apache.flink.client.cli.CliFronted入口类触发为此我们首先进入到该方法的源码中来观察下该入口类的实现逻辑 /** Submits the job based on the arguments. */ public static void main(final String[] args) {EnvironmentInformation.logEnvironmentInfo(LOG, Command Line Client, args);// 1. find the configuration directory// 用来获取配置目录该目录通常包含Flink的配置文件如flink-conf.yaml。final String configurationDirectory getConfigurationDirectoryFromEnv();// 2. load the global configuration// 加载flink的全局配置final Configuration configuration GlobalConfiguration.loadConfiguration(configurationDirectory);// 3. load the custom command lines// 加载自定义的命令行配置final ListCustomCommandLine customCommandLines loadCustomCommandLines(configuration, configurationDirectory);int retCode 31;try {// 实例化了CliFronted对象CliFronted是Flink为CLI客户端提供的API它提供了一系列的操作例如作业的提交取消以及打印job的状态等。final CliFrontend cli new CliFrontend(configuration, customCommandLines);SecurityUtils.install(new SecurityConfiguration(cli.configuration));// 启动Flink作业的入口parseAndRun方法会解析命令行参数并启动Flink作业。retCode SecurityUtils.getInstalledContext().runSecured(() - cli.parseAndRun(args));} catch (Throwable t) {final Throwable strippedThrowable ExceptionUtils.stripException(t, UndeclaredThrowableException.class);LOG.error(Fatal error while running command line interface., strippedThrowable);strippedThrowable.printStackTrace();} finally {System.exit(retCode);} }上述代码是CliFronted的入口main方法该方法首先根据Flink的配置路径加载全局配置比如flink-conf.xml配置文件接着加载自定义命令行配置并实例化了CliFronted对象。CliFronted是Flink为CLI客户端提供的API它提供了一系列的操作例如作业的提交取消以及打印job的状态等。最后调用cli.parseAndRun(args)方法该方法会解析命令行参数并启动Flink作业。 在parseAndRun方法中会根据传入参数的第一个参数值来决定Flink集群的部署模式 run-application则会进入到CliFronted类的runApplication方法中执行Application部署流程。run则会进入到CliFronted类的run方法中在客户端执行作业的main方法利用反射来执行 这也是为什么我在使用命令行以Application模式部署Flink集群时命令的开始要用以下形式 /bin/flink run-application -t yarn-application...二runApplication 接下来我们继续进入到runApplicaiton方法来看看它的实现逻辑 protected void runApplication(String[] args) throws Exception {LOG.info(Running run-application command.);// 解析传入的命令行参数final Options commandOptions CliFrontendParser.getRunCommandOptions();final CommandLine commandLine getCommandLine(commandOptions, args, true);// 如果命令行参数中包含帮助选项-h/--help则调用下述方法打印帮助信息并返回if (commandLine.hasOption(HELP_OPTION.getOpt())) {CliFrontendParser.printHelpForRunApplication(customCommandLines);return;}// 验证并获取激活的自定义命令行 CustonCommandLine是Flink用来处理不同部署模式的工具例如Yarn,Standlone等以便针对不同模式解析对应的特定设置和参数final CustomCommandLine activeCommandLine validateAndGetActiveCommandLine(checkNotNull(commandLine));// 初始化ApplicationClusterDeployer实例 这是Flink用来启动Application的工具final ApplicationDeployer deployer new ApplicationClusterDeployer(clusterClientServiceLoader);final ProgramOptions programOptions;final Configuration effectiveConfiguration;// No need to set a jarFile path for Pyflink job.if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {programOptions ProgramOptionsUtils.createPythonProgramOptions(commandLine);effectiveConfiguration getEffectiveConfiguration(activeCommandLine,commandLine,programOptions,Collections.emptyList());} else {programOptions new ProgramOptions(commandLine);programOptions.validate();final URI uri PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());effectiveConfiguration getEffectiveConfiguration(activeCommandLine,commandLine,programOptions,Collections.singletonList(uri.toString()));}// 根据programOptions获取程序参数和入口类名来创建ApplicationConfiguration实例final ApplicationConfiguration applicationConfiguration new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());// 最后调用deployer.run()来运行应用。这一步通常包括联系Flink集群提交应用程序并安排其在集群中执行。deployer.run(effectiveConfiguration, applicationConfiguration); }上述代码的实现流程与原理如下所示 解析命令行参数首先调用getCommandLine函数解析传入的命令行参数args。 处理帮助选项如果命令行参数中包含帮助选项(-h/–help)则调用CliFrontendParser.printHelpForRunApplication打印帮助信息并返回。 获取激活的CustomCommandLine通过validateAndGetActiveCommandLine函数获取激活的自定义命令行CustomCommandLine。CustomCommandLine是Flink用来处理不同部署模式的工具例如YarnStandalone等以便于针对不同模式解析对应的特定设置和参数。 部署器配置初始化ApplicationClusterDeployer实例这是Flink用来启动Application的工具。 提取程序选项和计算有效配置区分Python作业和其他作业生成对应的ProgramOptions并验证其有效性。此外根据激活的命令行、解析得到的命令行参数和程序选项计算出有效的配置effectiveConfiguration。 构造应用配置使用从ProgramOptions中获取的程序参数和入口点类名创建ApplicationConfiguration实例。 运行应用最后调用deployer.run()来运行应用。这一步通常包括联系Flink集群提交应用程序并安排其在集群中执行。 ProgramOptions.entryPointClass的成员值是flink命令行 -c 选项指定的Flink应用入口类com.xxx.xxx.FlinkApplicationDemo后续会以反射的形式触发main()方法的执行。 三ClusterDescriptor.deployApplicationCluster 上面代码中deployer.run(...)方法负责加载Yarn Application模式客户端信息等。 首先代码会根据configuration配置信息来获取ClusterClientFactory对象获取的逻辑过程是根据configuration配置中的execution.target参数来决定的。 当执行命令行bin/flink run时 execution.target参数对应的枚举值可以如下 remotelocalyarn-per-jobyarn-sessionkubernetes-session 当执行命令行bin/flink run-application时execution.target参数对应的枚举值可以如下yarn-applicationkubernetes-application 当execution.target参数为yarn-application时Flink便会生成相应的YarnClusterClientFactory客户端工厂类然后调用该工厂类的createClusterDescriptor方法该方法中会新建YarnClient实例YarnClient实例负责在客户端提交Flink应用程序并最终生成ClusterDescriptor实例该实例包含用于在Yarn上部署Flink集群的部署信息Descriptor。 Override public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {checkNotNull(configuration);final String configurationDirectory configuration.get(DeploymentOptionsInternal.CONF_DIR);YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);return getClusterDescriptor(configuration); }private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {final YarnClient yarnClient YarnClient.createYarnClient();final YarnConfiguration yarnConfiguration Utils.getYarnAndHadoopConfiguration(configuration);yarnClient.init(yarnConfiguration);yarnClient.start();return new YarnClusterDescriptor(configuration,yarnConfiguration,yarnClient,YarnClientYarnClusterInformationRetriever.create(yarnClient),false); } 有了该实例后会调用deployApplicationCluster方法来部署Application模式的Flink集群。集群将在提交应用程序时创建并在应用程序终止时拆除。此外应用程序用户代码的{code main()}将在集群上执行而不是在客户端上执行。 YarnClusterDescriptor.deployApplicationCluster(…)方法调用过程如下 (1)、YarnClusterDescriptor.deployApplicationCluster(…);进行一些配置和检查并调用deployInternal(…)方法。 (2)、YarnClusterDescriptor.deployInternal(…); 其中最重要的方法是deployInternal方法 四YarnClusterDescriptor.deployInternal 在该方法中首先会判断Hadoop集群是否启用了Kerberos安全认证如果开启了则Flink会首先确认当前用户是否拥有有效的kerberos凭证。如果无效则会抛出异常部署作业失败。 紧接着进行资源检查和部署模式判断。 在validateClusterResources方法中会根据配置的JobManager和TaskManager的资源大小与集群资源进行比对。 如果JobManager的配置内存大小 Yarn配置的最小调度分配内存(yarn.scheduler.minimum-allocation-mb参数默认1024MB)则JobManager的内存大小会设置为该配置值。如果JobManager大小 YARN 集群能够提供的单个容器的最大资源则抛出异常The cluster does not have the requested resources for the JobManager available!如果TaskManager大小 YARN 集群能够提供的单个容器的最大资源则抛出异常The cluster does not have the requested resources for the TaskManagers available!如果TaskManager大小 当前YARN集群剩余资源单个任务容器分配的最大资源容量则会打印告警日志The requested amount of memory for the TaskManagers is more than the largest possible YARN container: freeClusterResources.containerLimit如果JobManager大小 当前YARN集群剩余资源单个任务容器分配的最大资源容量则会打印告警日志The requested amount of memory for the JobManager is more than the largest possible YARN container: freeClusterResources.containerLimit 经过资源检查后会将最后确定的JobManager和TaskManager资源保存在ClusterSpecification对象中。 在部署模式决定中Flink 提供了两种部署模式Detach模式和Non-Detach模式。如果是 Detach模式Flink 作业提交到YARN后客户端可以直接退出而作业将继续在YARN集群上运行。而在 Non-Detach模式下客户端将持续等待作业执行完成。 然后就到了一个非常重要的方法中startAppMaster。 会根据上面决定的ClusterSpecification资源实例启动用于管理Flink作业的Application Master。 startAppMaster方法比较长。 这段代码主要是用于启动Flink在YARN集群上的Application Master的过程代码中包含了几个主要部分 首先核心的首步骤是初始化文件系统并获取对应的 FileSystem 实例。代码检查了文件系统的类型如果是本地文件系统类型file://开头会抛出警告因为Flink在YARN上运行需要分布式文件系统来存储文件。然后获取了用于提交应用程序的 ApplicationSubmissionContext并将 Flink 应用所需的各种文件如jar包、配置文件等上传到HDFS并将这些文件的HDFS路径作为本地资源 LocalResources添加到ApplicationSubmissionContext里。在文件上传阶段包括了一系列复杂的步骤首先是将 flink 配置、job graph、用户 jar、依赖库等上传到HDFS并将这些文件的路径添加到应用的classpath其次如果设置了 security options例如Kerberos认证信息会将相关文件也上传到HDFS并且对配置了Kerberos认证的 flink 应用会从 YARN 获取 HDFS delegation tokens。在收集完上述一系列依赖文件后final ContainerLaunchContext amContainer setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec) 负责设置启动ApplicationMaster的命令操作。设置ApplicationMaster的环境变量诸如_FLINK_CLASSPATH、_FLINK_DIST_JAR(Flink jar resource location (in HDFS))、KRB5_PATH、_YARN_SITE_XML_PATH等环境变量。最后调用amContainer.setEnvironment(appMasterEnv);方法进行设置。接着会将上述配置好的amContainer实例放入ApplicationSubmissionContext对象中以及ApplicationName和所需的资源大小最终交给交给YarnClient去提交并随后通过周期性地获取应用状态来等待应用处于RUNNING或FINISHED状态完成应用的提交过程。如果在这一系列操作中有任何异常或错误发生会触发失败保护钩子 DeploymentFailureHook进行必要的清理工作。 上面这段代码体现了 Flink on YARN 的工作原理Flink 通过 YARN Client 提交应用启动 Application Master 来进行资源申请和任务调度这是典型的 YARN 应用程序模型。各种文件包括 flink 本身、用户 jar、配置文件等都被上传到HDFS然后再从HDFS分发到运行任务的 YARN 容器中这样做是为了实现文件的分布式共享并且利用了 YARN 的 LocalResource 机制来进行文件的分发。 对于第四点中的setupApplicationMasterContainer方法该方法构造了ApplicationMaster的命令行启动命令如下所示 ContainerLaunchContext setupApplicationMasterContainer(String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {// ------------------ Prepare Application Master Container ------------------------------// respect custom JVM options in the YAML fileString javaOpts flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() 0) {javaOpts flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);}// krb5.conf file will be available as local resource in JM/TM containerif (hasKrb5) {javaOpts -Djava.security.krb5.confkrb5.conf;}// Set up the container launch context for the application masterContainerLaunchContext amContainer Records.newRecord(ContainerLaunchContext.class);final MapString, String startCommandValues new HashMap();startCommandValues.put(java, $JAVA_HOME/bin/java);String jvmHeapMem JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);startCommandValues.put(jvmmem, jvmHeapMem);startCommandValues.put(jvmopts, javaOpts);startCommandValues.put(logging, YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));startCommandValues.put(class, yarnClusterEntrypoint);startCommandValues.put(redirects,1 ApplicationConstants.LOG_DIR_EXPANSION_VAR /jobmanager.out 2 ApplicationConstants.LOG_DIR_EXPANSION_VAR /jobmanager.err);String dynamicParameterListStr JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);startCommandValues.put(args, dynamicParameterListStr);final String commandTemplate flinkConfiguration.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);final String amCommand BootstrapTools.getStartCommand(commandTemplate, startCommandValues);amContainer.setCommands(Collections.singletonList(amCommand));LOG.debug(Application Master start command: amCommand);return amContainer; }启动命令的参数包括以下部分 “java”Java二进制文件的路径。一般来说在YARN容器中Java的路径会被设置为$JAVA_HOME/bin/java。“jvmmem”JVM参数主要是内存参数比如最大堆内存、最小堆内存等。这些参数会基于Flink配置以及JobManager的内存配置来生成。“jvmopts”JVM选项。这些选项来自Flink配置文件中设置的JVM选项以及若存在Kerberos krb5.conf文件还会添加-Djava.security.krb5.confkrb5.conf。“logging”日志配置项用于配置Flink的日志选项。“class”启动类即YARN集群入口点类名yarnClusterEntrypoint。“redirects”输出重定向的参数将stdout输出流和stderr错误流重定向到日志文件中。“args”传递给启动类的参数主要是JobManager的动态配置参数。 ⠀这些参数最后会填入一个启动命令模板通常为%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%来生成实际启动Flink应用的命令。 启动后的ApplicationMaster在YARN集群上起着以下的关键作用 作为应用程序的主控制器管理和监视应用程序的执行。负责请求YARN ResourceManager分配所需的资源例如容器。启动和监视任务执行器(TaskExecutor)它们在分配的容器中运行。与Flink的client例如命令行界面或Web界面以及ResourceManager进行交互提供应用程序的状态和进度信息。在应用程序出现异常或失败时它可以选择重新请求资源并重启失败的任务提供了一定程度的错误恢复能力。 ⠀因此Application Master是Flink在YARN上运行的关键组件它负责管理Flink应用程序的生命周期和资源。 五YarnApplicationClusterEntryPoint 在上面的setupApplicationMasterContainer方法中我们说该方法构建了ApplicationMaster的启动命令。从该命令行中可以看到命令行的启动入口类为yarnClusterEntrypoint参数对于Yarn Application部署模式来说参数对应的入口类即为YarnApplicationClusterEntryPoint。在第四部分的分析中当通过yarnClient将ApplicationMaster提交到Yarn集群后便会申请Container来执行ApplicationMaster执行该入口类。 为此接下来我们来分析一下YarnApplicationClusterEntryPoint入口类的执行逻辑。 public static void main(final String[] args) {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, YarnApplicationClusterEntryPoint.class.getSimpleName(), args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);MapString, String env System.getenv();// 获取工作路径final String workingDirectory env.get(ApplicationConstants.Environment.PWD.key());Preconditions.checkArgument(workingDirectory ! null,Working directory variable (%s) not set,ApplicationConstants.Environment.PWD.key());try {YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);} catch (IOException e) {LOG.warn(Could not log YARN environment information., e);}final Configuration dynamicParameters ClusterEntrypointUtils.parseParametersOrExit(args,new DynamicParametersConfigurationParserFactory(),YarnApplicationClusterEntryPoint.class);final Configuration configuration YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);PackagedProgram program null;try {// 获取用户应用程序jar程序参数、入口类名等信息封装为PackagedProgram实例program getPackagedProgram(configuration);} catch (Exception e) {LOG.error(Could not create application program., e);System.exit(1);}try {configureExecution(configuration, program);} catch (Exception e) {LOG.error(Could not apply application configuration., e);System.exit(1);}YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint new YarnApplicationClusterEntryPoint(configuration, program);// 执行Application Cluster ClusterEntrypoint.runClusterEntrypoint(yarnApplicationClusterEntrypoint); } 上面这段代码中使用getPackagedProgram(configuration)方法获取用户应用程序jar程序参数、入口类名等信息封装为PackagedProgram实例便于后续调用。 最后调用runClusterEntrypoint方法启动执行Application Cluster集群。 ClusterEntrypoint.runClusterEntrypoint(...)方法的调用链路如下 ClusterEntrypoint.runClusterEntrypoint(...)ClusterEntrypoint.startCluster(...)ClusterEntrypoint.runCluster(...)DispatcherResourceManagerComponentFactory.create(…) 在DispatcherResourceManagerComponentFactory.create方法中启动了一系列服务比如 LeaderRetrievalServiceWebMonitorEndpointResourceManagerServiceDispatcherRunner 本流程中主要需要关注的服务是DispatcherRunner该方法中会调用dispatcherRunnerFactory.createDispatcherRunner来初始化dispatchRunner实例dispatcherRunner实例负责dispatcher组件的高可用leader选举操作同时dispatcher组件负责触发Flink用户应用main(…)方法执行。 在创建DispatchRunner的过程中包含高可用Leader选举过程经过一系列的方法链调用会选举出一个Leader DispatchRunner服务来负责后续的处理流程。 DispatcherResourceManagerComponentFactory.createDispatcherRunnerDefaultDispatcherRunner.create()DispatcherRunnerLeaderElectionLifecycleManager.createFor()DefaultLeaderElectionService.start()LeaderElectionDriverFactory.createLeaderElectionDriver()new ZooKeeperLeaderElectionDriverLeaderLatch.start()LeaderLatch.internalStart()LeaderLatch.reset()LeaderLatch.setLeadership()ZooKeeperLeaderElectionDriver.isLeader()DefaultLeaderElectionService.onGrantLeadership()DefaultDispatcherRunner.grantLeadership()DefaultDispatcherRunner.startNewDispatcherLeaderProcess() 选举为leader的DefaultDispatcherRunner实例候选者在回调动作过程中会一直调用到上面的grantLeadership(…)方法并在startNewDispatcherLeaderProcess(…)方法中生成dispatcherLeaderProcess表示一个Ledaer Dispatcher进程来提供服务并通过newDispatcherLeaderProcess::start方法来启动执行该服务的后续处理流程。Leader候选者回调动作触发过程会另起篇幅详细讲解此处先这样理解。 在后续的处理流程中我们需要关注的点是在何时触发用户应用程序的main方法执行为此继续深入以下调用链 AbstractDispatcherLeaderProcess.startInternal()SessionDispatcherLeaderProcess.onStart()SessionDispatcherLeaderProcess.createDispatcherIfRunning()SessionDispatcherLeaderProcess.createDispatcher()ApplicationDispatcherGatewayServiceFactory.create()new ApplicationDispatcherBootstrap(...) 上述调用链中createDispatcher(…)方法会调用dispatcherGatewayServiceFactory.create(…)方法dispatcherGatewayServiceFactory实际类型是ApplicationDispatcherGatewayServiceFactory。在dispatcherGatewayServiceFactory.create(…)方法中新建ApplicationDispatcherBootstrap实例。 在ApplicationDispatcherBootstrap实例中继续通过以下方法调用链fixJobIdAndRunApplicationAsync(…) - runApplicationAsync(…) - runApplicationEntryPoint(…) - ClientUtils.executeProgram(…) - program.invokeInteractiveModeForExecution() - callMainMethod(mainClass, args) - mainMethod.invoke(null, (Object) args)触发Flink应用main(…)方法的执行。 ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()ApplicationDispatcherBootstrap.runApplicationAsync()ApplicationDispatcherBootstrap.runApplicationEntryPoint()ClientUtils.executeProgram()PackagedProgram.invokeInteractiveModeForExecution()PackagedProgram.callMainMethod()mainMethod.invoke(null, (Object) args); 最终在ApplicationDispatcherBootstrap类的实现中我们找到了用户应用程序的main方法执行入口。 三、回顾与总结 回顾一下上面的整体流程首先我们通过ApplicationMaster的启动命令找到AM组建执行的入口类为YarnApplicationClusterEntryPoint接着在启动集群时我们发现Flink会初始化一些诸如LeaderRetrievalService、WebMonitorEndpoint、ResourceManagerService、DispatcherRunner的服务这些服务分别发挥不同的用途与Yarn和Flink集群进行交互。在本次分析过程中我们着重探究了DispatcherRunner服务的创建流程。 首先会执行高可用的选举流程最终选举出一个Leader DispatcherRunner来执行服务。选举完成后该Leader DispatchRunner会调用ClientUtils.executeProgram方法从封装好的PackagedProgram实例中获取用户应用程序的入口类mainClass以及程序入参并最终利用反射触发mainClass的main方法的执行完成用户自定义Flink应用的执行。 以上就是主要的Flink On Yarn客户端作业的提交过程解析。这个提交过程相对来说还是比较复杂的包含着很多部署配置参数资源以及权限的校验和分配ApplicationMaster的提交启动并伴随AM启动后执行的一系列Flink服务初始化以及我们关心的用户应用程序的调用入口发现了在Application的部署模式下用户应用程序的调用是在集群侧也就是Leader DispatchRunner服务中完成的。 当然DispatchRunner服务负责的任务远不止于此上述流程中还有更多的细节等待我们去挖掘和学习这篇文章可能只是让我们对提交流程有了一个初步的大体认识对于更多深入的部分需要我们不断思考不断挖掘也欢迎大家交流观点和看法感谢
http://www.dnsts.com.cn/news/182695.html

相关文章:

  • 锡盟本地网站建设购物网站 英文介绍
  • 网站内容计划wordpress悬浮广告
  • 英文建站软件公共营养师报考条件
  • 可以做防盗水印的网站sz住房和城乡建设部网站
  • html5网站素材域名注册及网站建设
  • 做静态网站软件自媒体账号下载注册
  • 做卫浴软管的网站网络运维课程
  • 建设数字官方网站羽毛球赛事2023赛程
  • php搭建网站后台wordpress 数据库备份插件下载
  • wordpress销售插件360优化大师官方下载
  • 网站模板前台后台logo字体设计在线生成
  • jsp做的简单的图书馆网站做宠物食品的网站
  • 织梦网站文章内容模板资料代做网站
  • 延吉市网站建设双语企业网站源码
  • 横向拖动的网站在线设计培训
  • 云服务器可以做两个网站吗正规的金融行业网站开发
  • 网站单页郑州百姓网招聘
  • 做网站如何接单子买国外的东西在哪个平台
  • 深圳seo优化外包公司合肥关键词优化平台
  • wordpress站群教程桂林象鼻山门票价格
  • 做兼职网上哪个网站好网页游戏开服表 怎么删除
  • 如何自己搭建网站专业写作网站
  • 厦门+外贸公司做网站厦门市建设局与管理局网站
  • 昆明网站建设代理江门外贸集团有限公司英文网站
  • 网站卖东西怎么做的郑州妇科医院哪个医院最好
  • 好的俄文网站设计ppt怎么做
  • 江西省水利水电建设集团招标网站东莞汽车总站停止营业
  • 网站建设论文答辩ppt网站建设logo设计
  • 外包做的网站可以直接去收录吗腾讯街景地图实景下载
  • 设计网站意味着什么工作室是个体户还是公司