spark: 从pulsar中读取数据

一、依赖

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Spark Streaming + Pulsar demo.
  Declares the Pulsar client artifacts, the pulsar-spark receiver adapter,
  explicit Jackson versions, and the spark-streaming runtime.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>pulsar-demo2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile with Java 8 source and target levels. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Single version shared by all three org.apache.pulsar artifacts below. -->
        <pulsar.version>2.8.0</pulsar.version>
        <!-- Single version shared by all three Jackson artifacts below. -->
        <jackson.version>2.10.5</jackson.version>
        <!-- Disabled alternative Jackson version, kept for reference. -->
        <!--<jackson.version>2.6.7</jackson.version>-->

    </properties>

    <dependencies>
        <!-- Pulsar client (the "all" packaging of the client). -->
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-all</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <!-- NOTE(review): the demo class in this article does not reference this artifact; confirm it is needed. -->
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-kafka</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <!--
          Pulsar adapter for Spark Streaming (provides SparkStreamingPulsarReceiver).
          The spark-streaming_2.10 artifact is excluded here; spark-streaming_2.12 is
          declared explicitly further down.
          NOTE(review): presumably this avoids mixing Scala binary versions on the
          classpath; confirm against the pulsar-spark POM.
        -->
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-spark</artifactId>
            <version>${pulsar.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.10</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Disabled alternative: spark-streaming_2.11 at 2.4.0 (kept commented out below). -->
<!--

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>-->


        <!--
          Jackson core, annotations and databind pinned to the same ${jackson.version}.
          NOTE(review): presumably pinned to reconcile the Jackson versions pulled in
          transitively by Pulsar and Spark; confirm with mvn dependency:tree.
        -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <!-- Spark Streaming runtime actually used by the demo: Scala 2.12 build, Spark 3.0.0. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        
    </dependencies>

</project>

二、demo程序

package cn.edu.tju;

import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

/**
 * Minimal Spark Streaming demo that subscribes to a single Pulsar topic and
 * prints the messages of every micro-batch as text.
 *
 * <p>The broker address, topic and subscription name are hard-coded placeholders;
 * replace them before running.
 */
public class SparkTest {
    /**
     * Starts a local streaming context, attaches a Pulsar receiver to it and blocks
     * until the streaming job terminates.
     *
     * @param args ignored
     * @throws Exception if the streaming context fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        String serviceUrl = "pulsar://xx.xx.xx.xx:6650/";
        String topic = "persistent://public/default/my-topic11";
        String subs = "test_sub";

        // Two local threads: per the Spark Streaming guide a receiver occupies one,
        // so at least one more is needed to process the received batches.
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        // Micro-batch interval of 4 seconds.
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(4));

        // Diamond operator instead of the raw type, so the assignment is type-checked.
        ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData<>();

        Set<String> topics = new HashSet<>();
        topics.add(topic);
        pulsarConf.setTopicNames(topics);
        pulsarConf.setSubscriptionName(subs);

        // Receiver that consumes from Pulsar with authentication disabled.
        SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
                serviceUrl,
                pulsarConf,
                new AuthenticationDisabled());

        JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

        // Decode explicitly as UTF-8: new String(byte[]) would use the platform default
        // charset, which differs between machines on Java 8.
        // NOTE(review): assumes producers publish UTF-8 text; confirm against the producer.
        JavaDStream<String> resultStream =
                lineDStream.map(bytes -> new String(bytes, StandardCharsets.UTF_8));

        resultStream.print();

        jsc.start();
        jsc.awaitTermination();
    }
}

相关推荐

  1. spark: 从pulsar中读取数据

    2024-03-29 13:44:03       45 阅读
  2. flink: 从pulsar中读取数据

    2024-03-29 13:44:03       39 阅读
  3. 【C语言】如何从文件中读取数据

    2024-03-29 13:44:03       41 阅读
  4. spark读取及存储数据

    2024-03-29 13:44:03       47 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-29 13:44:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-29 13:44:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-29 13:44:03       87 阅读
  4. Python语言-面向对象

    2024-03-29 13:44:03       97 阅读

热门阅读

  1. ESXi for ARM 1.15

    2024-03-29 13:44:03       40 阅读
  2. 力扣爆刷第107天之CodeTop100五连刷21-25

    2024-03-29 13:44:03       40 阅读
  3. Oracle清理闪回日志

    2024-03-29 13:44:03       35 阅读
  4. 【前端基础】使用 typeof 进行类型判断注意点

    2024-03-29 13:44:03       37 阅读
  5. 【计算机网络】计算机网络体系结构简要说明

    2024-03-29 13:44:03       36 阅读
  6. 使用VSCode搭建Vue 3开发环境

    2024-03-29 13:44:03       47 阅读