阐述Dubbo的并发控制原理

1 Dubbo的并发控制概述

可以通过Dubbo的参数配置,在Dubbo的服务提供方和服务消费方分别限制接口调用的并发数,下面分别阐述。

2 服务消费端的并发控制

2.1 原理解析

在服务消费端,具体进行接口调用并发控制的是 ActiveLimitFilter,它可以进行接口级别和方法级别的最大并发数设置。进行接口级别的最大并发数设置时,假设设置某个接口的最大并发数为10,则该接口下的每个方法最多同时发起10个请求。

在服务消费方真正发起远程调用前,会先调用 ActiveLimitFilter 的 invoke() 方法,核心源码如下所示。

/**
 * ActiveLimitFilter restrict the concurrent client invocation for a service or service's method from client side.
 * To use active limit filter, configured url with <b>actives</b> and provide valid >0 integer value.
 * <pre>
 *     e.g. <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" "actives"="2"/>
 *      In the above example maximum 2 concurrent invocation is allowed.
 *      If there are more than configured (in this example 2) is trying to invoke remote method, then rest of invocation
 *      will wait for configured timeout(default is 0 second) before invocation gets kill by dubbo.
 * </pre>
 *
 * @see Filter
 */
@Activate(group = CONSUMER, value = ACTIVES_KEY)
public class ActiveLimitFilter implements Filter, Filter.Listener {

    private static final String ACTIVE_LIMIT_FILTER_START_TIME = "active_limit_filter_start_time";

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = RpcUtils.getMethodName(invocation);
        // actives 获取限定的最大并发数。默认值为0,代表最大并发数为 Integer.MAX_VALUE
        int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
        final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), RpcUtils.getMethodName(invocation));
        // 判断当前总调用数是否将超过设置的最大并发数
        if (!RpcStatus.beginCount(url, methodName, max)) {
            // timeout 调用的超时时间。默认值为0- wait后不会自动唤醒,需要等待别的线程唤醒。值大于0- wait后超时将自动唤醒,或者被别的线程唤醒。
            long timeout = invoker.getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            synchronized (rpcStatus) {
                while (!RpcStatus.beginCount(url, methodName, max)) {
                    try {
                        // 并发调用数超过了限定值,当前线程挂起,等待被唤醒
                        rpcStatus.wait(remain);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    // 在超时时间内没有被唤醒,抛出超时异常
                    if (remain <= 0) {
                        throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                            "Waiting concurrent invoke timeout in client-side for service:  " +
                                invoker.getInterface().getName() + ", method: " + RpcUtils.getMethodName(invocation) +
                                ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +
                                rpcStatus.getActive() + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }

        invocation.put(ACTIVE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());

        return invoker.invoke(invocation);
    }

    //...


}



public static boolean beginCount(URL url, String methodName, int max) {
    max = (max <= 0) ? Integer.MAX_VALUE : max;
    RpcStatus appStatus = getStatus(url);
    RpcStatus methodStatus = getStatus(url, methodName);
    if (methodStatus.active.get() == Integer.MAX_VALUE) {
        return false;
    }
    for (int i; ; ) {
        // 获取该方法的当前调用数
        i = methodStatus.active.get();

        // 判断调用数是否将超过设置的最大并发数
        if (i == Integer.MAX_VALUE || i + 1 > max) {
            return false;
        }

        // 调用数+1
        if (methodStatus.active.compareAndSet(i, i + 1)) {
            break;
        }
    }

    appStatus.active.incrementAndGet();

    return true;
}

 综上所述,可知:

  • 在服务消费端,当发起的并发调用数大于服务消费端设置的最大并发数时,当前线程将被挂起
  • 如果在超时时间内由于并发请求量减少了而被其他线程唤醒,则当前线程将继续向服务提供方发起请求;
  • 如果在超时时间内没有被其他线程唤醒,则不向服务提供方发起请求,直接抛出异常。

2.2 使用

通过为接口或方法设置 actives 值来设置最大并发调用数。举例如下。

<dubbo:reference "actives"="30" id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>

3 服务提供端的并发控制

3.1 原理解析

在服务提供端,具体进行请求处理并发控制的是 ExecuteLimitFilter,它可以进行接口级别和方法级别的最大并发数设置。进行接口级别的并发数设置时,假设设置某个接口的最大并发数为10,则该接口下的每个方法最多同时处理10个请求。

在服务提供端真正处理请求前,会先调用 ExecuteLimitFilter 的 invoke() 方法,核心源码如下所示。


/**
 * The maximum parallel execution request count per method per service for the provider.If the max configured
 * <b>executes</b> is set to 10 and if invoke request where it is already 10 then it will throw exception. It
 * continues the same behaviour un till it is <10.
 */
@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter, Filter.Listener {

    private static final String EXECUTE_LIMIT_FILTER_START_TIME = "execute_limit_filter_start_time";

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = RpcUtils.getMethodName(invocation);
        // executes 最大并发处理请求数。默认值为0,代表最大并发数为 Integer.MAX_VALUE
        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
        // 判断当前处理请求数是否将超过设置的最大并发数
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                    "Failed to invoke method " + RpcUtils.getMethodName(invocation) + " in provider " +
                            url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                            "\" /> limited.");
        }

        invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
        try {
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        }
    }


    //...
}

由上可知,当当前处理的请求数大于设定的最大并发处理数时,则不处理本次请求,直接抛出异常。

3.2 使用

通过为接口或方法设置 executes 值来设置最大并发处理数。举例如下。

<dubbo:service  executes="50" interface="org.apache.dubbo.test.common.api.DemoService" timeout="3000" ref="demoServiceImpl"/>

相关推荐

  1. 阐述Dubbo并发控制原理

    2024-03-18 18:34:04       17 阅读
  2. 阐述Dubbo服务提供方解码原理

    2024-03-18 18:34:04       9 阅读
  3. mysql MVCC(多版本并发控制)实现原理

    2024-03-18 18:34:04       28 阅读
  4. 【数据库原理】(26)数据库并发控制

    2024-03-18 18:34:04       30 阅读
  5. MVCC(多版本并发控制原理实现

    2024-03-18 18:34:04       17 阅读
  6. mvcc 并发事务控制

    2024-03-18 18:34:04       28 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-18 18:34:04       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-18 18:34:04       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-18 18:34:04       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-18 18:34:04       18 阅读

热门阅读

  1. 程序员应该如何选择职业赛道?

    2024-03-18 18:34:04       18 阅读
  2. 鸿蒙内核系统

    2024-03-18 18:34:04       20 阅读
  3. 5.66 BCC工具之offwaketime.py解读

    2024-03-18 18:34:04       16 阅读
  4. 备份恢复新体验!pgBackRest与IvorySQL的完美融合

    2024-03-18 18:34:04       18 阅读