SparkStreaming编程-DStream创建

Spark Streaming 原生支持一些不同的数据源。

RDD队列

用法说明

通过使用ssc.queueStream(queueOfRDDs)来创建DStream,
每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例实操

/** 
需求:循环创建几个 RDD,将 RDD 放入队列。
通过 Spark Streaming创建 Dstream,计算 WordCount
*/

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object RDDQueueDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RDDQueueDemo").setMaster("local[*]")
        // 创建SparkStreaming入口对象,且设置5s为时间间隔
        val scc = new StreamingContext(conf, Seconds(5))
        // 根据SparkStreaming入口对象创建Spark对象
        val sc = scc.sparkContext
        
        // 创建一个可变队列
        val queue: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]()
        
        val rddDS: InputDStream[Int] = scc.queueStream(queue, true)
        rddDS.reduce(_ + _).print
        
        // 开始接收数据并计算
        scc.start
        
        // 循环的方式向队列中添加 RDD
        for (elem <- 1 to 5) {
            queue += sc.parallelize(1 to 100)
            Thread.sleep(2000)
        }
        
        // 等待计算结束(要么手动退出,要么出现异常)才退出主程序
        scc.awaitTermination()
    }
}

自定义数据源

使用及说明

其实就是自定义接收器
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

案例实操

自定义数据源,实现监控某个端口号,获取该端口号内容。
自定义数据源
object MySource{
    def apply(host: String, port: Int): MySource = new MySource(host, port)
}

class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
    /*
    接收器启动的时候调用该方法. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data.
    这个函数内部必须初始化一些读取数据必须的资源
    该方法不能阻塞, 所以 读取数据要在一个新的线程中进行.
     */
    override def onStart(): Unit = {

        // 启动一个新的线程来接收数据
        new Thread("Socket Receiver"){
            override def run(): Unit = {
                receive()
            }
        }.start()
    }

    // 此方法用来接收数据
    def receive()={
        val socket = new Socket(host, port)
        val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line: String = null
        // 当 receiver没有关闭, 且reader读取到了数据则循环发送给spark
        while (!isStopped && (line = reader.readLine()) != null ){
            // 发送给spark
            store(line)
        }
        // 循环结束, 则关闭资源
        reader.close()
        socket.close()

        // 重启任务
        restart("Trying to connect again")
    }
    override def onStop(): Unit = {
    }
}
使用自定义数据源
object MySourceDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
        // 1. 创建SparkStreaming的入口对象: StreamingContext  参数2: 表示事件间隔
        val ssc = new StreamingContext(conf, Seconds(5))
        // 2. 创建一个DStream
        val lines: ReceiverInputDStream[String] = ssc.receiverStream[String](MySource("hadoop201", 9999))
        // 3. 一个个的单词
        val words: DStream[String] = lines.flatMap(_.split("""\s+"""))
        // 4. 单词形成元组
        val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
        // 5. 统计单词的个数
        val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
        //6. 显示
        count.print
        //7. 启动流式任务开始计算
        ssc.start()
        //8. 等待计算结束才退出主程序
        ssc.awaitTermination()
        ssc.stop(false)
    }
}
开启端口
nc -lk 10000

Kafka数据源

用法及说明

在工程中需要引入 Maven 依赖 spark-streaming-kafka_2.11来使用它。
包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。
两个核心类:KafkaUtils、KafkaCluster

导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

案例实操

高级API 1

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HighKafka {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
        val ssc = new StreamingContext(conf, Seconds(3))
        
        // kafka 参数
        //kafka参数声明
        val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"
        val topic = "first"
        val group = "bigdata"
        val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
        val kafkaParams = Map(
            ConsumerConfig.GROUP_ID_CONFIG -> group,
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
         )
        val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, Set(topic))
        
        dStream.print()
        
        ssc.start()
        ssc.awaitTermination()
    }
}

高级API 2

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HighKafka2 {
    
    def createSSC(): StreamingContext = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
        val ssc = new StreamingContext(conf, Seconds(3))
        // 偏移量保存在 checkpoint 中, 可以从上次的位置接着消费
        ssc.checkpoint("./ck1")
        // kafka 参数
        //kafka参数声明
        val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"
        val topic = "first"
        val group = "bigdata"
        val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
        val kafkaParams = Map(
            "zookeeper.connect" -> "hadoop201:2181,hadoop202:2181,hadoop203:2181",
            ConsumerConfig.GROUP_ID_CONFIG -> group,
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
        )
        val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, Set(topic))
        
        dStream.print()
        ssc
    }
    
    def main(args: Array[String]): Unit = {
        
        val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck1", () => createSSC())
        ssc.start()
        ssc.awaitTermination()
    }
}

低级API

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LowKafka {
    // 获取 offset
    def getOffset(kafkaCluster: KafkaCluster, group: String, topic: String): Map[TopicAndPartition, Long] = {
        // 最终要返回的 Map
        var topicAndPartition2Long: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
        
        // 根据指定的主体获取分区信息
        val topicMetadataEither: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(Set(topic))
        // 判断分区是否存在
        if (topicMetadataEither.isRight) {
            // 不为空, 则取出分区信息
            val topicAndPartitions: Set[TopicAndPartition] = topicMetadataEither.right.get
            // 获取消费消费数据的进度
            val topicAndPartition2LongEither: Either[Err, Map[TopicAndPartition, Long]] =
                kafkaCluster.getConsumerOffsets(group, topicAndPartitions)
            // 如果没有消费进度, 表示第一次消费
            if (topicAndPartition2LongEither.isLeft) {
                // 遍历每个分区, 都从 0 开始消费
                topicAndPartitions.foreach {
                    topicAndPartition => topicAndPartition2Long = topicAndPartition2Long + (topicAndPartition -> 0)
                }
            } else { // 如果分区有消费进度
                // 取出消费进度
                val current: Map[TopicAndPartition, Long] = topicAndPartition2LongEither.right.get
                topicAndPartition2Long ++= current
            }
        }
        // 返回分区的消费进度
        topicAndPartition2Long
    }
    
    // 保存消费信息
    def saveOffset(kafkaCluster: KafkaCluster, group: String, dStream: InputDStream[String]) = {
        
        dStream.foreachRDD(rdd => {
            var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
            // 把 RDD 转换成HasOffsetRanges对
            val hasOffsetRangs: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
            // 得到 offsetRangs
            val ranges: Array[OffsetRange] = hasOffsetRangs.offsetRanges
            ranges.foreach(range => {
                // 每个分区的最新的 offset
                map += range.topicAndPartition() -> range.untilOffset
            })
            kafkaCluster.setConsumerOffsets(group,map)
        })
    }
    
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
        val ssc = new StreamingContext(conf, Seconds(3))
        // kafka 参数
        //kafka参数声明
        val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"
        val topic = "first"
        val group = "bigdata"
        val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
        val kafkaParams = Map(
            "zookeeper.connect" -> "hadoop201:2181,hadoop202:2181,hadoop203:2181",
            ConsumerConfig.GROUP_ID_CONFIG -> group,
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
        )
        // 读取 offset
        val kafkaCluster = new KafkaCluster(kafkaParams)
        val fromOffset: Map[TopicAndPartition, Long] = getOffset(kafkaCluster, group, topic)
        val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
            ssc,
            kafkaParams,
            fromOffset,
            (message: MessageAndMetadata[String, String]) => message.message()
        )
        dStream.print()
        // 保存 offset
        saveOffset(kafkaCluster, group, dStream)
        ssc.start()
        ssc.awaitTermination()
    }
}

相关推荐

  1. SparkStreaming编程-DStream创建

    2024-06-07 18:50:05       11 阅读
  2. mysql编程--创建存储过程

    2024-06-07 18:50:05       7 阅读
  3. 详解 Spark Streaming 的 DStream 对象

    2024-06-07 18:50:05       10 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-07 18:50:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-07 18:50:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-07 18:50:05       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-07 18:50:05       20 阅读

热门阅读

  1. 【AIGC调研系列】chatTTS与GPT-SoVITS的对比优劣势

    2024-06-07 18:50:05       10 阅读
  2. Advantages of high pressure cleaners

    2024-06-07 18:50:05       10 阅读
  3. 六个搜索算法及其python实现

    2024-06-07 18:50:05       7 阅读
  4. 思考、学习、创造、越来越有趣

    2024-06-07 18:50:05       8 阅读
  5. 深入理解Nginx的Round-Robin负载均衡策略

    2024-06-07 18:50:05       11 阅读
  6. 获得抖音商品评论 API 返回值

    2024-06-07 18:50:05       6 阅读