Summary: this article walks through, with diagrams, the source code of Nacos service registration in the Spring Cloud Alibaba stack.
Environment:
- JDK 1.8
- nacos 1.4.2
- spring-boot 2.2.5.RELEASE
- spring-cloud Hoxton.SR3
- spring-cloud-alibaba 2.2.1.RELEASE
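I. Nacos Service Architecture
On a platform built on Spring Cloud, Nacos sits in the service architecture as shown in the figure below:
II. Nacos Service Registration Flowchart (Source-Code Level)
1. Core Workflow of a Registry
1. Service registration: when a microservice instance starts, it registers its own information (IP address, port, service name, and so on) with the registry. This is usually done by sending a registration request to the registry. spring-cloud-commons defines a standard abstraction for service registration, and its top-level interface is ServiceRegistry. For Nacos, registration ends in a POST /nacos/v1/ns/instance request to the Nacos Server.
2. Service storage: after the registry receives an instance's registration information, it stores it in the registry table. The registry table is the core component of a registry and holds the information of every microservice instance. On the Nacos Server it is kept in the following data structure: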
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
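3. Service discovery: when a client microservice calls a service, it sends a discovery request to the registry. The registry looks up the matching instances by service name (and any other request parameters) and returns them to the caller. spring-cloud-commons defines a standard abstraction for service discovery, and its top-level interface is DiscoveryClient.
4. Health checking (heartbeats): to keep the instance information in the registry accurate, the registry has to know whether each instance is still alive. It can find out by probing the instances or by having the instances report heartbeats, and for ephemeral instances Nacos uses client heartbeats. The Nacos Client runs a scheduled task that sends a heartbeat every 5 seconds through PUT /nacos/v1/ns/instance/beat. The Nacos Server runs its own scheduled task that checks the health of registered instances. An instance with no heartbeat for more than 15 seconds is marked unhealthy (healthy=false), and one with no heartbeat for more than 30 seconds is removed. An unhealthy instance recovers once the client sends heartbeats again.
5. Deregistration: when a microservice instance stops, it sends a deregistration request to the registry, which then removes the instance from the registry table. For Nacos this ends in a DELETE /nacos/v1/ns/instance request to the Nacos Server.
6. Change notification: when instance information in the registry changes (an instance is added, goes offline, changes IP, and so on), the registry notifies the clients or other microservice instances that subscribe to that service. They can then fetch the latest instance information and update their local instance lists. On the Nacos Server, the core class that handles change notification is com.alibaba.nacos.naming.push.PushService.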
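The server-side timeout rules in item 4 amount to classifying an instance by how long it has been silent. The following is a minimal sketch that assumes only the 15-second and 30-second thresholds stated above. InstanceState, BeatTimeoutRules and classify() are hypothetical names. The real Nacos check (ClientBeatCheckTask) may also take per-instance metadata into account.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the server-side timeout rules described above:
// silent for more than 15 s -> unhealthy, silent for more than 30 s -> removed.
enum InstanceState { HEALTHY, UNHEALTHY, REMOVED }

final class BeatTimeoutRules {
    static final long UNHEALTHY_AFTER_MS = TimeUnit.SECONDS.toMillis(15);
    static final long REMOVE_AFTER_MS = TimeUnit.SECONDS.toMillis(30);

    private BeatTimeoutRules() { }

    /** Classifies an instance from the time of its last heartbeat. */
    static InstanceState classify(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > REMOVE_AFTER_MS) {
            return InstanceState.REMOVED;
        }
        if (silence > UNHEALTHY_AFTER_MS) {
            return InstanceState.UNHEALTHY;
        }
        return InstanceState.HEALTHY;
    }

    public static void main(String[] args) {
        long lastBeat = 0L;
        System.out.println(classify(lastBeat, 5_000));   // HEALTHY
        System.out.println(classify(lastBeat, 20_000));  // UNHEALTHY
        System.out.println(classify(lastBeat, 31_000));  // REMOVED
    }
}
```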
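III. Nacos Service Registration Source Code Analysis
1. Service Registration
1.1 Client-Side Registration
1.1.1 Adding the Dependency
Nacos registration is initiated by the client. In a Spring Boot application it happens automatically through auto-configuration. First, add the dependency to the pom: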
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
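1.1.2 Source Code Analysis
First, locate the starter's spring.factories file. Spring Boot uses this file to auto-configure the classes that an imported jar contributes.
Open it:
This shows that NacosServiceRegistryAutoConfiguration is the core configuration class for service registration. It defines three core beans:
- NacosServiceRegistry: performs service registration and implements the ServiceRegistry interface.
- NacosRegistration: holds the data used for registration (service name, IP, port, metadata), backed by NacosDiscoveryProperties.
- NacosAutoServiceRegistration: performs automatic service registration.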
/**
* @author xiaojing
* @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
1.1.2.1 NacosAutoServiceRegistration
How does automatic registration work? Following the source further, NacosAutoServiceRegistration extends the abstract class AbstractAutoServiceRegistration<Registration>:
public class NacosAutoServiceRegistration
extends AbstractAutoServiceRegistration<Registration> {
}
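This abstract class implements ApplicationListener<WebServerInitializedEvent>. ApplicationListener is an event listener interface. Here it listens for WebServerInitializedEvent, which is published once the embedded web server has started and the context is ready, and onApplicationEvent is invoked at that point.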
public abstract class AbstractAutoServiceRegistration<R extends Registration>
implements AutoServiceRegistration, ApplicationContextAware,
ApplicationListener<WebServerInitializedEvent> {
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
}
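onApplicationEvent simply calls bind(event). bind() skips the management server context, records the web server port, and calls start(). start() publishes InstancePreRegisteredEvent, calls register(), and then publishes InstanceRegisteredEvent. register() in turn calls this.serviceRegistry.register(getRegistration()) to complete the registration.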
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
private final ServiceRegistry<R> serviceRegistry;
/**
* Register the local service with the {@link ServiceRegistry}.
*/
protected void register() {
this.serviceRegistry.register(getRegistration());
}
register() here is a classic example of the Template Method pattern.
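The following sketch shows the template method split on its own. It is a simplified, hypothetical analogue of AbstractAutoServiceRegistration; AutoRegistrationTemplate and FakeNacosRegistration are not Spring Cloud classes. The base class fixes the order of the steps in start() and guards it with an AtomicBoolean, and the subclass only supplies register().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, hypothetical analogue of AbstractAutoServiceRegistration:
// start() is the fixed template; register() is the step subclasses supply.
abstract class AutoRegistrationTemplate {
    private final AtomicBoolean running = new AtomicBoolean(false);
    protected final List<String> events = new ArrayList<>();

    /** Template method: the order of the steps is fixed here. */
    final void start() {
        if (running.get()) {
            return; // already registered, nothing to do
        }
        events.add("pre-registered");
        register();                  // hook supplied by the subclass
        events.add("registered");
        running.compareAndSet(false, true);
    }

    /** Variable step: each registry implementation provides its own. */
    protected abstract void register();
}

class FakeNacosRegistration extends AutoRegistrationTemplate {
    @Override
    protected void register() {
        events.add("POST /nacos/v1/ns/instance");
    }

    public static void main(String[] args) {
        FakeNacosRegistration r = new FakeNacosRegistration();
        r.start();
        r.start(); // ignored: running is already true
        System.out.println(r.events); // [pre-registered, POST /nacos/v1/ns/instance, registered]
    }
}
```

Calling start() twice registers only once, which mirrors the running check in the real start().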
1.1.2.2 NacosServiceRegistry
Next, follow NacosServiceRegistry#register():
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final NamingService namingService;
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
Instance instance = getNacosInstanceFromRegistration(registration);
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
The call ends in namingService.registerInstance(serviceId, group, instance), which is implemented in NacosNamingService:
private BeatReactor beatReactor;
private NamingProxy serverProxy;
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}
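registerInstance() first merges serviceName and groupName into a single grouped name through NamingUtils.getGroupedName(). Below is a minimal sketch of that naming convention. It assumes the "@@" separator that Nacos places between group and service name (for example DEFAULT_GROUP@@order-service). The class and method names are hypothetical.

```java
// Hypothetical re-implementation of the "group@@service" naming convention
// used by NamingUtils.getGroupedName / getGroupName.
final class GroupedNames {
    static final String SEPARATOR = "@@";
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    private GroupedNames() { }

    static String grouped(String serviceName, String groupName) {
        if (serviceName == null || serviceName.trim().isEmpty()) {
            throw new IllegalArgumentException("serviceName must not be blank");
        }
        return groupName + SEPARATOR + serviceName;
    }

    /** Group part of a grouped name; names without "@@" fall back to DEFAULT_GROUP. */
    static String groupOf(String groupedName) {
        int idx = groupedName.indexOf(SEPARATOR);
        return idx < 0 ? DEFAULT_GROUP : groupedName.substring(0, idx);
    }

    /** Service part of a grouped name. */
    static String serviceOf(String groupedName) {
        int idx = groupedName.indexOf(SEPARATOR);
        return idx < 0 ? groupedName : groupedName.substring(idx + SEPARATOR.length());
    }

    public static void main(String[] args) {
        String g = grouped("order-service", DEFAULT_GROUP);
        System.out.println(g);            // DEFAULT_GROUP@@order-service
        System.out.println(groupOf(g));   // DEFAULT_GROUP
        System.out.println(serviceOf(g)); // order-service
    }
}
```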
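There are two key points here:
1. If the instance is ephemeral (the default, i.e. AP mode), a heartbeat (BeatInfo) is built and handed to the BeatReactor component, which sends heartbeats to the server. This is lease renewal, covered in detail in the heartbeat section;
2. The serverProxy component (a NamingProxy) registers the instance with the server. This is service registration.
serverProxy.registerService(groupedServiceName, groupName, instance) makes an HTTP call directly to the Nacos Server endpoint POST /nacos/v1/ns/instance to register the service.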
/**
* register a instance to service with specified instance properties.
*
* @param serviceName name of service
* @param groupName group of service
* @param instance instance to register
* @throws NacosException nacos exception
*/
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
/**
* Request api.
*
* @param api api
* @param params parameters
* @param body body
* @param servers servers
* @param method http method
* @return result
* @throws NacosException nacos exception
*/
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
exception.getErrMsg());
throw new NacosException(exception.getErrCode(),
"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
/**
* Call server.
*
* @param api api
* @param params parameters
* @param body body
* @param curServer ?
* @param method http method
* @return result
* @throws NacosException nacos exception
*/
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!IPUtil.containsPort(curServer)) {
curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
.observe(end - start);
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to request", e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
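Together, reqApi() and callServer() implement simple client-side failover. They start at a random server, try each server once in round-robin order, and build the request URL for every attempt. The sketch below reproduces that loop under these assumptions:

- NamingRequestSketch and its Call interface are hypothetical stand-ins for NamingProxy and NacosRestTemplate.
- params are encoded into the query string with java.net.URLEncoder.
- The scheme is hard-coded to http://, whereas the real code asks NamingHttpClientManager for the prefix.
- The nacosDomain retry branch is omitted.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringJoiner;

// Hypothetical sketch of reqApi() + callServer(): random start server,
// round-robin failover, and per-attempt URL construction.
final class NamingRequestSketch {
    /** Stand-in for the HTTP call; throws on failure. */
    interface Call {
        String invoke(String url) throws Exception;
    }

    private NamingRequestSketch() { }

    static String buildUrl(String server, String api, int defaultPort, Map<String, String> params) {
        String base;
        if (server.startsWith("https://") || server.startsWith("http://")) {
            base = server + api; // address already carries a scheme
        } else {
            // naive port check; IPv6 literals are not handled in this sketch
            String hostPort = server.contains(":") ? server : server + ":" + defaultPort;
            base = "http://" + hostPort + api;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (e.getValue() != null) {
                query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
            }
        }
        return query.length() == 0 ? base : base + "?" + query;
    }

    static String requestWithFailover(List<String> servers, String api, int defaultPort,
                                      Map<String, String> params, Random random, Call call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        Exception last = null;
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String url = buildUrl(servers.get(index), api, defaultPort, params);
            try {
                return call.invoke(url);
            } catch (Exception e) {
                last = e; // remember the failure and move on to the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed to req API: " + api + " after all servers " + servers + " tried", last);
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "DEFAULT_GROUP@@order-service");
        params.put("ip", "192.168.1.10");
        params.put("port", "8080");
        List<String> servers = Arrays.asList("10.0.0.1", "10.0.0.2:8848", "http://10.0.0.3:8848");
        List<String> tried = new ArrayList<>();
        String result = requestWithFailover(servers, "/nacos/v1/ns/instance", 8848, params, new Random(),
                url -> {
                    tried.add(url);
                    if (!url.startsWith("http://10.0.0.3")) {
                        throw new Exception("connection refused");
                    }
                    return "ok";
                });
        System.out.println(result);
        tried.forEach(System.out::println);
    }
}
```

Because the starting index is random, load spreads across servers, and each server is still tried at most once per request.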
1.2 Server-Side Handling of Registration
com.alibaba.nacos.naming.controllers.InstanceController#register() is the entry point through which the Nacos Server receives registration requests:
/**
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
It then calls ServiceManager#registerInstance(), which performs the actual registration:
/**
* Register an instance to a service in AP mode.
*
* <p>This method creates service or cluster silently if they don't exist.
*
* @param namespaceId id of namespace
* @param serviceName service name
* @param instance instance to register
* @throws Exception any error occurred in the process
*/
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
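ServiceManager#registerInstance() does four things:
1. Creates the service if it does not exist yet;
2. Fetches the service from the local cache serviceMap;
3. Checks that the service is not null;
4. Adds the service's instance information to DataStore's dataMap cache.
1.2.1 Creating the Service
The service information ends up in the local cache serviceMap.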
/**
* Core manager storing all services in Nacos.
*
* @author nkorange
*/
@Component
public class ServiceManager implements RecordListener<Service> {
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
/**
* Create service if not exist.
*
* @param namespaceId namespace
* @param serviceName service name
* @param local whether create service by local
* @param cluster cluster
* @throws NacosException nacos exception
*/
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// store the service and initialize it
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
private void putServiceAndInit(Service service) throws NacosException {
// save the service into the local cache serviceMap
putService(service);
service = getService(service.getNamespaceId(), service.getName());
// initialize the service; this starts the health check
service.init();
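// listen on the ephemeral and persistent instance-list keys: changes kept consistent via Distro and Raft
// are delivered to this Service, which is also how Nacos Clients learn about provider instance changes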
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
/**
* Put service into manager.
*
* @param service service
*/
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
}
Note: when service.init() initializes the service, it starts a scheduled task that evicts unhealthy instances. Section 2.3 covers this in detail.
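putService() above creates the per-namespace map with double-checked locking and then uses putIfAbsent, so the first Service object registered under a name wins. Below is a minimal sketch of the same two-level structure. Service is reduced to a String, and all class and field names are hypothetical. "public" is Nacos's default namespace id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch of ServiceManager's registry:
// namespace -> (grouped service name -> service), with services as Strings.
final class TwoLevelRegistry {
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();
    private final Object putServiceLock = new Object();

    void putService(String namespaceId, String serviceName, String service) {
        // double-checked creation of the per-namespace map, as in putService()
        if (!serviceMap.containsKey(namespaceId)) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(namespaceId)) {
                    serviceMap.put(namespaceId, new ConcurrentSkipListMap<>());
                }
            }
        }
        // putIfAbsent keeps the first registration of a service name
        serviceMap.get(namespaceId).putIfAbsent(serviceName, service);
    }

    String getService(String namespaceId, String serviceName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    public static void main(String[] args) {
        TwoLevelRegistry registry = new TwoLevelRegistry();
        registry.putService("public", "DEFAULT_GROUP@@order-service", "v1");
        registry.putService("public", "DEFAULT_GROUP@@order-service", "v2");
        System.out.println(registry.getService("public", "DEFAULT_GROUP@@order-service")); // v1
        System.out.println(registry.getService("dev", "DEFAULT_GROUP@@order-service"));    // null
    }
}
```

On Java 8+, serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentSkipListMap<>()) does the same thing atomically without an explicit lock. The explicit version is shown here because it mirrors the source.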
1.2.2 Getting the Service from the Local Cache
/**
* Core manager storing all services in Nacos.
*
* @author nkorange
*/
@Component
public class ServiceManager implements RecordListener<Service> {
public Service getService(String namespaceId, String serviceName) {
if (serviceMap.get(namespaceId) == null) {
return null;
}
return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
}
1.2.3 Checking That the Service Is Not Null
A simple null check:
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
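1.2.4 "Persisting" the Instance Information
The instance information is saved into DataStore's dataMap. Several instances of the same service may register at the same time, so the update is guarded by a synchronized block on the Service object.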
/**
* Core manager storing all services in Nacos.
*
* @author nkorange
*/
@Component
public class ServiceManager implements RecordListener<Service> {
/**
* Add instance to service.
*
* @param namespaceId namespace
* @param serviceName service name
* @param ephemeral whether instance is ephemeral
* @param ips instances
* @throws NacosException nacos exception
*/
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances);
}
}
}
consistencyService is an interface. Instances are ephemeral by default, so we follow the ephemeral-client path through DistroConsistencyServiceImpl. For persistent clients, look at RaftConsistencyServiceImpl instead.
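Which consistency service handles a write depends on the key that addInstance() builds with KeyBuilder.buildInstanceListKey(). The last argument of that call says whether the instance is ephemeral, and the injected consistencyService delegates by key. The sketch below illustrates only that routing idea. The prefix strings are assumptions modelled on Nacos 1.4.x KeyBuilder, not verified constants, and ConsistencyRouter is a hypothetical name.

```java
// Illustrative only: builds instance-list keys in the style of KeyBuilder and
// routes them the way a delegating consistency service would.
// ASSUMPTION: the prefixes below are modelled on Nacos 1.4.x, not copied from it.
final class ConsistencyRouter {
    static final String INSTANCE_LIST_PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL_PREFIX = "ephemeral.";
    static final String NAMESPACE_CONNECTOR = "##";

    private ConsistencyRouter() { }

    static String instanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return INSTANCE_LIST_PREFIX + (ephemeral ? EPHEMERAL_PREFIX : "")
                + namespaceId + NAMESPACE_CONNECTOR + serviceName;
    }

    /** Ephemeral keys go to Distro (AP); all other keys go to Raft (CP). */
    static String route(String key) {
        return key.startsWith(INSTANCE_LIST_PREFIX + EPHEMERAL_PREFIX) ? "distro" : "raft";
    }

    public static void main(String[] args) {
        String ephemeralKey = instanceListKey("public", "DEFAULT_GROUP@@order-service", true);
        String persistentKey = instanceListKey("public", "DEFAULT_GROUP@@order-service", false);
        System.out.println(ephemeralKey + " -> " + route(ephemeralKey));   // ... -> distro
        System.out.println(persistentKey + " -> " + route(persistentKey)); // ... -> raft
    }
}
```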
/**
* A consistency protocol algorithm called <b>Distro</b>
*
* <p>Use a distro algorithm to divide data into many blocks. Each Nacos server node takes responsibility for exactly
* one block of data. Each block of data is generated, removed and synchronized by its responsible server. So every
* Nacos server only handles writings for a subset of the total service data.
*
* <p>At mean time every Nacos server receives data sync of other Nacos server, so every Nacos server will eventually
* have a complete set of data.
*
* @author nkorange
* @since 1.0.0
*/
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
@Override
public void put(String key, Record value) throws NacosException {
// store the instance data locally
onPut(key, value);
// synchronize the data to the other nodes of the Nacos cluster
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
/**
* Put a new record.
*
* @param key key of record
* @param value record
*/
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// put the data into DataStore's dataMap
dataStore.put(key, datum);
}
// if no listener is registered for this key, stop here; keys are registered when the Service is created, see ServiceManager#putServiceAndInit()
if (!listeners.containsKey(key)) {
return;
}
// queue a task that will later notify listeners (and thus Nacos clients) that the instance data changed
notifier.addTask(key, DataOperation.CHANGE);
}
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* Add new notify task to queue.
*
* @param datumKey data key
* @param action action for data
*/
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
// first CHANGE for this key: record it in services so later CHANGEs are merged until it is handled
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// finally, enqueue the task in the tasks queue
tasks.offer(Pair.with(datumKey, action));
}
}
}
/**
* Store of data.
*
* @author nkorange
* @since 1.0.0
*/
@Component
public class DataStore {
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
public void put(String key, Datum value) {
dataMap.put(key, value);
}
}
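The last step puts the instance data into the tasks queue, and the registration call returns. Who consumes that queue?
DistroConsistencyServiceImpl has an init() method annotated with @PostConstruct. The method runs after the bean has been constructed and its dependencies injected, and it starts the Notifier: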
/**
* A consistency protocol algorithm called <b>Distro</b>
*
* <p>Use a distro algorithm to divide data into many blocks. Each Nacos server node takes responsibility for exactly
* one block of data. Each block of data is generated, removed and synchronized by its responsible server. So every
* Nacos server only handles writings for a subset of the total service data.
*
* <p>At mean time every Nacos server receives data sync of other Nacos server, so every Nacos server will eventually
* have a complete set of data.
*
* @author nkorange
* @since 1.0.0
*/
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
public class Notifier implements Runnable {
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
// take a task from the queue (blocks while the queue is empty)
Pair<String, DataOperation> pair = tasks.take();
// handle the change
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
// remove the key from the services cache so that later changes can be queued again
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
// notify the listener of a data change
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
// notify the listener of a data deletion
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
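addTask() and the Notifier loop form a producer/consumer pair with de-duplication. While a CHANGE for a key is still queued, further CHANGEs for that key are dropped, because the consumer always reads the latest value from DataStore anyway. The sketch below reproduces that pattern in isolation. CoalescingNotifier is hypothetical and differs from the source in three ways: it uses putIfAbsent instead of the containsKey/put pair in addTask(), so the check-and-mark step is atomic; it clears the mark if the queue is full; and its listener is a plain Consumer<String> rather than a RecordListener.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Stripped-down analogue of DistroConsistencyServiceImpl.Notifier:
// pending CHANGE tasks per key are coalesced, and one consumer thread
// drains the queue and calls the listener.
final class CoalescingNotifier implements Runnable {
    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final Consumer<String> listener;

    CoalescingNotifier(Consumer<String> listener) {
        this.listener = listener;
    }

    /** Producer side; returns false if the change was merged or the queue is full. */
    boolean addChange(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false; // a CHANGE for this key is already queued
        }
        boolean offered = tasks.offer(key);
        if (!offered) {
            pending.remove(key); // queue full: do not leave the key marked forever
        }
        return offered;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String key = tasks.take();
                pending.remove(key);  // later changes for this key may enqueue again
                listener.accept(key); // in Nacos: RecordListener#onChange
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop
            } catch (RuntimeException e) {
                // keep the loop alive if a listener fails, as the Nacos notifier does
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        CoalescingNotifier notifier = new CoalescingNotifier(k -> { seen.add(k); done.countDown(); });
        notifier.addChange("serviceA");
        notifier.addChange("serviceA"); // merged: still one pending task
        notifier.addChange("serviceB");
        Thread t = new Thread(notifier, "notifier");
        t.setDaemon(true);
        t.start();
        done.await(5, TimeUnit.SECONDS);
        t.interrupt();
        System.out.println(seen); // [serviceA, serviceB]
    }
}
```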
Now look at onChange(), which handles data-change notifications:
/**
* Service of Nacos server side
*
* <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
* contain a list of instances.
*
* <p>This class inherits from Service in API module and stores some fields that do not have to expose to client.
*
* @author nkorange
*/
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
// update the service's instance (IP) list
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
/**
* Update instances.
*
* @param instances instances
* @param ephemeral whether is ephemeral instance
*/
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
// if the instance has no cluster name, use DEFAULT
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// if the instance's cluster does not exist yet, create it from the instance's clusterName
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
// keep the new cluster in the clusterMap cache
getClusterMap().put(instance.getClusterName(), cluster);
}
// get the instances collected so far for this cluster
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// add the instance to its cluster's list
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// update each cluster's instances by diffing the old and new lists (worth reading if you are interested)
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
setLastModifiedMillis(System.currentTimeMillis());
// call PushService to notify clients that the service has changed
getPushService().serviceChanged(this);
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
}
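Before the clusters are updated, onChange() clamps instance weights and updateIPs() buckets the incoming instances by cluster, falling back to the DEFAULT cluster. The following is a simplified, hypothetical sketch of just those two normalisation steps. Inst is a stand-in for Instance, and cluster creation, diffing and pushing are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the normalisation in Service#onChange/updateIPs:
// clamp weights, then group instances by cluster name (default "DEFAULT").
final class InstanceGrouping {
    static final String DEFAULT_CLUSTER = "DEFAULT";

    /** Minimal stand-in for com.alibaba.nacos.api.naming.pojo.Instance. */
    static final class Inst {
        final String ip;
        final String cluster;
        final double weight;

        Inst(String ip, String cluster, double weight) {
            this.ip = ip;
            this.cluster = cluster;
            this.weight = weight;
        }
    }

    private InstanceGrouping() { }

    /** Same bounds as onChange(): cap at 10000, lift tiny positive weights to 0.01. */
    static double clampWeight(double w) {
        if (w > 10000.0D) {
            return 10000.0D;
        }
        if (w > 0.0D && w < 0.01D) {
            return 0.01D;
        }
        return w;
    }

    static Map<String, List<Inst>> groupByCluster(List<Inst> instances) {
        Map<String, List<Inst>> byCluster = new LinkedHashMap<>();
        for (Inst inst : instances) {
            if (inst == null) {
                continue; // updateIPs() logs and skips null entries
            }
            String cluster = (inst.cluster == null || inst.cluster.isEmpty()) ? DEFAULT_CLUSTER : inst.cluster;
            byCluster.computeIfAbsent(cluster, k -> new ArrayList<>())
                    .add(new Inst(inst.ip, cluster, clampWeight(inst.weight)));
        }
        return byCluster;
    }

    public static void main(String[] args) {
        List<Inst> incoming = Arrays.asList(
                new Inst("10.0.0.1", null, 20000.0),
                new Inst("10.0.0.2", "BJ", 0.001),
                new Inst("10.0.0.3", "BJ", 1.0));
        for (Map.Entry<String, List<Inst>> e : groupByCluster(incoming).entrySet()) {
            for (Inst i : e.getValue()) {
                System.out.println(e.getKey() + " " + i.ip + " weight=" + i.weight);
            }
        }
    }
}
```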
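On the server side, Nacos service registration mainly does the following:
1. Saves the service and its instance information into two maps (serviceMap and dataMap);
2. Starts a scheduled task that runs every 5 seconds (after an initial 5-second delay) to mark unhealthy instances and evict expired ones;
3. Lets the Notifier update the instance information of each cluster of the service and call PushService to notify clients of the change.
2. Heartbeat Mechanism
2.1 Client Sends Heartbeats
The Nacos Client periodically sends heartbeats to the server to show that it is still alive and to keep the server from evicting it. By default it sends one every 5 seconds through the server endpoint PUT /nacos/v1/ns/instance/beat.
This continues from the lease renewal mentioned under client-side registration. BeatReactor#addBeatInfo() mainly schedules a BeatTask on a thread pool.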
/**
* Beat reactor.
*
* @author harold
*/
public class BeatReactor implements Closeable {
private final ScheduledExecutorService executorService;
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
/**
* Add beat information.
*
* @param serviceName service name
* @param beatInfo beat information
*/
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// schedule the heartbeat task on the thread pool
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
}
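addBeatInfo() keeps at most one beat per instance. If a beat for the same key is added again (fix #1733), the old BeatInfo is marked stopped, and its BeatTask exits on its next run. Below is a minimal sketch of that bookkeeping. The "service#ip#port" key format with a '#' separator is an assumption, BeatBook and Beat are hypothetical names, and this version uses the atomic return value of Map.put instead of the remove-then-put sequence in the source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of BeatReactor's dom2Beat bookkeeping: one beat per
// instance key; replacing a beat stops the old one.
// ASSUMPTION: key format "serviceName#ip#port".
final class BeatBook {
    static final class Beat {
        final String serviceName;
        final String ip;
        final int port;
        volatile boolean stopped; // checked by the running task before each beat

        Beat(String serviceName, String ip, int port) {
            this.serviceName = serviceName;
            this.ip = ip;
            this.port = port;
        }
    }

    private final Map<String, Beat> beats = new ConcurrentHashMap<>();

    static String key(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    /** Registers a beat and returns the one it replaced (now stopped), or null. */
    Beat add(Beat beat) {
        Beat old = beats.put(key(beat.serviceName, beat.ip, beat.port), beat);
        if (old != null) {
            old.stopped = true; // the old heartbeat loop ends on its next run
        }
        return old;
    }

    int size() {
        return beats.size();
    }

    public static void main(String[] args) {
        BeatBook book = new BeatBook();
        Beat first = new Beat("DEFAULT_GROUP@@order-service", "10.0.0.1", 8080);
        book.add(first);
        Beat second = new Beat("DEFAULT_GROUP@@order-service", "10.0.0.1", 8080);
        Beat replaced = book.add(second);
        System.out.println((replaced == first) + " " + first.stopped + " " + book.size()); // true true 1
    }
}
```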
Now look at BeatTask#run():
/**
* Beat reactor.
*
* @author harold
*/
public class BeatReactor implements Closeable {
private final ScheduledExecutorService executorService;
private final NamingProxy serverProxy;
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
// heartbeat period, 5 seconds by default
long nextTime = beatInfo.getPeriod();
try {
// send the heartbeat request to the Nacos Server
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
// if the server answers RESOURCE_NOT_FOUND, re-register the instance immediately
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
// schedule the next heartbeat after the delay
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
}
public class NamingProxy implements Closeable {
/**
* Send beat.
*
* @param beatInfo beat info
* @param lightBeatEnabled light beat
* @return beat result
* @throws NacosException nacos exception
*/
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<String, String>(8);
Map<String, String> bodyMap = new HashMap<String, String>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}
}
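In short, the client heartbeat is a self-rescheduling task. Each run sends a beat to the server and handles the response in the same task: it adopts the interval the server returns and re-registers on RESOURCE_NOT_FOUND. It then schedules its next run after that delay, and the cycle repeats.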
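The self-rescheduling loop can be reproduced with a plain ScheduledExecutorService. This is a hypothetical analogue of BeatReactor.BeatTask, not Nacos code. A LongSupplier stands in for serverProxy.sendBeat() and returns the server-suggested interval in milliseconds, where a value of 0 or less means "keep the default". As in the source, the next run is scheduled in a finally block, so a failed beat does not end the loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical analogue of BeatReactor.BeatTask: send one beat, let the
// server override the interval, and always schedule the next run.
final class SelfReschedulingBeat implements Runnable {
    private final ScheduledExecutorService executor;
    private final LongSupplier sendBeat; // returns server interval in ms; <= 0 keeps the default
    private final long defaultPeriodMs;
    private volatile boolean stopped;

    SelfReschedulingBeat(ScheduledExecutorService executor, LongSupplier sendBeat, long defaultPeriodMs) {
        this.executor = executor;
        this.sendBeat = sendBeat;
        this.defaultPeriodMs = defaultPeriodMs;
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return; // like beatInfo.isStopped() in BeatTask
        }
        long next = defaultPeriodMs;
        try {
            long interval = sendBeat.getAsLong();
            if (interval > 0) {
                next = interval; // the server may override the client-side period
            }
        } catch (RuntimeException e) {
            // Nacos logs the failure; here it is ignored and the loop continues
        } finally {
            if (!stopped && !executor.isShutdown()) {
                executor.schedule(this, next, TimeUnit.MILLISECONDS);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger beats = new AtomicInteger();
        CountDownLatch threeBeats = new CountDownLatch(3);
        SelfReschedulingBeat task = new SelfReschedulingBeat(executor, () -> {
            beats.incrementAndGet();
            threeBeats.countDown();
            return 50L; // pretend the server answered clientBeatInterval=50
        }, 5000L);
        executor.schedule(task, 0, TimeUnit.MILLISECONDS);
        threeBeats.await(5, TimeUnit.SECONDS);
        task.stop();
        executor.shutdownNow();
        System.out.println("beats sent: " + beats.get());
    }
}
```

Scheduling the next run from inside the task, rather than using scheduleAtFixedRate, lets every response change the delay before the next beat.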
2.2 Server-Side Handling of Heartbeats
com.alibaba.nacos.naming.controllers.InstanceController#beat() is the entry point through which the Nacos Server receives client heartbeats:
/**
* Create a beat for instance.
*
* @param request http request
* @return detail information of instance
* @throws Exception any error during handle
*/
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// set the heartbeat interval
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
// check whether the request carries a beat body
// if it does, this is not a light beat, so parse the body into RsInfo
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// look up the instance
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
if (instance == null) {
// the instance does not exist and there is no beat body:
// return RESOURCE_NOT_FOUND so that the client registers again
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
// Recreate the instance from the beat body
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
// Register the instance
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// Look up the service
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
// No beat body: build one from the request parameters so it can be processed
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// Core method that processes the client heartbeat
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
// If the instance defines its own heartbeat interval in metadata, return it instead of the default (5 s)
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
// Tell the client whether it may switch to lightweight beats (no beat body needed)
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
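The branching in beat() boils down to three outcomes. The following is a minimal Java model of that decision, not Nacos code: `BeatDecision`, `BeatBody` (a stand-in for RsInfo), `BeatHandlerSketch` and the `cluster#ip:port` registry key are hypothetical names chosen for illustration, and the asynchronous beat processing is collapsed into a direct timestamp update.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical outcomes of one heartbeat request (not Nacos types).
enum BeatDecision { RESOURCE_NOT_FOUND, RE_REGISTERED, OK }

// Hypothetical stand-in for RsInfo: the optional full beat body.
final class BeatBody {
    final String ip;
    final int port;
    final String cluster;

    BeatBody(String ip, int port, String cluster) {
        this.ip = ip;
        this.port = port;
        this.cluster = cluster;
    }
}

final class BeatHandlerSketch {
    // Simplified registry: instance key -> last beat timestamp (ms).
    final Map<String, Long> registry = new ConcurrentHashMap<>();

    static String key(String cluster, String ip, int port) {
        return cluster + "#" + ip + ":" + port;
    }

    /**
     * Mirrors beat(): a lightweight beat carries only ip/port/cluster as request
     * parameters, a full beat also carries a body that can recreate the instance.
     */
    BeatDecision handle(String cluster, String ip, int port, BeatBody body, long now) {
        if (body != null) {
            // The body, when present, overrides the request parameters.
            cluster = body.cluster != null ? body.cluster : cluster;
            ip = body.ip;
            port = body.port;
        }
        String k = key(cluster, ip, port);
        if (!registry.containsKey(k)) {
            if (body == null) {
                // Lightweight beat for an unknown instance: ask the client to re-register.
                return BeatDecision.RESOURCE_NOT_FOUND;
            }
            // Full beat for an unknown instance: recreate it from the body.
            registry.put(k, now);
            return BeatDecision.RE_REGISTERED;
        }
        // Known instance: refresh lastBeat (Nacos does this asynchronously).
        registry.put(k, now);
        return BeatDecision.OK;
    }
}
```

A lightweight beat is cheap, but only a full beat lets the server recover an instance it has already dropped, which is why the server falls back to RESOURCE_NOT_FOUND in the other case.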
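Next, let's look at Service#processClientBeat(). It wraps the heartbeat in a ClientBeatProcessor and submits it to the health-check scheduled thread pool, where it runs immediately (zero delay).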
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
/**
* Process client beat.
*
* @param rsInfo metrics info of server
*/
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
}
public class HealthCheckReactor {
/**
* Schedule client beat check task without a delay.
*
* @param task health check task
* @return scheduled future
*/
public static ScheduledFuture<?> scheduleNow(Runnable task) {
return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}
}
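scheduleNow() is simply a zero-delay submission to a scheduled thread pool, so the HTTP thread handling beat() returns without waiting for the beat to be processed. A minimal sketch with plain java.util.concurrent, assuming a single daemon thread as a stand-in for the pool behind GlobalExecutor.scheduleNamingHealth(); `ZeroDelayBeatSketch` and its fields are hypothetical.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

final class ZeroDelayBeatSketch {
    // Hypothetical stand-in for the naming-health pool inside GlobalExecutor.
    private final ScheduledExecutorService healthExecutor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "naming-health-sketch");
                t.setDaemon(true);
                return t;
            });

    // Stand-in for Instance#lastBeat.
    final AtomicLong lastBeat = new AtomicLong();

    /** Equivalent of HealthCheckReactor.scheduleNow(task): delay of 0 ms. */
    ScheduledFuture<?> scheduleNow(Runnable task) {
        return healthExecutor.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    /** Called on the request thread: hands the beat off and returns at once. */
    ScheduledFuture<?> processClientBeat() {
        return scheduleNow(() -> lastBeat.set(System.currentTimeMillis()));
    }

    void shutdown() {
        healthExecutor.shutdownNow();
    }
}
```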
Next, let's focus on ClientBeatProcessor#run():
public class ClientBeatProcessor implements Runnable {
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// Update the instance's last heartbeat time
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
// If the instance was previously unhealthy, mark it healthy again
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
// Publish a service change event
getPushService().serviceChanged(service);
}
}
}
}
}
public PushService getPushService() {
return ApplicationUtils.getBean(PushService.class);
}
}
/**
* Push service.
*
* @author nacos
*/
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
/**
* Service changed.
*
* @param service service
*/
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap
.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return;
}
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
}
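run() does two things:
1. Updates the matching instance's last heartbeat time
2. If that instance was unhealthy (and not manually marked), marks it healthy again and publishes a service change event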
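Both steps of run() can be modeled without Nacos types. In this hypothetical sketch `SketchInstance` stands in for Instance and appending to `changeEvents` stands in for getPushService().serviceChanged(service); as in the quoted code, only an unmarked instance that was unhealthy triggers an event.

```java
import java.util.ArrayList;
import java.util.List;

final class SketchInstance {
    final String ip;
    final int port;
    long lastBeat;
    boolean healthy = true;
    boolean marked; // manually marked instances keep their health status

    SketchInstance(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }
}

final class BeatProcessorSketch {
    // Stand-in for the service change events sent to PushService.
    final List<String> changeEvents = new ArrayList<>();

    /** Mirrors ClientBeatProcessor#run() for one cluster's instance list. */
    void process(List<SketchInstance> instances, String ip, int port, long now, String serviceName) {
        for (SketchInstance instance : instances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                // Step 1: refresh the last heartbeat time.
                instance.lastBeat = now;
                // Step 2: revive an unmarked, unhealthy instance and notify subscribers.
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    changeEvents.add(serviceName);
                }
            }
        }
    }
}
```

A beat for an instance that is already healthy only refreshes its timestamp, so steady-state heartbeats do not cause any pushes.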
PushService#onApplicationEvent() listens for ServiceChangeEvent:
/**
* Push service.
*
* @author nacos
*/
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
// When Nacos Server pushes UDP packets to client instances, each instance acts as a UDP client;
// clientMap stores these UDP clients
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
// switchDomain.getDefaultPushCacheMillis() defaults to 10 seconds (10000 ms),
// so this branch is not taken and compressData stays null
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
// compressData is null, so this branch is taken; the key call is prepareHostsData(client)
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(),
(ackEntry == null ? null : ackEntry.key));
// Push the data to the Nacos consumer client over UDP
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
}
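At first glance this method looks complicated, but essentially it schedules a one-shot delayed task (1-second delay; it is not a periodic task and runs only once). The task's job is to push data to Nacos clients over UDP, which happens in udpPush(ackEntry):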
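The futureMap check in serviceChanged() and the futureMap.put()/remove() calls around the one-shot task act as a debounce: changes to the same service that arrive while a push is pending are merged into that push. A minimal sketch of the idea, assuming a `Set` of pending service names in place of futureMap and a counter in place of the actual UDP push; `PushDebounceSketch` and its members are hypothetical. One deliberate difference: the sketch claims the slot atomically with `Set.add()`, whereas the quoted code checks futureMap and puts the future in separate steps.

```java
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

final class PushDebounceSketch {
    private final ScheduledExecutorService udpSender =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "udp-sender-sketch");
                t.setDaemon(true);
                return t;
            });
    // Stand-in for futureMap: services that already have a push scheduled.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final long delayMillis;
    final AtomicInteger pushes = new AtomicInteger();

    PushDebounceSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Mirrors serviceChanged() + onApplicationEvent(): at most one pending push per service. */
    void serviceChanged(String fullServiceName) {
        if (!pending.add(fullServiceName)) {
            return; // a push is already scheduled; this change is merged into it
        }
        udpSender.schedule(() -> {
            try {
                pushes.incrementAndGet(); // stand-in for building AckEntry and calling udpPush
            } finally {
                pending.remove(fullServiceName); // like futureMap.remove(...) in the finally block
            }
        }, delayMillis, TimeUnit.MILLISECONDS); // one-shot, not periodic
    }

    void shutdown() {
        udpSender.shutdownNow();
    }
}
```

With a 200 ms delay, three back-to-back calls for the same service produce a single push; a later change schedules a fresh one.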
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<>();
private static volatile ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();
private static DatagramSocket udpSocket;
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
// Once the retry count exceeds MAX_RETRY_TIMES (1), stop sending the UDP packet
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
// As Receiver.run() shows, ackMap holds packets that have been sent but not yet ACKed
ackMap.put(ackEntry.key, ackEntry);
// udpSendTimeMap records when each UDP packet was sent
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
// Send the UDP packet
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
// Submit another one-shot delayed task (10 s); its purpose is retrying:
// if no ACK arrives within 10 s of sending, the packet is resent, and at most one retry happens
GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
}
public static class Retransmitter implements Runnable {
Receiver.AckEntry ackEntry;
public Retransmitter(Receiver.AckEntry ackEntry) {
this.ackEntry = ackEntry;
}
@Override
public void run() {
if (ackMap.containsKey(ackEntry.key)) {
Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
udpPush(ackEntry);
}
}
}
The code above does one thing: it sends a UDP packet to the Nacos client and, if no ACK arrives within 10 seconds, resends it once (through another delayed task).
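The retry accounting in udpPush() and Retransmitter can be reproduced with an in-memory map and a scheduler. Hypothetical sketch: the socket send is replaced by a counter and the 10-second ACK_TIMEOUT_NANOS by a configurable timeout; `UdpRetrySketch` and `Entry` are illustrative names. Because retryTimes is incremented after each send and the guard is `retryTimes > MAX_RETRY_TIMES`, an unacknowledged packet ends up being sent exactly twice.

```java
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

final class UdpRetrySketch {
    static final int MAX_RETRY_TIMES = 1;

    static final class Entry {
        final String key;
        int retryTimes;

        Entry(String key) {
            this.key = key;
        }
    }

    private final ScheduledExecutorService retransmitter =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "retransmitter-sketch");
                t.setDaemon(true);
                return t;
            });
    private final long ackTimeoutMillis;
    // Packets sent but not yet acknowledged (stand-in for ackMap).
    final Map<String, Entry> ackMap = new ConcurrentHashMap<>();
    final AtomicInteger sent = new AtomicInteger();
    final AtomicInteger failedPush = new AtomicInteger();

    UdpRetrySketch(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    /** Mirrors udpPush(): give up once retryTimes exceeds MAX_RETRY_TIMES. */
    void udpPush(Entry entry) {
        if (entry.retryTimes > MAX_RETRY_TIMES) {
            ackMap.remove(entry.key);
            failedPush.incrementAndGet();
            return;
        }
        ackMap.put(entry.key, entry);
        sent.incrementAndGet(); // stand-in for udpSocket.send(ackEntry.origin)
        entry.retryTimes++;
        // One-shot check, like scheduleRetransmitter(new Retransmitter(ackEntry), ...).
        retransmitter.schedule(() -> {
            if (ackMap.containsKey(entry.key)) {
                udpPush(entry); // still unacknowledged: try again
            }
        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Stand-in for the Receiver thread removing an acknowledged packet. */
    void onAck(String key) {
        ackMap.remove(key);
    }

    void shutdown() {
        retransmitter.shutdownNow();
    }
}
```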
How does the server know whether the client's ACK has arrived? That is the job of another thread: Receiver.
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
public static class Receiver implements Runnable {
@Override
public void run() {
while (true) {
byte[] buffer = new byte[1024 * 64];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {
udpSocket.receive(packet);
String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();
AckPacket ackPacket = JacksonUtils.toObj(json, AckPacket.class);
InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
String ip = socketAddress.getAddress().getHostAddress();
int port = socketAddress.getPort();
// Log a warning if more than 10 s have passed since the ACK's lastRefTime (the nanoTime recorded when the push was prepared)
// ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L)
if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {
Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
}
String ackKey = getAckKey(ip, port, ackPacket.lastRefTime);
// ackMap holds packets that have been sent but not yet ACKed;
// once the ACK is received, remove the corresponding key
AckEntry ackEntry = ackMap.remove(ackKey);
if (ackEntry == null) {
throw new IllegalStateException(
"unable to find ackEntry for key: " + ackKey + ", ack json: " + json);
}
long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
Loggers.PUSH
.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", json, ip,
port, pushCost, ackMap.size(), totalPush);
// pushCostMap records how long each push took
pushCostMap.put(ackKey, pushCost);
udpSendTimeMap.remove(ackKey);
} catch (Throwable e) {
Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
}
}
}
}
}
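The Receiver thread loops forever receiving UDP responses; for each ACK it receives, it removes the corresponding key from ackMap and udpSendTimeMap.
Note: the Receiver thread is started in the static initializer block of PushService.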
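One push/ACK round trip can be exercised over loopback with java.net.DatagramSocket. This is a sketch under stated assumptions, not the Nacos wire format: the payload is only the lastRefTime value, the client echoes it back as its ACK, and `ackKey()` is a hypothetical key combining sender IP, port and lastRefTime, matched the same way Receiver looks up ackMap entries.

```java
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class UdpAckSketch {
    // Hypothetical key format: sender ip, port and the push's lastRefTime.
    static String ackKey(String ip, int port, long lastRefTime) {
        return ip + "," + port + "," + lastRefTime;
    }

    /** Runs one push/ACK round trip over loopback; returns the number of unacked entries left. */
    static int roundTrip() throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        Map<String, Long> ackMap = new ConcurrentHashMap<>(); // key -> send time (ms)
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket(0, loopback)) {
            server.setSoTimeout(2000);
            client.setSoTimeout(2000);

            // Server: record the packet as unacked, then push it.
            long lastRefTime = System.nanoTime();
            String key = ackKey(loopback.getHostAddress(), client.getLocalPort(), lastRefTime);
            ackMap.put(key, System.currentTimeMillis());
            byte[] push = Long.toString(lastRefTime).getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(push, push.length, loopback, client.getLocalPort()));

            // Client: receive the push and echo lastRefTime back as the ACK.
            byte[] buf = new byte[1024];
            DatagramPacket in = new DatagramPacket(buf, buf.length);
            client.receive(in);
            client.send(new DatagramPacket(in.getData(), in.getLength(), in.getSocketAddress()));

            // Server (Receiver loop body): match the ACK and drop the entry.
            DatagramPacket ack = new DatagramPacket(new byte[1024], 1024);
            server.receive(ack);
            String body = new String(ack.getData(), 0, ack.getLength(), StandardCharsets.UTF_8).trim();
            long echoed = Long.parseLong(body);
            InetSocketAddress from = (InetSocketAddress) ack.getSocketAddress();
            Long sentAt = ackMap.remove(ackKey(from.getAddress().getHostAddress(), from.getPort(), echoed));
            if (sentAt == null) {
                throw new IllegalStateException("unable to find ack entry for " + body);
            }
        }
        return ackMap.size();
    }
}
```

Keying on the sender address plus lastRefTime lets one socket serve every client: the ACK itself carries enough information to find the pending entry.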
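2.3 Summary of the Nacos heartbeat mechanism
Nacos client:
- Sends a heartbeat request to the server every 5 seconds through the server API PUT /nacos/v1/ns/instance/beat;
Nacos server:
UDP push
- When an instance in the server-side registry changes, a ServiceChangeEvent is published and picked up by PushService, which then pushes a notification to clients over UDP, per service, taking the target clients from clientMap;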
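- If no ACK arrives within 10 seconds, the packet is resent after 10 seconds (the Retransmitter only resends packets still present in ackMap, which the Receiver thread maintains); by default at most 1 retry is made, after which the packet is dropped. If udpSocket.send() itself throws, the entry is removed and counted as a failed push without a retry;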
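ACK reception
- The static block of PushService starts the daemon thread Receiver, which loops receiving ACKs from clients. ackMap tracks every UDP packet that has been sent but not yet ACKed; when an ACK arrives, its entry is removed from ackMap;
UDP client maintenance
- The static block of PushService also starts a scheduled task (every 20 seconds) dedicated to maintaining clientMap (all clients that need UDP pushes). If a client's interval from initialization to ACK response exceeds 10 seconds, it is removed from clientMap, and no further UDP pushes are sent to it.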
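The clientMap maintenance described above amounts to periodically filtering out stale clients. Hypothetical sketch, assuming (as the summary states) a 10-second threshold measured against a per-client timestamp, named `lastRefTime` here; the 20-second scheduling is left out and `removeZombies()` is called directly with an injected clock. `ZombieCleanupSketch` and `Client` are illustrative names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ZombieCleanupSketch {
    static final class Client {
        final String addr;
        volatile long lastRefTime; // assumption: the timestamp the 10 s threshold is measured from

        Client(String addr, long lastRefTime) {
            this.addr = addr;
            this.lastRefTime = lastRefTime;
        }

        boolean zombie(long now, long thresholdMillis) {
            return now - lastRefTime > thresholdMillis;
        }
    }

    // service -> (client address -> client), a stand-in for clientMap
    final Map<String, Map<String, Client>> clientMap = new ConcurrentHashMap<>();

    void addClient(String service, Client c) {
        clientMap.computeIfAbsent(service, s -> new ConcurrentHashMap<>()).put(c.addr, c);
    }

    /** Body of the periodic task: drop every zombie client from every service. */
    void removeZombies(long now, long thresholdMillis) {
        for (Map<String, Client> clients : clientMap.values()) {
            clients.values().removeIf(c -> c.zombie(now, thresholdMillis));
        }
    }
}
```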
2.4 Health checks for Nacos services
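Nacos Server runs a scheduled task that checks the health of registered instances. An instance whose heartbeat has not been received for more than 15 seconds has its healthy flag set to false, and clients will no longer discover it; an instance with no heartbeat for more than 30 seconds is removed. A removed instance can recover simply by sending heartbeats again.
When an instance is registered, Service#init() is called; its main job is to schedule ClientBeatCheckTask on the health-check thread pool: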
/**
* Service of Nacos server side
*
* <p>We introduce a 'service --> cluster --> instance' model, in which service stores a list of clusters, which
* contain a list of instances.
*
* <p>This class inherits from Service in API module and stores some fields that do not have to expose to client.
*
* @author nkorange
*/
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
@JsonIgnore
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
/**
* Init service.
*/
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
}
/**
* Check and update statues of ephemeral instances, remove them if they have been expired.
*
* @author nkorange
*/
public class ClientBeatCheckTask implements Runnable {
private Service service;
@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
// Mark the instance unhealthy if its last heartbeat is more than 15 seconds old
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// Delete the instance if its last heartbeat is more than 30 seconds old
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
private void deleteIp(Instance instance) {
try {
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
.appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
.appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
// delete instance asynchronously:
HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJson(), result.getMessage(), result.getCode());
}
}
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
}
}
}
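ClientBeatCheckTask#run() makes two passes over the same instance list: first it marks stale instances unhealthy, then it deletes expired ones. A minimal model with an injected clock, assuming the default 15 s and 30 s timeouts; `BeatCheckSketch` and `Inst` are hypothetical names, the Distro responsible() and health-check-switch guards are omitted, and deletion is only recorded, since the real deleteIp() issues an asynchronous HTTP DELETE.

```java
import java.util.ArrayList;
import java.util.List;

final class BeatCheckSketch {
    static final long HEART_BEAT_TIMEOUT_MS = 15_000; // default heartbeat timeout
    static final long IP_DELETE_TIMEOUT_MS = 30_000;  // default delete timeout

    static final class Inst {
        final String id;
        long lastBeat;
        boolean healthy = true;
        boolean marked;

        Inst(String id, long lastBeat) {
            this.id = id;
            this.lastBeat = lastBeat;
        }
    }

    final List<String> changeEvents = new ArrayList<>(); // stand-in for serviceChanged + timeout event
    final List<String> deleted = new ArrayList<>();      // stand-in for deleteIp() calls

    /** Mirrors ClientBeatCheckTask#run(): pass 1 marks unhealthy, pass 2 deletes. */
    void check(List<Inst> instances, long now) {
        for (Inst i : instances) {
            if (now - i.lastBeat > HEART_BEAT_TIMEOUT_MS && !i.marked && i.healthy) {
                i.healthy = false;
                changeEvents.add(i.id);
            }
        }
        for (Inst i : instances) {
            if (!i.marked && now - i.lastBeat > IP_DELETE_TIMEOUT_MS) {
                deleted.add(i.id);
            }
        }
    }
}
```

An instance 20 s behind is only marked unhealthy, while one 40 s behind is both marked and scheduled for deletion in the same run.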
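deleteIp() does not remove the instance directly: it sends an asynchronous HTTP DELETE to the local naming /instance endpoint, which is handled by InstanceController#deregister():
/**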
* Instance operation controller.
*
* @author nkorange
*/
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
/**
* Deregister instances.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during deregister
*/
@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
Instance instance = getIpAddress(request);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
return "ok";
}
serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
return "ok";
}
}