网站注册怎么做,wordpress文章输出数,网站推广的软文,wordpress云音乐插件2 数据清洗、转换
此实验使用S3作为数据源 ETL: E extract 输入 T transform 转换 L load 输出 大纲 2 数据清洗、转换2.1 架构图2.2 数据清洗2.3 编辑脚本2.3.1 连接数据源#xff08;s3#xff09;2.3.2. 数据结构转换2.3.2 数据结构拆分…2 数据清洗、转换
此实验使用S3作为数据源 ETL: E extract 输入 T transform 转换 L load 输出 大纲 2 数据清洗、转换2.1 架构图2.2 数据清洗2.3 编辑脚本2.3.1 连接数据源s32.3.2. 数据结构转换2.3.2 数据结构拆分、定义2.3.3 清洗后的数据写入新s32.3.4 运行作业 2.4 数据分区2.4.1 编辑脚本2.4.2 运行脚本 2.5 总结 2.1 架构图 2.2 数据清洗
此步会将S3中的原始数据清洗成我们想要的自定义结构的数据。之后我们可通过APIGatewayLambdaAthena来实现一个无服务器的数据分析服务。
步骤图例1、入口2、创建Jobs3作为数据源则Type选择Spark若为Kinesis等选择Stream Spark3、IAM角色需要有s3与Glue的权限4、选择s3脚本位置,若已经完成脚本的编写工作则可以选择第二项或第三项若无则Glue会提供默认脚本5、安全配置参数建议添加参数–enable-auto-scaling为true。每次在我们执行Job任务时会根据运行 ETL 任务的数据处理单元DPU的个数来分配动态IP在我们子网的动态IP数低于DPU数时Job将会执行失败。此参数将会动态分配IP。6、数据源7、数据目标我们会将清洗后的数据存储到新的s3桶8、设计架构在本案例中我们会自定义脚本。所以不再在此处设计架构此处设计后脚本会自动生成相关代码9、保存
2.3 编辑脚本 脚本中的args参数的键值需要从Job的安全配置参数中定义
2.3.1 连接数据源s3
#数据源
datasource glueContext.create_dynamic_frame.from_catalog(database args[db_name], table_name tableName, transformation_ctx datasource)2.3.2. 数据结构转换
mapped_readings ApplyMapping.apply(frame datasource, mappings [(lclid, string, meter_id, string), \(datetime, string, reading_time, string), \(KWH/hh (per half hour), double, reading_value, double)], \transformation_ctx mapped_readings)2.3.2 数据结构拆分、定义
mapped_readings_df DynamicFrame.toDF(mapped_readings)mapped_readings_df mapped_readings_df.withColumn(obis_code, lit())
mapped_readings_df mapped_readings_df.withColumn(reading_type, lit(INT))reading_time to_timestamp(col(reading_time), yyyy-MM-dd HH:mm:ss)
mapped_readings_df mapped_readings_df \.withColumn(week_of_year, weekofyear(reading_time)) \.withColumn(date_str, regexp_replace(col(reading_time).substr(1,10), -, )) \.withColumn(day_of_month, dayofmonth(reading_time)) \.withColumn(month, month(reading_time)) \.withColumn(year, year(reading_time)) \.withColumn(hour, hour(reading_time)) \.withColumn(minute, minute(reading_time)) \.withColumn(reading_date_time, reading_time) \.drop(reading_time)2.3.3 清洗后的数据写入新s3
# write data to S3
filteredMeterReads DynamicFrame.fromDF(mapped_readings_df, glueContext, filteredMeterReads)s3_clean_path s3:// args[clean_data_bucket]glueContext.write_dynamic_frame.from_options(frame filteredMeterReads,connection_type s3,connection_options {path: s3_clean_path},format parquet,transformation_ctx s3CleanDatasink)2.3.4 运行作业 执行成功后状态将变为SUCCESS失败将会给出失败信息可在CloudWatch 中查看详情 清洗后的数据保存到了s3 数据清洗完毕后可通过上一篇中的爬网程序步骤将清洗后的数据的结构创建表到数据目录中 此时我们可以使用Athena对清洗后的数据进行分析。
2.4 数据分区
接下来我们对数据进行分区处理此处只提供了按天分区 重新进行数据清洗中的创建Job操作后重写脚本
2.4.1 编辑脚本
连接数据源。表为上一步最后重新爬取生成的新表。
cleanedMeterDataSource glueContext.create_dynamic_frame.from_catalog(database args[db_name], table_name tableName, transformation_ctx cleanedMeterDataSource)根据type与data_str分区
business_zone_bucket_path_daily s3://{}/daily.format(args[business_zone_bucket])businessZone glueContext.write_dynamic_frame.from_options(frame cleanedMeterDataSource, \connection_type s3, \connection_options {path: business_zone_bucket_path_daily, partitionKeys: [reading_type, date_str]},\format parquet, \transformation_ctx businessZone)2.4.2 运行脚本
分区后的数据结果 再次创建、运行爬网程序将会在数据目录中生成新的分区表。
2.5 总结
到这一步我们已经使用Glue ETL对s3桶中的数据进行了清洗、分区操作。在进行上篇中的Athena操作后我们已经可以通过Athena直接查询到清洗、分区后的数据集了。 接下来我们会通过使用APIGatewayLambdaAthena来构建一个无服务器的数据查询分析服务。