Preface

This is a basic tutorial on custom development with Flink. Using Flink's DataStream API and Kafka as the data source, it builds a simple test environment: a Kafka producer thread utility, a custom FilterFunction operator, and a custom MapFunction operator, all wired together by one Flink job that reads from Kafka in real time and processes the data in several stages, so that readers get a feel for how to build custom functions with Flink.
1. Analysis of Flink's development modules
Flink offers four core SDK / development API layers: DataStream for real-time computation and DataSet for offline computation; on top of these two SDKs it wraps the Table API development module; and on top of the Table API it defines FlinkSQL, a highly abstract layer that lets jobs be written in SQL. Below the core development APIs there are also lower-level interfaces for manipulating the finest-grained primitives such as time, state, and operators, for example the ProcessWindowFunction and ProcessFunction base functions used to wrap custom operators, as well as built-in state facilities such as StateTtlConfig. A minimal sketch of this lowest layer follows.
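As an illustration of that lowest layer, here is a minimal sketch (not part of the original demo; the class and field names are invented for the example) of a KeyedProcessFunction that keeps per-key state with a TTL configured through StateTtlConfig:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative low-level function: counts records per key, with state that expires after one hour.
public class CountPerKeyFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
        // State entries expire one hour after they were last written.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        descriptor.enableTimeToLive(ttlConfig);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = countState.value();
        long next = (current == null ? 0L : current) + 1;
        countState.update(next);
        out.collect(ctx.getCurrentKey() + " seen " + next + " times");
    }
}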
The relationship between Flink's development APIs is shown in the figure below (diagram not reproduced here).

2. Custom development demo
2.1 Scenario
The common technical architecture of a real-time Flink job is: message-queue middleware + Flink job.
Data is collected into a topic of queue middleware such as Kafka or Pulsar, and then Flink's built-in Kafka source watches that topic and processes the incoming data in real time.
This article provides a Kafka producer thread with which you can construct the data you need and control when it is sent, so you fully control the data source written to Kafka. It then re-implements two basic DataStream operators, FilterFunction and MapFunction, to show how Flink functions can be re-wrapped; the more basic functions follow the same principle. Plain String records are used here to avoid pulling in extra SDKs for object conversion. In practice you would normally build business POJOs, so replace the String with your own business objects. Finally, a Flink job ties the whole flow together: read from Kafka, pass the data through the two processing stages, and output the required result.
2.2 Local demo
2.2.1 pom file
This example uses Flink 1.14.6 with Scala 2.12.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.example</groupId>
        <artifactId>flinkCDC</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>flinkStream</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink-version>1.14.6</flink-version>
        <scala-version>2.12</scala-version>
        <hadop-common-version>2.9.1</hadop-common-version>
        <elasticsearch.version>7.6.2</elasticsearch.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots/>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala-version}</artifactId>
            <version>${flink-version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala-version}</artifactId>
            <version>${flink-version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflinkml.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

2.2.2 Kafka producer thread utility

package org.example.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;

/**
 * Produces data into Kafka.
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */
public class KafkaProducerUtil extends Thread {

    private String topic;

    public KafkaProducerUtil(String topic) {
        super();
        this.topic = topic;
    }

    private static Producer<String, String> createProducer() {
        // Configure the producer through a Properties object
        Properties properties = new Properties();
        // Kafka settings for the test environment
        properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();

        while (true) {
            int nums = random.nextInt(10);
            int nums2 = random.nextInt(50);
            // double nums2 = random2.nextDouble();
            String time = new Date().getTime() / 1000 + 5 + "";
            String type = "pv";
            try {
                if (nums2 % 2 == 0) {
                    type = "pv";
                } else {
                    type = "uv";
                }
                // String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";
                String info = nums + " " + nums2;
                System.out.println("message : " + info);
                producer.send(new ProducerRecord<String, String>(this.topic, info));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("data has been written");
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerUtil("test01").run();
    }

    public static void sendMessage(String topic, String message) {
        Producer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, message));
    }
}

2.2.3 Custom basic functions
Two operator functions, a filter and a map, are customized here; adapt the test logic to your own data structure.
Custom FilterFunction operator: records below the threshold of 40 are filtered out.

package org.example.funtion;

import org.apache.flink.api.common.functions.FilterFunction;

/**
 * FilterFunction re-implementation.
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */
public class InfoFilterFunction implements FilterFunction<String> {

    private double threshold;

    public InfoFilterFunction(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(String value) throws Exception {
        if (value.split(" ").length == 2)
            // threshold filter: keep only records at or above the threshold
            return Double.valueOf(value.split(" ")[1]) >= threshold;
        else return false;
    }
}
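Since the function only depends on the FilterFunction interface, it can be sanity-checked as a plain Java program before any cluster is involved. A minimal sketch (not in the original post; the sample values are invented):

package org.example.funtion;

// Illustrative sanity check of InfoFilterFunction, run without Flink.
public class InfoFilterFunctionCheck {
    public static void main(String[] args) throws Exception {
        InfoFilterFunction filter = new InfoFilterFunction(40);
        System.out.println(filter.filter("3 55")); // true  -> second field is >= 40, record kept
        System.out.println(filter.filter("7 12")); // false -> below the threshold, record dropped
        System.out.println(filter.filter("7"));    // false -> malformed record, dropped
    }
}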
Custom MapFunction: values ending with the suffix 2 get special information appended.
package org.example.funtion;

import org.apache.flink.api.common.functions.MapFunction;

public class ActionMapFunction implements MapFunction<String, String> {

    @Override
    public String map(String value) throws Exception {
        System.out.println("value:" + value);
        if (value.endsWith("2"))
            return value.concat(":Special processing information");
        else return value;
    }
}
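Before wiring in Kafka, the two functions can also be exercised together on a small in-memory stream. This is a minimal local sketch (not part of the original post; the sample records are invented):

package org.example.service;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;

// Illustrative local run of the filter + map chain without Kafka.
public class LocalPipelineCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A few hand-written records in the same "nums nums2" format the producer emits
        DataStream<String> input = env.fromElements("3 55", "7 12", "9 42");

        input.filter(new InfoFilterFunction(40))   // keeps "3 55" and "9 42"
             .map(new ActionMapFunction())         // "9 42" ends with 2 -> special info appended
             .print("local check");

        env.execute("local pipeline check");
    }
}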
2.2.4 Flink job code
Job logic: use the Kafka utility to generate data, monitor the Kafka topic, chain the functions above together, and output the result.
package org.example.service;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;

import java.util.*;

public class FlinkTestDemo {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");
        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the Kafka consumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "test01",               // Kafka topic name
                new SimpleStringSchema(),
                kafkaProps);

        // Read the data stream from Kafka
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
        env.disableOperatorChaining();

        kafkaStream
                .filter(new InfoFilterFunction(40))
                .map(new ActionMapFunction())
                .print("messages above the threshold of 40");

        // Execute the job
        env.execute("This is a testing task");
    }
}

Run result (screenshot not reproduced here).
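A side note, not from the original post: on Flink 1.14 the FlinkKafkaConsumer used above still works but is marked deprecated. With the same flink-connector-kafka dependency, an equivalent source built with the newer KafkaSource API would look roughly like this sketch:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Inside main(), instead of env.addSource(kafkaConsumer):
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("ip1:9092,ip2:9092,ip3:9092")
        .setTopics("test01")
        .setGroupId("flink-consumer-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");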