详解 Flink 流处理 API

Flink 新版本已经实现了流批一体处理,DataSet API 将被弃用,官方推荐统一使用 DataStream API 处理流数据和批数据

一、Environment

Environment 表示 Flink 执行上下文环境,可以分为本地环境和集群环境,创建方法分别为 createLocalEnvironment 和 createRemoteEnvironment,而使用 getExecutionEnvironment 方法会根据当前运行环境智能地选择创建本地或集群上下文环境对象,推荐使用该方法

public class TestFlink {
    public static void main(String[] args) {
        //创建本地环境对象,可以指定并行度,默认是CPU核心数
        StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
        
        //创建远程集群环境对象
        StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
            "host", //JobManager 主机名
            1234, //JobManager 进程端口号
            "path/to/jarFile.jar"  //提交给 JobManager 的 JAR 包
        );
        
        //自动选择创建对应环境对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    }
}

二、Source

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)

1. 从集合中读取

//定义封装数据的POJO类
//传感器温度读数类
public class SensorReading {
    private String id; //传感器id
    private Long timestamp; //读数时间戳
    private Double temperature; //温度
    
    //空参构造器
    public SensorReading() {}
    
    //带参构造器
    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }
    
    //getter和setter
    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }
    
    public Long getTimestamp() {
        return timestamp;
    }
    
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
    
    public Double getTemperature() {
        return temperature;
    }
    
    public void setTemperature(Double temperature) {
        this.temperature= temperature;
    }
    
    //toString
    @Override
    public String toString() {
        return "SensorReading{id="+this.id+",timestamp="+this.timestamp+",temperature="+this.temperature+"}";
    }
}

public class SourceFromCollection {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //从集合中读取数据
        //fromCollection
        DataStream<SensorReading> dataSource = env.fromCollection(Arrays.asList(
        	new SensorReading("sensor_1", 1547718199L, 35.8),
            new SensorReading("sensor_6", 1547718201L, 15.4),
            new SensorReading("sensor_7", 1547718202L, 6.7),
            new SensorReading("sensor_10", 1547718205L, 38.1) 
        ));
        
        //fromElements
        DataStream<Integer> intDataSource = env.fromElements(1,6,7,9,11);
        
        //输出
        dataSource.print("data"); //指定输出流的名称;调用的是SensorReading的toString方法
        intDataSource.print("int");
        
        //执行
        env.execute("my-Job"); //指定作业名称
        
    }
}

2. 从文件中读取

public class SourceFromFile {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //从文件中读取数据,多用于批处理
        DataStream<String> data = env.readTextFile("SensorReading.txt");
        
        //输出
        data.print("file");
        
        //执行
        env.execute();
        
    }
}

3. 从 Socket 中读取

public class SourceFromSocket {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //从Socket中读取数据
        DataStream<String> data = env.socketTextStream("localhost", 7777);
        
        //输出
        data.print("socket");
        
        //执行
        env.execute();
        
    }
}

4. 从 Kafka 消息队列中读取

实际生产应用中使用

  • 引入 flink 连接 Kafka 的官方工具依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- 0.11是kafka版本,2.12是scala版本 -->
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <!-- 工具版本与flink版本对应 -->
        <version>1.10.1</version>
    </dependency>
    
  • 测试代码

    public class SourceFromKafka {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            //定义kafka连接配置信息
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hadoop102:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
            
            //使用通用的 addSource 方式从Kafka读取数据
            //addSource方法需要传入一个 SourceFunction 接口的实现类
            DataStream<String> data = env.addSource(
            	new FlinkKafkaConsumer011<String>(
                    "sensor", //topic
                    new SimpleStringSchema(); //key的反序列化器
                    properties //配置信息
                );
            );
            
            //输出
            data.print("kafka");
            
            //执行
            env.execute();
        }
    }
    
  • 在集群上启动 Zookeeper 和 Kafka 服务,然后启动一个 Kafka 生产者向 sensor 主题发送消息

5. 自定义 Source

常用于模拟数据源进行测试

public class SourceFromUDF {
    public static void main(String[] args) throws Exception {
		//创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1); //自定义source的并行度必须设置为 1
        
        //使用通用 addSource 方法和自定义 SourceFunction 获取数据
        DataStream<SensorReading> data = env.addSource( new MySensorSource() );
        
        //输出
        data.print("udf");
        
        //执行
        env.execute();
    }
    
    //自定义类实现 SourceFunction 接口并重写方法
    //数据获取逻辑:周期性循环获取10个传感器变化后的温度值
    //SourceFunction的并行度必须为 1,ParallelSourceFunction 接口可以多并行
    public static class MySensorSource implements SourceFunction<SensorReading> {
        private boolean running = true;
        
        //Source程序执行,调用该方法获取数据
        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            //定义一个随机数生成器
            Random random = new Random();

            //初始化10个传感器的温度
            Map<String, Double> sensorMap = new HashMap<>();
            for(int i=1, i<=10, i++) {
                //nextGaussian()是按正态分布获取随机数,范围是 -3~3
                Double initTemp = 60 + random.nextGaussian() * 20; //0~120之间
                sensorMap.put("sensor_" + i, initTemp);
            }
            
            //循环获取变化数据
            while(running) {
                for(String sensorId : sensorMap.keySet()) {
                    Double newTemp = sensorMap.get(sensorId) + random.nextGaussian();
                    sensorMap.put(sensorId, newTemp);
                    
                    //将数据写出
                    ctx.collect(new Sensor(sensorId, System.currentTimeMillis(), newTemp));
                }
             	
                //控制数据获取频率
            	Thread.sleep(1000);   
            }
        }
        
        //Source程序调用该方法停止获取数据
        @Override
        public void cancel() {
            running = false;
        }
        
    }
}

三、Transformation

转换算子

1. 简单转换

1.1 map
/**
	方法签名:DataStream map(MapFunction<T, R> mapFunction)
	参数:实现了 MapFunction 接口的类,T 为输入数据类型,R 为转换后输出数据类型 
*/
public class Transform_Map {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1 
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取到的每一行字符串转换成长度输出
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() { //使用匿名类实现,重写 map 方法
            @Override
            public Integer map(String value) throws Exception {
            	return value.length();
            }
        });
        
        //输出
        mapStream.print("map");
        
        env.execute();
        
    }
}
1.2 flatMap
/**
	方法签名:DataStream flatMap(FlatMapFunction<T, O> flatMapFunction)
	参数:实现了 FlatMapFunction 接口的类,T 为输入数据类型,O 为转换后输出数据类型 
*/
public class Transform_FlatMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1 
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取到的每一行字符串按逗号分割后输出
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() { //使用匿名类实现,重写 flatMap 方法
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                
                for(String filed : fields) {
                	out.collect(field); 	   
                }
            }
        });
        
        //输出
        flatMapStream.print("flatMap");
        
        env.execute();
        
    }
}
1.3 filter
/**
	方法签名:DataStream filter(FilterFunction<T> filterFunction)
	参数:实现了 FilterFunction 接口的类,T 为输入数据类型
*/
public class Transform_Filter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1 
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取到的每一行字符串,只保留以 sensor_1 开头的id输出
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() { //使用匿名类实现,重写 flatMap 方法
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });
        
        //输出
        filterStream.print("filter");
        
        env.execute();
        
    }
}

2. 聚合操作

2.1 keyBy

DataStream 没有直接进行聚合的 API,需要使用 keyBy 先进行分区转换成 KeyedStream 再做聚合

/**
	方法签名:KeyedStream<T, KEY> keyBy(int... fields|String... fields|Keys<T> keys|KeySelector<T,K> key)
	说明:KeyedStream 继承自 DataStream
	泛型:T 是输入数据类型,KEY 是分区键的类型
	参数:
		1.int... fields:用于分区的字段索引位置,只能是元组类型的数据
		2.String... fields:用于分区的字段名称
		3.Keys<T> keys:该重载方法被 private 修饰,底层调用的
		4.KeySelector<T,K> key:函数式接口	
*/
public class Transform_KeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1 
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取的字符串转换成POJO类
        //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
        DataStream<SensorReading> mapStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //分区
        //指定分区字段名,调用POJO类的getter方法
        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
        
        env.execute();
        
    }
}

2.2 Rolling Aggregation

滚动聚合算子

/**
	常见的滚动聚合算子:
		sum(...)
		min(...)
		max(...) 
		minBy(...)
		maxBy(...)
	说明:指定字段位置或字段名称来聚合
	注意:
		1.元组中字段的名称,是以 f0、f1、f2、...来命名的
		2.POJO 类只能通过字段名称来指定,不能通过位置来指定
*/
public class Transform_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1
          sensor_1,1547718203,36.3
          sensor_1,1547718207,32.5
          sensor_1,1547718209,37.1
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取的字符串转换成POJO类
        //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
        DataStream<SensorReading> mapStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //分区
        //指定分区字段名,调用POJO类的getter方法
        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
        
        
        //聚合
        //1.获取最大的温度值
        //会滚动获取当前最大的温度值,但SensorReading其他字段的值还是第一条数据的值
        DataStream<SensorReading> resultStream = keyedStream.max("temperature");
        //会滚动获取当前最大的温度值,且SensorReading其他字段的值也是最大温度这一条数据的值
        //DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
        
        
        //输出
        resultStream.print();
        
        env.execute();
        
    }
}

2.3 reduce

归约聚合

/**
	方法签名:DataStream<T> reduce(ReduceFunction<T> rf)
	参数:实现 ReduceFunction 接口的类,T 是输入数据类型
*/
public class Transform_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1
          sensor_1,1547718203,36.3
          sensor_1,1547718207,32.5
          sensor_1,1547718209,37.1
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取的字符串转换成POJO类
        //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
        DataStream<SensorReading> mapStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //分区
        //指定分区字段名,调用POJO类的getter方法
        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
        
        
        //归约聚合
        //1.获取最大的温度值和当前最新的时间戳
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>(){
            @Override
            public SensorReading reduce(SensorReading value1,SensorReading value2) throws Exception { //value1是之前聚合后的状态值,value2是最新读取的值
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(),value2.getTemperature()));
            }
        });
        
        //lambda写法
        //DataStream<SensorReading> resultStream1 = keyedStream.reduce((currState, newData) -> { return new SensorReading(currState.getId(), newData.getTimestamp(), Math.max(currState.getTemperature(),newData.getTemperature())); });
        
        //输出
        resultStream.print();
        
        env.execute();
        
    }
}

3. 多流转换

3.1 split 和 select
/**
	方法签名:
		1.SplitStream<T> split(OutputSelector<T> output)
		2.DataStream<T> select(String... outputNames)
		
	说明:
		1.split方法可以根据判断条件进行分流并设置标签
		2.select方法可以根据标签获取对应的流
*/
public class Transform_SplitAndSelect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1
          sensor_1,1547718203,36.3
          sensor_1,1547718207,32.5
          sensor_1,1547718209,37.1
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取的字符串转换成POJO类
        //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
        DataStream<SensorReading> mapStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //分流:传感器数据按照温度高低(以 30 度为界),拆分成两个流
        SplitStream<SensorReading> splitStream = mapStream.split(new OutputSelector<SensorReading>(){
           @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        
        //获取对应流
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        
        
        //输出
        highTempStream.print("high");
        lowTempStream.print("low");
        
        env.execute();
        
    }
}

3.2 connect 和 coMap

两个流合流操作,各自的数据类型可以不一致

/**
	方法签名:ConnectedStreams<T, R> connect(DataStream<R> stream)
	说明:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立;T 是调用方法的流数据类型,R 是被合并的流数据类型
	真正合并:ConnectedStreams 调用 map/flatMap 等方法,传入 CoMapFunction、CoFlatMapFunction 接口实现,在重写方法中实现真正的合并为 DataStream
*/
public class Transform_SplitAndSelect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1
          sensor_1,1547718203,36.3
          sensor_1,1547718207,32.5
          sensor_1,1547718209,37.1
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取的字符串转换成POJO类
        //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
        DataStream<SensorReading> mapStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //分流:传感器数据按照温度高低(以 30 度为界),拆分成两个流
        SplitStream<SensorReading> splitStream = mapStream.split(new OutputSelector<SensorReading>(){
           @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        
        //获取对应流
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        
        //将高温流数据转换为二元组
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override 
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
            	return new Tuple2<>(value.getId(), value.getTemperature());
            }
		});
        
        //合流
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = 
warningStream.connect(lowTempStream);
        
        //真正合并
        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>(){ //Object表示最终合并的流的数据类型
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
            	return new Tuple3<>(value.f0, value.f1, "warning");
            }
            
            @Override
            public Object map2(SensorReading value) throws Exception {
            	return new Tuple2<>(value.getId(), "healthy");
            }
            
        });
        
        //输出
        resultStream.print();
        
        env.execute();
        
    }
}
3.3 union

多个流合流操作,各自的数据类型必须一致

/**
	方法签名:DataStream<T> union(DataStream<T>... streams)
	说明:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream,所有的 DataStream 的数据类型必须一致
*/
public class Transform_SplitAndSelect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1
          sensor_1,1547718203,36.3
          sensor_1,1547718207,32.5
          sensor_1,1547718209,37.1
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        //将读取的字符串转换成POJO类
        //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
        DataStream<SensorReading> mapStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //分流:传感器数据按照温度高低(以 30 度为界),拆分成两个流
        SplitStream<SensorReading> splitStream = mapStream.split(new OutputSelector<SensorReading>(){
           @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        
        //获取对应流
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        
        //合流
        DataStream<SensorReading> allTempStream = highTempStream.union(lowTempStream);
        
        //输出
        allTempStream.print();
        
        env.execute();
        
    }
}

四、DataStream 支持的数据类型

  • 基础数据类型:所有 Java 基本类型及其包装类再加上 Void、String、Date、BigDecimal 和 BigInteger;所有 Scala 基础数据类型
  • 数组类型:包括基本类型数组 (PRIMITIVE_ARRAY) 和对象数组 (OBJECT_ARRAY)
  • 复合数据类型:
    • Java 元组类型 (Tuple):这是 Flink 内置的元组类型,是 Java API 的一部分。最多可以包含 25 个字段,类名从 Tuple0~Tuple25,不支持空字段
    • Scala 样例类 (case class) 和 Scala 元组 (Tuple):不支持空字段
    • 行类型 (ROW):可以认为是具有任意个字段的元组并支持空字段
    • POJO:Flink 自定义的类似于 Java bean 模式的类,必须包含空参构造方法和属性的 getter 和 setter 方法
  • 辅助类型:Option、Either、List、Map 等

五、自定义 UDF 函数类

基本上所有基于 DataStream 调用的算子操作都需要传入一个参数,该参数需要实现一个接口,接口的名称都以算子操作名称 + Function 命名,例如:源算子需要实现 SourceFunction 接口,map 算子需要实现 MapFunction 接口,reduce 算子需要实现 ReduceFunction 接口。实现接口的方法有自定义函数类或使用匿名类实现接口、使用 Lambda 表达式实现接口以及自定义富函数类实现接口

1. 函数类

Function Classes

/**
	需求:筛选 url 中包含 “home” 的事件
*/
public class TestUDFFunction {
    public static void main(String[] args) throws Exception {
        //创建上下文环境对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        //获取数据流
        DataStream<String> data = env.fromElements(
        	"http://www.baidu.com/home",
            "http://www.baidu.com/list",
            "http://www.baidu.com/page",
        );
        
        //筛选过滤
        //1.使用自定义函数类
        DataStream<String> filterStream = data.filter(new MyFilter());
        
        //2.使用匿名函数类
        DataStream<String> filterStream2 = data.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("home");
            }
        });
        
        //3.使用自定义函数类并能传入过滤属性
        DataStream<String> filterStream3 = data.filter(new UrlFilter("home"));
        
        filterStream.print("filterStream");
        filterStream2.print("filterStream2");
        filterStream3.print("filterStream3");
        
        env.execute();
        
    }
    
    //自定义函数类实现 FilterFunction 接口
    public static class MyFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String value) throws Exception {
            return value.contains("home");
        }
    }
    
    //自定义函数类实现 FilterFunction 接口,且定义过滤属性
    public static class UrlFilter implements FilterFunction<String> {
        private String keyWord;
        
        public UrlFilter(String keyWord) {
            this.keyWord = keyWord;
        }
        
        @Override
        public boolean filter(String value) {
            return value.contains(this.keyWord);
        }
    }
    
}

2. Lambda 表达式

Flink 中所有的 算子操作名 + Function 的接口都继承 Function 接口,所以作为函数式接口都可以使用 Lambda 表达式实现

/**
	需求:筛选 url 中包含 “home” 的事件
*/
public class TestUDFFunction {
    public static void main(String[] args) throws Exception {
        //创建上下文环境对象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        //获取数据流
        DataStream<String> data = env.fromElements(
        	"http://www.baidu.com/home",
            "http://www.baidu.com/list",
            "http://www.baidu.com/page",
        );
        
        //筛选过滤
        //但lambda表达式存在泛型擦除,在某些需要类型推断的场景时要注意
        DataStream<String> filterStream = data.filter(value -> value.contains("home"));
        
        filterStream.print();
        
        env.execute();
        
    }
    
}

3. 富函数

Rich Functions,富函数类是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其
Rich 版本。富函数类一般是以抽象类的形式出现的。例如: RichMapFunction、 RichFilterFunction、
RichReduceFunction 等。富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现
更复杂的功能

/**
	富函数典型的生命周期方法有:
		1.open():RichFunction 的初始化方法,在一个算子被调用之前调用,主要用于定义状态或建立数据库连接等
		2.close():生命周期中的最后一个调用的方法,主要用于做一些清理工作和关闭数据库连接等
		3.getRuntimeContext():提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态
*/
public class TestRichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        
        /*sensorReading.txt
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1 
        */
        DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
        
        DataStream<SensorReading> mapStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        DataStream<Tuple2<String, Integer>> resultStream = mapStream.map(new MyMapper());
        
        //输出
        resultStream.print();
        
        env.execute();
        
    }
    
    //自定义富函数类继承RichMapFunction抽象类
    public static class MyMapper extends RichMapFunction<SensorReading,Tuple2<String, Integer>> {
        
        //每次类实例创建一次 open 和 close 调用一次
        @Override
        public void open(Configuration parameters) throws Exception {
        	System.out.println("my map open");
        }
        
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<String, Integer>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }
        
        @Override
        public void close() throws Exception {
        	System.out.println("my map close");
        }
    }
}

六、数据重分区操作

1. 随机分区 (shuffle)

/**
	方法签名:DataStream<T> shuffle()
	说明:通过调用 DataStream 的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区
*/
public class TestShuffle {
    public static void main(String[] args) throws Exception {
        //  创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据源,并行度为 1
        DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
        
        //经随机重分区后打印输出,并行度为 4
        stream.shuffle().print("shuffle").setParallelism(4);
        
        env.execute();
    }
}

2. 轮询分区 (rebalance)

/**
	方法签名:DataStream<T> rebalance()
	说明:上游的一个任务按照先后顺序将数据依次向下游每个任务进行发送,可以将输入流数据平均分配到下游的并行任务中去
*/
public class TestRebalance {
    public static void main(String[] args) throws Exception {
        //  创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据源,并行度为 1
        DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
        
        //经轮询重分区后打印输出,并行度为 4
        stream.rebalance().print("rebalance").setParallelism(4);
        
        env.execute();
    }
}

3. 重缩放分区 (rescale)

/**
	方法签名:DataStream<T> rescale()
	说明:重缩放分区和轮询分区非常相似,但重缩放分区是将下游任务进行分组,上游的一个任务只负责将数据发送给下游的某组任务,rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源
	应用:当下游任务(数据接收方)的数量是上游任务(数据发送方)数量的整数倍时,rescale 的效率明显会更高。比如当上游任务数量是 2,下游任务数量是 6 时,上游任务其中一个分区的数据就将会平均分配到下游任务的 3 个分区中
*/
public class TestRescale {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //这里使用了并行数据源的富函数版本
        //这样可以调用 getRuntimeContext 方法来获取运行时上下文的一些信息
        env.addSource(new RichParallelSourceFunction<Integer>() {
        	@Override
        	public void run(SourceContext<Integer> sourceContext) throws Exception {
        		for(int i = 0; i < 8; i++) {
        			//  将奇数发送到索引为 1 的并行子任务
        			//  将偶数发送到索引为 0 的并行子任务
        			if((i + 1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
        				sourceContext.collect(i + 1);
        			}
        		}
        	}
        
            @Override
        	public void cancel() {}
            
        }).setParallelism(2).rescale().print().setParallelism(4);
        
        /*
        	输出结果:
        	4> 3
            3> 1
            1> 2
            1> 6 
            3> 5
            4> 7
            2> 4
            2> 8
        */
        
        env.execute();
    }
} 

4. 广播 (broadcast)

/**
	方法签名:DataStream<T> broadcast()
	说明:经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去
*/
public class TestBroadcast {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //读取数据源,并行度为 1
        DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
        
        //经广播后打印输出,并行度为 4
        stream. broadcast().print("broadcast").setParallelism(4);
        
        env.execute();
    }
}

5. 全局分区 (global)

/**
	方法签名:DataStream<T> global()
	说明:全局分区是一种特殊的分区方式,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1。谨慎使用
*/

6. 自定义分区 (partitionCustom)

/**
	方法签名:DataStream<T> partitionCustom(Partitioner<K> p, KeySelector<T, K> key)
	参数说明:
		1.Partitioner:自定义分区器对象
		2.key:应用分区器的字段,指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector
*/
public class TestPartitionCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //将自然数按照奇偶分区
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8).partitionCustom(
            new Partitioner<Integer>() {
                @Override
                public int partition(Integer key, int numPartitions) {
                	return key % 2;
                }
            }, 
            new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                	return value;
                }
            }
        ).print().setParallelism(2);
        
        env.execute();
    }
}

七、Sink

1. Kafka

  • 引入 flink 连接 Kafka 的官方工具依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- 0.11是kafka版本,2.12是scala版本 -->
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <!-- 工具版本与flink版本对应 -->
        <version>1.10.1</version>
    </dependency>
    
    
  • 编写 Kafka Sink 代码

    public class SinkToKafka {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            //定义kafka连接配置信息
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hadoop102:9092");
            properties.setProperty("group.id", "consumer-group");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
            
            //使用通用的 addSource 方法从Kafka读取数据
            //addSource方法需要传入一个 SourceFunction 接口的实现类
            DataStream<String> data = env.addSource(
            	new FlinkKafkaConsumer011<String>(
                    "sensor", //topic
                    new SimpleStringSchema(); //key的反序列化器
                    properties //配置信息
                );
            );
            
            //使用通用的 addSink 方法输出数据到 Kafka
            //addSink方法需要传入一个 SinkFunction 接口的实现类
            data.addSink(new FlinkKafkaProducer011<String>(
            	"hadoop102:9092", //broker-list
                "sinkTest", //topic
                new SimpleStringSchema(); //key的反序列化器
            ));
            
            //执行
            env.execute();
        }
    }
    
    

2. Redis

  • 引入 Apache-Bahir 项目中的 Flink 连接 Redis 依赖

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <!-- 2.11 是scala版本-->
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    
  • 编写 Sink Redis 代码

    public class SinkRedis {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
            
            //读取文本数据
            /*sensorReading.txt
              sensor_1,1547718199,35.8
              sensor_6,1547718201,15.4
              sensor_7,1547718202,6.7
              sensor_10,1547718205,38.1 
            */
            DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
            
            //将读取的字符串转换成POJO类
            //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
            DataStream<SensorReading> mapStream = inputStream.map( line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            });
            
            //输出数据到Redis
            /*
            	RedisSink是一个继承了 RichSinkFunction 的富函数类
            	构造函数参数:
            		1.FlinkJedisConfigBase:连接Redis的配置抽象类,常用实现类为 FlinkJedisPoolConfig
            		2.RedisMapper<T>:定义保存数据的redis命令的接口,需要自定义其实现类
            */
            //定义redis连接配置类
            FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
            
            mapStream.addSink(new RedisSink<SensorReading>(config, new MyRedisMapper()));
            
            env.execute();
            
        }
        
        //自定义RedisMapper接口实现类
        public static class MyRedisMapper implements RedisMapper<SensorReading> {
            // 定义保存数据到 redis 的命令,存成哈希表,hset sensor_temp id temperature
            @Override
            public RedisCommandDescription getCommandDescription() {
            	return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
            }
            
            //定义存储的key
            @Override
            public String getKeyFromData(SensorReading data) {
            	return data.getId();
            }
            
            //定义存储的value
            @Override
            public String getValueFromData(SensorReading data) {
            	return data.getTemperature().toString();
            }
            
        }
    }
    
  • 启动 Redis server 和 Redis cli 服务,查看数据保存情况

3. ElasticSearch

  • 引入 Flink 连接 ElasticSearch 的官方依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    
    
  • 编写 Sink ElasticSearch 代码

    public class SinkElasticSearch {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
            
            //读取文本数据
            /*sensorReading.txt
              sensor_1,1547718199,35.8
              sensor_6,1547718201,15.4
              sensor_7,1547718202,6.7
              sensor_10,1547718205,38.1 
            */
            DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
            
            //将读取的字符串转换成POJO类
            //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
            DataStream<SensorReading> mapStream = inputStream.map( line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            });
            
            //输出数据到ElasticSearch
            /*
            	1.ElasticsearchSinkBase是一个继承了 RichSinkFunction 的富函数抽象类,常用的实现类为 ElasticsearchSink
            	2.ElasticsearchSink类通过 Builder<T>(List<HttpHost> list, ElasticsearchSinkFunction<T> esf).build() 创建实例,Builder() 中需要传入的参数:
            		2.1 List<HttpHost>:连接到的 Elasticsearch 集群主机列表
            		2.2 ElasticsearchSinkFunction<T>:定义具体处理逻辑、准备数据向 Elasticsearch发送请求的接口,需要自定义其实现类
            */
            //定义Elasticsearch 集群主机列表
            ArrayList<HttpHost> httpHosts = new ArrayList<>();
    		httpHosts.add(new HttpHost("localhost", 9200));
            
            mapStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
            
            env.execute();
            
        }
        
        //自定义ElasticsearchSinkFunction接口实现类
        public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
            @Override
            public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
                //创建发送的Source
                HashMap<String, String> dataSource = new HashMap<>();
                dataSource.put("id", element.getId());
                dataSource.put("ts", element.getTimestamp().toString());
                dataSource.put("temp", element.getTemperature().toString());
                
                //创建请求
                IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("readingData").source(dataSource);
                
                //发送请求
                indexer.add(indexRequest);
            }
            
        }
    }
    
  • 启动 ElasticSearch 服务查看数据

4. 自定义 Sink

将数据使用 JDBC 写入到关系型数据库中,以 MySQL 为例;注:Flink 1.11 版本开始有官方连接 JDBC 的 Sink 依赖 flink-connector-jdbc

  • 引入 mysql 连接驱动依赖

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    
  • 编写 Sink JDBC 代码

    public class SinkJDBC {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            
            env.setParallelism(1);
            
            //读取文本数据
            /*sensorReading.txt
              sensor_1,1547718199,35.8
              sensor_6,1547718201,15.4
              sensor_7,1547718202,6.7
              sensor_10,1547718205,38.1 
            */
            DataStream<String> inputStream = env.readTextFile("sensorReading.txt");
            
            //将读取的字符串转换成POJO类
            //MapFunction接口是一个函数式接口,可以使用 lambda 表达式实现
            DataStream<SensorReading> mapStream = inputStream.map( line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            });
            
            //输出数据到Mysql
            mapStream.addSink(new MyJdbcSink());
            
            env.execute();
            
        }
        
        //自定义JDBCSink,使用继承富函数类的方式,为了能够使用生命周期方法来建立和关闭数据库连接
        public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
            //定义连接对象和预编译语句对象属性,以便全局使用
            private Connection conn = null;
            private PreparedStatement insertStmt = null;
            private PreparedStatement updateStmt = null;
            
            //在 open 方法中创建连接
            @Override
            public void open(Configuration parameters) throws Exception {
            	conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
                
                // 创建预编译器,有占位符,可传入参数
                insertStmt = conn.prepareStatement("INSERT INTO sensor_temp(id, temp) VALUES (?, ?)");
                updateStmt = conn.prepareStatement("UPDATE sensor_temp SET temp = ? WHERE id = ?");
            }
            
            // 调用连接,执行 sql
            @Override
            public void invoke(SensorReading value, Context context) throws Exception {
                // 执行更新语句,注意不要留 super
                updateStmt.setDouble(1, value.getTemperature());
                updateStmt.setString(2, value.getId());
                updateStmt.execute();
                
                // 如果刚才 update 语句没有更新,那么插入
                if (updateStmt.getUpdateCount() == 0) {
                    insertStmt.setString(1, value.getId()); 
                    insertStmt.setDouble(2, value.getTemperature());
                    insertStmt.execute();
                }
            }
            
            //在 close 方法中关闭连接
            @Override
            public void close() throws Exception {
                insertStmt.close();
                updateStmt.close();
                conn.close();
            }
        }
        
    }
    
  • 启动 mysql 服务,查看对应数据表中的数据

相关推荐

  1. 详解 Flink 处理 API

    2024-06-08 09:26:06       11 阅读
  2. Apache Flink 处理-[CentOS|Rocky] 镜像

    2024-06-08 09:26:06       13 阅读
  3. 48、Flink 的 Data Source API 详解

    2024-06-08 09:26:06       9 阅读
  4. 详解 Flink Table APIFlink SQL 之入门介绍

    2024-06-08 09:26:06       7 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-08 09:26:06       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-08 09:26:06       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-08 09:26:06       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-08 09:26:06       20 阅读

热门阅读

  1. Element UI 快速入门指南

    2024-06-08 09:26:06       11 阅读
  2. Redis

    2024-06-08 09:26:06       10 阅读
  3. Android基础-自定义view

    2024-06-08 09:26:06       11 阅读
  4. 深度解读ChatGPT基本原理

    2024-06-08 09:26:06       9 阅读
  5. 部件库(Widget Factory)

    2024-06-08 09:26:06       10 阅读
  6. 图像处理 -- 自适应色调映射(ATM)整理

    2024-06-08 09:26:06       9 阅读