Cluster 模块的目标是暴露 Invoker 对象,实现统一的调用入口
@SPI(FailoverCluster.NAME) //默认扩展点
public interface Cluster {@Adaptive//基于 Directory ,创建 Invoker 对象<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
复制代码
我们看下类图
从上图可以看到,每一个Cluster实现都对应一个Invoker实现。
那么我们从哪里入手分析呢?
当服务消费者启动时,在获取代理时,会加入集群路由。这里出现 FailoverCluster 。
FailoverCluster
//它实现Cluster。调用失败时自动切换。会切换到其他服务器。但重试会带来延迟,要设定重试次数。通常用于读操作
public class FailoverCluster implements Cluster {public final static String NAME = "failover";@Overridepublic <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker<T>(directory);//这里出现了FailoverClusterInvoker}}
复制代码
//AbstractClusterInvoker
我们先看FailoverClusterInvoker的基类。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {protected final Directory<T> directory;protected final boolean availablecheck;private AtomicBoolean destroyed = new AtomicBoolean(false);private volatile Invoker<T> stickyInvoker = null;public AbstractClusterInvoker(Directory<T> directory) {this(directory, Url());}public AbstractClusterInvoker(Directory<T> directory, URL url) {this.directory = directory;this.availablecheck = Parameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK);}
}//list获得所有服务提供者 Invoker 集合
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {return directory.list(invocation);
}/**使用 LoadBalance 选择 invoker.LoadBalance 提供负责均衡策略invokers 候选的 Invoker 集合selected 已选过的 Invoker 集合. 注意:输入保证不重复*///从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.isEmpty())return null;String methodName = invocation == null ? "" : MethodName();// 获得 sticky 配置项,方法级boolean sticky = (0).getUrl().getMethodParameter(methodName, "sticky", false);{// 若 stickyInvoker 不存在于 invokers 中,说明不在候选中,需要置空,重新选择if (stickyInvoker != null && !ains(stickyInvoker)) {stickyInvoker = null;}// 若开启粘滞连接的特性,且 stickyInvoker 不存在于 selected 中,则返回 stickyInvoker 这个 Invoker 对象if (sticky && stickyInvoker != null && (selected == null || !ains(stickyInvoker))) {if (availablecheck && stickyInvoker.isAvailable()) {return stickyInvoker;}}}//该方法主要处理粘滞特性,具体选择逻辑在doSelect// 执行选择Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);// 若开启粘滞连接的特性,记录最终选择的 Invoker 到 stickyInvokerif (sticky) {stickyInvoker = invoker;}return invoker;}//从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.isEmpty())return null;// 1.如果只有一个 Invoker ,直接选择if (invokers.size() == 1)(0);//2.使用 Loadbalance ,选择一个 Invoker 对象Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);//10// 如果 selected中包含 或者 不可用&&availablecheck=true 则重新选择if ((selected != null && ains(invoker))|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {try {//3.重新选一个 Invoker 对象Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);if (rinvoker != null) {invoker = rinvoker;} else {//看下第一次选的位置,如果不是最后,选+1位置.int index = invokers.indexOf(invoker);try {//最后在避免碰撞invoker = index < invokers.size() - 1 ? (index + 1) : (0);} catch (Exception e) {logger.Message() + " may because invokers list dynamic change, ignore.", e);}}} catch (Throwable t) {("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);}}return invoker;}@Override//调用服务提供者的逻辑public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();// 校验是否销毁// binding attachments into invocation.Map<String, String> contextAttachments = Context().getAttachments();if (contextAttachments != null && contextAttachments.size() != 0) {((RpcInvocation) invocation).addAttachments(contextAttachments);}// 获得所有服务提供者 Invoker 集合List<Invoker<T>> invokers = list(invocation);//-->进入6 7//获得 LoadBalance 对象LoadBalance loadbalance = initLoadBalance(invokers, invocation);//8// 设置调用编号,若是异步调用RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);// 执行调用(子Cluster的Invoker实现类的服务调用的差异逻辑。)return doInvoke(invocation, invokers, loadbalance);}@Override//调用服务提供者的逻辑public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();// 校验是否销毁// binding attachments into invocation.Map<String, String> contextAttachments = Context().getAttachments();if (contextAttachments != null && contextAttachments.size() != 0) {((RpcInvocation) invocation).addAttachments(contextAttachments);}// 获得所有服务提供者 Invoker 集合List<Invoker<T>> invokers = list(invocation);//-->进入6 7//获得 LoadBalance 对象LoadBalance loadbalance = initLoadBalance(invokers, invocation);//8// 设置调用编号,若是异步调用RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);// 执行调用(子Cluster的Invoker实现类的服务调用的差异逻辑。)return doInvoke(invocation, invokers, loadbalance);}
复制代码
FailoverClusterInvoker
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyinvokers = invokers;checkInvokers(copyinvokers, invocation);// 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常String methodName = MethodName(invocation);// 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2int len = getUrl().getMethodParameter(methodName, "retries", 2) + 1;if (len <= 0) {len = 1;}// retry loop.RpcException le = null; // 保存最后一次调用的异常// 保存已经调用过的InvokerList<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.Set<String> providers = new HashSet<String>(len);// failover机制核心实现:如果出现调用失败,那么重试其他服务器for (int i = 0; i < len; i++) {if (i > 0) {//i > 0进行重新选择,避免重试时,候选 Invoker 集合,已发生变化。checkWhetherDestroyed();copyinvokers = list(invocation);// check againcheckInvokers(copyinvokers, invocation);}// 根据负载均衡机制从copyinvokers中选择一个InvokerInvoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);//9// 保存每次调用的Invokerinvoked.add(invoker);// 设置已经调用的 Invoker 集合,到 Context 中Context().setInvokers((List) invoked);try {// RPC 调用得到 ResultResult result = invoker.invoke(invocation);//-// 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出if (le != null && logger.isWarnEnabled()) {//le 非空,说明此时是重试调用成功logger.warn("Although retry the method " + methodName+ " in the service " + getInterface().getName()+ " was successful by the provider " + Url().getAddress()+ ", but there have been failed providers " + providers+ " (" + providers.size() + "/" + copyinvokers.size()+ ") from the registry " + Url().getAddress()+ " on the consumer " + LocalHost()+ " using the dubbo version " + Version() + ". Last error is: "+ le.getMessage(), le);}return result;} catch (RpcException e) {if (e.isBiz()) { // 如果是业务性质的异常,不再重试,直接抛出.throw e;}le = e;// 其他性质的异常统一封装成RpcException} catch (Throwable e) {le = new Message(), e);} finally {//保存已经调用的网络地址集合。providers.Url().getAddress());}} // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息throw new Code(), "Failed to invoke the method "+ methodName + " in the service " + getInterface().getName()+ ". Tried " + len + " times of the providers " + providers+ " (" + providers.size() + "/" + copyinvokers.size()+ ") from the registry " + Url().getAddress()+ " on the consumer " + LocalHost() + " using the dubbo version "+ Version() + ". Last error is: "+ le.getMessage(), le.getCause() != null ? le.getCause() : le);}
复制代码
总结的时序图
转载于:
本文发布于:2024-01-28 00:10:27,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17063718273477.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |