当前位置: 首页 > news >正文

asp.net mvc 统计网站流量数据帮一个公司做网站多少钱

asp.net mvc 统计网站流量数据,帮一个公司做网站多少钱,代理域名网站的公司,服务器上如何做网站接上文#xff1a;Flink实战四_TableAPISQL 在学习Flink的状态机制之前#xff0c;我们需要理解什么是状态。回顾我们之前介绍的很多流计算的计算过程#xff0c;有些计算方法#xff0c;比如说我们之前多次使用的将stock.txt中的一行文本数据转换成Stock股票对象的ma… 接上文Flink实战四_TableAPISQL 在学习Flink的状态机制之前我们需要理解什么是状态。回顾我们之前介绍的很多流计算的计算过程有些计算方法比如说我们之前多次使用的将stock.txt中的一行文本数据转换成Stock股票对象的map操作。来一个数据就计算一个数据这些操作只需要依赖于当前输入的数据就够了不需要其他的辅助数据。输入相同的文本数据输出的肯定是一个相同的Stock股票对象。 而另外一些操作比如在WindowFunctionDemo中介绍的计算平均值的方法。同样是来一个数据处理一个数据但是他每次计算出来的结果除了依赖于当前输入的数据还需要依赖于accumulate累加器中的数据。输入相同的股票数据由于累加器中的数据不同输出的股票平均价格也就不同。 像累加器这种由一个任务来维护并且要参与到数据计算过程中的数据就称为状态。这一类计算任务也称为有状态的任务。比如reduce、sum、min、minby等等操作都是典型的有状态的算子。而与之对应的只依赖于输入数据的计算任务就称为无状态的任务。多个任务叠加在一起就组成了一个客户端应用。 对于状态也可以认为就是一个本地变量他可以被一个客户端应用中的所有计算任务都访问到。对于状态的管理通常是比较复杂的尤其在分布式流式计算场景下。任务是并行计算的所以状态也需要分开保存。集群故障恢复后又需要合并读取。在算子并行度发生变化时又要维护状态的一致性。再考虑到状态数据要尽量高效的存储与访问等等。Flink的状态机制提供了对这类状态数据的统一管理。开发人员可以专注于开发业务逻辑而不用时刻考虑状态的各种复杂管理机制。 对于状态有两种管理机制一种是managed state就是Flink管理的状态机制对之前提到的一些状态管理的问题提供了统一的管理机制。另一种是raw state就是用户自己管理的状态机制。只需要Flink提供一个本地变量空间由应用程序自己去管理这一部分状态。Flink的状态管理机制非常强大所以在大部分的开发场景下我们使用Flink提供的状态管理机制就足够了。 Flink中管理的状态都是跟特定计算任务关联在一起的。他的状态主要有两种一种是operator state 算子状态一种是keyed State 键控状态。 1、Operator State 算子状态 算子状态的作用范围限定为当前计算任务内这种状态是跟一个特定的计算任务绑定的。算子状态的作用范围只限定在算子任务内由同一并行任务所处理的所有数据都可以访问到相同的状态。并且这个算子状态不能由其他子任务访问。比如WindowFunctionDemo中计算股票平均价格的MyAvg计算任务里的累加器就只能在当前计算任务中访问。即使在多个不同的应用程序中都可以使用MyAvg这个计算任务但是每个应用程序中访问到的累加器都是不同的。 这一类算子需要按任务分开保存而当任务的并行度发生变化时还需要支持在并行运算实例之间重新分配状态。 例如下面我们定义一个带状态的求和算子在这个示例中就给一个简单的求和算子保存了一个状态。 示例代码 com.flink.state.SumOperatorState import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** 算子状态*/ public class SumOperatorState {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.setStateBackend(new RocksDBStateBackend(hdfs://hadoop01:8020/SumOperatorState)); // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));final DataStreamSourceInteger stream env.fromElements(1, 2, 3, 4, 5,6);final SingleOutputStreamOperatorInteger stream2 stream.map(new MySumMapper(mysummapper));stream2.print(); // final DataStreamInteger union stream.union(stream2);env.execute(stream);}public static class MySumMapper implements MapFunctionInteger,Integer, CheckpointedFunction {private int sum;private String stateKey;private ListStateInteger checkpointedState;public MySumMapper(String stateKey){this.stateKey stateKey;}Overridepublic Integer map(Integer value) throws Exception {return sum value;}Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();checkpointedState.add(sum);}Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptorInteger descriptor new ListStateDescriptorInteger(stateKey,TypeInformation.of(new TypeHintInteger() {}));checkpointedState context.getOperatorStateStore().getListState(descriptor);if(context.isRestored()){for (Integer subSum : checkpointedState.get()) {sum subSum;}}}} }可以看到Flink中的算子状态操作还是比较简单的可以给算子继承一个CheckpointedFunction接口这个接口有两个方法snapshotState方法会在算子执行过程中调用进行状态保存。initializeState方法是在任务启动时加载初始化的状态。这样算子在执行过程中就可以将中间结果保存到checkpointedState状态中。当算子异常终止时下一次启动又可以从这个checkpointedState状态中加载之前的计算结果。用户程序只需要定义逻辑而不需要管触发的时机。 关于不同的状态类型 在获取状态的地方 context.getOperatorStateStore()这个方法有几个重载的方法getListStategetUnionListStategetBroadcastState。 其中getListState和getUnionListState这两个方法都是处理ListState也就是不同的任务节点他的状态也不相同。只是这两种状态的底层状态分配机制不同。ListState是将不同的子状态分配好了之后分给不同的算子实例去处理。而 UnionListState则是将所有的子状态都分配给所有的算子实例由算子实例自行调节每个实例获取哪些状态。FlinkKafkaConsumer就是使用的UnionListState。 最后一个getBoradcastState是处理广播状态也就是所有任务节点的状态都是一样的。 其他的算子包括function,source,sink都可以自行添加状态管理。这其中需要理解的就是checkpointedState的形式为什么是一个集合状态ListState 这是因为Flink的计算任务都是并行执行的那么在计算过程中每一个并行的实例都会有一个自己的状态所以在snapshotState保存状态时是将每个并行实例内的状态进行保存那整个任务整体就会保存成一个集合。所以示例中保存的其实是每个子任务内计算到的sum和。 当任务重新启动时Flink可能还需要对子任务的状态进行重新分配因为任务的并行度有可能进行了调整。所以示例中initializeState方法加载状态时也是将各个子状态的sum加到一起才是一个完整的求和计算。 2、keyed State 键控状态 算子状态针对的是普通算子在任何DataStream和DataSet中都可以使用。但是如果针对KeyedStream情况又有所不同。相比算子状态keyedState键控状态是针对keyby产生的KeyedStream。KeyedStream的计算任务都跟当前分配的key直接关联。相对应的KeyedState状态也就跟key有关。而key是在计算任务运行时分配的。这一类状态无法在任务启动过程中完成状态的分配。需要在任务执行过程中根据key的分配不同而进行不同的分配。Flink针对keyedStream会在内部根据每个key维护一个键控状态。在具体运算过程中根据key的分配情况将状态分配给不同的计算任务。 针对键控状态 Flink提供了一系列Rich开头的富计算因子抽象类这些抽象类提供了更丰富的计算任务生命周期管理。用户程序通过继承这些抽象类就可以获取到与当前分配的key相关的状态。 我们先来看一个关于KeyedStream的状态示例。下面实现了一个自定义的求word count的算子。 示例代码 com.flink.state.WCKeyedState import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream;/*** author roy* date 2021/9/6*/ public class WCKeyedState {public static void main(String[] args) throws Exception {//创建执行环境final StreamExecutionEnvironment environment StreamExecutionEnvironment.getExecutionEnvironment();final DataStreamSourceTuple2String, Integer stream environment.fromCollection(Arrays.asList(Tuple2.of(a, 1), Tuple2.of(a, 5), Tuple2.of(a, 7), Tuple2.of(a, 2), Tuple2.of(b, 2), Tuple2.of(b, 6), Tuple2.of(b, 3), Tuple2.of(b, 8), Tuple2.of(c, 4), Tuple2.of(c, 8), Tuple2.of(c, 4), Tuple2.of(c, 6)));//按照字符分组final KeyedStreamTuple2String, Integer, String keyedStream stream.keyBy((key) - key.f0);keyedStream.flatMap(new WCFlatMapFunction(WCKeyedState)).print();environment.execute(WCKeyedState);}public static class WCFlatMapFunction extends RichFlatMapFunctionTuple2String,Integer, Tuple2String,Integer{private String stateDesc;ValueStateTuple2String, Integer valueState;public WCFlatMapFunction(String stateDesc) {this.stateDesc stateDesc;}Overridepublic void flatMap(Tuple2String, Integer input, CollectorTuple2String, Integer out) throws Exception {Tuple2String, Integer wordCountList valueState.value();if(null wordCountList){wordCountList input;}else{wordCountList.f1 input.f1;}valueState.update(wordCountList);out.collect(wordCountList); // valueState.clear();}Overridepublic void open(Configuration parameters) {ValueStateDescriptorTuple2String,Integer descriptor new ValueStateDescriptor(stateDesc,TypeInformation.of(new TypeHintTuple2String, Integer() {}));//设置状态的存活时间StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();descriptor.enableTimeToLive(ttlConfig);valueState this.getRuntimeContext().getState(descriptor);//另外几种state类型 // this.getRuntimeContext().getMapState(); // this.getRuntimeContext().getListState(); // this.getRuntimeContext().getAggregatingState(); // this.getRuntimeContext().getReducingState();}} }在这个示例中看到其实键控状态与算子状态在应用代码层面最大的区别在于获取状态的方法。算子状态可以通过FunctionInitializationContext直接拿到状态而键控状态需要实现Rich***Function接口在open方法中通过getRuntimeContext()获取。 而更深层次的区别在于运行机制上。其实你可以把这个状态近似的理解为一个 (key,value)结构的本地缓存。算子状态的缓存是一个固定的key只是这个key跟当前计算任务有关只有当前这一个算子能够读到不管在哪个taskmanager上计算如果能读到这个缓存的话那读到的缓存就是一个固定的。而键控状态的缓存是一组(key,value)的缓存这一组缓存的key就是KeyedStream中的key分区键。而键控状态获取到的状态值都是取决于当前输入元素所代表的key分区键的因此每次任务时taskmanager上分配的key不同那就可能读取到不同的值。 另外根据状态类型不同 Flink也提供了几种不同的状态 ValueState: 保存一个可以更新和检索的值。 这个值可以通过 update(T) 进行更新通过 T value() 进行检索。ListState: 保存一个元素的列表。可以往这个列表中追加数据并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素通过Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。ReducingState: 保存一个单值表示添加到状态的所有值的聚合。接口与ListState 类似但使用 add(T) 增加元素会使用提供的ReduceFunction 进行聚合。AggregatingState: 保留一个单值表示添加到状态的所有值的聚合。和ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似但使用 add(IN) 添加的元素会用指定的AggregateFunction 进行聚合。MapState: 维护了一个映射列表。 你可以添加键值对到状态中也可以获得反映当前所有映射的迭代器。使用 put(UKUV) 或者 putAll(Map) 添加映射。使用 get(UK) 检索特定 key。 使用 entries()keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。 这些不同的状态都是跟Key相关的。使用时都需要通过构建一个对应的 StateDescriptor然后通过getRuntimeContext获取。 3、Checkpointing 检查点 Flink中的每个算子都可以是有状态的这些状态化的方法和算子可以使Flink的计算过程更为精确在实际开发中应该尽量使用带状态的算子。而对于这些状态除了可以通过算子状态和键控状态进行扩展外Flink也提供了另外一种自动的兜底机制CheckPointing检查点。 Checkpointing检查点是一种由Flink自动执行的一种状态备份机制其目的是能够从故障中恢复。快照中包含了每个数据源Source的指针(例如到文件或者kafka分区的偏移量)以及每个有状态算子的状态副本。 默认情况下检查点机制是禁用的需要在应用中通过StreamExecutionEnvironment 进行配置。基础的配置方式是通过StreamExecutionEnvironment的enableCheckpointing方法开启开启时需要传入一个参数表示多长时间执行一次快照。另外有一些高级的选项可以参见下面的示例。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000);// 高级选项 // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ON CE); // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // Checkpoint 必须在一分钟内完成否则就会被抛弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 开启在 job 中止后仍然保留的 externalized checkpoints env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpo intCleanup.RETAIN_ON_CANCELLATION); 4、Flink的容错重启机制 当某一个task发生故障时Flink需要重启出错的task以及其他收到影响的Task以使得作业恢复到正常执行的状态。Flink通过重启策略和故障恢复策略来控制Task重启重启策略决定是否可以重启以及重启的间隔故障恢复策略决定哪些Task需要重启。 重启策略可以通过配置文件flink-conf.yaml中通过restart-strategy属性进行配置同样也可以在应用程序中覆盖配置文件中的配置。如果没有启用checkpoint那就采用不重启的策略。如果启用了checkpoint并且没有配置重启策略那么就采用固定延时重启策略这种情况下最大尝试重启次数是Integer.MAX_VALUE基本就可以认为是会不停的尝试重启。 restart-strategy属性可选的配置有以下几种 none 或 off 或 disable: 不重启。checkpointing关闭后的默认值fixeddelay, fixed-delay: 固定延迟重启策略。checkpointing启用时的默认值failurerate, failure-rate: 失败率重启策略 这些配置项同样可以在应用程序中定制。例如 ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 Time.of(10, TimeUnit.SECONDS) // 延时 ));fixeddelay策略还可以定制两个参数restart-strategy.fixed-delay.attempts 重试次数以及 restart-strategy.fixed-delay.delay延迟时间。第一个参数表示重启任务的尝试次数第二个参数表示重启失败后再次尝试重启的间隔时间。可以配置为 “1 min”,20 s这样。 例如在配置文件中 restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s或者在应用程序中 ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // 尝试重启的次数Time.of(10, TimeUnit.SECONDS) // 延时 ));Failure Rate 策略表示当故障率(每个时间假根发生故障的次数)超过设定的限制时作业就会最终失败。 在连续的两次重启尝试之间重启策略会等待一段固定长度的时间。 这种策略下可以定义三个详细的参数。 restart-strategy.failure-rate.max-failures-per-interval 任务失败之前在固定时间间隔内的最大重启尝试次数。restart-strategy.failure-rate.failure-rate-interval 检测失败率的窗口间隔。restart-strategy.failure-rate.delay 两次重启尝试之间的间隔时间。 例如在配置文件中 restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 10 s或者在应用中 ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.failureRateRestart(3, // 每个时间间隔的最大故障次数Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔Time.of(10, TimeUnit.SECONDS) // 延时 ));5、State Backend 状态存储方式与位置 通过算子状态键控状态以及检查点我们可以对计算过程中的中间状态进行保存。这些保存下来的状态即可以在计算中使用也可以在计算程序异常终止后恢复计算状态时使用。但是到目前为止我们都是直接拿来用而并没有去关注这些状态数据是以何种方式保存并且是保存在什么地方的。 针对这些状态Flink提供了多种State Backend 状态后端用来管理状态数据具体的存储方式与位置。Flink默认提供了三种状态后端jobmanager,filesystem,rocksdb。设置的方式可以在file-conf.yaml中通过state.backend属性进行配置。也可以在程序中通过StreamExecutionEnvironment配置。例如 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(...);另外在flink-conf.yaml中关于state.backend还有一些扩展的属性这些属性同样即可以在配置文件中配置也可以在程序中设置。应用程序中的配置优先级更高。 那这三种状态后端要如何取舍呢 jobmanager: jobmanager在后台由一个MemoryStateBackend类来实现从名字看出是基于内存实现的。状态信息会保存到TaskManager的JVM堆内存中而检查点信息则会直接保存到JobManager的内存中。这些检查点信息虽然都是基于内存工作但是也依然会持久化到文件系统当中。 由于检查点保存在Jobmanager中会加大taskmanager和jobmanager之间的 网络请求并且也会加大jobmanager的负担所以这种方式通常只用于实验场 景或者小状态的本地计算场景。 filesystem: filesystem在后台由一个FsStateBackend类来实现。他依然是基于内存和文件系统进行状态保存。但是检查点信息是由taskmanager进行保存的。保存的文件地址是可以自行配置的。由于taskmanager上执行的任务是动态分配的所以通常这个保存地址需要配置成所有taskmanager都能访问到的地方例如hdfs。而taskmanager上由于会有多个并行任务所以他们的文件存储地址也会用数字进行版本区分例如hdfs://namenode:port/flink-checkpoints/chk-17/.filesystem的状态访问很快速适合那些需要大的堆内存的场景。但是fliesystem是受限于内存和GC的所以他支持的状态数据大小优先。 rocksdb: rocksdb在后台是由一个 RocksDBStateBackend 类来实现的。RocksDB是一个访问快速的key-value本地缓存你可以把他理解为一个本地的Redis。但是他能够基于文件系统提供非常高效的访问。所以是一个非常常用的流式计算持久化工具。使用RocketDB后状态数据就不再受限于内存转而受限于硬盘RocketDBStateBackend适合支持非常大的状态信息存储。但是RocksDB毕竟是基于文件系统的所以他的执行速度会比filesystem稍慢官方提供的经验是大概比filesystem慢10倍但是这个速度在大多数场景下也依然够用了。 注如果在应用中使用rocksdb需要引入一个依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb_2.12/artifactIdversion${flink.version}/version /dependency然后使用StreamExecuteEnvironment设置 env.setStateBackend(new RocksDBStateBackend(key));章节总结 在流式计算的场景下应用程序通常是无法预知数据何时到来的只能一直运行随时等待数据接入。这时一旦应用程序突然出错终止就很容易造成数据丢失。所以在流式计算场景下我们需要对程序的健壮性做更多的考量。Flink提供了一系列的状态机制来加强程序的健壮性。但是在重要的生产环境中我们对程序健壮性做再多的考量都是不过分的因此通常还需要加上一些基于运维的监控机制例如监控flink的进程监控yarn中的任务状态等来了进一步保证流式计算程序的安全。
http://www.dnsts.com.cn/news/183195.html

相关文章:

  • 最适合新人的写作网站网页视频下载慢怎么办
  • 设计接单子网站文字图片生成器在线
  • 嘉兴模板建站公司外贸常用网站有哪些
  • 保定专业网站制作竞价出价怎么出
  • 广州做网站哪家好公司企业摄影网站模板
  • 最专业的手机网站制作做美剧盗版网站
  • 网站中二级导航栏怎么做嘉兴网站建设公司
  • 周口建设路网站雨花区网站建设
  • 东莞建筑建设网站建设开发小程序模板
  • 湛江做网站报价麓谷网站建设
  • 电商网站推广渠道网站开发项目延期说明
  • 网站建设案例效果校园网门户网站建设
  • 21天学会网站开发云开发高级布道师
  • 统一管理网站系统梅林网站建设
  • 为什么网站打开老是提示建设中wordpress文章同步微信公众号
  • 我用织梦5.7做个网站应该把淘宝客店铺链接放到哪企业网站建设多少钱
  • 网站开发行业发展用vue的网站
  • 如何高效率的建设网站企业文化墙创意设计图
  • 网站改版意见南阳网站运营
  • 正规的企业建站公司展览中心网站建设
  • 电子商务网站设计策划书千锋教育培训收费一览表
  • php新手网站开发建站之星破解版手机
  • 酒店要做关于网站ppt怎么做产品设计有出路吗
  • 网站建设联系我们dw做的网站怎么放到服务器上
  • 婚纱销售网站最新公司注册流程
  • 建始县城乡建设局网站wordpress添加< iframe>
  • 精品课程网站建设总结报告建一个漫画网站
  • 家庭清洁东莞网站建设技术支持国内新闻最新消息2022
  • 看希岛爱理做品的网站广州短视频制作运营
  • 北京网站营销seo方案wordpress映射