手机网站怎么备案,百度推广网站吸引力,东莞企石网站设计,广告联盟广告点击一次多少钱文章目录 Structured Steaming一、结构化流介绍#xff08;了解#xff09;1、有界和无界数据2、基本介绍3、使用三大步骤(掌握)4.回顾sparkSQL的词频统计案例 二、结构化流的编程模型#xff08;掌握#xff09;1、数据结构2、读取数据源2.1 File Source2.2 Socket Source… 文章目录 Structured Steaming一、结构化流介绍了解1、有界和无界数据2、基本介绍3、使用三大步骤(掌握)4.回顾sparkSQL的词频统计案例 二、结构化流的编程模型掌握1、数据结构2、读取数据源2.1 File Source2.2 Socket Source2.3 Rate Source 3、数据处理4、数据输出4.1 输出模式4.1.1 append 模式4.1.2 complete模式4.1.3 update模式 4.2 输出终端/位置 5、综合案例(练习)词频统计_读取文件方式词频统计_Socket方式自动生成数据_Rate方式 6、设置触发器Trigger7、CheckPoint检查点目录设置  JSON是什么?三、Spark 和 Kafka 整合掌握0、整合Kafka准备工作1.spark和kafka集成1.1 官网文档链接:1.2 常见选项:1.3 常见参数 2、从kafka中读取数据2.1 流式处理官方示例:练习示例 2.2 批处理官方示例:演示示例  3、数据写入Kafka中3.1 流式处理官方示例:练习示例 3.2 批处理官方示例:演示示例    01_回顾sparkSQL词频统计过程.py02_结构化流词频统计案例_读取文件方式.py03_结构化流词频统计案例_socket方式.py04_结构化流词频统计案例_设置触发器和检查点.py05_流方式读取kafka数据.py06_流方式写数据到kafka.py Structured Steaming 简单来说Structured Streaming是Spark提供的一种流处理引擎就像是“实时数据处理的流水线”能够以批处理的方式处理实时数据流。  具体而言 核心概念 流式DataFrame将实时数据流视为一个无限扩展的DataFrame支持类似批处理的API。触发器控制流处理的时间间隔如每1秒处理一次数据。输出模式支持多种输出模式如append追加、update更新和complete完整输出。 特点 易用性提供与Spark SQL一致的API降低学习成本。容错性通过检查点机制Checkpoint确保数据处理的可靠性。扩展性支持从Kafka、文件系统等多种数据源读取数据并输出到多种目标系统。   实际生产场景 在实时监控中使用Structured Streaming处理传感器数据实时生成报警。在用户行为分析中使用Structured Streaming处理点击流数据实时更新用户画像。  总之Structured Streaming通过易用的API和强大的容错机制为实时数据处理提供了高效、可靠的解决方案广泛应用于实时监控、用户行为分析等场景。  一、结构化流介绍了解 
1、有界和无界数据 简单来说有界数据就像是“有限的书本”数据量固定且已知无界数据则像是“无限的河流”数据持续生成且量未知。  具体而言 有界数据 定义数据量固定且已知处理完成后任务结束。示例存储在文件或数据库中的历史数据。处理方式适合批处理Batch Processing如使用Spark的RDD或DataFrame处理。 无界数据 定义数据持续生成且量未知处理任务通常不会结束。示例实时日志流、传感器数据、用户点击流。处理方式适合流处理Stream Processing如使用Spark的Structured Streaming或Flink处理。   实际生产场景 在历史数据分析中使用有界数据进行批处理生成报表和洞察。在实时监控中使用无界数据进行流处理实时生成报警和推荐。  总之有界数据和无界数据分别适合批处理和流处理根据数据特点选择合适的处理方式能够高效地完成数据分析和处理任务。  有界数据: 
有界数据: 指的数据有固定的开始和固定的结束数据大小是固定。我们称之为有界数据。对于有界数据一般采用批处理方案离线计算特点1-数据大小是固定2-程序处理有界数据程序最终一定会停止无界数据: 
无界数据: 指的数据有固定的开始但是没有固定的结束。我们称之为无界数据
注意: 对于无界数据我们一般采用流式处理方案实时计算特点1-数据没有明确的结束也就是数据大小不固定2-数据是源源不断的过来3-程序处理无界数据程序会一直运行不会结束2、基本介绍 
 结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API比如 Python Java Scala SQL … 
 Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作RDD是针对的有界的数据集但是为了能够兼容实时计算的处理场景提供微批处理模型本质上还是批处理只不过批与批之间的处理间隔时间变短了让我们感觉是在进行流式的计算操作目前默认的微批可以达到100毫秒一次 
 真正的流处理引擎: Storm(早期流式处理引擎)、Flink、Flume(流式数据采集) 
3、使用三大步骤(掌握) 
StructuredStreaming在进行数据流开发时的三个步骤 
1、读取数据流数据 : 指定数据源模式 sparksession对象.readStream.format(指定读取的数据源).option(指定读取的参数).load() 2、数据处理: 使用dsl或者sql方式计算数据和SparkSQL操作一样3、将计算的结果保存 : 指定输出模式,指定位置 writeStream.outputMode(输出模式).option(输出的参数配置).format(指定输出位置).start().awaitTermination()  
4.回顾sparkSQL的词频统计案例 
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder.appName(pyspark_demo).master(local[*]).getOrCreate()# 2.数据输入df  spark.read\.format(text)\.load(file:///export/data/spark_project/structured_Streaming/data/w1.txt)# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# SQL方式df.createTempView(tb)sql_df  spark.sql(select words,count(1) as cntfrom (select explode(split(value, )) as words from tb) t group by words)# DSL方式dsl_df  df.select(F.explode(F.split(value, )).alias(words)).groupBy(words).agg(F.count(words).alias(cnt))# 4.数据输出sql_df.show()dsl_df.show()# 5.关闭资源spark.stop() 
二、结构化流的编程模型掌握 
1、数据结构 在结构化流中我们可以将DataFrame称为无界的DataFrame或者无界的二维表 
2、读取数据源 对应官网文档内容 https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources 结构化流默认提供了多种数据源从而可以支持不同的数据源的处理工作。目前提供了如下数据源 File Source文件数据源。读取文件系统一般用于测试。如果文件夹下发生变化有新文件产生那么就会触发程序的运行  Socket Source网络套接字数据源一般用于测试。也就是从网络上消费/读取数据  Rate Source速率数据源。了解即可一般用于基准测试。通过配置参数由结构化流自动生成测试数据。  Kafka SourceKafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。  
2.1 File Source 相关的参数: 
option参数描述说明maxFilesPerTrigger每次触发时要考虑的最大新文件数 (默认: no max)latestFirst是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false)fileNameOnly是否检查新文件只有文件名而不是完整路径默认值false将此设置为 true 时以下文件将被视为同一个文件因为它们的文件名“dataset.txt”相同 “file:///dataset.txt” “s3://a/dataset.txt  “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” 
将目录中写入的文件作为数据流读取支持的文件格式为text、csv、json、orc、parquet。。。。 
文件数据源特点
1- 只能监听目录不能监听具体的文件
2- 可以通过*通配符的形式监听目录中满足条件的文件 
3- 如果监听目录中有子目录那么无法监听到子目录的变化情况读取代码通用格式: 
# 原生API
sparksession.readStream.format(CSV|JSON|Text|Parquet|ORC...).option(参数名1,参数值1).option(参数名2,参数值2).option(参数名N,参数值N).schema(元数据信息).load(需要监听的目录地址)# 简化API	
针对具体数据格式还有对应的简写API格式例如sparksession.readStream.csv(path需要监听的目录地址,schema元数据信息。。。)可能遇到的错误一 原因: 如果是文件数据源需要手动指定schema信息可能遇到的错误二 原因: File source只能监听目录不能监听具体文件2.2 Socket Source 首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc执行nc命令, 开启端口号, 写入数据: nc -lk 端口号查看端口号是否被使用命令: netstat -nlp | grep 要查询的端口注意: 要先启动nc再启动我们的程序 
代码格式:df  spark.readStream \.format(socket) \.option(host, 主机地址) \.option(port, 端口号) \.load()2.3 Rate Source 此数据源的提供, 主要是用于进行基准测试 
option参数描述说明rowsPerSecond每秒应该生成多少行 : 例如 100默认值1rampUpTime在生成速度变为rowsPerSecond之前应该经过多久的加速时间例如5 s默认0numPartitions生成行的分区: 例如 10默认值Spark 的默认并行度 
3、数据处理 
 指的是数据处理部分该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理也可以使用DSL方式进行处理。 
4、数据输出 
 在结构化流中定义好DataFrame或者处理好DataFrame之后调用**writeStream()**方法完成数据的输出操作。在输出的过程中我们可以设置一些相关的属性然后启动结构化流程序运行。 4.1 输出模式 
可能遇到的错误 原因: 在结构化流中不能调用show()方法
解决办法: 需要使用writeStream().start()进行结果数据的输出在进行数据输出的时候必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式 append模式 - 定义只输出新增的数据适用于不需要更新历史结果的场景。 - 示例实时日志处理中只输出新产生的日志记录。 update模式 - 定义输出新增或更新的数据适用于需要更新历史结果的场景。 - 示例实时用户行为分析中输出用户的最新行为数据。 complete模式 - 定义输出完整的结果集适用于需要全局统计结果的场景。 - 示例实时销售统计中输出所有销售数据的汇总结果。 实际生产场景 在实时日志处理中使用append模式输出新日志记录。在实时用户行为分析中使用update模式输出用户的最新行为数据。在实时销售统计中使用complete模式输出所有销售数据的汇总结果。 1- append模式增量模式 (默认) 特点当结构化程序处理数据的时候如果有了新数据才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作直接报错。而且也不支持排序操作。如果有了排序直接报错。  2- complete模式完全全量模式 特点当结构化程序处理数据的时候每一次都是针对全量的数据进行处理。由于数据越来越多所以在数据处理阶段必须要有聚合操作。如果没有聚合操作直接报错。另外还支持排序但是不是强制要求。  3- update模式更新模式 特点支持聚合操作。当结构化程序处理数据的时候如果处理阶段没有聚合操作该模式效果和append模式是一致。如果有了聚合操作只会输出有变化和新增的内容。但是不支持排序操作如果有了排序直接报错。  
4.1.1 append 模式 1- append模式增量模式 特点当结构化程序处理数据的时候如果有了新数据才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作直接报错。而且也不支持排序操作。如果有了排序直接报错。 如果有了聚合操作会报如下错误 如果有了排序操作会报如下错误 4.1.2 complete模式 2- complete模式完全全量模式 特点当结构化程序处理数据的时候每一次都是针对全量的数据进行处理。由于数据越来越多所以在数据处理阶段必须要有聚合操作。如果没有聚合操作直接报错。另外还支持排序但是不是强制要求。 如果没有聚合操作会报如下错误 4.1.3 update模式 3- update模式更新模式 特点支持聚合操作。当结构化程序处理数据的时候如果处理阶段没有聚合操作该模式效果和append模式是一致。如果有了聚合操作只会输出有变化和新增的内容。但是不支持排序操作如果有了排序直接报错。 如果有了排序操作会报如下错误  
4.2 输出终端/位置 
默认情况下Spark的结构化流支持多种输出方案 
1- console sink: 将结果数据输出到控制台。主要是用在测试中并且支持3种输出模式2- File sink: 输出到文件。将结果数据输出到某个目录下形成文件数据。只支持append模式3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里取决于自定义函数。并且支持3种输出模式4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式5、综合案例(练习) 
需求: 已知文件中存储了多个单词,要求计算统计出现的次数 
词频统计_读取文件方式 
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(pyspark_demo)\.master(local[*])\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df  spark.readStream\.format(text)\.load(file:///export/data/spark_project/structured_Streaming/data/)# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView(tb)sql_df  spark.sql(select words,count(1) as cntfrom (select explode(split(value, )) as words from tb) t group by words)# DSL方式dsl_df  df.select(F.explode(F.split(value, )).alias(words)).groupBy(words).agg(F.count(words).alias(cnt))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format(console).outputMode(complete).start()dsl_df.writeStream.format(console).outputMode(complete).start().awaitTermination()# 5.关闭资源spark.stop()词频统计_Socket方式 
首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc# 注意: 端口号: 范围0-65535   但是0-1024都是知名端口号查看端口号是否被使用命令: netstat -nlp | grep 55555执行nc命令, 开启端口号(选择没有被占用), 写入数据: nc -lk 55555注意: 要先启动nc再启动我们的程序 
代码格式:df  spark.readStream \.format(socket) \.option(host, 主机地址) \.option(port, 端口号) \.load()# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3# 创建main函数
if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(pyspark_demo)\.master(local[*])\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df  spark.readStream\.format(socket)\.option(host,192.168.88.161)\.option(port,55555)\.load()# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView(tb)sql_df  spark.sql(select words,count(1) as cntfrom (select explode(split(value, )) as words from tb) t group by words)# DSL方式dsl_df  df.select(F.explode(F.split(value, )).alias(words)).groupBy(words).agg(F.count(words).alias(cnt))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format(console).outputMode(complete).start()dsl_df.writeStream.format(console).outputMode(complete).start().awaitTermination()# 5.关闭资源spark.stop() 
自动生成数据_Rate方式 
from pyspark.sql import SparkSession
import osos.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/pythonif __name__  __main__:# 1.创建SparkSession对象spark  SparkSession.builder \.appName(StructuredStream_rate) \.master(local[*]) \.getOrCreate()# 2。读取数据df  spark.readStream \.format(rate) \.option(rowsPerSecond, 5) \.option(numPartitions, 1) \.load()# 3.数据处理# 略# 4.数据输出:df.writeStream \.format(console) \.outputMode(update) \.option(truncate, false) \.start() \.awaitTermination()# 5.关闭资源spark.stop()6、设置触发器Trigger 
触发器Trigger决定多久执行一次操作并且输出结果。也就是在结构化流中处理完一批数据以后等待一会再处理下一批数据 
主要提供如下几种触发器 1- 默认方案也就是不使用触发器的情况。如果没有明确指定那么结构化流会自动进行决策每一个批次的大小。在运行过程中会尽可能让每一个批次间的间隔时间变得更短 result_df.writeStream\.outputMode(append)\.start()\.awaitTermination()2- 配置固定的时间间隔在结构化流运行的过程中当一批数据处理完以后下一批数据需要等待一定的时间间隔才会进行处理**常用推荐使用** result_df.writeStream\.outputMode(append)\.trigger(processingTime5 seconds)\.start()\.awaitTermination()情形说明
1- 上一批次的数据在时间间隔内处理完成了那么会等待我们配置触发器固定的时间间隔结束才会开始处理下一批数据
2- 上一批次的数据在固定时间间隔结束的时候才处理完成那么下一批次会立即被处理不会等待
3- 上一批次的数据在固定时间间隔内没有处理完成那么下一批次会等待上一批次处理完成以后立即开始处理不会等待3- 仅此一次在运行的过程中程序只需要执行一次然后就退出。这种方式适用于进行初始化操作以及关闭资源等 result_df.writeStream.foreachBatch(func)\.outputMode(append)\.trigger(onceTrue)\.start()\.awaitTermination()7、CheckPoint检查点目录设置 
设置检查点目的是为了提供容错性。当程序出现失败了可以从检查点的位置直接恢复处理即可。避免出现重复处理的问题 
默认位置: hdfs的/tmp/xxx 
如何设置检查点: 
1- SparkSession.conf.set(spark.sql.streaming.checkpointLocation, 检查点路径)
2- option(checkpointLocation, 检查点路径)推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径检查点目录主要包含以下几个目录位置:  
1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次3-元数据文件metadata: 和整个查询关联的元数据信息目前只保留当前的job id4-数据源sources: 是数据源Source各个批次的读取的详情5-数据接收端sinks: 是数据接收端各个批次的写出的详情6-状态state: 当有状态操作的时候例如累加、聚合、去重等操作场景这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态JSON是什么? 简单来说JSONJavaScript Object Notation是一种轻量级的数据交换格式就像是“数据的通用语言”易于人阅读和编写也易于机器解析和生成。  具体而言 结构 对象用花括号{}表示包含键值对键和值之间用冒号:分隔键值对之间用逗号,分隔。数组用方括号[]表示包含多个值值之间用逗号,分隔。值可以是字符串、数字、布尔值、对象、数组或null。 示例{name: Alice,age: 30,isStudent: false,courses: [Math, Science],address: {city: Beijing,zip: 100000}
}特点 轻量级相比于XMLJSON格式更简洁数据量更小。易读性结构清晰易于人阅读和编写。跨平台支持多种编程语言如JavaScript、Python、Java等。   实际生产场景 在Web开发中使用JSON作为前后端数据交换的格式。在API设计中使用JSON作为请求和响应的数据格式。在配置文件中使用JSON存储配置信息。  总之JSON是一种轻量级、易读、跨平台的数据交换格式广泛应用于Web开发、API设计和配置文件等领域。  三、Spark 和 Kafka 整合掌握 
 Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次仅且只会处理一次的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表 
0、整合Kafka准备工作 
说明: Jar包上传的位置说明 
如何放置相关的Jar包?  1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,目录位置: /export/server/spark/jars2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars jar包路径jar包下载地址: https://mvnrepository.com/1.spark和kafka集成 
1.1 官网文档链接: 
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html 
1.2 常见选项: 
选项值解释kafka.bootstrap.servers以英文逗号分隔的host:port列表指定kafka服务的地址subscribe以逗号分隔的Topic主题列表订阅一个主题topic1或者多个主题topic1,topic2subscribePattern正则表达式字符串订阅主题的模式。可以用 topic.* 代表多个主题assign通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]}要使用的特定TopicPartitionsincludeHeaders默认false是否在行中包含Kafka headers。startingOffsets流或者批的查询开始时的起始点: “earliest”(批默认), “latest” (流默认), or json string json串格式如下 { “topicA”: {“0”:23,“1”:-1}, “topicB”:{“0”:-2} }“earliest”表示最早的偏移量 “latest”表示最近的偏移量 或每个TopicPartition起始偏移量的json字符串。在json中-2作为偏移量表示最早-1表示最晚。注意: 对于批量查询:不允许使用latest(无论是隐式查询还是在json中使用-1)。 对于流查询: 这只适用于新查询开始时恢复总是从查询结束的地方继续。在查询期间新发现的分区将最早开始。endingOffsets批量查询结束时的结束点: latest(默认) , or json string {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}}“latest”指的是最新的 或每个TopicPartition结束偏移量的json字符串。在json中-1可以用来表示最近的偏移量-2(最早的)是不允许的! 
1.3 常见参数 
参数类型解释topicstring表示消息是从哪个Topic中消费出来valuebinary最重要的字段。发送数据的value值也就是消息内容。如果没有就为nullkeybinary发送数据的key值。如果没有就为nullpartitionint分区编号。表示消费到的该条数据来源于Topic的哪个分区offsetlong表示消息偏移量timestamptimestamp接收的时间戳 
2、从kafka中读取数据 
2.1 流式处理 
官方示例: 
# 订阅Kafka的一个Topic从最新的消息数据开始消费
df  spark \.readStream \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribe, topic1) \.load()
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING))# 订阅Kafka的多个Topic多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df  spark \.readStream \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribe, topic1,topic2) \.load()
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING))# 订阅符合规则的Topic从最新的数据开始消费
df  spark \.readStream \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribePattern, topic.*) \.load()
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING))# 订阅一个Topic并且指定header信息
df  spark \.readStream \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribe, topic1) \.option(includeHeaders, true) \.load()
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING), headers)练习示例 
从某一个Topic中读取消息数据 
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 1- 创建SparkSession对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(ss_read_kafka_1_topic)\.master(local[*])\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df  spark.readStream\.format(kafka)\.option(kafka.bootstrap.servers,node1:9092)\.option(subscribe,itheima)\.option(startingoffsets,earliest)\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df  df.select(df.topic,F.decode(df.value, utf8).alias(key),F.decode(df.key,utf8).alias(value),df.partition,df.offset,df.timestamp,df.timestampType)# 获取数据etl_df.writeStream.format(console).outputMode(append).start().awaitTermination()# 3- 数据处理# result_df1  df.select(F.expr(cast(value as string) as value))# # selectExpr  select  F.expr# result_df2  df.selectExpr(cast(value as string) as value)# result_df3  df.withColumn(value,F.expr(cast(value as string)))# 4- 数据输出# 5- 启动流式任务如果有多个输出那么只能在最后一个start的后面写awaitTermination()# result_df1.writeStream.queryName(result_df1).format(console).outputMode(append).start()# result_df2.writeStream.queryName(result_df2).format(console).outputMode(append).start()# result_df3.writeStream.queryName(result_df3).format(console).outputMode(append).start().awaitTermination() 
2.2 批处理 
官方示例: 
# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df  spark \.read \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribe, topic1) \.load()
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING))# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df  spark \.read \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(subscribePattern, topic.*) \.option(startingOffsets, earliest) \.option(endingOffsets, latest) \.load()
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING))# 订阅多个主题明确指定Kafka偏移量
df  spark \
.read \
.format(kafka) \
.option(kafka.bootstrap.servers, host1:port1,host2:port2) \
.option(subscribe, topic1,topic2) \
.option(startingOffsets, {topic1:{0:23,1:-2},topic2:{0:-2}}) \
.option(endingOffsets, {topic1:{0:50,1:-1},topic2:{0:-1}}) \
.load()
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING))演示示例 
订阅一个Topic 
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 1- 创建SparkSession对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(sparksql_read_kafka_1_topic)\.master(local[*])\.getOrCreate()# 2- 数据输入# 默认从Topic开头一直消费到结尾df  spark.read\.format(kafka)\.option(kafka.bootstrap.servers,node1:9092,node2:9092)\.option(subscribe,itheima)\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df  df.select(F.expr(cast(key as string) as key),F.decode(df.key,utf8),F.expr(cast(value as string) as value),F.decode(df.value, utf8),df.topic,df.partition,df.offset)# 获取数据etl_df.show()# # 3- 数据处理# result_df1  init_df.select(F.expr(cast(value as string) as value))# # selectExpr  select  F.expr# result_df2  init_df.selectExpr(cast(value as string) as value)# result_df3  init_df.withColumn(value,F.expr(cast(value as string)))# # 4- 数据输出# print(result_df1)# result_df1.show()# print(result_df2)# result_df2.show()# print(result_df3)# result_df3.show()# # 5- 释放资源# spark.stop()3、数据写入Kafka中 
3.1 流式处理 
官方示例: 
# 将Key和Value的数据都写入到Kafka当中
ds  df \
.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) \
.writeStream \
.format(kafka) \
.option(kafka.bootstrap.servers, host1:port1,host2:port2) \
.option(topic, topic1) \
.start()# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
# 的哪个Topic中。这种方式适用于消费多个Topic的情况
ds  df \
.selectExpr(topic, CAST(key AS STRING), CAST(value AS STRING)) \
.writeStream \
.format(kafka) \
.option(kafka.bootstrap.servers, host1:port1,host2:port2) \
.start()练习示例 
写出到指定Topic 
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 1- 创建SparkSession对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(ss_read_kafka_1_topic)\.master(local[*])\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df  spark.readStream\.format(kafka)\.option(kafka.bootstrap.servers,node1:9092,node2:9092)\.option(subscribe,itheima)\.load()# 3- 数据处理result_df  init_df.select(F.expr(concat(cast(value as string),_itheima) as value))# 4- 数据输出# 注意: 咱们修改完直接保存到kafka的itcast主题中,所以控制台没有数据,这是正常的哦!!!# 5- 启动流式任务result_df.writeStream\.format(kafka)\.option(kafka.bootstrap.servers,node1:9092,node2:9092)\.option(topic,itcast)\.option(checkpointLocation, hdfs://node1:8020/ck)\.start()\.awaitTermination()3.2 批处理 
官方示例: 
# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr(CAST(key AS STRING), CAST(value AS STRING)) \.write \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.option(topic, topic1) \.save()# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr(topic, CAST(key AS STRING), CAST(value AS STRING)) \.write \.format(kafka) \.option(kafka.bootstrap.servers, host1:port1,host2:port2) \.save()演示示例 
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 1- 创建SparkSession对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(ss_read_kafka_1_topic)\.master(local[*])\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df  spark.read\.format(kafka)\.option(kafka.bootstrap.servers,node1:9092,node2:9092)\.option(subscribe,itheima)\.load()# 3- 数据处理result_df  init_df.select(F.expr(concat(cast(value as string),_666) as value))# 4- 数据输出# 5- 启动流式任务result_df.write.format(kafka)\.option(kafka.bootstrap.servers,node1:9092,node2:9092)\.option(topic,itcast)\.option(checkpointLocation, hdfs://node1:8020/ck)\.save()01_回顾sparkSQL词频统计过程.py 
# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 1.读取文件生成dfdf  spark.read.text(file:///export/data/spark_project/09_结构化流/data3.txt)# df.show()# 2.数据处理etl_df  df.dropDuplicates().fillna(未知)# 3.数据分析# 需求: 统计每个单词出现的次数# 方式1: sql方式etl_df.createTempView(word_tb)sql_result_df  spark.sql(with t as (select explode(split(value, )) as wordfrom word_tb)select word,count(*) as cnt from t group by word)# 方式2: dsl方式dsl_result_df  etl_df.select(F.explode(F.split(value,  )).alias(word)).groupby(word).agg(F.count(word).alias(cnt))# 4.数据展示/导出sql_result_df.show()dsl_result_df.show()# 注意: 最后一定释放资源spark.stop() 
02_结构化流词频统计案例_读取文件方式.py 
# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 1.创建SparkContext对象spark  SparkSession.builder \.config(spark.sql.shuffle.partitions, 1) \.appName(pyspark_demo) \.master(local[*]) \.getOrCreate()# 2.TODO 数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df  spark.readStream \.format(text) \.load(file:///export/data/spark_project/09_结构化流/data/)# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView(tb)sql_df  spark.sql(select words,count(1) as cntfrom (select explode(split(value, )) as words from tb) t group by words)# DSL方式 略# 4.数据输出# 注意: 输出不能使用原来sparksql的show(),否则报错# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format(console).outputMode(complete).start().awaitTermination()# 注意: 最后一定释放资源spark.stop() 
03_结构化流词频统计案例_socket方式.py 
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# 1.读取socket发来的消息df  spark.readStream \.format(socket) \.option(host, 192.168.88.161) \.option(port, 55555) \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView(tb)sql_df  spark.sql(select words,count(1) as cntfrom (select explode(split(value, )) as words from tb) t group by words)# DSL方式 略# 4.数据输出sql_df.writeStream.format(console).outputMode(complete).start().awaitTermination()# 注意: 最后一定释放资源spark.stop() 
04_结构化流词频统计案例_设置触发器和检查点.py 
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 先创建spark session对象spark  SparkSession.builder.appName(spark_demo).master(local[*]).getOrCreate()# TODO: 设置检查点路径spark.conf.set(spark.sql.streaming.checkpointLocation, hdfs://node1:8020/ckpt2)# 1.读取socket发来的消息df  spark.readStream \.format(socket) \.option(host, 192.168.88.161) \.option(port, 55555) \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView(tb)sql_df  spark.sql(select words,count(1) as cntfrom (select explode(split(value, )) as words from tb) t group by words)# DSL方式 略# 4.数据输出# TODO: .trigger(processingTime5 seconds)添加触发器sql_df.writeStream.format(console).outputMode(complete).trigger(processingTime5 seconds).start().awaitTermination()# 注意: 最后一定释放资源spark.stop() 
05_流方式读取kafka数据.py 
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 1- 创建SparkSession对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(ss_read_kafka_1_topic)\.master(local[*])\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df  spark.readStream\.format(kafka)\.option(kafka.bootstrap.servers,node2:9092)\.option(subscribe,kafka_spark1)\.option(startingoffsets,earliest)\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df  df.select(df.topic,F.decode(df.key, utf8).alias(key),F.decode(df.value,utf8).alias(value),df.partition,df.offset,df.timestamp,df.timestampType)# 展示数据# 直接展示到控制台etl_df.writeStream.format(console).outputMode(append).start().awaitTermination()06_流方式写数据到kafka.py 
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME]  /export/server/spark
os.environ[PYSPARK_PYTHON]  /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON]  /root/anaconda3/bin/python3if __name__  __main__:# 1- 创建SparkSession对象spark  SparkSession.builder\.config(spark.sql.shuffle.partitions,1)\.appName(ss_read_kafka_1_topic)\.master(local[*])\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df  spark.readStream\.format(kafka)\.option(kafka.bootstrap.servers,node2:9092)\.option(subscribe,kafka_spark1)\.option(startingoffsets,earliest)\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df  df.select(F.decode(df.value,utf8).alias(value))# TODO: 原来默认展示到控制台,接下来演示如何把数据存储到kafka中etl_df.writeStream\.format(kafka)\.option(kafka.bootstrap.servers,node2:9092)\.option(topic,kafka_spark2)\.option(checkpointLocation, hdfs://node1:8020/ckpt3)\.start()\.awaitTermination()