您当前的位置: 首页 >  dubbo

暂无认证

  • 0浏览

    0关注

    92582博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Dubbo 是如何控制并发数和限流的?

发布时间:2021-02-28 09:49:05 ,浏览量:0

点击上方“码农突围”,马上关注

这里是码农充电第一站,回复“666”,获取一份专属大礼包
真爱,请设置“星标”或点个“在看”

ExecuteLimitFilter

ExecuteLimitFilter ,在服务提供者,通过的 "executes" 统一配置项开启:表示每服务的每方法最大可并行执行请求数。

ExecuteLimitFilter是通过信号量来实现的对服务端的并发数的控制。

ExecuteLimitFilter执行流程:

  1. 首先会去获得服务提供者每服务每方法最大可并行执行请求数

  2. 如果每服务每方法最大可并行执行请求数大于零,那么就基于基于服务 URL + 方法维度获取一个RpcStatus实例

  3. 通过RpcStatus实例获取一个信号量,若果获取的这个信号量调用tryAcquire返回false,则抛出异常

  4. 如果没有抛异常,那么久调用RpcStatus静态方法beginCount,给这个 URL + 方法维度开始计数

  5. 调用服务

  6. 调用结束后计数调用RpcStatus静态方法endCount,计数结束

  7. 释放信号量

ExecuteLimitFilter

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    Semaphore executesLimit = null;
    boolean acquireResult = false;
    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
    if (max > 0) {
        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
        //            if (count.getActive() >= max) {
        /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
        executesLimit = count.getSemaphore(max);
        if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than  limited.");
        }
    }
    long begin = System.currentTimeMillis();
    boolean isSuccess = true;
    RpcStatus.beginCount(url, methodName);
    try {
        Result result = invoker.invoke(invocation);
        return result;
    } catch (Throwable t) {
        isSuccess = false;
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    } finally {
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
        if(acquireResult) {
            executesLimit.release();
        }
    }
}

我们接下来看看RpcStatus这个类

private static final ConcurrentMap invoker, Invocation invocation) throws RpcException {

    if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
        throw new RpcException(
            "Failed to invoke service " +
            invoker.getInterface().getName() +
            "." +
            invocation.getMethodName() +
            " because exceed max service tps.");
    }

    return invoker.invoke(invocation);
}            
关注
打赏
1653961664
查看更多评论
立即登录/注册

微信扫码登录

0.3831s