社交网站建站,哪里可以接网站开发的活,网站建设费用IP,网站备案组织机构代码随着互联网信息呈爆炸式增长#xff0c;爬虫技术被广泛用于从海量网页中抓取有价值的数据。然而#xff0c;爬取到的数据往往存在格式不规范、重复、噪声等诸多问题#xff0c;需要高效的数据清洗流程来保障数据质量#xff0c;Spark 在其中发挥了关键作用。
什么是Spark …随着互联网信息呈爆炸式增长爬虫技术被广泛用于从海量网页中抓取有价值的数据。然而爬取到的数据往往存在格式不规范、重复、噪声等诸多问题需要高效的数据清洗流程来保障数据质量Spark 在其中发挥了关键作用。
什么是Spark
Spark 是当今大数据领域最活跃、最热门、最高效的大数据通用计算平台之一。
Spark 是为大规模数据处理而设计的分布式计算框架旨在处理海量数据的存储和分析任务。它可以在集群环境中运行将计算任务分布到多个节点上利用集群的并行处理能力来加速数据处理过程。提供了基础的弹性分布式数据集RDD抽象是 Spark 的核心部分可进行通用的分布式数据处理操作。
Spark的优点
快:与Hadoop的MapReduce相比Spark基于内存的运算要快100倍以上而基于磁盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎可以通过基于内存来高效地处理数据流。易用:Spark支持Java、Python和Scala的API还支持超过80种高级算法使用户可以快速构建不同应用。而且Spark支持交互式的Python和Scala的Shell这意味着可以非常方便的在这些Shell中使用Spark集群来验证解决问题的方法而不是像以前一样需要打包、上传集群、验证等。这对于原型开发非常重要。通用性Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询通用Spark SQL、实时流处理通过Spark Streaming、机器学习通过Spark MLlib和图计算通过Spark GraphX。这些不同类型的处理都可以在同一应用中无缝使用。Spark统一的解决方案非常具有吸引力毕竟任何公司都想用统一的平台处理问题减少开发和维护的人力成本和部署平台的物理成本。当然还有作为统一的解决方案Spark并没有以牺牲性能为代价。相反在性能方面Spark具有巨大优势。可融合性Spark非常方便的与其他开源产品进行融合。比如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器并且可以处理所有Hadoop支持的数据包括HDFS、HBase和Cassanda等。这对于已部署Hadoop集群的用户特别重要因为不需要做任何数据迁移就可以使用Spark强大的处理能力。Spark也可以不依赖第三方的资源管理器和调度器它实现了Standalone作为其内置资源管理器和调度框架这样进一步降低了Spark的使用门槛使得所有人可以非常容易地部署和使用Spark。此外Spark还提供了在EC2上部署Standalone的Spark集群的工具。
Spark的使用
Spark大至使用流程 要先将数据进行存放在一个txt文本文件当中使用Spark进行读取txt中的文本数据进行数据处理将清洗后的数据转存到原来的txt文件当中想要存放到数据库当中则将txt文本文件中的数据再次读出出来存放进去即可
Spark的maven依赖
Spark想要在Springboot项目中使用要引入相应的maven依赖
dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.12/artifactIdversion3.4.1/version
/dependency
dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion3.4.1/version
/dependency
爬取出数据的存储 本项目进行爬取的是求职网站会将求职岗位的信息放入到一个List集合当中通过遍历这个List集合将数据存放到txt文本中 /*** param jobs 爬取下来的数据集合* param filePath 你要存放数据的地址*/public static void writeDataToTxt(ListJob jobs, String filePath) {//进行检查文件是否存在 若不存在则进行创建一个路径为filePath的文件File file new File(filePath);if (!file.exists()) {try {file.createNewFile();} catch (IOException e) {throw new RuntimeException(e);}}try (BufferedWriter writer new BufferedWriter(new FileWriter(filePath))) {// 将数据写入文件for(Job job : jobs) {//这里因为直接进行放入到txt文本文件难以进行格式处理 我们使用手动进行拼接操作String j job.getJobName(),job.getCompany(),job.getSalary(),job.getExperience(),job.getEducational(),job.getCity(),job.getCompanyScale(),job.getCompanyStatus();writer.write(j);// 写入换行符writer.newLine();}} catch (IOException e) {// 处理可能出现的异常System.err.println(写入文件时出现错误: e.getMessage());}}
Spark的数据清洗
本项目中的Spark进行数据清洗是通过JavaRDDE
JavaRDD 是 Spark 为 Java 开发者提供的弹性分布式数据集Resilient Distributed DatasetRDD接口。从本质上讲它是一个分布式的集合其中的数据被划分成多个分区Partitions这些分区可以分布在集群中的不同节点上进行并行处理。这种分布式的特性使得 JavaRDD 能够高效地处理大规模的数据。
JavaRDD 是 Spark Core 的重要组成部分是构建其他高级功能如 Spark SQL、MLlib 和 Spark Streaming的基础。在 Spark SQL 中数据通常被封装成 DataFrame 或 Dataset但这些高级数据结构的底层实现往往也依赖于 RDD 的基本概念和操作。
JavaRDD 可以存储各种类型的数据包括 Java 基本数据类型如int、double、boolean等和自定义的 Java 对象。
使用JavaRDD 的filter方法和map方法进行对数据的处理
filter 方法是用来对流中的元素进行筛选的它的返回值应该是一个 boolean 类型的表达式用来判断该元素是否应该保留在流中 map 方法是一种转换操作它对 JavaRDD 中的每个元素应用一个函数并返回一个新的 JavaRDD /*** param path 要进行清洗的txt文件路径*/public static void DataClening(String path){SparkSession spark SparkSession.builder()//将 SparkContext、SQLContext 和 HiveContext 等功能集成在一起.appName(Java Spark)//为 Spark 应用程序设置一个名称 名称会显示在 Spark 集群的监控界面上有助于识别和管理应用程序.master(local[*])//表示在本地运行 Spark.getOrCreate();//尝试获取现有的 SparkSession 实例如果不存在则创建一个新的。这样可以确保在同一个 JVM 中不会创建多个 SparkSession避免资源浪费JavaSparkContext sc JavaSparkContext.fromSparkContext(spark.sparkContext());//读取数据源到RDD中JavaRDDString rdd sc.textFile(path);//对RDD中的数据进行清洗处理JavaRDDString cleanedData cleanData(rdd);// 打印清洗后的数据可以根据需要保存到文件或进行其他操作 此处不要省略 不然会报错cleanedData.collect().forEach(System.out::println);//进行检查文件是否存在File file new File(path);if (file.exists()) {file.delete();}try (BufferedWriter writer new BufferedWriter(new FileWriter(path))) {// 将数据写入文件for(String job : cleanedData.collect()) {writer.write(job);// 写入换行符writer.newLine();}} catch (IOException e) {// 处理可能出现的异常System.err.println(清洗数据后写入文件时出现错误: e.getMessage());}// 停止Spark上下文sc.stop();}/*** 对数据进行清洗操作* param rawData 需要进行清洗的JavaRDD对象* return 返回进行清洗的JavaRDD对象*/private static JavaRDDString cleanData(JavaRDDString rawData) {return rawData//filter 方法是用来对流中的元素进行筛选的它的返回值应该是一个 boolean 类型的表达式用来判断该元素是否应该保留在流中// 移除空行.filter(line - !line.trim().isEmpty())// 例如移除含有特定字符如null的行//.filter(line - !line.contains(null))// map 是一种转换操作它对 JavaRDD 中的每个元素应用一个函数并返回一个新的 JavaRDD// 可能需要根据分隔符例如逗号拆分字段并重新格式化.map(line - {String[] fields line.replace(null, 暂无数据信息).split(,);//进行数据检查 确保这工作名称等8项信息完整 不完整直接进行设置为空if (fields.length 8) {//将fields字符串数组中的信息进行拼接起来 这里仅做简单拼接为清洗后的格式实际清洗可以根据需求更复杂return String.join(,, fields);} else {return ; //将不满足8项信息的数据进行设置为空处理}})// 去除清洗后的空行或不需要的数据.filter(line - !line.trim().isEmpty())//去重.distinct();}
清洗后数据存放
此处会将txt文件中的数据进行取出放入到一个List集合当中想要放入到数据库中只需要进行遍历这个集合即可 /*** * param path 将指定位置的txt文件中的信息 放入到List集合中 便于插入到数据库中* return 返回文本中的·数据信息*/public static ListJob getJobbyTxtFile(String path){ListJob jobs new ArrayList();try (BufferedReader reader new BufferedReader(new FileReader(path))) {String line;while ((line reader.readLine())! null) {jobs.add(toJob(line));}} catch (IOException e) {e.printStackTrace();}return jobs;}public static Job toJob(String job){String[] jobsContext job.split(,);Job j new Job();j.setJobName(jobsContext[0]);j.setCompany(jobsContext[1]);j.setSalary(jobsContext[2]);j.setExperience(jobsContext[3]);j.setEducational(jobsContext[4]);j.setCity(jobsContext[5]);j.setCompanyScale(jobsContext[6]);j.setCompanyStatus(jobsContext[7]);return j;}