当前位置: 首页 > news >正文

如何快速推广一个网站昆明网站建设推荐力鼎科技

如何快速推广一个网站,昆明网站建设推荐力鼎科技,自己做的网站打不开怎么回事,怎么做招标公司网站概述 窗口的长度(大小): 决定了要计算最近多长时间的数据 窗口的间隔: 决定了每隔多久计算一次 举例#xff1a;每隔10min,计算最近24h的热搜词#xff0c;24小时是长度#xff0c;每隔10分钟是间隔。 窗口的分类 1、根据window前是否调用keyBy分为键控窗口和非键控窗口… 概述 窗口的长度(大小): 决定了要计算最近多长时间的数据 窗口的间隔: 决定了每隔多久计算一次 举例每隔10min,计算最近24h的热搜词24小时是长度每隔10分钟是间隔。 窗口的分类 1、根据window前是否调用keyBy分为键控窗口和非键控窗口 2、根据window中参数的配置分为基于时间的基于条数的会话窗口 SlidingProcessingTimeWindows —— 滑动窗口按照处理时间 TumblingProcessingTimeWindows —— 滚动窗口按照处理时间 ProcessingTimeSessionWindows —— 会话窗口 Keyed Window --键控窗口 // Keyed Window stream.keyBy(...) - 按照一个Key进行分组.window(...) - 将数据流中的元素分配到相应的窗口中[.trigger(...)] - 指定触发器Trigger可选[.evictor(...)] - 指定清除器Evictor(可选).reduce/aggregate/process/apply() - 窗口处理函数Window Function Non-Keyed Window // Non-Keyed Window stream.windowAll(...) - 不分组将数据流中的所有元素分配到相应的窗口中[.trigger(...)] - 指定触发器Trigger可选[.evictor(...)] - 指定清除器Evictor(可选).reduce/aggregate/process() - 窗口处理函数Window Function 方括号([…]) 中的命令是可选的。 首先我们要决定是否对一个DataStream按照Key进行分组这一步必须在窗口计算之前进行。 经过keyBy的数据流将形成多组数据下游算子的多个实例可以并行计算。 windowAll不对数据流进行分组所有数据将发送到下游算子单个实例上。 决定是否分组之后窗口的后续操作基本相同。 经过windowAll的算子是不分组的窗口Non-Keyed Window它们的原理和操作与Keyed Window类似唯一的区别在于所有数据将发送给下游的单个实例或者说下游算子的并行度为1。 Flink窗口的骨架结构中有两个必须的两个操作 使用窗口分配器WindowAssigner将数据流中的元素分配到对应的窗口。 当满足窗口触发条件后对窗口内的数据使用窗口处理函数Window Function进行处理常用的Window Function有reduce、aggregate、process。 其他的trigger、evictor则是窗口的触发和销毁过程中的附加选项主要面向需要更多自定义的高级编程者如果不设置则会使用默认的配置。 基于时间的窗口  滚动窗口- TumblingWindow概念 package com.bigdata.day04;public class _01_windows {/*** 1、实时统计每个红绿灯通过的汽车数量* 2、实时统计每个红绿灯每个1分钟统计最近1分钟通过的汽车数量 ——滚动* 3、实时统计每个红绿灯每个1分钟统计最近2分钟通过的汽车数量 ——滑动*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceString socketStream env.socketTextStream(localhost, 8889);//3. transformation-数据处理转换socketStream.map(new MapFunctionString, Tuple2Integer,Integer() {Overridepublic Tuple2Integer,Integer map(String line) throws Exception {String[] words line.split( );return new Tuple2(Integer.parseInt(words[0]),Integer.parseInt(words[1]));}}).keyBy(new KeySelectorTuple2Integer, Integer, Integer() {Overridepublic Integer getKey(Tuple2Integer, Integer value) throws Exception {return value.f0;}})// 基于这个部分实现 滚动窗口 每一分钟 统计前一分钟的数据.window(TumblingProcessingTimeWindows.of(Time.minutes(1))).sum(1).print();env.execute();} } 滑动窗口– SlidingWindow概念  package com.bigdata.day04;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.Window;/*** 基本功能:* program:flinkProject* author: jinnian* create:2024-11-25 10:13:46**/ public class _01_windows {/*** 1、实时统计每个红绿灯通过的汽车数量* 2、实时统计每个红绿灯每个1分钟统计最近1分钟通过的汽车数量 ——滚动* 3、实时统计每个红绿灯每个1分钟统计最近2分钟通过的汽车数量 ——滑动*/public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceString socketStream env.socketTextStream(localhost, 8889);//3. transformation-数据处理转换socketStream.map(new MapFunctionString, Tuple2Integer,Integer() {Overridepublic Tuple2Integer,Integer map(String line) throws Exception {String[] words line.split( );return new Tuple2(Integer.parseInt(words[0]),Integer.parseInt(words[1]));}}).keyBy(new KeySelectorTuple2Integer, Integer, Integer() {Overridepublic Integer getKey(Tuple2Integer, Integer value) throws Exception {return value.f0;}})// 基于这一部分实现每30秒统计前一分钟的数据大的在前小的在后.window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30))).sum(1).print();//5. execute-执行env.execute();} } 如何显示窗口时间——apply ——apply将reduce替代 kafka生产数据 package com.bigdata.day04;public class _02_kafka生产数据 {public static void main(String[] args) throws InterruptedException {// Properties 它是map的一种Properties properties new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bigdata01:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString, String producer new KafkaProducer(properties);String[] arr {联通换猫,遥遥领先,恒大歌舞团,恒大足球队,郑州烂尾楼};Random random new Random();for (int i 0; i 5000; i) {int index random.nextInt(arr.length);ProducerRecordString, String record new ProducerRecord(edu, arr[index]);producer.send(record);Thread.sleep(30);}} } flink消费数据 package com.bigdata.day04;public class _02_flink消费数据 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);Properties properties new Properties();properties.setProperty(bootstrap.servers, bigdata01:9092);properties.setProperty(group.id, gw2);FlinkKafkaConsumerString consumer new FlinkKafkaConsumerString(edu,new SimpleStringSchema(),properties);DataStreamSourceString source env.addSource(consumer);source.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String,Integer map(String value) throws Exception {return Tuple2.of(value,1);}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}}).window(SlidingProcessingTimeWindows.of(Time.minutes(1),Time.seconds(30)))//.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))/****/.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, TimeWindow window, IterableTuple2String, Integer input, CollectorString out) throws Exception {StringBuilder sb new StringBuilder();long start window.getStart();long end window.getEnd();String startStr DateFormatUtils.format(start, yyyy-MM-dd HH:mm:ss);String endStr DateFormatUtils.format(end, yyyy-MM-dd HH:mm:ss);int sum 0;for (Tuple2String, Integer tuple2 : input) {sum tuple2.f1;}sb.append(开始时间startStr,).append(结束时间endStr,).append(key: key ,).append(数量sum);out.collect(sb.toString());}}).print();env.execute();} } 基于条数的窗口——countWindow package com.bigdata.day04;public class _04_agg函数 {public static final Tuple3[] ENGLISH new Tuple3[] {Tuple3.of(class1, 张三, 100L),Tuple3.of(class1, 李四, 40L),Tuple3.of(class1, 王五, 60L),Tuple3.of(class2, 赵六, 20L),Tuple3.of(class2, 小七, 30L),Tuple3.of(class2, 小八, 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceTuple3String,String,Long dataStreamSource env.fromElements(ENGLISH);// 此时我要获取每个班级的平均成绩// 输入数据的类型(IN)、累加器的类型ACC和输出数据的类型OUT// IN——Tuple3String, String, Long// ACC——Tuple3String, Integer,Long 第一个是班级key第二个是数量第三个是总的成绩// OUT —— Tuple2String,Double 第一个是班级 第二个是平均成绩dataStreamSource.countWindowAll(3).aggregate(new AggregateFunctionTuple3String, String, Long, Tuple3String, Integer,Long, Tuple2String,Double() {// 初始化一个 累加器Overridepublic Tuple3String, Integer, Long createAccumulator() {return Tuple3.of(null,0,0L);}// 累加器和输入的值进行累加// Tuple3String, String, Long value 第一个是传入的值// Tuple3String, Integer, Long accumulator 第二个是累加器的值Overridepublic Tuple3String, Integer, Long add(Tuple3String, String, Long value, Tuple3String, Integer, Long accumulator) {return Tuple3.of(value.f0,accumulator.f11,accumulator.f2value.f2);}// 获取结果——在不同节点的结果进行汇总后实现Overridepublic Tuple2String, Double getResult(Tuple3String, Integer, Long accumulator) {return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);}// 由于flink是分布式所以在别的节点也会进行累加 该方法是不同节点的结果进行汇总// 即累加器之间的累加Overridepublic Tuple3String, Integer, Long merge(Tuple3String, Integer, Long a, Tuple3String, Integer, Long b) {return Tuple3.of(a.f0,a.f1b.f1,a.f2b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();} } 会话窗口 package com.bigdata.day04;public class _03_会话窗口 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);DataStreamSourceString source env.socketTextStream(localhost, 8889);source.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] s value.split( );return Tuple2.of(s[0],Integer.valueOf(s[1]));}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}// 1、主要就是 ProcessingTimeSessionWindows 参数的使用// 2、使用 EventTimeSessionWindows的时候若没有水印就不会有结果}).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).reduce(new ReduceFunctionTuple2String, Integer() {Overridepublic Tuple2String, Integer reduce(Tuple2String, Integer value1, Tuple2String, Integer value2) throws Exception {return Tuple2.of(value1.f0,value1.f1value2.f1);}}).print();env.execute();} }
http://www.dnsts.com.cn/news/140933.html

相关文章:

  • 公司网站上线流程成都房地产市场分析
  • 微信小程序 创建网站工程项目信息查询平台
  • 英文网站建设方案 PPT站长工具seo综合查询adc
  • 老太太做受网站常德网站网站建设
  • 网站建设预算方案模板陕西宁德建设工程有限公司网站
  • 广州建设银行官方网站应用公园app的功能介绍
  • 可以免费注册的网站湖南企业seo优化报价
  • 网站虚拟主机租用温州网站设计只找亿企邦
  • 福州专业网站建设价格北京公司注册地址查询
  • 成都建设施工安全协会网站惠州网站建设排名
  • 湖北网站建设搭建seo加盟代理
  • 山东平台网站建设企业中美关系最新消息视频
  • 网站基本常识保康网站建设
  • 购买腾讯云主机可以直接做网站腾讯学生服务器做网站
  • 吉林企业网站模板建站哪个好wordpress 小程序
  • 帝国做视频网站在线做行测的网站
  • 能建设铁塔的公司网站缅甸做网站
  • h5游戏大厅深圳seo网络优化
  • 鹿班设计网站官网layui+wordpress
  • 怎样买空间做网站背景图片设计在线制作
  • 做电脑回收什么网站好wordpress自带企业主题下载
  • 网站在哪里变更备案信息wordpress大前端主题下载
  • 怎样在中国建设银行网站开通短信提醒凡科电脑版
  • 在西宁做网站可以吗网站登录不了怎么办
  • 网站服务内容网站建设江苏省交通厅门户网站建设管理
  • 在自己电脑建设网站ppt精美模板
  • seo网站优化对象深圳高端网站制作多少钱
  • wps演示做的和网站导航2017做网站挣钱吗
  • wordpress站内查找wordpress 域名 根目录
  • 瑞安联科网站建设个人建什么网站好