当前位置: 首页 > news >正文

广告牌图片100例关键词推广seo

广告牌图片100例,关键词推广seo,看片狂人,网站改版301是什么统计固定时间内两条流数据的匹配情况#xff0c;需要自定义来实现——可以用窗口#xff08;window#xff09;来表示。为了更方便地实现基于时间的合流操作#xff0c;Flink 的 DataStrema API 提供了内置的 join 算子。 窗口联结#xff08;Window Join#xff09; 一… 统计固定时间内两条流数据的匹配情况需要自定义来实现——可以用窗口window来表示。为了更方便地实现基于时间的合流操作Flink 的 DataStrema API 提供了内置的 join 算子。 窗口联结Window Join 一段时间的双流合并 定义时间窗口并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。 stream1.join(stream2).where(KeySelector) // stream1 的 keyBy.equalTo(KeySelector) // stream2 的 keyBy.window(WindowAssigner).apply(JoinFunction)public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 2),Tuple2.of(b, 3),Tuple2.of(c, 4)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String,IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer, Integer ds2 env.fromElements(Tuple3.of(a, 1, 1),Tuple3.of(a, 11, 1),Tuple3.of(b, 2, 1),Tuple3.of(b, 12, 1),Tuple3.of(c, 14, 1),Tuple3.of(d, 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String,Integer, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));DataStreamString join ds1.join(ds2).where(r1 - r1.f0) // ds1 的keyby.equalTo(r2 - r2.f0) // ds2 的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 关联上的数据调用 join 方法* param first ds1 的数据* param second ds2 的数据*/Overridepublic String join(Tuple2String, Integer first, Tuple3String, Integer, Integer second) throws Exception {return first ----- second;}});join.print();env.execute();} }输出 window join 两条流落在同一个时间窗口范围内才能匹配根据 keyBy 的 key来进行匹配关联只能拿到匹配上的数据类似有固定时间范围的inner join 间隔联结Interval Join 存在如下场景两条流匹配的两个数据有可能刚好“卡在”窗口边缘两侧窗口内就都没有匹配了可以使用“间隔联结”interval join来解决。 原理 给定两个时间点分别叫作间隔的“上界”upperBound和“下界”lowerBound可以开辟一段时间间隔[a.timestamp lowerBound, a.timestamp upperBound] 即以 a 的时间戳为中心下至下界点、上至上界点的一个闭区间这段时间作为可以匹配另一条流数据的“窗口”范围。 匹配的条件为 a.timestamp lowerBound b.timestamp a.timestamp upperBound stream1 .keyBy(KeySelector)// KeyedStream 调用 .intervalJoin(stream2.keyBy(KeySelector)) .between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunctionInteger, Integer, String(){Overridepublic void processElement(Integer left, Integer right,Context ctx, CollectorString out){out.collect(left , right);} });处理迟到数据可以使用左右侧输出流 完整代码 public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.socketTextStream(hadoop102, 7777).map((MapFunctionString, Tuple2String, Integer) value - {String[] datas value.split(,);return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String,IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer, Integer ds2 env.socketTextStream(hadoop102, 8888).map((MapFunctionString, Tuple3String, Integer, Integer) value - {String[] datas value.split(,);return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));/*** 【Interval join】* 1、只支持事件时间* 2、指定上界、下界的偏移负号代表时间往前正号代表时间往后* 3、process 中只能处理 join 上的数据* 4、两条流关联后的 watermark以两条流中最小的为准* 5、如果 当前数据的事件时间 当前的 watermark就是迟到数据主流的 process 不处理* between 后可以指定将 左流 或 右流的迟到数据放入侧输出流* *///1. 分别做 keybykey 其实就是关联条件KeyedStreamTuple2String, Integer, String ks1 ds1.keyBy(r1 - r1.f0);KeyedStreamTuple3String, Integer, Integer, String ks2 ds2.keyBy(r2 - r2.f0);//2. 调用 interval join// 左右测输出流迟到标签OutputTagTuple2String, Integer ks1LateTag new OutputTag(ks1-late, Types.TUPLE(Types.STRING, Types.INT));OutputTagTuple3String, Integer, Integer ks2LateTag new OutputTag(ks2-late, Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperatorString process ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)) // 指定上下界.sideOutputLeftLateData(ks1LateTag) // 将ks1的迟到数据放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将ks2的迟到数据放入侧输出流.process(new ProcessJoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 两条流的数据匹配上才会调用这个方法* param left ks1 的数据* param right ks2 的数据* param ctx 上下文* param out 采集器*/Overridepublic void processElement(Tuple2String, Integer left, Tuple3String, Integer, Integer right, Context ctx, CollectorString out) throws Exception {// 进入这个方法是关联上的数据out.collect(left ------ right);}});process.print(主流);process.getSideOutput(ks1LateTag).printToErr(ks1迟到数据);process.getSideOutput(ks2LateTag).printToErr(ks2迟到数据);env.execute();} }
http://www.dnsts.com.cn/news/222478.html

相关文章:

  • 比较好的室内设计网站广东省东莞阳光网
  • 医院网站优化策划网站开发客户提供素材
  • 什么网站可以做任务领赏金怎么用dw做简单网站
  • 免费网站整站模板下载开发人员选项怎么打开
  • 应用宝aso优化seo优化培训
  • 如何做类似优酷的视频网站怎样成立网站
  • 网站编程脚本语言文章网站后台
  • 做网站 包含详情页设计吗wordpress 去优酷广告插件
  • 做相册的网站 ppt网站截图怎么做
  • 企业网站制作多少钱网站建设竞品分析
  • 长春电商网站建设费用广安建设网站
  • h5开发网站优点衡水做外贸网站建设
  • 如何制作个人网站教程怎么用网站做调查表
  • 天空网站开发者经典企业网站欣赏
  • 中国建设银行官网首页网站聊天网站模板
  • 郑州建站公司网站网站编写费用
  • 创意网站 模板wordpress首页分类
  • 南昌市新农村建设网站wordpress 405
  • 佛山网站seo公司网络服务许可证
  • Spring做网站和什么镇江seo网络推广定制
  • 网站上的洗衣液瓶子做花瓶怎么材质甲蛙网站建设
  • h5制作哪个网站好上海松江建设银行网站
  • 利用虚拟主机建设企业网站顺企网官网下载
  • 网站建站网站80s隐秘而伟大中国建设银行网上银行网站
  • 定制建站 app建设网站建设痛点
  • 专业的昆明网站建设自适应网站导航怎么做
  • php企业中英文网站源码太原互联网公司有哪些
  • asp.net网站后台源码网站建设的主要情况说明书
  • 做网站年赚千万网上手机商城网站建设
  • 免费建学校网站怎么做网站上的模拟动画