详解 Spark Streaming 的 DStream 对象

一、DStream 的创建

1. 通过 RDD 队列

DStream 在内部实现上是一系列连续的 RDD 来表示。每个 RDD 包含有采集周期内的数据

/**
基本语法:StreamingContext.queueStream(queueOfRDDs: Queue, oneAtATime = false)
*/
object DStreamFromRddQueue {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        val queueOfRdds = mutable.Queue[RDD[Int]]()
        
        val ds = ssc.queueStream(queueOfRdds, oneAtATime = false)
        ds.print()
        
        ssc.start()
        // 向 RDD 队列中添加元素
        for(i <- 1 to 5) {
            queueOfRdds += ssc.sparkContext.makeRDD(1 to 300, 10)
			Thread.sleep(2000)
        }
        
        ssc.awaitTermination()
    }
}

2. 通过自定义数据源

通过继承 Receiver 抽象类,并实现 onStart、onStop 方法来自定义数据源采集

/**
	实现步骤:
	1.继承 Receiver[T]() 抽象类,定义泛型,并传递参数
		1.1 泛型是采集的数据类型
		1.2 传递的参数是存储级别,StorageLevel 中的枚举值
	2.实现 onStart、onStop 方法
	3.使用 receiverStream(receiver) 创建 DStream
*/
object DStreamFromDiy {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        // 使用自定义数据源采集数据
        val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
        ds.print()
        
        ssc.start()
        ssc.awaitTermination()
    }
}

// 自定义数据源采集
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    private val flag = true
    // 当 ssc.start() 调用后,启动一个独立的线程去采集数据
    override def onStart(): Unit = {
        new Thread(new Runnable(){
            override def run() {
                while(flag) {
                	val data = "数据为:" + new Random().nextInt(10)
                    // 将数据存储封装为 DStream
                    store(data)
                    
                    Thread.sleep(500)
                }
            }
        }, "receiver").start()
    }
    
    // 停止数据采集
    override def onStop(): Unit = {
        flag = false
    }
}

3. 通过 Kafka 数据源

3.1 版本选型
  • ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。所以当接收数据的 Executor 和计算的 Executor 速度不同时,特别在接收数据的 Executor 速度大于计算的 Executor 速度时,会导致计算数据的节点内存溢出。(早期版本中提供此方式,当前版本不适用)
  • DirectAPI:是由计算的 Executor 来主动接收消费 Kafka 的数据,速度由自身控制
3.2 实现
  • 引入依赖

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.10.1</version> 
    </dependency>
    
  • 编码

    /**
    基本语法:使用 KafkaUtils 工具类的 createDirectStream[K, V] 方法连接 Kafka 创建
    相关参数:
    	1.StreamingContext:环境对象
    	2.LocationStrategies:位置策略,PreferConsistent 表示自动匹配
    	3.ConsumerStrategies:消费策略,Subscribe[K,V](Set(topic)) 订阅主题
    	4.Map[String, Object]:Kafka 连接配置参数
    */
    object DStreamFromKafka {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
            val ssc = new StreamingContext(conf, Seconds(3))
            
            // 封装 Kafka 配置参数
            val kafkaConf: Map[String, Object] = Map[String, Object](
            	ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->  "linux1:9092,linux2:9092,linux3:9092",
    			ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
    			"key.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer",
    			"value.deserializer" ->  "org.apache.kafka.common.serialization.StringDeserializer"
            )
            
            // 创建 Kafka 数据源的 DStream
            val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](Set("topic1")),
                kafkaConf
            )
            
            // 打印输出
            val data: DStream[String] = ds.map(_.value())
            data.print()
            
            ssc.start()
            ssc.awaitTermination()
        }
    }
    
  • 测试

    • 启动 Zookeeper 和 Kafka 集群
    • 运行程序 main 方法
    • 向 Kafka 的主题中生产数据,并查看程序控制台输出

二、DStream 的转换

1. 无状态转换操作

无状态的操作只作用于一个采集周期的 RDD 中,不同采集周期的 RDD 之间的操作结果不会归约汇总

1.1 常见操作
/**
常见原语:map/flatMap/filter/repartition/reduceByKey/groupByKey
*/
object DStreamNoStateChange {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        val word = ssc.socketTextStream("localhost", 9999)
        val wordAsOne = line.map((_, 1))
        
        val wordCount = wordAsOne.reduceByKey(_ + _)
        
        wordCount.print()
        /*
        测试:在 cmd 窗口执行 nc -lp 999,然后分次输入 10 个 hello
        结果:由于采集周期为 3 秒,所以输出结果为多个 (hello, num),数量与采集周期个数一致,不同的采集周期结果是独立输出的
        	
        */
        
        ssc.start()
        ssc.awaitTermination()
    }
}

1.2 transform
/**
功能:可以将 DStream 中底层的 RDD 获取进行操作,可以扩展功能和实现周期性代码执行
基本语法:Dstream.transform(func: RDD => RDD): Dstream
*/
object DStreamTransform {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        val word = ssc.socketTextStream("localhost", 9999)
        
        word.transform(
            rdd => {
                // Driver端:此处的代码会周期性的执行,每个采集周期执行一次
                rdd.map(
                    str => {
                        // Executor 端
                    	str    
                    }
                )
            }
        )
        
        ssc.start()
        ssc.awaitTermination()
    }
}
1.3 join
/**
功能:对当前批次(采集周期)内的两个 DStream 中各自的 RDD 中相同的 key 进行 join,效果与两个 RDD 的 join 相同
基本语法:Dstream1.join(Dstream2)
*/
object DStreamTransform {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        val ds9999 = ssc.socketTextStream("localhost", 9999)
        val ds8888 = ssc.socketTextStream("localhost", 8888)
        
        val data: DStream[(String, (Int, Int))] = ds9999.map((_, 1)).join(ds888.map((_, 2)))
        data.print()
        
        ssc.start()
        ssc.awaitTermination()
    }
}

2. 有状态转换操作

有状态转换操作会将一个采集周期的结果(状态)保存到检查点,并且不断将下一个采集周期的结果(状态)更新保存到检查点中,最终输出所有采集周期归约汇总的结果

2.1 updateStateByKey
/**
	基本语法:DStream.updateStateByKey(func: (seq: Seq[T], op: Option[T]) => op)
	参数:
		1.seq 表示当前采集周期相同 key 的 Value 集合
		2.op 表示检查点中相同 key 的总 Value (Some 或 None)
	说明:
		1.使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态
		2.updateStateByKey会根据 key 对数据的状态进行更新
*/
object DStreamStateChange {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        // 必须设置检查点保存路径
        ssc.checkpoint("cp")
        
        val word = ssc.socketTextStream("localhost", 9999)
        val wordAsOne = line.map((_, 1))
        
        // val wordCount = wordAsOne.reduceByKey(_ + _)
        val wordCount = wordAsOne.updateStateByKey(
        	(seq: Seq[Int], op: Option[Int]) => {
                val sum = seq.sum
                val newVal = op.getOrElse(0) + sum
                Option(newVal)
            }
        )
        
        wordCount.print()
        /*
        	测试:在 cmd 窗口执行 nc -lp 999,然后分次输入 10 个 hello
        	结果:最终的输出结果为 (hello, 10)
        */
        
        ssc.start()
        ssc.awaitTermination()
    }
}

2.2 window 操作
/**
	基本语法:
    1.DStream.window(windowSize: Duration, step: Duration)
        参数:
            1.windowSize 表示窗口大小
            2.step 表示窗口滑动步长
        说明:
            1.窗口大小和步长必须为采集周期大小的整数倍
            2.步长默认为一个采集周期大小
            
    2.countByWindow(windowSize: Duration, step: Duration):统计滑动窗口计数流中的元素个数
	3.reduceByWindow(func, windowSize: Duration, step: Duration):通过自定义函数聚合滑动窗口流中的元素
	4.reduceByKeyAndWindow(func, windowSize: Duration, step: Duration, [numTasks]):通过自定义函数聚合滑动窗口流中相同 key 的 value
	5.reduceByKeyAndWindow(func, invFunc, windowSize: Duration, step: Duration, [numTasks])
		参数说明:
			1.func 表示窗口中相同 key 的聚合计算方式
			2.invFunc 表示删除在窗口滑动后不再存在的数据值
*/
object DStreamWindow {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        ssc.checkpoint("cp")
        
        val word = ssc.socketTextStream("localhost", 9999)
        val wordAsOne = line.map((_, 1))
        
        // val ds = wordAsOne.window(Seconds(6)) // 会有重复数据
        val ds = wordAsOne.window(Seconds(6), Seconds(6))
        
        val wordCount = ds.reduceByKey(_ + _)
        
        // 必须设置检查点保存路径
        val wordCount1 = wordAsOne.reduceByKeyAndWindow(
        	(x: Int, y: Int) => x + y,
            (x: Int, y: Int) => x - y,
            Seconds(6), 
            Seconds(6)
        )
        
        wordCount.print()
        // wordCount1.print()
		
        ssc.start()
        ssc.awaitTermination()
    }
}

三、DStream 的输出

SparkStreaming 也有惰性机制,执行输出操作才会触发所有 DStream 计算的执行

/**
	基本语法:
        1.print():将 DStream 输出到控制台,只有这个输出会带时间戳
        2.saveAsTextFiles(prefix, [suffix]):将 DStream 保存为 text 格式文件,每一批次的存储文件名基于参数中的 prefix 和 suffix (prefix-Time_IN_MS[.suffix])
        3.saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 DStream 中的数据保存为
    SequenceFiles,每一批次的存储文件名为 "prefix-TIME_IN_MS[.suffix]"
        4.saveAsHadoopFiles(prefix, [suffix]):将 DStream 中的数据保存为 Hadoop files,每一批次的存储文件名为 "prefix-TIME_IN_MS[.suffix]"
        5.foreachRDD(func):最通用的输出操作,将函数 func 用于 DStream 的每一个 RDD,可以将 RDD 存入文件或者通过网络将其写入数据库
	
	说明:使用foreachRDD(func)把数据写到 MySQL 的外部数据库的注意事项:
        1.创建连接对象不能写在 driver 层面(因为所有的连接对象都不能序列化)
        2.如果写在 foreachRDD 中则每个 RDD 中的每一条数据都会创建连接,影响性能和资源;
        3.推荐使用 RDD 的 foreachPartition() 算子,在每个分区迭代中创建连接
*/
object DStreamOutput {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ds")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        val word = ssc.socketTextStream("localhost", 9999)
        val wordAsOne = line.map((_, 1))
        
        val wordCount = wordAsOne.reduceByKey(_ + _)
                
        // wordCount.print() // SparkStreaming 没有输出操作会报错
		
        wordCount.foreachRDD(
        	rdd => {
                rdd.foreach(println)
            }
        )
        
        ssc.start()
        ssc.awaitTermination()
    }
}

四、SparkStreaming 优雅的关闭

SparkStreaming 任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,而分布式程序没办法做到一个个进程去停止,所以需要使用第三方系统 (MySQL/Redis/Zookeepr/HDFS) 来控制内部程序关闭

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

class MonitorStop(ssc: StreamingContext) extends Runnable {
	override def run(): Unit = {
		val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "hello")
		while(true) {
			try{
				Thread.sleep(5000)
			} catch {
				case e: InterruptedException => e.printStackTrace()
			}
            
			val state: StreamingContextState = ssc.getState
			val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
			if(bool) {
				if(state == StreamingContextState.ACTIVE) {
                    // 优雅地关闭,停止接收新数据,并将已有的数据处理完后再关闭
					ssc.stop(stopSparkContext = true, stopGracefully = true)
					System.exit(0)
				}
			}
		}
	}
}


import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkTest {
    
	def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
		val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: 
Option[Int]) => {
            //当前批次内容的计算
            val sum: Int = values.sum
            //取出状态信息中上一次状态 
            val lastStatu: Int = status.getOrElse(0)
            Some(sum + lastStatu)
        }
        
		val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
        
		//设置优雅的关闭
		sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
        
		val ssc = new StreamingContext(sparkConf, Seconds(5))
		ssc.checkpoint("./ck")
        
		val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
		val word: DStream[String] = line.flatMap(_.split(" "))
		val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
		val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
		wordAndCount.print()
		ssc
	}
    
	def main(args: Array[String]): Unit = {
        // 从检查点恢复数据
		val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
		new Thread(new MonitorStop(ssc)).start()
		ssc.start()
		ssc.awaitTermination()
	}
    
}

相关推荐

  1. 详解 Spark Streaming DStream 对象

    2024-06-06 10:00:04       10 阅读
  2. SparkStreaming编程-DStream创建

    2024-06-06 10:00:04       10 阅读
  3. es6对于Promise 对象详解(2024-04-11)

    2024-06-06 10:00:04       21 阅读
  4. 面向对象编程中详解

    2024-06-06 10:00:04       6 阅读
  5. 对象存储详细介绍

    2024-06-06 10:00:04       12 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-06 10:00:04       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-06 10:00:04       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-06 10:00:04       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-06 10:00:04       18 阅读

热门阅读

  1. 机器学习_决策树与随机森林

    2024-06-06 10:00:04       8 阅读
  2. web3之女巫(sybil)

    2024-06-06 10:00:04       8 阅读
  3. linux常用命令及用法

    2024-06-06 10:00:04       8 阅读
  4. 完整状态码面试背

    2024-06-06 10:00:04       6 阅读
  5. 服务器端口聚合

    2024-06-06 10:00:04       6 阅读
  6. Python 快速入门

    2024-06-06 10:00:04       6 阅读
  7. CSS Web前端框架:深入剖析与应用实践

    2024-06-06 10:00:04       9 阅读