网站服务器续费,便宜的域名购买,小公司网站,多个网站做计划以下参数中有sql字眼的一般只有spark-sql模块生效#xff0c;如果你看过spark的源码#xff0c;你会发现sql模块是在core模块上硬生生干了一层#xff0c;所以反过来spark-sql可以复用core模块的配置#xff0c;例外的时候会另行说明#xff0c;此外由于总结这些参数是在不…以下参数中有sql字眼的一般只有spark-sql模块生效如果你看过spark的源码你会发现sql模块是在core模块上硬生生干了一层所以反过来spark-sql可以复用core模块的配置例外的时候会另行说明此外由于总结这些参数是在不同时间段当时使用的spark版本也不一样因此要注意是否有效验证的方法也很简单配置之后如果当前版本有效你就可以在任务ui的Environment参数列表中查到如果本博主已经踩了坑的会直接说明。看完之后如果有core模块优化参数不多的感觉无需自扰因为core模块的开发本身就是80%依赖代码级优化实现的比如rdd集的分区拆分、加盐、转换等等都是在代码级别完成的而不是任务提交参数。
1、任务使用资源限制基本参数注意的是这些资源配置有spark前缀是因为他们是标准的conf配置也就是submit脚本你调用--conf参数写的和--driver.memory这种属于不同的优先级--driver.memory这种优先级比它高对于spark来讲数据量和计算量是两个不同的概念计算任务本身不止有单一的MR架构那样一个map一个reduce的直白执行逻辑还有很多复杂的任务task所以随着执行计划的不同往往计算量要大于数据量而且这个差距是成正比的放大要使用的计算资源也更多除了计算任务本身还有伴随计算产生的附加消耗因此往往1G的数据要付出3G的计算资源甚至更多在具体计算的时候使用多少资源就需要经验了不过初学者可以参考一个公式min计算数据大小/容器内存20%左右的预留容器个数任务开始执行后配合其他参数可支持并行task最多时的容器个数、driver的内存(spark.driver.memory)永远大于driver可收集数据集大小(spark.driver.maxResultSize) 、单容器的内存和核数的比例是10:1、单容器的大小不应太大一般在6C/60G左右就最多了
在容器资源的估算上除了上面提到的用数据量来估算也有的地方使用总核数来估算也就是用总量可用有多少再除以其他的相关数据值这种情况是因为拥有的集群资源不多没有办法支撑任务完全自主扩张虽然两种估算方法最后的目的是一样的但是后者对于大任务所需资源来讲肯定会影响任务的运行具体使用的时候看情况而定即可
在具体配置的时候容器个数最好交给动态延展去处理这样不会造成在启动容器量和在计算数据量上的不协调除非你容器设置的本身不够。当你的任务特别大大到超过了容器读写性能的瓶颈再考虑用num的方式直接指定定额的容器个数因为随着不同集群性能的影响过大的任务在容器动态延展上会很吃力任务会不稳定。
至于一个集群的性能读写瓶颈如果你能拿到当前集群的冒烟测试结果那是最好的但是越大的集群冒烟测试越不好做所以除非是私有云的小项目否则一般很难拿到此时最直观的观察点就是这个集群的shuffer额度在保证任务跑通的情况下一个集群能容纳的shuffer量越高他的读写瓶颈就越高本作者最高操作过读写瓶颈在10Tshuffer下保证任务运行10t以上就不太好说了
spark.driver.memory20G #applicationmaster启动的driver进程占用内存
spark.driver.cores4 #applicationmaster启动的driver进程占用核数
spark.executor.cores4 #容器占用核数
spark.executor.memory40G #容器占用的内存数
spark.num.executors10 #任务用到的总容器数2、限制sql任务运行时拉取的分区数和拉去文件总大小的上线
spark.sql.watchdog.maxPartitions2000
spark.sql.watchdog.maxFileSize3t3、文件聚合文件聚合的阈值会参照分区大小决定就是说去设置AQE
spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabledtrue #是否对不发生shuffer的stage做聚合
spark.sql.optimizer.insertRepartitionBeforeWrite.enabledtrue #是否在写入文件之间聚合
spark.sql.optimizer.finalStageConfigIsolation.enabledtrue #最后任务的最后阶段文件聚合会有一个落盘前聚合的执行计划但是注意上面这三个文件聚合不是Apache原生版本的参数是kyuubi的我写在这里是要告诉大家开源社区对离线文件聚合这方面只做了AQE的其他可观测到的参数都是某个单独类库用的比如spark.files.maxPartitionBytes这个配置其他文献会告诉你它可以修改离线处理的分区大小但是它只在BinaryFileRDD 这个RDD才用到这个参数下面是spark源码 因此对于Apache原生版本来讲处理小文件聚合的最可靠办法就是用AQE在这一点上由于被白嫖太多所以Apache原生团队就没继续做但是国内大厂有自己的开发基于已有的聚合代码做出了自己的东西比如阿里基础架构计算引擎3.2.1升级点概要 中提到的支持非动态分区支持合并小文件 诸如此类的参数各家引擎提供商都不一样因此如果你用的不是开源就问一下提供方是否有相关参数同时这也更加突出了开发的重要尤其是在这个AI满天飞的时代很多公共的姿势资源差距被拉平了所以大家努力共勉提升自己的技术水平还是很有必要的
4、任务最后阶段消耗资源多少通常是配合压缩和自适应分区的相关配置来做任务优化这两个参数同上也是kyuubi的放在这里是想告诉大家不同版本下的spark架构有着不同的特点
spark.sql.finalWriteStage.executorMemory10g
spark.sql.finalWriteStage.executorCores25、文件压缩
mapreduce.output.fileoutputformat.compresstrue #是否对任务的输出数据压缩
mapreduce.output.fileoutputformat.compress.codecorg.apache.hadoop.io.compress.GzipCodec #用到的压缩类
mapreduce.map.output.compresstrue #是否对map阶段的输出进行压缩
mapreduce.map.output.compress.codecorg.apache.hadoop.io.compress.GzipCodec #同上
hive.exec.compress.outputtrue #hive端数据压缩⭐️⭐️3.x的spark之后共用上面map的压缩策略但是2.x的版本有一个mapred.map.output.compress.codec使用的时候注意版本
spark.sql.parquet.compression.codecsnappy #如果表数据存储类型是parquet那就要另行制定压缩方式默认是snappy可以选择gzip、lzo、或者uncompressed不压缩6、sql任务的shuffer分为两个阶段第一阶段叫shuffer-read第二个阶段叫shuffer-write使用下面的配置可以更改read阶段的并行度但是这个配置的生效前提是其他算子在执行计划中的分区数失效了才使用它所以大多情况下没有作用
spark.sql.shuffle.partitions2007、sql任务自适应分区数查询AQE注意同时配置了AQE的合并分区相关和倾斜时会先合并再调整倾斜设计到的分区大小建议50300M
spark.sql.adaptive.enabledtrue # 开启aqe
spark.sql.adaptive.forceApplytrue #强制开启AQE一般不带这个参数当发现aqe的效果不明显的时候再用
spark.sql.adaptive.logLevelinfo #aqe的日志级别一般保持默认不用改
spark.sql.adaptive.coalescePartitions.enabledtrue # 自动合并分区
spark.sql.adaptive.coalescePartitions.initialPartitionNum100 # 初始的分区数。默认为spark.sql.shuffle.partitions的值
spark.sql.adaptive.coalescePartitions.minPartitionNum1 # 最小的分区数。默认为spark.sql.shuffle.partitions的值一般不另行配置
spark.sql.adaptive.coalescePartitions.maxPartitionNum1 # 最大的分区数。AQE的常用配置一般不要太大
spark.sql.adaptive.advisoryPartitionSizeInBytes128M # 每个分区建议大小默认单位字节
spark.sql.adaptive.shuffle.targetPostShuffleInputSize128M #设置shuffer阶段后下一阶段任务输入预期的数据大小一般不另行配置
spark.sql.adaptive.fetchShuffleBlocksInBatchtrue #默认是true当获取连续的shuffle分区时对于同一个map的shuffle block可以批量获取而不是一个接一个的获取来提升io提升性能
spark.sql.adaptive.localShuffleReader.enabledtrue #允许在自适应时采用本地进程优化shuffer分险是如果报错这部分日志无法聚合到yarn
spark.sql.adaptive.skewJoin.enabledtrue # 开启join时的数据倾斜检测
spark.sql.adaptive.skewJoin.skewedPartitionFactor5 # 默认5当某个分区大小大于所有分区的大小中间值5倍就打散数据
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256M #通过直接指定分区大小的阈值来决定是否打散分区 默认256M和上面的参数一起生效用来因对不同的情况
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin0.2 #参与join的表非空分区对于整体任务而言占比小于该比例那么该表不会被作为广播表去使用默认0.2一般不改因为通常广播能力是禁用掉的广播会非常耗driver的内存尤其是在TB级数据处理中随意广播是一个比较危险的操作
spark.sql.autoBroadcastJoinThreshold-1 #这个配置通常保持-1它是指给一个字节大小小于这个字节的表都会在join操作时被广播-1表示禁用广播能力spark.sql.mergesmallfilesize256M #分区建议大小 ⭐️⭐️注意这个配置在3.x之后废弃2.x需要试一下你用的版本对于AQE的分区数一定要知道不是说你设置多少web页面上就能直观的体现出来多少执行计划上是以task的形式展示执行计划的而task和分区是两个东西因此你要有那种感觉来调整分区数这种感觉只能靠经验去喂就和神枪手一样。
而且AQE的coalescePartitions对数据集倾斜的作用很明显但是对join是发生的热点key倾斜就不太有力了所以如果有join倾斜一定要开启skewJoin
8、任务容器的动态伸缩建议一般情况下不要使用直接指定而是尽量使用动态扩容来规定任务的容器个数因为直接指定时很容易造成资源的倾斜除非你的任务特别大这个时候动态扩容的能力会成为累赘至于如何判断你可以在任务启动之后spark页面的excutor页面看到实际启动了多少个容器如果total个数和单个容器使用资源综合考虑后的结果反馈出任务最终使用的计算资源超过了有预留后的总资源或者该任务的shuffer无论你如何调整它的shuffer大小都较高甚至逼近当前集群的shuffer能力上限这种情况就不建议用动态容器扩展了
spark.dynamicAllocation.enabledtrue # 开启动态收缩容器资源默认false
spark.dynamicAllocation.shuffleTracking.enabledtrue # shuffle动态跟踪默认true
spark.dynamicAllocation.initialExecutors10 # 初始化申请资源
spark.dynamicAllocation.maxExecutors100 # 最大容器个数
spark.dynamicAllocation.minExecutors10 # 最小容器个数
spark.dynamicAllocation.executorAllocationRatio1 # 这个用来设置动态容器资源模式下任务可尝试的最多资源占比默认为1本身是个浮点数值也就是0.1到一般不另行配置9、是否对分区做动态裁剪默认true这个配置一般不关它目的就是优化执行开启后你可以在spark的任务web界面看到有的执行计划就被skip了当然skip不全是因为它容器的动态伸缩和自适应分区数也会造成。
spark.sql.optimizer.dynamicPartitionPruning.enabledtrue10、spark-sql提供了Hint需要你去查看官方文档https://spark.apache.org/docs/3.5.2/sql-ref-syntax-qry-select-hints.html#content看的时候注意你用的版本这个就是在写sql的时候加入建议的执行计划比如当你希望sql执行的时候直接指定希望的分区数你可以写成如下的格式但是这种方式其实就是嫌少了用户使用时的代码量一般用的不多
SELECT /* COALESCE(3) */ * FROM t;11、core任务中rdd的默认分区数这个配置一般不直接在任务外配置有需要的话调用算子的parallelism方法了
spark.default.parallelism1012、存储内存占用比例这个配置越大留给shuffer和计算本身的内存就越少反之越小跑任务的时候数据暂时落盘的次数就越频繁默认值0.5
spark.memory.storageFraction0.5对于落盘的阈值在整个spark中有个spark.reducer.maxReqSizeShuffleToMem参数用来直接用数据大小来控制落盘时机但是该参数变动很频繁不同版本名称也不一样所以一般不用这里的落盘是指计算过程中随着计算任务的output输出或者input来的数据太多使得容器内部用来存储数据的那部分堆内存到了上限之后数据就会被暂时写在文件里面当恢复平衡之后会逐步再读取出来处理
13、下面22点修改kryo序列化后还可以更改使用的缓存大小这个配置是当driver调用collect等算子回收大量数据到driver端可能会抛buffer limit exceeded异常这个时候就要调大该参数
spark.kryoserializer.buffer64k14、第13点设置的是缓存大小这个配置设置的是driver收集数据使用的内存资源最大是多少默认1g0表示不限制
spark.driver.maxResultSize1g15、下一个数据块定位次数在数据落盘的时候如果网络延迟等极端原因会导致driver定位数据块写入位置时收不到任何datanode的回馈这个时候可以尝试调大这个值一般不会遇到博主只遇到过一次出现问题的时候会抛出Unable to close file because the last block does not have enough number of replicas异常对应的bug在spark2.7.4已修复这个配置的默认值是5挂了就设置为6
dfs.client.block.write.locateFollowingBlock.retries516、shuffle write时会先写到BufferedOutputStream缓冲区中然后再写到磁盘该参数就是缓存区大小默认32k建议设置为64k这个配置是数据量不较大的时候减少一些系列化次数和让小文件聚合异曲同工设置的时候注意要和17平衡
spark.shuffle.file.buffer32k17、shuffle溢写磁盘过程中需要将数据序列化和反序列化这个参数是一个批次处理的条数默认是10000需要的话调大该值2万5万都可以但是一定要成比例的设置16的配置值
spark.shuffle.spill.batchSize1000018、shuffle read拉取数据时由于网络异常或者gc导致拉取失败会自动重试改参数就是配置重试次数在数据量达到十亿、百亿级别的时候最好调大该参数以增加稳定性默认是3次建议设置为10到20。
spark.shuffle.io.maxRetries319、该参数是 spark.shuffle.io.maxRetries的重试间隔默认是0.5s。
spark.shuffle.io.retryWait50020、shuffle read拉取数据时的缓存区大小也就是一次拉取的数据大小默认64计算单位是M要注意的它是从n个节点上一次总共拉取64M数据而不是从单个节点获取64M。并且它拉取数据时并行的发送n个请求每个请求拉取的最大长度是 64M / n但是实际拉取时都是以block为最小单位的所以实际获取的有可能会大于64M / n。所以这个配置就有点迷属于理论上不行但实际由于block大小而不得不行的配置
spark.reducer.maxSizeInFlight64在你对上面缓存区的大小做修改的时候不要设置的太大因为要考虑下面的这个配置
spark.reducer.maxReqsInFlightInt.MaxValue该配置用来限制每个批次拉数据时能够发出的请求数上限默认是scala中Int类型的最大值一般不另行改动但是如果你缓存区大小设置的不合理或者碰上任务生产的中间文件普遍不大造成spark为了靠近你设置的缓存区大小文件请求一次性发出去很多这就会造成大量的网络IO导致任务失败遇到这种情况要先使用文件聚合然后考虑AQE、最后调整任务资源因为前面两个对资源的消耗是有一定的影响的总之再次就是想告诉你有这种顾虑存在至于这个上限限制一般不改
21、spark允许你限制每个reduce任务能够对执行计划中的某个datanode上获取最多多少个数据块不过一般遇不到改的情况和上面缓存区面临的请求数一样是一个要知道的概念
spark.reducer.maxBlocksInFlightPerAddressInt.MaxValue注意:列出18-21的配置是为了引对一种极端情况如果你的上游开发不是你但是上游表生成了巨量的小文件导致你的任务在执行计划中看到的情况明显具体计算参与率很低大量的开销都耗在了拉数据和数据倾斜上同时伴随着网络套接字断开的问题联系上游人家不鸟你那你只能注意文件聚合、调整任务资源、配置数据倾斜之外在把拉取的数据批次大小放低重试和重试间隔放大最后就阿弥陀佛吧 22、修改系列化方式这里的序列化是针对shuffle、广播和rdd cache的序列化方式默认使用java的序列化方式org.apache.spark.serializer.JavaSerializer性能比较低所以一般都使用org.apache.spark.serializer.KryoSerializer 至于spark task的序列化由参数spark.closure.serializer配置目前只支持JavaSerializer。
spark.serializerorg.apache.spark.serializer.KryoSerializer23、如果你的数据类型是Parquet且使用spark计算引擎处理hive数据要注意这个配置用来决定是否采用spark自己的Serde来解析Parquet文件Spark SQL为了更好的性能在读取hive metastore创建的parquet文件时会采用自己Parquet Serde而不是采用hive的Parquet Serde来序列化和反序列化由于两者底层实现差异比较大所以很容易造成null值和decimal精度问题默认为true设为false即可(会采用与hive相同的Serde)。
spark.sql.hive.convertMetastoreParquetfalse当你操作spark要对hive表的Parquet类型数据写入的时候一定要注意下面的配置。
spark.sql.parquet.writeLegacyFormattrue这个参数用来决定是否使用hive的方式来写Parquet文件这是由于对数据精度的把控上两个计算框架不一样比如decimal精度上两者的实现就有差别导致hive读取spark创建的Parquet文件会报错在hive中decimal类型是固定的用int32来表示而标准的parquet规范约定根据精度的不同会采用int32和int64来存储而spark就是采用的标准的parquet格式所以对于精度不同decimal的底层的存储类型有变化所以使用spark存储的parquet文件在使用hive读取时报错因此要将spark.sql.parquet.writeLegacyFormat(默认false)配置设为true即采用与hive相同的format类来写parquet文件
24、和上面的Parquet一样orc数据spark和hive的底层实现也不太一样因此如果你用spark处理hive的orc数据要注意下面的配置
spark.sql.hive.convertMetaStoreOrcfalse上面这个配置用来决定spark读取orc的时候是否转换成spark内部的Parquet格式的表如果你的orc数据来自于hive就要设置为false如果为true发生兼容性问题的概率很大
orc.force.positional.evolutiontrue上面这个配置决定spark读取orc时是否强制解析orc数据这里的强制说的是由于orc是列式存储在不同版本之间很容易发生字段底层存储的顺序不同或其他不兼容问题为true时意味着强制解析这些数据为数据分析提供了一定的兼容性保证。一种常见的需要强制解析场景就是当你对orc格式的表修改了字段名或者增加列并且你没有刷新数据的话不强制解析的情况下select出来的列名就是数据里面真实存储的字段名也就是原来的字段名
spark.hadoop.hive.exec.orc.split.strategyBI上面这个配置是用来决定spark读取orc文件时的切分策略有三种可选值分别为BI(文件级别)、ETL(stripe条带级别)、HYBRID(混合默认)在网上能找到的其他文献中说这个配置的默认值是BI但本博主在使用spark3.2.1的时候遇到了一次读取orc数据报数组下标越界问题规避了空值影响之后发现ETL模式在参与计算的数据切片较大时不太稳定而在spark3.2.1的源码里面在读取orc数据时数据切分用的是混合模式因此在发生同样问题的时候直接指定BI就行注意这个问题如果大家遇到了那如果上游任务数据生成时的文件切片数可以放大让每个文件的大小缩小也是可以解决的
hive.exec.orc.default.stripe.size67108864上面这个配置用来决定spark读取orc文件时混合切分策略的阈值默认是256MB如果你任然向使用混合模式哪就调小至64M。 ⭐️⭐️⭐️但是要注意这个参数在3.x之后失效了去更改分区大小来控制
spark.sql.orc.implhive上面该配置决定spark用那种方式写入orc数据默认是native即内置的类库但是如果数要流向hive就要配置成hive
hive.merge.orcfile.stripe.leveltrue上面这个参数用来控制orc数据写入时进行的合并策略为true使用stripe级别进行而当该参数设置为false时合并操作会在文件级别进行。这种合并操作是通过启动一个独立的map-reduce任务来实现的旨在优化存储和提升查询效率。具体来说ORC文件的合并有助于减少小文件的数量从而避免因大量小文件而导致的处理效率低下问题。此外ORC文件的索引结构如行组级别、条带级别的索引使得查询时能够快速定位到需要的数据块避免了不必要的磁盘和网络I/O进而提高了查询效率。 这种配置对于处理大量小文件特别有用尤其是在数据仓库环境中可以显著提升数据处理的速度和效率。通过合理地配置这个参数可以根据具体的业务需求和数据特点优化Hive处理ORC文件的策略从而达到更好的性能表现。
注意操作orc和Parquet格式时一定要合理的带上上面的参数否则轻则数据精度丢失重则不可识别数据文件导致任务在读取文件阶段就失败了
25、任务重试次数这个配置在工作中不同的开发版技术环境是不同的值如果有需要可以更改默认的原生栈是2
spark.yarn.maxAppAttempts226、有的时候spark任务当你不太愿意给再多的资源时候但任务缺失由于数据太多比如数据块拉去比较耗时之类的会触发任务超时这个时候你可以设置下面的两个参数把超时时间延长或者设置为0不预防超时
spark.network.timeout600s #网络超时时间默认单位毫秒默认值10000
spark.task.timeout100 #task任务超时时间默认单位秒默认值327、推测执行这个配置和hive任务的推测执行一样的目的启动另一个相同的task并行那个先成功就用那个另一个关闭通常情况下为了任务的稳步运行和资源的择优要确保是关闭的但如果集群部分节点的状态不佳导致任务执行缓慢等就开启这个配置开启后再执行任务会看到job有skip的就是一个成功后另一个暂停的表现
spark.speculationfalse28、这个参数是容器和driver的心跳机制间隔
spark.executor.heartbeatInterval20s #默认单位毫秒默认值在 Spark 1.6 版本之前默认值是 10000 在 1.6 及之后的版本中默认值是 300029、如果你的sql要使用模糊匹配字符涉及了字母的大小写你可以设置下面这个参数为false忽略大小写这样就不需要LOWER或UPPER函数了
spark.sql.caseSensitivefalse30、在3.x之前sparksql操作数据是不能向一个非空路径下建表并写数据的会抛出运行异常而3.x之后可以但是这需要注意写入数据和已有元数据不一致的风险通过设置下面的参数实现
spark.sql.legacy.allowNonEmptyLocationInCTAStrue31、当select的表路径下存在非数据文件的路径时在数据分析的时候会报错这个时候需要用下面的参数让spark强制对目录递归读取
mapred.input.dir.recursivetrue;
mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirstrue;
mapreduce.input.fileinputformat.input.dir.recursivetrue;32、关闭sql计算时的全段代码融合能力默认值是true在sql计算的时候如果你时常关注任务的web执行计划你会发现spark的sql架构常常会将多个有关联可以并行的执行计划融合成一个阶段执行这种能力可以优化内部算子的调用开销但是在大任务处理的时候会发生编译错误因此如果你的任务很大那么最好把这个能力关掉虽然这会影响一些任务的性能和增加一些计算开销
spark.sql.codegen.wholeStagefalse
spark.sql.codegen.aggregate.map.twolevel.enabledfalse33、节点黑名单该功能在spark2.2开始支持这些配置通常可以被配置在spark-defaults.conf中做为默认参数存在这是一种保障机制当你的集群中存在某些节点状态异常你可以配置黑名单的方式使得调度器开始记录任务失败情况达到一个阈值的时候尽量不再向该节点上运行task注意是尽量除非达到黑名单阈值的上限并且当你有黑名单能力需求的时候通常会一起打开推测执行。不过要注意的是spark的节点黑名单最小的管控单位是以执行器为单位的也就是executor这就导致很容易出现一种情况就是同一节点重启了一个executor这种时候就会越过executor的黑名单拦截不过spark还提供了对executor失败使得节点进入黑名单的设置
spark.blacklist.enabled
这个参数用于启用或禁用节点黑名单功能。如果启用Spark会记录任务失败的节点并尝试避免在这些节点上重新调度任务。默认值是false。spark.blacklist.application.maxFailedTasksPerExecutor
这个参数定义了当前任务单个executor上允许失败的task数量。当一个executor上失败task数量达到这个阈值时该executor会被列入黑名单。默认值通常是2。spark.blacklist.application.maxFailedExecutorsPerNode
这个参数定义了一个节点上允许失败的执行器数量。当一个节点上的失败执行器数量达到这个阈值时该节点会被列入黑名单。默认值通常是2。
执行器失败一般很少发生因为执行器的失败虽然和task失败的原因很多是一样的但是执行器本身只是负责对task监工的进程所以失败的概率甚少除非是服务器本身存在故障这种硬性错误而向OOM等这种软性错误一般不发生------------上面这两个是当一个任务所有task执行中阈值以上的失败发生则对应发生失败的执行器或者失败执行器所在节点进入黑名单spark.blacklist.stage.maxFailedTasksPerExecutor
这个参数定义了一个阶段stage中一个执行器上允许失败的任务数量。当一个执行器在当前阶段中的失败任务数量达到这个阈值时该执行器会被列入黑名单。默认值通常是2。spark.blacklist.stage.maxFailedExecutorsPerNode
这个参数定义了一个阶段中一个节点上允许失败的执行器数量。当一个节点在当前阶段中的失败执行器数量达到这个阈值时该节点会被列入黑名单。默认值通常是2。-------------上面这两个和开头两个的区别是生效范围在一个stage中也就是单个的taskset中spark.blacklist.task.maxTaskAttemptsPerExecutor
这个参数定义了一个执行器上尝试执行一个task的最大次数。当一个task在一个执行器上的尝试次数达到这个阈值时仍然无法正常执行该执行器会被列入黑名单。默认值通常是1。注意这个参数的生效前提是设置的值小于等于执行器或节点进入黑名单的阈值并且这个配置是指单个的task原地重试spark.blacklist.task.maxTaskAttemptsPerNode
这个参数定义了一个节点上尝试执行一个任务的最大次数。当一个任务在一个节点上的尝试次数达到这个阈值时该节点会被列入黑名单。默认值通常是2。这个配置和上面的spark.blacklist.task.maxTaskAttemptsPerExecutor一样都是对task原地重试不同的是重试放在了同节点的其他执行器上生效前提也是小于等于上面的节点配置--------------上面这两个在正常配置的时候往往会使得进入黑名单的阈值小于开头的4个配置spark.blacklist.timeout
这个参数定义了一个节点被列入黑名单的时间。超过这个时间后节点会被自动移出黑名单。默认值通常是-1表示不会自动移出黑名单。配置的时候注意改配置单位是小时spark.blacklist.static.hostshost1,host2,host3
spark.blacklist.static.executorsexecutor1,executor2,executor3
静态黑名单列表这个配置指向的节点和执行器会始终在黑名单之中34、上面提到对于分区大小、分区数在sql模块上用的是aqe去把控。但是core模块只有从新调整分区数的方法而直接改动分区大小的方法在早起的版本中用到的是上面提到的spark.sql.mergesmallfilesize但是在后续的版本中官方可能是觉得两个维度调整在使用上有冲突所以将分区大小的直接控制融合在了conbiner算法中你可以在源码中看到这个方法使用上只需要通过修改分区数这一种方法来优化任务的执行就可以了具体的流程可以在web的执行计划上看到程序会经过conbiner。
总体上core模块中提供了五种不同情况下的分区个数修改方法
a、使用repartition()方法
repartition()方法可以用于重新分区RDD并返回一个新的RDD。
它允许用户指定新的分区数并会触发shuffle操作来重新分配数据。
示例代码newRDD rdd.repartition(numPartitions)其中numPartitions是新的分区数。b、使用coalesce()方法
coalesce()方法也可以用于调整RDD的分区数但它主要用于减少分区数以减少小任务的数量和降低调度开销。
与repartition()不同coalesce()在减少分区数时默认不会触发shuffle操作除非设置了shuffletrue这在处理大数据集时更为高效。
示例代码newRDD rdd.coalesce(numPartitions)其中numPartitions是新的分区数。c、在读取数据时指定分区数
当使用Spark的API如textFile()、parquetFile()等读取外部数据源时可以通过可选的参数minPartitions来指定最小的分区数。
这有助于在数据读取阶段就控制RDD的分区数以便后续处理。d、配置Spark的默认并行度
可以通过设置Spark配置参数spark.default.parallelism来指定默认的并行度即RDD的分区数。
这个参数会影响所有没有显式指定分区数的RDD操作。
示例配置val conf new SparkConf().setAppName(appName).setMaster(masterURL).set(spark.default.parallelism, numPartitions)。e、使用自定义分区器
对于需要基于特定逻辑进行分区的场景可以使用自定义分区器。
自定义分区器需要继承Partitioner类并实现numPartitions和getPartition方法。
使用自定义分区器可以精确地控制数据的分区方式以满足特定的业务需求。而至于sql模块的aqe在实际使用中会发现分区具体大小生效的优先级高于分区个数。
35、在spark3.x之后sql模块对底层处理时间格式化的类不在使用之前的simpledateformat因为simple它对时间数据的校验不严谨造成数据生成会携带后缀。但类库的变动导致用spark3.x去处理2.x的时间数据就会报错因此spark提供了如下配置在运行时修复此问题。但是效果不太理想最直接的方法是用substr函数截取对应位数的时间字符串数据
spark.sql.legacy.timeParserPolicyLEGACY