JedisCluster 通过 Pipeline 实现两套数据轮换更新

其他系列文章导航

Java基础合集
数据结构与算法合集

设计模式合集

多线程合集

分布式合集

ES合集


文章目录

其他系列文章导航

文章目录

前言

一、整体流程

1.1 大致流程

1.2 流程代码解释

二、从数据库里查数据

2.1 SQL语句

三、更新当前前缀

3.1 设置前缀常量

3.2 初始化 currentPrefixIndex

3.3 获取当日前缀 

 3.4 更新 currentPrefixIndex

四、往redis集群更新数据

4.1 大致流程

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程


前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

  1. 从数据库里查数据。
  2. 更新当前前缀。
  3. 往redis集群更新数据。

1.2 流程代码解释

    @Override
    public R<String> updateCampToJedis() {
        R<String> r = new R<>();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
        String currentMonth = dateFormat.format(new Date());
        //1. 从数据库里查数据
        List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);
        if (UserWideInfoList.size() == 0) {
            r.setCode(R.ERROR_CODE);
            r.setMsg("没有数据");
            return r;
        }
        //2. 更新当前前缀
        updateCurrentPrefixIndex();
        r.setCode(R.SUCCESS_CODE);
        //3. 往redis集群存入数据
        insertToJedis(ZhmsUserWideInfoList);
        return r;
    }

二、从数据库里查数据

2.1 SQL语句

这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">
        SELECT * FROM USER_WIDE_INFO_M_${SysMonth}
    </select>

三、更新当前前缀

要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。

3.1 设置前缀常量

用 A 和 B 来区分两组 key 。

代码如下: 

    private static final String PREFIX_A = "A";
    private static final String PREFIX_B = "B";

3.2 初始化 currentPrefixIndex

向 redis集群中存入初始的 currentPrefixIndex 。

代码如下: 

    @GetMapping("/init")
    public String init() {
        return jedisCluster.set("currentPrefixIndex", "0");
    }

3.3 获取当日前缀 

先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

代码如下: 

     //获取当日前缀
    private String getKeyPrefix() {
        int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));
        if (currentPrefixIndex % 2 == 0) {
            return PREFIX_A;
        } else {
            return PREFIX_B;
        }
    }

 3.4 更新 currentPrefixIndex

每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 , 使区分读的数据。

代码如下: 

    // 重新设置currentPrefixIndex
    private void updateCurrentPrefixIndex() {
        String currentValue = jedisCluster.get("currentPrefixIndex");
        int newValue = Integer.parseInt(currentValue) + 1;
        jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));
    }

四、往redis集群更新数据

这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

4.1 大致流程

  1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
  2. 把新数据解析后更新到 redis 集群。

注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。

代码如下:

    private void insertToJedis(List<UserWideInfo> UserWideInfoList) {
        String keyPrefix = getKeyPrefix();
        List<String> keys = new ArrayList<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        for (JedisPool node : clusterNodes.values()) {
            try (Jedis jedis = node.getResource()) {
                Set<String> nodeKeys = jedis.keys(keyPrefix + "*");
                keys.addAll(nodeKeys);
            }
        }
        Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);
        //先删旧的
        for (JedisPool jedisPool : delKey.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                List<String> keysList = delKey.get(jedisPool);
                for (String key : keysList) {
                    pipelined.del(key);
                }
                pipelined.sync();
            }
        }
        List<String> keyList =new ArrayList<>();
        HashMap<String, String> map = new HashMap<>();
        //填充keyList和value
        for (UserWideInfo UserWideInfo : UserWideInfoList) {
            String key = keyPrefix + "_" + UserWideInfo.getBillNo();
            keyList.add(key);
            //构建value
            ...
            ...
            map.put(key, value);
        }
        Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);
        for (JedisPool jedisPool : result.keySet()) {
            try (Jedis jedis = jedisPool.getResource()){
                Pipeline pipelined = jedis.pipelined();
                // 获取当前JedisPool对应的键列表
                List<String> keysList = result.get(jedisPool); 
                // 将命令添加到Pipeline中
                for (String key : keysList) {
                    String value = map.get(key);
                    pipelined.set(key, value);
                }
                // 执行Pipeline中的所有命令
                pipelined.sync();
            }
        }
    }

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程

因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。

代码如下:

@Slf4j
public class JedisPipelineUtil {

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {
        Map<String, List<String>> hostPhoneMap = new HashMap<>();
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
        Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();
        JedisPool jedisPool = next.getValue();
        Jedis jedis = jedisPool.getResource();
        Map<Integer, String> slots = discoverClusterSlots(jedis);
        for (String s : list) {
            String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));
            if (hostPhoneMap.containsKey(hostAndPort)) {
                hostPhoneMap.get(hostAndPort).add(s);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(s);
                hostPhoneMap.put(hostAndPort, newList);
            }
        }
        jedis.close();
        return hostPhoneMap;
    }

    /**
     * jedis集群下使用pipeline之前先将key分配管道
     * Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key
     *
     * @param list         存redis的key
     * @param jedisCluster
     * @return
     */
    public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {
        Map<JedisPool, List<String>> map = new HashMap<>();
        Map<String, List<String>> var1 = assignSlot(list, jedisCluster);
        Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<String>> next = iterator.next();
            JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());
            map.put(jedisPool, next.getValue());
        }
        return map;

    }

    private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {
        Map<Integer, String> slotsMap = new HashMap<>();
        List<Object> slots = jedis.clusterSlots();
        Iterator var3 = slots.iterator();
        while (var3.hasNext()) {
            Object slotInfoObj = var3.next();
            List<Object> slotInfo = (List) slotInfoObj;
            if (slotInfo.size() > 2) {
                List<Integer> slotNums = getAssignedSlotArray(slotInfo);
                List<Object> hostInfos = (List) slotInfo.get(2);
                if (!hostInfos.isEmpty()) {
                    String targetNode = generateHostAndPort(hostInfos);
                    Iterator<Integer> var4 = slotNums.iterator();
                    while (var4.hasNext()) {
                        Integer slot = var4.next();
                        slotsMap.put(slot, targetNode);
                    }
                }
            }
        }
        return slotsMap;
    }

    private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
        List<Integer> slotNums = new ArrayList<>();

        for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) {
            slotNums.add(slot);
        }

        return slotNums;
    }

    private static String generateHostAndPort(List<Object> hostInfos) {
        String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
        int port = ((Long) hostInfos.get(1)).intValue();
        return host + ":" + port;
    }
}

使用 assignKey 方法就可以分配管道。


相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-16 10:22:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-16 10:22:01       101 阅读
  3. 在Django里面运行非项目文件

    2023-12-16 10:22:01       82 阅读
  4. Python语言-面向对象

    2023-12-16 10:22:01       91 阅读

热门阅读

  1. C语言实现动态数组

    2023-12-16 10:22:01       60 阅读
  2. 力扣221. 最大正方形

    2023-12-16 10:22:01       57 阅读
  3. 使用OpenCV和PIL库读取图片的区别

    2023-12-16 10:22:01       58 阅读
  4. php语言的基础用法有哪些

    2023-12-16 10:22:01       62 阅读
  5. ElasticSearch之cat segments API

    2023-12-16 10:22:01       62 阅读
  6. centos7编译grpc源码

    2023-12-16 10:22:01       65 阅读
  7. Vue2面试题:说一下路由模式hash和history的区别?

    2023-12-16 10:22:01       52 阅读