MySQL向Es数据同步策略

哈喽,大家好!目前有有一个小项目功能落到自己手中,也是一个面试必考点。如何保证MySQL与Es、Redis等之间的数据一致性,带着大家的问题,我给提供一种解决方案(最终一致性)

代码如下:

@Component
@Slf4j
@EnableAsync
@AllArgsConstructor
public class EsSynchronizeTasksJob {


    private final  ISysUserAllESService sysUserAllESService;

    private final SysUserAllESDocMapper userAllESDocMapper;

    private final SysPositRequestDocMapper sysPositRequestDocMapper;

    private final ISysPositRequestService sysPositRequestService;

//    @Scheduled(fixedRate = 10000)
//    @Scheduled(cron = "0 0 3 * * ?")
    @Async("pushMysqlToEsExecutor")
    @Transactional(rollbackFor = Exception.class)
    public void syncDataToEsBySysUserAllES() throws Exception {
        log.info("<----异步执行数据同步开始---->");
        // 获取系统时间
        Long startTime = System.currentTimeMillis();

        process(sysPositRequestService, sysPositRequestDocMapper);
        process(sysUserAllESService, userAllESDocMapper);


        Long endTime = System.currentTimeMillis();
        log.info("<----异步执行数据同步结束---->");
        log.info("<----异步执行数据同步耗时---->" + (endTime - startTime) + "ms");
    }


    private <D extends BaseEntity, S extends IService<D>, M extends BaseEsMapper<D>> void process(IService service, BaseEsMapper mapper) throws Exception {
        Class<D> docClass = mapper.getEntityClass();
        String indexName = docClass.getAnnotation(Document.class).indexName();
        if (!mapper.existsIndex(indexName)) {
            boolean success = mapper.createIndex(indexName);
            if (success) {
                System.out.println("索引创建成功!indexName===>" + indexName);
            }
        } else {
            List<D> sysUserAllESDocs = mapper.selectList(new LambdaEsQueryWrapper<>());
            System.out.println("es中有" + sysUserAllESDocs.size());

            // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
            Integer delete = mapper.delete(new LambdaEsQueryWrapper<>());
            System.out.println("删除ES之前的数据" + delete);
        }

        // TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据
        List<D> list = service.lambdaQuery().getBaseMapper().selectList(new LambdaQueryWrapper<>());
        List<D> document = new ArrayList<>();
        for (D o : list) {
            D docInstance = docClass.getDeclaredConstructor().newInstance();
            BeanUtils.copyProperties(o, docInstance);
            Method setIdMethod = docClass.getMethod("setMysqlId", Long.class);
            Long idValue = (Long) o.getClass().getMethod("getId").invoke(o);
            setIdMethod.invoke(docInstance, idValue);
            document.add(docInstance);
        }
        int successCount = mapper.insertBatch(document);
        System.out.println("在MySQL表中有:" + list.size() + "条数据");
        System.out.println("向ES中成功添加:" + successCount + "条数据");
        if (list.size() == successCount) {
            System.out.println("同步成功");
        } else {
            int i = list.size() - successCount;
            System.out.println("同步失败,有" + i + "条数据未写入ES,请联系管理员!");
        }
    }
}

自定义线程池类:

package com.ruoyi.web.es.conifg;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 七大参数:
 * 1 corePoolSize:线程池核心线程数量,核心线程不会被回收,即使没有任务执行,也会保持空闲状态。如果线程池中的线程少于此数目,则在执行任务时创建。
 * 2 maximumPoolSize:池允许最大的线程数,当线程数量达到corePoolSize,且workQueue队列塞满任务了之后,继续创建线程。
 * 3 keepAliveTime:超过corePoolSize之后的“临时线程”的存活时间。
 * 4 unit:keepAliveTime的单位。
 * 5 workQueue:当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制,有关于AQS的相关知识,我会单独写一篇,敬请期待。
 * 6 threadFactory:创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。
 * 7 handler:线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。
 * 四大拒绝策略:
 * 1 AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
 * 2 DiscardPolicy:直接抛弃不处理。
 * 3 DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务
 * 4 CallerRunsPolicy:使用当前调用的线程来执行此任务
 *
 * @author gao
 */
@Configuration
public class ThreadPoolTaskExecutorConfig {

    // 核心线程数
    private static final int corePoolSize = 4;

    // 最大线程数
    private static final int maxPoolSize = 4;

    // 允许线程空闲时间(单位:默认为秒)
    private static final int keepAliveTime = 10;
    // 缓冲队列数
    private static final int queueCapacity = 15;
    // 线程池名前缀
    private static final String threadNamePrefix = "Thread-PushMessage-Service-";

    @Bean("pushMysqlToEsExecutor")
    public ThreadPoolTaskExecutor pushMessageExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);
        // setWaitForTasksToCompleteOnShutdown(true): 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // setAwaitTerminationSeconds(60): 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
        executor.setAwaitTerminationSeconds(60);
        // 线程池对拒绝任务的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }


    @Bean(name = "pushMysqlToEsScheduler")
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build()
        );
        scheduler.setPoolSize(customThreadPool.getMaximumPoolSize());
        scheduler.setThreadNamePrefix(threadNamePrefix);
        return scheduler;
    }


}

大家知道有几种保证数据一致性的解决方案呢?

①通过读写锁来实现

②通过中间件canal伪装成MySQL的从节点来实现(参考地址:超详细的Canal入门,看这篇就够了!-CSDN博客

③通过MQ异步消息推送实现,MySQL数据发生变化时向MQ发送消息

④通过定时任务获取MySQL中更新的数据

..........

最后:①②是强一致性的解决方案,③④是最终一致性的解决方案。项目实战是一个漫长的过程,山高路远,道阻且长!加油💪🏻!

相关推荐

  1. MySQLEs数据同步策略

    2024-06-18 15:04:03       9 阅读
  2. ✅技术社区—MySQLES数据同步策略

    2024-06-18 15:04:03       25 阅读
  3. esmysql同步问题

    2024-06-18 15:04:03       21 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-18 15:04:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-18 15:04:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-18 15:04:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-18 15:04:03       20 阅读

热门阅读

  1. 关于一份nginx-我是如何优化的

    2024-06-18 15:04:03       8 阅读
  2. 互联网十万个为什么之什么是对象存储?

    2024-06-18 15:04:03       8 阅读
  3. PHP中的for循环:多方面探讨与实际应用

    2024-06-18 15:04:03       8 阅读
  4. Qt 插件框架

    2024-06-18 15:04:03       8 阅读
  5. 力扣1385.两个数组间的距离值

    2024-06-18 15:04:03       8 阅读
  6. 【Python高级编程】使用OpenCV进行图像旋转详解

    2024-06-18 15:04:03       7 阅读
  7. CSS行内样式书写规范及注意事项

    2024-06-18 15:04:03       5 阅读
  8. 查看mysql数据库端口号

    2024-06-18 15:04:03       7 阅读
  9. 美股 — “四巫日”

    2024-06-18 15:04:03       9 阅读