网站建设的注意,博客营销是一种新兴的网络营销方式,产品推广文章,组织建设方面存在的问题目录
Map算子使用
FlatMap算子使用
Filter算子使用-数据过滤
Distinct算子使用-数据去重
groupBy算子使用-数据分组
sortBy算子使用-数据排序 Map算子使用
# map算子主要使用长场景#xff0c;一个转化rdd中每个元素的数据类型#xff0c;拼接rdd中的元素数据#xf…目录
Map算子使用
FlatMap算子使用
Filter算子使用-数据过滤
Distinct算子使用-数据去重
groupBy算子使用-数据分组
sortBy算子使用-数据排序 Map算子使用
# map算子主要使用长场景一个转化rdd中每个元素的数据类型拼接rdd中的元素数据对rdd中的元素进行需求处理
# 需求处理hdfs中的学生数据单独获取每个学生的信息
from pyspark import SparkContextsc SparkContext()# 1-读取数据
rdd sc.textFile(hdfs://node1:8020/data/student.txt)
# 2- 使用转化算子进行数据处理
# map中的lambda表达式必须定义一个参数用来接收rdd中的元素数据, 注意x参数如何处理要看x接收的数据类型
rdd2 rdd.map(lambda x : x.split(,))
# 3-从rdd2中获取姓名数据
rdd3 rdd2.map(lambda x : x[1])# lambda 函数能进行简单的数据计算如果遇到复杂数据计算时就需要使用自定义函数
# 获取年龄数据并且转化年龄数据为int类型将年龄和性别合并一起保存成元组
## 获取年龄
def func(x):# 1-切割数据data_split x.split(,)# 2-转换数据类型age int(data_split[3])# 3-拼接性别与年龄data_tuple (data_split[2],age)return data_tuple# 将函数的名字传递到map中不要加括号
rdd4 rdd.map(func)# 触发执行算子,查看读取的数据
res rdd.collect()
print(res)res2 rdd2.collect()
print(res2)res3 rdd3.collect()
print(res3)res4 rdd4.collect()
print(res4)FlatMap算子使用
# FlatMap算子使用
# 主要场景是对二维嵌套的数据降维操作 [[1,张三],[2,李四],[3,王五]] --- [1,张三,2,李四,3,王五]
from pyspark import SparkContextsc SparkContext()# 生成的rdd
rdd sc.parallelize([[1, alice, F, 32], [2, Tom, M, 22], [3, lili, F, 18], [4, jerry, M, 24]])# 使用flatmap
rdd1 rdd.flatMap(lambda x: x) # 直接返回x会自动将x中的元素数据取出放入新的rdd中# 查看数据
res rdd1.collect()
print(res)Filter算子使用-数据过滤
# RDD数据过滤
# 需求过滤年龄大于20岁的信息
from pyspark import SparkContext
sc SparkContext()# 1- 读取hdfs中的学生数据
rdd sc.textFile(hdfs://node1:8020/data/student.txt)# 2- 使用转化算子进行数据处理
# map中的lambda表达式必须定义一个参数用来接收rdd中的元素数据, 注意x参数如何处理要看x接收的数据类型
rdd2 rdd.map(lambda x:x.split(,))
# 使用fliter方法进行数据过滤
# lambda x:过滤条件 可以当成 if 操作 if 条件
# 符合条件的数据会返回保存在新的rdd中
rdd3 rdd2.filter(lambda x :int(x[3]) 20)# 查看数据
res rdd2.collect()
print(res)res3 rdd3.collect()
print(res3) Distinct算子使用-数据去重
# distinct 去重算子
# rdd中有重复数据时可以进行去重
from pyspark import SparkContext
sc SparkContext()# 1- 读取hdfs中的学生数据
rdd sc.textFile(hdfs://node1:8020/data/student.txt)# 2- 使用转化算子进行数据处理
# map中的lambda表达式必须定义一个参数用来接收rdd中的元素数据, 注意x参数如何处理要看x接收的数据类型
rdd2 rdd.map(lambda x:x.split(,))# 3-从rdd2中获取性别数据
rdd3 rdd2.map(lambda x : x[2])# 对rdd3中重复数据去重
rdd4 rdd3.distinct()# 查看数据
res rdd3.collect()
print(res)res1 rdd4.collect()
print(res1) groupBy算子使用-数据分组
from pyspark import SparkContext
sc SparkContext()# 1- 读取hdfs中的学生数据
rdd sc.textFile(hdfs://node1:8020/data/student.txt)# 2- 使用转化算子进行数据处理
# map中的lambda表达式必须定义一个参数用来接收rdd中的元素数据, 注意x参数如何处理要看x接收的数据类型
rdd2 rdd.map(lambda x:x.split(,))# 3-对性别进行分组
# lambda x: hash取余的计算 hash(数据)%分组数 余数相同的数据会放在一起
rdd3 rdd.groupBy(lambda x:hash(x[2]) % 2)
# 查看分组的数据内容 mapValues 取出分组后的数据值对数据值转为列表即可
rdd4 rdd3.mapValues(lambda x:list(x))# 查看数据
res2 rdd2.collect()
print(res2)res3 rdd3.collect()
print(res3)res4 rdd4.collect()
print(res4) 分组算子用到了哈希算法,lambda x: hash取余的计算 hash(数据)%分组数 余数相同的数据会放在一起 rdd3 rdd.groupBy(lambda x:hash(x[2]) % 2) sortBy算子使用-数据排序
# RDD的数据排序
from pyspark import SparkContextsc SparkContext()# 创建数据
# 非kv数据
rdd sc.parallelize([10,45,27,18,5,29])# 在spark中可以使用元组表示kv数据k,v
rdd2 sc.parallelize([(张三,27),(李四,18),(王五,31),(赵六,21)])rdd1 sc.parallelize([(666,火眼金睛),(2000,筋斗云),(888,顺风耳),(1314,降龙十八掌)])# 数据排序
# 非kv数据
rdd3 rdd.sortBy(lambda x: x) # 默认升序,从小到大排
rdd4 rdd.sortBy(lambda x: x,ascendingFalse) # 降序# kv数据排序 x接收(k,v)数据 需要指定采用哪个值进行排序
# 根据v值进行排序
rdd5 rdd2.sortBy(lambda x: x[1])
rdd6 rdd2.sortBy(lambda x: x[1],ascendingFalse)# 根据k值进行排序
rdd7 rdd1.sortBy(lambda x: x[0])
rdd8 rdd1.sortBy(lambda x: x[0],ascendingFalse)# 查看结果
# 非kv数据
res1 rdd3.collect()
res2 rdd4.collect()
print(res1)
print(res2)# kv数据排序
res5 rdd5.collect()
res6 rdd6.collect()
print(res5)
print(res6)res7 rdd7.collect()
res8 rdd8.collect()
print(res7)
print(res8) join算子使用-数据关联 准备数据,模拟表关联 students.txt students2.txt from pyspark import SparkContext
# rdd也是使用join算子进行kv数据关联 如果需要将多个rdd数据关联在一起
# 需要现将rdd的数据转为kv结构关联的字段数据作为key
sc SparkContext()
# 分别读取两个文件数据
rdd1 sc.textFile(hdfs://node1:8020/data/students.txt)
rdd2 sc.textFile(hdfs://node1:8020/data/students2.txt)# 切割行数
rdd_line1 rdd1.map(lambda x:x.split(,))
rdd_line2 rdd2.map(lambda x:x.split(,))# 将rdd数据进行关联
# 将关联的数据转为kv结构
rdd_kv1 rdd_line1.map(lambda x:(x[0],x))
rdd_kv2 rdd_line2.map(lambda x:(x[0],x))# 使用join关联
rdd_join rdd_kv1.join(rdd_kv2) # 内关联
rdd_leftjoin rdd_kv1.leftOuterJoin(rdd_kv2) # 左关联
rdd_rightjoin rdd_kv1.rightOuterJoin(rdd_kv2) # 右关联# 查看数据res3 rdd_join.sortBy(lambda x:x[0]).collect() # 找相同数据
print(res3)res4 rdd_leftjoin.collect() # 左表数据全部展示右边右相同数据展示没有相同数据为空None
print(res4)res5 rdd_rightjoin.collect() # 右表数据全部展示左边右相同数据展示没有相同数据为空None
print(res5) join内关联:只有共同的才展示 leftOuterJoin左关联:左表数据全部展示右边右相同数据展示没有相同数据为空None rightOuterJoin右关联:右表数据全部展示左边右相同数据展示没有相同数据为空None