网站建设需求登记表 免费下载,新手运营从哪开始学,asp网站制作软件,企业查询app排行榜列裁剪与分区裁剪
在生产环境中#xff0c;会面临列很多或者数据量很大时#xff0c;如果使用select * 或者不指定分区进行全列或者全表扫描时效率很低。Hive在读取数据时#xff0c;可以只读取查询中所需要的列#xff0c;忽视其他的列#xff0c;这样做可以节省读取开销…列裁剪与分区裁剪
在生产环境中会面临列很多或者数据量很大时如果使用select * 或者不指定分区进行全列或者全表扫描时效率很低。Hive在读取数据时可以只读取查询中所需要的列忽视其他的列这样做可以节省读取开销中间表存储开销和数据整合开销
1.列裁剪在查询时只读取需要的列。避免select *
2.分区裁剪在查询中只读取需要的分区。
遵循一个原则尽量少的读入数据尽早地数据收敛
分组聚合优化
Map端聚合
Hive中未经优化的分组聚合是通过一个MapReduce Job实现的。Map端负责读取数据并按照分组字段分区通过Shuffle将数据发往Reduce端各组数据在Reduce端完成最终的聚合运算。
Hive对分组聚合的优化主要围绕着减少Shuffle数据量进行具体做法是map-side聚合。所谓map-side聚合在Hive的Map阶段开启预聚合先在Map阶段预聚合然后在Reduce阶段进行全局的聚合。map-side聚合能有效减少shuffle的数据量提高分组聚合运算的效率。
通俗理解假设有张8000w数据表聚合后30组数据有10个map若是没有开启分组聚合则会map将8000w条数据传给reduce开启分组聚合后就会每个map先进行分组10个map各有30组再将这30*10组数据从map传给reduce这样效率就会大大增加
map-side 聚合相关的参数如下 --启用map-side聚合一般默认开启 set hive.map.aggrtrue; 默认为开启状态 --用于检测源表数据是否适合进行map-side聚合。检测的方法是先对若干条数据进行map-side聚合若聚合后的条数和聚合前的条数比值小于该值则认为该表适合进行map-side聚合否则认为该表数据不适合进行map-side聚合后续数据便不再进行map-side聚合。 set hive.map.aggr.hash.min.reduction0.5; --用于检测源表是否适合map-side聚合的条数。 set hive.groupby.mapaggr.checkinterval100000;
默认为0.5和10w条
会先从大的数据表内先抽取10w数据进行检测判断看分组后的数据/10w是否在0.5以下若是则会启用map的分组聚合 --map-side聚合所用的hash table占用map task堆内存的最大比例若超出该值则会对hash table进行一次flush。 set hive.map.aggr.hash.force.flush.memory.threshold0.9;
优化案例
示例SQL hive (default) set hive.map.aggrtrue/false; explain select t1.province_id,count(*) as cnt from ds_hive.ch12_order_detail_orc t1 group by t1.province_id ;
开启分组聚合后执行时间26s左右 application_1716866155638_175450 关闭分组聚合后执行时间在60s左右效率提升了34s application_1716866155638_175451 由上图的详细执行过程分析可知开启map聚合后map输出--reduce接受的数据是340而关闭map分组聚合后map数据--reduce接受的数据是8000w条传输时间大大影响
大致的运行前后的步骤对比 Count Distinct 的优化
在Hive中DISTINCT关键字用于对查询结果进行去重以返回唯一的值。其主要作用是消除查询结果中的重复记录使得返回的结果集中每个值只出现一次。
具体而言当你在Hive中使用SELECT DISTINCT时系统会对指定的列或表达式进行去重操作。尽管Hive中的DISTINCT关键字对于去重查询是非常有用的但在某些情况下可能存在一些缺点性能开销、数据倾斜、内存需求等。
group by 操作的具体实现原理。 1.map阶段将group by后的字段组合作为一个key如果group by单个字段那么key就一个。将group by之后要进行的聚合操作字段作为值如果要进行count则value是赋1如要sum另一个字段那么value就是该字段。
2.shuffle阶段按照key的不同分发到不同的reducer。注意此时可能因为key分布不均匀而出现数据倾斜的问题。这个问题是我们处理数据倾斜比较常规的查找原因的方法之一也是我们解决数据倾斜的处理阶段。(当执行过程中出现其他任务都已完成持续等待一个reudce过程的时候就看出现了数据倾斜问题)
3.reduce阶段如果是count将相同key的值累加,如果是其他操作按需要的聚合操作得到结果。
distinct 的具体实现当执行Distinct操作时Hive会将操作转化为一个MapReduce作业并按照指定的列进行分组。在Map阶段每个Mapper会读取输入数据并将指定的列作为输出的key然后通过Shuffle过程将具有相同key的数据发送到同一个Reducer中。 当distinct一个字段时这里会将group by的字段和distinct的字段组合在一起作为map输出的keyvalue设置为1同时将group by的字段定为分区键这一步非常重要这样就可以将GroupBy字段作为reduce的key在reduce阶段利用mapreduce的排序输入天然就是按照组合key排好序的。根据分区键将记录分发到reduce端后按顺序取出组合键中的distinct字段这时distinct字段也是排好序的。依次遍历distinct字段每找到一个不同值计数器就自增1即可得到count distinct结果。
count(distinct)全局合操作的时候即使我们设定了reduce task的具体个数例如set mapred.reduce.tasks100hive最终也只会启动一个reducer。这就造成了所有map端传来的数据都在一个tasks中执行这唯一的Reduce Task需要Shuffle大量的数据并且进行排序聚合等处理这使得这个操作成为整个作业的IO和运算瓶颈。 针对上述说的问题我们可以修改对应的sql来进行优化 countgroup by 或者sumgroup by的方案来优化在第一阶段选出全部的非重复的字段id在第二阶段再对这些已消重的id进行计数
重到细粒度的日再聚合到粗粒度省份
(目前测试结果不能完全验证如上理论暂放确定后再更新) -- count(distinct) select count(distinct province_id) from ds_hive.ch12_order_detail_orc ; -- 优化版 count group by select count(product_id) from (select product_id from ds_hive.ch12_order_detail_orc group by product_id )t; 第一阶段我们可以通过增大Reduce的并发数并发处理Map输出。在第二阶段由于id已经消重因此COUNT(*)操作在Map阶段不需要输出原id数据只输出一个合并后的计数即可。这样即使第二阶段Hive强制指定一个Reduce Task的时候极少量的Map输出数据也不会使单一的Reduce Task成为瓶颈。
其实在实际运行时Hive还对这两阶段的作业做了额外的优化。它将第二个MapReduce作业Map中的Count过程移到了第一个作业的Reduce阶段。这样在第一阶Reduce就可以输出计数值而不是消重的全部id。这一优化大幅地减少了第一个作业的Reduce输出IO以及第二个作业Map的输入数据量。最终在同样的运行环境下优化后的语句可以说是大大提升了执行效率。
Join优化
Hive拥有多种join算法包括Common JoinMap JoinBucket Map JoinSort Merge Buckt Map Join等下面对每种join算法做简要说明
Common Join (完整进行map-reduce阶段)
Common Join是Hive中最稳定的join算法其通过一个MapReduce Job完成一个join操作。Map端负责读取join操作所需表的数据并按照关联字段进行分区通过Shuffle将其发送到Reduce端相同key的数据在Reduce端完成最终的Join操作。 如果不指定MapJoin或者不符合MapJoin的条件那么Hive解析器会将Join操作转换成Common Join即在Reduce阶段完成join。
整个过程包含Map、Shuffle、Reduce阶段。
1Map阶段
Step1: 读取源表的数据Map输出时候以Join on条件中的列为key如果Join有多个关联键则以这些关联键的组合作为key;
Step2: Map输出的value为join之后所关心的(select或者where中需要用到的)列同时在value中还会包含表的Tag信息用于标明此value对应哪个表
Step3: 按照key进行排序。
2Shuffle阶段
根据key的值进行hash并将key/value按照hash值推送至不同的reduce中这样确保两个表中相同的key位于同一个reduce中。
3Reduce阶段
根据key的值完成join操作期间通过Tag来识别不同表中的数据。
举个例子 SELECT t1.stu_id ,t1.score ,t2.sex from ds_hive.ch7_score_info t1 join ds_hive.ch7_stu_info t2 on t1.stu_idt2.stu_id ;
执行计划如下完整的common join 会完整的经过map-reduce阶段 执行过程如下 JOIN操作涉及合并两个或多个表的数据以便通过共同的列值将它们关联起来。这样的关联操作在处理大规模数据时可能会面临一些性能挑战因此有必要进行优化。
性能开销 JOIN操作通常涉及将分布在不同节点上的数据进行合并。在传统的MapReduce执行环境中这意味着需要进行数据的分发、排序和聚合操作这些操作都会带来较大的性能开销。
Shuffle开销 在传统的MapReduce中JOIN操作的Shuffle阶段涉及将相同键的数据合并到一起。这个过程需要大量的网络通信和数据传输尤其是当数据分布不均匀时。
内存消耗 处理大规模数据的JOIN操作可能需要大量的内存特别是在进行排序和合并时。这可能导致内存不足的问题进而影响性能。
复杂度 JOIN操作可能涉及复杂的计算特别是在关联多个表或在多列上进行关联时。这增加了查询的复杂性可能导致较长的执行时间。 Map Join 大表join小表
Map Join算法可以通过两个只有map阶段的Job完成一个join操作。其适用场景为大表join小表。因为只经mapmap阶段减少了shuffle的处理reduce的读取和处理过程从而进行性能优化。若某join操作满足要求则
第一个Job会读取小表数据将其制作为hash table并上传至Hadoop分布式缓存本质上是上传至每个执行任务的NodeManager节点本地磁盘。
第二个Job会先从分布式缓存中读取小表数据并缓存在Map Task的内存中然后扫描大表数据这样在map端即可完成关联操作。 Map Join有两种触发方式一种是用户在SQL语句中增加hint提示另外一种是Hive优化器根据参与join表的数据量大小自动触发。
1Hint提示
用户可通过如下方式指定通过map join算法并且将作为map join中的小表。这种方式已经过时不推荐使用。 hive (default) select /* mapjoin(ta) */ ta.id, tb.id from table_a ta join table_b tb on ta.idtb.id ;
2自动触发
Hive在编译SQL语句阶段起初所有的join操作均采用Common Join算法实现。
之后在物理优化阶段Hive会根据每个Common Join任务所需表的大小判断该Common Join任务是否能够转换为Map Join任务若满足要求便将Common Join任务自动转换为Map Join任务。
但有些Common Join任务所需的表大小在SQL的编译阶段是未知的例如对子查询进行join操作所以这种Common Join任务是否能转换成Map Join任务在编译阶是无法确定的。
针对这种情况Hive会在编译阶段生成一个条件任务Conditional Task其下会包含一个计划列表计划列表中包含转换后的Map Join任务以及原有的Common Join任务。最终具体采用哪个计划是在运行时决定的。大致思路如下图所示 Map join自动转换的具体判断逻辑如下图所示 参数如下 --启动Map Join自动转换 set hive.auto.convert.jointrue; --一个Common Join operator转为Map Join operator的判断条件,若该Common Join相关的表中,存在n-1张表的大小总和该值,则生成一个Map Join计划,此时可能存在多种n-1张表的组合均满足该条件,则hive会为每种满足条件的组合均生成一个Map Join计划,同时还会保留原有的Common Join计划作为后备(back up)计划,实际运行时,优先执行Map Join计划若不能执行成功则启动Common Join后备计划。 set hive.mapjoin.smalltable.filesize25000000; --开启无条件转Map Join set hive.auto.convert.join.noconditionaltaskfalse; --无条件转Map Join时的小表之和阈值,若一个Common Join operator相关的表中存在n-1张表的大小总和该值,此时hive便不会再为每种n-1张表的组合均生成Map Join计划,同时也不会保留Common Join作为后备计划。而是只生成一个最优的Map Join计划。 set hive.auto.convert.join.noconditionaltask.size 20971520;
3示例SQL set hive.auto.convert.joinfalsetrue; explain SELECT t1.province_id ,count(*) as cnt from ds_hive.ch12_order_detail_orc t1 join ds_hive.ch12_product_info_orc t3 on t1.province_idt3.id group by t1.province_id ;
参数设置为false未优化既有map又有reduce然后join是在reduce阶段 执行完时间 161s 参数设置为true优化第一个map加载本地文件第二个map进行join 执行时间35s 结论未开启mapjoin进行commonjoin执行时间161s使用mapjoin执行时间35s执行效率大大提升
接着我们再来测试另外一个参数。调整hive.auto.convert.join.noconditionaltask.size参数小于此设置的表会识别为小表使其小于t3 表 的大小 set hive.auto.convert.jointrue; --重要一般企业来调整此参数来决定小表的大小比如让一些略大于小表阈值的表进行mapjoin set hive.auto.convert.join.noconditionaltask.size252300; explain SELECT t1.user_id ,t1.create_time ,t3.id from ds_hive.ch12_order_detail_orc t1 join ds_hive.ch12_product_info_orc t3 on t1.province_idt3.id ; --------此类优化用的少 set hive.auto.convert.join.noconditionaltaskfalse; set hive.mapjoin.smalltable.filesize 15230000;
第一个sql因为设置了.noconditionaltask.size252300小于表的大小最终选择了commonjoin执行第二遍我们关闭有条件执行由于smalltable.filesize大于小表只有commonjoin这时候调大set hive.mapjoin.smalltable.filesize379000002;让其小表大于smalltable.filesize这时候最终会选择mapjoin。
Bucket Map Join大表join大表
两张表都相对较大若采用普通的Map Join算法则Map端需要较多的内存来缓存数据当然可以选择为Map段分配更多的内存来保证任务运行成功。但是Map端的内存不可能无上限的分配所以当参与Join的表数据量均过大时就可以考虑采用Bucket Map Join算法。Bucket Map Join是对Map Join算法的改进其打破了Map Join只适用于大表join小表的限制可用于大表join大表的场景。
Bucket Map Join的核心思想是若能保证参与join的表均为分桶表且关联字段为分桶字段且其中一张表的分桶数量是另外一张表分桶数量的整数倍就能保证参与join的两张表的分桶之间具有明确的关联关系所以就可以在两表的分桶间进行Map Join操作了。这样一来第二个Job的Map端就无需再缓存小表的全表数据了而只需缓存其所需的分桶即可。其原理如图所示 优化条件
1 set hive.optimize.bucketmapjoin true;
2 一个表的bucket数是另一个表bucket数的整数倍
3 bucket列 join列
4 必须是应用在map join的场景中
Bucket Map Join不支持自动转换发须通过用户在SQL语句中提供如下Hint提示并配置如下相关参数方可使用。
1Hint提示 hive (default) select /* mapjoin(ta) */ ta.id, tb.id from table_a ta join table_b tb on ta.idtb.id;
2相关参数 --关闭cbo优化cbo会导致hint信息被忽略 set hive.cbo.enablefalse; --map join hint默认会被忽略(因为已经过时)需将如下参数设置为false set hive.ignore.mapjoin.hintfalse; --启用bucket map join优化功能 set hive.optimize.bucketmapjoin true;
Sort Merge Bucket Map Join大表join大表
Sort Merge Bucket Map Join简称SMB Map Join基于Bucket Map Join。SMB Map Join要求参与join的表均为分桶表且需保证分桶内的数据是有序的且分桶字段、排序字段和关联字段为相同字段且其中一张表的分桶数量是另外一张表分桶数量的整数倍。
SMB Map Join同Bucket Join一样同样是利用两表各分桶之间的关联关系在分桶之间进行join操作不同的是分桶之间的join操作的实现原理。Bucket Map Join两个分桶之间的join实现原理为Hash Join算法而SMB Map Join两个分桶之间的join实现原理为Sort Merge Join算法。
Hash Join和Sort Merge Join均为关系型数据库中常见的Join实现算法。Hash Join的原理相对简单就是对参与join的一张表构建hash table然后扫描另外一张表然后进行逐行匹配。Sort Merge Join需要在两张按照关联字段排好序的表中进行其原理如图所示 Hive中的SMB Map Join就是对两个分桶的数据按照上述思路进行Join操作。可以看出SMB Map Join与Bucket Map Join相比在进行Join操作时Map端是无需对整个Bucket构建hash table也无需在Map端缓存整个Bucket数据的每个Mapper只需按顺序逐个key读取两个分桶的数据进行join即可。
Sort Merge Bucket Map Join有两种触发方式包括Hint提示和自动转换。Hint提示已过时不推荐使用。下面是自动转换的相关参数 --启动Sort Merge Bucket Map Join优化 set hive.optimize.bucketmapjoin.sortedmergetrue; --使用自动转换SMB Join set hive.auto.convert.sortmerge.jointrue;
两张表都相对较大除了可以考虑采用Bucket Map Join算法还可以考虑SMB Join。相较于Bucket Map JoinSMB Map Join对分桶大小是没有要求的。
谓词下推
谓词下推predicate pushdown是指尽量将过滤操作前移以减少后续计算步骤的数据量。数仓实际开发中经常会涉及到多表关联这个时候就会涉及到on与where的使用。一般在面试的时候会提问条件写在where里和写在on有什么区别
相关参数为 --是否启动谓词下推predicate pushdown优化 set hive.optimize.ppd true;
示例SQL语句 hive (default) explain SELECT t1.id ,t2.province_name from ds_hive.ch12_order_detail_orc t1 left join ds_hive.ch12_province_info_orc t2 on t1.province_idt2.id where t1.product_num20 and t2.province_name江苏 ;
关闭谓词下推优化 --是否启动谓词下推predicate pushdown优化 set hive.optimize.ppd false; --为了测试效果更加直观关闭cbo优化 set hive.cbo.enablefalse; --为了测试效果更加直观关闭mapjoin set hive.auto.convert.joinfalse;
通过执行计划可以看到当我们把谓词下推关闭以后数据是所有数据关联以后才进行过滤的这样如果量表数据量大就大大降低了我们的执行效率 开启谓词下推优化 --是否启动谓词下推predicate pushdown优化 set hive.optimize.ppd true; --为了测试效果更加直观关闭cbo优化 set hive.cbo.enablefalse; 通过执行计划可以看出过滤操作位于执行计划中的join操作之前。大大减少了关联的数据量。对整体执行效率有很大提升。
开启谓词执行做关联优化一下SQL --是否启动谓词下推predicate pushdown优化 set hive.optimize.ppd true; --为了测试效果更加直观关闭cbo优化 set hive.cbo.enablefalse; -- 将where条件里关于t2的过滤放到on条件后 explain SELECT t1.id ,t2.province_name from ds_hive.ch12_order_detail_orc t1 left join ds_hive.ch12_province_info_orc t2 on t1.province_idt2.id and t2.province_name江苏 where t1.product_num20 ; 通过执行计划可以看出t1和t2过滤操作都位于执行计划中的join操作之前对俩个表都是先过滤再关联效率更一步提升。
当我们使用左关联的时候1.所有条件写在where中只有左边的条件先过滤2.当所有条件写在 on 里面只有右边的条件起作用3.为了可以让条件都起作用就把左表条件写在where里右边条件写在 on 里两者都先过滤。
结论
对于Join(Inner Join)、Full outer Join条件写在on后面还是where后面性能上面没有区别join谓词下推都生效Full outer Join都不生效对于Left outer Join 右侧的表写在on后面、左侧的表写在where后面性能上有提高对于Right outer Join左侧的表写在on后面、右侧的表写在where后面性能上有提高 合理选择排序
order by 全局排序只走一个reducer当表数据量较大时容易计算不出来性能不佳慎用在严格模式下需要加limit
sort by 局部排序即保证单个reduce内结果有序但没有全局排序的能力。
distribute by 按照指定的字段把数据划分输出到不同的reducer中是控制数据如何从map端输出到reduce端hive会根据distribute by后面的字段和对应reducer的个数进行hash分发
cluster by 拥有distrubute by的能力同时也拥有sort by的能力所以可以理解cluster by是 distrubute bysort by
实例代码优化
-- 优化前
select
id
,count(*) as cnt
from ds_hive.ch12_order_detail_orc t1
group by id
order by cnt
limit 100
;执行时间109.9s -- 优化后
select
id
,cnt
from
(select id ,count(*) as cntfrom ds_hive.ch12_order_detail_orc t1group by id
) t1
distribute by cnt
sort by id
limit 100
;
执行时间69s 通过优化前后时间对比可以看到优化效果
注意实际企业运维可以通过参数 set hive.mapred.modestrict 来设置严格模式这个时候使用 orderby 全局排序必须加 limit建议如果不是非要全局有序的话局部有序的话建议使用 sortby,它会视情况启动多个 reducer 进行排序并且保证每个 reducer 内局部有序。为了控制map 端数据分配到 reducer 的 key往往还要配合 distribute by 一同使用。如果不加 distribute by 的话map 端数据就会随机分配到 reducer。