Dubbo服务调用过程

基于注册中心目录服务,使服务消费方能动态的查找服务提供方,使地址透明,不再需要写死服务提供方地址,注册中心基于接口名查询服务提供者的 IP 地址,使服务提供方可以平滑增加或减少机器。

角色分类

以功能角度来说服务可以分成以下几种:

  • 服务提供者;
  • 服务消费者;
  • 服务提供者兼消费者。

注册中心分类

可以分成以下几种注册中心:

  • Simple 注册中心 点对点直连
  • Multicast 注册中心 多播
  • Zookeeper 注册中心
  • Redis 注册中心

配置

服务提供者(provider)配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- 应用名称,可显示依赖关系 -->
<dubbo:application name="dubbo-order-server" />

<!-- 注册中心是ZooKeeper,也可以选择Redis做注册中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"
client="zkclient" />

<!-- 通过dubbo协议在注册中心(127.0.0.1表示本机)的20880端口暴露服务 -->
<dubbo:protocol name="dubbo" host="127.0.0.1" port="20880" />

<!-- 提供服务用地的是service标签,将该接口暴露到dubbo中 -->
<dubbo:service interface="com.dubbo.service.OrderService"
ref="orderService" />

<!-- Spring容器加载具体的实现类-->
<bean id="orderService" class="dubbo.service.impl.OrderServiceImpl" />

<dubbo:monitor protocol="registry" />

服务消费者(consumer)配置:

1
2
3
4
5
6
7
8
9
10
11
<!-- 应用名称,可显示依赖关系 -->
<dubbo:application name="dubbo-user-consumer" />

<!-- zookeeper作为注册中心 ,也可以选择Redis做注册中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"
client="zkclient" />

<dubbo:protocol host="127.0.0.1" />

<!-- 调用服务使用reference标签,从注册中心中查找服务 -->
<dubbo:reference id="orderService" interface="com.dubbo.service.OrderService" />

查看服务注册/暴露结果

Dubbo服务注册信息
Dubbo 在 ZooKeeper 中以树形结构维护服务注册信息:

  • 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址;
  • 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址;
  • 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

ZooKeeper 启动的时候会把配置信息加载进内存并持久化到数据库,然后启动定时器脏数据检查定时器 DirtyCheckTask,分别检查消费者和提供者的地址列表缓存、消费者和提供者地址列表的数据库数据,清理不存活的消费者和提供者数据,对于缓存中的存在的消费者和提供者而数据库不存在,提供者重新注册和消费者重新订阅。

Dubbo 提供了一些异常情况下的兜底方案:

  • 当提供者出现断电等异常停机时,注册中心能自动删除提供者信息
  • 当注册中心重启时,能自动恢复注册数据,以及订阅请求
  • 当会话过期时,能自动恢复注册数据,以及订阅请求
  • 当设置 <dubbo:registry check=”false” /> 时,记录失败注册和订阅请求,后台定时重试

在了解 ZooKeeper 基础上,还可以增加一些配置来修改注册细节:
可通过 <dubbo:registry username="admin" password="1234" /> 设置 ZooKeeper 登录信息
可通过 <dubbo:registry group="dubbo" /> 设置 ZooKeeper 的根节点,不设置将使用无根树
支持 * 号通配符 <dubbo:reference group="*" version="*" /> ,可订阅服务的所有分组和所有版本的提供者

在 Provider 启动完毕后,可以登录到 ZooKeeper 上查看注册的结果:

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 11] ls /
[dubbo, zookeeper]
[zk: localhost:2181(CONNECTED) 12] ls /dubbo
[com.alibaba.dubbo.monitor.MonitorService, com.tallate.UserServiceBo]
[zk: localhost:2181(CONNECTED) 13] ls /dubbo/com.tallate.UserServiceBo
[configurators, consumers, providers, routers]
[zk: localhost:2181(CONNECTED) 14] ls /dubbo/com.tallate.UserServiceBo/providers
[dubbo%3A%2F%2F192.168.96.194%3A20880%2Fcom.tallate.UserServiceBo%3Fanyhost%3Dtrue%26application%3DdubboProvider%26dubbo%3D2.0.2%26generic%3Dfalse%26group%3Ddubbo%26interface%3Dcom.tallate.UserServiceBo%26methods%3DsayHello%2CtestPojo%2CsayHello2%26pid%3D28129%26revision%3D1.0.0%26side%3Dprovider%26timeout%3D3000%26timestamp%3D1575202776615%26version%3D1.0.0]

服务自动发现流程

服务自动发现功能完成下面这个流程,我们接下来分点概述:

  1. 服务提供者在启动时,向注册中心注册自己提供的服务。
  2. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  3. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  4. 服务消费者,从提供者地址列表中,基于软负载均衡算法(基于软件的负载均衡,与 F5 相对),选一台提供者进行调用,如果调用失败,再选另一台调用。

注册和注销服务(Provider 执行流程)

服务的注册与注销,是对服务提供方角色而言,大致流程如下所示:
注册和注销服务

  1. 在接口提供者初始化时,每个接口都会创建一个 Invoker 和 Exporter,Exporter 持有 Invoker 实例,通过 Invocation 中的信息就可找到对应的 Exporter 和 Invoker
  2. 同 Consumer 的过程类似,调用 Invoker 前会调用 Invoker-Filter。
  3. 调用 Invoker.invoke() 时,通过反射调用最终的服务实现执行相关逻辑。

ServiceBean 负责了服务的暴露:

  • 继承自 ServiceConfig,export 方法实现了服务暴露的逻辑;
  • 实现了 Spring 中的 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware

启动时,ServiceBean 主要负责以下任务:

  • 生成 DubboExporter 对象并缓存起来
  • 添加过滤器和监听器支持
  • 在 zk 上注册相关信息,暴露服务,方便被感知到
  • 监听端口,等待通信的到来

Dubbo服务导出

  1. 前置工作,主要用于检查参数和组装 URL;
    ServiceBean#onApplicationEvent: 接收 Spring 上下文刷新事件后执行服务导出操作
    -> ServiceBean#export: 导出服务
    -> ProviderConfig.getExport、getDelay 获取配置,如果 export 为 false 则无法提供给其他服务调用、一般只提供给本地调试时使用,如果需要 delay 则将任务交给一个 ScheduledExecutorService 延迟执行,否则调用 doExport 暴露服务
    -> ServiceConfig.doExport 一堆配置检查
  2. 导出服务,包含导出服务到本地(JVM)和导出服务到远程两个过程;
    ServiceConfig.doExportUrls
    导出服务,Dubbo 中所有服务都通过 URL 导出,支持多协议多注册中心导出服务(遍历 ProtocolConfig 集合导出每个服务)
    AbstractInterfaceConfig#loadRegistries
    加载注册中心链接
    ServiceConfig#doExportUrlsFor1Protocol
    组装 URL,将服务注册到注册中心
    JavassistProxyFactory#getInvoker
    获取 Invoker 实例,用于接收请求
    ServiceConfig#exportLocal、DubboProtocol#export
    根据配置信息导出服务到本地或远程,远程默认取Dubbo协议
    DubboProtocol#openServer
    开始监听请求
  3. 向注册中心注册服务,用于服务发现
    Dubbo 服务注册本质是在 zk 指定目录下创建临时节点,路径是{group}/{Interface}/providers/{url}
    RegistryProtocol#register
    -> RegistryFactory#getRegistry
    -> AbstractRegistry#register

因为Dubbo一般使用ZooKeeper作为注册中心,所以完全可以利用ZooKeeper的临时节点自动删除机制来实现服务器下线自动踢出的机制。

服务订阅和取消(Consumer 执行流程)

为了满足应用系统的需求,服务消费方的可能需要从服务注册中心订阅指定的有服务提供方发布的服务,在得到通知可以使用服务时,就可以直接调用服务。反过来,如果不需要某一个服务了,可以取消该服务。
服务订阅和取消

有两种服务引入方式:

  1. 饿汉式:Spring 容器调用 ReferenceBean 的 afterPropertiesSet 方法时引用服务,可通过配置 <dubbo:reference> 的 init 属性开启。
  2. 懒汉式:ReferenceBean 对应的服务被注入到其他类中时引用

服务提供的方式有三种:

  1. 引用本地 (JVM) 服务;
  2. 通过直连方式引用远程服务;
  3. 通过注册中心引用远程服务。

不管是哪种引用方式,最后都会得到一个 Invoker 实例。如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。

获取客户端Proxy:

  1. 在 Consumer 初始化的时候,会生成一个代理注册到容器中,该代理回调中持有一个 Invoker 实例,消费调用服务接口时它的 invoke() 方法会被调用。
    spring.ReferenceBean#getObject
    ReferenceConfig#createProxy
    创建代理实例,根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用,不是本地引用的情况下默认采用Dubbo协议。
    Protocol#refer
    -> DubboProtocol#getClients 获取客户端实例,实例类型为 ExchangeClient,ExchangeClient 不具备通信能力,它需要依赖更底层的客户端实例
    -> DubboProtocol#getSharedClient 默认获取共享客户端
    -> DubboProtocol#initClient 创建客户端实例,默认为 Netty
    -> Exchangers#connect(URL url, ExchangeHandler handler)
  2. 使用 Cluster 合并 Invoker
    org.apache.dubbo.rpc.cluster.Cluster#join
    如果配置了多个 URL,则使用 Cluster 合并多个 Invoker
  3. 创建动态代理
    -> ProxyFactory#getProxy(Invoker invoker)
    常用的动态代理技术有 javassist、cglib、jdk,其中 dubbo 使用的是 javassist。

    根据早期 Dubbo 作者梁飞(http://javatar.iteye.com/blog/814426)的说法,使用 javassist 是为了性能。

Consumer端服务调用过程

Dubbo组件

调用代理类的方法

请求实际调用的是InvokerInvocationHandler.invoke

Registry & Directory

Registry 将注册信息保存到本地的Directory

启动服务时需要给一个Dubbo接口创建代理,这时需要将注册URL转换为Invoker对象:
org.apache.dubbo.registry.integration.RegistryProtocol#refer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}

引用一个服务时,会注册一个zkListener,监听注册服务的命名空间的变更情况。
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
那么服务是怎么注册的呢?其实就是上边Provider注册服务的过程。
监听到注册中心的变更后,更新本地的Invoker列表,同时删除不可用的。
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference

...

Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

...

try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

Invoker使用Directory

为了服务高可用同一个服务一般会有多个应用服务器提供,要先挑选一个提供者提供服务。在服务接口消费者初始化时,接口方法和提供者 Invoker 对应关系保存在 Directory。 中,通过调用的方法名称(或方法名称+第一个参数)获取该方法对应的提供者 Invoker 列表,如注册中心设置了路由规则,对这些 Invoker 根据路由规则进行过滤。
启动时订阅某个服务:
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 订阅providers、configurators、routers这几个namespace
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

// 使用Cluster组合Invoker
Invoker invoker = cluster.join(directory);
return invoker;
}

添加监听器:
org.apache.dubbo.registry.integration.RegistryDirectory#subscribe

1
2
3
4
5
6
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}

Consumer端监听服务变更事件,刷新Invoker列表:
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker

Registry的几种实现

  • ZooKeeperRegistry
  • RedisRegistry
    注册信息的存储,是在启动时调用的:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Override
    public void doRegister(URL url) {
    // key = dubbo/com.package.to.InterfaceName/providers
    String key = toCategoryPath(url);
    // url的全名
    String value = url.toFullString();
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
    JedisPool jedisPool = entry.getValue();
    try {
    Jedis jedis = jedisPool.getResource();
    try {
    // 使用hash结构,可以providers一个key下面存多个url
    jedis.hset(key, value, expire);
    jedis.publish(key, Constants.REGISTER);
    success = true;
    if (! replicate) {
    break; //  如果服务器端已同步数据,只需写入单台机器
    }
    } finally {
    jedisPool.returnResource(jedis);
    }
    } catch (Throwable t) {
    exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
    }
    }
    if (exception != null) {
    if (success) {
    logger.warn(exception.getMessage(), exception);
    } else {
    throw exception;
    }
    }
    }
    注册信息的主动删除,进程关闭时:
    1

Directory的几种实现

  • RegistryDirectory
    保存注册中心的服务注册信息,包括routers、configurators、provider。
  • StaticDirectory
    Invoker列表是固定的。

Cluster

封装了服务降级和容错机制,比如,如果调用失败则执行其他(FailoverClusterInvoker)、仍然调用失败则降级执行 mock(MockClusterInvoker)。
调用的第一层是MockClusterInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;

String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
// 没有设置mock属性或设置为false,则直接调就完了
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
}
// 配成force了,直接调mock方法
else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
}
// fail-mock的方式
else {
try {
result = this.invoker.invoke(invocation);

//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}

} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}

if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}

实际invoke调用的是父类AbstractClusterInvoker的invoke方法,这个方法的主要功能是提供负载均衡:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}

// 找到所有可调用的服务器
List<Invoker<T>> invokers = list(invocation);
// 发送时要经过负载均衡
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}

上面的doInvoke是一个模板方法,由子类实现,默认子类是FailoverClusterInvoker,可以看到,它先通过负载均衡策略得到一个Invoker,再调用该Invoker,Invoker的默认实现是DubboInvoker,表示使用的是Dubbo协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Result doInvoke(List<Invoker<T>> invokers,
final List<Invoker<T>> invoked,
Holder<RpcException> lastException,
final Set<String> providers,
final Invocation invocation,
final LoadBalance loadbalance,
final int totalRetries,
int retries,
Holder<Invoker<T>> lastInvoked) throws RpcException {
if (retries < totalRetries) {
checkWheatherDestoried();
invokers = list(invocation);
checkInvokers(invokers, invocation);
}

// 负载均衡
final Invoker<T> invoker = select(loadbalance, invocation, invokers, invoked);
invoked.add(invoker);
lastInvoked.value = invoker;
RpcContext.getContext().setInvokers((List) invoked);

try {
return invoker.invoke(invocation);
} catch (RpcException e) {
//业务异常不重试
if (e.isBiz()) {
throw e;
}
lastException.value = e;
} catch (Throwable e) {
lastException.value = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}

if (--retries == 0) {
throw populateException(invokers, lastException.value, providers, invocation, totalRetries);
}

return doInvoke(invokers, invoked, lastException, providers, invocation, loadbalance, totalRetries, retries, lastInvoked);
}

Cluster的实现

  • MockClusterInvoker
    调用失败降级到mock接口;
  • BroadcastClusterInvoker
    每个Invoker都调一次,忽略了LoadBalance;
  • AvailableClusterInvoker
    把处于可用状态的Invoker都调一遍。
  • FailoverClusterInvoker
    一个Invoker失败就换个Invoker重试几次。
  • FailbackClusterInvoker
    如果调用失败就放到一个线程池中延迟5秒再发,一般用于发消息。
  • FailfastClusterInvoker
    失败立刻报错
  • FailsafeClusterInvoker
    失败就忽略,一般是用于记日志这种失败了影响也不大的场景。
  • ForkingClusterInvoker
    一次性选n个Invoker,并行调用,只要有一个调用成功就返回,线程间通过LinkedBlockingQueue通信。

LoadBalance

Cluster 层包含多个 Invoker,LoadBalance 负责从中选出一个来调用,有多种 LoadBalance 策略,比如随机选一个(RandomLoadBalance)、轮询(RoundRobinLoadBalance)、一致性hash(ConsistentHashLoadBalance)。
实例化LoadBalance:com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
使用LoadBalance选择一个Invoker:com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#select

LoadBalance的多种实现

  • RandomLoadBalance
    计算权重,然后根据每个Invoker的权重调一个。
  • LeastActiveLoadBalance
    找最近最不活跃的Invoker调用,如果这样的Invoker有多个,则按权重来随机选一个。
  • RoundRobinLoadBalance
    轮询
  • ConsistentHashLoadBalance
    一致性哈希,启动时会将Invoker排列在一个圆环上:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
    this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    this.identityHashCode = identityHashCode;
    URL url = invokers.get(0).getUrl();

    String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
    argumentIndex = new int[index.length];
    for (int i = 0; i < index.length; i++) {
    argumentIndex[i] = Integer.parseInt(index[i]);
    }

    int replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
    for (Invoker<T> invoker : invokers) {
    String address = invoker.getUrl().getAddress();
    // 多复制几个,更均匀,避免所有请求都被hash到同一个Invoker
    for (int i = 0; i < replicaNumber / 4; i++) {
    byte[] digest = md5(address + i);
    for (int h = 0; h < 4; h++) {
    long m = hash(digest, h);
    // 放入圆环上
    virtualInvokers.put(m, invoker);
    }
    }
    }
    }
    将Invoker保存到virtualInvokers上,但是virtualInvokers本身是一个HashMap,如果新来的请求不能精确hash到其中的某个Invoker怎么办?是通过tailMap找到的下一个Invoker:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private Invoker<T> selectForKey(long hash) {
    Invoker<T> invoker;
    Long key = hash;

    if (!virtualInvokers.containsKey(key)) {
    SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
    if (tailMap.isEmpty()) {
    key = virtualInvokers.firstKey();
    } else {
    key = tailMap.firstKey();
    }
    }
    invoker = virtualInvokers.get(key);
    return invoker;
    }

Filter & Invoker 层

不过,在实际网络调用之前,Dubbo还提供Filter功能,Cluster会先激活Filter链然后最终调到DubboInvoker.invoke(RpcInvocation)

  1. ConsumerContextFilter可以将请求对象Invocation添加到上下文RpcContext中,其实就是存储到一个ThreadLocal变量中。
  2. FutureFilter在调用完毕后唤醒调用者线程。
  3. 或许还会有一些自定义的Filter,比如增加线程的TraceId、打印一些调用日志之类的,Filter结束后才最终调用到DubboInvoker

DubboInvoker封装了同步和异步调用,Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。
DubboInvoker是通过Netty发送消息的,消息本身如何发送的就不多说了。

Exchange

封装了网络客户端的发送逻辑,包括:

  • HeaderExchangeChannel
    对 Request 的序列化
  • ReferenceCountExchangeClient
    无引用时自动关闭客户端
  • HeaderExchangeClient
    心跳检测

数据编码 & 发送

DubboCodec
NettyChannel#send

Provider端接受调用的过程

  1. 接收请求
    NettyClient
    请求被接收后,通过 Netty 调用链向下传递执行
    NettyHandler#messageReceived
    NettyChannel
  2. 解码
    ExchangeCodec
  3. 线程派发
    Dispatcher
    IO 线程接收请求后分发给事件处理线程执行,具体的派发逻辑在ChannelHandler中实现,比如AllChannelHandler
  4. 请求分发
    ChannelEventRunnable
    根据请求类型将请求分发给不同的ChannelHandler处理。

Provider 端响应

Consumer 端接收响应

  1. 发送完请求后阻塞
    HeaderExchangeHandler
    用户线程在发送完请求后,会调用 DefaultFutureget 方法等待响应对象的到来,这时每个DefaultFuture都会关联一个调用编号,用于在接收到响应时能对应上请求的DefaultFuture
    当响应对象到来后,IO 线程根据调用编号可以找到DefaultFuture,之后会将响应对象保存到DefaultFuture,并唤醒用户线程。