学习笔记-微服务高级(黑马程序员)

Sentinel

  • 测试软件

    • jmeter
  • 雪崩问题

    • 个微服务往往依赖于多个其它微服务,服务提供者I发生了故障,依赖于当前服务的其它服务随着时间的推移形成级联失败
  • 超时处理

    • 设定超时时间,请求超过一定时间没有响应就返回错误信息
  • 仓壁模式

    • 限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,也叫线程隔离
  • 断路器

    • 由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务请求
  • 限流

    • 流量控制,限制业务访问的QPS,避免服务因流量的突增而故障
    • Sentinel
  • Sentinel

    • 运行
      • java -jar sentinel-dashboard-1.8.1.jar
    • 配置
      • server.port
      • sentinel.dashboard.auth.username
        • 默认用户名
      • sentinel.dashboard.auth.password
        • 默认密码
java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar

微服务整合

  • 依赖
<!--sentinel-->
<dependency>
    <groupId>com.alibaba.cloud</groupId> 
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

  • 配置
server:
  port: 8088
spring:
  cloud: 
    sentinel:
      transport:
        dashboard: localhost:8080

限流

  • 流控模式
  • 直接:对当前资源限流
  • 关联:高优先级资源触发阈值,对低优先级资源限流。
  • 链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流
  • 流控效果
    • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常
      • 默认的处理方式
    • warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常
      • 这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
    • 排队等待:让所有的请求按照先后次序排队执行
      • 两个请求的间隔不能小于指定时长

隔离和降级

  • 线程隔离
    • 调用者在调用服务提供者时,给每个调用的请求分配独立线程池
    • 出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽
  • 熔断降级
    • 是在调用方这边加入断路器,统计对服务提供者的调用
    • 如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者

FeignClient整合Sentinel

  • 配置
feign:
  sentinel:
    enabled: true # 开启feign对sentinel的支持

  • 实现FallbackFactory
@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
    @Override
    public UserClient create(Throwable throwable) {
        return new UserClient() {
            @Override
            public User findById(Long id) {
                log.error("查询用户异常", throwable);
                return new User();
            }
        };
    }
}

  • FallbackFactory注册Bean:
@Bean
public UserClientFallbackFactory userClientFallbackFactory(){
    return new UserClientFallbackFactory();
}
  • UserClient接口中使用FallbackFactory
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {

    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}

线程隔离

  • 线程池隔离
    • 给每个服务调用业务分配一个线程池
    • 利用线程池本身实现隔离效果
  • 信号量隔离
    • 计数器模式,记录业务使用的线程数量
    • 达到信号量上限时,禁止新的请求

熔断降级

  • 断路器控制熔断和放行是通过状态机来完成
  • 状态机
    • closed:关闭状态
      • 断路器放行所有请求,并开始统计异常比例、慢请求比例
      • 超过阈值则切换到open状态
    • open:打开状态
      • 服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑
      • Open状态5秒后会进入half-open状态
    • half-open:半开状态
      • 放行一次请求,根据执行结果来判断接下来的操作。
      • 请求成功:则切换到closed状态
      • 请求失败:则切换到open状态
  • 器熔断策略
    • 慢调用
    • 异常比例
    • 异常数

授权规则

  • 白名单:来源(origin)在白名单内的调用者允许访问

  • 黑名单:来源(origin)在黑名单内的调用者不允许访问

  • 定义RequestOriginParser接口,返回不同的origin

@Component
public class HeaderOriginParser implements RequestOriginParser {
    @Override
    public String parseOrigin(HttpServletRequest request) {
        // 1.获取请求头
        String origin = request.getHeader("origin");
        // 2.非空判断
        if (StringUtils.isEmpty(origin)) {
            origin = "blank";
        }
        return origin;
    }
}

  • 网关的请求添加请求头
spring:
  cloud:
    gateway:
      default-filters:
        - AddRequestHeader=origin,gateway

  • 实现BlockExceptionHandler接口,自定义异常时的返回结果
    • 处理请求被限流、降级、授权拦截时抛出BlockException
@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
        String msg = "未知异常";
        int status = 429;

        if (e instanceof FlowException) {
            msg = "请求被限流了";
        } else if (e instanceof ParamFlowException) {
            msg = "请求被热点参数限流";
        } else if (e instanceof DegradeException) {
            msg = "请求被降级了";
        } else if (e instanceof AuthorityException) {
            msg = "没有权限访问";
            status = 401;
        }

        response.setContentType("application/json;charset=utf-8");
        response.setStatus(status);
        response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
    }
}

规则持久化

  • 规则管理模式
    • 原始模式
      • Sentinel的默认模式,将规则保存在内存,重启服务会丢失
    • pull模式
      • 控制台将配置的规则推送到Sentinel客户端
      • 客户端会将配置规则保存在本地文件或数据库中
      • 定时去本地文件或数据库中查询,更新本地规则
    • push模式
      • 控制台将配置规则推送到远程配置中心Nacos
      • Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新

实现push模式

  • 依赖
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

  • 配置
spring:
  cloud:
    sentinel:
      datasource:
        flow:
          nacos:
            server-addr: localhost:8848 # nacos地址
            dataId: orderservice-flow-rules
            groupId: SENTINEL_GROUP
            rule-type: flow # 还可以是:degrade、authority、param-flow

  • 修改sentinel-dashboard源码
    • 重新启动

分布式事务

  • CAP

    • Consistency(一致性)
    • Availability(可用性)
    • Partition tolerance (分区容错性)
  • BASE

    • Basically Available (基本可用)
    • Soft State(软状态)
    • Eventually Consistent(最终一致性)
  • 分布式事务

    • AP模式
      • 各子事务分别执行和提交,允许出现结果不一致
      • 采用弥补措施恢复数据即可,实现最终一致
    • CP模式
      • 各个子事务执行后互相等待,同时提交,同时回滚,达成强一致
      • 事务等待过程中,处于弱可用状态

Seata

  • Seata的架构
    • TC (Transaction Coordinator)
      • 事务协调者
    • TM (Transaction Manager)
      • 事务管理器
    • RM (Resource Manager)
      • 资源管理器
  • 下载
    • http://seata.io/zh-cn/blog/download.html

微服务集成

  • 依赖
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <!--版本较低,1.3.0,因此排除-->
        <exclusion>
            <artifactId>seata-spring-boot-starter</artifactId>
            <groupId>io.seata</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!--seata starter 采用1.4.2版本-->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <!--<version>1.4.2</version>-->
    <version>${seata.version}</version>
</dependency>

  • 配置
seata:
  registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
    # 参考tc服务自己的registry.conf中的配置
    type: nacos
    nacos: # tc
      server-addr: 127.0.0.1:8848
      namespace: ""
      group: DEFAULT_GROUP
      application: seata-tc-server # tc服务在nacos中的服务名称
      cluster: SH
  tx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称
  service:
    vgroup-mapping: # 事务组与TC服务cluster的映射关系
      seata-demo: SH

XA模式

  • RM一阶段的工作:

    • 注册分支事务到TC
    • 执行分支业务sql但不提交
    • 报告执行状态到TC
  • TC二阶段的工作:

    • TC检测各分支事务执行状态
      • 如果都成功,通知所有RM提交事务
      • 如果有失败,通知所有RM回滚事务
  • RM二阶段的工作:

    • 接收TC指令,提交或回滚事务
  • 配置文件

seata: 
  dtat-source-proxy-mode: XA
  • 添加注解
    • 全局事务的入口方法添加@GlobalTransactional注解

AT模式

  • 阶段一RM的工作:

    • 注册分支事务
    • 记录undo-log(数据快照)
    • 执行业务sql并提交
    • 报告事务状态
  • 阶段二提交时RM的工作:

    • 删除undo-log即可
    • 阶段二回滚时RM的工作:
    • 根据undo-log恢复数据到更新前
  • 配置文件

seata: 
  dtat-source-proxy-mode: AT
  • 添加注解
    • 全局事务的入口方法添加@GlobalTransactional注解

TCC模式

  • 需要实现三个方法
    • Try:资源的检测和预留;
    • Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
    • Cancel:预留资源释放,可以理解为try的反向操作。
@LocalTCC
public interface AccountTCCService {

    @TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")
    void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
                @BusinessActionContextParameter(paramName = "money")int money);

    boolean confirm(BusinessActionContext ctx);

    boolean cancel(BusinessActionContext ctx);
}

SAGA模式

  • 一阶段
    • 直接提交本地事务
  • 二阶段
    • 成功则什么都不做;失败则通过编写补偿业务来回滚

Redis

Redis持久化

  • RDB持久化

    • Redis Database Backup file
    • Redis数据备份文件,也被叫做Redis数据快照。
      • 把内存中的所有数据都记录到磁盘中
      • 当Redis实例故障重启后,从磁盘读取快照文件,恢复数据
  • RDB执行时机

    • Redis停机时会执行一次RDB
    • Redis内部有触发RDB的机制
      • 可以在redis.conf文件中找到
  • 异步持久化bgsave

    • fork主进程得到一个子进程,共享内存空间
    • 子进程读取内存数据并写入新的RDB文件
    • 用新RDB文件替换旧的RDB文件
  • AOF持久化

  • Append Only File(追加文件)

  • Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件

  • 频率

    • always
    • ererysec
    • no

Redis主从

  • 搭建主从架构

    • 搭建主从集群,实现读写分离
  • 主从数据同步原理

    • 全量同步
      • 第一次同步
    • 增量同步
      • slave重启后同步

Redis哨兵

  • Sentinel

    • 监控
      • 心跳
    • 自动故障恢复
      • 选举新master
    • 通知
      • 通知客户端新的master
  • 心跳检测

    • 主观下线
    • 客观下线

RedisTemplate哨兵模式

  • 依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

spring:
  redis:
    password: 1234 #如果Redis有密码市一定要配置密码
    sentinel:
      master: mymaster #指定master名称
      nodes: # 指定Redis集群信息
        - ip地址:27001
        - ip地址:27002
        - ip地址:27003

  • RedisTemplate配置主从读写分离
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
    // REPLICA_PREFERRED 优先从slave节点读取 
    return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

Redis分片集群(增强存储能力)

  • 分片集群
    • 每个集群多个master,每个master保存不同数据
    • 每个master多个slave
    • master直接通过ping监测彼此
    • 客户端访问集群任意节点,最终都会被转发到正确的节点
  • 散列插槽
    • redis会把master节点映射到0~1638插槽上
    • 数据的key与插槽绑定

集群伸缩

  • 新增
    • redis-cli -a 密码 --cluster add-node ip地址:port ip地址:7001
  • 分配插槽
    • redis-cli -a 密码 --cluster reshard ip地址:port
  • 删除
    • redis-cli -a 密码 --cluster del-node ip地址:port 节点id

故障转移

  • 自动故障转移

    • 自动
  • 手动故障转移

    • cluster failover命令

RedisTemplate访问分片集群

  • 引入redis依赖
  • 配置分片集群地址
  • 配置读写分离
spring:
  redis:
    cluster:
      nodes:
        - ip:port
        - ip:port
        - ip:port
        - ip:port
        - ip:port

多级缓存

  • 多级缓存
    • 浏览器&客户端
    • nginx本地缓存
    • redis缓存
    • JVM进程缓存
    • 数据库

JVM进程缓存

  • 分布式缓存
    • redis
  • 进程本地缓存
    • HashMap
    • GuavaCache
    • Caffeine
      • Spring内部的缓存使用的就是Caffeine

Caffeine

// 构建cache
Cache<String,String> cache = Caffeine.newBuilder().build();
// 存储数据
cache.put("gf","xxx");
// 取值
String gf = cache.getIfPresent("gf");

String defaultGF = cache.get("defaultGF",key->{
    return "xxx" 
});

  • 缓存驱逐策略

    • 基于容器
      • 数量上限
    • 基于时间
      • 设置有效时间
    • 基于引用
      • 不建议使用
  • 基于容器

Cache<String,String> cache = Caffeine.newBuilder().maximumSize(1).build();
  • 基于时间
Cache<String,String> cache = Caffeine.newBuilder()
    .expireAfterWrite(Duration.ofSeconds(10))
    .build();

Lua语法

helloworld

  • 新建
touch hello.lua
  • 添加内容
print("hello world")
  • 运行
lua hello.lua

数据类型

  • 数据类型
    • nil
      • 无效值
    • boolean
    • number
    • string
    • function
    • table
      • 数组key为索引
        • local arr = {‘xx’,‘xx’}
        • print(arr[1])
      • map
        • loacl map = {name=‘xx’,age=2}
        • map[‘name’]
        • map.name
  • 查看数据类型
    • type()
  • 循环遍历table
local arr = {'ar','xx',ss}
for index,value in ipairs(arr) do
  print(index,value)
end
loacl map = {name='xx',age=2}
for index,value in ipairs(map) do
  print(index,value)
end
  • 函数
function printArr(arr)
  for index,value in ipairs(arr) do
    print(value)
  end
end
  • 条件控制
if(布尔表达式)
then

else

end
  • 逻辑运算
    • and
    • or
    • not

OpenResty

  • 基于nginx的高性能web平台

  • 安装

    • OpenResty
    • opm
  • 目录

    • /usr/local/openresty
  • 获取请求参数

    • 路径占位符
      • /item/111
      • local id=ngx.var[1]
    • 请求头
      • local headers = ngx.req.get_headers()
    • Get
      • local getParams = ngx.req.get_uri_args()
    • post
      • gx.req.read_body()
      • local postParams = ngx.req.get_post_args()
    • JSON
      • gx.req.read_body()
      • local jsonBody = ngx.req.get_post_data()
  • 封装http请求

    • /usr/lodal/openresty/lualib/common.lua
--封装函数,发送http请求,并解析响应
local function read http(path,params)
  local resp = ngx.location.capture(path,{
    method =ngx.HTTP GET,
    args =params,
  })
  if not resp then
    --记录错误信息,返回404
      ngx.log(ngx.ERR,"http not found,path:",path ,",args:",args)
      ngx.exit(404)
  end
  return resp.body
end
--将方法导出
Local _M = {
  read_http = read_http
}
return _M
--导入common函数库
local common=require('common')
local read_http = common.read_http
--导入cjson库
local cjson=require('cjson')
 
-- 获取路径参数
local id =ngx.var[1]

-- 查询商品信息
local itemJSON =read_http("/item/" .. id, nil)

-- 查询库存信息
local stockJSON = read_http("/item/stock/"..id, nil)
-- JSON转化为lua的table

local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock =stock.stock
item.sold = stock.sold
-- 把item序列化为ison 返回结果
ngx.say(cjson.encode(item))

nginx本地缓存

-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache
 
-- 封装查询函数
function read_data(key, expire, path, params)
    -- 查询本地缓存
    local val = item_cache:get(key)
    if not val then
        ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
        -- 查询redis
        val = read_redis("127.0.0.1", 6379, key)
        -- 判断查询结果
        if not val then
            ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
            -- redis查询失败,去查询http
            val = read_http(path, params)
        end
    end
    -- 查询成功,把数据写入本地缓存
    item_cache:set(key, val, expire)
    -- 返回数据
    return val
end
  • 设置缓存时间1800/60
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)

Redis 缓存预热

  • 冷启动

    • 服务刚刚启动时,redis中并没有缓存
    • 如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力
  • 缓存预热

    • 在实际开发中,可以利用大数据统计用户访问的热点数据,项目启动时将这些热点数据提前查询并保存redis中
  • 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • 配置
spring:
  redis:
    host: 192.168.150.101
  • 初始化类
 
@Component
public class RedisHandler implements InitializingBean {
 
    @Autowired
    private StringRedisTemplate redisTemplate;
 
    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;
 
    private static final ObjectMapper MAPPER = new ObjectMapper();
 
    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Item> itemList = itemService.list();
        // 2.放入缓存
        for (Item item : itemList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }
 
        // 3.查询商品库存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入缓存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }
}
  • openResty查询redis
    • common.lua
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
 
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end
 
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end
 
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 将方法导出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M
  • 先查redis,没有再查服务
    • item.lua
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
 
-- 封装查询函数
function read_data(key, path, params)
    -- 查询本地缓存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not val then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        val = read_http(path, params)
    end
    -- 返回数据
    return val
end
 
-- 获取路径参数
local id = ngx.var[1]
 
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)
 
-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold
 
-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

缓存同步

  • 同步策略
    • 设置有效期:给缓存设置有效期,到期后自动删除,再次查询时更新
    • 同步双写:在修改数据库的同时,直接修改缓存
    • 异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
      • MQ
      • Canal
        • 自己伪装成MySQL的一个slave节点,监听master的binary log变化

Canal

  • 依赖
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
  • 配置
canal:
  destination: heima # canal的集群名字,要与安装canal时设置的名称一致
  server: 192.168.150.101:11111 # canal服务地址
  • domain

@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名称
    private String title;//商品标题
    private Long price;//价格(分)
    private String image;//商品图片
    private String category;//分类名称
    private String brand;//品牌名称
    private String spec;//规格
    private Integer status;//商品状态 1-正常,2-下架
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
    @TableField(exist = false)
    @Transient
    private Integer stock;
    @TableField(exist = false)
    @Transient
    private Integer sold;

  • 监听器

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
 
    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;
 
    @Override
    public void insert(Item item) {
        // 写数据到JVM进程缓存
        itemCache.put(item.getId(), item);
        // 写数据到redis
        redisHandler.saveItem(item);
    }
 
    @Override
    public void update(Item before, Item after) {
        // 写数据到JVM进程缓存
        itemCache.put(after.getId(), after);
        // 写数据到redis
        redisHandler.saveItem(after);
    }
 
    @Override
    public void delete(Item item) {
        // 删除数据到JVM进程缓存
        itemCache.invalidate(item.getId());
        // 删除数据到redis
        redisHandler.deleteItemById(item.getId());
    }
}
  • RedisHandler
@Component
public class RedisHandler implements InitializingBean {
 
    @Autowired
    private StringRedisTemplate redisTemplate;
 
    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;
 
    private static final ObjectMapper MAPPER = new ObjectMapper();
 
    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Item> itemList = itemService.list();
        // 2.放入缓存
        for (Item item : itemList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }
 
        // 3.查询商品库存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入缓存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }
 
    public void saveItem(Item item) {
        try {
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
 
    public void deleteItemById(Long id) {
        redisTemplate.delete("item:id:" + id);
    }
}

RabbitMQ高级特性

消息可靠性

  • 丢失原因

    • 发送时丢失
      • 生产者发送的消息未送达exchange
      • 消息到达exchange后未到达queue
    • MQ宕机,queue将消息丢失
    • consumer接收到消息后未消费就宕机
  • 解决方案

    • 生产者确认机制
    • mq持久化
    • 消费者确认机制
    • 失败重试机制

生产者消息确认

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因
  • 配置

spring:
  rabbitmq:
    # simple:同步等待confirm结果,直到超时
    # correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-confirm-type: correlated
    # 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true

    template:
      # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
      mandatory: true
  • ReturnCallback
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}

  • ConfirmCallback
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
    result -> {
        if(result.isAck()){
            // 3.1.ack,消息成功
            log.debug("消息发送成功, ID:{}", correlationData.getId());
        }else{
            // 3.2.nack,消息失败
            log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
        }
    },
    ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

// 休眠一会儿,等待ack回执
Thread.sleep(2000);

消息持久化

  • 消息持久化机制

    • 交换机持久化
    • 队列持久化
    • 消息持久化
  • 交换机持久化

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}
  • 队列持久化
@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}
  • 消息持久化
// 准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(Standardcharsets.UTF_8))
    // 默认就是持久化的
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)    // durable 持久
    .build();
// 发送消息
rabbitTemplate.convertAndSend(  "simple.queue", message);

消费者确认

  • SpringAMQP三种确认模式:
    • manual
      • 手动ack,需要在业务代码结束后,调用api发送ack。
    • auto
      • 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    • none
      • 关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack none/auto/manual

消费失败重试机制

  • 本地重试
    • 利用Spring的retry机制,在消费者出现异常时利用本地重试
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 失败策略
    • MessageRecovery接口
      • RejectAndDontRequeueRecoverer
        • 重试耗尽后,直接reject,丢弃消息
        • 默认就是这种方式
      • ImmediateRequeueMessageRecoverer
        • 重试耗尽后,返回nack,消息重新入队
      • RepublishMessageRecoverer
        • 重试耗尽后,将失败消息投递到指定的交换机
@Configuration
public class ErrorMessageConfig {
    @Bean
    // 交换机
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    // 队列
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
 
    @Bean
    // RepublishMessageRecoverer,关联队列和交换机
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

死信交换机

  • 死信(dead letter)
    • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
    • 消息是一个过期消息,超时无人消费
    • 要投递的队列消息满了,无法投递
  • 死信交换机
    • 配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

TTL

  • ttl

    • 队列设置超时时间,配置x-message-ttl属性
  • 死信交换机、死信队列

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
  • 声明一个队列,并且指定TTL
@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交换机
        .build();
}
  • 将ttl与交换机绑定
@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
  • 发送消息时,设定TTL
@Test
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("发送消息成功");
}

延迟队列

  • 插件

    • DelayExchange
  • 流程

    • 接收消息
    • 判断消息是否具备x-delay属性
    • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
    • 返回routing not found结果给消息发送者
    • x-delay时间到期后,重新投递消息到指定队列
  • 声明延迟交换机

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}
@Slf4j
@Configuration
public class DelayExchangeConfig {
 
    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }
 
    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

  • 发送延迟消息
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // 添加延迟消息属性
        message.getMessageProperties().setDelay(5000);
        return message;
    }
});


惰性队列

  • 消息堆积

    • 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限
    • 之后发送的消息就会成为死信,可能会被丢弃
  • Lazy Queues

    • 接收到消息后直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储
  • 通过命令行将一个运行中的队列修改为惰性队列

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
  • java
@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}
@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

集群

  • 普通集群/叫标准集群(classic cluster)
    • 会在集群的各个节点间共享部分数据
      • 包括:交换机、队列元信息
      • 不包含:队列中的消息
    • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
    • 队列所在节点宕机,队列中的消息就会丢失
  • 镜像集群/主从模式
    • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
    • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
    • 一个队列的主节点可能是另一个队列的镜像节点
    • 所有操作都是主节点完成,然后同步给镜像节点
    • 主宕机后,镜像节点会替代成新的主
  • 仲裁队列
    • 与镜像队列一样,都是主从模式,支持主从数据同步
    • 使用非常简单,没有复杂的配置
    • 主从同步基于Raft协议,强一致

仲裁队列

  • 创建仲裁队列
@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue") // 持久化
        .quorum() // 仲裁队列
        .build();
}
  • 配置
spring:
  rabbitmq:
    addresses: ip:port, ip:port, ip:port
    username: xx
    password: xx
    virtual-host: /

相关推荐

  1. 学习笔记-服务高级黑马程序员

    2024-04-25 12:40:04       16 阅读
  2. 学习笔记-服务基础(黑马程序员

    2024-04-25 12:40:04       16 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-25 12:40:04       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-25 12:40:04       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-25 12:40:04       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-25 12:40:04       20 阅读

热门阅读

  1. 4月24日,每日信息差

    2024-04-25 12:40:04       16 阅读
  2. python元组与列表的区别

    2024-04-25 12:40:04       44 阅读
  3. Okapi Framework

    2024-04-25 12:40:04       17 阅读
  4. Android配置环境

    2024-04-25 12:40:04       21 阅读
  5. C++读写二进制文件

    2024-04-25 12:40:04       12 阅读
  6. MacBook系统升级导致idea无法打开

    2024-04-25 12:40:04       14 阅读
  7. vue文件、js文件外部导入js

    2024-04-25 12:40:04       39 阅读