微服务架构下,一个完整的业务通常是被拆分成数个甚至数十个子项目,每个子项目独立运行,一组人员负责一个或数个子项目
在这种情况下并行开发不可避免,我们假设这么一种情况:red组要做一个需求评估需要改动A、B、C三个项目;blue组也要做一个需求评估需要改动B、C、D三个项目;yellow组也要做一个需求评估需要改动E、F这两个项目。这种情况下B、C项目的发布权限给谁?又如何保证E、F能调用到正确的A、B、C、D的接口呢?
- 硬分组:所有资源物理隔离,也就是说开多个完整的dev环境,成本太高多数情况下都是不能接受的
- 软分组:将开发环境进行逻辑分组,完整的dev环境默认是stable分组,再添加red、blue、yellow三个分组,只包含各自要开发的项目
在软分组下,流量进入网关后如果无分组标记则走stable分组,否则按分组标记走。本篇内容主要讲述SPI扩展和网关软分组。以下代码片段全部来自于plume
SPI机制
SPI(Service Provider Interface)是用来提供接口实现类的一种方式,这样就不用在代码里硬编码实现类里,只需要按规则加一个文件即可
Java
Java本身又一套自己的SPI机制,主要就是为了解决上面说的硬编码实现类的问题,提供实现类的可插拔
首先新增一个文件,目录固定META-INF/services
,文件名为接口全名
下面的示例是在网关中添加一个限流接口CurrentLimit
的令牌桶限流实现
META-INF/services/net.dloud.platform.common.provider.CurrentLimit
文件里的内容如下,写入实现类的全名即可
1
| net.dloud.platform.gateway.spi.SimpleCurrentLimit
|
使用时通过ServiceLoader
获取Iterator
即可遍历所有定义的实现类
ServiceLoader.load(fullClassName).iterator()
Java的SPI主要有以下几个问题:
- 不能按需加载,只能去遍历
- 所有查找到的实现类都会被实例化
- 多线程并发不安全
- 只支持空参构造
Dubbo
Dubbo的SPI机制是从Java的SPI机制扩展加强而来的,解决123问题,并且定义方式有些不同:使用key=class_full_name
定义
首先新增一个文件,目录固定META-INF/dubbo
,文件名同样为接口全名
这里我们想使用netty4作为transport,Dubbo提供了一个dubbo-remoting-netty4,我们看看它是如何做的
META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Transporter
可以看到内部的实现定义在META-INF/dubbo/internal
里,文件里的内容如下
1 2
| netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter netty=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
|
也就是说如果引入这个包,默认就会直接使用netty4;如果自定义的话,注意这里的key是一个约定值。像transport需要在配置ProtocolConfig
时进行指定,具体的每个扩展点在其文档里可以查找到
1
| protocolConfig.setTransporter("netty4");
|
方案说明
Dubbo是接口级的注册,所以我们自定义的分组里的每个服务都是一组Provider
;Dubbo在调用时会去获取当前接口的所有Provider
,按照lb算法选择一个合适的Provider
在软分组的需求下,我们只需要通过SPI自定义一个lb实现即可。这个lb要求在有分组信息时选择到特定的节点,为了实现lb有以下三件事必须去做:
- 当前服务,无论是业务系统还是网关,都必须知道下一步要调用的业务系统的所有节点和其分组
- 根据分组的节点信息和输入分组对
Provider
进行过滤,如果不为空则使用新的Provider
列表,否则使用stable分组的Provider
列表
- 如果过滤后的分组列表仍然有多个,则选择一种lb算法进行后直接调用,否则直接调用
分组信息
分组信息为了实时性需求,需要使用Zookeeper
的临时节点,毕竟选择到一个失效节点还需要重试。在这里有两种实现方式:
- 所有系统处理所有分组信息,每个系统里都有一份完整的关于所有节点的分组信息cache
- 网关处理所有分组,其它系统只处理所在分组的信息;当输入分组不是stable时,网关将相关分组的所有节点数据写入调用上下文,一直传递下去
因为非stable分组相关的节点信息一般较少,这里我们选择2,理论上可以降低一部分Zookeeper
开销,但其实差别没有很大
网关获取
在Zookeeper
定义一个名为/dubbo_group
的path,网关首先需要监听这个path下的所有子path的增减,还需要监听所有存在的子path下的所有临时节点的增减
使用当前节点的ip:dubbo_port
作为Zookeeper
的临时节点名,这样来确保唯一
在容器启动完成后,首先对/dubbo_group
进行监听,这里使用curator进行操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public static final String DUBBO_ZK_PATH = "/dubbo_group";
public static final String DUBBO_GROUP_PATH = DUBBO_ZK_PATH + "/" + PlatformConstants.GROUP;
public static Map<String, Map<String, StartupTime>> dubboProvider = new HashMap<>();
...
@Override public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) { try { PathChildrenCache cache = new PathChildrenCache(curatorClient, DUBBO_ZK_PATH, false); cache.start(); cache.getListenable().addListener((client, event) -> { final ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: final String add = data.getPath().replaceFirst(DUBBO_ZK_PATH + "/", ""); final String basePath = DUBBO_ZK_PATH + "/" + add; CuratorWrapper.addListeners(() -> CuratorWrapper.childrenCache(DubboWrapper.dubboListener (add, basePath), null, basePath)); log.info("[GATEWAY] 添加DUBBO GROUP: {}", add); break; case CHILD_REMOVED: final String del = data.getPath().replaceFirst(DUBBO_ZK_PATH + "/", ""); dubboProvider.remove(del); log.info("[GATEWAY] 移除DUBBO GROUP: {}", del); break; default: log.info("[GATEWAY] DUBBO GROUP节点状态: {}", event.getType()); } }); } catch (Exception e) { log.error("[GATEWAY] 监听DUBBO GROUP节点失败: {}", e.getMessage()); } }
|
我们来详细看一下CuratorWrapper
是如何实现的
ListenerExecutor
就是一个SAM接口,里面存放已存在的Listener
定义,之所以存在因为在Zookeeper
重连后需要重设已存在的Listener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| private static Set<ListenerExecutor> listenerExecutors;
private static void setListeners() { if (null == listenerExecutors || listenerExecutors.isEmpty()) { return; }
for (ListenerExecutor executor : listenerExecutors) { executor.execute(); } }
public static void addListeners(ListenerExecutor executor) { if (null == listenerExecutors) { listenerExecutors = new ConcurrentHashSet<>(); } executor.execute(); final boolean add = listenerExecutors.add(executor); if (!add) { log.error("[{}] ZK添加监听器失败", PlatformConstants.APPNAME); } }
public static void childrenCache(PathChildrenCacheListener cacheListener, String currentPath, String basePath) { try { if (StringUtil.notBlank(currentPath)) { final Stat stat = curatorClient.checkExists().forPath(currentPath); if (null != stat) { log.info("[{}] 当前节点({})已存在, 删除重建", PlatformConstants.APPNAME, currentPath); curatorClient.delete().forPath(currentPath); } DubboWrapper.nodeCreate(curatorClient, currentPath); } PathChildrenCache cache = new PathChildrenCache(curatorClient, basePath, false); cache.start(); cache.getListenable().addListener(cacheListener); } catch (Exception e) { log.error("[{}] 新建CURATOR节点或监听器失败: {}, {}, {}", PlatformConstants.APPNAME, currentPath, basePath, e.getMessage()); } }
|
临时节点的Listener
通过DubboWrapper.dubboListener
创建,这个逻辑和其它系统一致,我们在下面说明
其它系统获取
其它系统监听逻辑基本一致,只是范围小了很多,只需要在容器启动完成后,对/dubbo_group/now_group
进行监听即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static final String DUBBO_GROUP_PATH = DUBBO_ZK_PATH + "/" + PlatformConstants.GROUP;
public static String currentPath() { return DUBBO_GROUP_PATH + "/" + CuratorWrapper.currentPath(StartupConstants.RUN_HOST, StartupConstants.DUBBO_PORT); }
...
@Bean("dubboListener") public ApplicationListener<ContextRefreshedEvent> dubboListener() { return (event) -> { final String currentPath = DubboWrapper.currentPath(); log.info("[PLATFORM] DUBBO初始化GROUP使用地址: {}", currentPath);
try { CuratorWrapper.addListeners(() -> CuratorWrapper.childrenCache(DubboWrapper.dubboListener (PlatformConstants.GROUP, DUBBO_GROUP_PATH), currentPath, DUBBO_GROUP_PATH)); } catch (Exception e) { log.error("[PLATFORM] DUBBO初始化GROUP失败: {}, {}", currentPath, e.getMessage()); } }; }
|
下面的是临时节点Listener
执行的具体逻辑,对于临时节点的监听,所有系统包括网关都一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
public static PathChildrenCacheListener dubboListener(String group, String basePath) { final Map<String, StartupTime> availableProvider = dubboProvider.getOrDefault(group, new ConcurrentHashMap<>()); if (!dubboProvider.containsKey(group)) { dubboProvider.put(group, availableProvider); }
return (client, event) -> { final ChildData data = event.getData(); switch (event.getType()) { case INITIALIZED: log.info("[PLATFORM] 初始化GROUP[{}]", group); break; case CHILD_ADDED: final String add = data.getPath().replaceFirst(basePath + "/", ""); final StartupTime startup = KryoBaseUtil.readObjectFromByteArray(client.getData().forPath(data.getPath()), StartupTime.class); availableProvider.put(add, startup); log.info("[PLATFORM] 节点[{}]加入GROUP[{}]", add, group); break; case CHILD_REMOVED: final String del = data.getPath().replaceFirst(basePath + "/", ""); if (client.getState() == CuratorFrameworkState.STARTED) { final Stat stat = client.checkExists().forPath(data.getPath()); if (null == stat) { availableProvider.remove(del); log.info("[PLATFORM] 节点[{}]移出GROUP[{}]", del, group); final String currentPath = currentPath(); if (Objects.equals(currentPath, data.getPath())) { final Stat now = client.checkExists().forPath(currentPath); if (null == now) { log.info("[{}] 节点({})不存在, 需要重建", PlatformConstants.APPNAME, currentPath); nodeCreate(client, currentPath); } } } } break; default: log.info("[PLATFORM] 当前节点状态: {}", event.getType()); } }; }
|
调用上下文
网关需要将非stable调用的分组信息写入调用上下文,这里需要借助Dubbo的RpcContent
这里就是在RpcContext
中写入了租户、分组信息;并且如果DEFAULT_GROUP=stable
!=inputGroup
时只将host:dubbo_port
信息所组成的set写入其中
1 2 3 4 5 6 7 8 9
| public void setRpcContext(String inputTenant, String inputGroup, String proof) { final RpcContext context = RpcContext.getContext(); context.setAttachment(PlatformConstants.PROOF_KEY, proof); context.setAttachment(PlatformConstants.SUBGROUP_KEY, inputGroup); context.setAttachment(PlatformConstants.FROM_KEY, inputTenant); if (!PlatformConstants.DEFAULT_GROUP.equals(inputGroup) && CollectionUtil.notEmpty(dubboProvider.get(inputGroup))) { context.setAttachment(PlatformConstants.HANDGROUP_KEY, Protostuff.collection2String(dubboProvider.get(inputGroup).keySet())); } }
|
负载均衡
通过上面对Zookeeper
的监听和RpcContext
传递,已经可以确保在调用时在每个系统中获取到正确且实时的分组信息了,下面就剩下SPI的实现了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| @Slf4j @Activate(value = "soft", order = 10) public class SoftLoadBalanceImpl implements LoadBalance { private RandomLoadBalance random;
private RoundRobinLoadBalance roundRobin;
private LeastActiveLoadBalance leastActive;
private ConsistentHashLoadBalance consistentHash;
@Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { int length = invokers.size(); boolean isSame = false; String methodName = invocation.getMethodName(); String proof = RpcContext.getContext().getAttachment(PlatformConstants.PROOF_KEY); String subGroup = RpcContext.getContext().getAttachment(PlatformConstants.SUBGROUP_KEY); Set<String> availableProvider = null; List<Invoker<T>> invokerList = Lists.newArrayListWithExpectedSize(length);
if (null == subGroup) { isSame = true; subGroup = PlatformConstants.GROUP; } if (!isSame && PlatformConstants.GROUP.equals(subGroup)) { isSame = true; }
if (isSame) { availableProvider = CollectionUtil.isEmpty(dubboProvider.get(subGroup)) ? null : dubboProvider.get(subGroup).keySet(); } else { String handGroup = RpcContext.getContext().getAttachment(PlatformConstants.HANDGROUP_KEY); if (null != handGroup) { try { availableProvider = new HashSet<>(Protostuff.string2Collection(handGroup)); log.info("[PLATFORM] 凭证:{} | 获取附加分组列表: {}", proof, availableProvider); } catch (Exception e) { log.warn("[PLATFORM] 凭证:{} | 获取附加分组列表失败: {}", proof, e.getMessage()); log.warn("[PLATFORM] 凭证:" + proof + " | 获取附加分组列表失败:", e); } } if (CollectionUtil.isEmpty(availableProvider)) { log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 输入分组[{}]无可用服务,调用默认服务分组[{}]", proof, methodName, subGroup, PlatformConstants.GROUP); availableProvider = null == dubboProvider.get(subGroup) ? null : dubboProvider.get(subGroup).keySet(); } }
if (CollectionUtil.isEmpty(availableProvider)) { invokerList = invokers; log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 所有分组[{}]无可用服务,使用默认提供者", proof, methodName, subGroup); } else { if (length > 1) { final Map<String, Invoker<T>> dubboInvokers = Maps.newHashMapWithExpectedSize(length); for (int i = 0; i < length; i++) { final Invoker<T> invoker = invokers.get(i); final URL invokeUrl = invoker.getUrl(); dubboInvokers.put(invokeUrl.getAddress(), invoker); } final Set<String> dubboProvider = dubboInvokers.keySet(); dubboProvider.retainAll(availableProvider); log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 使用分组[{}], 分组可用列表: {}", proof, methodName, subGroup, dubboProvider);
if (dubboProvider.isEmpty()) { invokerList.addAll(invokers); log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 分组[{}]无可用服务,使用所有提供者", proof, methodName, subGroup); } else { for (Iterator<String> it = dubboProvider.iterator(); it.hasNext(); ) { final String next = it.next(); final Invoker<T> invoker = dubboInvokers.get(next); if (null != invoker) { invokerList.add(invoker); } } } if (invokerList.isEmpty()) { invokerList.addAll(invokers); log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 分组[{}]无可用服务,使用所有提供者", proof, methodName, subGroup); } } else if (length == 1) { invokerList = invokers; log.info("[PLATFORM] 凭证:{} | 路由[DUBBO]服务: {}, 只有这一个可用服务", proof, methodName); } else { log.error("[PLATFORM] 凭证:{} | 路由[DUBBO]服务失败: {}, 无提供者", proof, methodName); } }
switch (PlatformConstants.DUBBO_LOAD_BALANCE) { case RoundRobinLoadBalance.NAME: if (null == roundRobin) roundRobin = new RoundRobinLoadBalance(); return roundRobin.select(invokerList, url, invocation); case LeastActiveLoadBalance.NAME: if (null == leastActive) leastActive = new LeastActiveLoadBalance(); return leastActive.select(invokerList, url, invocation); default: if (null == random) random = new RandomLoadBalance(); return random.select(invokerList, url, invocation); } } }
|
代码上的注释说的很清楚了,就是一个对比加过滤的过程
这里使用了@Activate
注解来进行条件激活,当配置的lb策略为soft
时才会使用并实例化,可以对ProviderConfig
和ConsumerConfig
进行默认配置
1 2
| providerConfig.setLoadbalance("soft"); consumerConfig.setLoadbalance("soft");
|