网站建设需求调研方法,南宫企业做网站,镇江企业网站制作,上海网站建设|网站制作文章目录Spark OOM问题常见解决方式1.map过程产生大量对象导致内存溢出2.数据不平衡导致内存溢出3.coalesce调用导致内存溢出4.shuffle后内存溢出5. standalone模式下资源分配不均匀导致内存溢出6.在RDD中#xff0c;共用对象能够减少OOM的情况优化1.使用mapPartitions代替大部…
文章目录Spark OOM问题常见解决方式1.map过程产生大量对象导致内存溢出2.数据不平衡导致内存溢出3.coalesce调用导致内存溢出4.shuffle后内存溢出5. standalone模式下资源分配不均匀导致内存溢出6.在RDD中共用对象能够减少OOM的情况优化1.使用mapPartitions代替大部分map操作或者连续使用的map操作2.broadcast join和普通join3.先filter在join4.partitonBy优化5.combineByKey的使用6.内存不足时的优化7.在spark使用hbase的时候spark和hbase搭建在同一个集群参数优化部分8.spark.driver.memory (default 1g)9.spark.rdd.compress (default false)10.spark.serializer (default org.apache.spark.serializer.JavaSerializer )11.spark.memory.storageFraction (default 0.5)12.spark.locality.wait (default 3s)13.spark.speculation (default false)Spark OOM问题常见解决方式
1.map过程产生大量对象导致内存溢出
这种溢出的原因是在单个map中产生了大量的对象导致的。例如
rdd.map(xfor(i - 1 to 10000) yield i.toString)这个操作在rdd中每个对象都产生了10000个对象这肯定很容易产生内存溢出的问题。 针对这种问题在不增加内存的情况下可以通过减少每个Task的大小以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。 具体做法可以在会产生大量对象的map操作之前调用repartition方法分区成更小的块传入map。例如
rdd.repartition(10000).map(xfor(i - 1 to 10000) yield i.toString)。面对这种问题注意不能使用rdd.coalesce方法这个方法只能减少分区不能增加分区不会有shuffle的过程。
2.数据不平衡导致内存溢出
数据不平衡除了有可能导致内存溢出外也有可能导致性能的问题解决方法调用repartition重新分区。
3.coalesce调用导致内存溢出
理想情况下所以Spark计算后如果产生的文件太小我们会调用coalesce合并文件再存入hdfs中。例如在coalesce之前有100个文件这也意味着能够有100个Task现在调用coalesce(10)最后只产生10个文件。
但是事实上因为coalesce会降低父RDD的分区数这意味着coalesce并不是按照原本想的那样先执行100个Task再将Task的执行结果合并成10个而是从头到位只有10个Task在执行原本100个文件是分开执行的现在每个Task同时一次读取10个文件使用的内存是原来的10倍这导致了OOM。 源码参考 解决这个问题的方法是: 令程序按照我们想的先执行100个Task再将结果合并成10个文件这个问题同样可以通过repartition解决调用repartition(10)因为这就有一个shuffle的过程shuffle前后是两个Stage一个100个分区一个是10个分区就能按照我们的想法执行。
4.shuffle后内存溢出
shuffle内存溢出的情况可以说都是shuffle后单个文件过大导致的。 在Spark中joinreduceByKey这一类型的过程都会有shuffle的过程在shuffle的使用需要传入一个partitioner大部分Spark中的shuffle操作默认的partitioner都是HashPatitioner默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) 参数只对HashPartitioner有效.
所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出就需要从partitioner的代码增加partitions的数量。
5. standalone模式下资源分配不均匀导致内存溢出
在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 这两个参数但是没有配置–executor-cores这个参数的话就有可能导致每个Executor的memory是一样的但是cores的数量不同那么在cores数量多的Executor中由于能够同时执行多个Task就容易导致内存溢出的情况。 这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数确保Executor资源分配均匀。
6.在RDD中共用对象能够减少OOM的情况
下面这段代码会OOM因为每次(“key”,”value”)都产生一个Tuple对象
rdd.flatMap(xfor(i - 1 to 1000) yield (“key”,”value”))但是下面这段就不会出现OOM,”key””value”不管多少个都只有一个String对象指向常量池
rdd.flatMap(xfor(i - 1 to 1000) yield “key””value”)如果RDD中有大量的重复数据,或者Array中需要存大量重复数据的时候我们都可以将重复数据转化为String,能够有效的减少内存使用. 优化
1.使用mapPartitions代替大部分map操作或者连续使用的map操作
这里需要稍微讲一下RDD和DataFrame的区别。RDD强调的是不可变对象每个RDD都是不可变的当调用RDD的map类型操作的时候都是产生一个新的对象这就导致了一个问题如果对一个RDD调用大量的map类型操作的话每个map操作会产生一个到多个RDD对象这虽然不一定会导致内存溢出但是会产生大量的中间数据增加了gc操作。另外RDD在调用action操作的时候会出发Stage的划分但是在每个Stage内部可优化的部分是不会进行优化的例如rdd.map(1).map(1)这个操作在数值型RDD中是等价于rdd.map(_2)的但是RDD内部不会对这个过程进行优化。DataFrame则不同DataFrame由于有类型信息所以是可变的并且在可以使用sql的程序中都有除了解释器外都会有一个sql优化器DataFrame也不例外有一个优化器Catalyst具体介绍看后面参考的文章。
上面说到的这些RDD的弊端有一部分就可以使用mapPartitions进行优化mapPartitions可以同时替代rdd.map,rdd.filter,rdd.flatMap的作用所以在长操作中可以在mapPartitons中将RDD大量的操作写在一起避免产生大量的中间rdd对象另外是mapPartitions在一个partition中可以复用可变类型这也能够避免频繁的创建新对象。使用mapPartitions的弊端就是牺牲了代码的易读性。
2.broadcast join和普通join
在大数据分布式系统中大量数据的移动对性能的影响也是巨大的。基于这个思想在两个RDD进行join操作的时候如果其中一个RDD相对小很多可以将小的RDD进行collect操作然后设置为broadcast变量这样做之后另一个RDD就可以使用map操作进行join这样能够有效的减少相对大很多的那个RDD的数据移动。
3.先filter在join
这个就是谓词下推这个很显然filter之后再joinshuffle的数据量会减少这里提一点是spark-sql的优化器已经对这部分有优化了不需要用户显示的操作个人实现rdd的计算的时候需要注意这个。
4.partitonBy优化
这一部分在另一篇文章《spark partitioner使用技巧 》有详细介绍这里不说了。
5.combineByKey的使用
这个操作在Map-Reduce中也有这里举个例子rdd.groupByKey().mapValue(_.sum)比rdd.reduceByKey的效率低
combineByKey的过程减少了shuffle的数据量下面的没有。combineByKey是key-value型rdd自带的API可以直接使用。
6.内存不足时的优化
在内存不足的使用使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache(): rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的在内存不足的时候rdd.cache()的数据会丢失再次使用的时候会重算而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘避免重算只是消耗点IO时间。
7.在spark使用hbase的时候spark和hbase搭建在同一个集群
在spark结合hbase的使用中spark和hbase最好搭建在同一个集群上上或者spark的集群节点能够覆盖hbase的所有节点。hbase中的数据存储在HFile中通常单个HFile都会比较大另外Spark在读取Hbase的数据的时候不是按照一个HFile对应一个RDD的分区而是一个region对应一个RDD分区。所以在Spark读取Hbase的数据时通常单个RDD都会比较大如果不是搭建在同一个集群数据移动会耗费很多的时间。
参数优化部分
8.spark.driver.memory (default 1g)
这个参数用来设置Driver的内存。在Spark程序中SparkContextDAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行如果用户自己写的程序有过多的步骤切分出过多的Stage这部分信息消耗的是Driver的内存这个时候就需要调大Driver的内存。
9.spark.rdd.compress (default false)
这个参数在内存吃紧的时候又需要persist数据有良好的性能就可以设置这个参数为true这样在使用persist(StorageLevel.MEMORY_ONLY_SER)的时候就能够压缩内存中的rdd数据。减少内存消耗就是在使用的时候会占用CPU的解压时间。
10.spark.serializer (default org.apache.spark.serializer.JavaSerializer )
建议设置为 org.apache.spark.serializer.KryoSerializer因为KryoSerializer比JavaSerializer快但是有可能会有些Object会序列化失败这个时候就需要显示的对序列化失败的类进行KryoSerializer的注册这个时候要配置spark.kryo.registrator参数或者使用参照如下代码 valconfnewSparkConf().setMaster(…).setAppName(…) conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2])) valsc newSparkContext(conf)
11.spark.memory.storageFraction (default 0.5)
这个参数设置内存表示 Executor内存中 storage/(storageexecution)虽然spark-1.6.0的版本内存storage和execution的内存已经是可以互相借用的了但是借用和赎回也是需要消耗性能的所以如果明知道程序中storage是多是少就可以调节一下这个参数。
12.spark.locality.wait (default 3s)
spark中有4中本地化执行levelPROCESS_LOCAL-NODE_LOCAL-RACK_LOCAL-ANY,一个task执行完等待spark.locality.wait时间如果第一次等待PROCESS的Task到达如果没有等待任务的等级下调到NODE再等待spark.locality.wait时间依次类推直到ANY。分布式系统是否能够很好的执行本地文件对性能的影响也是很大的。如果RDD的每个分区数据比较多每个分区处理时间过长就应该把 spark.locality.wait 适当调大一点让Task能够有更多的时间等待本地数据。特别是在使用persist或者cache后这两个操作过后在本地机器调用内存中保存的数据效率会很高但是如果需要跨机器传输内存中的数据效率就会很低。
13.spark.speculation (default false)
一个大的集群中每个节点的性能会有差异spark.speculation这个参数表示空闲的资源节点会不会尝试执行还在运行并且运行时间过长的Task避免单个节点运行速度过慢导致整个任务卡在一个节点上。这个参数最好设置为true。与之相配合可以一起设置的参数有spark.speculation.×开头的参数。参考中有文章详细说明这个参数。
参考