在 Canary Release 一文中作者说明了灰度发布的概念和流程
现在我们来看看在 Spring Cloud 技术体系下该如何实现灰度发布?相关的源码在 examples/example-cloud 。
如何标记那些服务是灰度服务? Eureka 服务注册中心支持元数据
Additional metadata can be added to the instance registration in the eureka.instance.metadataMap, and this metadata is accessible in the remote clients. In general, additional metadata does not change the behavior of the client, unless the client is made aware of the meaning of the metadata. There are a couple of special cases, described later in this document, where Spring Cloud already assigns meaning to the metadata map.
引用自 Eureka Metadata for Instances and Clients
因此在发布服务时在 application.yml 文件中增加元数据 eureka.instance.metadata-map.canary,值为 true 时表示该服务是灰度服务
1 2 3 4 eureka: instance: metadata-map: canary: true
如何标记那些请求是灰度请求? 标记一个请求是灰度请求还是正常请求有很多种方式,比如根据 IP 地址、用户 ID、时区等等来标记。可以在前端发起请求时打标记,也可以在后端入口处打标记。目前我们选择在网关根据登录用户名来标记请求是否为灰度请求,具体实现见 CustomRequestFilter.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { return reactiveJwtDecoder.decode(accessToken) .flatMap(jwt -> { ServerHttpRequest newRequest = exchange.getRequest().mutate() .header("X-User" , jwt.getSubject()) .header("X-Canary" , isCanary(jwt.getSubject())) .build(); ServerWebExchange newExchange = exchange.mutate().request(newRequest).build(); return chain.filter(newExchange); }) .onErrorResume(throwable -> chain.filter(exchange)); } private String isCanary (String subject) { return String.valueOf(subject != null && subject.equals("alice" )); }
灰度请求如何调用灰度服务 Spring Cloud LoadBalancer 允许自定义负载均衡器,参考 Switching between the load-balancing algorithms 和 Passing Your Own Spring Cloud LoadBalancer Configuration 。而 Spring Cloud Gateway 默认的负载均衡器是 RoundRobinLoadBalancer,它不区分服务是否为灰度服务,为了实现灰度请求调用灰度服务的目的需要参考 RoundRobinLoadBalancer 实现 CanaryRoundRobinLoadBalancer 。在这个负载均衡器里需要做两件事情,第一件事是拿到上一步设置在请求头里的灰度标记
1 2 3 4 5 6 public Mono<Response<ServiceInstance>> choose (Request request) { List<String> candidates = ((RequestDataContext) request.getContext()).getClientRequest().getHeaders().get("X-Canary" ); String canary = candidates != null && !candidates.isEmpty() ? candidates.getFirst() : "false" ; }
第二件事是把灰度服务和正常服务区分开来,这需要用到服务实例的元数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private Response<ServiceInstance> getInstanceResponse (List<ServiceInstance> instances, String canary) { List<ServiceInstance> normalInstances = new ArrayList <>(); List<ServiceInstance> canaryInstances = new ArrayList <>(); for (ServiceInstance instance : instances) { if (instance.getMetadata().get("canary" ) != null && instance.getMetadata().get("canary" ).equals("true" )) { canaryInstances.add(instance); } else { normalInstances.add(instance); } } if (canary.equals("true" )) { instances = canaryInstances.isEmpty() ? normalInstances : canaryInstances; } else { instances = normalInstances.isEmpty() ? canaryInstances : normalInstances; } }
有了负载均衡器后还需要定义它的配置类,注意这个配置类不要添加 @Configuration 注解
1 2 3 4 5 6 7 public class CanaryRoundRobinLoadBalancerClientConfiguration { @Bean ReactorLoadBalancer<ServiceInstance> randomLoadBalancer (Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new CanaryRoundRobinLoadBalancer (loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name); } }
接下来通过 @LoadBalancerClient 自定义每种服务使用的负载均衡算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @LoadBalancerClients( value = { // 使用 Eureka 作为注册中心时 name 必须与 Eureka 中注册的服务名保持一致,Eureka 在注册服务时会把名称转为大写形式, // 具体实现为 org.springframework.cloud.netflix.eureka.InstanceInfoFactory 类的 create 方 @LoadBalancerClient(name = "EXAMPLE-USER", configuration = CanaryRoundRobinLoadBalancerClientConfiguration.class), @LoadBalancerClient(name = "EXAMPLE-PRODUCT", configuration = CanaryRoundRobinLoadBalancerClientConfiguration.class), @LoadBalancerClient(name = "EXAMPLE-ORDER", configuration = CanaryRoundRobinLoadBalancerClientConfiguration.class), } ) public class ExampleGatewayApplication { public static void main (String[] args) { SpringApplication.run(ExampleGatewayApplication.class, args); } }
如何实现灰度服务之间的相互调用 首先,定义一个 Feign 拦截器将灰度标记在服务之间进行传递,见 CustomRequestInterceptor.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void apply (RequestTemplate template) { ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); if (requestAttributes != null ) { HttpServletRequest request = requestAttributes.getRequest(); String user = request.getHeader("X-User" ); if (user != null ) { template.header("X-User" , user); } String canary = request.getHeader("X-Canary" ); if (canary != null ) { template.header("X-Canary" , canary); } } }
其次,定义灰度负载均衡器和它的配置,这部分和网关一样,不再重复。
最后,通过 @LoadBalancerClient 自定义每种服务使用的负载均衡算法
1 2 3 4 5 6 7 8 9 10 11 12 @LoadBalancerClients( value = { // 这里的名字得用小写形式 @LoadBalancerClient(name = "example-user", configuration = CanaryRoundRobinLoadBalancerClientConfiguration.class), @LoadBalancerClient(name = "example-product", configuration = CanaryRoundRobinLoadBalancerClientConfiguration.class), } ) public class ExampleOrderApplication { public static void main (String[] args) { SpringApplication.run(ExampleOrderApplication.class, args); } }
答疑时间 为什么在网关自定义负载均衡时名称需要大写? 服务提供方注册大写形式的应用名称
The default application name (that is, the service ID), virtual host, and non-secure port (taken from the Environment) are ${spring.application.name}, ${spring.application.name} and ${server.port}, respectively.
Having spring-cloud-starter-netflix-eureka-client on the classpath makes the app into both a Eureka “instance” (that is, it registers itself) and a “client” (it can query the registry to locate other services). The instance behaviour is driven by eureka.instance.* configuration keys, but the defaults are fine if you ensure that your application has a value for spring.application.name (this is the default for the Eureka service ID or VIP).
引用自 Registering with Eureka
当使用 Eureka 作为服务注册中心时,Eureka 默认将 spring.application.name 的值作为应用名称或服务 ID,比如 example-user。我们配置名称都是小写形式,但是在网关使用 @LoadBalancerClient 注解自定义服务负载均衡时要求 name 属性的值必须使用大写形式,比如 EXAMPLE-USER,这是为什么呢?
Eureka 客户端在启动时通过 EurekaClientAutoConfiguration 向容器中注入了 ApplicationInfoManager,这个 Bean 在实例化时调用 InstanceInfoFactory 类的 create 方法把配置信息转换成 InstanceInfo 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 public InstanceInfo create (EurekaInstanceConfig config) { InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(); builder.setNamespace(namespace) .setAppName(config.getAppname()) .setInstanceId(config.getInstanceId()) }
注意到第 9 行的 setAppName 方法,就是在这里将应用名称强制转换为了大写形式
1 2 3 4 public Builder setAppName (String appName) { this .result.appName = (String)this .intern.apply(appName.toUpperCase(Locale.ROOT)); return this ; }
最终 com.netflix.discovery.DiscoveryClient 的 register 方法使用前面创建的 InstanceInfo 对象向服务注册中心注册服务。这部分是服务提供方的服务注册逻辑,下面来看看服务消费方的服务发现逻辑。
服务消费方构建大写形式的路由 在配置 spring.cloud.gateway.discovery.locator.enabled 的值为 true 时 GatewayDiscoveryClientAutoConfiguration 向容器中注入 DiscoveryClientRouteDefinitionLocator,这个 Bean 在实例化时从服务注册中心获取 ServiceInstance
1 2 3 4 5 6 public DiscoveryClientRouteDefinitionLocator (ReactiveDiscoveryClient discoveryClient, DiscoveryLocatorProperties properties) { this (discoveryClient.getClass().getSimpleName(), properties); serviceInstances = discoveryClient.getServices() .flatMap(service -> discoveryClient.getInstances(service).collectList()); }
在使用 Eureka 作为服务注册中心时 ServiceInstance 的实现类是 EurekaServiceInstance,这个类有一个成员变量 InstanceInfo,这个变量的内容和服务提供方注册的内容一致。
网关收到请求后 RoutePredicateHandlerMapping 的 getHandlerInternal 方法调用 lookupRoute 方法获取路由 Route,经过一系列代理后调用 RouteDefinitionRouteLocator 的 getRoutes 方法获取路由。RouteDefinitionRouteLocator#getRoutes 方法里调用 DiscoveryClientRouteDefinitionLocator#getRouteDefinitions 方法把 ServiceInstance 转换为 RouteDefinition
1 2 3 4 5 6 7 8 9 10 protected RouteDefinition buildRouteDefinition (Expression urlExpr, ServiceInstance serviceInstance) { String serviceId = serviceInstance.getServiceId(); RouteDefinition routeDefinition = new RouteDefinition (); routeDefinition.setId(this .routeIdPrefix + serviceId); String uri = urlExpr.getValue(this .evalCtxt, serviceInstance, String.class); routeDefinition.setUri(URI.create(uri)); routeDefinition.setMetadata(new LinkedHashMap <>(serviceInstance.getMetadata())); return routeDefinition; }
方法参数中 urlExpr 变量的格式为 'lb://'+serviceId,第 2 行 serviceInstance.getServiceId() 获取的是 InstanceInfo#appName 变量的值,这个值为大写形式的应用名称,比如 EXAMPLE-USER,因此第 5 行得到的 uri 为 lb://EXAMPLE-USER,这个值会设置到 Route#uri 变量中,即使用大写形式的应用名称作为路由 URI 的 scheme 部分。路由构建好后在 RoutePredicateHandlerMapping#getHandlerInternal 中将路由设置到 ServerWebExchange 的属性中 exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);。
路由转换为请求地址 在 RouteToRequestUrlFilter 中将路由 URI 和实际请求的 URI 进行合并
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); URI uri = exchange.getRequest().getURI(); boolean encoded = containsEncodedParts(uri); URI routeUri = route.getUri(); URI mergedUrl = UriComponentsBuilder.fromUri(uri) .scheme(routeUri.getScheme()) .host(routeUri.getHost()) .port(routeUri.getPort()) .build(encoded) .toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl); return chain.filter(exchange); }
合并的逻辑是用路由 URI 的 scheme、host、port 替换实际请求的 URI 的对应部分,比如请求的 URI 为 http://localhost:8080/user/1,路由的 URI 为 lb://EXAMPLE-USER,合并后的结果为 lb://EXAMPLE-USER/user/1。最后把合并后的 URI 设置到 ServerWebExchange 的 GATEWAY_REQUEST_URL_ATTR 属性中。
创建特定于每个应用名称的 Bean 容器 在 ReactiveLoadBalancerClientFilter 中触发为每个应用名称创建独立的 Bean 容器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); addOriginalRequestUrl(exchange, url); URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String serviceId = requestUri.getHost(); Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class); DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest <>(new RequestDataContext ( new RequestData (exchange.getRequest(), exchange.getAttributes()), getHint(serviceId))); return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> { }
第 7 行从 ServerWebExchange 中取出 GATEWAY_REQUEST_URL_ATTR 属性的值,即 lb://EXAMPLE-USER/user/1。第 8 行从 URI 变量中去除 HOST,即 EXAMPLE-USER,作为服务的 ID(应用名称)。第 10 行的 clientFactory.getInstances 在第一次调用时将触发创建特定于每个应用名称的 Bean 容器的过程
1 2 3 4 5 6 7 8 9 10 11 12 13 public GenericApplicationContext createContext (String name) { GenericApplicationContext context = buildContext(name); if (applicationContextInitializers.get(name) != null ) { applicationContextInitializers.get(name).initialize(context); context.refresh(); return context; } registerBeans(name, context); context.refresh(); return context; }
我们将重点看看第 10 行的 registerBeans 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void registerBeans (String name, GenericApplicationContext context) { Assert.isInstanceOf(AnnotationConfigRegistry.class, context); AnnotationConfigRegistry registry = (AnnotationConfigRegistry) context; if (this .configurations.containsKey(name)) { for (Class<?> configuration : this .configurations.get(name).getConfiguration()) { registry.register(configuration); } } for (Map.Entry<String, C> entry : this .configurations.entrySet()) { if (entry.getKey().startsWith("default." )) { for (Class<?> configuration : entry.getValue().getConfiguration()) { registry.register(configuration); } } } registry.register(PropertyPlaceholderAutoConfiguration.class, this .defaultConfigType); }
this.configurations 包含所有的 LoadBalancerClientSpecification 类型的 Bean,通过 @LoadBalancerClient 注解定义的也包含在内。第 4~8 行会注入我们定义的配置类 CanaryRoundRobinLoadBalancerClientConfiguration,从而覆盖第 16 行注入的 this.defaultConfigType,即 LoadBalancerClientConfiguration 配置类。这就是在网关自定义负载均衡时名称需要大写的原因。
为什么在 Feign 客户端自定义负载均衡时名称需要小写? 构建 Feign 客户端调用地址 在项目启动时 FeignClientsRegistrar 扫描 @FeignClient 注解标注的类注册 Feign 客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public void registerFeignClients (AnnotationMetadata metadata, BeanDefinitionRegistry registry) { LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet <>(); Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName()); final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients" ); if (clients == null || clients.length == 0 ) { ClassPathScanningCandidateComponentProvider scanner = getScanner(); scanner.setResourceLoader(this .resourceLoader); scanner.addIncludeFilter(new AnnotationTypeFilter (FeignClient.class)); Set<String> basePackages = getBasePackages(metadata); for (String basePackage : basePackages) { candidateComponents.addAll(scanner.findCandidateComponents(basePackage)); } } else { for (Class<?> clazz : clients) { candidateComponents.add(new AnnotatedGenericBeanDefinition (clazz)); } } for (BeanDefinition candidateComponent : candidateComponents) { if (candidateComponent instanceof AnnotatedBeanDefinition beanDefinition) { AnnotationMetadata annotationMetadata = beanDefinition.getMetadata(); Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface" ); Map<String, Object> attributes = annotationMetadata .getAnnotationAttributes(FeignClient.class.getCanonicalName()); String name = getClientName(attributes); String className = annotationMetadata.getClassName(); registerClientConfiguration(registry, name, className, attributes.get("configuration" )); registerFeignClient(registry, annotationMetadata, attributes); } } }
在当前使用 Feign 的方式下,即 @FeignClient(name = "example-user"),只需要注意第 33 行的 registerFeignClient 方法调用,在未配置 spring.cloud.openfeign.lazy-attributes-resolution 属性或者其值为 false 时将调用 eagerlyRegisterFeignClientBeanDefinition 方法注册 FeignClientFactoryBean
1 2 3 4 5 6 7 8 9 10 11 private void eagerlyRegisterFeignClientBeanDefinition (String className, Map<String, Object> attributes, BeanDefinitionRegistry registry) { validate(attributes); BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class); String name = getName(attributes); definition.addPropertyValue("name" , name); String contextId = getContextId(null , attributes); definition.addPropertyValue("contextId" , contextId); }
这里需要注意第 5~9 行的 name 和 contextId 的值,在当前使用 Feign 的方式下它们的值都是 example-user。FeignClientFactoryBean 的 getTarget 方法会构建调用的 url
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <T> T getTarget () { FeignClientFactory feignClientFactory = beanFactory != null ? beanFactory.getBean(FeignClientFactory.class) : applicationContext.getBean(FeignClientFactory.class); Feign.Builder builder = feign(feignClientFactory); if (!StringUtils.hasText(url) && !isUrlAvailableInConfig(contextId)) { if (!name.startsWith("http://" ) && !name.startsWith("https://" )) { url = "http://" + name; } else { url = name; } url += cleanPath(); return (T) loadBalance(builder, feignClientFactory, new HardCodedTarget <>(type, name, url)); } }
这个方法构建的 url 为 http://example-user,域名是在 @FeignClient 注解定义的名称。
创建特定于每个 Feign 客户端的 Bean 容器 通过 Feign 客户端调用远程服务会进入 FeignBlockingLoadBalancerClient 的 execute 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public Response execute (Request request, Request.Options options) throws IOException { final URI originalUri = URI.create(request.url()); String serviceId = originalUri.getHost(); Assert.state(serviceId != null , "Request URI does not contain a valid hostname: " + originalUri); String hint = getHint(serviceId); DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest <>( new RequestDataContext (buildRequestData(request), hint)); Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator .getSupportedLifecycleProcessors( loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class); supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest)); ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest); org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse ( instance); if (instance == null ) { String message = "Load balancer does not contain an instance for the service " + serviceId; if (LOG.isWarnEnabled()) { LOG.warn(message); } supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext <ResponseData, ServiceInstance, RequestDataContext>( CompletionContext.Status.DISCARD, lbRequest, lbResponse))); return Response.builder() .request(request) .status(HttpStatus.SERVICE_UNAVAILABLE.value()) .body(message, StandardCharsets.UTF_8) .build(); } String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString(); Request newRequest = buildRequest(request, reconstructedUrl, instance); return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse, supportedLifecycleProcessors); }
第 2 行 originalUri 的值为 http://example-user/user/1,因此第 3 行获取到的 serviceId 为 example-user,这个值就是 @FeignClient 注解 name 属性的值。在第一次调用第 10 行的 loadBalancerClientFactory.getInstances 方法时会创建特定于每个 Feign 客户端的 Bean 容器,在刷新容器时会创建自定义的 CanaryRoundRobinLoadBalancer 对象。第 13 行的 loadBalancerClient.choose 调用会调用创建一个 DiscoveryClientServiceInstanceListSupplier 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 public DiscoveryClientServiceInstanceListSupplier (DiscoveryClient delegate, Environment environment) { this .serviceId = environment.getProperty(PROPERTY_NAME); resolveTimeout(environment); this .serviceInstances = Flux.defer(() -> Mono.fromCallable(() -> delegate.getInstances(serviceId))) .timeout(timeout, Flux.defer(() -> { logTimeout(); return Flux.just(new ArrayList <>()); }), Schedulers.boundedElastic()) .onErrorResume(error -> { logException(error); return Flux.just(new ArrayList <>()); }); }
第 2 行获取的 serviceId 为 example-user,第 4 行会调用 CompositeDiscoveryClient 的 getInstances 获取 ServiceInstance
1 2 3 4 5 6 7 8 9 10 11 public List<ServiceInstance> getInstances (String serviceId) { if (this .discoveryClients != null ) { for (DiscoveryClient discoveryClient : this .discoveryClients) { List<ServiceInstance> instances = discoveryClient.getInstances(serviceId); if (instances != null && !instances.isEmpty()) { return instances; } } } return Collections.emptyList(); }
第 4 行会调用 EurekaDiscoveryClient 的 getInstances 获取 ServiceInstance
1 2 3 4 5 6 7 8 public List<ServiceInstance> getInstances (String serviceId) { List<InstanceInfo> infos = this .eurekaClient.getInstancesByVipAddress(serviceId, false ); List<ServiceInstance> instances = new ArrayList <>(); for (InstanceInfo info : infos) { instances.add(new EurekaServiceInstance (info)); } return instances; }
第 2 行将 serviceId,即 example-user,当作 vipAddress 从服务注册中心获取 InstanceInfo,而 vipAddress 在服务注册时正是服务提供方的 spring.application.name 属性的值,它是小写形式,因此在 Feign 客户端自定义负载均衡时名称需要小写。
完~