如何打造一个Dubbo网关--软分组

微服务架构下,一个完整的业务通常是被拆分成数个甚至数十个子项目,每个子项目独立运行,一组人员负责一个或数个子项目

在这种情况下并行开发不可避免,我们假设这么一种情况:red组要做一个需求评估需要改动A、B、C三个项目;blue组也要做一个需求评估需要改动B、C、D三个项目;yellow组也要做一个需求评估需要改动E、F这两个项目。这种情况下B、C项目的发布权限给谁?又如何保证E、F能调用到正确的A、B、C、D的接口呢?

  • 硬分组:所有资源物理隔离,也就是说开多个完整的dev环境,成本太高多数情况下都是不能接受的
  • 软分组:将开发环境进行逻辑分组,完整的dev环境默认是stable分组,再添加red、blue、yellow三个分组,只包含各自要开发的项目

在软分组下,流量进入网关后如果无分组标记则走stable分组,否则按分组标记走。本篇内容主要讲述SPI扩展网关软分组。以下代码片段全部来自于plume

SPI机制

SPI(Service Provider Interface)是用来提供接口实现类的一种方式,这样就不用在代码里硬编码实现类里,只需要按规则加一个文件即可

Java

Java本身又一套自己的SPI机制,主要就是为了解决上面说的硬编码实现类的问题,提供实现类的可插拔

首先新增一个文件,目录固定META-INF/services,文件名为接口全名

下面的示例是在网关中添加一个限流接口CurrentLimit的令牌桶限流实现

META-INF/services/net.dloud.platform.common.provider.CurrentLimit

文件里的内容如下,写入实现类的全名即可

1
net.dloud.platform.gateway.spi.SimpleCurrentLimit

使用时通过ServiceLoader获取Iterator即可遍历所有定义的实现类

ServiceLoader.load(fullClassName).iterator()

Java的SPI主要有以下几个问题:

  1. 不能按需加载,只能去遍历
  2. 所有查找到的实现类都会被实例化
  3. 多线程并发不安全
  4. 只支持空参构造

Dubbo

Dubbo的SPI机制是从Java的SPI机制扩展加强而来的,解决123问题,并且定义方式有些不同:使用key=class_full_name定义

首先新增一个文件,目录固定META-INF/dubbo,文件名同样为接口全名

这里我们想使用netty4作为transport,Dubbo提供了一个dubbo-remoting-netty4,我们看看它是如何做的

META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Transporter

可以看到内部的实现定义在META-INF/dubbo/internal里,文件里的内容如下

1
2
netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
netty=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter

也就是说如果引入这个包,默认就会直接使用netty4;如果自定义的话,注意这里的key是一个约定值。像transport需要在配置ProtocolConfig时进行指定,具体的每个扩展点在其文档里可以查找到

1
protocolConfig.setTransporter("netty4");

方案说明

Dubbo是接口级的注册,所以我们自定义的分组里的每个服务都是一组Provider;Dubbo在调用时会去获取当前接口的所有Provider,按照lb算法选择一个合适的Provider

在软分组的需求下,我们只需要通过SPI自定义一个lb实现即可。这个lb要求在有分组信息时选择到特定的节点,为了实现lb有以下三件事必须去做:

  1. 当前服务,无论是业务系统还是网关,都必须知道下一步要调用的业务系统的所有节点和其分组
  2. 根据分组的节点信息和输入分组对Provider进行过滤,如果不为空则使用新的Provider列表,否则使用stable分组的Provider列表
  3. 如果过滤后的分组列表仍然有多个,则选择一种lb算法进行后直接调用,否则直接调用

分组信息

分组信息为了实时性需求,需要使用Zookeeper的临时节点,毕竟选择到一个失效节点还需要重试。在这里有两种实现方式:

  1. 所有系统处理所有分组信息,每个系统里都有一份完整的关于所有节点的分组信息cache
  2. 网关处理所有分组,其它系统只处理所在分组的信息;当输入分组不是stable时,网关将相关分组的所有节点数据写入调用上下文,一直传递下去

因为非stable分组相关的节点信息一般较少,这里我们选择2,理论上可以降低一部分Zookeeper开销,但其实差别没有很大

网关获取

Zookeeper定义一个名为/dubbo_group的path,网关首先需要监听这个path下的所有子path的增减,还需要监听所有存在的子path下的所有临时节点的增减

使用当前节点的ip:dubbo_port作为Zookeeper的临时节点名,这样来确保唯一

在容器启动完成后,首先对/dubbo_group进行监听,这里使用curator进行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 根path,网关使用
public static final String DUBBO_ZK_PATH = "/dubbo_group";
// 自身所在的分组path,其它系统使用
public static final String DUBBO_GROUP_PATH = DUBBO_ZK_PATH + "/" + PlatformConstants.GROUP;
// 所有分组信息的存放位置
public static Map<String, Map<String, StartupTime>> dubboProvider = new HashMap<>();

...

@Override
public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) {
try {
// 使用PathChlidrenCache来监听子path
PathChildrenCache cache = new PathChildrenCache(curatorClient, DUBBO_ZK_PATH, false);
cache.start();
cache.getListenable().addListener((client, event) -> {
final ChildData data = event.getData();
switch (event.getType()) {
// 新增事件
case CHILD_ADDED:
final String add = data.getPath().replaceFirst(DUBBO_ZK_PATH + "/", "");
// add是gorup
final String basePath = DUBBO_ZK_PATH + "/" + add;
// 当有分组新增时,对其子节点同样要添加listener
CuratorWrapper.addListeners(() -> CuratorWrapper.childrenCache(DubboWrapper.dubboListener
(add, basePath), null, basePath));
log.info("[GATEWAY] 添加DUBBO GROUP: {}", add);
break;
// 移除事件
case CHILD_REMOVED:
final String del = data.getPath().replaceFirst(DUBBO_ZK_PATH + "/", "");
// del是group
dubboProvider.remove(del);
log.info("[GATEWAY] 移除DUBBO GROUP: {}", del);
break;
default:
log.info("[GATEWAY] DUBBO GROUP节点状态: {}", event.getType());
}
});
} catch (Exception e) {
log.error("[GATEWAY] 监听DUBBO GROUP节点失败: {}", e.getMessage());
}
}

我们来详细看一下CuratorWrapper是如何实现的

ListenerExecutor就是一个SAM接口,里面存放已存在的Listener定义,之所以存在因为在Zookeeper重连后需要重设已存在的Listener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private static Set<ListenerExecutor> listenerExecutors;

/**
* 设置或重设所有Listener
*/
private static void setListeners() {
if (null == listenerExecutors || listenerExecutors.isEmpty()) {
return;
}

for (ListenerExecutor executor : listenerExecutors) {
executor.execute();
}
}

/**
* 添加一个Listener
*/
public static void addListeners(ListenerExecutor executor) {
// 不存在就创建
if (null == listenerExecutors) {
listenerExecutors = new ConcurrentHashSet<>();
}
// 添加时需要先设置一次,之后加入缓存
executor.execute();
final boolean add = listenerExecutors.add(executor);
if (!add) {
log.error("[{}] ZK添加监听器失败", PlatformConstants.APPNAME);
}
}

/**
* 对子path添加Listener
*
* @param cacheListener 要添加的Listener
* @param currentPath 当前节点的完整path,格式如下:/dubbo_group/group_name/ip:dubbo_port
* @param basePath Listener会作用的path
*/
public static void childrenCache(PathChildrenCacheListener cacheListener, String currentPath, String basePath) {
try {
// 如果currentPath存在需要初始化
if (StringUtil.notBlank(currentPath)) {
// 判断是否存在,存在的话先删除后新建
final Stat stat = curatorClient.checkExists().forPath(currentPath);
if (null != stat) {
log.info("[{}] 当前节点({})已存在, 删除重建", PlatformConstants.APPNAME, currentPath);
curatorClient.delete().forPath(currentPath);
}
DubboWrapper.nodeCreate(curatorClient, currentPath);
}
// 同样通过PathChildrenCache监听子path里的临时节点
PathChildrenCache cache = new PathChildrenCache(curatorClient, basePath, false);
cache.start();
cache.getListenable().addListener(cacheListener);
} catch (Exception e) {
log.error("[{}] 新建CURATOR节点或监听器失败: {}, {}, {}", PlatformConstants.APPNAME, currentPath, basePath, e.getMessage());
}
}

临时节点的Listener通过DubboWrapper.dubboListener创建,这个逻辑和其它系统一致,我们在下面说明

其它系统获取

其它系统监听逻辑基本一致,只是范围小了很多,只需要在容器启动完成后,对/dubbo_group/now_group进行监听即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 自身所在的分组path,其它系统使用
public static final String DUBBO_GROUP_PATH = DUBBO_ZK_PATH + "/" + PlatformConstants.GROUP;

public static String currentPath() {
return DUBBO_GROUP_PATH + "/" + CuratorWrapper.currentPath(StartupConstants.RUN_HOST, StartupConstants.DUBBO_PORT);
}

...

@Bean("dubboListener")
public ApplicationListener<ContextRefreshedEvent> dubboListener() {
return (event) -> {
final String currentPath = DubboWrapper.currentPath();
log.info("[PLATFORM] DUBBO初始化GROUP使用地址: {}", currentPath);

try {
CuratorWrapper.addListeners(() -> CuratorWrapper.childrenCache(DubboWrapper.dubboListener
(PlatformConstants.GROUP, DUBBO_GROUP_PATH), currentPath, DUBBO_GROUP_PATH));
} catch (Exception e) {
log.error("[PLATFORM] DUBBO初始化GROUP失败: {}, {}", currentPath, e.getMessage());
}
};
}

下面的是临时节点Listener执行的具体逻辑,对于临时节点的监听,所有系统包括网关都一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 针对子path里临时节点的Listener
*
* @param group 监听的group
* @param basePath Listener会作用的path,已经包含了group
*/
public static PathChildrenCacheListener dubboListener(String group, String basePath) {
// 查看group相关的缓存的数据是否存在,不存在则创建
final Map<String, StartupTime> availableProvider = dubboProvider.getOrDefault(group, new ConcurrentHashMap<>());
if (!dubboProvider.containsKey(group)) {
dubboProvider.put(group, availableProvider);
}

// 具体的PathChildrenCacheListener
return (client, event) -> {
final ChildData data = event.getData();
switch (event.getType()) {
// 初始化事件
case INITIALIZED:
log.info("[PLATFORM] 初始化GROUP[{}]", group);
break;
// 临时节点新增
case CHILD_ADDED:
// add是host:dubbo_port
final String add = data.getPath().replaceFirst(basePath + "/", "");
// 这里使用了Kryo,不重要,只是一种序列化手段
final StartupTime startup = KryoBaseUtil.readObjectFromByteArray(client.getData().forPath(data.getPath()), StartupTime.class);
availableProvider.put(add, startup);
log.info("[PLATFORM] 节点[{}]加入GROUP[{}]", add, group);
break;
case CHILD_REMOVED:
// del是host:dubbo_port
final String del = data.getPath().replaceFirst(basePath + "/", "");
// 判断curator client是否可用
if (client.getState() == CuratorFrameworkState.STARTED) {
// 判断临时节点是否存在,确认不存在后再移除;否则有可能出现节点闪断重连,但是还是被删除了
final Stat stat = client.checkExists().forPath(data.getPath());
if (null == stat) {
availableProvider.remove(del);
log.info("[PLATFORM] 节点[{}]移出GROUP[{}]", del, group);
// 如果判断移除的是自己,需要进行重建
final String currentPath = currentPath();
if (Objects.equals(currentPath, data.getPath())) {
final Stat now = client.checkExists().forPath(currentPath);
if (null == now) {
log.info("[{}] 节点({})不存在, 需要重建", PlatformConstants.APPNAME, currentPath);
nodeCreate(client, currentPath);
}
}
}
}
break;
default:
log.info("[PLATFORM] 当前节点状态: {}", event.getType());
}
};
}

调用上下文

网关需要将非stable调用的分组信息写入调用上下文,这里需要借助Dubbo的RpcContent

这里就是在RpcContext中写入了租户、分组信息;并且如果DEFAULT_GROUP=stable!=inputGroup时只将host:dubbo_port信息所组成的set写入其中

1
2
3
4
5
6
7
8
9
public void setRpcContext(String inputTenant, String inputGroup, String proof) {
final RpcContext context = RpcContext.getContext();
context.setAttachment(PlatformConstants.PROOF_KEY, proof);
context.setAttachment(PlatformConstants.SUBGROUP_KEY, inputGroup);
context.setAttachment(PlatformConstants.FROM_KEY, inputTenant);
if (!PlatformConstants.DEFAULT_GROUP.equals(inputGroup) && CollectionUtil.notEmpty(dubboProvider.get(inputGroup))) {
context.setAttachment(PlatformConstants.HANDGROUP_KEY, Protostuff.collection2String(dubboProvider.get(inputGroup).keySet()));
}
}

负载均衡

通过上面对Zookeeper的监听和RpcContext传递,已经可以确保在调用时在每个系统中获取到正确且实时的分组信息了,下面就剩下SPI的实现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@Slf4j
@Activate(value = "soft", order = 10)
public class SoftLoadBalanceImpl implements LoadBalance {
private RandomLoadBalance random;

private RoundRobinLoadBalance roundRobin;

private LeastActiveLoadBalance leastActive;

private ConsistentHashLoadBalance consistentHash;

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
int length = invokers.size();
boolean isSame = false;
// 日志使用
String methodName = invocation.getMethodName();
String proof = RpcContext.getContext().getAttachment(PlatformConstants.PROOF_KEY);
// 输入的分组
String subGroup = RpcContext.getContext().getAttachment(PlatformConstants.SUBGROUP_KEY);
// 可用的provider,过滤使用
Set<String> availableProvider = null;
// 过滤后的,所有可用的invoker
List<Invoker<T>> invokerList = Lists.newArrayListWithExpectedSize(length);

// 判断输入分组和当前所在分组是否相同
if (null == subGroup) {
isSame = true;
subGroup = PlatformConstants.GROUP;
}
if (!isSame && PlatformConstants.GROUP.equals(subGroup)) {
isSame = true;
}

if (isSame) {
// 如果相同,只需要从dubboProvider选择做为过滤条件即可
availableProvider = CollectionUtil.isEmpty(dubboProvider.get(subGroup)) ? null : dubboProvider.get(subGroup).keySet();
} else {
// 如果不同,从上下文里解析做为过滤条件
String handGroup = RpcContext.getContext().getAttachment(PlatformConstants.HANDGROUP_KEY);
if (null != handGroup) {
try {
availableProvider = new HashSet<>(Protostuff.string2Collection(handGroup));
log.info("[PLATFORM] 凭证:{} | 获取附加分组列表: {}", proof, availableProvider);
} catch (Exception e) {
log.warn("[PLATFORM] 凭证:{} | 获取附加分组列表失败: {}", proof, e.getMessage());
log.warn("[PLATFORM] 凭证:" + proof + " | 获取附加分组列表失败:", e);
}
}
// 如果上下文里没有,还是从dubboProvider选择做为过滤条件,确保从stable仍然进入stable
if (CollectionUtil.isEmpty(availableProvider)) {
log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 输入分组[{}]无可用服务,调用默认服务分组[{}]", proof, methodName, subGroup, PlatformConstants.GROUP);
availableProvider = null == dubboProvider.get(subGroup) ? null : dubboProvider.get(subGroup).keySet();
}
}

if (CollectionUtil.isEmpty(availableProvider)) {
// 如果还是没有找到可用的过滤列表,直接使用dubbo提供的全部invokers
invokerList = invokers;
log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 所有分组[{}]无可用服务,使用默认提供者", proof, methodName, subGroup);
} else {
if (length > 1) {
// 将dubbo提供的所有invoker转换,供下边过滤和选取使用
final Map<String, Invoker<T>> dubboInvokers = Maps.newHashMapWithExpectedSize(length);
for (int i = 0; i < length; i++) {
final Invoker<T> invoker = invokers.get(i);
final URL invokeUrl = invoker.getUrl();
dubboInvokers.put(invokeUrl.getAddress(), invoker);
}
// 使用set过滤
final Set<String> dubboProvider = dubboInvokers.keySet();
dubboProvider.retainAll(availableProvider);
log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 使用分组[{}], 分组可用列表: {}", proof, methodName, subGroup, dubboProvider);

if (dubboProvider.isEmpty()) {
// 过滤完成后全都不符合条件,没得选,使用全部
invokerList.addAll(invokers);
log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 分组[{}]无可用服务,使用所有提供者", proof, methodName, subGroup);
} else {
for (Iterator<String> it = dubboProvider.iterator(); it.hasNext(); ) {
final String next = it.next();
final Invoker<T> invoker = dubboInvokers.get(next);
if (null != invoker) {
invokerList.add(invoker);
}
}
}
// 选择完成后全都不符合条件,没得选,使用全部
if (invokerList.isEmpty()) {
invokerList.addAll(invokers);
log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 分组[{}]无可用服务,使用所有提供者", proof, methodName, subGroup);
}
} else if (length == 1) {
// 只有一个没得选,必须调用
invokerList = invokers;
log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 只有这一个可用服务", proof, methodName);
} else {
log.error("[PLATFORM] 凭证:{} | 路由[DUBBO]服务失败: {}, 无提供者", proof, methodName);
}
}

// 通过dubbo自带的lb策略选定一个具体的invoker
switch (PlatformConstants.DUBBO_LOAD_BALANCE) {
case RoundRobinLoadBalance.NAME:
if (null == roundRobin)
roundRobin = new RoundRobinLoadBalance();
return roundRobin.select(invokerList, url, invocation);
case LeastActiveLoadBalance.NAME:
if (null == leastActive)
leastActive = new LeastActiveLoadBalance();
return leastActive.select(invokerList, url, invocation);
default:
if (null == random)
random = new RandomLoadBalance();
return random.select(invokerList, url, invocation);
}
}
}

代码上的注释说的很清楚了,就是一个对比加过滤的过程

这里使用了@Activate注解来进行条件激活,当配置的lb策略为soft时才会使用并实例化,可以对ProviderConfigConsumerConfig进行默认配置

1
2
providerConfig.setLoadbalance("soft");
consumerConfig.setLoadbalance("soft");

如何打造一个Dubbo网关--软分组
https://back.pub/post/hh-code-dubbo-gateway-2/
作者
Dash
发布于
2018年11月4日
许可协议