当前位置: 首页 > news >正文

这个网站的建设流程网站备案完成后不解析

这个网站的建设流程,网站备案完成后不解析,手工活外包加工官方网,抖音优化公司目录 一、有状态计算与无状态计算 #xff08;一#xff09;概念差异 #xff08;二#xff09;应用场景 二、有状态计算中的状态分类 #xff08;一#xff09;托管状态#xff08;Managed State#xff09;与原生状态#xff08;Raw State#xff09; 两者的…目录 一、有状态计算与无状态计算 一概念差异 二应用场景 二、有状态计算中的状态分类 一托管状态Managed State与原生状态Raw State 两者的区别 具体区别 管理方式 数据结构支持 使用场景 二托管状态细分  Keyed State 和 Operator State Keyed State键控状态 按键分区状态Keyed State分类 Operator State算子状态 列表状态ListState 联合列表状态UnionListState 广播状态BroadcastState 三、Keyed State 详解与代码实战 一数据格式 二代码示例模拟 maxBy 求每个 key 的最大值 三代码示例 体温异常监测统计输出 四、总结 在大数据流处理领域Apache Flink 凭借其卓越的性能和丰富的功能备受青睐。而 Flink 中的状态State管理机制更是支撑复杂流处理任务的关键支柱。无论是数据去重、模式匹配还是窗口聚合分析状态管理都发挥着不可或缺的作用。本文将深入浅出地剖析 Flink 状态相关知识结合实际代码案例助你理解这一重要概念。 一、有状态计算与无状态计算 一概念差异 无状态计算 无需考虑历史数据具有幂等性相同输入必然得到相同输出。例如 map 操作输入单词 “hello” 就记为 (hello, 1)每次输入 “hello” 输出结果恒定。 有状态计算 要依据历史数据相同输入可能因状态变化产生不同输出。像 sum、reduce、maxBy 等聚合操作首次输入 (hello, 1) 得 (hello, 1)再输入 (hello, 1) 会更新状态输出 (hello, 2)。 二应用场景 无状态计算场景 适用于数据转换、过滤等基础操作直接采用 map、filter 算子便捷高效。比如只想筛选出日志数据中特定级别的信息进行输出用 filter 按日志级别规则过滤即可。 有状态计算场景 涉及聚合求和、求最值等、比较操作时就得倚仗有状态算子。像统计电商平台各品类商品销售额借助 sum 基于商品品类 key 聚合金额数据。 二、有状态计算中的状态分类 Flink状态   - 托管状态    - KeyedState ( 在keyBy之后可以使用状态 )       - ValueState  (存储一个值)       - ListState   (存储多个值)       - MapState    (存储key-value)     - OperatorState ( 没有keyBy的情况下也可以使用 ) [不用]  - 原生状态 (不用) 有状态的计算是流处理框架要实现的重要功能因为稍复杂的流处理场景都需要记录状态State然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能 数据流中的数据有重复想对重复数据去重需要记录哪些数据已经流入过应用当新数据流入时根据已流入过的数据来判断去重。检查输入流是否符合某个特定的模式需要将之前流入的元素以状态的形式缓存下来。比如判断一个温度传感器数据流中的温度是否在持续上升。对一个时间窗口内的数据进行聚合分析分析一个小时内某项指标的75分位或99分位的数值。 其实窗口本身就是状态他不是立即出结果而是将数据都保存起来达到触发条件才计算。 一个状态更新和获取的流程如下图所示一个算子子任务接收输入流获取对应的状态根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内输入流的某个整数字段求和那么当算子子任务接收到新元素时会获取已经存储在状态中的数值然后将当前输入加到状态上并将状态数据更新。 以wordcout为例说明上图的流程 一托管状态Managed State与原生状态Raw State 两者的区别 Managed State是由Flink管理的Flink帮忙存储、恢复和优化Raw State是开发者自己管理的需要自己序列化。 具体区别 管理方式 托管状态由 Flink Runtime 托管自动存储、恢复并行度调整时能自动重新分布状态开发者无需操心底层存储细节Flink 已做优化处理。原生状态开发者全程自主把控要手动序列化以字节数组存储Flink 不了解存储的数据结构管理成本高。 数据结构支持 托管状态支持 ValueState存单值、ListState存列表值、MapState存键值对等多样结构方便依业务逻辑选择适配。原生状态仅支持字节上层复杂数据结构得自行序列化为字节数组再操作。 使用场景 托管状态多数算子继承 Rich 函数类等接口即可轻松使用涵盖日常众多流处理任务。原生状态在现有算子与托管状态无法满足特殊、复杂自定义需求比如自研特殊聚合逻辑算子时才会启用。 二托管状态细分  Keyed State 和 Operator State Keyed State键控状态 Flink 为每个键值维护一个状态实例并将具有相同键的所有数据都分区到同一个算子任务中这个任务会维护和处理这个key对应的状态。当任务处理一条数据时它会自动将状态的访问范围限定为当前数据的key。因此具有相同key的所有数据都会访问相同的状态。 需要注意的是键控状态只能在 KeyedStream 上进行使用可以通过 stream.keyBy(...) 来得到 KeyedStream 。 按键分区状态Keyed State分类 Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State) ValueState存储单值类型的状态。可以使用 update(T) 进行更新并通过 T value() 进行检索。ListState存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素并通过 get() 获得整个列表。ReducingState用于存储经过 ReduceFunction 计算后的结果使用 add(T) 增加元素。AggregatingState用于存储经过 AggregatingState 计算后的结果使用 add(IN) 添加元素。FoldingState已被标识为废弃会在未来版本中移除官方推荐使用 AggregatingState 代替。MapState维护 Map 类型的状态。 Operator State算子状态 引用Flink中的状态管理_flink有状态和无状态-CSDN博客 三.算子状态Operator State 算子状态Operator State就是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。每个算子子任务共享一个算子状态子任务间不共享 算子状态的实际应用场景不如Keyed State多一般用在Source或Sink等与外部系统连接的算子上一般使用不多。 当算子的并行度发生变化时算子状态也支持在并行的算子任务实例之间进行重组分配。根据状态类型的不同重组分配的方案也会有所差异。 综上所述算子状态是一种在特定算子上定义的状态其作用范围仅限于该算子任务且与数据的键值无关。相比之下按键分区状态提供了更丰富的功能和操作适用于处理与特定键值相关联的数据。在实际应用中需要根据具体需求和场景选择合适的状态类型来管理数据和共享状态。 算子状态也支持不同的结构类型主要有三种ListState、UnionListState和BroadcastState。 列表状态ListState 与Keyed State中的ListState一样将状态表示为一组数据的列表。 与Keyed State中的列表状态的区别是在算子状态的上下文中不会按键key分别处理状态所以每一个并行子任务上只会保留一个“列表”list也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度彼此之间完全独立。 当算子并行度进行缩放调整时算子的状态列表将会被全部收集收集起来再通过轮询的方式重新依次分配给新的所有并行任务。 算子状态中不会存在“键组”key group这样的结构所以为了方便重组分配就把它直接定义成了“列表”list。这也就解释了为什么算子状态中没有最简单的值状态ValueState。 联合列表状态UnionListState 与ListState类似联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于算子并行度进行缩放调整时对于状态的分配方式不同。 在并行度进行缩放调整时联合列表与普通列表不同联合列表会将所有并行子任务的列表状态收集起来并直接向所有并行子任务广播完整的列表。如果列表中状态项太多则不推荐使用联合里欸包状态。 使用上也与ListState类似只需要在实现CheckpointedFunction类的initializeState方法时通过上下文获取算子状态使用 .getUnionListState() 即可其他与ListState无异。 广播状态BroadcastState 有时我们希望算子并行子任务都保持同一份“全局”状态用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态状态就像被“广播”到所有分区一样这种特殊的算子状态就叫作广播状态BroadcastState。 在并行度进行缩放操作时由于是全局状态也不会造成影响。          简单来说就是一条流广播后专门读取配置与普通的数据流进行连结然后广播流将配置加载到广播状态中这样普通的数据流就能够在不重启程序的情况下通过上下文动态读取配置。 三、Keyed State 详解与代码实战 一数据格式 Flink 为 Keyed State 提供多种实用数据格式 ValueState存储单个值用 update(T) 更新T value() 检索取值常用于保存当前最值、计数等单一数据指标。ListState能存多个值通过 add(T)、addAll(List) 添加元素get() 获取完整列表适合缓存历史数据序列如记录用户近期操作记录列表。ReducingState基于 ReduceFunction 计算结果存储add(T) 增添元素持续归约数据。AggregatingState存放 AggregatingState 计算结果按自定义聚合逻辑 add(IN) 处理输入更新状态满足复杂聚合求值。MapState维护 Map 类型状态灵活处理键值对形式数据存储与操作。 二代码示例模拟 maxBy 求每个 key 的最大值 import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01_KeyedStateDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamTuple2String, Long tupleDS env.fromElements(Tuple2.of(北京, 1L),Tuple2.of(上海, 2L),Tuple2.of(北京, 6L),Tuple2.of(上海, 8L),Tuple2.of(北京, 3L),Tuple2.of(上海, 4L),Tuple2.of(北京, 7L));//2. source-加载数据tupleDS.keyBy(new KeySelectorTuple2String, Long, String() {Overridepublic String getKey(Tuple2String, Long value) throws Exception {return value.f0;}}).map(new RichMapFunctionTuple2String, Long, Tuple2String,Long() {// 借助状态这个API实现ValueStateLong maxValueState null;Overridepublic void open(Configuration parameters) throws Exception {// 就是对ValueState初始化ValueStateDescriptorLong stateDescriptor new ValueStateDescriptorLong(valueState,Long.class);maxValueState getRuntimeContext().getState(stateDescriptor);}Overridepublic Tuple2String, Long map(Tuple2String, Long value) throws Exception {Long val value.f1;if(maxValueState.value() null){maxValueState.update(val);}else{if(maxValueState.value() val){maxValueState.update(val);}}return Tuple2.of(value.f0,maxValueState.value());}}).print();//.maxBy(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();} } 在此代码中先构建 Flink 执行环境、加载数据按城市 keyBy 后在 map 里利用 ValueState。open 方法初始化状态map 里对比输入值与状态存储的最大值按需更新并输出对应城市及最大值清晰展示 Keyed State 求最值用法。 通过Map计算最大值的案例 完整代码 package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.HashMap; import java.util.Map;public class _01_KeyedStateDemo_MapTest {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamTuple2String, Long tupleDS env.fromElements(Tuple2.of(北京, 1L),Tuple2.of(上海, 2L),Tuple2.of(北京, 6L),Tuple2.of(上海, 8L),Tuple2.of(北京, 3L),Tuple2.of(上海, 4L),Tuple2.of(北京, 7L));//2. source-加载数据tupleDS.keyBy(new KeySelectorTuple2String, Long, String() {Overridepublic String getKey(Tuple2String, Long value) throws Exception {return value.f0;}}).map(new RichMapFunctionTuple2String, Long, Tuple2String,Long() {// 借助状态这个API实现MapString,Long map new HashMapString,Long();Overridepublic Tuple2String, Long map(Tuple2String, Long value) throws Exception {Long val value.f1;if(!map.containsKey(value.f0)){map.put(value.f0,value.f1);}else{Long mapValue map.get(value.f0);if(mapValue val){map.put(value.f0,value.f1);}}return Tuple2.of(value.f0,map.get(value.f0));}}).print();//.maxBy(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();} } 三代码示例 体温异常监测统计输出 package com.bigdata.state;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.planner.expressions.In; import org.apache.flink.util.Collector;import java.util.ArrayList;public class _02_KeyedStateDemo2 {// 如果一个人的体温超过阈值38度超过3次及以上则输出: 姓名 [温度1,温度2,温度3]public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSourceString dataStreamSource env.socketTextStream(localhost, 8889);//3. transformation-数据处理转换 zs,37dataStreamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] arr value.split(,);return Tuple2.of(arr[0],Integer.valueOf(arr[1]));}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer value) throws Exception {return value.f0;}}).flatMap(new RichFlatMapFunctionTuple2String, Integer, Tuple2String, ArrayListInteger() {ValueStateInteger valueState null;ListStateInteger listState null;Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptorInteger stateDescriptor new ValueStateDescriptorInteger(numState,Integer.class);valueState getRuntimeContext().getState(stateDescriptor);ListStateDescriptorInteger listStateDescriptor new ListStateDescriptor(listState, Integer.class);listState getRuntimeContext().getListState(listStateDescriptor);}Overridepublic void flatMap(Tuple2String, Integer value, CollectorTuple2String, ArrayListInteger out) throws Exception {Integer tiwen value.f1;if(tiwen 38){valueState.update(valueState.value()null?1:(valueState.value()1));listState.add(tiwen);}if(valueState.value()!null valueState.value() 3){ArrayListInteger list new ArrayList();IterableInteger iterable listState.get();for (Integer tiwenwen : iterable) {list.add(tiwenwen);}out.collect(Tuple2.of(value.f0,list));}}}).print();//4. sink-数据输出//5. execute-执行env.execute();} } 这段代码从本地 socket 读取姓名与体温数据转换格式后按姓名 keyBy。在 flatMap 里用 ValueState 统计体温超 38 度次数ListState 缓存超温数据次数达标则输出对应姓名及体温列表巧妙实现复杂业务规则监测。 四、总结 Flink 状态管理是构建强大流处理应用的基石合理运用有状态、无状态计算精准抉择托管状态类型及对应数据格式结合实战代码灵活处理业务逻辑能解锁 Flink 在大数据场景无限潜能高效应对各类数据处理挑战助你在大数据开发之路上稳步前行。后续可深入探索状态持久化、故障恢复等进阶特性深挖 Flink 流处理精髓。
http://www.dnsts.com.cn/news/59767.html

相关文章:

  • 用vue做网站的实例网站建设接活
  • 青岛网站制作计划济南建设工程备案网站
  • 小程序免费制作平台有哪些seo到底是做什么的
  • 网站的分类有哪些内容百度推广登录网址
  • 网站彩票怎么做长沙装修公司前十强
  • 目前网页设计工资多少自己的网站怎么做关键词优化
  • 乐清做网站公司哪家好优良的定制网站建设
  • 满山红厦门网站建设优化大师app下载
  • 网站备案成功造价材料价格信息网
  • 建设网站需要准备什么资料班级网站布局
  • 网站设计评价免费网站重生做军嫂
  • 做网站容易学吗电子商务毕业设计 网站建设
  • 大企业网站建设花瓣网是仿国外那个网站做的
  • 网站怎么收费手机端网站建设的费用清单
  • 惠城营销网站制作网络营销就是
  • 上线了做网站价格贵网页传奇游戏黑屏怎么解决
  • 网站开发公司会计科目wordpress 解析插件
  • 新乡建设网站昆明出入最新规定
  • 河北商城网站建设价格低网店分销系统
  • 做网站怎么加水平线网络营销的特点不包括什么
  • 建一个网站素材哪里来wordpress不能分类
  • 合肥花境建设网站给大家推荐免费视频服务器
  • 铁法能源公司网站网站底部导航菜单
  • 网站如何留住用户有域名 如何免费建设网站
  • 百度搜索这个网站为什么这么差最新软件发布平台
  • wordpress主题美化黑帽seo培训多少钱
  • 专业模板网站制作价格免费商品列表网页模板源代码
  • 上海松江区做网站的公司google官网登录
  • 临沂网站建设教程惠州网站网站建设
  • 广州市官网网站建设多少钱一个网站能卖多少钱?