班级网站怎么做网页制作,wordpress评论系统,创意设计一个网站,陕西建设工程信息网官网一、Flink状态编程
有状态的计算是流处理框架要实现的重要功能#xff0c;因为稍复杂的流处理场景都需要记录状态#xff0c;然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编…一、Flink状态编程
有状态的计算是流处理框架要实现的重要功能因为稍复杂的流处理场景都需要记录状态然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。 Flink的状态管理是它的优势之一。
二、什么是状态
在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。
流式计算分为无状态计算和有状态计算两种情况。 无状态的计算观察每个独立事件并根据最后一个事件输出结果。例如流处理应用程序从传感器接收水位数据并在水位超过指定高度时发出警告。
在简单聚合、窗口聚合、处理函数的应用都会有状态的身影出现。在Flink这样的分布式系统中我们不仅需要定义出状态在任务并行时的处理方式还需要考虑如何持久化保存、以便发生故障时能正确地恢复这就需要一套完整的管理机制来处理所有状态。
三、为什么需要管理状态
下面的几个场景都需要使用流处理的状态功能: 去重 数据流中的数据有重复我们想对重复数据去重需要记录哪些数据已经流入过应用当新数据流入时根据已流入过的数据来判断去重。 检测 检查输入流是否符合某个特定的模式需要将之前流入的元素以状态的形式缓存下来。比如判断一个温度传感器数据流中的温度是否在持续上升。 聚合 对一个时间窗口内的数据进行聚合分析分析一个小时内水位的情况 更新机器学习模型 在线机器学习场景下需要根据新流入数据不断更新机器学习的模型参数。
四、Flink中的状态分类
Managed State 状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩 状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等 使用场景 绝大数Flink算子。
Raw State 状态管理方式 用户自己管理 状态数据结构 字节数组: byte[] 使用场景 所有算子
从具体使用场景来说绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时用户自定义算子时使用。 在我们平时的使用中Managed State已经足够我们使用。 对Managed State继续细分它又有2种类型 Operator State(算子状态) Keyed State(键控状态)
Operator State 适用用算子类型 可用于所有算子: 常用于source, sink, 例如FlinkKafkaConsumer 状态分配一个算子的子任务对应一个状态 创建和访问方式 实现CheckpointedFunction或ListCheckpointed(已经过时)接口 横向扩展 并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量 支持的数据结构 ListState,UnionListStste和BroadCastState
Keyed State 适用用算子类型 只能用于用于KeyedStream上的算子 状态分配 一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State 创建和访问方式重写RichFunction, 通过里面的RuntimeContext访问w 横向扩展 并发改变, State随着Key在实例间迁移 支持的数据结构ValueState, ListState,MapState ReduceState, AggregatingState
五、算子状态的使用
Operator State可以用在所有算子上每个算子子任务或者说每个算子实例共享一个状态流入这个算子子任务的数据可以访问和更新这个状态。
注意: 算子子任务之间的状态不能互相访问 Operator State的实际应用场景不如Keyed State多它经常被用在Source或Sink等算子上用来保存流入数据的偏移量或对输出数据做缓存以保证Flink应用的Exactly-Once语义。
Flink为算子状态提供三种基本数据结构 列表状态List state将状态表示为一组数据的列表
联合列表状态Union list state也是将状态表示为数据的列表。它与常规列表状态的区别在于在发生故障时或者从保存点savepoint启动应用程序时如何恢复。一种是均匀分配(List state)另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
广播状态Broadcast state 是一种特殊的算子状态. 如果一个算子有多项任务而它的每项任务状态又都相同那么这种特殊情况最适合应用广播状态。
三种状态的实现
六、键控状态的使用
键控状态是根据输入数据流中定义的键key来维护和访问的只能用于KeyedStreamkeyBy算子处理之后。相同key的所有数据都会访问相同的状态。 键控状态支持的数据类型 注意: a)所有的类型都有clear(), 清空当前key的状态 b)这些状态对象仅用于用户与状态进行交互. c)状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方 d)从状态获取的值与输入元素的key相关
七、状态后端
状态的存储、访问以及维护由一个可插入的组件决定这个组件就叫做状态后端。 状态后端主要负责两件事 本地(taskmanager)的状态管理 将检查点checkpoint状态写入远程存储
状态后端的分类及配置 状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。 MemoryStateBackend 内存级别的状态后端(默认), 存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中. 特点:快速, 低延迟, 但不稳定 使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用 FsStateBackend 存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中 特点: 拥有内存级别的本地访问速度, 和更好的容错保证 使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中 RocksDBStateBackend 将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储) 存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存磁盘) 2. Checkpoint在外部文件系统(hdfs)中. 使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境
八、案例列表状态
package com.lyh.flink09;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;public class state_programe1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream(hadoop100,9999).map(new MyMapFunctin()).print();env.execute();}public static class MyMapFunctin implements MapFunctionString,Long, CheckpointedFunction {private Long count 0L;private ListStateLong state;Overridepublic Long map(String value) throws Exception {count;return count;}// 初始化时会调用这个方法向本地状态中填充数据. 每个子任务调用一次Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(initialize.....);state context.getOperatorStateStore().getListState(new ListStateDescriptorLong(state,Long.class));for (Long c : state.get()) {count c;}}// Checkpoint时会调用这个方法我们要实现具体的snapshot逻辑比如将哪些本地状态持久化Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(snapshot.....);state.clear();state.add(count);}}
}九、案例广播状态
package com.lyh.flink09;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class state_broad1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);DataStreamSourceString dataStream env.socketTextStream(hadoop100, 9999);DataStreamSourceString controlStream env.socketTextStream(hadoop100, 8888);MapStateDescriptorString, String stateDescriptor new MapStateDescriptor(state, String.class, String.class);// 广播流BroadcastStreamString broadcastStream controlStream.broadcast(stateDescriptor);dataStream.connect(broadcastStream).process(new BroadcastProcessFunctionString, String, String() {Overridepublic void processElement(String value, ReadOnlyContext ctx, CollectorString out) throws Exception {// 从广播状态中取值, 不同的值做不同的业务ReadOnlyBroadcastStateString, String state ctx.getBroadcastState(stateDescriptor);if (1.equals(state.get(switch))) {out.collect(切换到1号配置....);} else if (0.equals(state.get(switch))) {out.collect(切换到0号配置....);} else {out.collect(切换到其他配置....);}}Overridepublic void processBroadcastElement(String value, Context ctx, CollectorString out) throws Exception {BroadcastStateString, String state ctx.getBroadcastState(stateDescriptor);// 把值放入广播状态state.put(switch, value);}}).print();env.execute();}}