服务配置
服务配置中心介绍
首先我们来看一下,微服务架构下关于配置文件的一些问题:
- 配置文件相对分散。在一个微服务架构下,配置文件会随着微服务的增多变的越来越多,而且分散在各个微服务中,不好统一配置和管理。
- 配置文件无法区分环境。微服务项目可能会有多个环境,例如:测试环境、预发布环境、生产环境。每一个环境所使用的配置理论上都是不同的,一旦需要修改,就需要我们去各个微服务下手动维护,这比较困难。
- 配置文件无法实时更新。我们修改了配置文件之后,必须重新启动微服务才能使配置生效,这对一个正在运行的项目来说是非常不友好的。基于上面这些问题,我们就需要配置中心的加入来解决这些问题。
配置中心的思路是:
首先把项目中各种配置全部都放到一个集中的地方进行统一管理,并提供一套标准的接口。当各个服务需要获取配置的时候,就来配置中心的接口拉取自己的配置。当配置中心中的各种参数有更新的时候,也能通知到各个服务实时的过来同步最新的信息,使之动态更新。
Nacos Config入门
1.导入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
2.配置nacos-config
:1)不能使用原来的application.yml作为配置文件,而是新建一个bootstrap.yml作为配置文件;2)在bootstrap和application数据项相同时,bootstrap中的配置不会被覆盖;
配置文件优先级(由高到低):
bootstrap.properties -> bootstrap.yml -> application.properties -> application.yml
spring:
application:
name: service-name
cloud:
nacos:
config:
server-addr: localhost:8848 # nacos的服务端地址
file-extension: yaml # 配置文件格式
profiles:
active: dev # 环境标识
3.自动装配
Nacos服务端配置发布源码
1.组装请求参数
@PostMapping
@TpsControl(pointName = "ConfigPublish")
@Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId,
@RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
// //加密
Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);
content = pair.getSecond();
// 参数校验
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
//组装请求参数
ConfigForm configForm = new ConfigForm();
configForm.setDataId(dataId);
configForm.setGroup(group);
configForm.setNamespaceId(tenant);
configForm.setContent(content);
configForm.setTag(tag);
configForm.setAppName(appName);
configForm.setSrcUser(srcUser);
configForm.setConfigTags(configTags);
configForm.setDesc(desc);
configForm.setUse(use);
configForm.setEffect(effect);
configForm.setType(type);
configForm.setSchema(schema);
if (StringUtils.isBlank(srcUser)) {
configForm.setSrcUser(RequestUtil.getSrcUserName(request));
}
if (!ConfigType.isValidType(type)) {
configForm.setType(ConfigType.getDefaultType().getType());
}
ConfigRequestInfo configRequestInfo = new ConfigRequestInfo();
configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
configRequestInfo.setBetaIps(request.getHeader("betaIps"));
String encryptedDataKey = pair.getFirst();
return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);
}
2.添加或者更新配置
**
* Adds or updates non-aggregated data.
*
* @throws NacosException NacosException.
*/
public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)
throws NacosException {
//配置信息转map
Map<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);
//参数校验
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(configForm.getDataId())) {
LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", configRequestInfo.getSrcIp(),
configForm.getDataId(), configForm.getGroup());
throw new NacosApiException(HttpStatus.FORBIDDEN.value(), ErrorCode.INVALID_DATA_ID,
"dataId:" + configForm.getDataId() + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
// 构建ConfigInfo配置信息,发布配置最基本的五个参数: nameSpaceId、groupId、dataId、应用名称、配置内容
ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
configForm.getAppName(), configForm.getContent());
configInfo.setType(configForm.getType());
configInfo.setEncryptedDataKey(encryptedDataKey);
// 判断是否是beta测试版本
if (StringUtils.isBlank(configRequestInfo.getBetaIps())) {
// 正常发布,大部分情况下,我们都没有指定tag
if (StringUtils.isBlank(configForm.getTag())) {
// 1、插入 or 更新配置信息
// 这里分为内置数据库(EmbeddedConfigInfoPersistServiceImpl)和外置数据库(ExternalConfigInfoPersistServiceImpl)操作,通常我们都是使用MySQL进行持久化存储
configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(), configForm.getSrcUser(),
configInfo, time, configAdvanceInfo, false);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), time.getTime()));
} else {
configInfoTagPersistService.insertOrUpdateTag(configInfo, configForm.getTag(),
configRequestInfo.getSrcIp(), configForm.getSrcUser(), time, false);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), configForm.getTag(), time.getTime()));
}
} else {
// 数据插入或者更新
configInfoBetaPersistService.insertOrUpdateBeta(configInfo, configRequestInfo.getBetaIps(),
configRequestInfo.getSrcIp(), configForm.getSrcUser(), time, false);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(true, configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
time.getTime()));
}
// 日志跟踪
ConfigTraceService.logPersistenceEvent(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
configRequestInfo.getRequestIpApp(), time.getTime(), InetUtils.getSelfIP(),
ConfigTraceService.PERSISTENCE_EVENT_PUB, configForm.getContent());
return true;
}
public void insertOrUpdateBeta(final ConfigInfo configInfo, final String betaIps, final String srcIp,
final String srcUser, final Timestamp time, final boolean notify) {
// 没有直接判断是新增还是更新,而且依赖数据库唯一性做检查,重复了(报主键冲突,说明已存在)就做更新
try {
//往数据库中添加信息
addConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);
} catch (DataIntegrityViolationException ive) { // Unique constraint conflict
//报错则更新信息
updateConfigInfo4Beta(configInfo, betaIps, srcIp, null, time, notify);
}
}
public void addConfigInfo4Beta(ConfigInfo configInfo, String betaIps, String srcIp, String srcUser, Timestamp time,
boolean notify) {
String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
String md5 = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
String encryptedDataKey = StringUtils.isBlank(configInfo.getEncryptedDataKey()) ? StringUtils.EMPTY
: configInfo.getEncryptedDataKey();
try {
ConfigInfoBetaMapper configInfoBetaMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO_BETA);
jt.update(configInfoBetaMapper.insert(
Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "beta_ips",
"src_ip", "src_user", "gmt_create", "gmt_modified", "encrypted_data_key")),
configInfo.getDataId(), configInfo.getGroup(), tenantTmp, appNameTmp, configInfo.getContent(), md5,
betaIps, srcIp, srcUser, time, time, encryptedDataKey);
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
throw e;
}
}
public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
tjt.execute(status -> {
try {
// jdbcTemplate操作,自动插入到数据库表(config_info)中,返回主键id
long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);
String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant());
// 插入历史数据到表中(his_config_info)
historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
throw e;
}
return Boolean.TRUE;
});
}
addConfigInfoAtomic方法
public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
final ConfigInfo configInfo, Map<String, Object> configAdvanceInfo) {
// 取出配置信息
final String appNameTmp =
StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
final String tenantTmp =
StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
final String desc = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("desc");
final String use = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("use");
final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
final String encryptedDataKey =
configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();
// 将配置内容进行MD5加密
final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
KeyHolder keyHolder = new GeneratedKeyHolder();
// 根据数据库表获取对应的mapper, 通过插件化的形式, 灵活应对使用不同数据库的场景
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO);
// 将参数转换成对应数据库类型的sql语句,拼接insert into config_info values(....)插入语句
final String sql = configInfoMapper.insert(
Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "src_ip", "src_user",
"gmt_create", "gmt_modified", "c_desc", "c_use", "effect", "type", "c_schema",
"encrypted_data_key"));
// 获取主键名称,默认值为id
String[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();
try {
jt.update(new PreparedStatementCreator() {
@Override
public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
Timestamp now = new Timestamp(System.currentTimeMillis());
// 通过预编译的PreparedStatement,设置每个字段的值
PreparedStatement ps = connection.prepareStatement(sql, returnGeneratedKeys);
ps.setString(1, configInfo.getDataId());
ps.setString(2, configInfo.getGroup());
ps.setString(3, tenantTmp);
ps.setString(4, appNameTmp);
ps.setString(5, configInfo.getContent());
ps.setString(6, md5Tmp);
ps.setString(7, srcIp);
ps.setString(8, srcUser);
ps.setTimestamp(9, now);
ps.setTimestamp(10, now);
ps.setString(11, desc);
ps.setString(12, use);
ps.setString(13, effect);
ps.setString(14, type);
ps.setString(15, schema);
ps.setString(16, encryptedDataKey);
return ps;
}
}, keyHolder);
Number nu = keyHolder.getKey();
if (nu == null) {
throw new IllegalArgumentException("insert config_info fail");
}
return nu.longValue();
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
throw e;
}
}
Nacos 服务端监控源码
1.客户端进行长轮询其实是使用定时线程来定时调用/v1/cs/configs/listener接口实现
路径 Nacos-config服务模块下的ConfigController.java
inner.doPollingConfig执行长轮询请求
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, signType = SignType.CONFIG)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
LOGGER.warn("invalid probeModify is blank");
throw new IllegalArgumentException("invalid probeModify");
}
//获取客户端需要监听的可能发送变化的配置,计算MD5值
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// 发送长轮训
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
2.发送长轮询请求
/**
* long polling the config.
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// 判断当前请求是否为长轮询,如果是,调用LongPollingService的addLongPollingClient()方法
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
//如果不是长轮训,就直接返回结果
// Compatible with short polling logic.
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
通过scheduler.schedule启动一个定时任务,并延时时间为29.5s
将ClientLongPolling实例本身添加到allSubs队列中,它主要维护一个长轮询的订阅关系。
定时任务执行后,先把ClientLongPolling实例本身从allSubs队列中移除。
通过MD5比较客户端请求的groupKeys是否发生变更,并将变更结果通过response返回给客户端
所谓长轮询就是服务端收到请求之后,不立即返回,而是在延29.5s才把请求结果返回给客户端,这使得客户端和服务端之间在30s之内数据没有发生变化的情况下一直处于连接状态。
/**
* Add LongPollingClient.
*
* @param req HttpServletRequest.
* @param rsp HttpServletResponse.
* @param clientMd5Map clientMd5Map.
* @param probeRequestSize probeRequestSize.
*/
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
//获取客户端请求的超时时间,减去500ms后赋值给timeout变量。
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
long timeout = -1L;
//判断isFixedPolling,如果为true,定时任务将会在30s后开始执行,否则在29.5s后开始执行
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
timeout = Math.max(10000, Long.parseLong(str) - delayTime);
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
//和服务端的数据进行MD5对比,如果发送变化则直接返回
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
ConnectionCheckResponse connectionCheckResponse = checkLimit(req);
if (!connectionCheckResponse.isSuccess()) {
generate503Response(req, rsp, connectionCheckResponse.getMessage());
return;
}
// Must be called by http thread, or send response.
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
//scheduler.execute 执行ClientLongPolling线程
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
try {
//将 ClientLongPolling 实例本身添加到 allSubs 队列中,它主要维护一个长轮询的订阅关系
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
//定时任务执行后,先把 ClientLongPolling 实例本身从 allSubs 队列中移除
allSubs.remove(ClientLongPolling.this);
//判断是否为固定轮询
if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
//比较数据的 MD5 值,判断是否发生变更
List<String> changedGroups = MD5Util.compareMd5(
(HttpServletRequest)asyncContext.getRequest(),
(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
//并将变更的结果通过response返回给客户端
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}