当前位置: 首页 > news >正文

mvc 手机网站开发免费ppt资源网站

mvc 手机网站开发,免费ppt资源网站,设计一个软件需要多少钱,网站多域名怎么做文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言 Flink的StreamTask的启动和执行是一个复杂的过程#xff0c;涉及多个关键步骤。以下是StreamTask启动和执行的主要流程#xff1a; 初始化#xff1a;StreamTask的初始化阶段涉及多个… 文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言 Flink的StreamTask的启动和执行是一个复杂的过程涉及多个关键步骤。以下是StreamTask启动和执行的主要流程 初始化StreamTask的初始化阶段涉及多个任务包括Operator的配置、task特定的初始化以及初始化算子的State等。在这个阶段Flink将业务处理函数抽象为operator并通过operatorChain将业务代码串起来执行以完成业务逻辑的处理。同时还会调用具体task的init方法进行初始化。读取数据和事件StreamTask通过mailboxProcessor读取数据和事件。运行业务逻辑在StreamTask的beforeInvoke方法中主要调用生成operatorChain并执行相关的业务逻辑。这些业务逻辑可能包括Source算子和map算子等它们将被Chain在一起并在一个线程内同步执行。资源清理在执行完业务逻辑后StreamTask会进行关闭和资源清理的操作这部分在afterInvoke阶段完成。 值得注意的是从资源角度来看每个TaskManager内部有多个slot每个slot内部运行着一个subtask即每个slot内部运行着一个StreamTask。这意味着StreamTask是由TaskManagerTM部署并执行的本地处理单元。 总的来说Flink的StreamTask启动和执行是一个由多个阶段和组件协同工作的过程涉及数据的读取、业务逻辑的执行以及资源的清理等多个方面。这些步骤确保了StreamTask能够高效、准确地处理数据流并满足实时计算和分析的需求。 StreamTask 部署启动 当 TaskExecutor 接收提交 Task 执行的请求则调用 TaskExecutor.submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId,Time timeout){// 构造 Task 对象Task task new Task(jobInformation, taskInformation, ExecutionAttemptId,AllocationId, SubtaskIndex, ....);// 启动 Task 的执行task.startTaskThread(); }Task对象的构造方法 public Task(.....){ // 封装一个 Task信息对象 TaskInfoTaskInfo, JobInfoJobMasterInfo this.taskInfo new TaskInfo(....); // 各种成员变量赋值 ...... // 一个Task的执行有输入也有输出 关于输出的抽象 ResultPartition 和 ResultSubPartitionPipelinedSubpartition // 初始化 ResultPartition 和 ResultSubPartition final ResultPartitionWriter[] resultPartitionWriters shuffleEnvironment.createResultPartitionWriters(....); this.consumableNotifyingPartitionWriters ConsumableNotifyingResultPartitionWriterDecorator.decorate(....); // 一个Task的执行有输入也有输出 关于输入的抽象 InputGate 和 InputChannel从上有 一个Task节点拉取数据 // InputChannel 可能有两种实现 Local Remote // 初始化 InputGate 和 InputChannel final IndexedInputGate[] gates shuffleEnvironment.createInputGates(.....); // 初始化一个用来执行 Task 的线程目标对象就是 Task 自己 executingThread new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); }Task 线程启动 Task 的启动是通过启动 Task 对象的内部 executingThread 来执行 Task 的具体逻辑在 run 方法中 private void doRun() { // 1、先更改 Task 的状态 CREATED DEPLOYING transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING); // 2、准备 ExecutionConfig final ExecutionConfig executionConfig serializedExecutionConfig.deserializeValue(userCodeClassLoader); // 3、初始化输入和输出组件, 拉起 ResultPartition 和 InputGate setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates); // 4、注册 输出 for(ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) { taskEventDispatcher.registerPartition(partitionWriter.getPartitionId()); } / / 5、初始 环境对象 RuntimeEnvironment, 包装在 Task 执行过程中需要的各种组件 Environment env new RuntimeEnvironment(jobId, vertexId, executionId, ....); // 6、初始化 调用对象 // 两种最常见的类型 SourceStreamTask、OneInputStreamTask、 TwoInputStreamTask // 父类 StreamTask // 通过反射实例化 StreamTask 实例(可能的两种情况 SourceStreamTask OneInputStreamTask) AbstractInvokable invokable loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env); // 7、先更改 Task 的状态 DEPLOYING RUNNING transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING); // 8、真正把 Task 启动起来了 invokable.invoke(); // 9、StreamTask 需要正常结束处理 buffer 中的数据 for(ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) { if(partitionWriter ! null) { partitionWriter.finish(); } } / / 10、先更改 Task 的状态 RUNNING FINISHED transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);StreamTask 初始化 StreamTask 初始化指的就是 SourceStreamTask 和 OneInputStreamTask 的实例对象的构建Task 这个类只是一个笼统意义上的 Task就是一个通用 Task 的抽象不管是批处理的还是流式处理的不管是 源Task 还是逻辑处理 Task 都被抽象成 Task 来进行调度执行 private SourceStreamTask(Environment env, Object lock) throws Exception {super(env,null,FatalExitExceptionHandler.INSTANCE,StreamTaskActionExecutor.synchronizedExecutor(lock));this.lock Preconditions.checkNotNull(lock);this.sourceThread new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);}Overrideprotected void init() {// we check if the source is actually inducing the checkpoints, rather// than the triggerSourceFunction? source mainOperator.getUserFunction();if (source instanceof ExternallyInducedSource) {externallyInducedCheckpoints true;ExternallyInducedSource.CheckpointTrigger triggerHook new ExternallyInducedSource.CheckpointTrigger() {Overridepublic void triggerCheckpoint(long checkpointId) throws FlinkException {// TODO - we need to see how to derive those. We should probably not// encode this in the// TODO - sources trigger message, but do a handshake in this task// between the trigger// TODO - message from the master, and the sources trigger// notificationfinal CheckpointOptions checkpointOptions CheckpointOptions.forConfig(CheckpointType.CHECKPOINT,CheckpointStorageLocationReference.getDefault(),configuration.isExactlyOnceCheckpointMode(),configuration.isUnalignedCheckpointsEnabled(),configuration.getAlignedCheckpointTimeout().toMillis());final long timestamp System.currentTimeMillis();final CheckpointMetaData checkpointMetaData new CheckpointMetaData(checkpointId, timestamp, timestamp);try {SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions).get();} catch (RuntimeException e) {throw e;} catch (Exception e) {throw new FlinkException(e.getMessage(), e);}}};((ExternallyInducedSource?, ?) source).setCheckpointTrigger(triggerHook);}getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME,this::getAsyncCheckpointStartDelayNanos);recordWriter.setMaxOverdraftBuffersPerGate(0);}StreamTask 执行 核心步骤如下 public final void invoke() throws Exception { // Task 正式工作之前 beforeInvoke(); // Task 开始工作: 针对数据执行正儿八经的逻辑处理 runMailboxLoop(); // Task 要结束 afterInvoke(); // Task 最后执行清理 cleanUpInvoke(); }总结一下要点 在 beforeInvoke() 中主要是初始化 OperatorChain然后调用 init() 执行初始化然后恢复状态更改 Task 自己的状态为 isRunning true在 runMailboxLoop() 中主要是不停的处理 mail这里是 FLink-1.10 的一项改进使用了mailbox 模型来处理任务在 afterInvoke() 中主要是完成 Task 要结束之前需要完成的一些细节比如把 Buffer 中还没flush 的数据 flush 出来在 cleanUpInvoke() 中主要做一些资源的释放执行各种关闭动作set falseinterrupt shutdownclosecleanupdispose 等
http://www.dnsts.com.cn/news/67089.html

相关文章:

  • 如何让网站被百度快速收录攻击Wordpress网站
  • 找个网站怎么这么难wordpress滑动门
  • 有哪些做包装设计网站好些网上书店网站建设实训报告总结
  • 中国科协网站建设招标网站建设的现状和未来
  • 阿里云免费建站wordpress快
  • 禁止拿我们的网站做宣传网站建设需求有什么用
  • 如何在外管局网站做延期收汇免费建网站的作用
  • 免费网站设计平台惠州做棋牌网站建设
  • 做网站的前期准备免费做图片的网站有哪些
  • 厦门 网站建设 闽icpwordpress熊掌号关注
  • 网站权重排行免费推广渠道有哪些方式
  • 知名网站开发哪家好搬家公司网站制作
  • 长春网站建设团队商标自助查询系统官网
  • 网页游戏交易网站wordpress 4.5.9
  • 公司网站的推广长沙网站优化外包公司
  • 做视频在哪个网站找素材河南热点新闻事件
  • 超级简历网站网站制作公司智能 乐云践新
  • 那种投票网站里面怎么做网站建设 软件有哪些内容
  • 深圳个性化网站建设公司编写一个android应用程序
  • 泰来县城乡建设局网站湖州网站建设公司
  • 网站平台建设意见医生可以自己做网站吗
  • 有什么教做甜品的网站重庆网站推广公司电话
  • 免费响应式模板网站模板下载中国建设行业峰会网站
  • 建设网站机构wordpress爆破工具
  • xml做网站大型购物网站建设费用
  • 何炅做的代言网站什么是全网营销推广
  • 成都企业网站建设费用wordpress注册未发送邮件
  • 网站开发电销常遇到问题怎样设计网站模板
  • 做网站用啥软件好网站推广平台
  • 如何做国外的社交网站wordpress重定向次数过多