扩展spring-cloud-ribbon支持灰度

目标

  • 扩展ribbon完成灰度调用
  • 完成对zuul的支持
  • 完成服务间调用的支持
  • 实战,解决在开发环境,进行开发中的测试,DEBUG.在微服务的模式下,需要在开发者的机器启动大量服务,
    启动大量的服务需要大量的内存和大量的时间,在我们时间的项目开发中,在16G的机器上甚至无法进行调
    和测试相关工作。

思路

ZUUL调用服务

  • 利用eureka的mate-map机制,在服务启动时添加部分元数据信息
1
eureka.instance.metadata-map.developer=qingmu
  • 在访问zuul网关时携带参数developer=qingmu
  • 对zuul Filter 进行扩展,获取到参数中的developer,并设置到threadlocal中
  • 对ribbon进行扩展,重写ZoneAvoidanceRule.choose 方法,返回server之前获取到
    ThreadLocal中预先设置的developer,并获取到allServers,遍历allServers,获取到server的的metadata-map
    判断其中是否有developer=qingmu 如果有则命中,添加进一个新集合,遍历完成之后产生新的集合,使用的新的集合
    完成server选择。如未能命中,则走已有的默认实现。如此便完成了ZUUL对server的灰度调用。

服务调用服务

  • 同样利用eureka的机制
  • 同理zuul会将developer进行传递
  • 当传递到服务时,服务自定义一个拦截器,将参数developer 取出,存入自己的ThreadLocal中,方便后续的feign使用
  • 当服务进行服务调用时
  • 首先我们对feign的拦截器进行扩展,将developer 参数继续传递下去,方便接下来的服务老铁继续使用
  • 其次走对ribbon扩展的相关逻辑。即完成了服务直接的灰度调用

注意

  • 由于使用了threadlocal 变量进行参数隐式传递,Hystrix的ThreadLocal隔离模式是无法使用了。
  • 由于jdk提供的线程池实现,无法进行跨线程池的threadlocal变量传递
  • 所以在进行灰度调用时,可使用信号量隔离模式
  • 设置strategy为SEMAPHORE
1
2

hystrix.command.default.execution.isolation.strategy: SEMAPHORE
  • 也可以使用自定义策略的当时进行threadlocal的传递
  • 继承HystrixConcurrencyStrategy策略类覆写wrapCallable方法即可

  • RibbonHystrixConcurrencyStrategy.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

@Slf4j
public class RibbonHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private HystrixConcurrencyStrategy delegate;

public RibbonHystrixConcurrencyStrategy() {
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof RibbonHystrixConcurrencyStrategy) {
// Welcome to singleton hell...
return;
}
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
} catch (Exception e) {
log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
}
}

private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (log.isDebugEnabled()) {
log.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
log.debug("Registering Sleuth Hystrix Concurrency Strategy.");
}
}

@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return new WrappedCallable<>(callable, RibbonFilterContextHolder.getCurrentContext());
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}

@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}

@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}

public static class WrappedCallable<T> implements Callable<T> {

private final Callable<T> target;
private final RibbonFilterContext ribbonFilterContext;

public WrappedCallable(Callable<T> target, RibbonFilterContext ribbonFilterContext) {
this.target = target;
this.ribbonFilterContext = ribbonFilterContext;
}

@Override
public T call() throws Exception {
try {
RibbonFilterContextHolder.setCurrentContext(ribbonFilterContext);
return target.call();
} finally {
RibbonFilterContextHolder.clearCurrentContext();
}
}
}
}

代码实现

zuul 拦截器扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// RibbonFilter.java
@Component
public class RibbonFilter extends ZuulFilter {

@Override
public String filterType() {
return PRE_TYPE;
}

@Override
public int filterOrder() {
return FORM_BODY_WRAPPER_FILTER_ORDER;
}

@Override
public boolean shouldFilter() {
return true;
}

@Override
public Object run() {
RibbonFilterContextHolder.clearCurrentContext();
RequestContext ctx = RequestContext.getCurrentContext();
final HttpServletRequest request = ctx.getRequest();
final String requestURI = request.getRequestURI();
if (request.getParameter("developer") != null) {
// put the serviceId in `RequestContext`
RibbonFilterContextHolder.getCurrentContext()
.add("developer", request.getParameter("developer"));
} else if (request.getHeader("developer") != null) {
RibbonFilterContextHolder.getCurrentContext()
.add("developer", request.getHeader("developer"));
}
return true;
}

MetadataAwareRule ribbon 规则覆写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

public class MetadataAwareRule extends ZoneAvoidanceRule {


@Override
public Server choose(Object key) {
final RibbonFilterContext context = RibbonFilterContextHolder.getCurrentContext();
ILoadBalancer lb = getLoadBalancer();
final List<Server> allServers = lb.getAllServers();
final List<Server> metaServers = new ArrayList<>();
final List<Server> noMetaServers = new ArrayList<>();
for (Server server : allServers) {
if (server instanceof DiscoveryEnabledServer) {
final DiscoveryEnabledServer discoveryEnabledServer = (DiscoveryEnabledServer) server;
final Set<Map.Entry<String, String>> attributes = Collections.unmodifiableSet(context.getAttributes().entrySet());
final Map<String, String> metadata = discoveryEnabledServer.getInstanceInfo().getMetadata();
if (metadata.isEmpty()) { // 如果没有meta数据 表示是测试服务上的地址
noMetaServers.add(server);
} else {
if (metadata.entrySet().containsAll(attributes)) { // 如果meta 满足
metaServers.add(server);
}
}
}
}

if (context.getAttributes().isEmpty()) { //默认走不带元数据的服务
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(noMetaServers, key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
} else { // 走自定义路由

if (metaServers.isEmpty()) {
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(noMetaServers, key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
} else {
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(metaServers, key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}

}
}

ThreadLocal 变量封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Setter
@Getter
public class RibbonFilterContext {

private final Map<String, String> attributes = new HashMap<>();

public RibbonFilterContext add(String key, String value) {
attributes.put(key, value);
return this;
}

public String get(String key) {
return attributes.get(key);
}

public RibbonFilterContext remove(String key) {
attributes.remove(key);
return this;
}

public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(attributes);
}
}

public class RibbonFilterContextHolder {

private static final ThreadLocal<RibbonFilterContext> contextHolder = new InheritableThreadLocal<RibbonFilterContext>() {
@Override
protected RibbonFilterContext initialValue() {
return new RibbonFilterContext();
}
};

public static RibbonFilterContext getCurrentContext() {
return contextHolder.get();
}

public static void setCurrentContext(RibbonFilterContext context) {
contextHolder.set(context);
}

public static void clearCurrentContext() {
contextHolder.remove();
}
}

激活自定义Rule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

@Configuration
@ConditionalOnClass(DiscoveryEnabledNIWSServerList.class)
@AutoConfigureBefore(RibbonClientConfiguration.class)
public class RibbonMetaFilterAutoConfiguration {

@Bean
@ConditionalOnMissingBean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MetadataAwareRule metadataAwareRule() {
return new MetadataAwareRule();
}

/**
* 根据自己的选择判断时候激活改策略
* @return
*/
@Bean
public RibbonHystrixConcurrencyStrategy ribbonHystrixConcurrencyStrategy(){
return new RibbonHystrixConcurrencyStrategy();
}

}

对Feign进行扩展将developer进行传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

@Configuration
@EnableWebMvc
public class MyAutoConfigurationBefore extends WebMvcConfigurerAdapter {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new HandlerInterceptor() {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (request.getParameter("developer") != null) {
RibbonFilterContextHolder.getCurrentContext()
.add("developer", request.getParameter("developer"));
} else if (request.getHeader("developer") != null) {
RibbonFilterContextHolder.getCurrentContext()
.add("developer", request.getHeader("developer"));
}
return true;
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {

}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
RibbonFilterContextHolder.clearCurrentContext();
}
});
}
}

/**
* @see feign.RequestInterceptor
* @return
*/
@Bean
public RequestInterceptor headerInterceptor() {
return requestTemplate -> {
final String developer = RibbonFilterContextHolder.getCurrentContext().get("developer");
if (StringUtils.isNotBlank(developer)) {
requestTemplate.header("developer", developer);
}
};
}
  • 如此便完成了灰度调用
  • 简单的说就是利用了threadlocal机制存储了从前端调用者传入的特殊参数
  • 在进行调用之前,拦截下负载均衡的choose方法,在调用之前对从注册中心获取到的所有server进行匹配
  • 成功则走匹配成功的server,匹配无一个成功的就走默认方法即可。