im-system 第九章(下)

读扩散&写扩散

读扩散举一个微博大V的例子,如果大V发一条消息,那么关注了大V的用户,就会从大V的队列中倒序拉取就可以获取到大V的消息了

读扩散

在这里插入图片描述

写扩散

写扩散也举一个微博大V的例子,如果大V发一条消息,每个用户都有自己的一个队列,大V会将消息写到所有订阅他的用户的队列中
在这里插入图片描述

  1. 单聊服务建议使用写扩散,也就是写操作X2,不会一下子给服务器太多的压力,而且比较好查询
  2. 群聊服务建议使用读扩散,群聊有群id作为分片键,在群id上建立索引,就可以查询出这个群的所有的消息

存储改进

在这里插入图片描述

mybatis的mapper添加批量插入的方法

im_message_history

@Data
@TableName("im_message_history")
public class ImMessageHistoryEntity {

    private Integer appId;

    private String fromId;

    private String toId;

    private String ownerId;

    /**
     * messageBodyId
     */
    private Long messageKey;
    /**
     * 序列号
     */
    private Long sequence;

    private String messageRandom;

    private Long messageTime;

    private Long createTime;

}

ImMessageBodyEntity

@Data
@TableName("im_message_body")
public class ImMessageBodyEntity {

    private Integer appId;

    /**
     * messageBodyId
     */
    private Long messageKey;

    /**
     * messageBody
     */
    private String messageBody;

    private String securityKey;

    private Long messageTime;

    private Long createTime;

    private String extra;

    private Integer delFlag;

}

ImMessageHistoryMapper

@Repository
public interface ImMessageHistoryMapper extends BaseMapper<ImMessageHistoryEntity> {

    /**
     * 批量插入(mysql)
     * @param entityList
     * @return
     */
    Integer insertBatchSomeColumn(Collection<ImMessageHistoryEntity> entityList);
}

EasySqlInjector

将它定义为1个bean就行了

@Component
public class EasySqlInjector extends DefaultSqlInjector {
    @Override
    public List<AbstractMethod> getMethodList(Class<?> mapperClass) {
        List<AbstractMethod> methodList = super.getMethodList(mapperClass);
        methodList.add(new InsertBatchSomeColumn()); // 添加InsertBatchSomeColumn方法
        return methodList;
    }

}

雪花算法工具

定义为1个bean即可

@Bean
public SnowflakeIdWorker buildSnowflakeSeq() {
    return new SnowflakeIdWorker(0);
}

SnowflakeIdWorker

@Slf4j
public class SnowflakeIdWorker {

    /**
     * 初始偏移时间戳
     */
    private static final long OFFSET = 1546300800L;

    /**
     * 机器id (0~15 保留 16~31作为备份机器)
     */
    private static long WORKER_ID;

    /**
     * 机器id所占位数 (5bit, 支持最大机器数 2^5 = 32)
     */
    private static final long WORKER_ID_BITS = 5L;
    /**
     * 自增序列所占位数 (16bit, 支持最大每秒生成 2^16 = ‭65536‬)
     */
    private static final long SEQUENCE_ID_BITS = 16L;
    /**
     * 机器id偏移位数
     */
    private static final long WORKER_SHIFT_BITS = SEQUENCE_ID_BITS;
    /**
     * 自增序列偏移位数
     */
    private static final long OFFSET_SHIFT_BITS = SEQUENCE_ID_BITS + WORKER_ID_BITS;
    /**
     * 机器标识最大值 (2^5 / 2 - 1 = 15)
     */
    private static final long WORKER_ID_MAX = ((1 << WORKER_ID_BITS) - 1) >> 1;
    /**
     * 备份机器ID开始位置 (2^5 / 2 = 16)
     */
    private static final long BACK_WORKER_ID_BEGIN = (1 << WORKER_ID_BITS) >> 1;
    /**
     * 自增序列最大值 (2^16 - 1 = ‭65535)
     */
    private static final long SEQUENCE_MAX = (1 << SEQUENCE_ID_BITS) - 1;
    /**
     * 发生时间回拨时容忍的最大回拨时间 (秒)
     */
    private static final long BACK_TIME_MAX = 1L;

    /**
     * 上次生成ID的时间戳 (秒)
     */
    private static long lastTimestamp = 0L;
    /**
     * 当前秒内序列 (2^16)
     */
    private static long sequence = 0L;
    /**
     * 备份机器上次生成ID的时间戳 (秒)
     */
    private static long lastTimestampBak = 0L;
    /**
     * 备份机器当前秒内序列 (2^16)
     */
    private static long sequenceBak = 0L;

    //==============================Constructors====================

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     */
    public SnowflakeIdWorker(long workerId) {
        if (workerId < 0 || workerId > WORKER_ID_MAX) {
            throw new IllegalArgumentException(String.format("cmallshop.workerId范围: 0 ~ %d 目前: %d", WORKER_ID_MAX, workerId));
        }
        WORKER_ID = workerId;
    }

    // ==============================Methods=================================
    public static long nextId() {
        return nextId(SystemClock.now() / 1000);
    }

    /**
     * 主机器自增序列
     *
     * @param timestamp 当前Unix时间戳
     * @return long
     */
    private static synchronized long nextId(long timestamp) {
        // 时钟回拨检查
        if (timestamp < lastTimestamp) {
            // 发生时钟回拨
            log.warn("时钟回拨, 启用备份机器ID: now: [{}] last: [{}]", timestamp, lastTimestamp);
            return nextIdBackup(timestamp);
        }

        // 开始下一秒
        if (timestamp != lastTimestamp) {
            lastTimestamp = timestamp;
            sequence = 0L;
        }
        if (0L == (++sequence & SEQUENCE_MAX)) {
            // 秒内序列用尽
//            log.warn("秒内[{}]序列用尽, 启用备份机器ID序列", timestamp);
            sequence--;
            return nextIdBackup(timestamp);
        }

        return ((timestamp - OFFSET) << OFFSET_SHIFT_BITS) | (WORKER_ID << WORKER_SHIFT_BITS) | sequence;
    }

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     *
     * @param lastTimestamp 上次生成ID的时间截
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 备份机器自增序列
     * @param timestamp timestamp 当前Unix时间戳
     * @return long
     */
    private static long nextIdBackup(long timestamp) {
        if (timestamp < lastTimestampBak) {
            if (lastTimestampBak - SystemClock.now() / 1000 <= BACK_TIME_MAX) {
                timestamp = lastTimestampBak;
            } else {
                throw new RuntimeException(String.format("时钟回拨: now: [%d] last: [%d]", timestamp, lastTimestampBak));
            }
        }

        if (timestamp != lastTimestampBak) {
            lastTimestampBak = timestamp;
            sequenceBak = 0L;
        }

        if (0L == (++sequenceBak & SEQUENCE_MAX)) {
            // 秒内序列用尽
//            logger.warn("秒内[{}]序列用尽, 备份机器ID借取下一秒序列", timestamp);
            return nextIdBackup(timestamp + 1);
        }

        return ((timestamp - OFFSET) << OFFSET_SHIFT_BITS) | ((WORKER_ID ^ BACK_WORKER_ID_BEGIN) << WORKER_SHIFT_BITS) | sequenceBak;
    }


    /**
     * 返回以毫秒为单位的当前时间
     *
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

}

MessageStoreService消息存储服务

相关推荐

  1. im-system

    2024-03-30 15:10:02       20 阅读
  2. Flask

    2024-03-30 15:10:02       41 阅读
  3. Map

    2024-03-30 15:10:02       36 阅读
  4. :分布式训练

    2024-03-30 15:10:02       33 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-30 15:10:02       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-30 15:10:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-30 15:10:02       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-30 15:10:02       20 阅读

热门阅读

  1. STM32 消息队列处理串口发送的报文

    2024-03-30 15:10:02       18 阅读
  2. 蓝桥杯2016年第十三届省赛真题-立方变自身

    2024-03-30 15:10:02       20 阅读
  3. 前端开发学习笔记(1)

    2024-03-30 15:10:02       14 阅读
  4. Ant Design Vue 搜索下拉框

    2024-03-30 15:10:02       15 阅读
  5. MyISAM和InnoDB

    2024-03-30 15:10:02       17 阅读
  6. C++开源项目研究——gh0st远控(一)

    2024-03-30 15:10:02       18 阅读
  7. 华为NPU下安装apex

    2024-03-30 15:10:02       17 阅读
  8. DevOps流动:技术视角与价值流视角互为补充

    2024-03-30 15:10:02       17 阅读