排版的网站,wordpress主题ruikedu,网站建设员工资,在线crm软件有哪些优势?在离线 Hive 中#xff0c;我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢#xff1f;Flink DataStream API 为我们提供了3个算子来实现双流 join#xff0c;分别是#xff1a; join coGroup intervalJoin
下面我们分别详细看一下这…在离线 Hive 中我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢Flink DataStream API 为我们提供了3个算子来实现双流 join分别是 join coGroup intervalJoin
下面我们分别详细看一下这3个算子是如何实现双流 Join 的。 1. Join
Joining | Apache Flink
Join 算子提供的语义为 “Window join”即按照指定字段和滚动/滑动/会话窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。
Join 可以支持处理时间和事件时间两种时间特征。
Join 通用用法如下
stream.join(otherStream)
.where(KeySelector)
.equalTo(KeySelector)
.window(WindowAssigner)
.apply(JoinFunction)
Join 语义类似与离线 Hive 的 InnnerJoin (内连接)这意味着如果一个流中的元素在另一个流中没有相对应的元素则不会输出该元素。
下面我们看一下 Join 算子在不同类型窗口上的具体表现。
1.1 滚动窗口Join
当在滚动窗口上进行 Join 时所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。 如上图所示我们定义了一个大小为 2 秒的滚动窗口最终产生 [0,1][2,3]… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是在滚动窗口 [6,7] 中由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素因此该窗口不会输出任何内容。
下面我们一起看一下如何实现上图所示的滚动窗口 Join
:::color3 可以通过两个socket流将数据合并为一个三元组key,value1,value2
代码演示
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来因为本地的并行度是16只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String greenStream env.socketTextStream(localhost, 8888).map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(绿色 Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(绿色的时间timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String orangeStream env.socketTextStream(localhost, 7777).map(new MapFunctionString, Tuple3String,Integer,String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(橘色 Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(橘色的时间timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream greenStream.join(orangeStream).where(tup3 - tup3.f0).equalTo(tup3 - tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer join(Tuple3String, Integer, String t1, Tuple3String, Integer, String t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}
总结非常重要
1 要想测试这个效果需要将并行度设置为1
2窗口中数据的打印是需要触发的没有触发的数据窗口内是不会进行计算的所以记得输入触发的数据。
假如使用了EventTime 作为时间语义不管是窗口开始和结束时间还是触发的条件都跟系统时间没有关系而跟输入的数据有关系举例
假如你的第一条数据是key,0,2021-03-26 12:09:01 窗口的大小是5s水印是3秒 窗口的开始时间为
2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 触发时间是2021-03-26 12:09:08
为什么呢 水印时间 结束时间
水印时间是2021-03-26 12:09:08 - 3 2021-03-26 12:09:05 2021-03-26 12:09:05
::: 如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略设置100毫秒的最大可容忍的延迟时间同时也会为流分配事件时间戳。假设输入流为 格式两条流输入元素如下所示
绿色流
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11
橘色流
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即 ]
当在滑动窗口上进行 Join 时所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联并最终传递到 JoinFunction 进行处理。 如上图所示我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是一个元素可能会落在不同的窗口中因此会在不同窗口中发生关联例如绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素则不会输出该元素。