当前位置: 首页 > news >正文

网站开发终止合作协议wordpress 关联微信

网站开发终止合作协议,wordpress 关联微信,个人网站的设计与实现参考文献,手机软件开发学校Flink-Windows 是将无限数据切割成有限的“数据块”进行处理#xff0c;这就是所谓的“窗口”#xff08;Window#xff09;。 注意#xff1a;Flink 中窗口并不是静态准备好的#xff0c;而是动态创建——当有落在这个窗口区间范围的数据达到时#xff0c;才创建对应的窗…Flink-Windows 是将无限数据切割成有限的“数据块”进行处理这就是所谓的“窗口”Window。 注意Flink 中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口。【事件驱动没有数据到达永远都不会创建窗口】 1窗口分类 1按照驱动类型分 1时间窗口 时间窗口以时间点来定义窗口的开始(start)和结束(end)截取出的就是某一时间段的数据。 2计数窗口 计数窗口基于元素的个数截取数据到达固定的个数时就触发计算并关闭窗口。 2按照窗口分配数据的规则分类 根据分配数据的规则窗口的具体实现可以分为 4 类滚动窗口Tumbling Window、滑动窗口Sliding Window、会话窗口Session Window以及全局窗口Global Window。 1滚动窗口(Tumbling Windows) 滚动窗口有固定的大小是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠也不会有间隔是 “首尾相接”的状态。这是最简单的窗口形式每个数据都会被分配到一个窗口而且只会属于一个窗口。 滚动窗口应用非常广泛可以对每个时间段做聚合统计很多BI分析指标都可以用它来实现。 2滑动窗口(Sliding Windows) 滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的而是可以“错开”一定的位置。定义滑动窗口的参数有两个除去窗口大小(window size)之外还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果那么滑动步长就代表了计算频率。 滚动窗口也可以看作是一种特殊的滑动窗口一一窗口大小等于滑动步长(sizeslide) 滑动窗口适合计算结果更新频率非常高的场景。 3会话窗口(Session Windows) 会话窗口是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。 会话窗口中最重要的参数就是会话的超时时间也就是两个会话窗口之间的最小距离。如果相邻两个数据到 来的时间间隔(gap)小于指定的大小(size)那说明还在保持会话它们就属于同一个窗口如果gap大于size, 那么新来的数据就应该属于新的会话窗口而前一个窗口就应该关闭了。 会话窗口之间一定是不会重叠的而且会留有至少为size的间隔(session 在一些类似保持会话的场景下可以使用会话窗口来进行数据的处理统计。 4全局窗口(Global Windows) “全局窗口”这种窗口全局有效会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时侯 默认是不会做触发计算的如果希望它能对数据进行计算处理还需要自定义“触发器”(Trigger)。 2窗口 API 1按键分区Keyed和非按键分区Non-Keyed 1按键分区窗口Keyed Windows 经过按键分区 keyBy 操作后数据流会按照 key 被分为多条逻辑流logical streams这就是 KeyedStream。 stream.keyBy(...) .window(...)2非按键分区Non-Keyed Windows 如果没有进行 keyBy那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务task上执行就相当于并行度变成了 1。 stream.windowAll(...)注意对于非按键分区的窗口操作手动调大窗口算子的并行度也是无效的windowAll本身就是一个非并行的操作。 2窗口分配器Window Assigners和窗口函数WindowFunctions stream.keyBy(key selector) .window(window assigner) .aggregate(window function)窗口分配器 1时间窗口 滚动处理时间窗口 stream.keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...).of()还有一个重载方法可以传入两个 Time 类型的参数size 和offset。第一个参数当然还是窗口大小第二个参数则表示窗口起始点的偏移量。 滑动处理时间窗口 stream.keyBy(...) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...)滑动窗口同样可以追加第三个参数用于指定窗口起始点的偏移量用法与滚动窗口完全一致。 处理时间会话窗口 stream.keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)还可以调用 withDynamicGap()方法定义 session gap 的动态提取逻辑。 滚动事件时间窗口 stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)滑动事件时间窗口 stream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...)事件时间会话窗口 stream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)2计数窗口 滚动计数窗口 stream.keyBy(...) .countWindow(10)滑动计数窗口 stream.keyBy(...) .countWindow(10, 3)全局窗口 stream.keyBy(...) .window(GlobalWindows.create());注意使用全局窗口必须自行定义触发器才能实现窗口计算否则起不到任何作用。 窗口函数 1增量聚合函数ReduceFunction / AggregateFunction 归约函数ReduceFunction 类似Reduce算子只不过固定时间才会输出 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceString stream env.socketTextStream(124.222.253.33, 7777);stream.map(new WaterSensorMapFunction()).keyBy(WaterSensor::getId)// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunctionWaterSensor() {Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println(调用reduce 方法之前的结果: value1 ,现在来的数据: value2);return new WaterSensor(value1.getId(), System.currentTimeMillis(), value1.getVc() value2.getVc());}}).print();env.execute();聚合函数AggregateFunction ReduceFunction 可以解决大多数归约聚合的问题但是这个接口有一个限制就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。 有三种类型输入类型IN、累加器类型ACC和输出类型OUT。 输入类型IN 就是输入流中元素的数据类型累加器类型 ACC 是进行聚合的中间状态类型而输出类型OUT是最终计算结果的类型。 接口中有四个方法 createAccumulator()创建一个累加器为聚合创建了一个初始状态每个聚合任务只会调用一次。add()将输入的元素添加到累加器中。getResult()从累加器中提取聚合的输出结果。merge()合并两个累加器并将合并后的状态作为一个累加器返回。 AggregateFunction 的工作原理首先调用createAccumulator()为任务初始化一个状态累加器而后每来一个数据就调用一次 add()方法对数据进行聚合得到的结果保存在状态中等到了窗口需要输出时再调用 getResult()方法得到计算结果。很明显与 ReduceFunction 相同AggregateFunction 也是增量式的聚合而由于输入、中间状态、输出的类型可以不同使得应用更加灵活方便。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(WaterSensor::getId);WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString aggregate sensorWS.aggregate(new AggregateFunctionWaterSensor, Integer, String() {Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println( 调用add方法,value value);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}});aggregate.print();env.execute();2全窗口函数full window functions 基于全部的数据计算 全窗口函数有两种WindowFunction 和 ProcessWindowFunction。 窗口函数WindowFunction 基于 WindowedStream 调用.apply()方法传入一个 WindowFunction 的实现类。 stream .keyBy(key selector) .window(window assigner) .apply(new MyWindowFunction());该类中可以获取到包含窗口所有数据的可迭代集合Iterable还可以拿到窗口Window本身的信息。 不过 WindowFunction 能提供的上下文信息较少也没有更高级的功能。事实上它的作用可以被 ProcessWindowFunction 全覆盖所以之后可能会逐渐弃用。 处理窗口函数ProcessWindowFunction ProcessWindowFunction 还可以获取到一个“上下文对象”Context。上下文对象非常强大不仅能够获取窗口信息还可以访问当前的时间和状态信息。 时间就包括了处理时间processing time和事件时间水位线event time watermark。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(WaterSensor::getId);WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString process sensorWS.process(new ProcessWindowFunctionWaterSensor,String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long count elements.spliterator().estimateSize();long windowStartTs context.window().getStart();long windowEndTs context.window().getEnd();String windowStart DateFormatUtils.format(windowStartTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(windowEndTs, yyyy-MM-dd HH:mm:ss.SSS);out.collect(key s 的窗口[ windowStart , windowEnd ) 包含 count 条数据 elements);}});process.print();env.execute();增量聚合和全窗口函数结合使用 // ReduceFunction 与 WindowFunction 结合 public R SingleOutputStreamOperatorR reduce( ReduceFunctionT reduceFunctionWindowFunctionTRKWfunction) // ReduceFunction 与 ProcessWindowFunction 结合 public R SingleOutputStreamOperatorR reduce( ReduceFunctionT reduceFunctionProcessWindowFunctionTRKW function)// AggregateFunction 与 WindowFunction 结合 public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunctionWindowFunctionVRKW windowFunction) // AggregateFunction 与 ProcessWindowFunction 结合 public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunction, ProcessWindowFunctionVRKW 结合使用 public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数/*增量聚合 Aggregate 全窗口 process1、增量聚合函数处理数据 来一条计算一条2、窗口触发时 增量聚合的结果只有一条传递给全窗口函数3、经过全窗口函数的处理包装后输出结合两者的优点1、增量聚合 来一条计算一条存储中间的计算结果占用的空间少2、全窗口函数 可以通过 上下文 实现灵活的功能*/// sensorWS.reduce() //也可以传两个SingleOutputStreamOperatorString result sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}public static class MyAgg implements AggregateFunctionWaterSensor, Integer, String {Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用 add 方法,value value);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用 getResult 方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用 merge 方法);return null;}}// 全窗口函数的输入类型 增量聚合函数的输出类型public static class MyProcess extends ProcessWindowFunctionString, String, String, TimeWindow {Overridepublic void process(String s, Context context, IterableString elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyyMM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements);}} }Flink-Time Event Time事件时间一个是数据产生的时间时间戳Timestamp)Processing time处理时间数据真正被处理的时间 事件时间在实际应用中更为广泛从Flink 1.12版本开始Flink已经将事件时间作为默认的时间语义。
http://www.dnsts.com.cn/news/117287.html

相关文章:

  • 济宁专业网站开发公司wordpress快速建站教程视频教程
  • 衡水做网站优化网站优化自己可以做吗
  • 海口网站建设优化案例同城类网站建设多少钱
  • 网页版qq怎么登录seo引擎优化平台培训
  • 网站开发及上线过程黄冈网站推广代运营
  • 负责加强局网站建设wordpress离线文章发布
  • 镇江还有什么网站吗如何用dw制作网页
  • 风车网站做花盆磨具深圳宝安区是富人区吗
  • 煤矿建设工程质量监督总站网站广州app开发网站建设
  • 五金配件东莞网站建设技术支持做个中英文网站多少钱
  • 住房城乡建设部网站软文推广范文
  • 试玩平台类网站怎么做的专做logo网站叫什么地方
  • 为学校网站建设销售人员管理方案
  • 如何编程制作自己的网站做一个app大概要多少钱
  • 三亚网页制作网站排名网站优化
  • 自己建的网站地址wordpress 多媒体插件
  • 哈尔滨网络公司案例陕西网站建设优化建站
  • 公司网站开发怎么做账wordpress当前菜单
  • 做网站提成如何提交网站给百度
  • 网站建设类图书有哪些公司网站建设需要注意哪些问题
  • 网站开发技术方法与路线查公司信息的网站
  • 磁县企业做网站推广wordpress首页打开变慢
  • 网站建设和运营的成本是多少钱织梦模板大气网站建设类网站模板下载
  • 南昌做网站优化的公司网站空间管理面板
  • 厦门网站建设和人才库建设上海市建设工程安全质量监督总站网站
  • 万全网站建设郑州平台制作
  • 网站建设制作定制网站开发赚钱吗 知乎
  • 山西公司网站建设国内平台有哪些
  • 网站flash素材海南房产网站建设
  • 网站有什么功能网站开发项目私活