1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
public Mono<ApiResponse> doApi(RequestInfo request, String inputProof, String getToken, String inputTenant, String inputGroup, List<String> invokeNames, List<Map<String, Object>> inputParams, List<Map<String, Object>> inputAttaches, boolean fromPath) { final String proof = null == inputProof ? UUID.randomUUID().toString() : inputProof; gatewayCache.setRpcContext(inputTenant, inputGroup, proof); final ApiResponse response = new ApiResponse(true, proof); log.info("[GATEWAY] 来源: {} | 分组: {} | 调用方法: {} | 输入参数: {} | 凭证: {}", inputTenant, inputGroup, invokeNames, inputParams, proof); AssertWrapper.isTrue(null != inputTenant && null != inputGroup, "调用方法输入错误!");
int size = invokeNames.size(); Map<String, TrackBean> tracks = Maps.newHashMapWithExpectedSize(size);
InvokeCache paramCache = doCache(inputGroup, inputParams, invokeNames); Map<String, Object> memberInfo = doMember(inputTenant, inputGroup, paramCache, getToken, "");
if (!paramCache.isWhitelist()) { if (!currentLimit.tryConsume(request, memberInfo)) { throw new PassedException(PlatformExceptionEnum.API_ACCESS_LIMIT); } }
final Map<String, Integer> needs = paramCache.getNeedCaches(); final List<InvokeDetailCache> caches = paramCache.getInvokeDetails(); final Mono<List<Object>> outcome = Flux.fromStream(() -> IntStream.range(0, size).mapToObj(i -> { final long start = System.currentTimeMillis(); final String invokeName = invokeNames.get(i); final Map<String, Object> inputParam = null == inputParams.get(i) ? Collections.emptyMap() : inputParams.get(i); final Map<String, Object> inputAttach = (null == inputAttaches || inputAttaches.size() <= i) ? null : inputAttaches.get(i);
final InvokeDetailCache invokeDetailCache = caches.get(i); AssertWrapper.notNull(invokeName, "调用方法名不能为空"); AssertWrapper.notNull(invokeDetailCache, "调用方法名未找到");
final TrackBean trackInfo = new TrackBean(0, inputParam, null); if (Objects.nonNull(invokeDetailCache.getIsTrack()) && invokeDetailCache.getIsTrack()) { tracks.put(invokeName, trackInfo); }
final Object result; final String needKey = invokeName + methodSplit + inputParam.size(); final Integer cacheTime = needs.getOrDefault(needKey, 0); if (cacheTime > 0) { final Object value = methodResultCache.getValue(needKey, inputParam, inputTenant, inputGroup, getToken); if (null == value) { result = doResult(request, getToken, invokeName, inputParam, inputAttach, memberInfo, invokeDetailCache, proof); methodResultCache.setValue(needKey, inputParam, inputTenant, inputGroup, getToken, result, cacheTime); } else { result = value; tracks.remove(invokeName); } } else { result = doResult(request, getToken, invokeName, inputParam, inputAttach, memberInfo, invokeDetailCache, proof); } trackInfo.setTime(System.currentTimeMillis() - start); log.debug("[GATEWAY] 此次调用用时: {} | 凭证: {}", trackInfo, proof); return result; })).collectList();
return outcome.map(list -> { if (fromPath) { if (list.isEmpty()) { response.exception(PlatformExceptionEnum.RESULT_ERROR); } else { response.setPreload(list.get(0)); } } else { response.setPreload(list); } if (!fromPath && response.getCode() != 0) { response.setPreload(Collections.singleton(response.getPreload())); return response; } log.debug("[GATEWAY] 来源: {} | 分组: {} | 调用方法: {} | 返回结果: {} | 凭证: {}", inputTenant, inputGroup, invokeNames, response.getPreload(), proof);
gatewayTrack.keepData(request, response, memberInfo, tracks, inputGroup, inputTenant); return response; }); }
|