当前位置: 首页 > news >正文

网站流量指数沈阳男科医院收费标准

网站流量指数,沈阳男科医院收费标准,非凡免费建网站平台,宁波市住房和城乡建设培训中心网站文章目录 1. 状态初始化总流程梳理2.创建StreamOperatorStateContext3. StateInitializationContext的接口设计。4. 状态初始化举例#xff1a;UDF状态初始化 在TaskManager中启动Task线程后#xff0c;会调用StreamTask.invoke()方法触发当前Task中算子的执行#xff0c;在… 文章目录 1. 状态初始化总流程梳理2.创建StreamOperatorStateContext3. StateInitializationContext的接口设计。4. 状态初始化举例UDF状态初始化 在TaskManager中启动Task线程后会调用StreamTask.invoke()方法触发当前Task中算子的执行在invoke()方法中会调用restoreInternal()方法这中间包括创建和初始化算子中的状态数据。 另外在invoke中可以通过判断任务状态来判断是否需要初始化状态。 // Allow invoking method invoke without having to call restore before it.if (!isRunning) {LOG.debug(Restoring during invoke will be called.);restoreInternal();}StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。 RegularOperatorChain. public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { Iterator var2 this.getAllOperators(true).iterator(); while(var2.hasNext()) { StreamOperatorWrapper?, ? operatorWrapper (StreamOperatorWrapper)var2.next(); StreamOperator? operator operatorWrapper.getStreamOperator(); operator.initializeState(streamTaskStateInitializer); operator.open(); } }找到了算子状态初始化的位置我们继续了解状态是如何初始化的。 1. 状态初始化总流程梳理 AbstractStreamOperator.initializeState中描述了状态初始化的总体流程如下代码以及注释 # AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { //1. 获取类型序列化器final TypeSerializer? keySerializer config.getStateKeySerializer(getUserCodeClassloader()); //2. get containingTaskfinal StreamTask?, ? containingTask Preconditions.checkNotNull(getContainingTask()); final CloseableRegistry streamTaskCloseableRegistry Preconditions.checkNotNull(containingTask.getCancelables()); //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), getProcessingTimeService(), this, keySerializer, streamTaskCloseableRegistry, metrics, config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), isUsingCustomRawKeyedState()); //4. create stateHandlerstateHandler new StreamOperatorStateHandler( context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager context.internalTimerServiceManager(); //5. initialize OperatorStatestateHandler.initializeOperatorState(this); //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null)); }在StreamOperator初始化状态数据的过程中首先从StreamTask中获取创建状态需要的组件例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。 状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用例如在AbstractUdfStreamOperator中就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。 2.创建StreamOperatorStateContext 接下来看如何在Task实例初始化时创建这些组件并将其存储在StreamOperatorStateContext中供算子使用如下代码 StreamTaskStateInitializerImpl Override public StreamOperatorStateContext streamOperatorStateContext( Nonnull OperatorID operatorID, Nonnull String operatorClassName, Nonnull ProcessingTimeService processingTimeService, Nonnull KeyContext keyContext, Nullable TypeSerializer? keySerializer, Nonnull CloseableRegistry streamTaskCloseableRegistry, Nonnull MetricGroup metricGroup, double managedMemoryFraction, boolean isUsingCustomRawKeyedState) throws Exception { //1. 获取task实例信息TaskInfo taskInfo environment.getTaskInfo(); OperatorSubtaskDescriptionText operatorSubtaskDescription new OperatorSubtaskDescriptionText( operatorID, operatorClassName, taskInfo.getIndexOfThisSubtask(), taskInfo.getNumberOfParallelSubtasks()); final String operatorIdentifierText operatorSubtaskDescription.toString(); final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates taskStateManager.prioritizedOperatorState(operatorID); CheckpointableKeyedStateBackend? keyedStatedBackend null; OperatorStateBackend operatorStateBackend null; CloseableIterableKeyGroupStatePartitionStreamProvider rawKeyedStateInputs null; CloseableIterableStatePartitionStreamProvider rawOperatorStateInputs null; InternalTimeServiceManager? timeServiceManager; try { // 创建keyed类型的状态后端// -------------- Keyed State Backend -------------- keyedStatedBackend keyedStatedBackend( keySerializer, operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry, metricGroup, managedMemoryFraction); //创建operator类型的状态后端// -------------- Operator State Backend -------------- operatorStateBackend operatorStateBackend( operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry); //创建原生类型状态后端// -------------- Raw State Streams -------------- rawKeyedStateInputs rawKeyedStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawKeyedState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); rawOperatorStateInputs rawOperatorStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawOperatorState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager -------------- if (keyedStatedBackend ! null) { // if the operator indicates that it is using custom raw keyed state, // then whatever was written in the raw keyed state snapshot was NOT written // by the internal timer services (because there is only ever one user of raw keyed // state); // in this case, timers should not attempt to restore timers from the raw keyed // state. final IterableKeyGroupStatePartitionStreamProvider restoredRawKeyedStateTimers (prioritizedOperatorSubtaskStates.isRestored() !isUsingCustomRawKeyedState) ? rawKeyedStateInputs : Collections.emptyList(); timeServiceManager timeServiceManagerProvider.create( keyedStatedBackend, environment.getUserCodeClassLoader().asClassLoader(), keyContext, processingTimeService, restoredRawKeyedStateTimers); } else { timeServiceManager null; } // -------------- Preparing return value -------------- return new StreamOperatorStateContextImpl( prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, keyedStatedBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); } catch (Exception ex) { 。。。。 }流程梳理 从environment中获取TaskInfo并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等最终用于创建OperatorStateBackend的状态存储后端。创建KeyedStateBackendKeyedStateBackend是KeyedState的状态管理后端提供创建和管理KeyedState的方法。创建OperatorStateBackendOperatorStateBackend是OperatorState的状态管理后端提供获取和管理OperatorState的接口。创建KeyGroupStatePartitionStreamProvider实例提供创建和获取原生KeyedState的方法。创建StatePartitionStreamProvider实例提供创建和获取原生OperatorState的方法。将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例全部封装在StreamOperatorStateContextImpl上下文对象中并返回给AbstractStreamOperator使用。 小结 StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态最终状态数据会存储在状态管理后端指定的物理介质上例如堆内存或RocksDB。 StateInitializationContext会被用于算子和UserDefinedFunction中实现算子或函数中的状态数据操作。 3. StateInitializationContext的接口设计。 StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。 ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。 FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致这主要是为了和算子使用的上下文进行区分但两者的操作基本一致。 StateInitializationContext提供了对托管状态数据的管理并在内部继承和拓展了获取及管理原生状态数据的方法如getRawOperatorStateInputs()、getRawKeyedStateInputs()等 StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端并基于状态管理操作状态数据。 4. 状态初始化举例UDF状态初始化 在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。 AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。 public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction); }恢复函数内部的状态数据涉及Checkpoint的实现我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。 《Flink设计与实现核心原理与源码解析》张利兵
http://www.dnsts.com.cn/news/19443.html

相关文章:

  • 网站人多怎么优化网站建设最花时间的是
  • 未成年人思想道德建设网站黄冈做网站价格
  • 菏泽网站建设制作厦门建设官网
  • 德国网站域名后缀做刷单网站犯法吗
  • 如何拉下对手网站十种人不适合做管理者
  • 我的世界封面制作网站邢台网站建设哪家好
  • 南宁seo网站排名优化网站网页设计专业公司
  • 网站从哪几个方面维护山西软件开发公司排行
  • 德语网站制作网站建设与网页设计期末考试
  • 网站设计的思想编程做网站容易还是做软件
  • 新乡企业建网站免费建立个人网站官网
  • 58同城一样的网站怎样建设个人养老金制度9月底前亮相
  • 制作网站步骤兴宁房产网
  • 做网站要坚持qq空间网站根目录
  • 建设内部网站目的让网站快速收录
  • jsp网站服务建设是什么国外美容院网站
  • 做网站主播要什么条件广州网站制作一般多少钱
  • 湛江怎么做网站关键词优化做网站怎么兼职
  • 广州网站设计首选刻腾讯有做淘宝客网站吗
  • 做搜索引擎的网站北京公司网站
  • 烟台哪个公司做网站好wordpress防止cc
  • dw网站log怎么做微信做单网站
  • 德州哪家网站优化好南京做网站找哪家好
  • 网站开发主流语言多多鱼网页模板
  • 建设淘宝网站需要多少钱做100个网站挂广告联盟
  • 桂林网站制作找志合网络公司做旅游网站教程
  • 海北公司网站建设哪家快沂源网站开发
  • 滨州哪里做网站郑州上市企业网站建设
  • 建一个公司网站多少钱?优化大师网页版
  • 建站属于什么行业盐城做网站哪家公司好