使用FlinkCDC从mysql同步数据到ES,并实现数据检索

一、背景

随着公司的业务量越来越大,查询需求越来越复杂,mysql已经不支持变化多样的复杂查询了。

于是,使用cdc捕获MySQL的数据变化,同步到ES中,进行数据的检索。

一、环境准备

1、创建ES索引

// 创建索引并指定映射
PUT /course
{
	"mappings": {
		"properties": {
			"id": {
				"type": "keyword"
			},
			"name": {
				"type": "text"
			},
			"label": {
				"type": "text"
			},
			"content": {
			  "type": "text"
			}
		}
	}
}

// 查询course下所有数据(备用)
GET /course/_search
// 删除索引及数据(备用)
DELETE /course

2、创建mysql数据表

CREATE TABLE `course` (
  `id` varchar(32) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `label` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

二、使用FlinkCDC同步数据

1、导包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.18.0</version>
</dependency>



2、demo

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


/**
 * cdc
 */
public class CDCTest {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.56.10")
                .port(3306)
                .databaseList("mytest")
                .tableList("mytest.course")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启检查点
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // 1个并行任务
            .setParallelism(1)
            .addSink(new RichSinkFunction<String>() {
                private final static ElasticSearchUtil es = new ElasticSearchUtil("192.168.56.10");
                @Override
                public void invoke(String value, Context context) throws Exception {
                    super.invoke(value, context);
                    JSONObject jsonObject = JSON.parseObject(value);
                    DataInfo dataInfo = new DataInfo();
                    dataInfo.setOp(jsonObject.getString("op"));
                    dataInfo.setBefore(jsonObject.getJSONObject("before"));
                    dataInfo.setAfter(jsonObject.getJSONObject("after"));
                    dataInfo.setDb(jsonObject.getJSONObject("source").getString("db"));
                    dataInfo.setTable(jsonObject.getJSONObject("source").getString("table"));

                    if (dataInfo.getDb().equals("mytest") && dataInfo.getTable().equals("course")) {

                        String id = dataInfo.getAfter().get("id").toString();
                        if(dataInfo.getOp().equals("d")) {
                            es.deleteById("course", id);
                        } else {
                            es.put(dataInfo.getAfter(), "course", id);
                        }
                    }
                }
            })

            .setParallelism(1); // 对接收器使用并行性1来保持消息顺序

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.util.Map;

/**
 * 收集的数据类型
 * @author cuixiangfei
 * @since 20234-03-20
 */
public class DataInfo {

    // 操作 c是create;u是update;d是delete;r是read
    private String op;

    private String db;

    private String table;

    private Map<String, Object> before;

    private Map<String, Object> after;


    public String getOp() {
        return op;
    }

    public void setOp(String op) {
        this.op = op;
    }

    public String getDb() {
        return db;
    }

    public void setDb(String db) {
        this.db = db;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public Map<String, Object> getBefore() {
        return before;
    }

    public void setBefore(Map<String, Object> before) {
        this.before = before;
    }

    public Map<String, Object> getAfter() {
        return after;
    }

    public void setAfter(Map<String, Object> after) {
        this.after = after;
    }

    public boolean checkOpt() {
        if (this.op.equals("r")) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "DataInfo{" +
                "op='" + op + '\'' +
                ", db='" + db + '\'' +
                ", table='" + table + '\'' +
                ", before=" + before +
                ", after=" + after +
                '}';
    }

    public static void main(String[] args) {
        String value = "{\"before\":{\"id\":\"333\",\"name\":\"333\",\"label\":\"333\",\"content\":\"3333\"},\"after\":{\"id\":\"333\",\"name\":\"33322\",\"label\":\"333\",\"content\":\"3333\"},\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1710923957000,\"snapshot\":\"false\",\"db\":\"mytest\",\"sequence\":null,\"table\":\"course\",\"server_id\":1,\"gtid\":null,\"file\":\"mysql-bin.000008\",\"pos\":1318,\"row\":0,\"thread\":9,\"query\":null},\"op\":\"u\",\"ts_ms\":1710923957825,\"transaction\":null}";
        JSONObject jsonObject = JSON.parseObject(value);

        System.out.println(jsonObject.get("op"));
        System.out.println(jsonObject.get("before"));
        System.out.println(jsonObject.get("after"));
        System.out.println(jsonObject.getJSONObject("source").get("db"));
        System.out.println(jsonObject.getJSONObject("source").get("table"));
    }
}

3、es工具类

springboot集成elasticSearch(附带工具类)

三、测试

1、先创建几条数据

INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('1', '11', '111', '1111');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('2', '22 33', '222 333', '2222 3333');
INSERT INTO `mytest`.`course`(`id`, `name`, `label`, `content`) VALUES ('3', '33 44', '33 444', '3333 4444');

2、启动cdc

3、查询es

在这里插入图片描述

4、增删改几条数据进行测验

相关推荐

  1. 使用docker实现logstash同步mysqles

    2024-03-27 10:28:08       44 阅读
  2. 使用maxwell实时同步mysql数据kafka

    2024-03-27 10:28:08       16 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-27 10:28:08       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-27 10:28:08       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-27 10:28:08       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-27 10:28:08       20 阅读

热门阅读

  1. 捉虫日常:R包下载成功加载失败?

    2024-03-27 10:28:08       20 阅读
  2. @Autowired与@Resource区别

    2024-03-27 10:28:08       19 阅读
  3. Python基于 BaseHTTPRequestHandler 创建简单Web服务

    2024-03-27 10:28:08       17 阅读
  4. XSS伪协议

    2024-03-27 10:28:08       19 阅读
  5. 网络入门基础

    2024-03-27 10:28:08       18 阅读
  6. 前端理论总结(js)——reduce相关应用方法

    2024-03-27 10:28:08       20 阅读
  7. Go语言base64流式编码在收尾时的一个小坑

    2024-03-27 10:28:08       17 阅读
  8. 正则表达式:深入理解与应用

    2024-03-27 10:28:08       16 阅读
  9. CentOS 7 安装 Git

    2024-03-27 10:28:08       19 阅读
  10. 11. Linux中进程控制细节

    2024-03-27 10:28:08       19 阅读
  11. 【算法】计数排序

    2024-03-27 10:28:08       17 阅读