黄埔营销型网站建设,中国风风格网站模板,dede 网站标题,欧美网站特点5、DataStream API#xff08;基础篇#xff09;
Flink 有非常灵活的分层 API 设计#xff0c;其中的核心层就是 DataStream/DataSet API。由于新版本已经实现了流批一体#xff0c;DataSet API 将被弃用#xff0c;官方推荐统一使用 DataStream API 处理流数据和批数据。…5、DataStream API基础篇
Flink 有非常灵活的分层 API 设计其中的核心层就是 DataStream/DataSet API。由于新版本已经实现了流批一体DataSet API 将被弃用官方推荐统一使用 DataStream API 处理流数据和批数据。
DataStream数据流本身是 Flink 中一个用来表示数据集合的类Class我们编写的Flink 代码其实就是基于这种数据类型的处理所以这套核心 API 就以 DataStream 命名。对于批处理和流处理我们都可以用这同一套 API 来实现。
一个 Flink 程序其实就是对 DataStream 的各种转换。具体来说代码基本上都由以下几部分构成如下图所示 获取执行环境execution environment读取数据源source定义基于数据的转换操作transformations定义计算结果的输出位置sink触发程序执行execute
其中获取环境和触发执行都可以认为是针对执行环境的操作。
5.1、执行环境Execution Environment
Flink 程序可以在各种上下文环境中运行我们可以在本地 JVM 中执行程序也可以提交到远程集群上运行。不同的环境代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时首先必须获取当前 Flink 的运行环境从而建立起与 Flink 框架之间的联系。只有获取了环境上下文信息才能将具体的任务调度到不同的 TaskManager 执行。
5.1.1、创建执行环境
我们要获取的执行环境是StreamExecutionEnvironment 类的对象在代码中创建执行环境的方式就是调用这个类的静态方法具体有以下三种
getExecutionEnvironment 最简单的方式就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果如果程序是独立运行的就返回一个本地执行环境如果是创建了 jar包然后从命令行调用它并提交到集群执行那么就返回集群的执行环境。也就是说这个方法会根据当前运行的方式自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();这种“智能”的方式不需要我们额外做判断用起来简单高效是最常用的一种创建执行环境的方式。
createLocalEnvironment 这个方法返回一个本地执行环境。可以在调用时传入一个参数指定默认的并行度如果不传入则默认并行度就是本地的 CPU 核心数。
StreamExecutionEnvironment localEnv treamExecutionEnvironment.createLocalEnvironment();createRemoteEnvironment 这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号并指定要在集群中运行的 Jar 包。
StreamExecutionEnvironment remoteEnv StreamExecutionEnvironment.createRemoteEnvironment(host, // JobManager 主机名1234, // JobManager 进程端口号path/to/jarFile.jar // 提交给 JobManager 的 JAR 包
); 在获取到程序执行环境后我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链还可以定义程序的时间语义、配置容错机制。
5.1.2、执行模式(Execution Mode)
从 1.12.0 版本起Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特性可以支持不同的“执行模式”execution mode通过简单的设置就可以让一段 Flink 程序在流处理和批处理之间切换。
流执行模式STREAMING 这是 DataStream API 最经典的模式一般用于需要持续实时处理的无界数据流。默认情况下程序使用的就是 STREAMING 执行模式。批执行模式BATCH 专门用于批处理的执行模式, 这种模式下Flink 处理作业的方式类似于 MapReduce 框架。对于不会持续计算的有界数据我们用这种模式处理会更方便。自动模式AUTOMATIC 在这种模式下将由程序根据输入数据源是否有界来自动选择执行模式。
5.1.2.1、BATCH 模式的配置方法
由于 Flink 程序默认是 STREAMING 模式我们这里重点介绍一下 BATCH 模式的配置。主要有两种方式
通过命令行配置 bin/flink run -Dexecution.runtime-modeBATCH ...在提交作业时增加 execution.runtime-mode 参数指定值为 BATCH。
通过代码配置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);在代码中直接基于执行环境调用 setRuntimeMode 方法传入 BATCH 模式。
建议: 不要在代码中配置而是使用命令行。这同设置并行度是类似的在提交作业时指定参数可以更加灵活同一段应用程序写好之后既可以用于批处理也可以用于流处理。
5.1.2.2、什么时候选择 BATCH 模式
用 BATCH 模式处理批量数据用 STREAMING 模式处理流式数据。因为数据有界的时候直接输出结果会更加高效而当数据无界的时候, 我们没得选择——只有 STREAMING 模式才能处理持续的数据流。
5.1.3、触发程序执行
有了执行环境我们就可以构建程序的处理流程了基于环境读取数据源进而进行各种转换操作最后输出结果到外部系统写完输出sink操作并不代表程序已经结束因为当 main()方法被调用时其实只是定义了作业的每个执行操作然后添加到数据流图中这时并没有真正处理数据因为数据可能还没来Flink 是由事件驱动的只有等到数据到来才会触发真正的计算这也被称为“延迟执行”或“懒执行”lazy execution。
所以我们需要显式地调用执行环境的 execute()方法来触发程序执行。execute()方法将一直等待作业完成然后返回一个执行结果JobExecutionResult。
env.execute();5.2、源算子Source Flink 可以从各种来源获取数据然后构建 DataStream 进行转换处理一般将数据的输入来源称为数据源(data source)而读取数据的算子就是源算子source operator所以source就是我们整个处理程序的输入端。
Flink 代码中通用的添加 source 的方式是调用执行环境的 addSource()方法
DataStreamString stream env.addSource(...);方法传入一个对象参数需要实现 SourceFunction 接口返回 DataStreamSource。这里的DataStreamSource 类继承自 SingleOutputStreamOperator 类又进一步继承自 DataStream。所以 很明显读取数据的 source 操作是一个算子得到的是一个数据流DataStream。
5.2.1、创建event
package com.song.wc.entity;
import java.sql.Timestamp;/*** 创建事件实体*/
public class Event {public String user;public String url;public Long timestamp;public Event() {}public Event(String user, String url, Long timestamp) {this.user user;this.url url;this.timestamp timestamp;}Overridepublic String toString() {return Event{ user user \ , url url \ , timestamp new Timestamp(timestamp) };}
}这里需要注意我们定义的 Event有这样几个特点
类是公有public的有一个无参的构造方法所有属性都是公有public的所有属性的类型都是可以序列化的
Flink 会把这样的类作为一种特殊的 POJO 数据类型来对待方便数据的解析和序列化另外我们在类中还重写了 toString 方法主要是为了测试输出显示更清晰。
5.2.2、从集合中读取数据
最简单的读取数据的方式就是在代码中直接创建一个 Java 集合然后调用执行环境的fromCollection 方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后作为数据源使用 一般用于测试
package com.song.wc;import com.song.wc.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class Test {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);ArrayListEvent clicks new ArrayList();clicks.add(new Event(Mary, ./home, 1000L));clicks.add(new Event(Bob, ./cart, 2000L));DataStreamEvent stream env.fromCollection(clicks);stream.print();env.execute();}
}
也可以不构建集合直接将元素列举出来调用 fromElements 方法进行读取数据 DataStreamSourceEvent stream2 env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));5.2.3、从文件读取数据
通常情况下我们会从存储介质中获取数据一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。
DataStreamString stream env.readTextFile(clicks.csv);说明:
参数可以是目录也可以是文件路径可以是相对路径也可以是绝对路径相对路径是从系统属性 user.dir 获取路径: idea 下是 project 的根目录, standalone 模式下是集群节点根目录也可以从 hdfs 目录下读取, 使用路径 hdfs://…, 由于 Flink 没有提供 hadoop 相关依赖, 需要 pom 中添加相关依赖:
dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion2.7.5/versionscopeprovided/scope
/dependency5.2.4、从 Socket 读取数据
读取 socket 文本流。但是这种方式由于吞吐量小、稳定性较差一般也是用于测试。
DataStreamString stream env.socketTextStream(localhost, 7777);5.2.5、从 Kafka 读取数据
Kafka 作为分布式消息传输队列是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式恰恰和流处理是完全一致的。所以可以说 Kafka 和 Flink 天生一对是当前处理流式数据的双子星。
在如今的实时流处理应用中由 Kafka 进行数据的收集和传输Flink 进行分析计算这样的架构已经成为众多企业的首选如下图所示。 略微遗憾的是与 Kafka 的连接比较复杂Flink 内部并没有提供预实现的方法。所以我们只能采用通用的 addSource 方式、实现一个 SourceFunction 了Flink官方提供了连接工具flink-connector-kafka直接帮我们实现了一个消费者 FlinkKafkaConsumer它就是用来读取 Kafka 数据的SourceFunction。
所以想要以 Kafka 作为数据源获取数据我们只需要引入 Kafka 连接器的依赖。Flink 官方提供的是一个通用的 Kafka 连接器它会自动跟踪最新版本的 Kafka 客户端。目前最新版本只支持 0.10.0 版本以上的 Kafka读者使用时可以根据自己安装的 Kafka 版本选定连接器的依赖版本。这里我们需要导入的依赖如下。
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency然后调用 env.addSource()传入 FlinkKafkaConsumer 的对象实例就可以了。
package com.song.wc;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties new Properties();properties.setProperty(bootstrap.servers, hadoop102:9092);properties.setProperty(group.id, consumer-group);properties.setProperty(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);properties.setProperty(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);properties.setProperty(auto.offset.reset, latest);DataStreamSourceString stream env.addSource(newFlinkKafkaConsumerString(clicks,new SimpleStringSchema(),properties));stream.print(Kafka);env.execute();}
}
创建 FlinkKafkaConsumer 时需要传入三个参数
第一个参数 topic定义了从哪些主题中读取数据。可以是一个 topic也可以是 topic列表还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据时Kafka 连接器将会处理所有 topic 的分区将这些分区的数据放到一条流中去。第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消息被存储为原始的字节数据所以需要反序列化成 Java 或者 Scala 对象。上面代码中使用的 SimpleStringSchema是一个内置的 DeserializationSchema它只是将字节数组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是公共接口所以我们也可以自定义反序列化逻辑。第三个参数是一个 Properties 对象设置了 Kafka 客户端的一些属性
5.2.6、自定义 Source
大多数情况下前面的数据源已经能够满足需要。但是凡事总有例外如果遇到特殊情况我们想要读取的数据源来自某个外部系统而 flink 既没有预实现的方法、也没有提供连接器那就只好自定义实现 SourceFunction 了。
接下来我们创建一个自定义的数据源实现 SourceFunction 接口。主要重写两个关键方法run()和 cancel()
run()方法使用运行时上下文对象SourceContext向下游发送数据cancel()方法通过标识位控制退出循环来达到中断数据源的效果。
package com.song.wc.customersource;import com.song.wc.entity.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;public class ClickSource implements SourceFunctionEvent {// 声明一个布尔变量作为控制数据生成的标识位private Boolean running true;Overridepublic void run(SourceContextEvent ctx) throws Exception {// 在指定的数据集中随机选取数据Random random new Random();String[] users {Mary, Alice, Bob, Cary};String[] urls {./home, ./cart, ./fav, ./prod?id1,./prod?id2};while (running) {ctx.collect(new Event(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar.getInstance().getTimeInMillis()));// 隔 1 秒生成一个点击事件方便观测Thread.sleep(1000);}}Overridepublic void cancel() {running false;}
}有了自定义的 source function接下来只要调用 addSource()就可以了
package com.song.wc.customersource;import com.song.wc.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceCustom {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//有了自定义的 source function调用 addSource 方法DataStreamSourceEvent stream env.addSource(new ClickSource());stream.print(SourceCustom);env.execute();}
}这里要注意的是 SourceFunction 接口定义的数据源并行度只能设置为 1如果数据源设置为大于 1 的并行度则会抛出异常。
package com.song.wc.customersource;import com.song.wc.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceCustom {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);//有了自定义的 source function调用 addSource 方法
// DataStreamSourceEvent stream env.addSource(new ClickSource());env.addSource(new ClickSource()).setParallelism(2).print();env.execute();}
}我们想要自定义并行的数据源的话需要使用 ParallelSourceFunction示例程序如下
package com.song.wc.customersource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;import java.util.Random;public class ParallelSourceExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new CustomSource()).setParallelism(2).print();env.execute();}public static class CustomSource implements ParallelSourceFunctionInteger {private boolean running true;private Random random new Random();Overridepublic void run(SourceContextInteger sourceContext) throws Exception {while (running) {sourceContext.collect(random.nextInt());}}Overridepublic void cancel() {running false;}}
}5.2.7、Flink 支持的数据类型
5.2.7.1、Flink 的类型系统
为什么会出现“不支持”的数据类型呢因为 Flink 作为一个分布式处理框架处理的是以数据对象作为元素的流要分布式地处理这些数据就不可避免地要面对数据的网络传输、状态的落盘和故障恢复等问题这就需要对数据进行序列化和反序列化。
为了方便地处理数据Flink 有自己一整套类型系统。 Flink 使用“类型信息”TypeInformation来统一表示数据类型。 TypeInformation 类是 Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性并为每个数据类型生成特定的序列化器、反序列化器和比较器。
5.2.7.2、Flink 支持的数据类型
简单来说对于常见的 Java 和 Scala 数据类型Flink 都是支持的。Flink 在内部Flink对支持不同的类型进行了划分这些类型可以在 Types 工具类中找到
基本类型 所有 Java 基本类型及其包装类再加上 Void、String、Date、BigDecimal 和 BigInteger。数组类型 包括基本类型数组PRIMITIVE_ARRAY和对象数组(OBJECT_ARRAY)复合数据类型 Java 元组类型TUPLE这是 Flink 内置的元组类型是 Java API 的一部分。最多25 个字段也就是从 Tuple0~Tuple25不支持空字段Scala 样例类及 Scala 元组不支持空字段行类型ROW可以认为是具有任意个字段的元组,并支持空字段POJOFlink 自定义的类似于 Java bean 模式的类 辅助类型 Option、Either、List、Map 等泛型类型GENERIC Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照上面 POJO 类型的要求来定义就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由 Flink 本身序列化的而是由 Kryo 序列化的。在这些类型中元组类型和 POJO 类型最为灵活因为它们支持创建复杂类型。而相比之下POJO 还支持在键key的定义中直接使用字段名这会让我们的代码可读性大大增加。所以在项目实践中往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。Flink 对 POJO 类型的要求如下 类是公共的public和独立的standalone也就是说没有非静态的内部类类有一个公共的无参构造方法类中的所有字段是 public 且非 final 的或者有一个公共的 getter 和 setter 方法这些方法需要符合 Java bean 的命名规范。
5.2.7.3、类型提示Type Hints
Flink 还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于 Java 中泛型擦除的存在在某些特殊情况下比如 Lambda 表达式中自动提取的信息是不够精细的——只告诉 Flink 当前的元素由“船头、船身、船尾”构成根本无法重建出“大船”的模样这时就需要显式地提供类型信息才能使应用程序正常工作或提高其性能。
为了解决这类问题Java API 提供了专门的“类型提示”type hints。之前的 wordCount 流处理程序我们在将 String 类型的每个词转换成wordcount二元组后就明确地用 returns 指定了返回的类型。因为对于 map 里传入的 Lambda 表达式系统只能推断出返回的是 Tuple2 类型而无法得到 Tuple2String, Long。只有显式地告诉系统当前的返回类型才能正确地解析出完整数据。
.map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));这是一种比较简单的场景二元组的两个元素都是基本数据类型。那如果元组中的一个元素又有泛型该怎么处理呢
Flink 专门提供了 TypeHint 类它可以捕获泛型的类型信息并且一直记录下来为运行时提供足够的信息。我们同样可以通过.returns()方法明确地指定转换之后的 DataStream 里元素的类型。
returns(new TypeHintTuple2Integer, SomeType(){})5.3、转换算子Transformation 数据源读入数据之后我们就可以使用各种转换算子将一个或多个 DataStream 转换为新的 DataStream如上图 所示。一个 Flink 程序的核心其实就是所有的转换操作它们决定了处理的业务逻辑。
5.3.1、基本转换算子
5.3.1.1、映射map
map 主要用于将数据流中的数据进行转换形成新的数据流。简单来说就是一个“一一映射”消费一个元素就产出一个元素如下图所示。 我们只需要基于 DataStrema 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现返回值类型还是 DataStream不过泛型流中的元素类型可能改变。
下面的代码用不同的方式实现了提取 Event 中的 user 字段的功能。
package com.song.wc;
import com.song.wc.entity.Event;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Test {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));// 传入匿名类实现 MapFunctionstream.map(new MapFunctionEvent, String() {Overridepublic String map(Event e) throws Exception {return e.user;}});// 传入 MapFunction 的实现类stream.map(new UserExtractor()).print();env.execute();}public static class UserExtractor implements MapFunctionEvent, String {Overridepublic String map(Event e) throws Exception {return e.user;}}
}
上面代码中MapFunction 实现类的泛型类型与输入数据类型和输出数据的类型有关。在实现 MapFunction 接口的时候需要指定两个泛型分别是输入事件和输出事件的类型还需要重写一个 map()方法定义从一个输入事件转换为另一个输出事件的具体逻辑。另外通过查看 Flink 源码可以发现基于 DataStream 调用 map 方法返回的其实是一个 SingleOutputStreamOperator。
public R SingleOutputStreamOperatorR map(MapFunctionT, R mapper){}这表示 map 是一个用户可以自定义的转换transformation算子它作用于一条数据流上转换处理的结果是一个确定的输出类型。当然SingleOutputStreamOperator 类本身也继承自 DataStream 类所以说 map 是将一个 DataStream 转换成另一个 DataStream 是完全正确的。
5.3.1.2、过滤filter
filter 转换操作顾名思义是对数据流执行一个过滤通过一个布尔条件表达式设置过滤条件对于每一个流内元素进行判断若为 true 则元素正常输出若为 false 则元素被过滤掉如下图所示。 进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现 FilterFunction 接口而 FilterFunction 内要实现 filter()方法就相当于一个返回布尔类型的条件表达式。
下面的代码会将数据流中用户 Mary 的浏览行为过滤出来 。
package com.song.wc;import com.song.wc.entity.Event;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransFilterTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));// 传入匿名类实现 FilterFunctionstream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event e) throws Exception {return e.user.equals(Mary);}});// 传入 FilterFunction 实现类stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunctionEvent {Overridepublic boolean filter(Event e) throws Exception {return e.user.equals(Mary);}}
}5.3.1.3、扁平映射flatMap
flatMap 操作又称为扁平映射主要是将数据流中的整体一般是集合类型拆分成一个一个的个体使用。消费一个元素可以产生 0 到多个元素。flatMap 可以认为是“扁平化”flatten和“映射”map两步操作的结合也就是先按照某种规则对数据进行打散拆分再对拆分后的元素做转换处理如下图所示此前 WordCount 程序的第一步分词操作就用到了flatMap。 同 map 一样flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参返回值类型取决于所传参数的具体逻辑可以与原数据流相同也可以不同。
flatMap 操作会应用在每一个输入事件上面FlatMapFunction 接口中定义了 flatMap 方法用户可以重写这个方法在这个方法中对输入数据进行处理并决定是返回 0 个、1 个或多个结果数据。
因此 flatMap 并没有直接定义返回值类型而是通过一个“收集器”Collector来指定输出。希望输出结果时只要调用收集器.collect()方法就可以了这个方法可以多次调用也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能当返回结果是 0 个的时候就相当于对数据进行了过滤当返回结果是 1 个的时候相当于对数据进行了简单的转换操作。
package com.song.wc;import com.song.wc.entity.Event;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class TransFilterTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));stream.flatMap(new MyFlatMap()).print();env.execute();}public static class MyFlatMap implements FlatMapFunctionEvent, String {Overridepublic void flatMap(Event value, CollectorString out) throws Exception {if (value.user.equals(Mary)) {out.collect(value.user);} else if (value.user.equals(Bob)) {out.collect(value.user);out.collect(value.url);}}}
}5.3.2、聚合算子Aggregation
5.3.2.1、按键分区keyBy
对于 Flink 而言DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理这样才能提高效率。所以在 Flink 中要做聚合需要先进行分区这个操作就是通过 keyBy 来完成的。
keyBy 是聚合前必须要用到的一个算子keyBy 通过指定键key可以将一条流从逻辑上划分成不同的分区partitions。这里所说的分区其实就是并行处理的子任务也就对应着任务槽task slot。 基于不同的 key流中的数据将被分配到不同的分区中去如下图所示这样一来所有具有相同的 key 的数据都将被发往同一个分区那么下一步算子操作就将会在同一个 slot中进行处理了。 在内部是通过计算 key 的哈希值hash code对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话必须要重写 hashCode()方法。
keyBy()方法需要传入一个参数这个参数指定了一个或一组 key。有很多不同的方法来指定 key比如对于 Tuple 数据类型可以指定字段的位置或者多个位置的组合对于 POJO 类型可以指定字段的名称String另外还可以传入 Lambda 表达式或者实现一个键选择器KeySelector用于说明从数据中提取 key 的逻辑。
我们可以以 id 作为 key 做一个分区操作代码实现如下
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));// 使用 Lambda 表达式KeyedStreamEvent, String keyedStream stream.keyBy(e - e.user);// 使用匿名类实现 KeySelectorKeyedStreamEvent, String keyedStream1 stream.keyBy(new KeySelectorEvent, String() {Overridepublic String getKey(Event e) throws Exception {return e.user;}});env.execute();}需要注意的是keyBy 得到的结果将不再是 DataStream而是会将 DataStream 转换为KeyedStream。
KeyedStream 可以认为是“分区流”或者“键控流”它是对 DataStream 按照key 的一个逻辑分区所以泛型有两个类型除去当前流中的元素类型外还需要指定 key 的类型。
KeyedStream 也继承自 DataStream所以基于它的操作也都归属于 DataStream API。但它跟之前的转换操作得到的 SingleOutputStreamOperator 不同只是一个流的分区操作并不是一个转换算子。
KeyedStream 是一个非常重要的数据结构只有基于它才可以做后续的聚合操作比如 sumreduce而且它可以将当前算子任务的状态state也按照 key 进行划分、限定为仅对当前 key 有效。
5.3.2.2、简单聚合
Flink 为我们内置实现了一些最基本、最简单的聚合 API主要有以下几种
sum()在输入流上对指定的字段做叠加求和的操作。min()在输入流上对指定的字段求最小值。max()在输入流上对指定的字段求最大值。minBy()与 min()类似在输入流上针对指定字段求最小值。不同的是min()只计算指定字段的最小值其他字段会保留最初第一个数据的值而 minBy()则会返回包含字段最小值的整条数据。maxBy()与 max()类似在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
这些聚合方法调用时也需要传入参数但并不像基本转换算子那样需要实现自定义函数只要说明聚合指定的字段就可以了。指定字段的方式有两种
指定位置指定名称
对于元组类型的数据同样也可以使用这两种方式来指定字段。需要注意的是元组中字段的名称是以 f0、f1、f2、…来命名的。
package com.song.wc;import com.song.wc.entity.Event;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransFilterTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceTuple2String, Integer stream env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 3),Tuple2.of(b, 3),Tuple2.of(b, 4));stream.keyBy(r - r.f0).sum(1).print();stream.keyBy(r - r.f0).sum(f1).print();stream.keyBy(r - r.f0).max(1).print();stream.keyBy(r - r.f0).max(f1).print();System.out.println();stream.keyBy(r - r.f0).min(1).print();stream.keyBy(r - r.f0).min(f1).print();stream.keyBy(r - r.f0).maxBy(1).print();stream.keyBy(r - r.f0).maxBy(f1).print();stream.keyBy(r - r.f0).minBy(1).print();stream.keyBy(r - r.f0).minBy(f1).print();env.execute();}
}而如果数据流的类型是 POJO 类那么就只能通过字段名称来指定不能通过位置来指定了。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));stream.keyBy(e - e.user).max(timestamp).print(); // 指定字段名称env.execute();}简单聚合算子返回的同样是一个 SingleOutputStreamOperator也就是从 KeyedStream 又转换成了常规的 DataStream。所以可以这样理解keyBy 和聚合是成对出现的先分区、后聚合得到的依然是一个 DataStream。而且经过简单聚合之后的数据流元素的数据类型保持不变。
一个聚合算子会为每一个key保存一个聚合的值在Flink中我们把它叫作“状态”state。所以每当有一个新的数据输入算子就会更新保存的聚合结果并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说这些状态是永远不会被清除的所以我们使用聚合算子应该只用在含有有限个 key 的数据流上。
5.3.2.3、归约聚合reduce
reduce 算子就是一个一般化的聚合统计操作了它可以对已有的数据进行归约处理把每一个新输入的数据和当前已经归约出来的值再做一个聚合计算。
与简单聚合类似reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型所以输出类型和输入类型是一样的。调用 KeyedStream 的 reduce 方法时需要传入一个参数实现 ReduceFunction 接口。接口在源码中的定义如下
public interface ReduceFunctionT extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}ReduceFunction 接口里需要实现 reduce()方法这个方法接收两个输入事件经过转换处理之后输出一个相同类型的事件
所以对于一组数据我们可以先取两个进行合并然后再将合并的结果看作一个数据、再跟后面的数据合并最终会将它“简化”成唯一的一个数据这也就是 reduce“归约”的含义。
在流处理的底层实现过程中实际上是将中间“合并的结果”作为任务的一个状态保存起来的之后每来一个新的数据就和之前的聚合状态进一步做归约。其实reduce 的语义是针对列表进行规约操作运算规则由 ReduceFunction 中的 reduce方法来定义而在 ReduceFunction 内部会维护一个初始值为空的累加器注意累加器的类型和输入元素的类型相同当第一条元素到来时累加器的值更新为第一条元素的值当新的元素到来时新元素会和累加器进行累加操作这里的累加操作就是 reduce 函数定义的运算规则。然后将更新以后的累加器的值向下游输出。
我们可以单独定义一个函数类实现 ReduceFunction 接口也可以直接传入一个匿名类。当然同样也可以通过传入 Lambda 表达式实现类似的功能。与简单聚合类似reduce 操作也会将 KeyedStream 转换为 DataStrema。它不会改变流的元素数据类型所以输出类型和输入类型是一样的。
下面案例我们将数据流按照用户 id 进行分区然后用一个 reduce 算子实现 sum 的功能统计每个 用户访问的频次进而将所有统计结果分到一组用另一个 reduce 算子实现 maxBy 的功能记录所有用户中访问频次最高的那个也就是当前访问量最大的用户是谁。
package com.song.wc;import com.song.wc.customersource.ClickSource;
import com.song.wc.entity.Event;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Test {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 这里的 ClickSource()使用了之前自定义数据源小节中的 ClickSource()env.addSource(new ClickSource())// 将 Event 数据类型转换成元组类型.map(new MapFunctionEvent, Tuple2String, Long() {Overridepublic Tuple2String, Long map(Event e) throws Exception {return Tuple2.of(e.user, 1L);}}).keyBy(r - r.f0) // 使用用户名来进行分流.reduce(new ReduceFunctionTuple2String, Long() {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1,Tuple2String, Long value2) throws Exception {// 每到一条数据用户 pv 的统计值加 1return Tuple2.of(value1.f0, value1.f1 value2.f1);}}).keyBy(r - true) // 为每一条数据分配同一个 key将聚合结果发送到一条流中去.reduce(new ReduceFunctionTuple2String, Long() {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1,Tuple2String, Long value2) throws Exception {// 将累加器更新为当前最大的 pv 统计值然后向下游发送累加器的值return value1.f1 value2.f1 ? value1 : value2;}}).print();env.execute();}
}reduce 同简单聚合算子一样也要针对每一个 key 保存状态。因为状态不会清空所以需要将 reduce 算子作用在一个有限 key 的流上。
5.3.3、用户自定义函数UDF
5.3.3.1、 函数类Function Classes
用户自定义函数UDF最简单直接的方式就是自定义一个函数类实现对应的接口来完成处理逻辑的定义。
下面例子实现了 FilterFunction 接口用来筛选 url 中包含“home”的事件
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent clicks env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));DataStreamEvent stream clicks.filter(new FlinkFilter());stream.print();env.execute();}public static class FlinkFilter implements FilterFunctionEvent {Overridepublic boolean filter(Event value) throws Exception {return value.url.contains(home);}}当然还可以通过匿名类来实现 FilterFunction 接口
DataStreamString stream clicks.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event value) throws Exception {return value.url.contains(home);}
});为了类可以更加通用我们还可以将用于过滤的关键字home抽象出来作为类的属性调用构造方法时传进去。
DataStreamEvent stream clicks.filter(new KeyWordFilter(home));
public static class KeyWordFilter implements FilterFunctionEvent {private String keyWord;KeyWordFilter(String keyWord) { this.keyWord keyWord; }Overridepublic boolean filter(Event value) throws Exception {return value.url.contains(this.keyWord);}
}5.3.3.2、 匿名函数Lambda
Flink 的所有算子都可以使用 Lambda 表达式的方式来进行编码但是当 Lambda 表达式使用 Java 的泛型时我们需要显式的声明类型信息。
下例演示了如何使用 Lambda 表达式来实现一个简单的 map() 函数我们使用 Lambda 表达式来计算输入的平方。不需要声明 map() 函数的输入 i 和输出参数的数据类型因为 Java 编译器会对它们做出类型推断。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent clicks env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));//map 函数使用 Lambda 表达式返回简单类型不需要进行类型声明DataStreamString stream1 clicks.map(event - event.url);stream1.print();env.execute();}由于 OUT 是 String 类型而不是泛型所以 Flink 可以从函数签名 OUT map(IN value) 的实现中自动提取出结果的类型信息。
但是对于像 flatMap() 这样的函数它的函数签名 void flatMap(IN value, CollectorOUT out) 被 Java 编译器编译成了 void flatMap(IN value, Collector out)也就是说将 Collector 的泛型信息擦除掉了。这样 Flink 就无法自动推断输出的类型信息了。
// flatMap 使用 Lambda 表达式抛出异常
DataStreamString stream2 clicks.flatMap((event, out) - {
out.collect(event.url);
});
stream2.print();如果执行程序Flink 会抛出如下异常 在这种情况下我们需要显式地指定类型信息否则输出将被视为 Object 类型这会导致低效的序列化。 DataStreamString stream2 clicks.flatMap((Event event, CollectorStringout) - {out.collect(event.url);}).returns(Types.STRING);stream2.print();当使用 map() 函数返回 Flink 自定义的元组类型时也会发生类似的问题。下例中的函数签名 Tuple2String, Long map(Event value) 被类型擦除为 Tuple2 map(Event value)。
//使用 map 函数也会出现类似问题以下代码会报错
DataStreamTuple2String, Long stream3 clicks.map( event - Tuple2.of(event.user, 1L) );
stream3.print();一般来说这个问题可以通过多种方式解决
package com.song.wc;import com.song.wc.customersource.ClickSource;
import com.song.wc.entity.Event;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class Test {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent clicks env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L));// 想要转换成二元组类型需要进行以下处理// 1) 使用显式的 .returns(...)DataStreamTuple2String, Long stream3 clicks.map(event - Tuple2.of(event.user, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));stream3.print();// 2) 使用类来替代 Lambda 表达式clicks.map(new MyTuple2Mapper()).print();// 3) 使用匿名类来代替 Lambda 表达式clicks.map(new MapFunctionEvent, Tuple2String, Long() {Overridepublic Tuple2String, Long map(Event value) throws Exception {return Tuple2.of(value.user, 1L);}}).print();env.execute();}// 自定义 MapFunction 的实现类public static class MyTuple2Mapper implements MapFunctionEvent, Tuple2String,Long {Overridepublic Tuple2String, Long map(Event value) throws Exception {return Tuple2.of(value.user, 1L);}}
}这些方法对于其它泛型擦除的场景同样适用。
5.3.3.3、 富函数类Rich Function Classes
“富函数类”也是 DataStream API 提供的一个函数类的接口所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如RichMapFunction、RichFilterFunction、RichReduceFunction 等。
既然“富”那么它一定会比常规的函数类提供更多、更丰富的功能。与常规函数类的不同主要在于富函数类可以获取运行环境的上下文并拥有一些生命周期方法所以可以实现更复杂的功能。
Rich Function 有生命周期的概念。典型的生命周期方法有
open()方法是 Rich Function 的初始化方法也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前open()会首先被调用。所以像文件 IO 的创建数据库连接的创建配置文件的读取等等这样一次性的工作都适合在 open()方法中完成。close()方法是生命周期中的最后一个调用的方法类似于解构方法。一般用来做一些清理工作。需要注意的是这里的生命周期方法对于一个并行子任务来说只会调用一次而对应的实际工作方法例如 RichMapFunction 中的 map()在每条数据到来后都会触发一次调用。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceEvent clicks env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L),new Event(Alice, ./prod?id1, 5 * 1000L),new Event(Cary, ./home, 60 * 1000L));// 将点击事件转换成长整型的时间戳输出clicks.map(new RichMapFunctionEvent, Long() {Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println( 索 引 为 getRuntimeContext().getIndexOfThisSubtask() 的任务开始);}Overridepublic Long map(Event value) throws Exception {return value.timestamp;}Overridepublic void close() throws Exception {super.close();System.out.println( 索 引 为 getRuntimeContext().getIndexOfThisSubtask() 的任务结束);}}).print();env.execute();}输出结果是
索引为 0 的任务开始
索引为 1 的任务开始
1 1000
2 2000
2 60000
1 5000
索引为 0 的任务结束
索引为 1 的任务结束另外富函数类提供了 getRuntimeContext()方法可以获取到运行时上下文的一些信息例如程序执行的并行度任务名称以及状态state。这使得我们可以大大扩展程序的功能特别是对于状态的操作使得 Flink 中的算子具备了处理复杂业务的能力。
5.3.4、物理分区Physical Partitioning
“分区”partitioning操作就是要将数据进行重新分布传递到不同的流分区去进行下一步处理。
keyBy它就是一种按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按key“分开”至于分得均不均匀、每个 key 的数据具体会分到哪一区去这些是完全无从控制的——所以我们有时也说keyBy 是一种逻辑分区logical partitioning操作。
如果说 keyBy 这种逻辑分区是一种“软分区”那真正硬核的分区就应该是所谓的“物理分区”physical partitioning。也就是我们要真正控制分区策略精准地调配数据告诉每个数据到底去哪里。
其实这种分区方式在一些情况下已经在发生了例如我们编写的程序可能对多个处理任务设置了不同的并行度那么当数据执行的上下游任务并行度变化时数据就不应该还在当前分区以直通forward方式传输了——因为如果并行度变小当前分区可能没有下游任务了而如果并行度变大所有数据还在原先的分区处理就会导致资源的浪费。所以这种情况下系统会自动地将数据均匀地发往下游所有的并行任务保证各个分区的负载均衡。
有些时候我们还需要手动控制数据分区分配策略。比如当发生数据倾斜的时候系统无法自动调整这时就需要我们重新进行负载均衡将数据流较为平均地发送到下游任务操作分区中去。
Flink 对于经过转换操作之后的 DataStream提供了一系列的底层操作接口能够帮我们实现数据流的手动重分区。为了同 keyBy 相区别我们把这些操作统称为“物理分区”操作。物理分区与 keyBy 另一大区别在于keyBy 之后得到的是一个 KeyedStream而物理分区之后结果仍是 DataStream且流中元素数据类型保持不变。从这一点也可以看出分区算子并不对数据进行转换处理只是定义了数据的传输方式。
常见的物理分区策略有
随机分配Random轮询分配Round-Robin重缩放Rescale广播Broadcast
5.3.4.1、随机分区shuffle
最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的.shuffle()方法将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布uniform distribution所以可以把流中的数据随机打乱均匀地传递到下游任务分区如下图所示。因为是完全随机的所以对于同样的输入数据, 每次执行得到的结果也不会相同。 经过随机分区之后得到的依然是一个 DataStream。 我们可以做个简单测试将数据读入之后直接打印到控制台将输出的并行度设置为 4中间经历一次 shuffle。执行多次观察结果是否相同。 public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源并行度为 1DataStreamSourceEvent stream env.addSource(new ClickSource());// 经洗牌后打印输出并行度为 4stream.shuffle().print(shuffle).setParallelism(4);env.execute();}5.3.4.2、轮询分区Round-Robin
轮询也是一种常见的重分区方式。简单来说就是“发牌”按照先后顺序将数据做依次分发如下图所示。通过调用 DataStream 的.rebalance()方法就可以实现轮询重分区。
rebalance使用的是 Round-Robin 负载均衡算法可以将输入流数据平均分配到下游的并行任务中去。 public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源并行度为 1DataStreamSourceEvent stream env.addSource(new ClickSource());// 经轮询重分区后打印输出并行度为 4stream.rebalance().print(rebalance).setParallelism(4);env.execute();}输出结果的形式如下所示可以看到数据被平均分配到所有并行任务中去了。
5.3.4.3、重缩放分区rescale
重缩放分区和轮询分区非常相似。当调用 rescale()方法时其实底层也是使用 Round-Robin算法进行轮询但是只会将数据轮询发送到下游并行任务的一部分中如下图所示。也就是说“发牌人”如果有多个那么 rebalance 的方式是每个发牌人都面向所有人发牌而 rescale的做法是分成小团体发牌人只给自己团体内的所有人轮流发牌。 当下游任务数据接收方的数量是上游任务数据发送方数量的整数倍时rescale的效率明显会更高。比如当上游任务数量是 2下游任务数量是 6 时上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中。
由于 rebalance 是所有分区数据的“重新平衡”当 TaskManager 数据量较多时这种跨节点的网络传输必然影响效率而如果我们配置的 task slot 数量合适用 rescale 的方式进行“局部重缩放”就可以让数据只在当前 TaskManager 的多个 slot 之间重新分配从而避免了网络传输带来的损耗。
从底层实现上看rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务发送数据方和所有下游任务接收数据方之间建立通信通道这是一个笛卡尔积的关系而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道节省了很多资源。
5.3.4.4、 广播broadcast
这种方式其实不应该叫做“重分区”因为经过广播之后数据会在不同的分区都保留一份可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法将输入数据复制并发送到下游算子的所有并行任务中去。 public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源并行度为 1DataStreamSourceEvent stream env.addSource(new ClickSource());// 经广播后打印输出并行度为 4stream.broadcast().print(broadcast).setParallelism(4);env.execute();}5.3.4.5、 自定义分区Custom
当Flink提供的所有分区策略都不能满足用户的需求时我们可以通过使用partitionCustom()方法来自定义分区策略。
在调用时方法需要传入两个参数第一个是自定义分区器Partitioner对象第二个是应用分区器的字段它的指定方式与 keyBy 指定 key 基本一样可以通过字段名称指定也可以通过字段位置索引来指定还可以实现一个 KeySelector。
例如我们可以对一组自然数按照奇偶性进行重分区。代码如下
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 将自然数按照奇偶分区env.fromElements(1, 2, 3, 4, 5, 6, 7, 8).partitionCustom(new PartitionerInteger() {Overridepublic int partition(Integer key, int numPartitions) {return key % 2;}}, new KeySelectorInteger, Integer() {Overridepublic Integer getKey(Integer value) throws Exception {return value;}}).print().setParallelism(2);env.execute();}5.4、输出算子Sink Flink 作为数据处理框架最终还是要把计算处理的结果写入外部存储为外部应用提供支持如上图 所示。
5.4.1、连接到外部系统
Flink 的 DataStream API 专门提供了向外部写入数据的方法addSink。与 addSource 类似addSink 方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的Flink 程序中所有对外的输出操作一般都是利用 Sink 算子完成的。
与 Source 算子非常类似除去一些 Flink 预实现的 Sink一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));addSource 的参数需要实现一个 SourceFunction 接口类似地addSink 方法同样需要传入一个参数实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用
default void invoke(IN value, Context context) throws Exception当然SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。如下图所示列出了 Flink 官方目前支持的第三方系统连接器 除 Flink 官方之外Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目也实现了一些其他第三方系统与 Flink 的连接器初次之外还可以自定义sink 连接器。 5.4.2、输出到文件
Flink 也有一些非常简单粗暴的输出到文件的预实现方法如 writeAsText()、writeAsCsv()可以直接将输出结果保存到文本文件或 Csv 文件。但是这种方式是不支持同时写入一份文件的所以我们往往会将最后的 Sink 操作并行度设为 1这就大大拖慢了系统效率而且对于故障恢复后的状态一致性也没有任何保证。所以目前这些简单的方法已经要被弃用。
Flink 为此专门提供了一个流式文件系统的连接器StreamingFileSink它继承自抽象类 RichSinkFunction而且集成了 Flink 的检查点checkpoint机制用来保证精确一次exactly once的一致性语义。
StreamingFileSink 为批处理和流处理提供了一个统一的 Sink它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶buckets每个桶中的数据都可以分割成一个个大小有限的分区文件这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作默认的分桶方式是基于时间的我们每小时写入一个新的桶。换句话说每个桶内保存的文件记录的都是 1 小时的输出数据。
StreamingFileSink 支持行编码Row-encoded和批量编码Bulk-encoded比如 Parquet格式。这两种不同的方式都有各自的构建器builder调用方法也非常简单可以直接调用StreamingFileSink 的静态方法
行编码StreamingFileSink.forRowFormatbasePathrowEncoder。批量编码StreamingFileSink.forBulkFormatbasePathbulkWriterFactory。 在创建行或批量编码 Sink 时我们需要传入两个参数用来指定存储桶的基本路径basePath和数据的编码逻辑rowEncoder 或 bulkWriterFactory。
下面我们就以行编码为例将一些测试数据直接写入文件 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);DataStreamSourceEvent stream env.fromElements(new Event(Mary,./home, 1000L),new Event(Bob, ./cart, 2000L),new Event(Alice, ./prod?id100, 3000L),new Event(Alice, ./prod?id200, 3500L),new Event(Bob, ./prod?id2, 2500L),new Event(Alice, ./prod?id300, 3600L),new Event(Bob, ./home, 3000L),new Event(Bob, ./prod?id1, 2300L),new Event(Bob, ./prod?id3, 3300L));StreamingFileSinkString fileSink StreamingFileSink.StringforRowFormat(new Path(./output),new SimpleStringEncoder(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(15)).withInactivityInterval(TimeUnit.MINUTES.toMillis(5)).withMaxPartSize(1024 * 1024 * 1024).build()).build();// 将 Event 转换成 String 写入文件stream.map(Event::toString).addSink(fileSink);env.execute();}这里我们创建了一个简单的文件 Sink通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到因为文件会有内容持续不断地写入所以我们应该给一个标准到什么时候就开启新的文件将之前的内容归档保存。也就是说上面的代码设置了在以下 3 种情况下我们就会滚动分区文件
至少包含 15 分钟的数据最近 5 分钟没有收到新的数据文件大小已达到 1 GB
5.4.3、输出到 Kafka
Kafka 是一个分布式的基于发布/订阅的消息系统本身处理的也是流式数据所以经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器我们可以用它方便地从 Kafka 读写数据。
现在我们要将数据输出到 Kafka整个数据处理的闭环已经形成所以可以完整测试如下
添加 Kafka 连接器依赖启动 Kafka 集群编写输出到 Kafka 的示例代码
我们可以直接将用户行为数据保存为文件 clicks.csv,读取后不做转换直接写入 Kafka主题topic命名为“clicks”。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties new Properties();properties.put(bootstrap.servers, hadoop102:9092);DataStreamSourceString stream env.readTextFile(input/clicks.csv);stream.addSink(new FlinkKafkaProducerString(clicks,new SimpleStringSchema(),properties));env.execute();}addSink 传入的参数是一个 FlinkKafkaProducer。因为需要向 Kafka 写入数据自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证能够真正做到精确一次exactly once的状态一致性。
Linux 主机启动一个消费者, 查看是否收到数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic clicks5.4.4、输出到 Redis
Redis 是一个开源的内存式的数据存储提供了像字符串string、哈希表hash、列表list、集合set、排序集合sorted set、位图bitmap、地理索引和流stream等一系列常用的数据结构。因为它运行速度快、支持的数据类型丰富在实际项目中已经成为了架构优化必不可少的一员一般用作数据库、缓存也可以作为消息代理。
Flink 没有直接提供官方的 Redis 连接器不过 Bahir 项目提供了 Flink-Redis 的连接工具。 具体测试步骤如下
导入的 Redis 连接器依赖
dependencygroupIdorg.apache.bahir/groupIdartifactIdflink-connector-redis_2.11/artifactIdversion1.0/version
/dependency启动 Redis 集群输出到 Redis 的示例代码
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建一个到 redis 连接的配置FlinkJedisPoolConfig conf new FlinkJedisPoolConfig.Builder().setHost(hadoop102).build();env.addSource(new ClickSource()).addSink(new RedisSinkEvent(conf, new MyRedisMapper()));env.execute();}public static class MyRedisMapper implements RedisMapperEvent {Overridepublic String getKeyFromData(Event e) {return e.user;}Overridepublic String getValueFromData(Event e) {return e.url;}Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, clicks);}}保存到 Redis 时调用的命令是 HSET所以是保存为哈希表hash表名为“clicks”保存的数据以 user 为 key以 url 为 value每来一条数据就会做一次转换。
运行代码Redis 查看是否收到数据
$ redis-cli
hadoop102:6379hgetall clicks
1) “Mary”
2) “./home”
3) “Bob”
4) “./cart”5.4.5、输出到 Elasticsearch
ElasticSearch 是一个分布式的开源搜索和分析引擎适用于所有类型的数据。ElasticSearch有着简洁的 REST 风格的 API以良好的分布式特性、速度和可扩展性而闻名在大数据领域应用非常广泛。
Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器Flink 1.13 支持当前最新版本的ElasticSearch。
写入数据的 ElasticSearch 的测试步骤如下。
添加 Elasticsearch 连接器依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-elasticsearch7_${scala.binary.version}/artifactIdversion${flink.version}/version/dependency启动 Elasticsearch 集群编写输出到 Elasticsearch 的示例代码
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L),new Event(Alice, ./prod?id100, 3000L),new Event(Alice, ./prod?id200, 3500L),new Event(Bob, ./prod?id2, 2500L),new Event(Alice, ./prod?id300, 3600L),new Event(Bob, ./home, 3000L),new Event(Bob, ./prod?id1, 2300L),new Event(Bob, ./prod?id3, 3300L));ArrayListHttpHost httpHosts new ArrayList();httpHosts.add(new HttpHost(hadoop102, 9200, http));// 创建一个 ElasticsearchSinkFunctionElasticsearchSinkFunctionEvent elasticsearchSinkFunction new ElasticsearchSinkFunctionEvent() {Overridepublic void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {HashMapString, String data new HashMap();data.put(element.user, element.url);IndexRequest request Requests.indexRequest().index(clicks).type(type) // Es 6 必须定义 type.source(data);indexer.add(request);}};stream.addSink(new ElasticsearchSink.BuilderEvent(httpHosts,elasticsearchSinkFunction).build());
// stream.addSink(esBuilder.build());env.execute();}与 RedisSink 类 似 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction——ElasticsearchSink。区别在于这个类的构造方法是私有private的我们需要使用elasticsearchSink 的 Builder 内部静态类调用它的 build()方法才能创建出真正的SinkFunction。而 Builder 的构造方法中又有两个参数
httpHosts连接到的 Elasticsearch 集群主机列表elasticsearchSinkFunction这并不是我们所说的 SinkFunction而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数
具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法我们可以将要发送的数据放在一个 HashMap 中包装成 IndexRequest 向外部发送 HTTP 请求。
运行代码访问 Elasticsearch 查看是否收到数据查询结果如下所示。
{took : 5,timed_out : false,_shards : {total : 1,successful : 1,skipped : 0,failed : 0},hits : total : {value : 9,relation : eq},max_score : 1.0,hits : [{_index : clicks,_type : _doc,_id : dAxBYHoB7eAyu-y5suyU,_score : 1.0,_source : {Mary : ./home}}...]}
}5.4.6、输出到 MySQLJDBC
写入数据的 MySQL 的测试步骤如下。
添加依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.47/version
/dependency启动 MySQL在 database 库下建表 clicks
mysql create table clicks(- user varchar(20) not null,- url varchar(100) not null);编写输出到 MySQL 的示例代码
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceEvent stream env.fromElements(new Event(Mary, ./home, 1000L),new Event(Bob, ./cart, 2000L),new Event(Alice, ./prod?id100, 3000L),new Event(Alice, ./prod?id200, 3500L),new Event(Bob, ./prod?id2, 2500L),new Event(Alice, ./prod?id300, 3600L),new Event(Bob, ./home, 3000L),116new Event(Bob, ./prod?id1, 2300L),new Event(Bob, ./prod?id3, 3300L));stream.addSink(JdbcSink.sink(INSERT INTO clicks (user, url) VALUES (?, ?),(statement, r) - {statement.setString(1, r.user);statement.setString(2, r.url);},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/userbehavior)// 对于 MySQL 5.7用com.mysql.jdbc.Driver.withDriverName(com.mysql.cj.jdbc.Driver).withUsername(username).withPassword(password).build()));env.execute();}运行代码用客户端连接 MySQL查看是否成功写入数据
mysql select * from clicks;
--------------------
| user | url |
--------------------
| Mary | ./home |
| Alice| ./prod?id300 |
| Bob | ./prod?id3 |
---------------------
3 rows in set (0.00 sec)5.4.7、自定义 Sink 输出
与 Source 类似Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类只要实现它通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。
例如Flink 并没有提供 HBase 的连接器在实现 SinkFunction 的时候需要重写的一个关键方法 invoke()在这个方法中我们就可以实现将流里的数据发送出去的逻辑。我们这里使用了 SinkFunction 的富函数版本因为这里我们又使用到了生命周期的概念创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。
导入依赖
dependencygroupIdorg.apache.hbase/groupIdartifactIdhbase-client/artifactIdversion${hbase.version}/version
/dependency编写输出到 HBase 的示例代码
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.fromElements(hello, world).addSink(new RichSinkFunctionString() {public org.apache.hadoop.conf.Configuration configuration; // 管理 Hbase 的配置信息,这里因为 Configuration 的重名问题将类以完整路径导入public Connection connection; // 管理 Hbase 连接Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);configuration HBaseConfiguration.create();configuration.set(hbase.zookeeper.quorum, hadoop102:2181);connection ConnectionFactory.createConnection(configuration);}Overridepublic void invoke(String value, Context context) throws Exception {Table table connection.getTable(TableName.valueOf(test)); // 表名为 testPut put new Put(rowkey.getBytes(StandardCharsets.UTF_8)); // 指定 rowkeyput.addColumn(info.getBytes(StandardCharsets.UTF_8) // 指定列名, value.getBytes(StandardCharsets.UTF_8) // 写入的数据, 1.getBytes(StandardCharsets.UTF_8)); // 写入的数据table.put(put); // 执行 put 操作table.close(); // 将表关闭}Overridepublic void close() throws Exception {super.close();connection.close(); // 关闭连接}});env.execute();}SSS