Flink之JDBCSink连接MySQL

输出到MySQL

  1. 添加依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.1.0-1.17</version>
</dependency>
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.32</version>
</dependency>
  1. 启动MySQL, 在test库下建表clicks
CREATE TABLE `clicks` (
  `user` VARCHAR(100) NOT NULL,
  `url` VARCHAR(100) DEFAULT NULL,
  `ts` BIGINT DEFAULT NULL
) ENGINE=INNODB DEFAULT CHARSET=utf8
  1. 示例代码
public class Flink04_JdbcSink {
   
    public static void main(String[] args) {
   
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);

        //
        SinkFunction<Event> sink = JdbcSink.sink(
                "insert into clicks(user, url, ts) values (?,?,?)"
                , new JdbcStatementBuilder<Event>() {
   
                    @Override
                    public void accept(PreparedStatement preparedStatement, Event event) throws SQLException {
   
                        //给SQL的占位符赋值
                        preparedStatement.setString(1, event.getUser());
                        preparedStatement.setString(2, event.getUrl());
                        preparedStatement.setLong(3, event.getTs());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(5)
                        .withBatchIntervalMs(10000)
                        .withMaxRetries(3)
                        .build()
                ,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("000000")
                        .withUrl("jdbc:mysql://hadoop102:3306/flink")
                        .build()
        );

        ds.addSink(sink);

        try {
   
            env.execute();
        } catch (Exception e) {
   
            throw new RuntimeException(e);
        }
    }
}

MySQL的幂等性处理

  1. 将插入关键字替换为replace,如果主键重复,将除了主键外的所有字段都替换。
  2. 使用on duplicate key update 字段名 = values(字段名)语法,如果主键重复,可以选择部分字段进行替换,其余字段保持不变。
  3. 示例代码
public class Flink05_JdbcSinkReplace {
   
    public static void main(String[] args) {
   
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(1);

        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);

        SingleOutputStreamOperator<WordCount> countDs =
                ds.map(event -> new WordCount(event.getUrl(), 1))
                .keyBy(WordCount::getWord)
                .sum("count");

        //
        SinkFunction<WordCount> sink = JdbcSink.sink(
//                "replace into url_count(url, cnt) values (?,?)"
                "insert into url_count(url, cnt) values(?,?) on duplicate key update cnt = values(cnt)"
                ,
                new JdbcStatementBuilder<WordCount>() {
   
                    @Override
                    public void accept(PreparedStatement preparedStatement, WordCount wordCount) throws SQLException {
   
                        //注意:这里的起始下标是1
                        preparedStatement.setString(1, wordCount.getWord());
                        preparedStatement.setInt(2, wordCount.getCount());
                    }
                }
                ,
                JdbcExecutionOptions.builder()
                        .withBatchSize(5)
                        .withBatchIntervalMs(10000)
                        .withMaxRetries(3)
                        .build()
                ,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("000000")
                        .withUrl("jdbc:mysql://hadoop102:3306/flink")
                        .build()
        );

        countDs.addSink(sink);

        try {
   
            env.execute();
        } catch (Exception e) {
   
            throw new RuntimeException(e);
        }
    }
}

相关推荐

  1. FlinkJDBCSink连接MySQL

    2023-12-09 06:54:04       34 阅读
  2. Flink系列:Print SQL连接器

    2023-12-09 06:54:04       34 阅读
  3. Flink系列:Upsert Kafka SQL 连接器

    2023-12-09 06:54:04       36 阅读
  4. Flink系列:Apache Kafka SQL 连接器

    2023-12-09 06:54:04       27 阅读
  5. Flink系列:Elasticsearch SQL 连接器

    2023-12-09 06:54:04       38 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-09 06:54:04       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-09 06:54:04       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-09 06:54:04       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-09 06:54:04       20 阅读

热门阅读

  1. 十年OpenCV开发以后发布的作品 - OpenCV实验大师

    2023-12-09 06:54:04       44 阅读
  2. 常见请求头与响应头你了解哪些?

    2023-12-09 06:54:04       39 阅读
  3. 搜索引擎和网络浏览器之间的区别

    2023-12-09 06:54:04       43 阅读
  4. Express Generator使用

    2023-12-09 06:54:04       34 阅读
  5. el-form表单校验值为0提示校验不通过

    2023-12-09 06:54:04       42 阅读
  6. torchvision中的标准ResNet50网络结构

    2023-12-09 06:54:04       37 阅读