个人简介网站源码,宁德网站建设,平台公司运营模式,网站200m虚拟主机能放多少东西背景
AggregateFunction接口是我们经常用的窗口聚合函数#xff0c;其中有一个merge方法#xff0c;我们一般情况下也是实现了的#xff0c;但是你知道吗#xff0c;其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现
AggregateFunction.merge方法调用时…背景
AggregateFunction接口是我们经常用的窗口聚合函数其中有一个merge方法我们一般情况下也是实现了的但是你知道吗其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现
AggregateFunction.merge方法调用时机
AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到如下所示
对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并
public void processElement(StreamRecordIN element) throws Exception {final CollectionW elementWindows windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);// if element is handled by none of assigned elementWindowsboolean isSkippedElement true;final K key this.KgetKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {MergingWindowSetW mergingWindows getMergingWindowSet();for (W window : elementWindows) {// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we dont merge then// actualWindow windowW actualWindow mergingWindows.addWindow(window,new MergingWindowSet.MergeFunctionW() {Overridepublic void merge(W mergeResult,CollectionW mergedWindows,W stateWindowResult,CollectionW mergedStateWindows)throws Exception {triggerContext.key key;triggerContext.window mergeResult;triggerContext.onMerge(mergedWindows);for (W m : mergedWindows) {triggerContext.window m;triggerContext.clear();deleteCleanupTimer(m);}// 合并窗口的状态windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);}});继续查看AbstractHeapMergingState.mergeNamespaces方法
public void mergeNamespaces(N target, CollectionN sources) throws Exception {if (sources null || sources.isEmpty()) {return; // nothing to do}final StateTableK, N, SV map stateTable;SV merged null;// merge the sourcesfor (N source : sources) {// get and remove the next source per namespace/keySV sourceState map.removeAndGetOld(source);if (merged ! null sourceState ! null) {//此处合并状态并调用AggregateFunction.merge方法merged mergeState(merged, sourceState);} else if (merged null) {merged sourceState;}}// merge into the target, if neededif (merged ! null) {map.transform(target, merged, mergeTransformation);}
}//真正调用AggregateFunction.merge方法合并自定义的状态
Override
protected ACC mergeState(ACC a, ACC b) {return aggregateTransformation.aggFunction.merge(a, b);
}这样AggregateFunction.merge的调用过程就清楚了实际应用中我们只需要在使用会话窗口时才需要实现这个方法其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错