【Nacos源码分析02-服务配置】

服务配置

服务配置中心介绍
首先我们来看一下,微服务架构下关于配置文件的一些问题:

  1. 配置文件相对分散。在一个微服务架构下,配置文件会随着微服务的增多变的越来越多,而且分散在各个微服务中,不好统一配置和管理。
  2. 配置文件无法区分环境。微服务项目可能会有多个环境,例如:测试环境、预发布环境、生产环境。每一个环境所使用的配置理论上都是不同的,一旦需要修改,就需要我们去各个微服务下手动维护,这比较困难。
  3. 配置文件无法实时更新。我们修改了配置文件之后,必须重新启动微服务才能使配置生效,这对一个正在运行的项目来说是非常不友好的。基于上面这些问题,我们就需要配置中心的加入来解决这些问题。

配置中心的思路是:
首先把项目中各种配置全部都放到一个集中的地方进行统一管理,并提供一套标准的接口。当各个服务需要获取配置的时候,就来配置中心的接口拉取自己的配置。当配置中心中的各种参数有更新的时候,也能通知到各个服务实时的过来同步最新的信息,使之动态更新。
在这里插入图片描述

Nacos Config入门

1.导入依赖

   <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
   </dependency>

2.配置nacos-config
:1)不能使用原来的application.yml作为配置文件,而是新建一个bootstrap.yml作为配置文件;2)在bootstrap和application数据项相同时,bootstrap中的配置不会被覆盖;
配置文件优先级(由高到低):
bootstrap.properties -> bootstrap.yml -> application.properties -> application.yml

    spring:
      application:
        name: service-name
      cloud:
        nacos:
          config:
            server-addr: localhost:8848 # nacos的服务端地址
            file-extension: yaml # 配置文件格式
      profiles:
        active: dev # 环境标识

3.自动装配
在这里插入图片描述

Nacos服务端配置发布源码

1.组装请求参数

@PostMapping
    @TpsControl(pointName = "ConfigPublish")
    @Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = "dataId") String dataId,
            @RequestParam(value = "group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "src_user", required = false) String srcUser,
            @RequestParam(value = "config_tags", required = false) String configTags,
            @RequestParam(value = "desc", required = false) String desc,
            @RequestParam(value = "use", required = false) String use,
            @RequestParam(value = "effect", required = false) String effect,
            @RequestParam(value = "type", required = false) String type,
            @RequestParam(value = "schema", required = false) String schema) throws NacosException {
        
        // //加密
        Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);
        content = pair.getSecond();
        
        // 参数校验
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);
    	//组装请求参数
        ConfigForm configForm = new ConfigForm();
        configForm.setDataId(dataId);
        configForm.setGroup(group);
        configForm.setNamespaceId(tenant);
        configForm.setContent(content);
        configForm.setTag(tag);
        configForm.setAppName(appName);
        configForm.setSrcUser(srcUser);
        configForm.setConfigTags(configTags);
        configForm.setDesc(desc);
        configForm.setUse(use);
        configForm.setEffect(effect);
        configForm.setType(type);
        configForm.setSchema(schema);
    
        if (StringUtils.isBlank(srcUser)) {
            configForm.setSrcUser(RequestUtil.getSrcUserName(request));
        }
        if (!ConfigType.isValidType(type)) {
            configForm.setType(ConfigType.getDefaultType().getType());
        }
    	
        ConfigRequestInfo configRequestInfo = new ConfigRequestInfo();
        configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
        configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
        configRequestInfo.setBetaIps(request.getHeader("betaIps"));
    
        String encryptedDataKey = pair.getFirst();
       	
        return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);
    }

2.添加或者更新配置

**
     * Adds or updates non-aggregated data.
     *
     * @throws NacosException NacosException.
     */
    public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)
            throws NacosException {
        //配置信息转map
        Map<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);
        //参数校验
        ParamUtils.checkParam(configAdvanceInfo);
        
        if (AggrWhitelist.isAggrDataId(configForm.getDataId())) {
            LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", configRequestInfo.getSrcIp(),
                    configForm.getDataId(), configForm.getGroup());
            throw new NacosApiException(HttpStatus.FORBIDDEN.value(), ErrorCode.INVALID_DATA_ID,
                    "dataId:" + configForm.getDataId() + " is aggr");
        }
        
        final Timestamp time = TimeUtils.getCurrentTime();
            // 构建ConfigInfo配置信息,发布配置最基本的五个参数: nameSpaceId、groupId、dataId、应用名称、配置内容
        ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
                configForm.getAppName(), configForm.getContent());
       
        configInfo.setType(configForm.getType());
        configInfo.setEncryptedDataKey(encryptedDataKey);
           // 判断是否是beta测试版本
        if (StringUtils.isBlank(configRequestInfo.getBetaIps())) {
         // 正常发布,大部分情况下,我们都没有指定tag
            if (StringUtils.isBlank(configForm.getTag())) {
            // 1、插入 or 更新配置信息
            // 这里分为内置数据库(EmbeddedConfigInfoPersistServiceImpl)和外置数据库(ExternalConfigInfoPersistServiceImpl)操作,通常我们都是使用MySQL进行持久化存储
                configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(), configForm.getSrcUser(),
                        configInfo, time, configAdvanceInfo, false);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
                                configForm.getNamespaceId(), time.getTime()));
            } else {
                configInfoTagPersistService.insertOrUpdateTag(configInfo, configForm.getTag(),
                        configRequestInfo.getSrcIp(), configForm.getSrcUser(), time, false);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
                                configForm.getNamespaceId(), configForm.getTag(), time.getTime()));
            }
        } else {
            // 数据插入或者更新
            configInfoBetaPersistService.insertOrUpdateBeta(configInfo, configRequestInfo.getBetaIps(),
                    configRequestInfo.getSrcIp(), configForm.getSrcUser(), time, false);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(true, configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
                            time.getTime()));
        }
        // 日志跟踪
        ConfigTraceService.logPersistenceEvent(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
                configRequestInfo.getRequestIpApp(), time.getTime(), InetUtils.getSelfIP(),
                ConfigTraceService.PERSISTENCE_EVENT_PUB, configForm.getContent());
        
        return true;
    }
    public void insertOrUpdateBeta(final ConfigInfo configInfo, final String betaIps, final String srcIp,
            final String srcUser, final Timestamp time, final boolean notify) {
    // 没有直接判断是新增还是更新,而且依赖数据库唯一性做检查,重复了(报主键冲突,说明已存在)就做更新
        try {
        //往数据库中添加信息
            addConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);
        } catch (DataIntegrityViolationException ive) { // Unique constraint conflict
        	//报错则更新信息
            updateConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);
        }
    }
    public void addConfigInfo4Beta(ConfigInfo configInfo, String betaIps, String srcIp, String srcUser, Timestamp time,
            boolean notify) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        String md5 = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
        String encryptedDataKey = StringUtils.isBlank(configInfo.getEncryptedDataKey()) ? StringUtils.EMPTY
                : configInfo.getEncryptedDataKey();
        try {
            ConfigInfoBetaMapper configInfoBetaMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
                    TableConstant.CONFIG_INFO_BETA);
            jt.update(configInfoBetaMapper.insert(
                            Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "beta_ips",
                                    "src_ip", "src_user", "gmt_create", "gmt_modified", "encrypted_data_key")),
                    configInfo.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5,
                    betaIps, srcIp, srcUser, time, time, encryptedDataKey);
        } catch (CannotGetJdbcConnectionException e) {
            LogUtil.FATAL_LOG.error("[db-error] " + e, e);
            throw e;
        }
    }
  public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
            final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        tjt.execute(status -> {
            try {
             // jdbcTemplate操作,自动插入到数据库表(config_info)中,返回主键id
                long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);
                String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                  // 插入历史数据到表中(his_config_info)
                historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
            } catch (CannotGetJdbcConnectionException e) {
                LogUtil.FATAL_LOG.error("[db-error] " + e, e);
                throw e;
            }
            return Boolean.TRUE;
        });
    }

addConfigInfoAtomic方法

public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
        final ConfigInfo configInfo, Map<String, Object> configAdvanceInfo) {
    // 取出配置信息
    final String appNameTmp =
            StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
    final String tenantTmp =
            StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
    
    final String desc = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("desc");
    final String use = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("use");
    final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
    final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
    final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
    final String encryptedDataKey =
            configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();
 
    // 将配置内容进行MD5加密
    final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
    
    KeyHolder keyHolder = new GeneratedKeyHolder();
 
    // 根据数据库表获取对应的mapper, 通过插件化的形式, 灵活应对使用不同数据库的场景
    ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
            TableConstant.CONFIG_INFO);
    // 将参数转换成对应数据库类型的sql语句,拼接insert into config_info values(....)插入语句
    final String sql = configInfoMapper.insert(
            Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "src_ip", "src_user",
                    "gmt_create", "gmt_modified", "c_desc", "c_use", "effect", "type", "c_schema",
                    "encrypted_data_key"));
    // 获取主键名称,默认值为id
    String[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();
    try {
        jt.update(new PreparedStatementCreator() {
            @Override
            public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
                Timestamp now = new Timestamp(System.currentTimeMillis());
 
                // 通过预编译的PreparedStatement,设置每个字段的值
                PreparedStatement ps = connection.prepareStatement(sql, returnGeneratedKeys);
                ps.setString(1, configInfo.getDataId());
                ps.setString(2, configInfo.getGroup());
                ps.setString(3, tenantTmp);
                ps.setString(4, appNameTmp);
                ps.setString(5, configInfo.getContent());
                ps.setString(6, md5Tmp);
                ps.setString(7, srcIp);
                ps.setString(8, srcUser);
                ps.setTimestamp(9, now);
                ps.setTimestamp(10, now);
                ps.setString(11, desc);
                ps.setString(12, use);
                ps.setString(13, effect);
                ps.setString(14, type);
                ps.setString(15, schema);
                ps.setString(16, encryptedDataKey);
                return ps;
            }
        }, keyHolder);
        Number nu = keyHolder.getKey();
        if (nu == null) {
            throw new IllegalArgumentException("insert config_info fail");
        }
        return nu.longValue();
    } catch (CannotGetJdbcConnectionException e) {
        LogUtil.FATAL_LOG.error("[db-error] " + e, e);
        throw e;
    }
}

Nacos 服务端监控源码

在这里插入图片描述

1.客户端进行长轮询其实是使用定时线程来定时调用/v1/cs/configs/listener接口实现
路径 Nacos-config服务模块下的ConfigController.java
inner.doPollingConfig执行长轮询请求

     @PostMapping("/listener")
    @Secured(action = ActionTypes.READ, signType = SignType.CONFIG)
    public void listener(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            LOGGER.warn("invalid probeModify is blank");
            throw new IllegalArgumentException("invalid probeModify");
        }
        //获取客户端需要监听的可能发送变化的配置,计算MD5值
        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
        
        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        
        // 发送长轮训
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

2.发送长轮询请求


    /**
     * long polling the config.
     */
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
            Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
        
        // 判断当前请求是否为长轮询,如果是,调用LongPollingService的addLongPollingClient()方法
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }
        //如果不是长轮训,就直接返回结果
        
        // Compatible with short polling logic.
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
        
        // Compatible with short polling result.
        String oldResult = MD5Util.compareMd5OldResult(changedGroups);
        String newResult = MD5Util.compareMd5ResultString(changedGroups);
        
        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
        if (version == null) {
            version = "2.0.0";
        }
        int versionNum = Protocol.getVersionNumber(version);
        
        // Before 2.0.4 version, return value is put into header.
        if (versionNum < START_LONG_POLLING_VERSION_NUM) {
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
        } else {
            request.setAttribute("content", newResult);
        }
        
        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return HttpServletResponse.SC_OK + "";
    }

通过scheduler.schedule启动一个定时任务,并延时时间为29.5s
将ClientLongPolling实例本身添加到allSubs队列中,它主要维护一个长轮询的订阅关系。
定时任务执行后,先把ClientLongPolling实例本身从allSubs队列中移除。
通过MD5比较客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端
所谓长轮询就是服务端收到请求之后,不立即返回,而是在延29.5s才把请求结果返回给客户端,这使得客户端和服务端之间在30s之内数据没有发生变化的情况下一直处于连接状态。

/**
     * Add LongPollingClient.
     *
     * @param req              HttpServletRequest.
     * @param rsp              HttpServletResponse.
     * @param clientMd5Map     clientMd5Map.
     * @param probeRequestSize probeRequestSize.
     */
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
            int probeRequestSize) {
        
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        //获取客户端请求的超时时间,减去500ms后赋值给timeout变量。
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        
        // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
        long timeout = -1L;
        //判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则在29.5s后开始执行
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // Do nothing but set fix polling timeout.
        } else {
            timeout = Math.max(10000, Long.parseLong(str) - delayTime);
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            //和服务端的数据进行MD5对比,如果发送变化则直接返回
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        ConnectionCheckResponse connectionCheckResponse = checkLimit(req);
        if (!connectionCheckResponse.isSuccess()) {
            generate503Response(req, rsp, connectionCheckResponse.getMessage());
            return;
        }
        
        // Must be called by http thread, or send response.
        final AsyncContext asyncContext = req.startAsync();
        
        // AsyncContext.setTimeout() is incorrect, Control by oneself
        asyncContext.setTimeout(0L);
        
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        //scheduler.execute 执行ClientLongPolling线程
        ConfigExecutor.executeLongPolling(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
    
@Override
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        @Override
        public void run() {
            try {
             //将 ClientLongPolling 实例本身添加到 allSubs 队列中,它主要维护一个长轮询的订阅关系
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                //定时任务执行后,先把 ClientLongPolling 实例本身从 allSubs 队列中移除
                allSubs.remove(ClientLongPolling.this);
				 //判断是否为固定轮询
                if (isFixedPolling()) {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                        //比较数据的 MD5 值,判断是否发生变更
                    List<String> changedGroups = MD5Util.compareMd5(
                        (HttpServletRequest)asyncContext.getRequest(),
                        (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                        //并将变更的结果通过response返回给客户端
                    if (changedGroups.size() > 0) {
                        sendResponse(changedGroups);
                    } else {
                        sendResponse(null);
                    }
                } else {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                    sendResponse(null);
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
            }

        }

    }, timeoutTime, TimeUnit.MILLISECONDS);

    allSubs.add(this);
}

相关推荐

  1. 服务配置nacos

    2024-06-05 19:54:05       25 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-05 19:54:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-05 19:54:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-05 19:54:05       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-05 19:54:05       20 阅读

热门阅读

  1. iOS ActivityViewController使用

    2024-06-05 19:54:05       10 阅读
  2. docker安装minio及minio的使用

    2024-06-05 19:54:05       10 阅读
  3. axios学习

    2024-06-05 19:54:05       8 阅读
  4. 什么是封装?为什么是要封装?

    2024-06-05 19:54:05       10 阅读
  5. Python 变量相除:深入探索与实战解析

    2024-06-05 19:54:05       10 阅读
  6. 如何把docker里的内容拷贝出来

    2024-06-05 19:54:05       7 阅读
  7. Python开发入门:从基础到实践的全方位探索

    2024-06-05 19:54:05       7 阅读
  8. 前端--导出

    2024-06-05 19:54:05       11 阅读
  9. AI 领域未来的 5 个预测

    2024-06-05 19:54:05       8 阅读
  10. git 下载失败

    2024-06-05 19:54:05       9 阅读
  11. Qt6 QSslSocket 客户端设计踩坑

    2024-06-05 19:54:05       9 阅读