服务推广网站,h5制作平台排行榜,漯河网站建设费用,中山百度网站建设热词统计案例：
用flink中的窗口函数（apply）读取kafka中数据，并对热词进行统计。
apply：全量聚合函数，指在窗口触发的时候才会对窗口内的所有数据进行一次计算（等窗口的数据到齐，才开始进行聚合……）热词统计案例
用flink中的窗口函数apply读取kafka中数据并对热词进行统计。
apply:全量聚合函数指在窗口触发的时候才会对窗口内的所有数据进行一次计算等窗口的数据到齐才开始进行聚合计算可实现对窗口内的数据进行排序等需求。
代码演示
kafka发送消息端
package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bigdata01:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 创建了一个消息生产者对象KafkaProducer kafkaProducer new KafkaProducer(properties);String[] arr {联通换猫,遥遥领先,恒大歌舞团,恒大足球队,郑州烂尾楼};Random random new Random();for (int i 0; i 500; i) {ProducerRecord record new ProducerRecord(topic1,arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}
kafka接收消息端
package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties new Properties();properties.setProperty(bootstrap.servers,bigdata01:9092);properties.setProperty(group.id, g2);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(topic1,new SimpleStringSchema(),properties);DataStreamSourceString dataStreamSource env.addSource(kafkaSource);// transformation-数据处理转换DataStreamTuple2String,Integer mapStream dataStreamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String,Integer map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStreamTuple2String, Integer, String keyedStream mapStream.keyBy(tuple2 - tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, // 分组key {俄乌战争,[1,1,1,1,1]}TimeWindow window, // 窗口对象IterableTuple2String, Integer input, // 分组key在窗口的所有数据CollectorString out // 用于输出) throws Exception {long start window.getStart();long end window.getEnd();// lang3 包下的工具类String startStr DateFormatUtils.format(start,yyyy-MM-dd HH:mm:ss);String endStr DateFormatUtils.format(end,yyyy-MM-dd HH:mm:ss);int sum 0;for(Tuple2String,Integer tuple2: input){sum tuple2.f1;}out.collect(key , startStr ,endStr ,sumsum);}}).print();//5. execute-执行env.execute();}
}
当执行kafka接收消息端时，会报如下错误。错误原因：在对kafka中数据进行KeyBy分组处理时，使用了lambda表达式。解决方法：
在使用KeyBy时，将函数的各种参数类型都写清楚，修改后的代码如下：
package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties new Properties();properties.setProperty(bootstrap.servers,bigdata01:9092);properties.setProperty(group.id, g2);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(topic1,new SimpleStringSchema(),properties);DataStreamSourceString dataStreamSource env.addSource(kafkaSource);// transformation-数据处理转换DataStreamTuple2String,Integer mapStream dataStreamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String,Integer map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStreamTuple2String, Integer, String keyedStream mapStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, // 分组key {俄乌战争,[1,1,1,1,1]}TimeWindow window, // 窗口对象IterableTuple2String, Integer input, // 分组key在窗口的所有数据CollectorString out // 用于输出) throws Exception {long start window.getStart();long end window.getEnd();// lang3 包下的工具类String startStr DateFormatUtils.format(start,yyyy-MM-dd HH:mm:ss);String endStr DateFormatUtils.format(end,yyyy-MM-dd HH:mm:ss);int sum 0;for(Tuple2String,Integer tuple2: input){sum tuple2.f1;}out.collect(key , startStr ,endStr ,sumsum);}}).print();//5. execute-执行env.execute();}
}