做网站需要多久,重养网站建设,动感技术网站建设,利用虚拟主机建设企业网站实验报告目录 前言广播变量广播变量的作用 广播变量的使用方式 累加器累加器的作用累加器的优缺点累加器的使用方式 PySpark实战笔记系列第四篇
10-用PySpark建立第一个Spark RDD(PySpark实战笔记系列第一篇)11-pyspark的RDD的变换与动作算子总结(PySpark实战笔记系列第二篇))12-pysp… 目录 前言广播变量广播变量的作用 广播变量的使用方式 累加器累加器的作用累加器的优缺点累加器的使用方式 PySpark实战笔记系列第四篇
10-用PySpark建立第一个Spark RDD(PySpark实战笔记系列第一篇)11-pyspark的RDD的变换与动作算子总结(PySpark实战笔记系列第二篇))12-pyspark的RDD算子注意事项总结(PySpark实战笔记系列第三篇)13-pyspark的共享变量用法总结(PySpark实战笔记系列第四篇) 前言 spark提供两种特定的共享方式广播变量 和 累加器。
广播变量 广播变量允许程序缓存一个只读变量在集群的每个机器上。广播变量就是普通变量的一个包装变量。
广播变量的作用 可以用一种更高效的方式来共享一些数据比如一个全局配置文件可以通过广播变量共享给所有节点。 广播变量的使用方式 创建通过调用SparkContext.broadcast()方法来将一个普通变量创建为一个广播变量。 访问通过value方法来访问。 更新通过unpersist()方法声明更新然后修改原始变量的值通过再次广播从而被其他节点获取。 销毁通过destroy()方法可以把广播变量的数据和元数据一起销毁掉销毁后不能再使用。
# 示例
import findspark
findspark.init()
##############################################
from pyspark.sql impot SparkSession
spark SparkSession.builder \.master(local[2]) \.appName(broadcastDemo) \.getOrCreate();
sc spark.SparkContext
##############################################
ip_mes {ip:127.0.0.2,key:password}
# 创建广播变量
brVar sc.broadcast(ip_mes)# 获取广播变量的值
val brVar.value
# {ip:127.0.0.2,key:password}
print(val)
# password
print(val[key])
# 更新广播变量
brVar.unpersist()
ip_mes[key] admin
brVar sc.broadcast(ip_mes) #再次广播
# 获取广播后的变量值
val brVar.value
# {ip:127.0.0.2,key:admin}
print(val)
# 销毁广播变量
brVar.destroy()
##############################################
sc.stop()累加器 除了广播变量进行变数共享外Spark还提供了一种累加器用于在集群中共享数据。。Spark原生支持数值类型的累加器开发人员可以根据自己的需求来支持其他数据类型。
累加器的作用 一个常见的作用是在调试时对作业的执行过程中的相关事件进行计数。
累加器的优缺点 优点能够快速执行操作。 缺点只能利用关联操作做“加”操作的变量。
累加器的使用方式 创建通过SparkContext.accumulator()方法来创建出累加器对象。 访问通过value方法来访问。 更新不同节点上的计算任务都可以利用add方法或者使用**操作**来给累加器加值。 注意事项 累加器是一种只可加的变量对象比如不能执行-操作。使用累加器时为了保证准确性只能使用一次动作操作。如果需要使用多次动作操作则在RDD对象上执行cache或persist操作来切断依赖。 # 示例
import findspark
findspark.init()
##############################################
from pyspark.sql impot SparkSession
spark SparkSession.builder \.master(local[2]) \.appName(broadcastDemo) \.getOrCreate();
sc spark.SparkContext
##############################################
rdd sc.range(1,101)
# 创建累加器初始值0
acc sc.accumulator(0)
def countEnve(x):global accif x%2 0:acc 1 # 累加器更新
rdd_count rdd.map(countEnve)
# 获取累加器值
# 0 因为未执行动作操作即countEnve函数的逻辑还未执行
print(acc.value)保证多次正确获取累加器值否则当我们再次执行rdd_count.count()
累加器会再次执行。
rdd_counter.persist()切断了动作操作的链条因此只会执行一次。rdd_count.persist()
# 100
print(rdd_count.count())
# 50
print(acc.value)# 100
print(rdd_count.count())
# 50
print(acc.value)
##############################################
sc.stop()ps上述示例代码待实际反复运行确认其运行过程。 参考文档
https://spark.apache.org/docs/latest/api/python/reference/pyspark.html《Python大数据处理库PySpark实战》 博主写博文就是方便对自己所学所做的事做一备份记录或回顾总结。欢迎留言沟通学习。 刚开始接触请多指教欢迎留言交流