当前位置: 首页 > news >正文

服务推广网站h5制作平台排行榜

服务推广网站,h5制作平台排行榜,漯河网站建设费用,中山百度网站建设热词统计案例#xff1a; 用flink中的窗口函数#xff08;apply#xff09;读取kafka中数据#xff0c;并对热词进行统计。 apply:全量聚合函数#xff0c;指在窗口触发的时候才会对窗口内的所有数据进行一次计算#xff08;等窗口的数据到齐#xff0c;才开始进行聚合…热词统计案例 用flink中的窗口函数apply读取kafka中数据并对热词进行统计。 apply:全量聚合函数指在窗口触发的时候才会对窗口内的所有数据进行一次计算等窗口的数据到齐才开始进行聚合计算可实现对窗口内的数据进行排序等需求。 代码演示 kafka发送消息端  package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bigdata01:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 创建了一个消息生产者对象KafkaProducer kafkaProducer new KafkaProducer(properties);String[] arr {联通换猫,遥遥领先,恒大歌舞团,恒大足球队,郑州烂尾楼};Random random new Random();for (int i 0; i 500; i) {ProducerRecord record new ProducerRecord(topic1,arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();} } kafka接受消息端  package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties new Properties();properties.setProperty(bootstrap.servers,bigdata01:9092);properties.setProperty(group.id, g2);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(topic1,new SimpleStringSchema(),properties);DataStreamSourceString dataStreamSource env.addSource(kafkaSource);// transformation-数据处理转换DataStreamTuple2String,Integer mapStream dataStreamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String,Integer map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStreamTuple2String, Integer, String keyedStream mapStream.keyBy(tuple2 - tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, // 分组key {俄乌战争,[1,1,1,1,1]}TimeWindow window, // 窗口对象IterableTuple2String, Integer input, // 分组key在窗口的所有数据CollectorString out // 用于输出) throws Exception {long start window.getStart();long end window.getEnd();// lang3 包下的工具类String startStr DateFormatUtils.format(start,yyyy-MM-dd HH:mm:ss);String endStr DateFormatUtils.format(end,yyyy-MM-dd HH:mm:ss);int sum 0;for(Tuple2String,Integer tuple2: input){sum tuple2.f1;}out.collect(key , startStr ,endStr ,sumsum);}}).print();//5. execute-执行env.execute();} } 当执行kafka接收消息端时会报如下错误  错误原因在对kafka中数据进行KeyBy分组处理时使用了lambda表达式 解决方法 在使用KeyBy时将函数的各种参数类型都写清楚修改后的代码如下 package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties new Properties();properties.setProperty(bootstrap.servers,bigdata01:9092);properties.setProperty(group.id, g2);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(topic1,new SimpleStringSchema(),properties);DataStreamSourceString dataStreamSource env.addSource(kafkaSource);// transformation-数据处理转换DataStreamTuple2String,Integer mapStream dataStreamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String,Integer map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStreamTuple2String, Integer, String keyedStream mapStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, // 分组key {俄乌战争,[1,1,1,1,1]}TimeWindow window, // 窗口对象IterableTuple2String, Integer input, // 分组key在窗口的所有数据CollectorString out // 用于输出) throws Exception {long start window.getStart();long end window.getEnd();// lang3 包下的工具类String startStr DateFormatUtils.format(start,yyyy-MM-dd HH:mm:ss);String endStr DateFormatUtils.format(end,yyyy-MM-dd HH:mm:ss);int sum 0;for(Tuple2String,Integer tuple2: input){sum tuple2.f1;}out.collect(key , startStr ,endStr ,sumsum);}}).print();//5. execute-执行env.execute();} }
http://www.dnsts.com.cn/news/248435.html

相关文章:

  • 学网站开发需要学那些自有服务器怎么做网站备案
  • 狼窝网站更新升级通知沈阳市网站建设
  • 成都网站排名公司做网站一天忙吗
  • 电子商务网站建设特点网页制作中的常见问题
  • 网站建设微商城wordpress模板左上角的logo换成自己的
  • 化纤公司网站建设国家企业信息认证系统
  • 2023网站分享口碑最好的装修公司排行
  • 学院网站建设策划书黄陂网站建设
  • 网站做专题主题该怎么选怎么用网站建设
  • asp做的手机网站个人网站后台管理
  • 山西自助建站费用低zimeiti wordpress
  • 沭阳做网站的公司怎样给网站做seo优化
  • 网站seo优化推推蛙wordpress付费下载主题
  • 旅游商城网站订单处理项目网站的建设有两种模式
  • 做网站前产品经理要了解什么深圳辰硕网站优化
  • 沈阳高端网站建设网站开发属于哪个税收分类
  • 关键词搜索工具好站网进出成都最新通知
  • iis新建网站无法浏览网站优化的基本思想与原则
  • 旅游网站设计图上海外贸seo推广
  • 公司网站建设维护合同范本在线表白网页制作
  • 免费商城网站申请网站建设的优缺点
  • 黄村做网站建设软件著作权登记证书
  • 佛山制作网站企业模板建站和仿站
  • 网站之间如何交换友情链接wordpress用户注册登录插件
  • 网站素材设计框架wordpress+商会+模版
  • 内网建设网站需要什么条件旭泽建站
  • 理财网站方案建设智能建造师证书的含金量
  • 网站源码下载网站怎么在企业站建立网站
  • 做宽屏网站营销型网站建设多少钱
  • 四川做网站企业融资方式有哪几种