网站上线要准备什么,做视频网站注意什么,手机网站开发c 教程,西安百度公司怎么样Flink源码解析之#xff1a;如何根据算法生成StreamGraph过程
在我们日常编写Flink应用的时候#xff0c;会首先创建一个StreamExecutionEnvironment.getExecutionEnvironment()对象#xff0c;在添加一些自定义处理算子后#xff0c;会调用env.execute来执行定义好的Flin…Flink源码解析之如何根据算法生成StreamGraph过程
在我们日常编写Flink应用的时候会首先创建一个StreamExecutionEnvironment.getExecutionEnvironment()对象在添加一些自定义处理算子后会调用env.execute来执行定义好的Flink应用程序。我们知道Flink在实际执行任务前会根据应用生成StreamGraph再生成JobGraph最终提交到集群中进行执行。那么Flink是如何将我们自定义的应用程序转换成StreamGraph的呢这一过程中实现了什么逻辑 接下来我们通过源码来深入了解一下。
在本次分析源码的过程中主要涉及到StreamExecutionEnvironment、DataStream、Transformation、StreamGraph、StreamGraphGenerator几下个类这里先汇总介绍一下在生成StreamGraph过程中这些类的交互处理流程有了这个印象后再阅读下面的源码流程更容易串起来和理解。 一、Function - Transformation转换
在我们编写Flink应用程序时会自定义一系列算子拼接在数据流链路中比如当我们调用datastream.flatMap(flatMapFunction)方法时就会将传入的算子函数转换成Transformation对象添加到StreamExecutionEnvironment对象的ListTransformation? transformations 属性中。接下来我们就来看一下是如何进行转换的。
首先进入到DataStream类中找到比如flatMap方法
public R SingleOutputStreamOperatorR flatMap(FlatMapFunctionT, R flatMapper) {TypeInformationR outType TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true);return flatMap(flatMapper, outType);
}public R SingleOutputStreamOperatorR flatMap(FlatMapFunctionT, R flatMapper, TypeInformationR outputType) {return transform(Flat Map, outputType, new StreamFlatMap(clean(flatMapper)));
}
上面代码中将flatMap封装到StreamFlatMap方法中用于表示一个StreamOperator操作符。StreamFlatMap操作符会针对每一个StreamRecord通过processElement方法调用用户函数去处理该流数据:
Override
public void processElement(StreamRecordIN element) throws Exception {collector.setTimestamp(element);// 调用用户函数执行数据流元素处理逻辑userFunction.flatMap(element.getValue(), collector);
}
回到DataSteram的FlatMap方法中我们继续看transform方法里做了什么
PublicEvolving
public R SingleOutputStreamOperatorR transform(String operatorName,TypeInformationR outTypeInfo,OneInputStreamOperatorT, R operator) {return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
上面根据传入的StreamOperator创建一个SimpleOperatorFactory对象StreamOperatorFactory是一个工厂类其主职责是为特定类型的StreamOperator在运行时创建实例。它还提供了其他附加功能如做一些操作配置比如chaining。
接下来继续进入doTransform方法
protected R SingleOutputStreamOperatorR doTransform(String operatorName,TypeInformationR outTypeInfo,StreamOperatorFactoryR operatorFactory) {// read the output type of the input Transform to coax out errors about MissingTypeInfo// 获取当前数据流上一个Transformation的输出类型。这样可以做类型检查并在类型信息缺失时提前引发错误。transformation.getOutputType();// 创建一个新的OneInputTransformation这个新的OneInputTransformation即为要添加的新操作// 对于flatMap操作来说不存在分区所以上下游是一对一的关系所以这里用的是OneInputTransformationOneInputTransformationT, R resultTransform new OneInputTransformation(this.transformation,operatorName,operatorFactory,outTypeInfo,environment.getParallelism());// 创建一个SingleOutputStreamOperator对象该对象将接收新加入的操作的输出SuppressWarnings({unchecked, rawtypes})SingleOutputStreamOperatorR returnStream new SingleOutputStreamOperator(environment, resultTransform);// 将新的Transformation添加到当前的执行环境中这个操作将并入到计算流图中。getExecutionEnvironment().addOperator(resultTransform);// 代表了新添加的操作输出结果的数据流便于在这个数据流上继续构建后续的计算。return returnStream;
}上述代码内容就是将userFunction转换成Transformation的具体执行逻辑了因为我们最初举例是flatMap方法因此在将userFunction转换成Transformation时会使用OneInputTransformation来表示。同时这里可以看到在转换完成后会调用getExecutionEnvironment().addOperator(resultTransform)将得到的Transformation添加到当前执行环境的计算流图中实际上也就是添加到我们刚刚所说的执行环境的ListTransformation? transformations属性中了。
二、StreamGraphGenerator生成StreamGraph
在将用户函数userFunction转换成Transformation并保存到StreamExecutionEnvironment的transformations属性中后我们就收集抽象好了所有的用户函数及处理链路接下来就是根据这些封装好的Transformation来生成StreamGraph。
首先进入到StreamExecutionEnvironment的execute执行入口方法中:
public JobExecutionResult execute() throws Exception {return execute(getStreamGraph());
}Internal
public StreamGraph getStreamGraph() {return getStreamGraph(true);
}Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {final StreamGraph streamGraph getStreamGraphGenerator(transformations).generate();if (clearTransformations) {transformations.clear();}return streamGraph;
}在上面的getStreamGraph方法中使用getStreamGraphGenerator方法生成一个StreamGraphGenerator对象这里的transformations参数实际上指的就是上面保存的每个用户函数转换得到的Transformation对象。
接下来我们主要看generator方法进入到StreamGraphGenerator类中这个类也是创建StreamGraph最核心的类。
public StreamGraph generate() {// 根据不同的配置信息创建一个StreamGraph对象streamGraph new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);// 设置 StreamGraph 是否在任务结束后启用checkpoint这个布尔值从配置中获取。streamGraph.setEnableCheckpointsAfterTasksFinish(configuration.get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));shouldExecuteInBatchMode shouldExecuteInBatchMode();configureStreamGraph(streamGraph);// 初始化一个哈希映射alreadyTransformed用于存储已经被转换过的Transformation。alreadyTransformed new HashMap();// 遍历transformations列表对每个transformation对象进行转换// 这里是转换的核心逻辑for (Transformation? transformation : transformations) {transform(transformation);}// 将slotSharingGroupResources设置为StreamGraph的资源配置。streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);setFineGrainedGlobalStreamExchangeMode(streamGraph);// 获取StreamGraph中所有的StreamNode检查它们的输入边缘是否满足禁用未对齐的checkpointing的条件如果满足条件则将边的supportsUnalignedCheckpoints属性设置为false。for (StreamNode node : streamGraph.getStreamNodes()) {if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {for (StreamEdge edge : node.getInEdges()) {edge.setSupportsUnalignedCheckpoints(false);}}}// 清理streamGraph和alreadyTransformed以释放资源并防止后续的错误使用并保存当前的streamGraph实例到builtStreamGraph中。final StreamGraph builtStreamGraph streamGraph;alreadyTransformed.clear();alreadyTransformed null;streamGraph null;// 最后返回构建好的StreamGraph。return builtStreamGraph;
}上面代码中最主要的核心逻辑在for循环遍历transformations中调用transform方法对每个Transformation对象进行转换。我们主要进入到该方法中进行分析
/*** Transforms one {code Transformation}.** pThis checks whether we already transformed it and exits early in that case. If not it* delegates to one of the transformation specific methods.*/
private CollectionInteger transform(Transformation? transform) {// 快速检查传入的 transform 对象是否已经在 alreadyTransformed 字典一个缓存中如果已存在则直接返回对应的ID这种早期退出的机制避免了对同一任务的重复转换。if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}LOG.debug(Transforming transform);if (transform.getMaxParallelism() 0) {// if the max parallelism hasnt been set, then first use the job wide max parallelism// from the ExecutionConfig.int globalMaxParallelismFromConfig executionConfig.getMaxParallelism();if (globalMaxParallelismFromConfig 0) {transform.setMaxParallelism(globalMaxParallelismFromConfig);}}// 若 transform 对象指定了 SlotSharingGroup 那么会从 SlotSharingGroup 中提取资源并更新到 slotSharingGroupResources 中。transform.getSlotSharingGroup().ifPresent(slotSharingGroup - {final ResourceSpec resourceSpec SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {slotSharingGroupResources.compute(slotSharingGroup.getName(),(name, profile) - {if (profile null) {return ResourceProfile.fromResourceSpec(resourceSpec, MemorySize.ZERO);} else if (!ResourceProfile.fromResourceSpec(resourceSpec, MemorySize.ZERO).equals(profile)) {throw new IllegalArgumentException(The slot sharing group slotSharingGroup.getName() has been configured with two different resource spec.);} else {return profile;}});}});// call at least once to trigger exceptions about MissingTypeInfo// 调用 transform.getOutputType() 进行安全检查确保类型信息的完整性。transform.getOutputType();// 根据 transform 对象的类型获取对应的转换逻辑 translator. SuppressWarnings(unchecked)final TransformationTranslator?, Transformation? translator (TransformationTranslator?, Transformation?)translatorMap.get(transform.getClass());// 如果找到了相应的 translator使用它进行转换否则使用旧的转换策略 legacyTransform()。CollectionInteger transformedIds;if (translator ! null) {transformedIds translate(translator, transform);} else {transformedIds legacyTransform(transform);}// 在转换完成后检查 transform 是否已经被记录在 alreadyTransformed 字典中。如果尚未记录则将转换后的对象ID添加到字典中。// need this check because the iterate transformation adds itself before// transforming the feedback edgesif (!alreadyTransformed.containsKey(transform)) {alreadyTransformed.put(transform, transformedIds);}// 将转换后产生的节点ID返回以供后续使用。return transformedIds;
}很明显这种转换不可能多次进行因为这会浪费计算资源。因此我们需要一个机制来记录哪些Transformation已经被转换过。在Flink中这是通过一个名为alreadyTransformed的哈希映射实现的。如果当前的Transformation已经存在于alreadyTransformed中那么就无需再次进行转换直接返回对应的集合即可。
接下来根据transform的具体类型从translatorMap中获取相应的translator转换器具体的translatorMap内容可以在代码中看到。找到转换器后调用translate方法来执行转换。那么我们又需要进入到translate方法中一探究竟
private CollectionInteger translate(final TransformationTranslator?, Transformation? translator,final Transformation? transform) {checkNotNull(translator);checkNotNull(transform);// 通过调用getParentInputIds()方法获取当前transform对象的所有输入父级Transformation的ID。final ListCollectionInteger allInputIds getParentInputIds(transform.getInputs());// 再次检查当前transform对象是否已在alreadyTransformed字典中如果是直接返回对应的ID。// the recursive call might have already transformed thisif (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}// 确定slotSharingGroup这是一个根据transform输入和slotSharingGroup名称决定slot sharing策略的过程。final String slotSharingGroup determineSlotSharingGroup(transform.getSlotSharingGroup().isPresent()? transform.getSlotSharingGroup().get().getName(): null,allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));// 创建一个TransformationTranslator.Context对象里面包含了StreamGraphslotSharingGroup和配置信息该上下文会在转换过程中使用。final TransformationTranslator.Context context new ContextImpl(this, streamGraph, slotSharingGroup, configuration);// 根据执行模式不同调用转换方法translateForBatch()或translateForStreaming()进行具体的转换工作。return shouldExecuteInBatchMode? translator.translateForBatch(transform, context): translator.translateForStreaming(transform, context);
}每一个TransformationTranslator实例都绑定了一个特定类型的Transformation的转换逻辑例如OneInputTransformationTranslatorSourceTransformation等。通过这份代码我们可以看到Flink的灵活性和可扩展性。你可以为特定的Transformation添加不同的激活逻辑或者处理逻辑。这种设计确保了Flink在处理不同类型Transformation时的高效性并且很容易添加新类型的Transformation。
这里我们仍然以OneInputTransformationTranslator的转换逻辑来举例看一下Flink的Transformation转换逻辑执行了什么操作
protected CollectionInteger translateInternal(final TransformationOUT transformation,final StreamOperatorFactoryOUT operatorFactory,final TypeInformationIN inputType,Nullable final KeySelectorIN, ? stateKeySelector,Nullable final TypeInformation? stateKeyType,final Context context) {checkNotNull(transformation);checkNotNull(operatorFactory);checkNotNull(inputType);checkNotNull(context);// 即获取 StreamGraph、slotSharingGroup 和transformation的 ID。final StreamGraph streamGraph context.getStreamGraph();final String slotSharingGroup context.getSlotSharingGroup();final int transformationId transformation.getId();final ExecutionConfig executionConfig streamGraph.getExecutionConfig();// addOperator() 方法把转换Transformation 添加到 StreamGraph 中。此操作包括transformation的 IDslotSharingGroupCoLocationGroupKey工厂类输入类型输出类型以及操作名。streamGraph.addOperator(transformationId,slotSharingGroup,transformation.getCoLocationGroupKey(),operatorFactory,inputType,transformation.getOutputType(),transformation.getName());// 如果 stateKeySelector用于从输入中提取键的函数非空使用 stateKeyType 创建密钥序列化器并在 StreamGraph 中设置用于接收单输入的状态键if (stateKeySelector ! null) {TypeSerializer? keySerializer stateKeyType.createSerializer(executionConfig);streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);}// 根据 Transformation 和 executionConfig 设置并行度。int parallelism transformation.getParallelism() ! ExecutionConfig.PARALLELISM_DEFAULT? transformation.getParallelism(): executionConfig.getParallelism();streamGraph.setParallelism(transformationId, parallelism);streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());final ListTransformation? parentTransformations transformation.getInputs();checkState(parentTransformations.size() 1,Expected exactly one input transformation but found parentTransformations.size());// 根据转换的输入和输出添加边到 StreamGraph。每个输入转换都添加一条边。for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {streamGraph.addEdge(inputId, transformationId, 0);}// 方法返回包含转换 ID 的单个元素集合。return Collections.singleton(transformationId);
}上面这段代码实际上就是构建StreamGraph的主体逻辑部分了translateInternal() 方法实现了从 Transformation 到 StreamGraph 中操作的转换。在该方法中对于每一个Transformation会调用streamGraph.addOperator方法生成一个StreamNode对象存储在StreamGraph的streamNode属性中该属性是一个MapInteger, StreamNode结构表示每个Transformation ID对应的StreamNode节点。
protected StreamNode addNode(Integer vertexID,Nullable String slotSharingGroup,Nullable String coLocationGroup,Class? extends TaskInvokable vertexClass,StreamOperatorFactory? operatorFactory,String operatorName) {if (streamNodes.containsKey(vertexID)) {throw new RuntimeException(Duplicate vertexID vertexID);}StreamNode vertex new StreamNode(vertexID,slotSharingGroup,coLocationGroup,operatorFactory,operatorName,vertexClass);streamNodes.put(vertexID, vertex);return vertex;
}看完translateInternal方法中streamGraph.addOperator的执行逻辑后接下来还需要关注的一个步骤是streamGraph.addEdge这里是连接StreamGraph中各StreamNode节点的逻辑所在
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,null,new ArrayListString(),null,null);
}在addEdgeInternal方法中会区分当前节点是虚拟节点还是物理节点从而添加物理边还是虚拟边。由于我们用OneInputTransformationTranslator会创建物理节点所以进入到创建物理边的分支代码中
private void createActualEdge(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner? partitioner,OutputTag outputTag,StreamExchangeMode exchangeMode) {// 首先通过节点ID获取上游和下游的StreamNode。StreamNode upstreamNode getStreamNode(upStreamVertexID);StreamNode downstreamNode getStreamNode(downStreamVertexID);// 检查分区器partitioner是否已经设置如果没有设置且上游节点与下游节点的并行度相等那么使用ForwardPartitioner; 如果并行度不相等则使用RebalancePartitioner。// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.if (partitioner null upstreamNode.getParallelism() downstreamNode.getParallelism()) {partitioner new ForwardPartitionerObject();} else if (partitioner null) {partitioner new RebalancePartitionerObject();}if (partitioner instanceof ForwardPartitioner) {if (upstreamNode.getParallelism() ! downstreamNode.getParallelism()) {throw new UnsupportedOperationException(Forward partitioning does not allow change of parallelism. Upstream operation: upstreamNode parallelism: upstreamNode.getParallelism() , downstream operation: downstreamNode parallelism: downstreamNode.getParallelism() You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.);}}if (exchangeMode null) {exchangeMode StreamExchangeMode.UNDEFINED;}/*** Just make sure that {link StreamEdge} connecting same nodes (for example as a result of* self unioning a {link DataStream}) are distinct and unique. Otherwise it would be* difficult on the {link StreamTask} to assign {link RecordWriter}s to correct {link* StreamEdge}.*/// 在上述配置都设置好之后创建StreamEdge对象并将其添加到上游节点的出边和下游节点的入边。int uniqueId getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();StreamEdge edge new StreamEdge(upstreamNode,downstreamNode,typeNumber,partitioner,outputTag,exchangeMode,uniqueId);getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);
}从上述代码中可以看出createActualEdge()方法实现了在StreamGraph中添加实际的边的过程这是构建Flink StreamGraph的一个重要步骤。
至此我们就看到了创建StreamGraph并根据Transformation来生成StreamNode并添加StreamEdge边的过程最终构建好一个完成的StreamGraph来表示Flink应用程序的数据流执行拓扑图。
当然这里我们只是以OneInputTransformationTranslator转换器举例来分析流程实际上其他的转换器应该会更复杂一些有兴趣的可以继续深入研究本文便不再赘述。同时本文也仍然有很多细节暂时因为理解不够深入没有涉及欢迎各位一起交流学习。
最终在我们构造好StreamGraph后就需要考虑如何将StreamGraph转换成JobGraph了下一篇将继续介绍StreamGraph - JobGraph的转换。