如何打造一个Dubbo网关--组合调用

通过泛化调用一文我们知道一个接口会有两个唯一值:pathNameinvokeName,通过这两个值定义了两种调用方式

  • 通过pathName的被称为路径调用,入口是/{system}/{clazz}/{method}很明显一次只能调用一个接口
  • 通过invokeName的被称为组合调用,入口时/api,可以一次性调用多个接口,返回一个结果列表

因为网关的实际调用代码就是按照组合调用来的比较复杂,所以将详细的内容放在了软分组、网关注入和平台之后。本篇内容主要讲述调用示例调用完整流程,以下代码片段全部来自于plume

调用示例

我们拿文档插件里的接口来做示例,之前已经说过了网关全部使用post来简化沟通成本

请不要讨论rest风格和这种风格的优劣,这只是基于我们的场景下的最优选择,请根据自己的场景选择

这里需要将MessageFacade变更为MessageTestService。前文提到过非Service无法调用:实际上是非Service无法进行路径调用,但是可以进行组合调用

当然在使用时,具体的pathNameinvokeName以文档为准

路径调用

输入信息,url形式的调用

1
2
3
4
5
6
7
8
9
10
11
12
curl -X POST 'https://gateway_addr/message/messageTest/messageList?group=test' 
--data-raw '
{
"tenant": "test",
"timestamp": 1526217788730,
"token": "test",
"sign": "a102d405477c04e8cc1c0c36f0cdc810a6636da5c9529b18fd97a85f9c389abc",
"param": {
"page": {}
}
}
'

返回的preload是一个值,非数组

1
2
3
4
5
6
7
8
{
"code": 0,
"message": "请求成功...",
"proof": "61c0eaae-2044-46d6-982d-06637eea5a25",
"preload": {
......
}
}

组合调用

输入信息,添加invoke数组,param也变为数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
curl -X POST 'https://gateway_addr/api?group=test' 
--data-raw '
{
"tenant": "test",
"timestamp": 1526217788730,
"token": "test",
"sign": "a102d405477c04e8cc1c0c36f0cdc810a6636da5c9529b18fd97a85f9c389abc",
"invoke": [
"message.messageTestService.messageList"
]
"param": [
{
"page": {}
}
]
}
'

返回的preload是一个数组,里边值的顺序对应调用顺序

1
2
3
4
5
6
7
8
9
10
{
"code": 0,
"message": "请求成功...",
"proof": "61c0eaae-2044-46d6-982d-06637eea5a25",
"preload": [
{
......
}
]
}

主流程

网关是基于spring-cloud-gateway的,所以这里使用webflux,注释较为全面,简单的说一下

  1. 即使是路径调用也会转换成只有一次请求的组合调用,统一两者
  2. 判断入参信息是否合规,并设置rpc上下文,初始化追踪信息map
  3. doCache是从数据库获取单个接口信息的封装,看名字也知道会走缓存
  4. doMember就是在用户注入里的流程,包括对白名单的处理、泛化调用拿到用户信息等。无论一次调用几个接口显然只能是同一个用户
  5. 对非白名单的限流处理,实现时令牌桶。如果有userId则userId作为限流key,否则使用ip
  6. 处理网关缓存,如果接口使用了网关缓存注解:如果缓存里有则从缓存读,否则去执行rpc并放入缓存设置过期
  7. 拼装返回结果,对路径调用和组合调用作区分,一个只有一个值实际结果是map,另一个会有多个值实际结果是list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* 实际调用流程
*
* @param request http的一些请求信息
* @param inputProof 追踪用
* @param getToken 用户token
* @param inputTenant 租户信息
* @param inputGroup 输入的分组,默认stable
* @param invokeNames 组合调用名,list
* @param inputParams 输入参数,list
* @param inputAttaches 附加参数、list
* @param fromPath 是否是路径调用
*/
public Mono<ApiResponse> doApi(RequestInfo request, String inputProof, String getToken, String inputTenant, String inputGroup,
List<String> invokeNames, List<Map<String, Object>> inputParams, List<Map<String, Object>> inputAttaches, boolean fromPath) {
final String proof = null == inputProof ? UUID.randomUUID().toString() : inputProof;
gatewayCache.setRpcContext(inputTenant, inputGroup, proof);
final ApiResponse response = new ApiResponse(true, proof);
log.info("[GATEWAY] 来源: {} | 分组: {} | 调用方法: {} | 输入参数: {} | 凭证: {}",
inputTenant, inputGroup, invokeNames, inputParams, proof);
AssertWrapper.isTrue(null != inputTenant && null != inputGroup, "调用方法输入错误!");

int size = invokeNames.size();
// 存储接口的追踪信息
Map<String, TrackBean> tracks = Maps.newHashMapWithExpectedSize(size);

// 从数据库中获取输入的接口信息,优先缓存
InvokeCache paramCache = doCache(inputGroup, inputParams, invokeNames);
// 校验白名单及用户信息,会抛出异常直接结束
Map<String, Object> memberInfo = doMember(inputTenant, inputGroup, paramCache, getToken, "");

// 用户或ip维度的限流
if (!paramCache.isWhitelist()) {
if (!currentLimit.tryConsume(request, memberInfo)) {
throw new PassedException(PlatformExceptionEnum.API_ACCESS_LIMIT);
}
}

// 接口是否需要网关缓存,value是缓存时长
final Map<String, Integer> needs = paramCache.getNeedCaches();
// 数据库里的接口信息
final List<InvokeDetailCache> caches = paramCache.getInvokeDetails();
// 并行调用多个接口
final Mono<List<Object>> outcome = Flux.fromStream(() -> IntStream.range(0, size).mapToObj(i -> {
final long start = System.currentTimeMillis();
// 获取具体的接口名
final String invokeName = invokeNames.get(i);
// 获取当前接口的输入参数
final Map<String, Object> inputParam = null == inputParams.get(i) ? Collections.emptyMap() : inputParams.get(i);
// 接口附加信息
final Map<String, Object> inputAttach = (null == inputAttaches || inputAttaches.size() <= i) ? null : inputAttaches.get(i);

// 获取当前接口信息
final InvokeDetailCache invokeDetailCache = caches.get(i);
AssertWrapper.notNull(invokeName, "调用方法名不能为空");
AssertWrapper.notNull(invokeDetailCache, "调用方法名未找到");

// 是否需要追踪接口调用
final TrackBean trackInfo = new TrackBean(0, inputParam, null);
if (Objects.nonNull(invokeDetailCache.getIsTrack()) && invokeDetailCache.getIsTrack()) {
tracks.put(invokeName, trackInfo);
}

final Object result;
// 处理网关缓存,如果配置了缓存时长,并且缓存里有则从缓存读,否则去执行rpc
final String needKey = invokeName + methodSplit + inputParam.size();
final Integer cacheTime = needs.getOrDefault(needKey, 0);
if (cacheTime > 0) {
final Object value = methodResultCache.getValue(needKey, inputParam, inputTenant, inputGroup, getToken);
if (null == value) {
result = doResult(request, getToken, invokeName, inputParam, inputAttach, memberInfo, invokeDetailCache, proof);
methodResultCache.setValue(needKey, inputParam, inputTenant, inputGroup, getToken, result, cacheTime);
} else {
// 缓存里读取不需要追踪
result = value;
tracks.remove(invokeName);
}
} else {
result = doResult(request, getToken, invokeName, inputParam, inputAttach, memberInfo, invokeDetailCache, proof);
}
trackInfo.setTime(System.currentTimeMillis() - start);
log.debug("[GATEWAY] 此次调用用时: {} | 凭证: {}", trackInfo, proof);
return result;
})).collectList();

return outcome.map(list -> {
// 对路径调用和组合调用作区分,两者的结构并不一样
if (fromPath) {
if (list.isEmpty()) {
response.exception(PlatformExceptionEnum.RESULT_ERROR);
} else {
response.setPreload(list.get(0));
}
} else {
response.setPreload(list);
}
if (!fromPath && response.getCode() != 0) {
response.setPreload(Collections.singleton(response.getPreload()));
return response;
}
log.debug("[GATEWAY] 来源: {} | 分组: {} | 调用方法: {} | 返回结果: {} | 凭证: {}",
inputTenant, inputGroup, invokeNames, response.getPreload(), proof);

// 记录需要追踪的接口
gatewayTrack.keepData(request, response, memberInfo, tracks, inputGroup, inputTenant);
return response;
});
}

实际调用

之前已经对doResult进行过分析了,这里来看一下全部的代码,同样简单的说一下

  1. InvokeDetailCache里边就是查询数据库后并放入缓存的数据,包含参数字段名、参数字段类型、注入信息等注解信息
  2. doInject在前文已经分析过了,通过给的用户信息按等级注入用户信息和附加信息
  3. 发起rpc调用的部分在泛化调用里也已经分析过了,catch的地方说明拿到的reference有问题,清理缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 实际发起rpc调用,大部分参数同主流程
*
* @param memberInfo 通过泛化调用拿到的用户信息,可能是空
* @param cache 接口详细信息,dubbo调用要使用
*/
private Object doResult(RequestInfo request, String getToken, String invokeName,
Map<String, Object> inputParam, Map<String, Object> inputAttach,
Map<String, Object> memberInfo, InvokeDetailCache cache, String proof) {
// 分割获取方法名
String methodName = StringUtil.splitLastByDot(invokeName);
// 获取注入信息
final Map<String, InjectionInfo> injects = cache.getInjects();

// 准备拼装输入参数的值
final List<Object> invokeParams = new ArrayList<>();
// 获取要调用方法的所有参数名
final String[] invokeCacheNames = cache.getNames();
for (int j = 0; j < invokeCacheNames.length; j++) {
String getName = invokeCacheNames[j];
// 从入参中获取参数值用于调用
Object getParam = inputParam.get(getName);
// 使用参数名判断是否要注入,如果是进行注入
if (null != injects && CollectionUtil.notEmpty(injects.keySet()) && injects.keySet().contains(getName)) {
getParam = doInject(getToken, getParam, injects.get(getName), memberInfo, inputAttach, request);
}
invokeParams.add(getParam);
}

Object result = null;
GenericService genericService = null;
try {
// 获取dubbo的reference,如果拿不到,清理重试
genericService = gatewayCache.referenceConfig(cache.getClassName()).get();
if (null == genericService) {
gatewayCache.referenceClean(cache.getClassName());
genericService = gatewayCache.referenceConfig(cache.getClassName()).get();
}
log.debug("[GATEWAY] 将调用的方法参数为: {} = {} | 凭证: {} | 引用: {}", methodName, cache, proof, genericService);
// 进行rpc调用并拿到结果
result = genericService.$invoke(methodName, cache.getTypes(), invokeParams.toArray());
} catch (NoSuchMethodError | NullPointerException ex) {
log.error("[GATEWAY] 调用的方法缓存错误,清除缓存: {} = {} | 凭证: {} | 引用: {}", methodName, cache, proof, genericService);
gatewayCache.referenceClean(cache.getClassName());
}
if (null == result) {
throw new PassedException(PlatformExceptionEnum.RESULT_ERROR);
}
return result;
}

如何打造一个Dubbo网关--组合调用
https://back.pub/post/hh-code-dubbo-gateway-8/
作者
Dash
发布于
2019年2月23日
许可协议