open-feign

Contents

  • Contents
  • Registering Feign clients
    • Key classes
      • FeignClientsRegistrar
      • FeignClientFactoryBean
  • Registering the Client
    • HttpClientFeignLoadBalancerConfiguration
  • Implementation of the Feign proxy class
    • ReflectiveFeign
      • Parsing parameters
    • Serialization and deserialization
      • SynchronousMethodHandler
    • Load balancing
      • FeignBlockingLoadBalancerClient
  • Feign and Hystrix
    • hystrix-target
    • hystrix-feign
    • HystrixInvocationHandler

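This post covers how Feign clients are registered, how a FeignClient call is serialized and deserialized, and Ribbon retries.
The registration process resembles spring-mybatis, except that one is configured through the spring.factories file and the other through ImportBeanDefinitionRegistrar. The overall proxying is also very similar: both wrap the arguments into a template and hand it to an executor (Client, SqlSession) to execute.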

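Registering Feign clients

  1. Use @EnableFeignClients to configure the packages or classes to scan
  2. Iterate over all the packages or classes and generate a BeanDefinition for each (with some validation, such as rejecting types that are not interfaces)
  3. Resolve the bean name, configure the encoder and decoder, and register the FeignClientFactoryBean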

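Key classes

| class/annotation | parameters | description |
| --- | --- | --- |
| @EnableFeignClients | | Scans packages and hands the interfaces annotated with @FeignClient over to Spring |
| FeignClientsRegistrar | | Implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, and EnvironmentAware. It registers the scanned @FeignClient interfaces with Spring |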

FeignClientsRegistrar

public void registerFeignClients(AnnotationMetadata metadata,
        BeanDefinitionRegistry registry) {

    LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
    Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
    AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(FeignClient.class);
    final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
    // If @EnableFeignClients lists no clients, scan the base packages of the annotated class
    if (clients == null || clients.length == 0) {
        ClassPathScanningCandidateComponentProvider scanner = getScanner();
        scanner.setResourceLoader(this.resourceLoader);
        scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
        Set<String> basePackages = getBasePackages(metadata);
        for (String basePackage : basePackages) {
            candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
        }
    }
    // If clients are listed explicitly, skip scanning
    else {
        for (Class<?> clazz : clients) {
            candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
        }
    }
    // Parse and register each AnnotatedGenericBeanDefinition
    for (BeanDefinition candidateComponent : candidateComponents) {
        if (candidateComponent instanceof AnnotatedBeanDefinition) {
            // verify annotated class is an interface
            AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
            AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
            Assert.isTrue(annotationMetadata.isInterface(),
                    "@FeignClient can only be specified on an interface");

            // Read the attributes of the @FeignClient annotation
            Map<String, Object> attributes = annotationMetadata
                    .getAnnotationAttributes(FeignClient.class.getCanonicalName());

            String name = getClientName(attributes);
            registerClientConfiguration(registry, name,
                    attributes.get("configuration"));
            // Parse the attributes and register the client with Spring
            registerFeignClient(registry, annotationMetadata, attributes);
        }
    }
}

private void registerFeignClient(BeanDefinitionRegistry registry,
        AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
    String className = annotationMetadata.getClassName();
    // So the bean Feign exposes is whatever FeignClientFactoryBean#getObject() returns
    BeanDefinitionBuilder definition = BeanDefinitionBuilder
            .genericBeanDefinition(FeignClientFactoryBean.class);
    validate(attributes);
    // ..... elided: mostly sets a batch of values such as url, alias, fallback, fallbackFactory
    // (the alias variable used below is declared in this elided part)
    AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
    beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);

    // has a default, won't be null
    boolean primary = (Boolean) attributes.get("primary");

    beanDefinition.setPrimary(primary);

    String qualifier = getQualifier(attributes);
    if (StringUtils.hasText(qualifier)) {
        alias = qualifier;
    }
    // Worth remembering: registering through a BeanDefinitionHolder, which I had not used before
    BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
            new String[] { alias });
    BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
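
The registration flow can be modeled in a few lines of plain Java. The sketch below is a simplified, hypothetical stand-in: `MiniRegistry` and its methods are invented for illustration and are not Spring APIs. It checks that the candidate is an interface, then stores a factory under the type name, much as the registrar stores a FeignClientFactoryBean definition whose getObject() produces the client later.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a BeanDefinitionRegistry: type name -> factory.
class MiniRegistry {
    private final Map<String, Supplier<Object>> factories = new LinkedHashMap<>();

    // Mirrors the registrar's check: only interfaces may be registered,
    // and what is stored is a factory, not the client instance itself.
    void register(Class<?> candidate, Supplier<Object> factory) {
        if (!candidate.isInterface()) {
            throw new IllegalArgumentException(
                    "client type must be an interface: " + candidate.getName());
        }
        factories.put(candidate.getName(), factory);
    }

    boolean contains(Class<?> type) {
        return factories.containsKey(type.getName());
    }

    // Asks the factory for the object on demand, similar in spirit to FactoryBean#getObject().
    Object get(Class<?> type) {
        Supplier<Object> factory = factories.get(type.getName());
        if (factory == null) {
            throw new IllegalStateException("nothing registered for " + type.getName());
        }
        return factory.get();
    }
}
```

Storing a factory instead of an instance is what lets the proxy be built lazily, once the encoder, decoder, and client are available.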


FeignClientFactoryBean

class FeignClientFactoryBean
        implements FactoryBean<Object>, InitializingBean, ApplicationContextAware {

    @Override
    public Object getObject() throws Exception {
        return getTarget();
    }

    <T> T getTarget() {
        // FeignContext extends NamedContextFactory<FeignClientSpecification> and creates contexts.
        // Internally it keeps a map of contexts, so each Feign client gets its own isolated context
        FeignContext context = applicationContext.getBean(FeignContext.class);
        // Sets the encoder and decoder
        // (default: org.springframework.cloud.openfeign.support.PageableSpringEncoder)
        Feign.Builder builder = feign(context);

        if (!StringUtils.hasText(url)) {
            if (!name.startsWith("http")) {
                url = "http://" + name;
            }
            else {
                url = name;
            }

            url += cleanPath();
            // This is the usual path: the Feign object is returned here.
            // Stepping through, it ends in Feign#newInstance(Target<T> target). Following the usual
            // interface-proxy pattern, there should be a proxy implementation per method, analyzed below
            return (T) loadBalance(builder, context,
                    new HardCodedTarget<>(type, name, url));
        }

        // A url was set explicitly: the client is created along this plain HTTP path
        if (StringUtils.hasText(url) && !url.startsWith("http")) {
            url = "http://" + url;
        }
        String url = this.url + cleanPath();
        Client client = getOptional(context, Client.class);
        if (client != null) {
            if (client instanceof LoadBalancerFeignClient) {
                // not load balancing because we have a url,
                // but ribbon is on the classpath, so unwrap
                client = ((LoadBalancerFeignClient) client).getDelegate();
            }
            if (client instanceof FeignBlockingLoadBalancerClient) {
                // not load balancing because we have a url,
                // but Spring Cloud LoadBalancer is on the classpath, so unwrap
                client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
            }
            builder.client(client);
        }
        Targeter targeter = get(context, Targeter.class);
        return (T) targeter.target(this, builder, context,
                new HardCodedTarget<>(type, name, url));
    }
}
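
The URL decision in getTarget() can be isolated as a pure function. This is a minimal sketch, not the real FeignClientFactoryBean: `TargetUrlResolver` is a hypothetical name, and `path` is assumed to be already normalized (the real code goes through cleanPath()).

```java
// Simplified sketch of how the target URL is derived from @FeignClient-style attributes.
final class TargetUrlResolver {
    private TargetUrlResolver() {}

    // True when no explicit url is given, i.e. the name is a service id to load balance.
    static boolean isLoadBalanced(String url) {
        return url == null || url.trim().isEmpty();
    }

    static String resolve(String name, String url, String path) {
        if (isLoadBalanced(url)) {
            // No url: the service name becomes the host and is resolved by the load balancer later.
            String base = name.startsWith("http") ? name : "http://" + name;
            return base + path;
        }
        // Explicit url: only prepend a scheme when one is missing.
        String base = url.startsWith("http") ? url : "http://" + url;
        return base + path;
    }
}
```

With a name of `user-service` and no url, the result keeps the service id in the host position, which is what the load-balancing client later replaces with a concrete instance.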

Registering the Client

Taking ApacheHttpClient and BlockingLoadBalancerClient as the example, the wiring and registration happen in the HttpClientFeignLoadBalancerConfiguration class.

HttpClientFeignLoadBalancerConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ApacheHttpClient.class)
@ConditionalOnBean(BlockingLoadBalancerClient.class)
@ConditionalOnProperty(value = "feign.httpclient.enabled", matchIfMissing = true)
@Import(HttpClientFeignConfiguration.class)
class HttpClientFeignLoadBalancerConfiguration {

    @Bean
    @ConditionalOnMissingBean
    @Conditional(OnRetryNotEnabledCondition.class)
    public Client feignClient(BlockingLoadBalancerClient loadBalancerClient,
            HttpClient httpClient) {
        ApacheHttpClient delegate = new ApacheHttpClient(httpClient);
        return new FeignBlockingLoadBalancerClient(delegate, loadBalancerClient);
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    @ConditionalOnBean(LoadBalancedRetryFactory.class)
    @ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled",
            havingValue = "true", matchIfMissing = true)
    public Client feignRetryClient(BlockingLoadBalancerClient loadBalancerClient,
            HttpClient httpClient,
            List<LoadBalancedRetryFactory> loadBalancedRetryFactories) {
        AnnotationAwareOrderComparator.sort(loadBalancedRetryFactories);
        ApacheHttpClient delegate = new ApacheHttpClient(httpClient);
        return new RetryableFeignBlockingLoadBalancerClient(delegate, loadBalancerClient,
                loadBalancedRetryFactories.get(0));
    }

}

Implementation of the Feign proxy class

ReflectiveFeign

@Override
public <T> T newInstance(Target<T> target) {
    // Process the annotations and parameters of every method,
    // in preparation for creating the MethodHandlers in the last step
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

    for (Method method : target.type().getMethods()) {
        // Skip methods declared on Object (wait, notify, hashCode, equals, and so on)
        if (method.getDeclaringClass() == Object.class) {
            continue;
        } else if (Util.isDefault(method)) {
            // default methods of the interface go to a DefaultMethodHandler,
            // which simply invokes the default method with the given arguments
            DefaultMethodHandler handler = new DefaultMethodHandler(method);
            defaultMethodHandlers.add(handler);
            methodToHandler.put(method, handler);
        } else {
            // The handler was in fact already created in the first step,
            // targetToHandlersByName.apply(target)
            methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
        }
    }
    InvocationHandler handler = factory.create(target, methodToHandler);
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
            new Class<?>[] {target.type()}, handler);

    for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
        defaultMethodHandler.bindTo(proxy);
    }

    return proxy;
}
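
The core of newInstance is a JDK dynamic proxy whose InvocationHandler looks up a per-method handler in a map. A minimal sketch of that dispatch, using only the JDK: `SketchMethodHandler`, `GreetingApi`, and `ProxySketch` are hypothetical names for illustration, and the handler here returns a canned string instead of performing an HTTP call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for feign's MethodHandler.
interface SketchMethodHandler {
    Object invoke(Object[] args);
}

// Hypothetical client interface used only for this demo.
interface GreetingApi {
    String greet(String name);
}

final class ProxySketch {
    private ProxySketch() {}

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, Map<Method, SketchMethodHandler> dispatch) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object are answered locally, not dispatched.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString": return "proxy for " + type.getSimpleName();
                    case "hashCode": return System.identityHashCode(proxy);
                    case "equals": return proxy == args[0];
                    default: throw new UnsupportedOperationException(method.getName());
                }
            }
            // Everything else goes to the handler registered for that method.
            return dispatch.get(method).invoke(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    // Builds a proxy whose single method is served by a canned handler.
    static GreetingApi demo() {
        Map<Method, SketchMethodHandler> dispatch = new LinkedHashMap<>();
        for (Method m : GreetingApi.class.getMethods()) {
            dispatch.put(m, args -> "hello " + args[0]);
        }
        return newInstance(GreetingApi.class, dispatch);
    }
}
```

In Feign the map values are SynchronousMethodHandler instances, so calling an interface method ends up building and executing a request.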

Parsing parameters

This is done by the inner class ParseHandlersByName:

static final class ParseHandlersByName {
    // Parses the annotations on the interface through parseAndValidateMetadata
    private final Contract contract;
    // Timeouts, redirects, and similar request options
    private final Options options;
    private final Encoder encoder;
    private final Decoder decoder;
    // Turns the response of a failed call into the declared type
    private final ErrorDecoder errorDecoder;
    private final QueryMapEncoder queryMapEncoder;
    private final SynchronousMethodHandler.Factory factory;

    // (constructor omitted)

    public Map<String, MethodHandler> apply(Target target) {
        List<MethodMetadata> metadata = contract.parseAndValidateMetadata(target.type());
        Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>();
        for (MethodMetadata md : metadata) {
            BuildTemplateByResolvingArgs buildTemplate;

            // Build a BuildTemplateByResolvingArgs, which implements
            // RequestTemplate.Factory#create and returns a RequestTemplate.
            // The first two variants receive the encoder, so this is where serialization can happen
            if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
                buildTemplate =
                        new BuildFormEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
            } else if (md.bodyIndex() != null) {
                buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder, queryMapEncoder, target);
            } else {
                buildTemplate = new BuildTemplateByResolvingArgs(md, queryMapEncoder, target);
            }

            if (md.isIgnored()) {
                result.put(md.configKey(), args -> {
                    throw new IllegalStateException(md.configKey() + " is not a method handled by feign");
                });
            } else {
                // Create the SynchronousMethodHandler
                result.put(md.configKey(),
                        factory.create(target, md, buildTemplate, options, decoder, errorDecoder));
            }
        }
        return result;
    }
}
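
The three-way choice of template builder in apply() depends only on a few facts about the method metadata. A small sketch of that decision follows; `TemplateStrategySketch` and its parameters are a hypothetical, trimmed-down view of MethodMetadata, not Feign's API.

```java
import java.util.List;

// Sketch of how the template-building strategy is picked from method metadata.
final class TemplateStrategySketch {
    private TemplateStrategySketch() {}

    enum Strategy { FORM_ENCODED, BODY_ENCODED, RESOLVE_ARGS_ONLY }

    // formParams: names of form parameters; hasBodyTemplate: whether a body template was declared;
    // bodyIndex: position of the body argument, or null when there is none.
    static Strategy choose(List<String> formParams, boolean hasBodyTemplate, Integer bodyIndex) {
        if (!formParams.isEmpty() && !hasBodyTemplate) {
            return Strategy.FORM_ENCODED;      // form fields are collected and encoded
        }
        if (bodyIndex != null) {
            return Strategy.BODY_ENCODED;      // one argument is encoded as the request body
        }
        return Strategy.RESOLVE_ARGS_ONLY;     // only path, query, and header variables are resolved
    }
}
```

Only the first two strategies need the encoder, which matches the constructors in the code above.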

Serialization and deserialization

SynchronousMethodHandler

This class is where a call on the Feign client interface actually lands: its invoke method runs the request.

@Override
public Object invoke(Object[] argv) throws Throwable {
    // Wrap the arguments in a request template,
    // which carries headers, queries, body template, method, charset, and so on
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Options options = findOptions(argv);
    Retryer retryer = this.retryer.clone();
    // Straight into an infinite loop, which is what makes retries possible
    while (true) {
        try {
            return executeAndDecode(template, options);
        } catch (RetryableException e) {
            try {
                retryer.continueOrPropagate(e);
            } catch (RetryableException th) {
                Throwable cause = th.getCause();
                if (propagationPolicy == UNWRAP && cause != null) {
                    throw cause;
                } else {
                    throw th;
                }
            }
            if (logLevel != Logger.Level.NONE) {
                logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
        }
    }
}

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
        logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
        response = client.execute(request, options);
        // ensure the request is set. TODO: remove in Feign 12
        response = response.toBuilder()
                .request(request)
                .requestTemplate(template)
                .build();
    } catch (IOException e) {
        if (logLevel != Logger.Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
        }
        throw errorExecuting(request, e);
    }
    long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

    if (decoder != null)
        return decoder.decode(response, metadata.returnType());

    CompletableFuture<Object> resultFuture = new CompletableFuture<>();
    asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
            metadata.returnType(),
            elapsedTime);

    try {
        if (!resultFuture.isDone())
            throw new IllegalStateException("Response handling not done");

        return resultFuture.join();
    } catch (CompletionException e) {
        Throwable cause = e.getCause();
        if (cause != null)
            throw cause;
        throw e;
    }
}
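
The retry loop in invoke() has a simple shape: keep calling until the call succeeds, or until the retryer decides to rethrow. A self-contained sketch of that shape is shown here. `RetryLoopSketch`, `RetryableFailure`, and `SimpleRetryer` are hypothetical names, and the fixed attempt limit is an assumption for illustration (Feign's Retryer also handles backoff, which is left out).

```java
import java.util.function.Supplier;

final class RetryLoopSketch {
    private RetryLoopSketch() {}

    // Hypothetical exception marking a failure that is worth retrying.
    static final class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    // Hypothetical retryer: allows up to maxAttempts calls, then propagates the failure.
    static final class SimpleRetryer {
        private final int maxAttempts;
        private int attempt = 1;

        SimpleRetryer(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        void continueOrPropagate(RetryableFailure e) {
            if (attempt++ >= maxAttempts) {
                throw e; // out of attempts: give up and rethrow
            }
        }
    }

    // Same structure as the while (true) loop above: return on success,
    // ask the retryer on a retryable failure, and loop again if it allows.
    static <T> T invoke(Supplier<T> call, int maxAttempts) {
        SimpleRetryer retryer = new SimpleRetryer(maxAttempts);
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                retryer.continueOrPropagate(e);
            }
        }
    }
}
```

A fresh retryer per invocation (the `this.retryer.clone()` in the original) keeps the attempt counter from leaking between calls.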

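Load balancing

The Ribbon implementation is LoadBalancerFeignClient. It is slightly more complex and does not integrate with Ribbon perfectly; it uses the decorator pattern to wrap the client a second time.
This post reads the source through the spring-cloud-loadbalancer implementation instead.

FeignBlockingLoadBalancerClient

It obtains the service's registration information through the load balancer, picks a ServiceInstance according to the load-balancing strategy, builds the real URL, wraps everything into a new Request, and hands it to the delegate to execute. The resulting Response then goes back to SynchronousMethodHandler#executeAndDecode.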

public class FeignBlockingLoadBalancerClient implements Client {

    // The Apache HttpClient or OkHttp based implementation of the Client interface
    private final Client delegate;

    // The load balancer
    private final BlockingLoadBalancerClient loadBalancerClient;

    // The real request is executed by the Apache or OkHttp delegate
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        final URI originalUri = URI.create(request.url());
        String serviceId = originalUri.getHost();
        Assert.state(serviceId != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        ServiceInstance instance = loadBalancerClient.choose(serviceId);
        if (instance == null) {
            String message = "Load balancer does not contain an instance for the service "
                    + serviceId;
            if (LOG.isWarnEnabled()) {
                LOG.warn(message);
            }
            return Response.builder().request(request)
                    .status(HttpStatus.SERVICE_UNAVAILABLE.value())
                    .body(message, StandardCharsets.UTF_8).build();
        }
        String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri)
                .toString();
        Request newRequest = Request.create(request.httpMethod(), reconstructedUrl,
                request.headers(), request.body(), request.charset(),
                request.requestTemplate());
        return delegate.execute(newRequest, options);
    }

}
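
The key step is swapping the logical service id in the host position for the address of the chosen instance. A minimal sketch with java.net.URI illustrates this. `UriRewriteSketch` is a hypothetical helper, and the host and port in the usage are made-up values; the real work is done by loadBalancerClient.reconstructURI.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Sketch: rebuild a URI with a concrete host and port while keeping path and query.
final class UriRewriteSketch {
    private UriRewriteSketch() {}

    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot rebuild " + original, e);
        }
    }
}
```

For example, rewriting `http://user-service/users/1?active=true` against an instance at `10.0.0.5:8080` keeps the path and query and only changes the authority.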

Feign and Hystrix

The following code can be found in the FeignClientsConfiguration class: when the HystrixCommand and HystrixFeign classes are on the classpath, the Hystrix Feign.Builder is created.

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {

    @Bean
    @Scope("prototype")
    @ConditionalOnMissingBean
    // matchIfMissing() defaults to false, so the property must be set explicitly
    @ConditionalOnProperty(name = "feign.hystrix.enabled")
    public Feign.Builder feignHystrixBuilder() {
        return HystrixFeign.builder();
    }

}

In the FeignAutoConfiguration class:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public Targeter feignTargeter() {
        return new HystrixTargeter();
    }

}

hystrix-target

// This is essentially the core code
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
        FeignContext context, Target.HardCodedTarget<T> target) {
    if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
        return feign.target(target);
    }
    feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
    String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName()
            : factory.getContextId();
    // This class is somewhat like the Contract seen earlier:
    // it derives what is needed to construct the HystrixCommand
    SetterFactory setterFactory = getOptional(name, context, SetterFactory.class);
    if (setterFactory != null) {
        builder.setterFactory(setterFactory);
    }
    Class<?> fallback = factory.getFallback();
    if (fallback != void.class) {
        return targetWithFallback(name, context, target, builder, fallback);
    }
    Class<?> fallbackFactory = factory.getFallbackFactory();
    if (fallbackFactory != void.class) {
        return targetWithFallbackFactory(name, context, target, builder,
                fallbackFactory);
    }

    return feign.target(target);
}
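
The branching in target() reduces to a small decision table, with void.class acting as the "not configured" sentinel for both fallback attributes. A sketch of only that decision is given here; `TargeterDecisionSketch` and its `Mode` enum are hypothetical names, and the boolean parameter stands in for the instanceof check on the builder.

```java
// Sketch of which targeting mode is chosen from the builder type and fallback settings.
final class TargeterDecisionSketch {
    private TargeterDecisionSketch() {}

    enum Mode { PLAIN, WITH_FALLBACK, WITH_FALLBACK_FACTORY }

    static Mode decide(boolean hystrixBuilder, Class<?> fallback, Class<?> fallbackFactory) {
        if (!hystrixBuilder) {
            return Mode.PLAIN;                 // not a Hystrix builder: nothing Hystrix-specific to do
        }
        if (fallback != void.class) {
            return Mode.WITH_FALLBACK;         // fallback wins when both are configured
        }
        if (fallbackFactory != void.class) {
            return Mode.WITH_FALLBACK_FACTORY;
        }
        return Mode.PLAIN;                     // Hystrix builder, but no fallback configured
    }
}
```

Note the ordering: a configured fallback is checked before a fallbackFactory, so the factory is only used when no fallback class is set.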

hystrix-feign

Feign build(final FallbackFactory<?> nullableFallbackFactory) {
    // Replaces the InvocationHandler that ReflectiveFeign#newInstance obtains through
    //   InvocationHandler handler = factory.create(target, methodToHandler);
    super.invocationHandlerFactory(new InvocationHandlerFactory() {
        @Override
        public InvocationHandler create(Target target,
                Map<Method, MethodHandler> dispatch) {
            return new HystrixInvocationHandler(target, dispatch, setterFactory,
                    nullableFallbackFactory);
        }
    });
    super.contract(new HystrixDelegatingContract(contract));
    return super.build();
}

HystrixInvocationHandler

```java
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
        throws Throwable {
    // early exit if the invoked method is from java.lang.Object
    // code is the same as ReflectiveFeign.FeignInvocationHandler
    if ("equals".equals(method.getName())) {
        try {
            Object otherHandler =
                    args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
            return equals(otherHandler);
        } catch (IllegalArgumentException e) {
            return false;
        }
    } else if ("hashCode".equals(method.getName())) {
        return hashCode();
    } else if ("toString".equals(method.getName())) {
        return toString();
    }

    HystrixCommand<Object> hystrixCommand =
            new HystrixCommand<Object>(setterMethodMap.get(method)) {
                @Override
                protected Object run() throws Exception {
                    try {
                        return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
                    } catch (Exception e) {
                        throw e;
                    } catch (Throwable t) {
                        throw (Error) t;
                    }
                }

                @Override
                protected Object getFallback() {
                    if (fallbackFactory == null) {
                        return super.getFallback();
                    }
                    try {
                        Object fallback = fallbackFactory.create(getExecutionException());
                        Object result = fallbackMethodMap.get(method).invoke(fallback, args);
                        if (isReturnsHystrixCommand(method)) {
                            return ((HystrixCommand) result).execute();
                        } else if (isReturnsObservable(method)) {
                            // Create a cold Observable
                            return ((Observable) result).toBlocking().first();
                        } else if (isReturnsSingle(method)) {
                            // Create a cold Observable as a Single
                            return ((Single) result).toObservable().toBlocking().first();
                        } else if (isReturnsCompletable(method)) {
                            ((Completable) result).await();
                            return null;
                        } else if (isReturnsCompletableFuture(method)) {
                            return ((Future) result).get();
                        } else {
                            return result;
                        }
                    } catch (IllegalAccessException e) {
                        // shouldn't happen as method is public due to being an interface
                        throw new AssertionError(e);
                    } catch (InvocationTargetException | ExecutionException e) {
                        // Exceptions on fallback are tossed by Hystrix
                        throw new AssertionError(e.getCause());
                    } catch (InterruptedException e) {
                        // Exceptions on fallback are tossed by Hystrix
                        Thread.currentThread().interrupt();
                        throw new AssertionError(e.getCause());
                    }
                }
            };

    if (Util.isDefault(method)) {
        return hystrixCommand.execute();
    } else if (isReturnsHystrixCommand(method)) {
        return hystrixCommand;
    } else if (isReturnsObservable(method)) {
        // Create a cold Observable
        return hystrixCommand.toObservable();
    } else if (isReturnsSingle(method)) {
        // Create a cold Observable as a Single
        return hystrixCommand.toObservable().toSingle();
    } else if (isReturnsCompletable(method)) {
        return hystrixCommand.toObservable().toCompletable();
    } else if (isReturnsCompletableFuture(method)) {
        return new ObservableCompletableFuture<>(hystrixCommand);
    }
    return hystrixCommand.execute();
}
```
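
Stripped of Hystrix itself, the handler's idea is "run the real call, and if it fails, run the same method on a fallback object". The sketch below shows just that idea with a JDK proxy. `FallbackProxySketch` and `StockApi` are hypothetical names, and circuit breaking, timeouts, thread isolation, and the reactive return types handled above are deliberately not modeled.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

// Hypothetical client interface used only for this demo.
interface StockApi {
    int stock(String sku);
}

final class FallbackProxySketch {
    private FallbackProxySketch() {}

    @SuppressWarnings("unchecked")
    static <T> T withFallback(Class<T> type, T primary, T fallback) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                // Plays the role of run(): try the real call first.
                return method.invoke(primary, args);
            } catch (InvocationTargetException primaryFailure) {
                // Plays the role of getFallback(): same method, fallback object.
                try {
                    return method.invoke(fallback, args);
                } catch (InvocationTargetException fallbackFailure) {
                    throw fallbackFailure.getCause();
                }
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Invoking the fallback through the same `Method` object is what the `fallbackMethodMap.get(method).invoke(fallback, args)` line does in the original, which is why a fallback must implement the client interface.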

© 2023 haoxp