重生做网站小说,网站修改,设计一个网站页面需要多少钱,深圳 汽车网站建设目录
处理函数分类
概览介绍
KeydProcessFunction和ProcessFunction
定时器TimeService
窗口处理函数 多流转换
分流-侧输出流
合流
联合#xff08;Uniion#xff09;
连接#xff08;connect#xff09; 广播连接流#xff08;BroadcatConnectedStream#xf…目录
处理函数分类
概览介绍
KeydProcessFunction和ProcessFunction
定时器TimeService
窗口处理函数 多流转换
分流-侧输出流
合流
联合Uniion
连接connect 广播连接流BroadcatConnectedStream
基于时间的合流 -双流联结
窗口连接windowjoin
间隔联结Interval join
窗口同组联结window CoGroup 处理函数分类
窗口 | Apache Flink
处理函数分8种datastream调用keyby()后得到keyedStream,进而调用window()得到WindowedStream对于不同的流都可以调用process方法进行自定处理这是传入的函数都叫处理函数
概览介绍
flink提供8种不同的处理函数
窗口和流都可以使用
1.ProcessFunction 是最基本的处理函数基于DataStream直接调用process()时作为参数传入
2.KeydProcessFunction:是对流进行分区后的处理函数基于KeyedStream调用process()时作为参数传入。只有该方法支持定时器功能onTime
窗口函数只有窗口可以使用
3.ProcessWindowFunction:是开窗之后的处理函数也是全窗口函数的代表基于windowedStream调用process()时作为参数传入
4.ProcessAllWindowFunction:开窗后处理函数基于allWindowedStream的process()时作为参数传入。
连接函数流join使用
5.CoProcessFunction:是合并两条留之后的处理函数基于ConnectedStreams调用process()时作为参数传入
窗口的join使用
6.ProcessJoinFunction是间隔连接两条流字之后的处理函数基于IntervalJOined调用process()时作为参数传入
广播状态
7.BroadcastProcessFunction:是广播连接流处理函数基于BroadcastConnectedStream调用process时作为参数传入这里BroadcastConnectedStream是一个未做keyby处理的普通DataStream与一个广播流(BroadcastStream连接之后的产物
8.KeyedBroadcastFunction是按键分区的广播连接流处理函数同样基于BroadConnectedStream调用process时作为参数传入;与BroadcastProcessFunction不同的是是的广播流是keyedStream与一个广播流(BroadcastStream连接之后的产物
KeydProcessFunction和ProcessFunction
我们在看源码的时候看到ProcessFunction 和KeydProcessFunction结构一样都有两个接口一个必须实现的processElement()抽象方法一个非抽象方法onTimer()。差别在上下文Context中KeydProcessFunction多一个获取当前分区key的方法 getCurrentKey。当使用ProcessFunction使用定时器时程序运行会报错提示定时器只支持keyStream使用 stream.process(new ProcessFunction Event, String() {Overridepublic void processElement(Event value, ProcessFunctionEvent, String.Context ctx, CollectorString out) throws Exception {Long currTs ctx.timerService().currentProcessingTime();out.collect(数据到达到达时间 new Timestamp(currTs));// 注册一个10秒后的定时器ctx.timerService().registerProcessingTimeTimer(currTs 10 * 1000L);}Overridepublic void onTimer(long timestamp, ProcessFunctionEvent, String.OnTimerContext ctx, CollectorString out) throws Exception {out.collect(定时器触发触发时间 new Timestamp(timestamp));}})程序运行后报错
Caused by: java.lang.UnsupportedOperationException: Setting timers is only supported on a keyed streams.at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.registerProcessingTimeTimer(ProcessOperator.java:118)at com.atguigu.chapter07.ProcessingTimeTimerTest$1.processElement(ProcessingTimeTimerTest.java:55)at com.atguigu.chapter07.ProcessingTimeTimerTest$1.processElement(ProcessingTimeTimerTest.java:47)at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)at com.atguigu.chapter05.ClickSource.run(ClickSource.java:26)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) 定时器TimeService
TimeService中有六个方法可以分为基于处理时间和基于事件时间的方法两大类种时间精度为毫秒
获取当前处理时间
long currentProcessingTime();
获取当前水印时间
long currentWatermark();
注册处理时间为定时器
void registerProcessingTimeTimer(long time);long coalescedTime ((ctx.timestamp() timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);
注册时间时间为定时器
void registerEventTimeTimer(long time);long coalescedTime ctx.timerService().currentWatermark() 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);
删除处理时间定时器
void deleteProcessingTimeTimer(long time);long timestampOfTimerToStop ...
ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
删除时间时间定时器
void deleteEventTimeTimer(long time);
long timestampOfTimerToStop ...
ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
注意另外定时器使用处理时间和时间在触发上有区别当设置定时任务为处理时间时即便后续没有数据写入定时器依然可以正常触发计算当如果设置为时间时定时任务时间依赖水印时间线。只有当水印时间大于定时器触发时间时才会触发计算即如果后入没有实时数据进入时最后一个定时器一直不会触发
窗口处理函数
ProcessWindowFunction和ProcessAllWindowFunction既是创立函数又是全窗口函数从名称上看他更倾向于窗口函数。用法与处理函数不同。 没有ontime借口和定时器服务一般窗口有使用窗口触发器Trigger在作用上可以类似timeservice的作用。
源码上看
abstract class ProcessWindowFunction[IN, OUT, KEY, W : Window]extends AbstractRichFunction {/*** Evaluates the window and outputs none or several elements.** param key The key for which this window is evaluated.* param context The context in which the window is being evaluated.* param elements The elements in the window being evaluated.* param out A collector for emitting elements.* throws Exception The function may throw exceptions to fail the program and trigger recovery.*/throws[Exception]def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])// 如果有自定义状态该方法调用清理throws[Exception]def clear(context: Context) {}abstract class Context {def window: Wdef currentProcessingTime: Longdef currentWatermark: Long
// 获取自定义窗口状态 对当前key当前窗口有效def windowState: KeyedStateStore
// 获取自定义全局状态 对当前key的全部窗口有效def globalState: KeyedStateStoredef output[X](outputTag: OutputTag[X], value: X);}
}
--------------用法演示------------DataStreamTuple2String, Long input ...;input.keyBy(t - t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunctionTuple2String, Long, String, String, TimeWindow {Overridepublic void process(String key, Context context, IterableTuple2String, Long input, CollectorString out) {long count 0;for (Tuple2String, Long in: input) {count;}out.collect(Window: context.window() count: count);}
} 多流转换
分流-侧输出流
处理函数有一个特殊功能来处理迟到数据和进行分流侧输出流(Side Output),再使用可以通过processElement或onTimer的下文context的output()方法就可以了
旁路输出 | Apache Flink
一下函数都可以获取
ProcessFunctionKeyedProcessFunctionCoProcessFunctionKeyedCoProcessFunctionProcessWindowFunctionProcessAllWindowFunction
用法示例
DataStreamInteger input ...;
//定义
final OutputTagString outputTag new OutputTagString(side-output){};SingleOutputStreamOperatorInteger mainDataStream input.process(new ProcessFunctionInteger, Integer() {Overridepublic void processElement(Integer value,Context ctx,CollectorInteger out) throws Exception {// 发送数据到主要的输出out.collect(value);// 发送数据到旁路输出ctx.output(outputTag, sideout- String.valueOf(value));}});//外部获取测试给出流
DataStreamString sideOutputStream mainDataStream.getSideOutput(outputTag);
分流之前有split在1.13版本中已经弃用直接使用处理函数的侧输出流
合流
联合Uniion
将多个流合成一个流而一个流中数据类型必须是相同的因此要求多个流的数据类型必须相同才能合并合并后流包含所有流的元素如果流有水位线合流之后的水位线为最小的为准
stream1.union(Stream2,stream3,...), 连接connect
connect得到的是connectedStreams与联合有本质的不同两个是量多流合并成一个流数据是混在一个流中跟一个流没什么区别而connect合并后内部仍然各自保持自己的数据形式不变彼此独立。因此可以处理不同类型的数据但是只能两个流连接。如果想要得到新的DataStream,还需要自定义一个“同处理”co-propcess转换操作对不同类型数据进行分别处理转换成同一种类型。 DataStreamInteger someStream //...
DataStreamString otherStream //...ConnectedStreamsInteger, String connectedStreams someStream.connect(otherStream); coMap和coflatmap函数
connectedStreams.map(new CoMapFunctionInteger, String, Boolean() {Overridepublic Boolean map1(Integer value) {return true;}Overridepublic Boolean map2(String value) {return false;}
});
connectedStreams.flatMap(new CoFlatMapFunctionInteger, String, String() {Overridepublic void flatMap1(Integer value, CollectorString out) {out.collect(value.toString());}Overridepublic void flatMap2(String value, CollectorString out) {for (String word: value.split( )) {out.collect(word);}}
});
CoprocessFunction
是处理函数中的亿元与处理函数用法相识,keyby的key类型必须相同 appStream.connect(thirdpartStream).keyBy(data - data.f0, data - data.f0).process(new OrderMatchResult()).print();public static class OrderMatchResult extends CoProcessFunctionTuple3String, String, Long, Tuple4String, String, String, Long, String{// 定义状态变量用来保存已经到达的事件private ValueStateTuple3String, String, Long appEventState;private ValueStateTuple4String, String, String, Long thirdPartyEventState;Overridepublic void open(Configuration parameters) throws Exception {appEventState getRuntimeContext().getState(new ValueStateDescriptorTuple3String, String, Long(app-event, Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));thirdPartyEventState getRuntimeContext().getState(new ValueStateDescriptorTuple4String, String, String, Long(thirdparty-event, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG)));}Overridepublic void processElement1(Tuple3String, String, Long value, Context ctx, CollectorString out) throws Exception {// 看另一条流中事件是否来过if (thirdPartyEventState.value() ! null){out.collect(对账成功 value thirdPartyEventState.value());// 清空状态thirdPartyEventState.clear();} else {// 更新状态appEventState.update(value);// 注册一个5秒后的定时器开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f2 5000L);}}Overridepublic void processElement2(Tuple4String, String, String, Long value, Context ctx, CollectorString out) throws Exception {if (appEventState.value() ! null){out.collect(对账成功 appEventState.value() value);// 清空状态appEventState.clear();} else {// 更新状态thirdPartyEventState.update(value);// 注册一个5秒后的定时器开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f3 5000L);}}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {// 定时器触发判断状态如果某个状态不为空说明另一条流中事件没来if (appEventState.value() ! null) {out.collect(对账失败 appEventState.value() 第三方支付平台信息未到);}if (thirdPartyEventState.value() ! null) {out.collect(对账失败 thirdPartyEventState.value() app信息未到);}appEventState.clear();thirdPartyEventState.clear();}}} 广播连接流BroadcatConnectedStream
DataStream在调用connect时传入的参数可以不是一个DataStream而是一个广播流BroadcastStream这是合并两条流得到的就是一个广播连接流BroadcastConnectedStream,比较实用动态定义规则或配置的场景。下游算子收到广播规则后吧保存为状态。这就是广播状态。
广播状态底层是一个映射map结构来保存的。可以直接DataStream.broadcast()方法调用
// 一个 map descriptor它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptorString, Rule ruleStateDescriptor new MapStateDescriptor(RulesBroadcastState,BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHintRule() {}));// 广播流广播规则并且创建 broadcast state
BroadcastStreamRule ruleBroadcastStream ruleStream.broadcast(ruleStateDescriptor);//使用
DataStreamString output colorPartitionedStream.connect(ruleBroadcastStream).process(// KeyedBroadcastProcessFunction 中的类型参数表示// 1. key stream 中的 key 类型// 2. 非广播流中的元素类型// 3. 广播流中的元素类型// 4. 结果的类型在这里是 stringnew KeyedBroadcastProcessFunctionColor, Item, Rule, String() {// 模式匹配逻辑});
为了关联一个非广播流keyed 或者 non-keyed与一个广播流BroadcastStream我们可以调用非广播流的方法 connect()并将 BroadcastStream 当做参数传入。 这个方法的返回参数是 BroadcastConnectedStream具有类型方法 process()传入一个特殊的 CoProcessFunction 来书写我们的模式识别逻辑。 具体传入 process() 的是哪个类型取决于非广播流的类型
如果流是一个 keyed 流那就是 KeyedBroadcastProcessFunction 类型如果流是一个 non-keyed 流那就是 BroadcastProcessFunction 类型。
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
在传入的 BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 中我们需要实现两个方法。processBroadcastElement() 方法负责处理广播流中的元素processElement() 负责处理非广播流中的元素。 两个子类型定义如下
public abstract class BroadcastProcessFunctionIN1, IN2, OUT extends BaseBroadcastProcessFunction {public abstract void processElement(IN1 value, ReadOnlyContext ctx, CollectorOUT out) throws Exception;public abstract void processBroadcastElement(IN2 value, Context ctx, CollectorOUT out) throws Exception;
}public abstract class KeyedBroadcastProcessFunctionKS, IN1, IN2, OUT {public abstract void processElement(IN1 value, ReadOnlyContext ctx, CollectorOUT out) throws Exception;public abstract void processBroadcastElement(IN2 value, Context ctx, CollectorOUT out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorOUT out) throws Exception;
}
基于时间的合流 -双流联结
窗口连接windowjoin
flink内置的join算子join()和coGroup()适用于窗口统计的不用再进行自定义触发器简化了开发逻辑 等同于sql的inner join on 或 select * from table1 t1,table2 t2 where t1.idt2.id
wehre()和 equalTo()方法制定两条流中连接的key然后通过window开窗口并调用apply传入自连接窗口函数进行计算
stream.join(otherStream).where(KeySelector) //stream的key.equalTo(KeySelector) //otherStream的key.window(WindowAssigner).apply(JoinFunction)//案例stream1.join(stream2).where(r - r.f0).equalTo(r - r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple2String, Long, Tuple2String, Long, String() {Overridepublic String join(Tuple2String, Long left, Tuple2String, Long right) throws Exception {return left right;}}).print(); join function 不是真正的窗口函数只是定义了窗口函数在调用是对匹配数据额具体处理逻辑。
Public
FunctionalInterface
public interface JoinFunctionIN1, IN2, OUT extends Function, Serializable {/*** The join method, called once per joined pair of elements.** param first The element from first input.* param second The element from second input.* return The resulting element.* throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/OUT join(IN1 first, IN2 second) throws Exception;
}
join时数据先按照key进行分组、进入对应的窗口存储当窗口结束时算子会先统计出窗口内两条流的数据所有组合即做一个笛卡尔积然后进行遍历传入joinfunction的join方法中 出了JoinFunction在apply方法中还可以闯入FlatJoinFunction使用方法类似区别是内部join实现犯法没有返回值使用收集器来实现。 间隔联结Interval join
间隔联结需要设定两个时间点对应上界(upperBound)和下届(lowerBound)对于同一条流A的任意一个元素a开辟一段时间间隔[a.timestamplowerBound,a.timestampupperBound],即开辟以a为中心上下届点为边界的一个闭区间相当于窗口。对于另外一条流B中的元素b如果时间戳b.timestampa.timestamplowerBound and b.timestampa.timestampupperBoundm 那么a和b就可以匹配上 调用
orderStream.keyBy(data - data.f0).intervalJoin(clickStream.keyBy(data - data.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunctionTuple3String, String, Long, Event, String() {Overridepublic void processElement(Tuple3String, String, Long left, Event right, Context ctx, CollectorString out) throws Exception {out.collect(right left);}}).print(); 窗口同组联结window CoGroup
使用与join相同。将window的join替换成cogroup即可。与join不同是cogroup传递的一个可以遍历的集合没有做笛卡尔积。出了实现inner join还可以实现左外连接右外连接全外连接。
并且窗口联结底层也是通过同组联结实现 stream1.coGroup(stream2).where(r - r.f0).equalTo(r - r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunctionTuple2String, Long, Tuple2String, Long, String() {//与join 区别是参数非单个元素而是遍历集合Overridepublic void coGroup(IterableTuple2String, Long iter1, IterableTuple2String, Long iter2, CollectorString collector) throws Exception {collector.collect(iter1 iter2);}}).print();
窗口联结地城代码 stream1.join(stream2).where(r - r.f0).equalTo(r - r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple2String, Long, Tuple2String, Long, String() {Overridepublic String join(Tuple2String, Long left, Tuple2String, Long right) throws Exception {return left right;}}).print();查看apply源码public T DataStreamT apply(JoinFunctionT1, T2, T function) {TypeInformationT resultType TypeExtractor.getBinaryOperatorReturnType(function,JoinFunction.class,0,1,2,TypeExtractor.NO_INDEX,input1.getType(),input2.getType(),Join,false);// 继续点击apply 查看源码return apply(function, resultType);}public T DataStreamT apply(JoinFunctionT1, T2, T function, TypeInformationT resultType) {// clean the closurefunction input1.getExecutionEnvironment().clean(function);//源码使用coGroup继续点击cocoGroupedWindowedStream input1.coGroup(input2).where(keySelector1).equalTo(keySelector2).window(windowAssigner).trigger(trigger).evictor(evictor).allowedLateness(allowedLateness);// 点击查看实现的JoinCoGroupFunction源码return coGroupedWindowedStream.apply(new JoinCoGroupFunction(function), resultType);}
查看实现JoinCoGroupFunction源码中将两个集合做笛卡尔积 public JoinCoGroupFunction(JoinFunctionT1, T2, T wrappedFunction) {super(wrappedFunction);}Overridepublic void coGroup(IterableT1 first, IterableT2 second, CollectorT out)throws Exception {for (T1 val1 : first) {for (T2 val2 : second) {out.collect(wrappedFunction.join(val1, val2));}}}}