mvc 手机网站开发,免费ppt资源网站,设计一个软件需要多少钱,网站多域名怎么做文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言
Flink的StreamTask的启动和执行是一个复杂的过程#xff0c;涉及多个关键步骤。以下是StreamTask启动和执行的主要流程#xff1a;
初始化#xff1a;StreamTask的初始化阶段涉及多个… 文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言
Flink的StreamTask的启动和执行是一个复杂的过程涉及多个关键步骤。以下是StreamTask启动和执行的主要流程
初始化StreamTask的初始化阶段涉及多个任务包括Operator的配置、task特定的初始化以及初始化算子的State等。在这个阶段Flink将业务处理函数抽象为operator并通过operatorChain将业务代码串起来执行以完成业务逻辑的处理。同时还会调用具体task的init方法进行初始化。读取数据和事件StreamTask通过mailboxProcessor读取数据和事件。运行业务逻辑在StreamTask的beforeInvoke方法中主要调用生成operatorChain并执行相关的业务逻辑。这些业务逻辑可能包括Source算子和map算子等它们将被Chain在一起并在一个线程内同步执行。资源清理在执行完业务逻辑后StreamTask会进行关闭和资源清理的操作这部分在afterInvoke阶段完成。
值得注意的是从资源角度来看每个TaskManager内部有多个slot每个slot内部运行着一个subtask即每个slot内部运行着一个StreamTask。这意味着StreamTask是由TaskManagerTM部署并执行的本地处理单元。
总的来说Flink的StreamTask启动和执行是一个由多个阶段和组件协同工作的过程涉及数据的读取、业务逻辑的执行以及资源的清理等多个方面。这些步骤确保了StreamTask能够高效、准确地处理数据流并满足实时计算和分析的需求。 StreamTask 部署启动
当 TaskExecutor 接收提交 Task 执行的请求则调用
TaskExecutor.submitTask(TaskDeploymentDescriptor tdd,
JobMasterId jobMasterId,Time timeout){// 构造 Task 对象Task task new Task(jobInformation, taskInformation, ExecutionAttemptId,AllocationId, SubtaskIndex, ....);// 启动 Task 的执行task.startTaskThread();
}Task对象的构造方法
public Task(.....){
// 封装一个 Task信息对象 TaskInfoTaskInfo, JobInfoJobMasterInfo
this.taskInfo new TaskInfo(....);
// 各种成员变量赋值
......
// 一个Task的执行有输入也有输出 关于输出的抽象 ResultPartition 和
ResultSubPartitionPipelinedSubpartition
// 初始化 ResultPartition 和 ResultSubPartition
final ResultPartitionWriter[] resultPartitionWriters
shuffleEnvironment.createResultPartitionWriters(....);
this.consumableNotifyingPartitionWriters
ConsumableNotifyingResultPartitionWriterDecorator.decorate(....);
// 一个Task的执行有输入也有输出 关于输入的抽象 InputGate 和 InputChannel从上有
一个Task节点拉取数据
// InputChannel 可能有两种实现 Local Remote
// 初始化 InputGate 和 InputChannel
final IndexedInputGate[] gates shuffleEnvironment.createInputGates(.....);
// 初始化一个用来执行 Task 的线程目标对象就是 Task 自己
executingThread new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
}Task 线程启动
Task 的启动是通过启动 Task 对象的内部 executingThread 来执行 Task 的具体逻辑在 run 方法中 private void doRun() {
// 1、先更改 Task 的状态 CREATED DEPLOYING
transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING);
// 2、准备 ExecutionConfig
final ExecutionConfig executionConfig
serializedExecutionConfig.deserializeValue(userCodeClassLoader);
// 3、初始化输入和输出组件, 拉起 ResultPartition 和 InputGate
setupPartitionsAndGates(consumableNotifyingPartitionWriters,
inputGates);
// 4、注册 输出
for(ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
} /
/ 5、初始 环境对象 RuntimeEnvironment, 包装在 Task 执行过程中需要的各种组件
Environment env new RuntimeEnvironment(jobId, vertexId, executionId,
....);
// 6、初始化 调用对象
// 两种最常见的类型 SourceStreamTask、OneInputStreamTask、
TwoInputStreamTask
// 父类 StreamTask
// 通过反射实例化 StreamTask 实例(可能的两种情况 SourceStreamTask
OneInputStreamTask)
AbstractInvokable invokable
loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// 7、先更改 Task 的状态 DEPLOYING RUNNING
transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
// 8、真正把 Task 启动起来了
invokable.invoke();
// 9、StreamTask 需要正常结束处理 buffer 中的数据
for(ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
if(partitionWriter ! null) {
partitionWriter.finish();
}
} /
/ 10、先更改 Task 的状态 RUNNING FINISHED
transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);StreamTask 初始化
StreamTask 初始化指的就是 SourceStreamTask 和 OneInputStreamTask 的实例对象的构建Task 这个类只是一个笼统意义上的 Task就是一个通用 Task 的抽象不管是批处理的还是流式处理的不管是 源Task 还是逻辑处理 Task 都被抽象成 Task 来进行调度执行
private SourceStreamTask(Environment env, Object lock) throws Exception {super(env,null,FatalExitExceptionHandler.INSTANCE,StreamTaskActionExecutor.synchronizedExecutor(lock));this.lock Preconditions.checkNotNull(lock);this.sourceThread new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);}Overrideprotected void init() {// we check if the source is actually inducing the checkpoints, rather// than the triggerSourceFunction? source mainOperator.getUserFunction();if (source instanceof ExternallyInducedSource) {externallyInducedCheckpoints true;ExternallyInducedSource.CheckpointTrigger triggerHook new ExternallyInducedSource.CheckpointTrigger() {Overridepublic void triggerCheckpoint(long checkpointId) throws FlinkException {// TODO - we need to see how to derive those. We should probably not// encode this in the// TODO - sources trigger message, but do a handshake in this task// between the trigger// TODO - message from the master, and the sources trigger// notificationfinal CheckpointOptions checkpointOptions CheckpointOptions.forConfig(CheckpointType.CHECKPOINT,CheckpointStorageLocationReference.getDefault(),configuration.isExactlyOnceCheckpointMode(),configuration.isUnalignedCheckpointsEnabled(),configuration.getAlignedCheckpointTimeout().toMillis());final long timestamp System.currentTimeMillis();final CheckpointMetaData checkpointMetaData new CheckpointMetaData(checkpointId, timestamp, timestamp);try {SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions).get();} catch (RuntimeException e) {throw e;} catch (Exception e) {throw new FlinkException(e.getMessage(), e);}}};((ExternallyInducedSource?, ?) source).setCheckpointTrigger(triggerHook);}getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME,this::getAsyncCheckpointStartDelayNanos);recordWriter.setMaxOverdraftBuffersPerGate(0);}StreamTask 执行
核心步骤如下
public final void invoke() throws Exception {
// Task 正式工作之前
beforeInvoke();
// Task 开始工作: 针对数据执行正儿八经的逻辑处理
runMailboxLoop();
// Task 要结束
afterInvoke();
// Task 最后执行清理
cleanUpInvoke();
}总结一下要点
在 beforeInvoke() 中主要是初始化 OperatorChain然后调用 init() 执行初始化然后恢复状态更改 Task 自己的状态为 isRunning true在 runMailboxLoop() 中主要是不停的处理 mail这里是 FLink-1.10 的一项改进使用了mailbox 模型来处理任务在 afterInvoke() 中主要是完成 Task 要结束之前需要完成的一些细节比如把 Buffer 中还没flush 的数据 flush 出来在 cleanUpInvoke() 中主要做一些资源的释放执行各种关闭动作set falseinterrupt shutdownclosecleanupdispose 等