当前位置: 首页 > news >正文

做兼职的设计网站有哪些自动翻译网站软件

做兼职的设计网站有哪些,自动翻译网站软件,常用的网络推广方法有哪些,爱站网怎么打不开目录 相关资料 一、概述 1.1 基本概念 1.2 两种处理模型 #xff08;1#xff09;微批处理 #xff08;2#xff09;持续处理 1.3 Structured Streaming和Spark SQL、Spark Streaming关系 二、编写Structured Streaming程序的基本步骤 三、输入源 3.1 File源 1微批处理 2持续处理 1.3 Structured Streaming和Spark SQL、Spark Streaming关系 二、编写Structured Streaming程序的基本步骤 三、输入源 3.1 File源 1创建程序生成JSON格式的File源测试数据 2创建程序对数据进行统计 3测试运行程序 4处理警告 5总结分析 3.2 Kafka源 1启动Kafka 2编写生产者Producer程序 3安装Python3的Kafka支持 4运行生产者程序 5编写并运行消费者Consumer程序 方式一 方式二 总结 3.3 Socket源 3.4 Rate源 相关资料 1.厦大 Kafka和Structured Streaming的组合使用Scala版 Kafka和Structured Streaming的组合使用Spark 3.2.0_厦大数据库实验室博客 (xmu.edu.cn)https://dblab.xmu.edu.cn/blog/3160/2.Structured Streaming Kafka集指南 Structured Streaming Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.2.0 Documentation (apache.org)https://spark.apache.org/docs/3.2.0/structured-streaming-kafka-integration.html3.Pyspark手册DataStreamReader pyspark.sql.streaming.DataStreamReader — PySpark 3.2.0 documentation (apache.org)https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader4.Kafka安装教程 Ubuntu22.04下安装kafka_2.12-2.6.0并运行简单实例_ubuntu22.04kafka安装-CSDN博客文章浏览阅读723次点赞22次收藏31次。安装Kafka - 启动Kafka - 测试Kafka是否正常工作_ubuntu22.04kafka安装https://blog.csdn.net/qq_67822268/article/details/1386264125.Maven中央仓库 Central Repository: (maven.org)https://repo1.maven.org/maven2/6.Maven Repository mvnrepository.comhttps://mvnrepository.com/7.kafka-python文档 kafka-python — kafka-python 2.0.2-dev documentationhttps://kafka-python.readthedocs.io/en/master/index.html8.strcuted streaming OutputMode讲解 strcuted streaming OutputMode讲解 - 简书 (jianshu.com)https://www.jianshu.com/p/ed1398c2470a 一、概述 1.1 基本概念 Structured Streaming 是 Apache Spark 提供的一种流处理引擎它基于 Spark SQL 引擎并提供了更高级别、更易用的 API使得处理实时数据流变得更加简单和直观。 Structured Streaming 的一些特点和优势 基于 DataFrame 和 Dataset APIStructured Streaming 构建在 Spark 的 DataFrame 和 Dataset API 之上使得对流数据的处理与批处理非常类似降低了学习成本。 容错性Structured Streaming 提供端到端的容错保证指在分布式系统中整个数据处理流程从数据输入到输出的全过程都能够保证容错性。换句话说无论是数据的接收、处理还是输出系统都能够在发生故障或异常情况时保持数据的完整性和一致性能够确保在发生故障时不会丢失数据并且能够保证精确一次处理语义。 高性能Structured Streaming 充分利用了 Spark 引擎的优化能力能够进行查询优化、状态管理和分布式处理从而提供高性能的实时处理能力。 灵活的事件时间处理Structured Streaming 支持事件时间event-time处理可以轻松处理乱序事件、延迟事件等场景并提供丰富的窗口操作支持。 集成性Structured Streaming 提供了与各种数据源的集成包括 Kafka、Flume、HDFS、S3 等同时也支持将结果写入各种存储系统。 易于调试和监控Structured Streaming 提供了丰富的监控和调试功能包括进度报告、状态查询等方便用户监控作业的执行情况。 Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表 可以把流计算等同于在一个静态表上的批处理查询Spark会在不断添加数据的无界输入表上运行计算并进行增量查询 在无界表上对输入的查询将生成结果表系统每隔一定的周期会触发对无界表的计算并更新结果表 1.2 两种处理模型 1微批处理 Structured Streaming默认使用微批处理执行模型这意味着Spark流计算引擎会定期检查流数据源并对自上一批次结束后到达的新数据执行批量查询 数据到达和得到处理并输出结果之间的延时超过100毫秒 在这里回答三个问题 1.什么是偏移量 在 Structured Streaming 中偏移量Offset是指用于标识数据流中位置的标记它表示了数据流中的一个特定位置或者偏移量。在流处理中偏移量通常用于记录已经处理的数据位置以便在失败恢复、断点续传或者状态管理等场景下能够准确地从中断处继续处理数据。 具体来说在结构化流处理中偏移量通常与输入数据源紧密相关比如 Kafka、File Source 等。当 Spark 结构化流启动时会从数据源中读取偏移量并使用这些偏移量来确定应该从哪里开始读取数据。随着数据被处理Spark 会不断更新偏移量以确保在发生故障或重启情况下能够准确地恢复到之前处理的位置。 2.为什么要记录偏移量 容错和故障恢复记录偏移量可以确保在流处理过程中发生故障或者需要重启时能够准确地恢复到之前处理的位置避免数据的丢失和重复处理。通过记录偏移量流处理系统能够知道从哪里继续读取数据从而保证数据处理的完整性和一致性。 精确一次处理语义记录偏移量也有助于实现精确一次处理语义即确保每条输入数据只被处理一次。通过准确记录偏移量并在发生故障后能够准确地恢复到之前的位置流处理系统能够避免重复处理数据从而确保处理结果的准确性。 断点续传记录偏移量还使得流处理系统能够支持断点续传的功能即在流处理过程中可以随时停止并在之后恢复到之前的处理位置而不需要重新处理之前已经处理过的数据。 通过记录偏移量结构化流处理可以实现精确一次处理语义并确保即使在出现故障和重启的情况下也能够保证数据不会被重复处理或丢失。因此偏移量在结构化流处理中扮演着非常重要的角色是实现流处理的容错性和准确性的关键之一。 关于偏移量的理解可以参考关于偏移量的理解-CSDN博客 3.为什么延时超过100毫秒 Driver 驱动程序通过将当前待处理数据的偏移量保存到预写日志中来对数据处理进度设置检查点以便今后可以使用它来重新启动或恢复查询。 为了获得确定性的重新执行Deterministic Re-executions和端到端语义在下一个微批处理之前就要将该微批处理所要处理的数据的偏移范围保存到日志中。所以当前到达的数据需要等待先前的微批作业处理完成且它的偏移量范围被记入日志后才能在下一个微批作业中得到处理这会导致数据到达和得到处理并输出结果之间的延时超过100毫秒。 2持续处理 微批处理的数据延迟对于大多数实际的流式工作负载如ETL和监控已经足够了然而一些场景确实需要更低的延迟。比如在金融行业的信用卡欺诈交易识别中需要在犯罪分子盗刷信用卡后立刻识别并阻止但是又不想让合法交易的用户感觉到延迟从而影响用户的使用体验这就需要在 1020毫秒的时间内对每笔交易进行欺诈识别这时就不能使用微批处理模型而需要使用持续处理模型。   Spark从2.3.0版本开始引入了持续处理的试验性功能可以实现流计算的毫秒级延迟。 在持续处理模式下Spark不再根据触发器来周期性启动任务而是启动一系列的连续读取、处理和写入结果的长时间运行的任务。 为了缩短延迟引入了新的算法对查询设置检查点在每个任务的输入数据流中一个特殊标记的记录被注入。当任务遇到标记时任务把处理后的最后偏移量异步任务的执行不必等待其他任务完成或某个事件发生地报告给引擎引擎接收到所有写入接收器的任务的偏移量后写入预写日志。由于检查点的写入是完全异步的任务可以持续处理因此延迟可以缩短到毫秒级。也正是由于写入是异步的会导致数据流在故障后可能被处理超过一次以上所以持续处理只能做到“至少一次”的一致性。因此需要注意到虽然持续处理模型能比微批处理模型获得更好的实时响应性能但是这是以牺牲一致性为代价的。微批处理可以保证端到端的完全一致性而持续处理只能做到“至少一次”的一致性。 微批处理和持续处理是流处理中两种常见的处理模式将他们进行对比 处理方式 微批处理micro-batch processing将连续的数据流按照一定的时间间隔或者数据量划分成小批量进行处理每个批量数据被视为一个微批作业类似于批处理的方式进行处理。持续处理continuous processing对不间断的数据流进行实时处理没有明确的批次边界数据到达后立即进行处理和输出。 延迟和实时性 微批处理通常会导致一定的延迟因为数据需要等待下一个批次的处理才能输出结果因此微批处理一般无法做到完全的实时性。持续处理具有更好的实时性因为数据到达后立即进行处理可以更快地输出结果。 容错和状态管理 微批处理通常通过检查点机制来实现容错和状态管理每个微批作业之间会保存处理状态以便故障恢复和重新执行。持续处理也需要考虑容错和状态管理但通常需要使用更复杂的机制来实现实时的状态管理和故障恢复。 资源利用 微批处理可以更好地利用批处理系统的资源因为可以对数据进行分批处理适用于一些需要大批量数据一起处理的场景。持续处理需要更多的实时资源和更高的实时性能适用于对数据要求实时性较高的场景。 1.3 Structured Streaming和Spark SQL、Spark Streaming关系 Structured Streaming处理的数据跟Spark Streaming一样也是源源不断的数据流区别在于Spark Streaming采用的数据抽象是DStream本质上就是一系列RDD而Structured Streaming采用的数据抽象是DataFrame。Structured Streaming可以使用Spark SQL的DataFrame/Dataset来处理数据流。虽然Spark SQL也是采用DataFrame作为数据抽象但是Spark SQL只能处理静态的数据而Structured Streaming可以处理结构化的数据流。这样Structured Streaming就将Spark SQL和Spark Streaming二者的特性结合了起来。Structured Streaming可以对DataFrame/Dataset应用各种操作包括select、where、groupBy、map、filter、flatMap等。Spark Streaming只能实现秒级的实时响应而Structured Streaming由于采用了全新的设计方式采用微批处理模型时可以实现100毫秒级别的实时响应采用持续处理模型时可以支持毫秒级的实时响应。 二、编写Structured Streaming程序的基本步骤 编写Structured Streaming程序的基本步骤包括 导入pyspark模块创建SparkSession对象创建输入数据源定义流计算过程启动流计算并输出结果 实例任务一个包含很多行英文语句的数据流源源不断到达Structured Streaming程序对每行英文语句进行拆分并统计每个单词出现的频率 在/home/hadoop/sparksj/mycode/structured目录下创建StructuredNetworkWordCount.py文件 # 导入必要的 SparkSession 和函数库 from pyspark.sql import SparkSession from pyspark.sql.functions import split from pyspark.sql.functions import explode# 程序的入口点判断是否在主程序中执行 if __name__ __main__:# 创建 SparkSession 对象设置应用程序名字为 StructuredNetworkWordCountspark SparkSession \.builder \.appName(StructuredNetworkWordCount) \.getOrCreate()# 设置 Spark 日志级别为 WARN减少日志输出spark.sparkContext.setLogLevel(WARN)# 从指定的主机localhost和端口9999读取数据流使用 socket 格式lines spark \.readStream \.format(socket) \.option(host, localhost) \.option(port, 9999) \.load()# 将每行数据按空格分割成单词并使用 explode 函数将单词展开成行words lines.select(explode(split(lines.value, )).alias(word))# 对单词进行分组计数wordCounts words.groupBy(word).count()# 将结果写入到控制台输出模式为 complete每8秒触发一次流处理query wordCounts \.writeStream \.outputMode(complete) \.format(console) \.trigger(processingTime8 seconds) \.start()# 等待流查询终止query.awaitTermination()在执行StructuredNetworkWordCount.py之前需要启动HDFS start-dfs.sh 新建一个终端记作“数据源终端”输入如下命令 nc -lk 9999 再新建一个终端记作“流计算终端”执行如下命令 cd /home/hadoop/sparksj/mycode/structured spark-submit StructuredNetworkWordCount.py 执行程序后在“数据源终端”内用键盘不断敲入一行行英文语句nc程序会把这些数据发送给StructuredNetworkWordCount.py程序进行处理 输出结果内的Batch后面的数字说明这是第几个微批处理系统每隔8秒会启动一次微批处理并输出数据。如果要停止程序的运行则可以在终端内键入“CtrlC”来停止。 三、输入源 3.1 File源 File源或称为“文件源”以文件流的形式读取某个目录中的文件支持的文件格式为csv、json、orc、parquet、text等。 需要注意的是文件放置到给定目录的操作应当是原子性的即不能长时间在给定目录内打开文件写入内容而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。 File 源的选项option主要包括如下几个 path输入路径的目录所有文件格式通用。path 支持glob 通配符路径但是目录或glob通配符路径的格式不支持以多个逗号分隔的形式。maxFilesPerTrigger每个触发器中要处理的最大新文件数默认无最大值。latestFirst是否优先处理最新的文件当有大量文件积压时设置为True可以优先处理新文件默认为False。fileNameOnly是否仅根据文件名而不是完整路径来检查新文件默认为False。如果设置为True则以下文件将被视为相同的文件因为它们的文件名“dataset.txt”相同 file:///dataset.txt s3://a/dataset.txt s3n://a/b/dataset.txt s3a://a/b/c/dataset.txt 特定的文件格式也有一些其他特定的选项具体可以参阅Spark手册内DataStreamReader中的相关说明 以.csv文件源为例以下为示例代码 csvDF spark \.readStream \.format(csv) \.option(seq,;) \.load(SOME_DIR) 其中seq选项指定了.csv的间隔符号。 实例 以一个JSON格式文件的处理来演示File源的使用方法主要包括以下两个步骤 创建程序生成JSON格式的File源测试数据创建程序对数据进行统计 1创建程序生成JSON格式的File源测试数据 生成模拟的电商购买行为数据并将数据保存为 JSON 文件。模拟了用户的登录、登出和购买行为包括事件发生的时间戳、动作类型和地区等信息 在/home/hadoop/sparksj/mycode/structured目录下创建a.py文件 import os # 导入 os 模块用于处理文件和目录路径 import shutil # 导入 shutil 模块用于文件操作比如移动文件 import random # 导入 random 模块用于生成随机数 import time # 导入 time 模块用于获取时间戳# 定义测试数据存储的临时目录和最终目录 TEST_DATA_TEMP_DIR /tmp/ # 临时目录用于生成文件 TEST_DATA_DIR /tmp/testdata/ # 最终目录存储生成的文件# 定义可能的行为和地区 ACTION_DEF [login, logout, purchase] # 可能的行为 DISTRICT_DEF [fujian, beijing, shanghai, guangzhou] # 可能的地区# JSON 行的模板包含时间、行为和地区 JSON_LINE_PATTERN {{eventTime: {}, action: {}, district: {}}}\n# 设置测试环境清空最终目录 def test_setUp():if os.path.exists(TEST_DATA_DIR): # 检查最终目录是否存在shutil.rmtree(TEST_DATA_DIR, ignore_errorsTrue) # 如果存在递归删除目录及其内容os.mkdir(TEST_DATA_DIR) # 创建最终目录# 清理测试环境删除最终目录及其内容 def test_tearDown():if os.path.exists(TEST_DATA_DIR): # 检查最终目录是否存在shutil.rmtree(TEST_DATA_DIR, ignore_errorsTrue) # 如果存在递归删除目录及其内容# 写入文件并移动到最终目录 def write_and_move(filename, data):with open(TEST_DATA_TEMP_DIR filename,wt, encodingutf-8) as f: # 打开临时目录下的文件并写入数据f.write(data) # 写入数据到文件shutil.move(TEST_DATA_TEMP_DIR filename, TEST_DATA_DIR filename) # 将文件移动到最终目录# 主程序 if __name__ __main__: # 程序的入口如果作为脚本直接执行则会执行下面的代码test_setUp() # 设置测试环境清空最终目录# 生成模拟数据循环生成100个文件for i in range(100):filename e-mall-{}.json.format(i) # 生成文件名格式为 e-mall-i.jsoncontent # 初始化内容为空字符串rndcount list(range(10)) # 生成一个包含0到9的列表random.shuffle(rndcount) # 打乱列表顺序随机生成行数for _ in rndcount: # 遍历每一个随机数content JSON_LINE_PATTERN.format( # 根据模板生成一行 JSON 数据str(int(time.time())), # 时间戳当前时间的秒数转换为字符串random.choice(ACTION_DEF), # 随机选择行为random.choice(DISTRICT_DEF)) # 随机选择地区write_and_move(filename, content) # 调用函数写入数据到文件并移动到最终目录time.sleep(1) # 休眠1秒模拟数据生成间隔test_tearDown() # 清理测试环境删除最终目录及其内容这段程序首先建立测试环境清空测试数据所在的目录接着使用for循环一千次来生成一千个文件文件名为“e-mall-数字.json” 文件内容是不超过100行的随机JSON行行的格式是类似如下 {eventTime: 1546939167, action: logout, district: fujian}\n 其中时间、操作和省与地区均随机生成。测试数据是模拟电子商城记录用户的行为可能是登录、退出或者购买并记录了用户所在的省与地区。为了让程序运行一段时间每生成一个文件后休眠1秒。在临时目录内生成的文件通过移动move的原子操作移动到测试目录。 2创建程序对数据进行统计 同样在/home/hadoop/sparksj/mycode/structured目录下创建b.py文件 import os # 导入 os 模块用于处理文件和目录路径 import shutil # 导入 shutil 模块用于文件操作比如移动文件 from pprint import pprint # 导入 pprint 模块用于漂亮地打印数据结构from pyspark.sql import SparkSession # 从 PySpark 中导入 SparkSession用于创建 Spark 应用程序 from pyspark.sql.functions import window, asc # 从 PySpark 中导入窗口函数和升序排序函数 from pyspark.sql.types import StructType, StructField, TimestampType, StringType # 从 PySpark 中导入结构类型和时间戳类型、字符串类型TEST_DATA_DIR_SPARK file:///tmp/testdata/ # 测试数据存储的目录使用 file:/// 开头表示本地文件系统路径if __name__ __main__: # 程序入口如果作为脚本直接执行则执行下面的代码# 定义模拟数据的结构schema StructType([StructField(eventTime, TimestampType(), True), # 定义事件时间字段类型为时间戳StructField(action, StringType(), True), # 定义行为字段类型为字符串StructField(district, StringType(), True)]) # 定义地区字段类型为字符串# 创建 SparkSession如果已存在则获取否则创建一个新的spark SparkSession \.builder \.appName(StructuredEMallPurchaseCount) \ # 设置应用程序名称.getOrCreate()spark.sparkContext.setLogLevel(WARN) # 设置日志级别为 WARN以减少不必要的日志输出# 从文件流中读取 JSON 数据应用指定的模式lines spark \.readStream \.format(json) \.schema(schema) \.option(maxFilesPerTrigger, 100) \ # 每次触发处理的最大文件数以控制处理速度.load(TEST_DATA_DIR_SPARK)windowDuration 1 minutes # 定义时间窗口的持续时间# 对购买行为进行筛选、按地区和时间窗口进行分组统计购买次数并按时间窗口排序windowedCounts lines \.filter(action purchase) \.groupBy(district, window(eventTime, windowDuration)) \.count() \.sort(asc(window))# 将结果写入控制台query windowedCounts \.writeStream \.outputMode(complete) \.format(console) \.option(truncate, false) \ # 控制台输出不截断.trigger(processingTime10 seconds) \ # 触发处理的时间间隔.start()query.awaitTermination() # 等待查询终止该程序的目的是过滤用户在电子商城里的购买记录并根据省与地区以1分钟的时间窗口统计各个省与地区的购买量并按时间排序后输出。 3测试运行程序 程序运行过程需要访问HDFS因此需要启动HDFS start-dfs.sh 新建一个终端执行如下命令生成测试数据 cd /home/hadoop/sparksj/mycode/structured python3 a.py 再次新建一个终端执行如下命令运行数据统计程序 cd /home/hadoop/sparksj/mycode/structured spark-submit b.py 运行程序以后可以看到类似如下的输出结果 4处理警告 如果运行过程中出现警告可忽略不影响正常运行 运行过程中出现如下警告当然也不影响运行也可以进行解决 意思就是处理时间触发器的批处理已经开始滞后。具体来说当前批处理花费的时间超过了触发器设定的时间间隔 上述代码中触发器的间隔被设置为 10000 毫秒也就是10秒但是当前批处理花费了16341毫秒远远超过了设定的时间间隔 可能会导致 处理延迟: 当批处理花费的时间超过触发器设定的时间间隔时可能会导致处理延迟因为下一个批处理可能无法按时启动。 资源利用不佳: 如果批处理持续花费较长时间可能会导致资源如CPU、内存等的浪费因为资源被用于等待而不是实际的处理任务。 上述警告可通过修改b.py代码中processingTime的值将它改成大于上图中的16341ms即可1秒1000毫秒 当然若读者厌烦于这些警告也可与选择设置 Apache Spark 的日志级别为 ERROR只记录 ERROR 级别及以上的日志信息 将b.py代码中的spark.sparkContext.setLogLevel(WARN)改为spark.sparkContext.setLogLevel(ERROR)即可 保存并再次运行可得到干净整洁的结果 5总结分析 a.py是一个 Python 脚本用于生成模拟的电商购买行为数据并将数据保存为 JSON 文件。它模拟了用户的登录、登出和购买行为包括事件发生的时间戳、动作类型和地区等信息。 b.py是一个 PySpark Structured Streaming 应用程序用于实时处理模拟的电商购买行为数据。它从指定的目录即a.py生成的 JSON 文件目录读取数据并进行实时统计计算每个地区在一分钟内的购买次数并按时间窗口排序然后将结果输出到控制台。 联系a.py生成的模拟购买行为数据是b.py的输入数据源。a.py生成的 JSON 文件包含了购买行为的模拟数据而b.py则通过 Spark Structured Streaming 读取这些 JSON 文件并实时处理统计购买行为数据最终将结果输出到控制台。 如果你先执行a.py生成了购买行为的模拟数据然后再执行b.py它将会从a.py生成的目录中读取数据并进行实时统计购买行为数据。这样你就可以通过实时监控控制台输出了解每个地区在一分钟内的购买情况从而进行实时的业务分析或监控。 3.2 Kafka源 Kafka 源的选项option包括如下几个: assign指定所消费的Kafka主题和分区。subscribe订阅的Kafka主题为逗号分隔的主题列表。subscribePattern订阅的Kafka主题正则表达式可匹配多个主题。kafka.bootstrap.serversKafka服务器的列表逗号分隔的“hostport”列表。startingOffsets起始位置偏移量。endingOffsets结束位置偏移量。failOnDataLoss布尔值表示是否在Kafka 数据可能丢失时主题被删除或位置偏移量超出范围等触发流计算失败。一般应当禁止以免误报。 实例使用生产者程序每0.1秒生成一个包含2个字母的单词并写入Kafka的名称为“wordcount-topic”的主题Topic内。Spark的消费者程序通过订阅wordcount-topic会源源不断收到单词并且每隔8秒钟对收到的单词进行一次词频统计把统计结果输出到Kafka的主题wordcount-result-topic内同时通过2个监控程序检查Spark处理的输入和输出结果。 1启动Kafka 新建一个终端记作“Zookeeper终端”输入下面命令启动Zookeeper服务不要关闭这个终端窗口一旦关闭Zookeeper服务就停止了 cd /usr/local/kafka ./bin/zookeeper-server-start.sh config/zookeeper.properties 另外打开第二个终端记作“Kafka终端”然后输入下面命令启动Kafka服务不要关闭这个终端窗口一旦关闭Kafka服务就停止了 cd /usr/local/kafka ./bin/kafka-server-start.sh config/server.properties 再新开一个终端记作“监控输入终端”执行如下命令监控Kafka收到的文本 cd /usr/local/kafka ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic 再新开一个终端记作“监控输出终端”执行如下命令监控输出的结果文本 cd /usr/local/kafka ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-result-topic 2编写生产者Producer程序 在/home/hadoop/sparksj/mycode/structured/kafkasource目录下创建并编辑spark_ss_kafka_producer.py文件 cd /home/hadoop/sparksj/mycode/structured/kafkasource vim spark_ss_kafka_producer.py import string import random import time from kafka import KafkaProducer# 导入所需的库if __name__ __main__:# 程序的入口点# 创建一个 Kafka 生产者指定 Kafka 服务器的地址producer KafkaProducer(bootstrap_servers[localhost:9092])while True:# 进入无限循环不断生成并发送消息# 生成两个随机小写字母组成的字符串s2 (random.choice(string.ascii_lowercase) for _ in range(2))word .join(s2)# 将字符串转换为字节数组value bytearray(word, utf-8)# 发送消息到名为 wordcount-topic 的 Kafka 主题# 并设置超时时间为 10 秒producer.send(wordcount-topic, valuevalue).get(timeout10)# 休眠 0.1 秒然后继续循环time.sleep(0.1) 3安装Python3的Kafka支持 在运行生产者程序之前要先安装kafka-python如果读者之前已经安装可跳过此小节。 1.首先确认有没有安装pip3如果没有使用如下命令安装笔者已经安装不在演示 sudo apt-get install pip3 2.安装kafka-python模块命令如下 sudo pip3 install kafka-python 安装完成后可以使用pip3 list命令列出当前 Python 环境中已安装的所有 Python 包查看是否有kafka-python包 pip3 list 可以看到存在kafka-python包版本为2.0.2 4运行生产者程序 新建一个终端在终端中执行如下命令运行生产者程序 cd /home/hadoop/sparksj/mycode/structured/kafkasource python3 spark_ss_kafka_producer.py 生产者程序执行以后在“监控输入终端”的窗口内就可以看到持续输出包含2个字母的单词。程序会生成随机字符串并将其发送到 Kafka 主题中主题接收到随机字符串后会展示到终端。 解释 执行1中的命令 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic 会启动 Kafka 的控制台消费者用于从指定的 Kafka 主题中读取消息并将其输出到控制台上。 而生产者程序是一个简单的 Kafka 生产者示例用于生成随机字符串并将其发送到名为 wordcount-topic 的 Kafka 主题中。 当启动 Kafka 的控制台消费者同时运行生产者程序时生产者代码会不断地生成随机字符串并发送到 wordcount-topic 主题而控制台消费者则会从该主题中读取并显示这些消息。因此会导致生产者不断地生成消息并且控制台消费者会即时地输出这些消息从而实现了消息的生产和消费过程。 上述用于测试 Kafka 环境的搭建和消息传递的过程以确保生产者能够成功地将消息发送到指定的主题同时消费者能够从该主题中接收并处理这些消息。 5编写并运行消费者Consumer程序 同样在/home/hadoop/sparksj/mycode/structured/kafkasource目录下创建并编辑spark_ss_kafka_consumer.py文件 cd /home/hadoop/sparksj/mycode/structured/kafkasource vim spark_ss_kafka_consumer.py from pyspark.sql import SparkSession# 主程序入口 if __name__ __main__:# 创建一个 SparkSessionspark SparkSession \.builder \.appName(StructuredKafkaWordCount) \ # 设置应用程序名称.getOrCreate() # 获取或创建 SparkSession 实例# 设置日志级别为WARN避免过多的输出信息spark.sparkContext.setLogLevel(WARN)# 从 Kafka 主题中读取数据lines spark \.readStream \ # 创建一个流式DataFrame.format(kafka) \ # 指定数据源格式为Kafka.option(kafka.bootstrap.servers, localhost:9092) \ # 设置Kafka集群的地址.option(subscribe, wordcount-topic) \ # 订阅名为wordcount-topic的主题.load() \ # 从Kafka主题中加载数据.selectExpr(CAST(value AS STRING)) # 将消息内容转换为字符串格式# 对数据进行聚合统计wordCounts lines.groupBy(value).count()# 将结果写入到另一个 Kafka 主题中query wordCounts \.selectExpr(CAST(value AS STRING) as key, CONCAT(CAST(value AS STRING), :, CAST(count AS STRING)) as value) \ # 格式化输出的key和value.writeStream \ # 创建一个流式DataFrame.outputMode(complete) \ # 定义输出模式为complete.format(kafka) \ # 指定输出数据源格式为Kafka.option(kafka.bootstrap.servers, localhost:9092) \ # 设置Kafka集群的地址.option(topic, wordcount-result-topic) \ # 指定输出的Kafka主题.option(checkpointLocation, file:///tmp/kafka-sink-cp) \ # 设置检查点目录.trigger(processingTime8 seconds) \ # 定时触发每8秒处理一次数据.start() # 启动流式查询query.awaitTermination() # 等待流式查询终止在运行消费者程序即spark_ss_kafka_consumer.py时请确保kafka成功启动监控输入终端与监控输出端成功启动生产者程序成功启动若采用方式一启动消费者程序则可以等会生产者程序因为jar包下载可能时间过长长时间生产者程序会产生大量的数据若采用方式二启动消费者程序则确保启动消费者程序前启动生产者程序正如下方视频所示 运行消费者程序可以有两种方式 方式一 spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 spark_ss_kafka_consumer.py 使用了--packages参数指定了要从Maven仓库中下载并包含的依赖包其中org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0是要添加的Kafka相关依赖。 作用在运行应用程序时动态下载Kafka相关的依赖包并将其添加到类路径中以便应用程序能够访问这些依赖 运行后会解析包依赖并从Maven中心仓库下载所需的JAR包下载完成后进行运行但这种方法依赖于自身网络环境笔者这边因为是校园网贼慢故不再展示运行结果 方式二 在执行下列代码之前需要下载spark-sql-kafka-0-10_2.12-3.2.0.jar、kafka-clients-2.6.0.jar、commons-pool2-2.9.0.jar和spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件笔者spark版本为spark 3.2.0、kafka版本为kafka_2.12-2.6.0读者请根据自己的版本调整jar版本的下载将其放到“/usr/local/spark/jars”目录下现附上下载地址 spark-sql-kafka-0-10_2.12-3.2.0.jar文件下载页面https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.2.0 kafka-clients-2.6.0.jar文件下载页面https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.6.0 commons-pool2-2.9.0.jar文件下载页面https://mvnrepository.com/artifact/org.apache.commons/commons-pool2/2.9.0 spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件下载页面https://mvnrepository.com/artifact/org.apache.spark/spark-token-provider-kafka-0-10_2.12/3.2.0 若上述网站不能打开可尝试电脑连接手机热点或使用如下网址进行下载 链接https://pan.baidu.com/s/121zVsgc4muSt9rgCWnJZmw  提取码wkk6 spark-sql-kafka-0-10_2.12-3.2.0.jar文件下载页面 Central Repository: org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.0 (maven.org) kafka-clients-2.6.0.jar文件下载页面Central Repository: org/apache/kafka/kafka-clients/2.6.0 (maven.org) commons-pool2-2.9.0.jar文件下载页面Central Repository: org/apache/commons/commons-pool2/2.9.0 (maven.org) spark-token-provider-kafka-0-10_2.12-3.2.0.jar文件下载页面Central Repository: org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.2.0 (maven.org) 下列两段代码二选一执行 spark-submit --jars /usr/local/spark/jars/* spark_ss_kafka_consumer.py 或 spark-submit --jars /usr/local/kafka/libs/*:/usr/local/spark/jars/* spark_ss_kafka_consumer.py 使用了--jars参数指定了要包含在类路径中的外部JAR包的路径 /usr/local/kafka/libs/*和/usr/local/spark/jars/*是要包含的Kafka和Spark相关的JAR包的路径 作用显式地指定要包含在类路径中的JAR包而不是动态下载依赖 运行如下所示同样可以设置输出日志级别来控制日志的输出在此不再赘述 视频版 structured streaming使用kafka源 GIF版 嘿嘿嘿博主贴心的准备了视频和动图两个版本读者可按需自取 就麻烦各位点个赞啦~~(*/ω*) 总结 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic在终端监控名为wordcount-topic的Kafka主题的输入信息 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-result-topic 在终端监控名为wordcount-result-topic的Kafka主题的输出信息 spark_ss_kafka_producer.py 生成随机的两个小写字母字符串并将其发送到wordcount-topic主题中 spark_ss_kafka_consumer.py 从wordcount-topic主题中读取消息对单词进行计数然后将结果写入wordcount-result-topic主题。该程序会持续运行并等待新的输入消息 如果依次执行了上述四个命令或代码可以得到以下结果 监控输入终端会显示从wordcount-topic主题中接收到的随机小写字母字符串监控输入终端会显示从wordcount-result-topic主题中接收到的单词计数结果生产者程序会不断地生成随机字符串并将其发送到wordcount-topic主题消费者程序会持续地从wordcount-topic主题中读取消息对单词进行计数并将结果写入wordcount-result-topic主题 如果只执行第一条命令和生产者程序那么会看到终端不断打印出随机的两个小写字母字符串而不会有单词计数或结果输出。 3.3 Socket源 Socket 源的选项option包括如下几个 host主机 IP地址或者域名必须设置。port端口号必须设置。includeTimestamp是否在数据行内包含时间戳。使用时间戳可以用来测试基于时间聚合的功能。 Socket源从一个本地或远程主机的某个端口服务上读取数据数据的编码为UTF8。因为Socket源使用内存保存读取到的所有数据并且远端服务不能保证数据在出错后可以使用检查点或者指定当前已处理的偏移量来重放数据所以它无法提供端到端的容错保障。Socket源一般仅用于测试或学习用途。 实例可参考二、编写Structured Streaming程序的基本步骤 3.4 Rate源 Rate源是一种用于生成模拟数据流的内置数据源。 Rate源可每秒生成特定个数的数据行每个数据行包括时间戳和值字段。时间戳是消息发送的时间值是从开始到当前消息发送的总个数从0开始。Rate源一般用来作为调试或性能基准测试。 Rate 源的选项option包括如下几个 rowsPerSecond每秒产生多少行数据默认为1。rampUpTime生成速度达到rowsPerSecond 需要多少启动时间使用比秒更精细的粒度将会被截断为整数秒默认为0秒。numPartitions使用的分区数默认为Spark的默认分区数。 Rate 源会尽可能地使每秒生成的数据量达到rowsPerSecond可以通过调整numPartitions以尽快达到所需的速度。这几个参数的作用类似一辆汽车从0加速到100千米/小时并以100千米/小时进行巡航的过程通过增加“马力”numPartitions可以使得加速时间rampUpTime更短。 可以用一小段代码来观察 Rate 源的数据行格式和生成数据的内容。 可以用以下代码来观察Rate源的数据行格式和生成数据的内容 在/home/hadoop/sparksj/mycode/structured/ratesource目录下新建文件spark_ss_rate.py from pyspark.sql import SparkSessionif __name__ __main__:# 创建一个 SparkSession 对象spark SparkSession \.builder \.appName(TestRateStreamSource) \.getOrCreate()# 设置日志级别为WARNspark.sparkContext.setLogLevel(WARN)# 从 Rate source 中读取数据流lines spark \.readStream \.format(rate) \.option(rowsPerSecond, 5) \.load()# 打印出数据流的 schemaprint(lines.schema)# 将数据流写入控制台query lines \.writeStream \.outputMode(update) \.format(console) \.option(truncate, false) \.start()# 等待流处理的终止query.awaitTermination() 在Linux终端执行spark_ss_rate.py cd /home/hadoop/sparksj/mycode/structured/ratesource spark-submit spark_ss_rate.py 输出的第一行即上图红框框住的那一行StruckType就是print(lines.schema)输出的数据行的格式。 当运行这段代码时它会生成模拟的连续数据流并将其写入控制台进行显示。输出结果会包含时间戳和生成的值。同时程序会持续运行直到手动终止或出现异常。 同4处理警告也可以设置日志输出等级来忽略警告将spark.sparkContext.setLogLevel(WARN)改为spark.sparkContext.setLogLevel(ERROR) 再次执行结果如下干净整洁~~(❁´◡❁)~~☆*: .. o(≧▽≦)o ..:*☆
http://www.dnsts.com.cn/news/203483.html

相关文章:

  • 六安网站建设推广建设工程公司岗位职责
  • html5企业网站 源码软件系统设计流程
  • 人和动物做的电影网站大型资讯门户网站怎么做排名
  • 云南省建设系统网站阳江网红店有哪些
  • 建设银行附近网站点肥西县重点建设局网站
  • 淘宝客网站开发常用的搜索引擎网址
  • 太原网站推广怎么做广西公路建设协会网站
  • 小说网站怎么用html做le网站源码
  • 淘宝客代理网站怎么做单页面seo优化
  • 上海做网站建设公司wordpress留学主题
  • 中山 网站建设一条龙玉林市网站开发公司
  • 温州市网站建设公司重庆网站开发服务
  • 郑州网站建设服务网站商场系统软件
  • 哪个餐饮店微网站做的有特色设计一个网站要多久
  • 什么网站做电子元器件全球有多少亿人口
  • 做外贸网站 深圳情侣视频被4万人围观
  • 怎么查网站注册信息辽宁建设工程信息网网站
  • 新手怎么做网站优化中文wordpress搭建
  • 邯郸单位网站建设ps做的网站怎么到网站上预览
  • 网站建设启示演出公司网站建设
  • 网站建设吕凡科技开封市住房和城乡建设局网站
  • wordpress自动汉化沈阳做网站优化
  • 站长工具seo推广 站长工具查询60平方旧房翻新装修要多少钱
  • 网站建设工作分工手机网站模板单页
  • 设计网站还有哪些问题在凡科做网站编辑
  • 福州市建设局网站上海市网站设计
  • 大连网站建设找哪家腾讯中国联通
  • 天津网站建设普斯泰网站分为几级页面
  • 政法网 网站建设山东建设厅网站专职安全员
  • 网站建设开发多少钱做网站好公司哪家好