当前位置: 首页 > news >正文

如何在各种网站投放广告大学生网站的设计风格

如何在各种网站投放广告,大学生网站的设计风格,怎么做网站优化推广,网站设计怎么学目录 1、状态概述 1.1 无状态算子 1.2 有状态算子 2、状态分类 ​编辑 2.1 算子状态 2.1.1 列表状态#xff08;ListState#xff09; 2.1.2 联合列表状态#xff08;UnionListState#xff09; 2.1.3 广播状态#xff08;BroadcastState#xff09; 2.2 按键分…目录 1、状态概述 1.1 无状态算子 1.2 有状态算子 2、状态分类 ​编辑 2.1 算子状态 2.1.1 列表状态ListState 2.1.2 联合列表状态UnionListState 2.1.3 广播状态BroadcastState 2.2 按键分区状态  2.2.1 值状态ValueState 2.2.2 列表状态ListState 2.2.3 Map状态MapState 2.2.4 归约状态ReducingState 2.2.5 聚合状态AggregatingState 2.2.6 状态生存时间TTL 3、状态后端State Backends 3.1 状态后端的分类HashMapStateBackend/RocksDB 3.1.1 哈希表状态后端HashMapStateBackend 3.1.2 内嵌RocksDB状态后端EmbeddedRocksDBStateBackend 3.2 如何选择正确的状态后端 3.3 状态后端的配置 1、状态概述 1.1 无状态算子 根据当前的输入可以直接转换得到输出结果这种鼻子就是无状态算子如map,flatMap,filter 1.2 有状态算子 除当前处理之外还需要其他处理才能得到计算结果。如聚合算子窗口算子等 2、状态分类 Flink的状态有两种托管状态Managed State和原始状态Raw State。托管状态就是由Flink统一管理的状态的存储访问、故障恢复和重组等一系列问题都由Flink实现我们只要调接口就可以而原始状态则是自定义的相当于就是开辟了一块内存需要我们自己管理实现状态的序列化和故障恢复。 通常我们采用Flink托管状态来实现需求。 2.1 算子状态 一个算子任务会按照并行度分为多个并行子任务执行而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的所以Flink能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效。 算子状态Operator State就是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。          算子状态的实际应用场景不如Keyed State多一般用在Source或Sink等与外部系统连接的算子上或者完全没有key定义的场景。比如Flink的Kafka连接器中就用到了算子状态。 算子状态也支持不同的结构类型主要有三种ListState、UnionListState和BroadcastState。 2.1.1 列表状态ListState 与Keyed State中的列表状态的区别是在算子状态的上下文中不会按键key分别处理状态所以每一个并行子任务上只会保留一个“列表”list也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度彼此之间完全独立。 案例实操在map算子中计算数据的个数。 public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream(hadoop102, 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.实现 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunctionString, Long, CheckpointedFunction {private Long count 0L;private ListStateLong state;Overridepublic Long map(String value) throws Exception {return count;}/*** TODO 2.本地变量持久化将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用** param context* throws Exception*/Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(snapshotState...);// 2.1 清空算子状态state.clear();// 2.2 将 本地变量 添加到 算子状态 中state.add(count);}/*** TODO 3.初始化本地变量程序启动和恢复时 从状态中 把数据添加到 本地变量每个子任务调用一次** param context* throws Exception*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(initializeState...);// 3.1 从 上下文 初始化 算子状态state context.getOperatorStateStore().getListState(new ListStateDescriptorLong(state, Types.LONG));// 3.2 从 算子状态中 把数据 拷贝到 本地变量if (context.isRestored()) {for (Long c : state.get()) {count c;}}}} } 2.1.2 联合列表状态UnionListState 与ListState类似联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于算子并行度进行缩放调整时对于状态的分配方式不同。 UnionListState的重点就在于“联合”union。在并行度调整时常规列表状态是轮询分配状态项而联合列表状态的算子则会直接广播状态的完整列表。 如果列表中状态项数量太多为资源和效率考虑一般不建议使用联合重组的方式。 使用方式同ListState区别在getUnionListState(new ListStateDescriptorLong(union-state, Types.LONG)); state context.getOperatorStateStore().getUnionListState(new ListStateDescriptorLong(union-state, Types.LONG)); 2.1.3 广播状态BroadcastState 有时我们希望算子并行子任务都保持同一份“全局”状态用来做统一的配置和规则设定。 案例实操水位超过指定的阈值发送告警阈值可以动态修改。 public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// 配置流用来广播配置DataStreamSourceString configDS env.socketTextStream(hadoop102, 8888);// TODO 1. 将 配置流 广播MapStateDescriptorString, Integer broadcastMapState new MapStateDescriptor(broadcast-state, Types.STRING, Types.INT);BroadcastStreamString configBS configDS.broadcast(broadcastMapState);// TODO 2.把 数据流 和 广播后的配置流 connectBroadcastConnectedStreamWaterSensor, String sensorBCS sensorDS.connect(configBS);// TODO 3.调用 processsensorBCS.process(new BroadcastProcessFunctionWaterSensor, String, String() {/*** 数据流的处理方法 数据流 只能 读取 广播状态不能修改* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, CollectorString out) throws Exception {// TODO 5.通过上下文获取广播状态取出里面的值只读不能修改ReadOnlyBroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);Integer threshold broadcastState.get(threshold);// 判断广播状态里是否有数据因为刚启动时可能是数据流的第一条数据先来threshold (threshold null ? 0 : threshold);if (value.getVc() threshold) {out.collect(value ,水位超过指定的阈值 threshold !!!);}}/*** 广播后的配置流的处理方法: 只有广播流才能修改 广播状态* param value* param ctx* param out* throws Exception*/Overridepublic void processBroadcastElement(String value, Context ctx, CollectorString out) throws Exception {// TODO 4. 通过上下文获取广播状态往里面写数据BroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);broadcastState.put(threshold, Integer.valueOf(value));}}).print();env.execute();} } 2.2 按键分区状态  而很多有状态的操作比如聚合、窗口都是要先做keyBy进行按键分区的。按键分区之后任务所进行的所有计算都应该只针对当前key有效所以状态也应该按照key彼此隔离。 它的特点非常鲜明就是以key为作用范围进行隔离。 需要注意使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream即使转换算子实现了对应的富函数类也不能通过运行时上下文访问Keyed State。 2.2.1 值状态ValueState public interface ValueStateT extends State {T value() throws IOException;void update(T value) throws IOException; } T value()获取当前状态的值update(T value)对状态进行更新传入的参数value就是要覆写的状态值。 在具体使用时为了让运行时上下文清楚到底是哪个状态我们还需要创建一个“状态描述器”StateDescriptor来提供状态的基本信息。例如源码中ValueState的状态描述器构造方法如下 public ValueStateDescriptor(String name, ClassT typeClass) {super(name, typeClass, null); } 案例需求检测每种传感器的水位值如果连续的两个水位值超过10就输出报警。 public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// TODO 1.定义状态ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中初始化状态// 状态描述器两个参数第一个参数起个名字不重复第二个参数存储的类型lastVcState getRuntimeContext().getState(new ValueStateDescriptorInteger(lastVcState, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception { // lastVcState.value(); // 取出 本组 值状态 的数据 // lastVcState.update(); // 更新 本组 值状态 的数据 // lastVcState.clear(); // 清除 本组 值状态 的数据// 1. 取出上一条数据的水位值(Integer默认值是null判断)int lastVc lastVcState.value() null ? 0 : lastVcState.value();// 2. 求差值的绝对值判断是否超过10Integer vc value.getVc();if (Math.abs(vc - lastVc) 10) {out.collect(传感器 value.getId() 当前水位值 vc ,与上一条水位值 lastVc ,相差超过10);}// 3. 更新状态里的水位值lastVcState.update(vc);}}).print();env.execute();} 2.2.2 列表状态ListState 将需要保存的数据以列表List的形式组织起来。在ListStateT接口中同样有一个类型参数T表示列表中数据的类型。ListState也提供了一系列的方法来操作状态使用方式与一般的List非常相似。 IterableT get()获取当前的列表状态返回的是一个可迭代类型IterableTupdate(ListT values)传入一个列表values直接对状态进行覆盖add(T value)在状态列表中添加一个元素valueaddAll(ListT values)向列表中添加多个元素以列表values形式传入。 类似地ListState的状态描述器就叫作ListStateDescriptor用法跟ValueStateDescriptor完全一致。 案例:针对每种传感器输出最高的3个水位值 public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ListStateInteger vcListState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState getRuntimeContext().getListState(new ListStateDescriptorInteger(vcListState, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 1.来一条存到list状态里vcListState.add(value.getVc());// 2.从list状态拿出来(Iterable) 拷贝到一个List中排序 只留3个最大的IterableInteger vcListIt vcListState.get();// 2.1 拷贝到List中ListInteger vcList new ArrayList();for (Integer vc : vcListIt) {vcList.add(vc);}// 2.2 对List进行降序排序vcList.sort((o1, o2) - o2 - o1);// 2.3 只保留最大的3个(list中的个数一定是连续变大一超过3就立即清理即可)if (vcList.size() 3) {// 将最后一个元素清除第4个vcList.remove(3);}out.collect(传感器id为 value.getId() ,最大的3个水位值 vcList.toString());// 3.更新list状态vcListState.update(vcList);// vcListState.get(); //取出 list状态 本组的数据是一个Iterable // vcListState.add(); // 向 list状态 本组 添加一个元素 // vcListState.addAll(); // 向 list状态 本组 添加多个元素 // vcListState.update(); // 更新 list状态 本组数据覆盖 // vcListState.clear(); // 清空List状态 本组数据}}).print();env.execute();} } 2.2.3 Map状态MapState 把一些键值对key-value作为状态整体保存起来可以认为就是一组key-value映射的列表。 MapState提供了操作映射状态的方法与Map的使用非常类似。 UV get(UK key)传入一个key作为参数查询对应的value值put(UK key, UV value)传入一个键值对更新key对应的value值putAll(MapUK, UV map)将传入的映射map中所有的键值对全部添加到映射状态中remove(UK key)将指定key对应的键值对删除boolean contains(UK key)判断是否存在指定的key返回一个boolean值。 另外MapState也提供了获取整个映射相关信息的方法 IterableMap.EntryUK, UV entries()获取映射状态中所有的键值对IterableUK keys()获取映射状态中所有的键key返回一个可迭代Iterable类型IterableUV values()获取映射状态中所有的值value返回一个可迭代Iterable类型boolean isEmpty()判断映射是否为空返回一个boolean值。 案例需求统计每种传感器每种水位值出现的次数。 public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {MapStateInteger, Integer vcCountMapState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState getRuntimeContext().getMapState(new MapStateDescriptorInteger, Integer(vcCountMapState, Types.INT, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 1.判断是否存在vc对应的keyInteger vc value.getVc();if (vcCountMapState.contains(vc)) {// 1.1 如果包含这个vc的key直接对value1Integer count vcCountMapState.get(vc);vcCountMapState.put(vc, count);} else {// 1.2 如果不包含这个vc的key初始化put进去vcCountMapState.put(vc, 1);}// 2.遍历Map状态输出每个k-v的值StringBuilder outStr new StringBuilder();outStr.append(\n);outStr.append(传感器id为 value.getId() \n);for (Map.EntryInteger, Integer vcCount : vcCountMapState.entries()) {outStr.append(vcCount.toString() \n);}outStr.append(\n);out.collect(outStr.toString());// vcCountMapState.get(); // 对本组的Map状态根据key获取value // vcCountMapState.contains(); // 对本组的Map状态判断key是否存在 // vcCountMapState.put(, ); // 对本组的Map状态添加一个 键值对 // vcCountMapState.putAll(); // 对本组的Map状态添加多个 键值对 // vcCountMapState.entries(); // 对本组的Map状态获取所有键值对 // vcCountMapState.keys(); // 对本组的Map状态获取所有键 // vcCountMapState.values(); // 对本组的Map状态获取所有值 // vcCountMapState.remove(); // 对本组的Map状态根据指定key移除键值对 // vcCountMapState.isEmpty(); // 对本组的Map状态判断是否为空 // vcCountMapState.iterator(); // 对本组的Map状态获取迭代器 // vcCountMapState.clear(); // 对本组的Map状态清空}}).print();env.execute();} } 2.2.4 归约状态ReducingState 归约逻辑的定义是在归约状态描述器ReducingStateDescriptor中通过传入一个归约函数ReduceFunction来实现的。这里的归约函数就是我们之前介绍reduce聚合算子时讲到的ReduceFunction所以状态类型跟输入的数据类型是一样的。 public ReducingStateDescriptor(String name, ReduceFunctionT reduceFunction, ClassT typeClass) {...} 案例计算每种传感器的水位和 .process(new KeyedProcessFunctionString WaterSensor Integer() {private ReducingStateInteger sumVcState;Overridepublic void open(Configuration parameters) throws Exception {sumVcState this.getRuntimeContext().getReducingState(new ReducingStateDescriptorInteger(sumVcStateInteger::sumInteger.class));}Overridepublic void processElement(WaterSensor value Context ctx CollectorInteger out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());} }) 2.2.5 聚合状态AggregatingState 与归约状态非常类似聚合状态也是一个值用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的这也就是之前我们讲过的AggregateFunction里面通过一个累加器Accumulator来表示状态所以聚合的状态类型可以跟添加进来的数据类型完全不同使用更加灵活。 案例需求计算每种传感器的平均水位 public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {AggregatingStateInteger, Double vcAvgAggregatingState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState getRuntimeContext().getAggregatingState(new AggregatingStateDescriptorInteger, Tuple2Integer, Integer, Double(vcAvgAggregatingState,new AggregateFunctionInteger, Tuple2Integer, Integer, Double() {Overridepublic Tuple2Integer, Integer createAccumulator() {return Tuple2.of(0, 0);}Overridepublic Tuple2Integer, Integer add(Integer value, Tuple2Integer, Integer accumulator) {return Tuple2.of(accumulator.f0 value, accumulator.f1 1);}Overridepublic Double getResult(Tuple2Integer, Integer accumulator) {return accumulator.f0 * 1D / accumulator.f1;}Overridepublic Tuple2Integer, Integer merge(Tuple2Integer, Integer a, Tuple2Integer, Integer b) { // return Tuple2.of(a.f0 b.f0, a.f1 b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 将 水位值 添加到 聚合状态中vcAvgAggregatingState.add(value.getVc());// 从 聚合状态中 获取结果Double vcAvg vcAvgAggregatingState.get();out.collect(传感器id为 value.getId() ,平均水位值 vcAvg);// vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果 // vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据会自动进行聚合 // vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据}}).print();env.execute();} } 2.2.6 状态生存时间TTL 在实际应用中很多状态会随着时间的推移逐渐增长如果不加以限制最终就会导致存储空间的耗尽。 配置状态的TTL时需要创建一个StateTtlConfig配置对象然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。 StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptorString stateDescriptor new ValueStateDescriptor(my state, String.class);stateDescriptor.enableTimeToLive(ttlConfig); 这里用到了几个配置项 .newBuilder() 状态TTL配置的构造器方法必须调用返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数这就是设定的状态生存时间。 .setUpdateType() 设置更新类型。更新类型指定了什么时候更新状态失效时间这里的OnCreateAndWrite表示只有创建状态和更改状态写操作时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间也就是只要对状态进行了访问就表明它是活跃的从而延长生存时间。这个配置默认为OnCreateAndWrite。 .setStateVisibility() 设置状态的可见性。所谓的“状态可见性”是指因为清除操作并不是实时的所以当状态过期之后还有可能继续存在这时如果对它进行访问能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为表示从不返回过期值也就是只要过期就认为它已经被清除了应用不能继续读取这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp就是如果过期状态还存在就返回它的值。 public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.创建 StateTtlConfigStateTtlConfig stateTtlConfig StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s // .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 更新 过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// TODO 2.状态描述器 启用 TTLValueStateDescriptorInteger stateDescriptor new ValueStateDescriptor(lastVcState, Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState getRuntimeContext().getState(stateDescriptor);}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 先获取状态值打印 》 读取状态Integer lastVc lastVcState.value();out.collect(key value.getId() ,状态值 lastVc);// 如果水位大于10更新状态值 》 写入状态if (value.getVc() 10) {lastVcState.update(value.getVc());}}}).print();env.execute();} } 3、状态后端State Backends 在Flink中状态的存储、访问以及维护都是由一个可插拔的组件决定的这个组件就叫作状态后端state backend。状态后端主要负责管理本地状态的存储方式和位置。 3.1 状态后端的分类HashMapStateBackend/RocksDB Flink中提供了两类不同的状态后端一种是“哈希表状态后端”HashMapStateBackend另一种是“内嵌RocksDB状态后端”EmbeddedRocksDBStateBackend。 系统默认的状态后端是HashMapStateBackend。 3.1.1 哈希表状态后端HashMapStateBackend HashMapStateBackend是把状态存放在内存里。具体实现上哈希表状态后端在内部会直接把状态当作对象objects保存在Taskmanager的JVM堆上。 普通的状态以及窗口中收集的数据和触发器都会以键值对的形式存储起来所以底层是一个哈希表HashMap这种状态后端也因此得名。 3.1.2 内嵌RocksDB状态后端EmbeddedRocksDBStateBackend RocksDB是一种内嵌的key-value存储介质可以把数据持久化到本地硬盘。 配置EmbeddedRocksDBStateBackend后会将处理中的数据全部放入RocksDB数据库中RocksDB默认存储在TaskManager的本地数据目录里。 3.2 如何选择正确的状态后端 HashMap和RocksDB两种状态后端最大的区别就在于本地状态存放在哪里。 HashMapStateBackend是内存计算读写速度非常快但是状态的大小会受到集群可用内存的限制如果应用的状态随着时间不停地增长就会耗尽内存资源。 而RocksDB是硬盘存储所以可以根据可用的磁盘空间进行扩展所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化而且可能需要直接从磁盘读取数据这就会导致性能的降低平均读写性能要比HashMapStateBackend慢一个数量级。 3.3 状态后端的配置 3.3.1 配置默认的状态后端 #flink-conf.yaml# 默认状态后端 state.backend: hashmap# 存放检查点的文件路径 # 这里的state.checkpoints.dir配置项定义了检查点和元数据写入的目录。 state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints 3.3.2 为每个作业Per-job/Application单独配置状态后端 通过执行环境设置HashMapStateBackend。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend()); 通过执行环境设置EmbeddedRocksDBStateBackend。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend()); 需要注意如果想在IDE中使用EmbeddedRocksDBStateBackend需要为Flink项目添加依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version /dependency
http://www.dnsts.com.cn/news/31805.html

相关文章:

  • 做网站怎么删除图片做搜狗pc网站快速排
  • 网站建设 从入门到精通pdfwordpress 动图
  • 如何在电影网站中做淘客甘肃网站建设公司电话
  • 网站集群建设解决方案企业网站的建设目的有什么
  • 网站制作青岛公司为什么要做外贸网站
  • 秦皇岛建设网站公司哪家好网站建设与规划实训总结
  • 眼科医院网站优化服务商海口建网站
  • 通辽做网站制作wordpress文章详情展示不了
  • 中国科协网站建设招标电子商务网站开发教程论文6
  • 盘石网站做的怎么样百度网站改版工具
  • ui设计软件哪个好小小课堂seo自学网
  • 全球旅游网站排名秦皇岛市住房公积金管理中心
  • 佛山专业的免费建站wordpress调用最新文章列表
  • 信息类网站天猫购买平台
  • 如果做一个网站100元建网站
  • 代做ppt网站建设及优化方案
  • 嘉兴做网站设计做房产网站能赚钱吗
  • 企业网站流程图做的网站百度搜索不出来的
  • 城子河网站建设小米商城网站设计论文
  • 佛山市建设小学网站wordpress移动端底部导航栏
  • 浙江创都建设有限公司网站常州营销推广公司
  • 购物网站 开发赣州信息港
  • 手机做推广比较好的网站延安网站优化
  • 廊坊网站建设精灵化工seo顾问
  • 龙岩网站建设加盟电商代理
  • 温州大凯工艺品有限公司英文网站官方网站下载微博
  • 北京 网站建设沈阳男科医院哪家口碑好
  • 软件公司 网站模板住宅项目建设背景
  • 网站建设前言和背景国外服务器ip大全
  • 贵州建设厅网站怎样查询电工证做搜狗手机网站优化