消费者在真正发起对provider的调用之前,会先经过Cluster层,里面就是我们常说的集群容错方案。
从Dubbo整体设计图上来看(参考:https://dubbo.apache.org/zh/docsv2.7/dev/design/ ),集群容错层位于以下位置:
为什么会需要容错方案呢?
当消费者在调用provider(一般会有多个提供者)时,有可能因为网络或其他原因导致失败,这时,框架需要能捕获到对应异常,并重新发起对其他provider的调用。
这个就被称为集群容错方案。下面一起来看下Dubbo提供的那些容错方案。
1.集群容错方案概览Dubbo Cluster接口有一系列的实现类,如下图所示:
我们找几个重点的容错方案来了解下
注意:集群容错方案的切换可以通过在接口上设置,如下所示
2.FailoverCluster (失败重试,默认方案)
2.1 适用场景
失败重试方案,当consumer调用provider失败后,会自动切换到其他的provider服务器进行重试(默认重试次数为2)。
通常这种方案适用于:读操作或者具有幂等的写操作。
2.2 源码解析public class FailoverCluster extends AbstractCluster {
// 这里的NAME,就是上面在中配置的cluster属性值
public final static String NAME = "failover";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
// 重点在这里
return new FailoverClusterInvoker(directory);
}
}
2.2.1 FailoverClusterInvoker
public class FailoverClusterInvoker extends AbstractClusterInvoker {
public Result doInvoke(Invocation invocation, final List invokers, LoadBalance loadbalance) throws RpcException {
// 这里是所有的provider Invoker集合
List copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
// 获取方法名称
String methodName = RpcUtils.getMethodName(invocation);
// 获取对应方法的retry参数配置,默认为2,所以总共尝试调用provider的次数为3
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 根据负载均衡策略,选择一个provider,这个策略后续专门说明下
Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 发起调用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
...
}
return result;
} catch (RpcException e) {
// 业务异常则直接抛出,不再重试
if (e.isBiz()) { // biz exception.
throw e;
}
// 其他类型的异常则继续尝试
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
...
}
}
源码比较简单,就是获取retry重试次数参数后,针对所有的provider,通过负载均衡策略,选择一个合适的provider进行调用,非业务异常,则再次重试,直到retry次数后,如果还是失败,则直接抛出异常。
3.FailfastCluster(快速失败) 3.1 适用场景当Consumer调用provider失败后,直接抛出,就只调用一次。
这种比较适合那种非幂等性的写操作。
3.2 源码解析public class FailfastCluster extends AbstractCluster {
public final static String NAME = "failfast";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new FailfastClusterInvoker(directory);
}
}
3.2.1 FailfastClusterInvoker
public class FailfastClusterInvoker extends AbstractClusterInvoker {
public FailfastClusterInvoker(Directory directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
// 依旧按照策略选择一个provider Invoker
Invoker invoker = select(loadbalance, invocation, invokers, null);
try {
// 执行调用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 失败则直接抛出异常
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
}
4.FailsafeCluster (安全失败)
4.1 适用场景
安全失败,当consumer调用provider失败时,直接忽略异常。比较适合写入审计日志等操作。
4.2 源码解析public class FailsafeCluster extends AbstractCluster {
public final static String NAME = "failsafe";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new FailsafeClusterInvoker(directory);
}
}
4.2.1 FailsafeClusterInvoker
public class FailsafeClusterInvoker extends AbstractClusterInvoker {
private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
public FailsafeClusterInvoker(Directory directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
// 同上操作
Invoker invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
// 不同之处在这里,失败了,也是直接返回Result,忽略该异常
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
}
5.FailbackCluster (失败自动恢复)
5.1 适用场景
当调用出现异常时,在后台记录失败的请求,按照一定的策略进行自动重试。比较适合消息通知类的操作
5.2 源码解析public class FailbackCluster extends AbstractCluster {
public final static String NAME = "failback";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new FailbackClusterInvoker(directory);
}
}
5.2.1 FailbackClusterInvoker
public class FailbackClusterInvoker extends AbstractClusterInvoker {
protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException {
Invoker invoker = null;
try {
// 如上调用
checkInvokers(invokers, invocation);
invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
// 失败后则添加到本地
addFailed(loadbalance, invocation, invokers, invoker);
// 直接返回Result
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
private void addFailed(LoadBalance loadbalance, Invocation invocation, List invokers, Invoker lastInvoker) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
// 创建一个定时器
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
// 将当前重试作为一个Task,后续定时触发
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
}
}
}
6.ForkingCluster(并行调用)
6.1 适用场景
当consumer发起调用时,会并行调用多个provider的服务(上面都是每次只调用某一个provider),只要其中一个成功即返回。
这种模式适用于:实时性要求比较高的读操作
6.2 源码解析public class ForkingCluster extends AbstractCluster {
public final static String NAME = "forking";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new ForkingClusterInvoker(directory);
}
}
6.2.1 ForkingClusterInvoker
public class ForkingClusterInvoker extends AbstractClusterInvoker {
public Result doInvoke(final Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
final List selected;
final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// 选择forks个数个provider
if (forks = invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList(forks);
while (selected.size() < forks) {
Invoker invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue ref = new LinkedBlockingQueue();
// 同时发起调用
for (final Invoker invoker : selected) {
executor.execute(() -> {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
// 所以的provider调用都失败了,则将Exception添加到ref中
if (value >= selected.size()) {
ref.offer(e);
}
}
});
}
try {
// 在timeout时间内获取结果,如果为Exception,则说明,所有调用都失败了
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
}
总结:
本文介绍了Dubbo Consumer常用的几种集群容错方案,我们可以在适当的场景选择合适的方案。
当然也可以自定义,自定义也比较简单(源于Dubbo的高可扩展性),只需要实现Cluster接口,然后创建文件将该实现类添加到指定文件即可。