Flink: Custom Data Partitioning

shuffle randomly distributes records to the downstream subtasks.
rebalance distributes records to the downstream subtasks in round-robin fashion.
global sends all records to a single downstream partition (the first instance of the downstream operator).
partitionCustom: partitions records with a user-defined Partitioner; a short usage sketch of the first three operators follows, and partitionCustom is shown in the full example further below.
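For orientation, here is a minimal sketch of the first three operators. The names env and stream are placeholders assumed for this sketch (a StreamExecutionEnvironment and an existing DataStream<String>); they are not part of the example program below.

    // sketch only: env and stream are assumed to exist
    DataStream<String> shuffled   = stream.shuffle();    // random distribution to downstream subtasks
    DataStream<String> rebalanced = stream.rebalance();  // round-robin distribution to downstream subtasks
    DataStream<String> single     = stream.global();     // all records go to one downstream subtask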

package cn.edu.tju.demo;

import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.*;

public class Test12 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // source that emits two strings every 30 seconds
        DataStreamSource<String> mySource = environment.addSource(new MySourceFunction());

        // attach a random integer in [0, 10) to each record
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = mySource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, new Random().nextInt(10));
            }
        });

        // partition by tuple field 1 (the Integer) using the custom Partitioner
        DataStream<Tuple2<String, Integer>> resultStream = mapStream.partitionCustom(new MyPartitioner(), 1);

        resultStream.print();

        environment.execute("my job");
    }

    // routes each record to partition (key % number of downstream partitions)
    public static class MyPartitioner implements Partitioner<Integer> {

        @Override
        public int partition(Integer key, int partitions) {
            return key % partitions;
        }
    }



    public static class MySourceFunction implements SourceFunction<String> {
        // volatile so that cancel(), called from another thread, is visible to run()
        private volatile boolean runningFlag = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (runningFlag) {
                ctx.collect("hi world");
                ctx.collect("hello world");
                Thread.sleep(30000);
            }
        }

        @Override
        public void cancel() {
            runningFlag = false;
        }
    }

}
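A note on API versions: the field-index overload partitionCustom(partitioner, 1) used above is available in older Flink releases but has been deprecated and later removed in more recent ones, where only the KeySelector-based overload remains. On such a version, an equivalent call would look roughly like the sketch below; the lambda simply extracts the Integer field f1 that serves as the partitioning key.

    // sketch for newer Flink versions: use a KeySelector instead of a field index
    DataStream<Tuple2<String, Integer>> resultStream =
            mapStream.partitionCustom(new MyPartitioner(), value -> value.f1);

Also keep in mind that custom partitioning only has a visible effect when the downstream operator runs with parallelism greater than 1; with parallelism 1 every record ends up in the same subtask regardless of the Partitioner.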
