微信公众号链接网站怎么做,网站的方案,科技节小发明小制作,网站创建免费用户Spark中常见的两种数据倾斜现象如下
stage部分task执行特别慢 一般情况下是某个task处理的数据量远大于其他task处理的数据量#xff0c;当然也不排除是程序代码没有冗余#xff0c;异常数据导致程序运行异常。
作业重试多次某几个task总会失败
常见的退出码143、53、137…Spark中常见的两种数据倾斜现象如下
stage部分task执行特别慢 一般情况下是某个task处理的数据量远大于其他task处理的数据量当然也不排除是程序代码没有冗余异常数据导致程序运行异常。
作业重试多次某几个task总会失败
常见的退出码143、53、137、52以及heartbeat timed out异常通常可认为是executor内存被打满。
RDD调优方法
查看数据分布 Spark Core中shuffle算子出现数据倾斜时可在Spark作业中加入查看key分布的代码也可以将代码拆解出来使用spark-shell做测试
val rdd sc.parallelize(Array(hello, hello, hello, hi)).map((_,1))// 数据量较少
rdd.reduceByKey(_ _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(__)
.sortBy(_._2, false)
.take(20)调整shuffle并行度 原理Spark在做shuffle时默认使用HashPartitioner非Hash Shuffle对数据进行分区。如果并行度设置的不合适如比较小可能造成大量不相同的key对应的数据被分配到了同一个task上造成该task所处理的数据远大于其它task从而造成数据倾斜
调优建议
使用spark.default.parallelism调整分区数默认值200建议500或更大在shuffle的算子上直接设置分区数如a.join(b, 500)、rdd.reduceByKey(_ _, 500)
reduce join转map join 原理不使用join算子直接进行连接操作而使用broadcast变量与map类算子实现join操作进而完全规避掉shuffle类的操作彻底避免数据倾斜的出现
调优建议
broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x val rdd1DataMap rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) (x._1, (x._2, v))case None (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)分拆join在union 原理将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐随机前缀另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD相当于将其数据增到到原来的N倍N即为随机前缀的总个数然后将二者join之后去掉前缀然后将不包含倾斜key的剩余数据进行join最后将两次join的结果集通过union合并即可得到全部join结果。
调优建议
// 1.统计数量最大的key
val skewedKeySet rdd1.sample(false, 0.2).reduceByKey(_ _).sortBy(_._2, false).take(10).map(x x._1).toSet// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 rdd1.filter(x skewedKeySet.contains(x._1)).map { x val prefix scala.util.Random.nextInt(10).toString(s${prefix}_${x._1}, x._2)
}
val rdd1_2 rdd1.filter(x !skewedKeySet.contains(x._1))// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 rdd2.filter(x skewedKeySet.contains(x._1)).flatMap { x val list 0 until 10list.map(i (s${i}_${x._1}, x._2))}val rdd2_2 rdd2.filter(x !skewedKeySet.contains(x._1))// 4.倾斜key的rdd进行join
val skewedRDD rdd1_1.join(rdd2_1).map(x (x._1.split(_)(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)SQL调优方法
查看数据分布 统计某个查询结果或表中出现次数超过200次的key
WITH a AS (${query})
SELECT ks
FROM (SELECT ${key} AS kcount(*) AS sFROM aGROUP BY ${key}
)
WHERE s 200自动调整shuffle并行度 原理自适应执行开启的前提下AQE假设我们设置的shuffle partition个数为5在map stage结束之后我们知道每一个partition的大小分别是70MB30MB20MB10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB那么在运行时我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB)第二个reducer处理连续的partition 1 到3共60MB第三个reducer处理partition 4 (50MB)
Spark参数
参数说明推荐值spark.sql.adaptive.enabled开启自适应执行线上默认值truespark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数默认值executor*core数无spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量默认值spark.sql.shuffle.partitions无spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m无spark.sql.shuffle.partitionsjoin等操作分区数默认值200推荐500或更大
自动优化Join 原理自适应执行开启的前提下AQE我们可以获得SortMergeJoin两个子stage的数据量在满足条件的情况下即一张表小于broadcast阈值可以将SortMergeJoin转化成BroadcastHashJoin
参数说明推荐值spark.sql.adaptive.enabled开启自适应执行线上默认值truespark.sql.autoBroadcastJoinThreshold默认10M设置为-1可以禁用广播实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor10推荐此参数保持默认调整自适应的broadcast参数spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值设置为-1可以禁用广播默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确driver内存足够的前提下可以将此值调大如200M
自动处理数据倾斜 原理自适应执行开启的前提下AQE我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍并且大于某个预先配置的阈值我们就认为这是一个数据倾斜的partition需要进行特殊的处理
参数说明推荐值spark.sql.adaptive.enabled开启自适应执行线上默认值truespark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜默认值true无spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子某分区数据大小超过所有分区中位数与影响因子乘积才会被认为发生了数据倾斜无spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值无