网站个人备案步骤,网站侧边 跟随 样式,温州市门户网站,广州市区引入
前面我们已经深入了HDFS的设计与实现#xff0c;对于分布式系统也有了不错的理解。
但HDFS仅仅解决了海量数据存储和读写的问题。要想让数据产生价值#xff0c;一定是需要从数据中挖掘出价值才行#xff0c;这就需要我们拥有海量数据的计算处理能力。
下面我们还是…引入
前面我们已经深入了HDFS的设计与实现对于分布式系统也有了不错的理解。
但HDFS仅仅解决了海量数据存储和读写的问题。要想让数据产生价值一定是需要从数据中挖掘出价值才行这就需要我们拥有海量数据的计算处理能力。
下面我们还是老样子来数据一下要实现海量计算处理能力有些什么核心痛点
大数据计算核心痛点
量级大
在稍微大一点的互联网企业需要计算处理的数据量都开始以PB计了。而传统的计算处理模型中一个程序所能调度的网络带宽通常在数百MB、内存容量通常就几十GB 、磁盘大小通常也就数TB根本解决不了这么大量级的数据计算需求。什么我打宿傩
易用差
虽然在04年已经有了分布式计算但是那个时候的分布式计算都是专用的系统只能专门处理某一类计算比如进行大规模数据的排序。这样的系统没办法复用到其他的大数据计算场景每一种应用都需要开发与维护专门的系统。很难让让没有分布式系统知识和经验的人可以快速简便地去利用分布式计算处理海量数据。
门槛高
而且因为分布式系统中遇到故障和失败是一个很常见的问题传统的分布式程序设计如MPI非常复杂用户需要关注的细节非常多比如数据分片、数据传输、节点间通信等因而设计分布式程序的门槛非常高。
容错差
在分布式环境下随着集群规模的增加集群中的故障率会显著增加进而导致任务失败和数据丢失的可能性增加。 这里的“故障”主要指磁盘损坏、机器宕机、节点间通信失败等硬件故障和坏数据以及用户程序bug产生的软件故障等。 Hadoop MapReduce设计
针对这些痛点MapReduce的核心设计目标在保障扩展性和容错性的前提下提升海量数据计算处理的易用性
而它的实现的核心思路也很简答就是通过开发统一通用的编程模型并构建一个抽象和高层的编程接口和框架屏蔽分布式领域的复杂问题让开发者能够专注于分析程序的业务逻辑。 模型本质就是对现实世界中某种事物或现象的一种概括、抽象的表示。 比如函数是输入和输出之间关系的抽象数学公式是对物理与数学规律的抽象软件架构图是软件工程师对软件系统的抽象。 通过前面深入HDFS的篇章我们知道从本质上来看HDFS就是通过抽象、封装的思想把成百上千台服务器、成千上万块硬盘的硬件做了一个封装屏蔽了底层复杂实现让使用者可以把它当成一块硬盘来使用这极大的降低了它的使用门槛。 其实无论是什么领域学会去抽象总结才能把握事物的内在规律而不是被纷繁复杂的事物表象所迷惑才能更进一步深刻地认识这个世界 为了让大家先对MapReduce这个通用模型有个初步的概念我举个通俗的例子——假设我们有一大堆杂乱无序的相同品牌的扑克牌要快速把它们梳理出有多少副完整的扑克牌就可以分几个组一组人分别梳理一堆牌把相同的牌放到一个位置然后下一组人基于这些牌去计数不同的牌有多少张最后汇总起来就能知道可以组成多少副完整的扑克牌了。
下面我们看看Hadoop MapReduce的设计与落地的总体思路。
参考思想Unix设计哲学Unix下的Bash和管道
MapReduce的设计哲学和Unix是一样的叫做 “Do one thing, and do it well”也就是每个模块只做一件事情但是把这件事情彻底做好。
而MapReduce的计算流程设计思想也是参考了Unix系统中利用一个个命令通过管道把数据处理流程串接起来处理的模式。
参考设计Lisp类函数式编程
其实MapReduce编程模型并不是Hadoop原创甚至也不是Google原创而是借鉴了Lisppython、scala这类函数式编程语言的思想。
熟悉Java Stream API的都对这种编程模式并不陌生它实际上就是map、groupingBy、reduce之类的操作这种编程模型分离了程序的业务逻辑和控制逻辑使得程序在大规模的分布式环境下运行成为了可能。
另外尽管MapReduce编程模型非常简单现实中的大多数任务却都可以用这种编程模型来表达这在函数式编程语言中已经得到了证明它为MapReduce后来广泛地流行奠定了基础。
参考论文MapReduce: Simplified data processing on large cluster
Google在2004年发布的这篇论文也是大数据的三驾马车之一
该论文主要包含下面内容 MapReduce的计算模型和应用场景 MapReduce实际是如何实现的使得开发者无需关心分布式的存在 如何逐步迭代优化MapReduce的性能。
这里就不单独开一篇文章介绍了下面我们把论文里面的核心内容梳理一下。
需求场景
第一种是对所有的数据都只需要单条数据就能完成处理。
比如有很多网页的内容要从里面提取出来每一个网页的标题。这样的计算可以完全并行化。
第二种是需要汇总多条数据才能完成计算。
比如要统计日志里面某个URL被访问了多少次只需要简单累加就可以了。
比如统计某个URL下面的唯一用户数就需要将所有相同URL的数据搬运到同一个计算节点上进行处理。不过在搬运之后不同的URL还是可以放到不同的节点进行处理的。
第三种自然是一、二两种情况的组合了。
比如先从网页数据里面提取出网页的URL和标题然后根据标题里面的关键字统计特定关键字出现在多少个不同的URL里面这就需要同时采用一二这两种情况的操作。
当然还有更复杂的数据操作但是这些动作也都可以抽象成前面的两个动作的组合。因为无非要处理的数据要么是完全独立的要么需要多条数据之间的依赖。
计算模型
前面需求场景的
第一种动作就是 MapReduce 里面的 Map
Map 函数顾名思义就是一个映射函数它会接受一个 key-value 对然后把这个 key-value 对转换成0到多个新的 key-value 对并输出出去。
第二种动作就是 MapReduce 里面的 Reduce
Reduce 函数则是一个化简函数它接受一个 Key以及这个 Key 下的一组 Value然后化简成一组新的值 Value 输出出去。
Map 函数的输出结果会被整个 MapReduce 程序接手进行 shuffle 操作。也就是数据搬运的过程。
shuffle 会把 Map 函数输出的所有相同的 Key 的 Value 整合到一个列表中给到 Reduce 函数。并且给到 Reduce 函数的 Key在每个 Reduce 里都是按照 Key 排好序的。
它们就构建了 MapReduce 的计算模型 注意 shuffle 过程的排序操作并不是 MapReduce 框架本身的核心需求而是为了技术上实现方便。因为我们要把相同 Key 的数据放到一起处理而通过一个 HashMap 把所有的数据放在内存里又不一定放得下。那么利用硬盘进行外部排序是一个最简单的没有内存大小依赖的对数据根据 Key 进行分组的解决办法。 MapReduce 计算模型的设计其实就是典型的模版方法模式Template Method Pattern。 与其说它是一个分布式数据处理系统不如说是分布式数据处理框架。 因为 MapReduce 框架已经设定好了整个数据处理的流程用户只需要实现 Map 和 Reduce 这两个接口函数就能完成海量的数据处理。 应用场景
论文里列了以下六个应用场景 分布式 grep ; 统计 URL 的访问频次; 反转网页-链接图; 分域名的词向量; 生成倒排索引; 分布式排序。
实现挑战
要想让写 Map 和 Reduce 函数的人不需要关心“分布式”的存在那么 MapReduce 框架本身就需要解决好三个很重要的问题 第一个自然是如何做好各个服务器节点之间的“协同”以及解决出现各种软硬件问题后的“容错”这两部分的设计。 第二个性能问题。MapReduce 框架非常容易遇到网络性能瓶颈。尽量充分利用 MapReduce 集群的计算能力并让整个集群的性能可以随硬件的增加接近于线性增长可以说是非常大的一个挑战。 最后一个易用性问题。Map 函数和 Reduce 函数最终还是运行在多个不同的机器上的并且在 Map 和 Reduce 函数中还会遇到各种千奇百怪的数据。当我们的程序在遭遇到奇怪的数据出错的时候我们需要有办法来进行 debug。
MapReduce 的协同
MapReduce的集群通常就是分布式存储系统GFS的集群。
在这个集群里本身会有一个调度系统Scheduler。
当我们要运行一个MapReduce任务的时候其实就是把整个MapReduce的任务提交给这个调度系统让这个调度系统来分配和安排 Map 函数和 Reduce 函数以及后面会提到的 master 在不同的硬件上运行。
在MapReduce任务提交了之后整个MapReduce任务就会按照这样的顺序来执行 第一步由于写好的MapReduce程序已经指定了输入路径。所以MapReduce会先找到GFS 上的对应路径然后把对应路径下的所有数据进行分片Split。每个分片的大小通常是 64MB这个尺寸也是GFS里面一个块Block的大小。接着MapReduce 会在整个集群上启动很多个MapReduce程序的复刻fork进程。 第二步在这些进程中有一个和其他不同的特殊进程就是一个master进程剩下的都是worker进程。然后会有M个map的任务以及R个 reduce 的任务分配给这些worker进程去进行处理。这里的master进程是负责找到空闲的idleworker进程然后再把map任务或者reduce任务分配给worker进程去处理。 这里需要注意一点并不是每一个map和reduce任务都会单独建立一个新的worker 进程来执行。而是master进程会把map和reduce任务分配给有限的worker因为一个worker通常可以顺序地执行多个map 和reduce 的任务。 第三步被分配到map任务的worker会读取某一个分片分片里的数据会变成一个个key-value对喂给map任务然后等Map函数计算完后会生成的新的key-value对缓冲在内存里。 第四步这些缓冲了的key-value对会定期地写到map任务所在机器的本地硬盘上。 并且按照一个分区函数partitioning function把输出的数据分成R个不同的区域。 而这些本地文件的位置会被worker传回给到master节点再由master节点将这些地址转发给reduce任务所在的worker 那里。 第五步运行reduce任务的worker在收到master的通知之后会通过RPC(远程过程调用)来从map任务所在机器的本地磁盘上抓取数据。当reduce任务的worker 获取到所有的中间文件之后它就会将中间文件根据Key进行排序。这样所有相同Key的Value 的数据会被放到一起也就是完成了混洗(Shuffle)的过程。 第六步reduce会对排序后的数据执行实际的Reduce函数并把reduce的结果输出到当前这个reduce分片的最终输出文件里。 第七步当所有的map任务和reduce任务执行完成之后master会唤醒启动MapReduce任务的用户程序然后回到用户程序里往下执行MapReduce任务提交之后的代码逻辑。
整个MapReduce的执行过程也是一个典型的 Master-Slave 的分布式系统。map和 reduce所在的worker之间并不会直接通信它们都只和master通信。另外像是map 的输出数据在哪里这样的信息也是告诉master让master转达给reduce 所在的 worker。reduce从map里获取数据也是直接拿到数据所在的地址去抓取而不是让reduce通过RPC调用map所在的worker去获取数据。 MapReduce 的容错Fault Tolerance
MapReduce的容错机制非常简单可以简单地用两个关键词来描述就是重新运行和写Checkpoints。
worker 节点的失效Master Failure
对于Worker 节点的失效MapReduce框架解决问题的方式非常简单。就是换一台服务器重新运行这个Worker节点被分配到的所有任务。master节点会定时地去ping每一个worker 节点一旦worker节点没有响应就会认为这个节点失效了。于是master会重新在另一台服务器上启动一个worker进程并且在新的worker进程所在的节点上重新运行所有失效节点上被分配到的任务。而无论失效节点上之前的map和 reduce任务是否执行成功这些任务都会重新运行。因为在节点ping不通的情况下很难保障它的本地硬盘还能正常访问。
master 节点的失效Worker Failure
对于 master节点的失效直接就任由master节点失败了也就是整个MapReduce任务失败了。而对于开发者来说解决这个问题的办法也很简单就是再次提交一下任务去重试。
因为master进程在整个任务中只有一个它会失效的可能性很小。而MapReduce的任务也是一个用户离线数据处理的任务并不是一个实时在线的服务失败重来通常也没有什么影响只是晚一点拿到数据结果罢了。
虽然在论文发表的时候谷歌并没有实现对于master的失效自动恢复机制但他们也给出了一个很简单的解决方案那就是让master定时把它里面存放的信息作为一个个的Checkpoint写入到硬盘中去。
针对这个其实可以把这个Checkpoint直接写到GFS里然后让调度系统监控master。这样一旦master失效就可以启动一个新的master来读取Checkpoints 数据然后就可以恢复任务并继续执行了而不需要重新运行整个任务。
对错误数据视而不见
worker 和 master 的节点失效以及对应的恢复机制通常都是来自于硬件问题。但是在海量数据处理的情况下比如在TB乃至PB级别的数据下还会经常遇到“脏数据”的问题。
这些数据可能是日志采集的时候就出错了也可能是一个非常罕见的边界情况edge-case我们的Map和Reduce 函数正好处理不了。甚至有可能只是简单的硬盘硬件的问题带来的错误数据。
那么对于这些异常数据我们固然可以不断debug一一修正。但是这么做大多数时候都是划不来的因为很可能为了一条数据记录由于Map函数处理不了你就要重新扫描几TB的数据。
所以MapReduce不仅为节点故障提供了容错机制对于这些极少数的数据异常带来的问题也提供了一个容错机制。MapReduce会记录Map或者Reduce函数运行出错的具体数据的行号如果同样行号的数据执行重试还是出错它就会跳过这一行的数据。如果这样的数据行数在总体数据中的比例很小那么整个MapReduce程序会忽视这些错误仍然执行完成。毕竟一个URL被访问了1万次还是9999次对于搜素引擎的排序结果不会有什么影响。
MapReduce 的性能优化
MapReduce集群里的硬件配置方面的最大瓶颈自然和 GFS 也一样——网络带宽。
把程序搬到数据那儿去
既然网络带宽是瓶颈那么优化的办法自然就是尽可能减少需要通过网络传输的数据。在MapReduce这个框架下就是在分配map任务的时候根据需要读取的数据在哪里进行分配。由于GFS是知道每一个Block 的数据是在哪台服务器上的。而MapReduce会找到同样服务器上的worker来分配对应的map 任务。如果那台服务器上没有那么它就会找离这台服务器最近的、有worker 的服务器来分配对应的任务。
除此之外由于MapReduce程序的代码往往很小可能只有几百KB或者几MB但是每个map需要读取的一个分片的数据是64MB大小。这样我们通过把要执行的MapReduce程序复制到数据所在的服务器上就不用多花那10倍乃至100倍的网络传输量了。 通过Combiner减少网络数据传输
除了Map函数需要读取输入的分片数据之外Reduce所在的worker去抓取中间数据一样也需要通过网络。那么要在这里减少网络传输最简单的办法就是尽可能让中间数据的数据量小一些。
在MapReduce的框架里MapReduce允许开发者自己定义一个Combiner 函数。这个Combiner函数会对在同一个服务器上所有map 输出的结果运行一次然后进行数据合并。实际上不仅是同一个Map函数的输出可以合并同一台服务器上多个Map的输出我们都可以合并。反正它们都在一台机器上合并只需要本地的硬盘读写和CPU并不需要我们最紧缺的网络资源。以域名的访问次数为例它的数据分布一定有很强的头部效应少量20%的域名可能占了80%的访问记录。这样一合并我们要传输的数据至少可以减少60%。如果考虑一台 16 核的服务器有16个map的worker运行应该还能再减少80%以上。这样通过一个中间的Combiner我们要传输的数据一下子就下降了两个数量级大大缓解了网络传输的压力。 注意不是所有场景都能预聚合处理的比如求中位数。 MapReduce 的 debug 信息
虽然我们一直说我们希望MapReduce让开发者意识不到分布式的存在。但是归根到底map和reduce的任务都是在分布式集群上运行的这个就给我们对程序debug 带来了很大的挑战。无论是通过debugger做单步调试还是打印出日志来看程序执行的情况都不太可行。
所以MapReduce也为开发者贴心地提供了三个办法来解决这个问题 第一个是提供一个单机运行的MapReduce的库这个库在接收到MapReduce任务之后会在本地执行完成map和reduce的任务。这样你就可以通过拿一点小数据在本地调试你的MapReduce任务了无论是debugger还是打日志都行得通。 第二个是在master 里面内嵌了一个HTTP服务器然后把master的各种状态展示出来给开发者看到。这样一来你就可以看到有多少个任务执行完了有多少任务还在执行过程中它处理了多少输入数据有多少中间数据有多少输出的结果数据以及任务完成的百分比等等。同样的里面还有每一个任务的日志信息。 另外通过这个HTTP 服务器你还可以看到具体是哪一个worker里的任务失败了对应的错误日志是什么。这样你就可以快速在线上定位你的程序出了什么错是在哪台服务器上。 最后一个是MapReduce框架里提供了一个计数器(counter)的机制。作为开发者你可以自己定义几个计数器然后在Map 和Reduce的函数里去调用这个计数器进行自增。所有 map 和reduce的计数器都会汇总到master节点上通过上面的HTTP服务器里展现出来。 比如你就可以利用这个计数器去统计有多少输入日志的格式和预期的不一样。如果比例太高那么多半你的程序就有Bug没有兼容所有合法的日志。
遗憾与缺陷
尽管MapReduce框架已经作出了很多努力但是今天来看整个计算框架的缺陷还是不少的。
主要的缺陷有两个 第一个是还没有100%做到让用户意识不到“分布式”的存在无论是Combiner 还是Partitioner都是让开发者意识到它面对的还是分布式的数据和分布式的程序。 第二个是性能仍然不太理想这体现在两个方面 一个是每个任务都有比较大的overhead都需要预先把程序复制到各个 worker 节点然后启动进程; 另一个是所有的中间数据都要读写多次硬盘。map 的输出结果要写到硬盘上reduce抓取数据排序合并之后也要先写到本地硬盘上再进行读取所以快不起来。
Hadoop MapReduce核心设计
Hadoop MapReduce 参考了上面的相关内容其设计和落地在企业落地后也是有不断优化迭代的。
和MapReduce的论文不太一样。在Hadoop1.0实现里每一个MapReduce的任务并没有一个独立的master进程而是直接让调度系统承担了所有的worker 的master 的角色这就是Hadoop1.0里的 JobTracker。在Hadoop1.0里MapReduce论文里面的worker就是TaskTracker用来执行map 和 reduce的任务。而分配任务以及和TaskTracker沟通任务的执行情况都由单一的JobTracker 来负责。
这个设计也导致了只要服务器数量一多JobTracker的负载就会很重。所以早年间单个Hadoop 集群能够承载的服务器上限被卡在了4000台。而且JobTracker也成为了整个Hadoop 系统很脆弱的“单点”。
在Hadoop 2.0Hadoop社区把JobTracker的角色拆分成了进行任务调度的Resource Mananger以及监控单个MapReduce任务执行的Application Master回到了和MapReduce论文相同的架构。
MRv1
第一代MapReduce计算框架由两部分组成编程模型programming model和运行时环境runtime environment。
基本编程模型是将问题抽象成Map和Reduce两个阶段。 Map阶段将输入数据解析成 key/value迭代调用 map() 函数处理后再以 key/value 的形式输出到本地目录 Reduce 阶段则将 key 相同的 value 进行 reduce 处理并将最终结果写到 HDFS 上。
运行时环境由两类服务组成JobTracker 和 TaskTracker。 JobTracker 负责资源管理和所有作业的控制 TaskTracker 负责接收来自JobTracker的命令并执行它
YARN/MRv2
针对MRv1中的MapReduce在扩展性和多框架支持方面的不足提出了全新的资源管理框架YARNYet Another Resource Negotiator。
将JobTracker中的资源管理和作业控制功能分开分别由两个不同进程ResourceManager和ApplicationMaster实现。 ResourceManager负责所有应用程序的资源分配 ApplicationMaster仅负责管理一个应用程序
总结
今天我们梳理了MapReduce的设计与实现的思路后面我们深入源码去看看MapReduce有哪些有意思的东西。