网站标签怎么做跳转页面,html网站开头怎么做的,怎样进行关键词推广,投资公司的经营范围有哪些文章目录 Spark On Hive 详解一、项目配置1. 创建工程2. 配置文件3. 工程目录 二、代码实现2.1 Class SparkFactory2.2 Object SparkFactory Spark On Hive 详解
本文基于Spark重构基于Hive的电商数据分析的项目需求#xff0c;在重构的同时对Spark On Hive的全流程进行详细的… 文章目录 Spark On Hive 详解一、项目配置1. 创建工程2. 配置文件3. 工程目录 二、代码实现2.1 Class SparkFactory2.2 Object SparkFactory Spark On Hive 详解
本文基于Spark重构基于Hive的电商数据分析的项目需求在重构的同时对Spark On Hive的全流程进行详细的讲解。
一、项目配置
1. 创建工程
首先创建一个空的Maven工程在创建之后我们需要检查一系列配置以保证JDK版本的一致性。同时我们需要创建出Scala的编码环境。具体可参考以下文章: Maven工程配置与常见问题解决指南 和 Scala01 —— Scala基础
2. 配置文件
2.1 在Spark On Hive的项目中我们需要有两个核心配置文件。
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.ybg/groupIdartifactIdwarehouse_ebs_2/artifactIdversion1.0/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingspark.version3.1.2/spark.versionspark.scala.version2.12/spark.scala.versionhadoop.version3.1.3/hadoop.versionmysql.version8.0.33/mysql.versionhive.version3.1.2/hive.versionhbase.version2.3.5/hbase.versionjackson.version2.10.0/jackson.version/propertiesdependencies!-- spark-core --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_${spark.scala.version}/artifactIdversion${spark.version}/version/dependency!-- spark-sql --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_${spark.scala.version}/artifactIdversion${spark.version}/version/dependency!-- spark-hive --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_${spark.scala.version}/artifactIdversion${spark.version}/version/dependency!-- hadoop-common --dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-common/artifactIdversion${hadoop.version}/version/dependency!-- mysql --dependencygroupIdcom.mysql/groupIdartifactIdmysql-connector-j/artifactIdversion${mysql.version}/version/dependency!-- hive-exec --dependencygroupIdorg.apache.hive/groupIdartifactIdhive-exec/artifactIdversion${hive.version}/versionexclusionsexclusiongroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactId/exclusion/exclusions/dependency!-- HBase 驱动 --dependencygroupIdorg.apache.hbase/groupIdartifactIdhbase-client/artifactIdversion${hbase.version}/version/dependency!-- jackson-databind --dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-core/artifactIdversion${jackson.version}/version/dependency!-- jackson-databind --dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactIdversion${jackson.version}/version/dependency/dependencies/projectlog4j.properties log4j.properties 文件的主要作用是配置日志系统的行为包括控制日志信息的输出和实现滚动事件日志。
log4j.rootLoggerERROR, stdout, logfile
log4j.appender.stdoutorg.apache.log4j.ConsoleAppender
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
----------------------- 滚动事件日志代码 -----------------------
log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n
log4j.appender.logfileorg.apache.log4j.DailyRollingFileAppender
log4j.appender.logfile.DatePattern.yyyy-MM-dd
log4j.appender.logfile.appendtrue
---------------------------------------------------------------
log4j.appender.logfile.Filelog/spark_first.log
log4j.appender.logfile.layoutorg.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern%d %p [%c] - %m%n2.2 组件核心配置文件 在工程的resources目录下需要存放在虚拟机中大数据服务的核心组件的配置文件以便于Spark On Hive中调用大数据组件服务能够正常进行。
3. 工程目录
二、代码实现 创建数据校验方法 check 用于确保配置项的值有效。 检查值是否为 null。 对字符串类型的值进行非空和正则表达式匹配校验。 创建配置设置方法 set 先校验配置项名称和值的有效性。 使用 SparkConf.set 方法设置有效的配置项和值。 单例对象 SparkFactory 提供基础配置方法如设置应用名称、主节点等。 提供 baseConfig 方法集中进行基础配置。 提供 end 方法返回配置好的 SparkSession 实例。 在 SparkFactory 类中实现上述方法 定义 build 方法返回包含 check 和 set 方法的 Builder 对象。 在 Builder 对象中实现各种配置方法每个方法都调用 set 方法。 使用 SparkSession.builder() 方法在 end 方法中创建并返回 SparkSession 实例。
SparkFactory配置表如下 配置表
2.1 Class SparkFactory
作用SparkFactory类的作用是能够工厂化地创建和配置SparkSession实例通过一系列的set和check方法来确保配置项的有效性和正确性并最终生成一个配置好的SparkSession实例。注意我们需要在Spark官网配置页获取所有配置项的标准名称。代码
class SparkFactory {def build():Builder{new Builder {val conf new SparkConf()/*** 数据校验* param title 校验主题* param value 待校验的值* param regex 若待校验值为字符串且有特定的规则那么提供正则表达式进一步验证格式*/private def check(title:String,value:Any,regex:Stringnull){if (null value) {throw new RuntimeException()(svalue for $title null pointer exception)}if(value.isInstanceOf[String]){if(value.toString.isEmpty){throw new RuntimeException(svalue for $title empty string exception)}if(regex!null){if(!value.toString.matches(regex)){throw new RuntimeException(s$title is not match regex $regex)}}}}/*** 先检查配置项名称是否正确* 再检查配置项的值是否正确* param item 配置项名称* param value 配置项值* param regexValue 配置项正则规则*/private def set(item:String,value:String,regexValue:Stringnull){check(name_of_config_item,item,^spark\\..*)check(item,value,regexValue)conf.set(item,value)}// Baseprivate def setBaseAppName(appName:String){set(spark.app.name,appName,^\\w$)}private def setBaseMaster(master:String){set(spark.master,master,yarn|spark://([a-z]\\w|\\d{1,3}(\\.\\d{1,3}){3}):\\d{4,5}|local(\\[\\*|[1-9][0-9]*]))}private def setBaseDeployMode(deployMode:String){set(spark.submit.deployMode,deployMode,client|cluster)}private def setBaseEventLogEnabled(eventLogEnabled:Boolean){set(spark.eventLog.enabled,s$eventLogEnabled)}override def baseConfig(appName: String, master: String local[*], deployMode: String client, eventLogEnabled: Boolean false): Builder {setBaseAppName(appName)setBaseMaster(master)setBaseDeployMode(deployMode)setBaseEventLogEnabled(eventLogEnabled)this}// Driverprivate def setDriverMemory(memoryGB:Int){set(spark.driver.memory,s${memoryGB}g,[1-9]\\d*)}private def setDriverCoreNum(coreNum: Int) {set(spark.driver.cores, s${coreNum}g, [1-9]\\d*)}private def setDriverMaxResultGB(maxRstGB:Int){set(spark.driver.maxResultSize,s${maxRstGB}g,[1-9]\\d*)}private def setDriverHost(driverHost:String){set(spark.submit.deployMode,driverHost,localhost|[a-z]\\w)}override def optimizeDriver(memoryGB: Int 2, coreNum: Int 1, maxRstGB: Int 1, driverHost: String localhost): Builder {setDriverCoreNum(coreNum)setDriverMemory(memoryGB)/*** 每一个Spark行动算子触发的所有分区序列化结果大小上限*/setDriverMaxResultGB(maxRstGB)/*** Standalone 模式需要设置 DriverHost便于 executor 与 master 通信*/if (conf.get(spark.master).startsWith(spark://)) {setDriverHost(driverHost)}setDriverHost(driverHost)this}// Executorprivate def setExecutorMemory(memoryGB: Int) {set(spark.executor.memory, s${memoryGB}g, [1-9]\\d*)}private def setExecutorCoreNum(coreNum: Int) {set(spark.executor.cores, s$coreNum, [1-9]\\d*)}override def optimizeExecutor(memoryGB:Int1,coreNum:Int1):Builder{setExecutorMemory(memoryGB)/*** Yarn模式下只能由1个核* 其他模式下核数为所有可用的核*/if(!conf.get(spark.master).equals(yarn)){setExecutorCoreNum(coreNum)}this}// Limitprivate def setLimitMaxCores(maxCores:Int){set(spark.cores.max,s$maxCores,[1-9]\\d*)}private def setLimitMaxTaskFailure(maxTaskFailure:Int){set(spark.task.maxFailures,s$maxTaskFailure,[1-9]\\d*)}private def setLimitMaxLocalWaitS(maxLocalWaitS:Int){set(spark.locality.wait,s${maxLocalWaitS}s,[1-9]\\d*)}override def optimizeLimit(maxCores:Int4,maxTaskFailure:Int3,maxLocalWaitS:Int3):Builder{if (conf.get(spark.master).startsWith(spark://)) {setLimitMaxCores(maxCores)}/*** 单个任务允许失败最大次数超出会杀死本次任务*/setLimitMaxTaskFailure(maxTaskFailure)/*** 数据本地化读取加载的最大等待时间* 大任务建议适当增加此值*/setLimitMaxLocalWaitS(maxLocalWaitS)this}// Serializeroverride def optimizeSerializer(serde:Stringorg.apache.spark.serializer.JavaSerializer,clas:Array[Class[_]]null):Builder{/*** 设置将需要通过网络发送或快速缓存的对象序列化工具类* 默认为JavaSerializer* 为了提速推荐设置为KryoSerializer* 若采用 KryoSerializer需要将所有自定义的实体类(样例类)注册到配置中心*/set(spark.serializer,serde,([a-z]\\.)[A-Z]\\w*)if(serde.equals(org.apache.spark.serializer.KryoSerializer)){conf.registerKryoClasses(clas)}this}// Netprivate def setNetTimeout(netTimeoutS:Int){set(spark.cores.max,s${netTimeoutS}s,[1-9]\\d*)}private def setNetSchedulerMode(schedulerMode:String){set(spark.scheduler.mode,schedulerMode,FAIR|FIFO)}override def optimizeNetAbout(netTimeOusS:Int120,schedulerMode:StringFAIR):Builder{/*** 所有和网络交互相关的超时阈值*/setNetTimeout(netTimeOusS)/*** 多人工作模式下建议设置为FAIR*/setNetSchedulerMode(schedulerMode)this}// Dynamicprivate def setDynamicEnabled(dynamicEnabled:Boolean){set(spark.dynamicAllocation.enabled,s$dynamicEnabled)}private def setDynamicInitialExecutors(initialExecutors:Int){set(spark.dynamicAllocation.initialExecutors,s$initialExecutors,[1-9]\\d*)}private def setDynamicMinExecutors(minExecutors:Int){set(spark.dynamicAllocation.minExecutors,s$minExecutors,[1-9]\\d*)}private def setDynamicMaxExecutors(maxExecutors:Int){set(spark.dynamicAllocation.maxExecutors,s$maxExecutors,[1-9]\\d*)}override def optimizeDynamicAllocation(dynamicEnabled:Booleanfalse,initialExecutors:Int3,minExecutors:Int0,maxExecutors:Int6):Builder{/*** 根据应用的工作需求动态分配executor*/setDynamicEnabled(dynamicEnabled)if(dynamicEnabled){setDynamicInitialExecutors(initialExecutors)setDynamicMinExecutors(minExecutors)setDynamicMaxExecutors(maxExecutors)}this}override def optimizeShuffle(parallelism:Int3,shuffleCompressEnabled:Booleanfalse,maxSizeMB:Int128,shuffleServiceEnabled:Booleantrue):Builder{null}override def optimizeSpeculation(enabled:Booleanfalse,interval:Int15,quantile:Float0.75F):Builder{null}override def warehouseDir(hdfs:String):Builder{null}override def end():SparkSession{SparkSession.builder().getOrCreate()}}}
}2.2 Object SparkFactory
object SparkFactory {trait Builder{// 默认值能给就给/*** 基本配置* param appName* param master 默认是本地方式* param deployMode 默认是集群模式* param eventLogEnabled 生产环境打开测试环境关闭* return*/def baseConfig(appName:String,master:Stringlocal[*],deployMode:Stringclient,eventLogEnabled:Booleanfalse):Builder/*** 驱动端优化配置* param memoryGB 驱动程序的内存大小* param coreNum 驱动程序的核数* param maxRstGB 驱动程序的最大结果大小* param driverHost 驱动程序的主机地址驱动程序会在主机地址上运行并且集群中的其他节点会通过这个地址与驱动程序通信* return*/def optimizeDriver(memoryGB:Int2,coreNum:Int1,maxRstGB:Int1,driverHost:Stringlocalhost):Builderdef optimizeExecutor(memoryGB:Int1,coreNum:Int1):Builder/*** 整体限制配置* param maxCores 整体可用的最大核数* param maxTaskFailure 单个任务失败的最大次数* param maxLocalWaitS 容错机制数据读取阶段允许等待的最长时间超过时间切换到其他副本。* return*/def optimizeLimit(maxCores:Int4,maxTaskFailure:Int,maxLocalWaitS:Int30):Builder/*** 默认使用Java序列化* 推荐使用Kryo序列化 提速或对速度又要i去* 所有的自定义类型都要注册到Spark中才能完成序列化。* param serde 全包路径* param classes 自定义类型默认认为不需要指定Class[_]表示类型未知。* return Builder*/def optimizeSerializer(serde:Stringorg.apache.spark.serializer.JavaSerializer,clas:Array[Class[_]]null):Builder/*** 在Spark的官方配置中netTimeOutS可能被很多超时的数据调用。* param netTimeOusS 判定网络超时的时间* param schedulerMode 可能很多任务一起跑因此公平调度* return*/def optimizeNetAbout(netTimeOusS:Int180,schedulerMode:StringFAIR):Builder/*** 动态分配-按需分配* 类似于配置线程池中的最大闲置线程数根据需要去做动态分配* param dynamicEnabled 是否开启动态分配* param initialExecutors 初始启用的Executors的数量* param minExecutors 最小启用的Executors的数量* param maxExecutors 最大启用的Executors的数量* return*/def optimizeDynamicAllocation(dynamicEnabled:Booleanfalse,initialExecutors:Int3,minExecutors:Int0,maxExecutors:Int6):Builder/*** 特指在没有指定分区数时对分区数的配置。* 并行度和初始启用的Executors的数量一致避免额外开销。** param parallelism* param shuffleCompressEnabled* param maxSizeMB* param shuffleServiceEnabled* return*/def optimizeShuffle(parallelism:Int3,shuffleCompressEnabled:Booleanfalse,maxSizeMB:Int128,shuffleServiceEnabled:Booleantrue):Builder/*** 推测执行将运行时间长的任务放到队列中等待运行时间短的任务运行完成后再运行。* param enabled* param interval Spark检查任务执行时间的时间间隔单位是秒。* param quantile 如果某个任务的执行时间超过指定分位数如75%的任务执行时间则认为该任务执行时间过长需要启动推测执行。*/def optimizeSpeculation(enabled:Booleanfalse,interval:Int15,quantile:Float0.75F):Builderdef warehouseDir(hdfs:String):Builderdef end():SparkSession}
}