网站seo推广,提高网站目标流量,网络营销代运营服务,网站开发总结 优帮云题目
1 讲一下你门公司的大数据项目架构#xff1f;2 你在工作中都负责哪一部分3 spark提交一个程序的整体执行流程4 spark常用算子列几个#xff0c;6到8个吧5 transformation跟action算子的区别6 map和flatmap算子的区别7 自定义udf#xff0c;udtf#xff0c;udaf讲一下…题目
1 讲一下你门公司的大数据项目架构2 你在工作中都负责哪一部分3 spark提交一个程序的整体执行流程4 spark常用算子列几个6到8个吧5 transformation跟action算子的区别6 map和flatmap算子的区别7 自定义udfudtfudaf讲一下这几个函数的区别编写的时候要继承什么类实现什么方法8 hive创建一个临时表有哪些方法9 讲一下三范式三范式解决了什么问题有什么优缺点10 讲一下维度建模的过程 11 维度表有哪几种12 事实表有几种13 什么是维度一致性总线架构事实一致性14 什么是缓慢变化维有哪几种15 什么是拉链表如何实现16 什么是微型维度、支架表什么时候会用到17 讲几个你工作中常用的spark 或者hive 的参数以及这些参数做什么用的18 工作中遇到数据倾斜处理过吗是怎么处理的针对你刚刚提的方案讲一下具体怎么实现。用代码实现以及用sql实现。19 讲一下kafka对接flume 有几种方式。20 讲一下spark是如何将一个sql翻译成代码执行的里面的原理介绍一下21 spark 程序里面的count distinct 具体是如何执行的22 不想用spark的默认分区怎么办自定义Partitioner 实现里面要求的方法 具体是哪几个方法23 有这样一个需求统计一个用户的已经曝光了某一个页面想追根溯是从哪几个页面过来的然后求出在这几个来源所占的比例。你要怎么建模处理23 说一下你对元数据的理解哪些数据算是元数据24 有过数据治理的经验吗25 说一下你门公司的数据是怎么分层处理的每一层都解决了什么问题26 讲一下星型模型和雪花模型的区别以及应用场景
答案
1 讲一下你门公司的大数据项目架构
实时流和离线计算两条线 数仓输入客户端日志服务端日志数据库 传输过程flumekafka 数仓输出报表画像推荐等 2 你在工作中都负责哪一部分
离线数据数仓建模、数据治理、业务开发、稳定性 3 Spark提交一个程序的整体执行流程
包括向yarn申请资源、DAG切割、TaskScheduler、执行task等过程
Spark运行的基本流程
用户在Driver端提交任务初始化运行环境(SparkContext等)Driver根据配置向ResoureManager申请资源(executors及内存资源)ResoureManager资源管理器选择合适的worker节点创建executor进程Executor向Driver注册并等待其分配task任务Driver端完成SparkContext初始化创建DAG分配taskset到Executor上执行。Executor启动线程执行task任务返回结果。
Spark On Yarn的基本流程:
Spark Driver端提交程序并向Yarn申请ApplicationYarn接受请求响应在NodeManager节点上创建AppMasterAppMaster向Yarn ResourceManager申请资源(Container)选择合适的节点创建Container(Executor进程)后续的Driver启动调度运行任务
Yarn Client、Yarn Cluster模式在某些环节会有差异但是基本流程类似。 参考十分钟彻底弄懂Spark内存管理机制 - 掘金 Spark作业基本运行原理 详细原理见上图。我们使用spark-submit提交一个Spark作业之后这个作业就会启动一个对应的Driver进程。根据你使用的部署模式deploy-mode不同Driver进程可能在本地启动也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数占有一定数量的内存和CPU core。而Driver进程要做的第一件事情就是向集群管理器可以是Spark Standalone集群也可以是其他的资源管理集群美团•大众点评使用的是YARN作为资源管理集群申请运行Spark作业需要使用的资源这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数在各个工作节点上启动一定数量的Executor进程每个Executor进程都占有一定数量的内存和CPU core。
在申请到了作业执行所需的资源之后Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage每个stage执行一部分代码片段并为每个stage创建一批task然后将这些task分配到各个Executor进程中执行。task是最小的计算单元负责执行一模一样的计算逻辑也就是我们自己编写的某个代码片段只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后会在各个节点本地的磁盘文件中写入计算中间结果然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复直到将我们自己编写的代码逻辑全部执行完并且计算完所有的数据得到我们想要的结果为止。
Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子比如reduceByKey、join等那么就会在该算子处划分出一个stage界限来。可以大致理解为shuffle算子执行之前的代码会被划分为一个stageshuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候它的每个task可能都会从上一个stage的task所在的节点去通过网络传输拉取需要自己处理的所有key然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作比如reduceByKey()算子接收的函数。这个过程就是shuffle。
当我们在代码中执行了cache/persist等持久化操作时根据我们选择的持久化级别的不同每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
因此Executor的内存主要分为三块第一块是让task执行我们自己编写的代码时使用默认是占Executor总内存的20%第二块是让task通过shuffle过程拉取了上一个stage的task的输出后进行聚合等操作时使用默认也是占Executor总内存的20%第三块是让RDD持久化时使用默认占Executor总内存的60%。
task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task都是以每个task一条线程的方式多线程并发运行的。如果CPU core数量比较充足而且分配到的task数量比较合理那么通常来说可以比较快速和高效地执行完这些task线程。
以上就是Spark作业的基本运行原理的说明大家可以结合上图来理解。理解作业基本原理是我们进行资源参数调优的基本前提。 参考Spark性能优化指南——基础篇 - 美团技术团队 4 Spark常用算子列几个6到8个吧
常用的RDD转换算子
filter(func) 筛选出满足函数func的元素并返回一个新的数据集map(func) 将每个元素传递到函数func中并将结果返回为一个新的数据集flatMap(func) 与map()相似但每个输入元素都可以映射到0或多个输出结果groupByKey() 应用于(K,V)键值对的数据集时返回一个新的(K, Iterable)形式的数据集reduceByKey(func) 应用于(K,V)键值对的数据集时返回一个新的(K, V)形式的数据集其中每个值是将每个key传递到函数func中进行聚合后的结果
行动操作常用算子
count() 返回数据集中的元素个数collect() 以数组的形式返回数据集中的所有元素first() 返回数据集中的第一个元素take(n) 以数组的形式返回数据集中的前n个元素reduce(func) 通过函数func输入两个参数并返回一个值聚合数据集中的元素foreach(func) 将数据集中的每个元素传递到函数func中运行
5 transformation跟action算子的区别
所有的transformation都是采用的懒策略就是如果只是将transformation提交是不会执行计算的计算只有在action被提交的时候才被触发。
Transformation 变换/转换这种变换并不触发提交作业完成作业中间过程处理。Transformation 操作是延迟计算的也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行需要等到有 Action 操作的时候才会真正触发运算。Action 行动算子这类算子会触发 SparkContext 提交 Job 作业。 Action 算子会触发 Spark 提交作业Job。
transformation操作
map(func):对调用map的RDD数据集中的每个element都使用func然后返回一个新的RDD,这个返回的数据集是分布式的数据集filter(func): 对调用filter的RDD数据集中的每个元素都使用func然后返回一个包含使func为true的元素构成的RDDflatMap(func):和map差不多但是flatMap生成的是多个结果mapPartitions(func):和map很像但是map是每个element而mapPartitions是每个partitionmapPartitionsWithSplit(func):和mapPartitions很像但是func作用的是其中一个split上所以func中应该有indexsample(withReplacement,faction,seed):抽样union(otherDataset)返回一个新的dataset包含源dataset和给定dataset的元素的集合distinct([numTasks]):返回一个新的dataset这个dataset含有的是源dataset中的distinct的elementgroupByKey(numTasks):返回(K,Seq[V])也就是hadoop中reduce函数接受的key-valuelistreduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和求平均数sortByKey([ascending],[numTasks]):按照key来进行排序是升序还是降序ascending是boolean类型join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W)返回的是(K,(V,W))的dataset,numTasks为并发的任务数cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W)返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数cartesian(otherDataset)笛卡尔积就是m*n大家懂的
action操作
reduce(func)说白了就是聚集但是传入的函数是两个参数输入返回一个值这个函数必须是满足交换律和结合律的collect()一般在filter或者足够小的结果的时候再用collect封装返回一个数组count():返回的是dataset中的element的个数first():返回的是dataset中的第一个元素take(n):返回前n个elements这个士driverprogram返回的takeSample(withReplacementnumseed)抽样返回一个dataset中的num个元素随机种子seedsaveAsTextFilepath把dataset写到一个textfile中或者hdfs或者hdfs支持的文件系统中spark把每条记录都转换为一行记录然后写到file中saveAsSequenceFile(path):只能用在key-value对上然后生成SequenceFile写到本地或者hadoop文件系统countByKey()返回的是key对应的个数的一个map作用于一个RDDforeach(func):对dataset中的每个元素都使用func 参考Spark常用算子详解_spark算子-CSDN博客 6 map和flatMap算子的区别
map执行完map后会得到一个新的分布式数据集数据集中每个元素是之前的RDD映射得来的与之前RDD每个元素存在一一对应的关系。flatmap而flatmap有一点不同每个输入的元素可以被映射为0个或者多个输出的元素原RDD与新RDD的元素是一对多的关系。当然光看定义比较抽象下面用一个图说明 参考Spark之Map VS FlatMap - 知乎 7 自定义udfudtfudaf讲一下这几个函数的区别编写的时候要继承什么类实现什么方法
区别
UDF输入一行输出一行 UDF用户定义普通函数只对单行数值产生作用UDTF输入一行输出多行类似explode函数 UDTFUser-Defined Table-Generating Functions用户定义表生成函数用来解决输入一行输出多行UDAF输入多行输出一行类似聚合函数 UDAFUser- Defined Aggregation Funcation用户定义聚合函数可对多行数据产生作用等同与SQL中常用的SUM()AVG()也是聚合函数
Hive实现
类型类方法UDF 类 GenericUDF 包路径 org.apache.hadoop.hive.ql.udf.generic initialize类型检查返回结果类型 入参ObjectInspector[] 出参ObjectInspector evaluate功能逻辑实现 入参DeferredObject[] 出参Object getDisplayString函数名称 入参String[] 出参String close关闭函数释放资源等 入参无 出参void UDTF 类 GenericUDTF 包路径 org.apache.hadoop.hive.ql.udf.generic initialize类型检查返回结果类型 入参StructObjectInspector 出参StructObjectInspector process功能逻辑实现 **调用forward输出一行数据可多次调用 入参Object[] 出参void close关闭函数释放资源等 入参无 出参void UDAF 类 AbstractGenericUDAFResolver 包路径 org.apache.hadoop.hive.ql.udf.generic 类 GenericUDAFEvaluator 包路径 org.apache.hadoop.hive.ql.udf.generic 类 AbstractAggregationBuffer 包路径 org.apache.hadoop.hive.ql.udf.generic -----AbstractGenericUDAFResolver----- getEvaluator获取计算器 入参TypeInfo[] 出参GenericUDAFEvaluator ---------GenericUDAFEvaluator---------- init 入参ModeObjectInspector[] 出参ObjectInspector getNewAggregationBuffer 入参无 出参AggregationBuffer reset: 入参AggregationBuffer 出参void iterate 入参AggregationBufferObject[] 出参void merge 入参AggregationBufferObject 出参void terminate 入参AggregationBuffer 出参Object terminatePartial 入参AggregationBuffer 出参Object --------AbstractAggregationBuffer------- estimate评估内存占用大小 入参无 出参int
UDAF说明
一个Buffer作为中间处理数据的缓冲获取getNewAggregationBuffer、重置reset四个阶段Mode PARTIAL1Map阶段 from original data to partial aggregation data: iterate() and terminatePartial() will be called.PARTIAL2Map的Combiner阶段 from partial aggregation data to partial aggregation data: merge() and terminatePartial() will be called.FINALReduce 阶段 from partial aggregation to full aggregation: merge() and terminate() will be called.COMPLETEMap Only阶段 from original data directly to full aggregation: iterate() and terminate() will be called.五个方法 初始化init遍历iteratePARTIAL1和COMPLETE阶段合并mergePARTIAL2和FINAL阶段终止terminatePartialPARTIAL1和PARTIAL2阶段terminateCOMPLETE和FINAL阶段
Spark实现 参考Spark - 自定义函数UDF、UDAF、UDTF - 知乎 8 hive创建一个临时表有哪些方法
WITH创建临时表
如果这个临时表并不需要保存并且下文只需要用有限的几次我们可以采用下面的方法。
with as 也叫做子查询部分首先定义一个sql片段该sql片段会被整个sql语句所用到为了让sql语句的可读性更高些作为提供数据的部分也常常用在union等集合操作中。
with as就类似于一个视图或临时表可以用来存储一部分的sql语句作为别名不同的是with as 属于一次性的而且必须要和其他sql一起使用才可以
其最大的好处就是适当的提高代码可读性而且如果with子句在后面要多次使用到这可以大大的简化SQL更重要的是一次分析多次使用这也是为什么会提供性能的地方达到了“少读”的目标。
WITH t1 AS (SELECT *FROM a), t2 AS (SELECT *FROM b)
SELECT *
FROM t1
JOIN t2
;
注意
这里必须要整体作为一条sql查询即with as语句后不能加分号不然会报错。with子句必须在引用的select语句之前定义,同级with关键字只能使用一次,多个只能用逗号分割如果定义了with子句但其后没有跟select查询则会报错前面的with子句定义的查询在后面的with子句中可以使用。但是一个with子句内部不能嵌套with子句
Temporary创建临时表
create temporary table 临时表表名 as
select * from 表名
创建的临时表仅仅在当前会话可见数据会被暂存到hdfs上退出当前会话表和数据将会被删除。数据将存储在用户的scratch目录中并在会话结束时删除。 从Hive1.1开始临时表可以存储在内存或SSD使用hive.exec.temporary.table.storage参数进行配置该参数有三种取值memory、ssd、default。 如果内存足够大将中间数据一直存储在内存可以大大提升计算性能。 如果临时表的命名的表名和hive的表名一样当前会话则会查询临时表的数据用户在这个会话内将不能使用原表除非删除或者重命名临时表临时表不支持分区字段不支持创建索引。 参考大数据开发之Hive篇7-Hive临时表 - 知乎 9 讲一下三范式三范式解决了什么问题有什么优缺点
三范式
第一范式列的原子性字段值不可再分比如某个字段的取值是姓名手机号那就要把姓名和手机号分成两个字段第二范式第一范式的基础上非主键列不能依赖主键的一部分例如字段a和字段b组成的主键某个字段只依赖a就需要把这个字段剥离到a对应的表第三范式第二范式的基础上非主键列不能传递依赖主键例如字段c依赖字段b字段b依赖主键字段a那么就可以把这个字段c剥离到字段b为主键的表
三范式是要解决字段冗余节省存储空间数据维护更方便不需要多处更新同样的字段
缺点是不方便查询要进行多表join效率低不适合分析类的查询。 范式化设计的优点可以减少数据冗余数据表体积小更新快范式化的更新操作比 反范式化更快范式化的表通常比反范式化更小。 缺点对于查询需要对多个表会关联多个表在应用中进行表关联的成本是很高 更难进行索引优化 反范式化设计的优点可以减少表的关联可以对查询更好的进行索引优化 缺点表结构存在数据冗余和数据维护异常对数据的修改需要更多资源。 因此在设计数据库结构的时候要将反范式化和范式化结合起来 参考你了解数据库三大范式吗用来解决什么问题_数据库三范式解决了什么问题_我是等闲之辈的博客-CSDN博客mysql--数据库优化的目的、数据库设计的步骤以及什么是三范式、三范式的优缺点-CSDN博客 10 讲一下维度建模的过程
四个步骤①选择业务过程 ②确定粒度 ③确定维度 ④确定事实表 维度模型是数据仓库领域的 Ralph Kimball 大师所倡导的他的The Data Warehouse Toolkit-The Complete Guide to Dimensional Modeling 是数据仓库工程领域最流行的数据仓库建模的经典。 维度建模从分析决策的需求出发构建模型为分析需求服务因此它重点关注用户如何更快速地完成需求分析同时具有较好的大规模复杂查询的响应性能。其典型的代表是星形模型以及在一些特殊场景下使用的雪花模型。其设计分为以下几个步骤。 第一步选择业务过程
选择需要进行分析决策的业务过程。业务过程可以是单个业务事 件比如交易的支付、退款等;也可以是某个事件的状态比如 当前的账户余额等;还可以是一系列相关业务事件组成的业务流 程具体需要看我们分析的是某些事件发生情况还是当前状态 或是事件流转效率。
第二步确定粒度
选择粒度。在事件分析中我们要预判所有分析需要细分的程度从而决定选择的粒度。粒度是维度的一个组合。
第三步确定维度
识别维表。选择好粒度之后就需要基于此粒度设计维表包括维度属性用于分析时进行分组和筛选。
第四步确定事实
选择事实。确定分析需要衡量的指标 。 参考【总结】维度数据建模过程及举例-腾讯云开发者社区-腾讯云 11 维度表有哪几种
按是否规范化设计划分分符合三范式的维表和反规范化的维表对应雪花模型和星型模型按是否包含属性层次结构分分包含层次结构的维表和不包含层次结构的维表如行业维表就是包含层次结构的维表递归层次一般采用①扁平化、②层次桥接表两种方式设计按水平拆分基于业务类型可以划分为主维表存放公共属性子维度表包含公共属性和特有属性按垂直拆分基于性能可以划分为主维表存放使用频率高的属性扩展维表存放使用频率低的属性按是否归档处理分为历史维度表普通表按缓慢变化维划分一种是使用代理键的就是Kimball中的8种类型的缓慢变化维度如果不使用代理键则划分快照维表、采用极限存储的维度表一些特殊维度类型支架维度、杂项维度、行为维度事实衍生维度如买家常用地址、多值维度 参考数据仓库系列4-维度表 - 知乎 12 事实表有几种
事务性事实表又分为单事务事实表多事务事实表周期快照事实表又分为单维度的每天快照事实表混合维度的每天快照事实表全量快照事实表累计快照事实表 13 什么是维度一致性总线架构事实一致性
维度建模的数据仓库中有一个概念叫Conformed Dimension中文一般翻译为“一致性维度”。一致性维度是Kimball的多维体系结构中的三个关键性概念之一另两个是总线架构Bus Architecture和一致性事实Conformed Fact。
总线架构初期进行需求沟通和整体设计的产物汇总一致性维度和业务过程的表格
一致性维度维度定义和维表实现的同一性
一致性事实指标定义(包括单位)和实现的一致性 总线矩阵业务过程和维度的交点 一致性维度同一集市的维度表内容相同或包含 一致性事实不同集市的同一事实需保证口径一致单位统一。 参考一篇文章搞懂数据仓库总线架构、一致性维度、一致性事实-阿里云开发者社区 14 什么是缓慢变化维有哪几种
缓慢变化维SCDSlowly Changing Dimensions数仓的特点之一就是反映历史变化与增长较快的事实表相比维度变化相对缓慢。Kimball整理的处理方法一共有8种但往往只有3种比较常用类型1、类型2、类型3。
类型0属性值不可能变保留原样。类型1重写覆盖原值类型1不能反映历史。类型2新增一行并生成新的代理键事实表旧的记录使用旧的代理键新的记录使用新的代理键同时维表增加创建时间和截止时间来标识最新有效的记录和历史记录新的记录有效期可以设置一个极大值如9999.12.31类型2的缺点是旧的记录只能用旧的维度分析新的记录只能用新的维度进行分析。类型3类型1只能满足新维度分析的需求类型2既不能满足新维度分析的需求也不能满足旧维度分析的需求如果要同时满足新旧两种维度分析的需求可以考虑使用类型3。类型3是通过两个字段分别存储新旧值来满足这种两种同时都要的需求的但是类型3的问题在如果只变化一次还行但如果变化第2次、第3次就需要增加新的字段会比较麻烦。类型4微型维度。解决的是高频变更导致的记录过多的问题剥离高频变化的维度如果是数值可以用范围值来减少数量有多个高频变化的维度就用这些维度的笛卡尔积来组合成维表可以用组合缩写来生成代理键注意微型维度没有自然键。另外如果需要记录精确值可以考虑无事实的事实表。需要注意的是类型4只能在事实表中出现如果维度表和微型维度发生的关联那就是类型5。类型5类型1微型维度。主维表通过类型1关联微信维度反映最新值微型维度与事实表进行关联通过事实表来反映历史变化。类型6类型1类型2类型3。通过类型3构建新旧两列同一个自然键的通过类型1更新为最新值通过类型2处理历史变化。类型6的缺点是如果这种列很多150个列那就要翻倍来存300个列这是可以考虑类型7。类型7双类型1类型2。用类型2生成主维表同时生成一个最新值的视图取9999.12.31事实表中存放两个外键或者单个外键连关联这两张维表。代理键可以反映当时的变化自然键超自然键可以反映当前最新值。双重外键一个键存放主维表的代理键反映当时的情况一个键存放最新视图的自然键最好是超自然键反映最新情况。单外键存的是主维表的代理键视图里存当时的代理键和最新的代理键。 参考深入解析缓慢变化维 - 知乎 15 什么是拉链表如何实现
拉链表
主要是为了记录历史变化并节省存储空间。适合大数据量场景下变化频率又不高的情况。如果每天存储全量表很多都是重复的记录造成浪费。可以应用在维表也可以应用在事实表。比如某个员工维表为了反映部门历史变动订单事实表为了反映订单状态的历史变化未支付、已支付、已发货、已完成。
实现过程
有些业务系统的表不保存变更流水记录的只保存最新的值比如订单表只保留订单的最新状态。如果要反映历史变化需要跟进binlog日志或者定时采集来记录变化。具体实现步骤
第一步创建并初始化拉链表执行一次第二步合并当日新增和修改的数据到临时表每天执行一次 每日变更的记录uion all拉链表的记录注意要修改历史记录的有效日期。第三步把临时表覆盖写入拉链表每天执行一次
进阶问题
拉链表如何回滚参考https://www.cnblogs.com/lxbmaomao/p/9821128.html
极限存储怎么实现参考拉链表-极限存储-CSDN博客 参考 拉链表的详细实现过程好文点赞收藏_拉链表的实现过程_KG大数据的博客-CSDN博客 https://www.cnblogs.com/lxbmaomao/p/9821128.html 16 什么是微型维度、支架表什么时候会用到
微型维度缓慢变化维度类型4参考第14题支架表 是一种受限的雪花维度是星型模型和雪花模型之间的一种折中如日期维度表、地址维度表使用场景当一个属性集合例如日期、地点在某个维度或多个维度表中反复出现时就可以考虑使用支架表。使用条件 ①在单个维度表中反复出现该支架属性时 ②被调用的属性值较多时 ③被多个维度、事实表调用且被调用时的属性值定义完全相同 ④基本不需要修改或修改频次极小 参考 深入解析缓慢变化维 - 知乎 深入解析支架表 - 知乎 17 讲几个你工作中常用的spark 或者hive 的参数以及这些参数做什么用的
hive参数
----------------------小文件合并----------------------
----------map输入端合并-----------
## Map端输入、合并文件之后按照block的大小分割默认
set hive.input.formatorg.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
## Map端输入不合并
set hive.input.formatorg.apache.hadoop.hive.ql.io.HiveInputFormat;
------------输出端合并------------
## 是否合并Map输出文件, 默认值为true
set hive.merge.mapfilestrue;
## 是否合并Reduce端输出文件,默认值为false
set hive.merge.mapredfilestrue;
## 合并文件的大小,默认值为256000000 256M
set hive.merge.size.per.task256000000;
## 每个Map 最大分割大小
set mapred.max.split.size256000000;
## 一个节点上split的最少值
set mapred.min.split.size.per.node1; // 服务器节点
## 一个机架上split的最少值
set mapred.min.split.size.per.rack1; // 服务器机架
hive.merge.size.per.task 和 mapred.min.split.size.per.node 联合起来##1、默认情况先把这个节点上的所有数据进行合并如果合并的那个文件的大小超过了256M就开启另外一个文件继续合并
##2、如果当前这个节点上的数据不足256M那么就都合并成一个逻辑切片。----------------------Map Task并行度----------------------
## 切片大小计算公式 long splitSize Math.max(minSize, Math.min(maxSize, blockSize))
## dfs.blocksize128MB split.minsize1 split.maxsize256MBset mapreduce.input.fileinputformat.split.minsize1
set mapreduce.input.fileinputformat.split.maxsize256000000## 输入文件总大小total_size
## HDFS 设置的数据块大小dfs_block_size
## default_mapper_num total_size / dfs_block_size
## mapred.map.tasks这个参数设置只有在大于 default_mapper_num 的时候才会生效
set mapred.map.tasks10; ## 默认值是2## map task计算公式
## split_size max(mapred.min.split.size, dfs_block_size)
## split_num total_size / split_size
## compute_map_num Math.min(split_num, Math.max(default_mapper_num,
## mapred.map.tasks))## 总结
## 1、如果想增加 MapTask 个数可以设置 mapred.map.tasks 为一个较大的值
## 2、如果想减少 MapTask 个数可以设置 maperd.min.split.size 为一个较大的值
## 3、如果输入是大量小文件想减少 mapper 个数可以通过设置 hive.input.format 合并小文----------------------Reduce Task并行度----------------------
## 参数1hive.exec.reducers.bytes.per.reducer (默认256M)
## 参数2hive.exec.reducers.max (默认为1009)
## 参数3mapreduce.job.reduces (默认值为-1表示没有设置那么就按照以上两个参数进行设置)
## ReduceTask 的计算公式为:
## N Math.min(参数2总输入数据大小 / 参数1)----------------------Join 优化----------------------
----------开启map join-----------
## 是否根据输入小表的大小自动将reduce端的common join 转化为map join将小表刷入内存中。
## 对应逻辑优化器是MapJoinProcessor
set hive.auto.convert.join true;
## 刷入内存表的大小(字节)
set hive.mapjoin.smalltable.filesize 25000000;
## hive会基于表的size自动的将普通join转换成mapjoin
set hive.auto.convert.join.noconditionaltasktrue;
## 多大的表可以自动触发放到内层LocalTask中默认大小10M
set hive.auto.convert.join.noconditionaltask.size10000000;----------开启bucket map join SMB map join-----------
## 当用户执行bucket map join的时候发现不能执行时禁止查询
set hive.enforce.sortmergebucketmapjoinfalse;
## 如果join的表通过sort merge join的条件join是否会自动转换为sort merge join
set hive.auto.convert.sortmerge.jointrue;
## 当两个分桶表 join 时如果 join on的是分桶字段小表的分桶数是大表的倍数时可以启用
mapjoin 来提高效率。
# bucket map join优化默认值是 false
set hive.optimize.bucketmapjoinfalse;
## bucket map join 优化默认值是 false
set hive.optimize.bucketmapjoin.sortedmergefalse;------------------数据倾斜优化-------------------
---------Map端聚合----------
## 开启Map端聚合参数设置
set hive.map.aggrtrue;
# 设置map端预聚合的行数阈值超过该值就会分拆job默认值100000
set hive.groupby.mapaggr.checkinterval100000
# 自动优化有数据倾斜的时候进行负载均衡默认是false 如果开启设置为true
set hive.groupby.skewindatafalse;
## 1、在第一个 MapReduce 任务中map 的输出结果会随机分布到 reduce 中每个 reduce 做部分聚合操作并输出结果这样处理的结果是相同的group by key有可能分发到不同的 reduce 中从而达到负载均衡的目的
## 2、第二个 MapReduce 任务再根据预处理的数据结果按照 group by key 分布到各个 reduce 中最后完成最终的聚合操作。---------join优化----------
# join的键对应的记录条数超过这个值则会进行分拆值根据具体数据量设置
set hive.skewjoin.key100000;
# 如果是join过程出现倾斜应该设置为true
set hive.optimize.skewjoinfalse;------------------使用 vectorization 矢量查询技术-------------------
set hive.vectorized.execution.enabledtrue ;
set hive.vectorized.execution.reduce.enabledtrue;---------------本地执行优化------------------
## 打开hive自动判断是否启动本地模式的开关
set hive.exec.mode.local.autotrue;
## map任务数最大值不启用本地模式的task最大个数
set hive.exec.mode.local.auto.input.files.max4;
## map输入文件最大大小不启动本地模式的最大输入文件大小
set hive.exec.mode.local.auto.inputbytes.max134217728;---------------并行执行---------------
## 可以开启并行执行。
set hive.exec.paralleltrue;
## 同一个sql允许最大并行度默认为8。
set hive.exec.parallel.thread.number16;---------------严格模式---------------
## 设置Hive的严格模式
set hive.mapred.modestrict;
set hive.exec.dynamic.partition.modenostrict;Spark参数
Spark提交任务参数设置
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 4g --classcom.abc.sparktuning.utils.InitUtil spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
指定集群模式和部署模式 spark-submit中通过--master参数指定集群的资源管理器也可以在代码中硬编码指定master通过--deploy-mode参数指定以client模式运行还是以cluster模式运行。 master常用参数有localDriver和Executor运行在同一个节点的同一个JVM中、Spark Standalone集群Spark自带的简易资源管理器分Master节点和Worker节点、mesos、yarn、k8s。 部署模式client和cluster两种client模式是Driver进程在提交任务的节点运行cluster模式是Driver进程在Woker节点中运行Driver进程。
资源参数设置举例 Executor内存设置以单台服务器 128G 内存32 核为例考虑到系统基础服务和 HDFS 等组件的余量yarn.nodemanager.resource.cpu-vcores 配置为:28核每个 executor 的最大核数。根据经验实践设定在 3~6 之间比较合理。这里设置为4那么每个Yarn节点可以同时跑28 / 4 7个executor。假设集群节点为 10那么 num-executors 7 * 10 70所以num-executors是70executor-cores是4 如果 yarn-nodemanager.resource.memory-mb100G那么每个 Executor 大概就是 100G/7≈14G同时加上堆外内存要不大于 yarn.scheduler.maxinum-allocation-mb 容器最大内存设置因为executor-memory指定的是堆内内存除了堆内内存还有堆外内存。 Driver内存设置Yarn Client 和 Cluster 两种方式提交Executor和Driver的内存分配情况也是不同的。Yarn中的ApplicationMaster都启用一个Container来运行 Client模式下的Container默认有1G内存1个cpu核Cluster模式的配置则由driver-memory和driver-cpu来指定也就是说Client模式下的driver是默认的内存值Cluster模式下的dirver则是自定义的配置。
设置Kryo序列化降低RDD缓存磁盘占用
new SparkConf().set(spark.serializer, org.apache.spark.serializer.KryoSerializer)
...
result.persist(StorageLevel.MEMORY_ONLY_SER) 并行度参数设置
SparkConf conf new SparkConf()
conf.set(spark.default.parallelism, 500)
注意 spark.default.parallelism只有在处理RDD时才会起作用对Spark SQL的无效。 spark.sql.shuffle.partitions则是对Spark SQL专用的设置
我们也可以在提交作业的通过 --conf 来修改这两个设置的值方法如下 spark-submit --conf spark.sql.shuffle.partitions500 --conf spark.default.parallelism500 spark.default.parallelism用户配置同一的并行度统一性 reduceByKey(1000)类shuffle算子具体任务中可以特定配置的并行度。特定性 两者优先级算子类传入的并行度 同一设置的并行度。
spark.sql.shuffle.partitions对sparksql中的joins和aggregations有效但其他的无效针对这种情况可以采用repartition算子对dataframe进行重分区
如果想要让任务运行的最快当然是一个 task 对应一个 vcore,但 是一般不会这样设置为了合理利用资源一般会将并行度(task 数)设置成并发度 (vcore数)的2倍到3倍。
设置2~3倍的具体原因参考如下 因为实际情况与理想情况不同的有些task会运行的快一点比如50s就完了有些task可能会慢一点要1分半才运行完所以如果你的task数量刚好设置的跟cpu core数量相同可能还是会导致资源的浪费因为比如150个task10个先运行完了剩余140个还在运行但是这个时候有10个cpu core就空闲出来了就导致了浪费。那如果task数量设置成cpu core总数的2~3倍那么一个task运行完了以后另一个task马上可以补上来就尽量让cpu core不要空闲同时也是尽量提升spark作业运行的效率和速度提升性能。 18 工作中遇到数据倾斜处理过吗是怎么处理的针对你刚刚提的方案讲一下具体怎么实现。用代码实现以及用sql实现。
过滤无效的导致倾斜的key【适用场景不多】比如过滤null、空字符串、非整数字符串等 方案一剥离null不参与shuffle方案二对null加随机数打散shufflereduce join转为map join【适合小表join大表】spark可以通过broadcast来广播小表增加shuffle并行度【缓解】shuffle算子入reduceByKey(1000)传入并行度数值或者SparkSQL设置spark.sql.shuffle.partitions参数默认200都能提高shuffle read task的并行度两阶段聚合局部聚合整体聚合【适用于聚合类倾斜】 第一步给每个key都打上一个随机前缀。 第二步对打上随机前缀的key进行局部聚合。 第三步去除每个key的随机前缀。 第四步全局聚合。采样倾斜key并分开join【适合大表join大表一个大表有少数几个key倾斜另一个比较均匀】 通过sample采样出倾斜的key然后包这些key从两张表里剥离出来一张加随机数打散成n份一张扩充n倍进行Join剥离出key剩下的数据进行普通join即可最后把结果合并在一起全部数据加随机数打散N份扩容N倍【大表Join大表扩容导致内存消耗大】对join类型的数据倾斜基本都可以处理而且效果也相对比较显著性能提升效果非常不错。
加随机数group by
with tmp1 as (select 1 as id, a as label, 1 as valueunion allselect 1 as id, a as label, 2 as valueunion allselect 1 as id, a as label, 3 as valueunion allselect 2 as id, b as label, 4 as value
)
select label, sum(cnt) as all from
(select rd, label, sum(1) as cnt from(select id, round(rand(),2) as rd, label, value from tmp1) as tmpgroup by rd, label
) as tmp
group by label;
加随机数join
with t1 as (select 1 as id, a as label, 1 as valueunion allselect 1 as id, a as label, 2 as valueunion allselect 1 as id, a as label, 3 as valueunion allselect 2 as id, b as label, 4 as value
), t2 as (select 1 as id, a as label, 10 as valueunion allselect 1 as id, a as label, 20 as valueunion allselect 1 as id, a as label, 30 as valueunion allselect 2 as id, b as label, 40 as value
), tmp1 as (select id,round(rand(),1) as rd,label,value from t1
), tmp2 as (select id, rd, label, value from t2lateral viewexplode(split(0.0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1.0,,)) mytable as rd
), tmp3 as (select tmp1.rd as rd, tmp1.label as label, tmp1.value*tmp2.value as value fromtmp1jointmp2on tmp1.rd tmp2.rd and tmp1.label tmp2.label
), tmp4 as (select rd, label, sum(value) as value fromtmp3group by rd,label
)
select label,sum(value) as all from
tmp4
group by label;
null处理 方案一不参与shuffle
SELECT *
FROM log a
LEFT JOIN users bON a.user_id IS NOT NULLAND a.user_id b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;
方案二加随机数shfffle
SELECT *
FROM log a
LEFT JOIN users b
ON if(a.user_id is null,concat(hive,rand()),a.user_id) b.user_id; 参考 [Hive]Hive数据倾斜大表join大表_大小表join hive 3.0-CSDN博客 Spark性能优化指南——高级篇 - 美团技术团队 19 讲一下kafka对接flume 有几种方式
三种source、channel、sink
source和sink对接方式Flume对接Kafka详细过程_flume kafka_杨哥学编程的博客-CSDN博客
channel对接方式flume--KafkaChannel的使用_kafka channel为什么没有sink-CSDN博客
20 讲一下spark是如何将一个sql翻译成代码执行的里面的原理介绍一下
SparkSQL主要是通过Catalyst优化器将SQL翻译成最终的RDD算子的
阶段产物执行主体解析Unresolved Logical Plan未解析的逻辑计划sqlParser分析Resolved Logical Plan解析的逻辑计划Analyzer优化Optimized Logical Plan优化后的逻辑计划Optimizer转换Physical Plan物理计划Query Planner 无论是使用 SQL语句还是直接使用 DataFrame 或者 DataSet 算子都会经过Catalyst一系列的分析和优化最终转换成高效的RDD的操作主要流程如下 1. sqlParser 解析 SQL生成 Unresolved Logical Plan未解析的逻辑计划 2. 由 Analyzer 结合 Catalog 信息生成 Resolved Logical Plan解析的逻辑计划 3. Optimizer根据预先定义好的规则(RBO)对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan优化后的逻辑计划 4. Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan物理计划。然后由CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan最终执行的物理计划 5. Spark运行物理计划先是对物理计划再进行进一步的优化最终映射到RDD的操作上和Spark Core一样以DAG图的方式执行SQL语句。 在最新的Spark3.0版本中还增加了Adaptive Query Execution功能会根据运行时信息动态调整执行计划从而得到更高的执行效率 整体的流程图如下所示 参考SparkSQL运行流程浅析_简述spark sql的工作流程-CSDN博客 21 spark 程序里面的count distinct 具体是如何执行的 一般对count distinct优化就是先group by然后再count变成两个mapreduce过程先去重再count。 spark类似会发生两次shuffle产生3个stage经过4个步骤①先map端去重②然后再shuffle到reduce端去重③然后通过map做一次partial_count④最后shuffle到一个reduce加总。 spark中多维count distinct会发生数据膨胀问题会把所有需要 count distinct 的N个key组合成List行数就翻了N倍这时最好分开来降低单个任务的数据量。 参考大数据SQL COUNT DISTINCT实现原理 - 知乎 22 不想用spark的默认分区怎么办自定义Partitioner 实现里面要求的方法 具体是哪几个方法
abstract class Partitioner extends Serializable {def numPartitions: Intdef getPartition(key: Any): Int
}参考Spark自定义分区器-CSDN博客 23 有这样一个需求统计一个用户的已经曝光了某一个页面想追根溯是从哪几个页面过来的然后求出在这几个来源所占的比例。你要怎么建模处理
面试官的意思是将所有埋点按时间顺序存在一个List 里然后可能需要自定义udf函数更主要的是考虑一些异常情况比如点击流中间是断开的或者点击流不全怎么应对 参考Hive基于SQL创建漏斗模型-CSDN博客 用户行为分析模型实践二—— 漏斗分析模型-腾讯云开发者社区-腾讯云 23 说一下你对元数据的理解哪些数据算是元数据
技术元数据、业务元数据、操作元数据、管理元数据 元数据管理技术架构 参考元数据数据治理的基石-腾讯云开发者社区-腾讯云一文彻底了解元数据管理与架构设计-腾讯云开发者社区-腾讯云 浅谈数仓的元数据管理 - 知乎 数据治理之元数据管理的利器——Atlas入门宝典-腾讯云开发者社区-腾讯云 你真的了解数仓元数据吗数据地图你又知道多少 - 知乎 24 有过数据治理的经验吗 数据治理涉及的方面 基于业务现状面临的问题和挑战来讲。 参考业务数据治理体系化思考与实践 - 美团技术团队 DataMan-美团旅行数据质量监管平台实践 - 美团技术团队 数据治理一体化实践之体系化建模 - 美团技术团队 25 说一下你门公司的数据是怎么分层处理的每一层都解决了什么问题
阿里 数据引入层ODSOperational Data Store又称数据基础层数据公共层CDMCommon Dimensions Model 维度层DIMDimension明细数据层DWDData Warehouse Detail汇总数据层DWSData Warehouse Summary数据应用层ADSApplication Data Store
美团 参考业务数据治理体系化思考与实践 - 美团技术团队 OneData建设探索之路SaaS收银运营数仓建设 - 美团技术团队 什么是数仓分层,各层有哪些用途_智能数据建设与治理 Dataphin-阿里云帮助中心 26 讲一下星型模型和雪花模型的区别以及应用场景
雪花模型去除了冗余设计复杂可读性差关联的维度表多查询效率低但是可扩展性好适合OLAP星型模型冗余度高设计简单可读性高关联的维度表少查询效率高可扩展性低适合OLTP
区别 星型模型和雪花模型最根本的区别就是维度表是直接连接到事实表还是其他的维度表。 1星型模型因为数据的冗余所以很多统计查询不需要做外部的连接因此一般情况下效率比雪花模型要高。 2星型模型不用考虑很多正规化的因素设计和实现都比较简单。 3雪花模型由于去除了冗余有些统计就需要通过表的连接才能产生所以效率不一定有星型模型高。 4正规化也是一种比较复杂的过程相应的数据库结构设计、数据的ETL、以及后期的维护都要复杂一些。因此在冗余可以接受的前提下实际运用中星型模型使用更多也更有效率。
数据仓库更适合使用星型模型来构建底层数据 hive 表通过数据冗余来减少查询次数以提高查询效率。雪花模型在关系型数据库中MySQL/Oracle更加常见。在具体规划设计时应结合具体场景及两者的优缺点来进行设计找到一个平衡点去开展工作。 属性 星型模型 雪花模型 维表 一级维表 多层级维表 数据总量 多 少 数据冗余度 高 低 可读性 高 低 表个数 少 多 表宽度 宽 窄 查询逻辑 简单 复杂 查询性能 高 低 扩展性 差 好 参考 维度建模 -- 星型模型和雪花模型的区别-CSDN博客 一文搞清楚数据仓库模型星型模型和雪花模型的区别 - 简书 三大数据模型星型模型、雪花模型、星座模型-腾讯云开发者社区-腾讯云 星型模型与雪花模型的区别、分别有哪些优缺点_星型模型和雪花模型的区别和使用场景?-CSDN博客