1 Dubbo的并发控制概述
可以通过Dubbo的参数配置,在Dubbo的服务提供方和服务消费方分别限制接口调用的并发数,下面分别阐述。
2 服务消费端的并发控制
2.1 原理解析
在服务消费端,具体进行接口调用并发控制的是 ActiveLimitFilter,它可以进行接口级别和方法级别的最大并发数设置。进行接口级别的最大并发数设置时,假设设置某个接口的最大并发数为10,则该接口下的每个方法最多同时发起10个请求。
在服务消费方真正发起远程调用前,会先调用 ActiveLimitFilter 的 invoke() 方法,核心源码如下所示。
/**
* ActiveLimitFilter restrict the concurrent client invocation for a service or service's method from client side.
* To use active limit filter, configured url with <b>actives</b> and provide valid >0 integer value.
* <pre>
* e.g. <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" "actives"="2"/>
* In the above example maximum 2 concurrent invocation is allowed.
* If there are more than configured (in this example 2) is trying to invoke remote method, then rest of invocation
* will wait for configured timeout(default is 0 second) before invocation gets kill by dubbo.
* </pre>
*
* @see Filter
*/
@Activate(group = CONSUMER, value = ACTIVES_KEY)
public class ActiveLimitFilter implements Filter, Filter.Listener {
private static final String ACTIVE_LIMIT_FILTER_START_TIME = "active_limit_filter_start_time";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = RpcUtils.getMethodName(invocation);
// actives 获取限定的最大并发数。默认值为0,代表最大并发数为 Integer.MAX_VALUE
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), RpcUtils.getMethodName(invocation));
// 判断当前总调用数是否将超过设置的最大并发数
if (!RpcStatus.beginCount(url, methodName, max)) {
// timeout 调用的超时时间。默认值为0- wait后不会自动唤醒,需要等待别的线程唤醒。值大于0- wait后超时将自动唤醒,或者被别的线程唤醒。
long timeout = invoker.getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
synchronized (rpcStatus) {
while (!RpcStatus.beginCount(url, methodName, max)) {
try {
// 并发调用数超过了限定值,当前线程挂起,等待被唤醒
rpcStatus.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
// 在超时时间内没有被唤醒,抛出超时异常
if (remain <= 0) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Waiting concurrent invoke timeout in client-side for service: " +
invoker.getInterface().getName() + ", method: " + RpcUtils.getMethodName(invocation) +
", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +
rpcStatus.getActive() + ". max concurrent invoke limit: " + max);
}
}
}
}
invocation.put(ACTIVE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
return invoker.invoke(invocation);
}
//...
}
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.get() == Integer.MAX_VALUE) {
return false;
}
for (int i; ; ) {
// 获取该方法的当前调用数
i = methodStatus.active.get();
// 判断调用数是否将超过设置的最大并发数
if (i == Integer.MAX_VALUE || i + 1 > max) {
return false;
}
// 调用数+1
if (methodStatus.active.compareAndSet(i, i + 1)) {
break;
}
}
appStatus.active.incrementAndGet();
return true;
}
综上所述,可知:
- 在服务消费端,当发起的并发调用数大于服务消费端设置的最大并发数时,当前线程将被挂起;
- 如果在超时时间内由于并发请求量减少了而被其他线程唤醒,则当前线程将继续向服务提供方发起请求;
- 如果在超时时间内没有被其他线程唤醒,则不向服务提供方发起请求,直接抛出异常。
2.2 使用
通过为接口或方法设置 actives 值来设置最大并发调用数。举例如下。
<dubbo:reference "actives"="30" id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>
3 服务提供端的并发控制
3.1 原理解析
在服务提供端,具体进行请求处理并发控制的是 ExecuteLimitFilter,它可以进行接口级别和方法级别的最大并发数设置。进行接口级别的并发数设置时,假设设置某个接口的最大并发数为10,则该接口下的每个方法最多同时处理10个请求。
在服务提供端真正处理请求前,会先调用 ExecuteLimitFilter 的 invoke() 方法,核心源码如下所示。
/**
* The maximum parallel execution request count per method per service for the provider.If the max configured
* <b>executes</b> is set to 10 and if invoke request where it is already 10 then it will throw exception. It
* continues the same behaviour un till it is <10.
*/
@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter, Filter.Listener {
private static final String EXECUTE_LIMIT_FILTER_START_TIME = "execute_limit_filter_start_time";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = RpcUtils.getMethodName(invocation);
// executes 最大并发处理请求数。默认值为0,代表最大并发数为 Integer.MAX_VALUE
int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
// 判断当前处理请求数是否将超过设置的最大并发数
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Failed to invoke method " + RpcUtils.getMethodName(invocation) + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}
invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}
//...
}
由上可知,当当前处理的请求数大于设定的最大并发处理数时,则不处理本次请求,直接抛出异常。
3.2 使用
通过为接口或方法设置 executes 值来设置最大并发处理数。举例如下。
<dubbo:service executes="50" interface="org.apache.dubbo.test.common.api.DemoService" timeout="3000" ref="demoServiceImpl"/>