[flink] flink macm1pro 快速使用从零到一

文章目录

快速使用

  1. 打开 https://flink.apache.org/downloads/ 下载 flink

因为书籍介绍的是 1.12版本的,为避免不必要的问题,下载相同版本

image.png
image.png

  1. 解压
 tar -xzvf flink-1.11.2-bin-scala_2.11.tgz

image.png

  1. 启动 flink
./bin/start-cluster.sh

image.png

  1. 打开 flink web 页面 localhost:8081

image.png

  1. 编写结合 Kafka 词频统计程序

具体参考 https://weread.qq.com/web/reader/51032ac07236f8e05107816k1f032c402131f0e3dad99f3?

package org.example;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class WordCountKafkaInStdOut {

    public static void main(String[] args) throws Exception {

        // 设置Flink执行环境 
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka参数 
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "Shakespeare";
        String outputTopic = "WordCount";

        // Source 
        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),
                properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformation 
        // 使用Flink  API对输入流的文本进行操作 
        // 按空格切词、计数、分区、设置时间窗口、聚合 
        DataStream<Tuple2<String, Integer>> wordCount = stream
            .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                String[] tokens = line.split("\\s");
                // 输出结果  
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // Sink 
        wordCount.print();

        // execute 
        env.execute("kafka streaming word count");

    }
} 
  1. 打包应用(当然在这之前需要本地调试一下,至少得运行通吧😄)
  2. 使用Flink提供的命令行工具flink,将打包好的作业提交到集群上。命令行的参数 --class 用来指定哪个主类作为入口。
./bin/flink run --class org.example.WordCountKafkaInStdOut xxtarget/flink_study-1.0-SNAPSHOT.jar

class 建议直接拷贝引用
image.png

  1. web 页面查看作业提交成功

image.png

  1. kafka 生产者随便发点消息

image.png

  1. 查看作业日志,词频统计结果

image.png
image.png

  1. 关闭 flink
./bin/stop-cluster.sh

image.png

相关推荐

  1. Django :pip 基本使用

    2024-04-01 14:50:02       23 阅读
  2. vue创建项目?

    2024-04-01 14:50:02       50 阅读
  3. 1设计一个丧葬行业小程序

    2024-04-01 14:50:02       49 阅读
  4. 屎山系列-游戏开发(Day1)

    2024-04-01 14:50:02       32 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-01 14:50:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-01 14:50:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-01 14:50:02       82 阅读
  4. Python语言-面向对象

    2024-04-01 14:50:02       91 阅读

热门阅读

  1. python面试题(51~60)

    2024-04-01 14:50:02       42 阅读
  2. 学习总结!

    2024-04-01 14:50:02       45 阅读
  3. fpga_awb

    fpga_awb

    2024-04-01 14:50:02      34 阅读
  4. Day1 - Hive基础知识

    2024-04-01 14:50:02       40 阅读
  5. 在Debian 11上安装GCC

    2024-04-01 14:50:02       38 阅读
  6. static修饰的方法为什么不能被覆盖?

    2024-04-01 14:50:02       42 阅读
  7. leetcode93.复原IP地址

    2024-04-01 14:50:02       39 阅读
  8. npm 与 yarn 命令比较

    2024-04-01 14:50:02       41 阅读
  9. Spring与Spring Boot的区别

    2024-04-01 14:50:02       36 阅读