做网站那个服务器好,深圳装修公司口碑排名,qq企业邮箱登录,2021热点新闻事件一、DStream 的创建
1. 通过 RDD 队列 DStream 在内部实现上是一系列连续的 RDD 来表示。每个 RDD 包含有采集周期内的数据 /**
基本语法#xff1a;StreamingContext.queueStream(queueOfRDDs: Queue, oneAtATime false)
*/
object DStreamFromRddQueue {def main(args: Ar…一、DStream 的创建
1. 通过 RDD 队列 DStream 在内部实现上是一系列连续的 RDD 来表示。每个 RDD 包含有采集周期内的数据 /**
基本语法StreamingContext.queueStream(queueOfRDDs: Queue, oneAtATime false)
*/
object DStreamFromRddQueue {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))val queueOfRdds mutable.Queue[RDD[Int]]()val ds ssc.queueStream(queueOfRdds, oneAtATime false)ds.print()ssc.start()// 向 RDD 队列中添加元素for(i - 1 to 5) {queueOfRdds ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}2. 通过自定义数据源 通过继承 Receiver 抽象类并实现 onStart、onStop 方法来自定义数据源采集 /**实现步骤1.继承 Receiver[T]() 抽象类定义泛型并传递参数1.1 泛型是采集的数据类型1.2 传递的参数是存储级别StorageLevel 中的枚举值2.实现 onStart、onStop 方法3.使用 receiverStream(receiver) 创建 DStream
*/
object DStreamFromDiy {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))// 使用自定义数据源采集数据val ds: ReceiverInputDStream[String] ssc.receiverStream(new MyReceiver())ds.print()ssc.start()ssc.awaitTermination()}
}// 自定义数据源采集
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {private val flag true// 当 ssc.start() 调用后启动一个独立的线程去采集数据override def onStart(): Unit {new Thread(new Runnable(){override def run() {while(flag) {val data 数据为 new Random().nextInt(10)// 将数据存储封装为 DStreamstore(data)Thread.sleep(500)}}}, receiver).start()}// 停止数据采集override def onStop(): Unit {flag false}
}3. 通过 Kafka 数据源
3.1 版本选型
ReceiverAPI需要一个专门的 Executor 去接收数据然后发送给其他的 Executor 做计算。所以当接收数据的 Executor 和计算的 Executor 速度不同时特别在接收数据的 Executor 速度大于计算的 Executor 速度时会导致计算数据的节点内存溢出。(早期版本中提供此方式当前版本不适用)DirectAPI是由计算的 Executor 来主动接收消费 Kafka 的数据速度由自身控制
3.2 实现 引入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion3.0.0/version
/dependency
dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-core/artifactIdversion2.10.1/version
/dependency编码 /**
基本语法使用 KafkaUtils 工具类的 createDirectStream[K, V] 方法连接 Kafka 创建
相关参数1.StreamingContext环境对象2.LocationStrategies位置策略PreferConsistent 表示自动匹配3.ConsumerStrategies消费策略Subscribe[K,V](Set(topic)) 订阅主题4.Map[String, Object]Kafka 连接配置参数
*/
object DStreamFromKafka {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))// 封装 Kafka 配置参数val kafkaConf: Map[String, Object] Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - linux1:9092,linux2:9092,linux3:9092,ConsumerConfig.GROUP_ID_CONFIG - atguigu,key.deserializer - org.apache.kafka.common.serialization.StringDeserializer,value.deserializer - org.apache.kafka.common.serialization.StringDeserializer)// 创建 Kafka 数据源的 DStreamval ds: InputDStream[ConsumerRecord[String, String]] KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set(topic1)),kafkaConf)// 打印输出val data: DStream[String] ds.map(_.value())data.print()ssc.start()ssc.awaitTermination()}
}测试 启动 Zookeeper 和 Kafka 集群运行程序 main 方法向 Kafka 的主题中生产数据并查看程序控制台输出
二、DStream 的转换
1. 无状态转换操作 无状态的操作只作用于一个采集周期的 RDD 中不同采集周期的 RDD 之间的操作结果不会归约汇总 1.1 常见操作
/**
常见原语map/flatMap/filter/repartition/reduceByKey/groupByKey
*/
object DStreamNoStateChange {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))val word ssc.socketTextStream(localhost, 9999)val wordAsOne line.map((_, 1))val wordCount wordAsOne.reduceByKey(_ _)wordCount.print()/*测试在 cmd 窗口执行 nc -lp 999然后分次输入 10 个 hello结果由于采集周期为 3 秒所以输出结果为多个 (hello, num)数量与采集周期个数一致不同的采集周期结果是独立输出的*/ssc.start()ssc.awaitTermination()}
}
1.2 transform
/**
功能可以将 DStream 中底层的 RDD 获取进行操作可以扩展功能和实现周期性代码执行
基本语法Dstream.transform(func: RDD RDD): Dstream
*/
object DStreamTransform {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))val word ssc.socketTextStream(localhost, 9999)word.transform(rdd {// Driver端此处的代码会周期性的执行每个采集周期执行一次rdd.map(str {// Executor 端str })})ssc.start()ssc.awaitTermination()}
}1.3 join
/**
功能对当前批次(采集周期)内的两个 DStream 中各自的 RDD 中相同的 key 进行 join效果与两个 RDD 的 join 相同
基本语法Dstream1.join(Dstream2)
*/
object DStreamTransform {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))val ds9999 ssc.socketTextStream(localhost, 9999)val ds8888 ssc.socketTextStream(localhost, 8888)val data: DStream[(String, (Int, Int))] ds9999.map((_, 1)).join(ds888.map((_, 2)))data.print()ssc.start()ssc.awaitTermination()}
}2. 有状态转换操作 有状态转换操作会将一个采集周期的结果(状态)保存到检查点并且不断将下一个采集周期的结果(状态)更新保存到检查点中最终输出所有采集周期归约汇总的结果 2.1 updateStateByKey
/**基本语法DStream.updateStateByKey(func: (seq: Seq[T], op: Option[T]) op)参数1.seq 表示当前采集周期相同 key 的 Value 集合2.op 表示检查点中相同 key 的总 Value (Some 或 None)说明1.使用 updateStateByKey 需要对检查点目录进行配置会使用检查点来保存状态2.updateStateByKey会根据 key 对数据的状态进行更新
*/
object DStreamStateChange {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))// 必须设置检查点保存路径ssc.checkpoint(cp)val word ssc.socketTextStream(localhost, 9999)val wordAsOne line.map((_, 1))// val wordCount wordAsOne.reduceByKey(_ _)val wordCount wordAsOne.updateStateByKey((seq: Seq[Int], op: Option[Int]) {val sum seq.sumval newVal op.getOrElse(0) sumOption(newVal)})wordCount.print()/*测试在 cmd 窗口执行 nc -lp 999然后分次输入 10 个 hello结果最终的输出结果为 (hello, 10)*/ssc.start()ssc.awaitTermination()}
}
2.2 window 操作
/**基本语法1.DStream.window(windowSize: Duration, step: Duration)参数1.windowSize 表示窗口大小2.step 表示窗口滑动步长说明1.窗口大小和步长必须为采集周期大小的整数倍2.步长默认为一个采集周期大小2.countByWindow(windowSize: Duration, step: Duration)统计滑动窗口计数流中的元素个数3.reduceByWindow(func, windowSize: Duration, step: Duration)通过自定义函数聚合滑动窗口流中的元素4.reduceByKeyAndWindow(func, windowSize: Duration, step: Duration, [numTasks])通过自定义函数聚合滑动窗口流中相同 key 的 value5.reduceByKeyAndWindow(func, invFunc, windowSize: Duration, step: Duration, [numTasks])参数说明1.func 表示窗口中相同 key 的聚合计算方式2.invFunc 表示删除在窗口滑动后不再存在的数据值
*/
object DStreamWindow {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))ssc.checkpoint(cp)val word ssc.socketTextStream(localhost, 9999)val wordAsOne line.map((_, 1))// val ds wordAsOne.window(Seconds(6)) // 会有重复数据val ds wordAsOne.window(Seconds(6), Seconds(6))val wordCount ds.reduceByKey(_ _)// 必须设置检查点保存路径val wordCount1 wordAsOne.reduceByKeyAndWindow((x: Int, y: Int) x y,(x: Int, y: Int) x - y,Seconds(6), Seconds(6))wordCount.print()// wordCount1.print()ssc.start()ssc.awaitTermination()}
}三、DStream 的输出 SparkStreaming 也有惰性机制执行输出操作才会触发所有 DStream 计算的执行 /**基本语法1.print()将 DStream 输出到控制台只有这个输出会带时间戳2.saveAsTextFiles(prefix, [suffix])将 DStream 保存为 text 格式文件每一批次的存储文件名基于参数中的 prefix 和 suffix (prefix-Time_IN_MS[.suffix])3.saveAsObjectFiles(prefix, [suffix])以 Java 对象序列化的方式将 DStream 中的数据保存为SequenceFiles每一批次的存储文件名为 prefix-TIME_IN_MS[.suffix]4.saveAsHadoopFiles(prefix, [suffix])将 DStream 中的数据保存为 Hadoop files每一批次的存储文件名为 prefix-TIME_IN_MS[.suffix]5.foreachRDD(func)最通用的输出操作将函数 func 用于 DStream 的每一个 RDD可以将 RDD 存入文件或者通过网络将其写入数据库说明使用foreachRDD(func)把数据写到 MySQL 的外部数据库的注意事项1.创建连接对象不能写在 driver 层面因为所有的连接对象都不能序列化2.如果写在 foreachRDD 中则每个 RDD 中的每一条数据都会创建连接影响性能和资源3.推荐使用 RDD 的 foreachPartition() 算子在每个分区迭代中创建连接
*/
object DStreamOutput {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(ds)val ssc new StreamingContext(conf, Seconds(3))val word ssc.socketTextStream(localhost, 9999)val wordAsOne line.map((_, 1))val wordCount wordAsOne.reduceByKey(_ _)// wordCount.print() // SparkStreaming 没有输出操作会报错wordCount.foreachRDD(rdd {rdd.foreach(println)})ssc.start()ssc.awaitTermination()}
}四、SparkStreaming 优雅的关闭 SparkStreaming 任务需要 7*24 小时执行但是有时涉及到升级代码需要主动停止程序而分布式程序没办法做到一个个进程去停止所以需要使用第三方系统 (MySQL/Redis/Zookeepr/HDFS) 来控制内部程序关闭 import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}class MonitorStop(ssc: StreamingContext) extends Runnable {override def run(): Unit {val fs: FileSystem FileSystem.get(new URI(hdfs://linux1:9000), new Configuration(), hello)while(true) {try{Thread.sleep(5000)} catch {case e: InterruptedException e.printStackTrace()}val state: StreamingContextState ssc.getStateval bool: Boolean fs.exists(new Path(hdfs://linux1:9000/stopSpark))if(bool) {if(state StreamingContextState.ACTIVE) {// 优雅地关闭停止接收新数据并将已有的数据处理完后再关闭ssc.stop(stopSparkContext true, stopGracefully true)System.exit(0)}}}}
}import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkTest {def createSSC(): _root_.org.apache.spark.streaming.StreamingContext {val update: (Seq[Int], Option[Int]) Some[Int] (values: Seq[Int], status:
Option[Int]) {//当前批次内容的计算val sum: Int values.sum//取出状态信息中上一次状态 val lastStatu: Int status.getOrElse(0)Some(sum lastStatu)}val sparkConf: SparkConf new SparkConf().setMaster(local[4]).setAppName(SparkTest)//设置优雅的关闭sparkConf.set(spark.streaming.stopGracefullyOnShutdown, true)val ssc new StreamingContext(sparkConf, Seconds(5))ssc.checkpoint(./ck)val line: ReceiverInputDStream[String] ssc.socketTextStream(linux1, 9999)val word: DStream[String] line.flatMap(_.split( ))val wordAndOne: DStream[(String, Int)] word.map((_, 1))val wordAndCount: DStream[(String, Int)] wordAndOne.updateStateByKey(update)wordAndCount.print()ssc}def main(args: Array[String]): Unit {// 从检查点恢复数据val ssc: StreamingContext StreamingContext.getActiveOrCreate(./ck, () createSSC())new Thread(new MonitorStop(ssc)).start()ssc.start()ssc.awaitTermination()}}