当前位置: 首页 > news >正文

建工网校建筑工程网郑州企业网站优化服务哪家好

建工网校建筑工程网,郑州企业网站优化服务哪家好,建设房地产法律网站,沧浪手机网站建设方案一、Flink 窗口 理解 在流处理应用中#xff0c;数据是连续不断的#xff0c;因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次#xff0c;但是有时我们需要做一些聚合类的处理#xff0c;例如#xff1a;在过去的1分钟内有多少用户点击…一、Flink 窗口 理解 在流处理应用中数据是连续不断的因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次但是有时我们需要做一些聚合类的处理例如在过去的1分钟内有多少用户点击了我们的网页。在这种情况下我们必须定义一个窗口用来收集最近一分钟内的数据并对这个窗口内的数据进行计算。 流式计算是一种被设计用于处理无限数据集的数据处理引擎而无限数据集是指一种不断增长的本质上无限的数据集而Window窗口是一种切割无限数据为有限块进行处理的手段。 在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个存储桶(bucket), 我们在这些桶上进行计算. 时间窗口 时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。 二、数据准备 准备一个WaterSensor类方便演示 package com.lyh.bean;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;Data NoArgsConstructor AllArgsConstructor public class WaterSensor {private String id;private Long ts;private Integer vc; }三、时间滚动窗口 滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口每一个事件只能属于一个窗口。 滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口. 滚动窗口能将数据流切分成不重叠的窗口每一个事件只能属于一个窗口 1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定2.我们传递给window函数的对象叫窗口分配器. 时间滚动窗口代码 package com.lyh.flink07;import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(hadoop100,9999).map(line - {String[] data line.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunctionWaterSensor, String,String, TimeWindow() {Overridepublic void process(String key,Context ctx,IterableWaterSensor elements,CollectorString out) throws Exception {ListWaterSensor list toList(elements);long starttime ctx.window().getStart();long endtime ctx.window().getEnd();out.collect(窗口 starttime endtime key: key list: list);}}).print();env.execute();}private static TListT toList(IterableT it) {ListT list new ArrayList();for (T t : it) {list.add(t);}return list;} } 运行结果 在hadoop100 服务器 输入nc -lk 999 启动socket 消费结果 四、时间滑动窗口 与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率. 所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中 例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。 时间滑动窗口代码 package com.lyh.flink07;import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(hadoop100,9999).map(line - {String[] data line.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId) // .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).process(new ProcessWindowFunctionWaterSensor, String,String, TimeWindow() {Overridepublic void process(String key,Context ctx,IterableWaterSensor elements,CollectorString out) throws Exception {ListWaterSensor list toList(elements);long starttime ctx.window().getStart();long endtime ctx.window().getEnd();out.collect(窗口 starttime endtime key: key list: list);}}).print();env.execute();}private static TListT toList(IterableT it) {ListT list new ArrayList();for (T t : it) {list.add(t);}return list;} }执行结果 在hadoop100 服务器 输入nc -lk 999 启动socket 消费结果 五、时间会话窗口 会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间. 如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔) 我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。 时间会话窗口代码 package com.lyh.flink07;import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(hadoop100,9999).map(line - {String[] data line.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId) // .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))).process(new ProcessWindowFunctionWaterSensor, String,String, TimeWindow() {Overridepublic void process(String key,Context ctx,IterableWaterSensor elements,CollectorString out) throws Exception {ListWaterSensor list toList(elements);long starttime ctx.window().getStart();long endtime ctx.window().getEnd();out.collect(窗口 starttime endtime key: key list: list);}}).print();env.execute();}private static TListT toList(IterableT it) {ListT list new ArrayList();for (T t : it) {list.add(t);}return list;} }运行结果 在hadoop100 服务器 输入nc -lk 999 启动socket 消费结果 因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。 六、基于元素个数的滚动窗口 默认的CountWindow是一个滚动窗口只需要指定窗口大小即可当元素数量达到窗口大小时就会触发窗口的执行。 实例代码 .countWindow(3) 说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口. 基于元素个数的滚动窗口代码 package com.lyh.flink07;import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(hadoop100,9999).map(line - {String[] data line.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).countWindow(2).process(new ProcessWindowFunctionWaterSensor, String, String, GlobalWindow() {Overridepublic void process(String key,Context ctx,IterableWaterSensor elements,CollectorString out) throws Exception {ListWaterSensor list toList(elements);out.collect(窗口 key: key list: list);}}).print();env.execute();}private static TListT toList(IterableT it) {ListT list new ArrayList();for (T t : it) {list.add(t);}return list;} }运行结果 七、基于元素个数的滑动窗口 滑动窗口和滚动窗口的函数名是完全一致的只是在传参数时需要传入两个参数一个是window_size一个是sliding_size。下面代码中的sliding_size设置为了2也就是说每收到两个相同key的数据就计算一次每一次计算的window范围最多是3个元素。 实例代码 .countWindow(3, 2) package com.lyh.flink07;import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(hadoop100,9999).map(line - {String[] data line.split(,);return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId) // .countWindow(2).countWindow(3,2).process(new ProcessWindowFunctionWaterSensor, String, String, GlobalWindow() {Overridepublic void process(String key,Context ctx,IterableWaterSensor elements,CollectorString out) throws Exception {ListWaterSensor list toList(elements);out.collect(窗口 key: key list: list);}}).print();env.execute();}private static TListT toList(IterableT it) {ListT list new ArrayList();for (T t : it) {list.add(t);}return list;} }运行结果
http://www.dnsts.com.cn/news/255877.html

相关文章:

  • 网站建设 响应式 北京设计工作室的经营范围
  • 企业集团网站网站建设方案南通做网站的公司
  • php网站开发专业介绍欢迎访问中国建设银行网站个人客户
  • 做游戏网站要通过什么审核做网站会什么
  • 2018年的网站制作嘉兴网站开发与制作
  • 克隆网站带后台网站开发需要做什么工作
  • .net cms网站管理系统网站建设 重庆
  • 网站分析内容建筑公司资质等级
  • 网站备案时间要多久网站建设是前端吗
  • 莒县网站建设外贸网站源码怎么建
  • 快速开发网站的应用程序石家庄关键词排名工具
  • 都是做面食网站域名备案查询系统工信部
  • 济宁网站运营策略网站开发 自定义首页显示
  • 第18讲:商品模型 织梦网站系统 dedecms 教学课件苏州网站建设方案外包
  • 网站风险解除手机网站生成
  • 移动积分兑换商城官方网站南山住房和建设局网站官网
  • 标书制作公司网站北京建设信源咨询有限公司网站
  • 建行网站登录不了太原网站域名开发
  • 上海网站搭建公司哪家好如何看到网站的建设时间
  • 专业网站开发设计可画人物插画设计
  • 永年企业做网站推广在拼多多上怎么开网店
  • 做一个好的公司网站有什么好处导航网站html模板
  • 重庆 网站建设大全福利温州网站制作策划
  • mvc5 网站开发之学 pdf机械加工网站哪里找
  • 旅游的网站建设策划书国外设计文章的网站
  • 有没有做淘宝首页特效的网站263企业邮箱注册入口
  • 查域名被墙惠州搜索引擎优化
  • mvc5 网站开发美學 pdf导游网站如何建设的
  • 网站搭建费用做公众号app,网站,app
  • 上海做兼职哪个网站免费建站cms