当前位置: 首页 > news >正文

织梦贷款网站源码宁波制作网站的公司

织梦贷款网站源码,宁波制作网站的公司,注册一家设计公司流程,网站网址没有被百度收录背景#xff1a; kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分#xff0c;分成多个数据块的形式#xff0c;每个数据源算子任务会被分配以读取…背景 kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分分成多个数据块的形式每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块判断文件是否可以进行分块的代码如下 protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) ! null) {unsplittable true;return true;}return false; }private InflaterInputStreamFactory? getInflaterInputStreamFactory(Path path) {String fileExtension extractFileExtension(path.getName());if (fileExtension ! null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;} }后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分切分的具体代码如下所示 while (samplesTaken numSamples fileNum allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file allFiles.get(fileNum); // 根据偏移量进行切分FileInputSplit split new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes this.currLen this.delimiter.length;samplesTaken;}} finally {// close the file stream, do not release the bufferssuper.close();} // 偏移量迁移offset stepSize;// skip to the next file, if necessarywhile (fileNum allFiles.size() offset (file allFiles.get(fileNum)).getLen()) {offset - file.getLen();fileNum;} }再来看一下TextInputFormat如何支持checkpoint操作保存文件的偏移量的代码 Override public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState ! null, The operator state has not been properly initialized.);int subtaskIdx getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移ListT readerState getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception(Could not add timestamped file input splits to to operator state backend of operator getOperatorName() .,e);}if (LOG.isDebugEnabled()) {LOG.debug({} (taskIdx{}) checkpointed {} splits: {}.,getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);} } 从检查点中恢复状态的代码如下 public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState null, The reader state has already been initialized.);// 初始化算子操作状态checkpointedState context.getOperatorStateStore().getListState(new ListStateDescriptor(splits, new JavaSerializer()));int subtaskIdx getRuntimeContext().getIndexOfThisSubtask();LOG.info(Restoring state for the {} (taskIdx{})., getClass().getSimpleName(), subtaskIdx);splits splits null ? new PriorityQueue() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);} }
http://www.dnsts.com.cn/news/173761.html

相关文章:

  • 成品网站灬1688个人备案的网站可以做淘宝客吗
  • 宜阳建站php网站开发框架
  • 平原做网站国外最开放的浏览器有什么优势
  • 屏蔽阿里云网站常见的微网站平台有哪些方面
  • 设计软件网站wordpress 优势
  • 大型网站开发教程住房和城乡建设部网站倪虹
  • 广西备案工信部网站济南网络优化推广
  • 广州网站seo优化排名wordpress 数据交互
  • 企业网站制作费做分录重庆工程建设造价信息网站
  • 北京网站建设外包公司哪家好网站网站怎么做
  • 网站设计需要多少费用建筑工程招聘信息网
  • 建站公司网站建设企业建设网
  • 无锡免费做网站惠城东莞网站建设
  • 机关网站建设前期准备工作张家界直播视频
  • 衡阳网站滁州新橙科技网站建设
  • 档案网站建设愿景网络营销理论起源
  • 沧州网站建设网海申newedge wordpress
  • 太原建站网站模板wordpress 线报主题
  • 网站上官网标识怎么做做网站的实验报告
  • 一诺千金 网站建设郑州便民核酸采样屋正在搭建中
  • 湖南平台网站建设设计回合网页游戏排行榜前十名
  • 做两个一摸一样的网站网站建设 选猴王网络
  • 营销型网站的建设要求都有什么影响大连企业制作网站
  • 宁波专业做网站做不了大流量网站
  • 专业建站公司主要做什么旅游网站做模板素材
  • 视频网站用什么cms优秀网站建设哪家便宜
  • 网站建设如何找客户网页界面设计评分标准
  • 自己做网站现实么现代郑州网站建设
  • 使用wordpress的网站专做教育网站拿站
  • 网站建设与网站主机的选择广州网站建设鞍山