代码示例
cloudB
cloudA-api
cloudA
feign对象注册源码解析
FeignClientsRegistrar
FeignClientsRegistrar实现了ImportBeanDefinitionRegistrar
当启动类自动扫描构造beanDefinition的时候,进行beanDefinition注册
ImportBeanDefinitionRegistrar回调registerBeanDefinitions
registerFeignClients
通过@EnableFeignClients下的包路径扫描,找到@FeignClient的接口,这里扫描到一个我们自定义feign接口
这里会对feign做校验(是否为接口)
registerClientConfiguration
注册bd,用FeignClientSpecification来做乘载。beanName为 demo-cloud-A.FeignClientSpecification
registerFeignClient
在这里创建FeignClientFactoryBean,他是FactoryBean。
再次注册一个bd,不过这里的构造方法是Supplier
将Supplier传递到bd中
创建bean实例时
判断bd中的instanceSupplier如果不为空,通过instanceSupplier来创建实例
FeignClientsRegistrar#lambda
factoryBean.getObject
Feign.Builder默认实现为openFeign的FeignCircuitBreaker.Builder,如果我们使用Sentinel,那么实现类为SentinelFeign.Builder
loadBalance
这里的第三个参数HardCodedTarget,通过new HardCodedTarget<>(type, name, url)构造
获取Feign的断路器Targeter,由FeignCircuitBreakerTargeter实现
调用targeter.target
loadBalance => targeter.target
判断是否为openFeign默认的Builder,这里我们使用的Sentinel
build方法由SentinelFeign实现
最终通过super.build()来构造Feign对象并返回
调用Feign.newInstance(target),属性如下图所示
基于JDK的动态代理对象,代理的类为HardCodedTarget,接口为Target,InvocationHandler为SentinelInvocationHandler
调用过程源码解析
userClient
userClient是一个jdk动态代理对象,创建过程在上面
SentinelInvocationHandler
methodHandler.invoke
在执行调用前,会初始化RequestTemplate和Options
executeAndDecode
开始执行http
targetRequest 拦截器执行,创建Request对象
在targetRequest中,会执行拦截器的apply方法
在这里我们创建了一个传递header的interceptor
自定义interceptor,向requestTemplate传递header数据
最终构造了一个Request对象
client.execute
通过FeignBlockingLoadBalancerClient调用
client.execute # loadBalancerClient.choose
loadBalancerClient.choose,这里的loadBalancerClient是BlockingLoadBalancerClient(org.springframework.cloud.loadbalancer.blocking.client包下)
负载均衡算法:RoundRobinLoadBalancer
这里的loadbalancer是使用的RoundRobinLoadBalancer
RoundRobinLoadBalancer是一种常见的负载均衡算法,其主要思想是按照轮询的方式将请求分配给后端的多个服务器,在每一轮中均匀地选择其中一个服务器处理请求。
具体来说,RoundRobinLoadBalancer维护一个列表,其中包含了所有可用的后端服务器。在每次请求到来时,该算法会依次选择列表中的下一个服务器,并将该请求发送给它。当到达列表的末尾时,算法会重新从列表的开头开始选择服务器。这种方式可以确保每个服务器在处理请求时都能够平均分配负载,从而避免了任何一个服务器被过度利用而导致性能问题。
需要注意的是,RoundRobinLoadBalancer在实际使用中也有一些问题。例如,如果其中一个服务器处理请求的时间比其他服务器长,那么该服务器上的积压请求会增加,而其他服务器可能会一直处于空闲状态。此外,如果列表中的某一个服务器已经失效,算法可能会一直选择该服务器,从而导致请求失败。为了解决这些问题,通常需要采用一些额外的技术手段来优化负载均衡算法的性能。
会通过nacos去找serviceId的客户端服务信息
nacos服务发现寻址
先从缓存获取ServiceInfo
如果获取不到,通过grpc或者http去请求服务端,拉取最新的可用列表
最后解析完毕后,通过HttpURLConnection调用(需要更改成okhttp,在优化点中)
openFeign优化点
Feign拦截器,传递请求头,将MDC放入请求头
public class FeignRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
Enumeration<String> headerNames = request.getHeaderNames();
if (headerNames != null) {
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement();
String values = request.getHeader(name);
requestTemplate.header(name, values);
}
}
requestTemplate.header(LogConstant.TRACE_ID, MDC.get(LogConstant.TRACE_ID));
}
}
@Bean
public FeignRequestInterceptor feignRequestInterceptor() {
return new FeignRequestInterceptor();
}
整合okhttp
默认的情况下,openFeign使用的上是HttpURLConnection发起请求,openFeign每次需要创建一个新的请求,而不是使用的链接池,所以我们的需要替换掉这个默认的实现,改用一个有链接池的实现。
<!-- 替换默认的HttpURLConnection,改为okhttp,并添加链接池-->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
<version>11.9.1</version>
</dependency>
okhttp配置
@Data
@ConfigurationProperties("hope.feign.okhttp")
public class OkHttpProperties {
/**
* 是否支持重定向,默认:true
*/
boolean followRedirects = true;
/**
* 链接超时时间,单位毫秒
*/
int connectTimeout = 5000;
/**
* 禁用ssl验证
*/
boolean disableSslValidation = true;
/**
* 读超时,单位毫秒
*/
int readTimeout = 15000;
/**
* 写超时,单位毫秒
*/
int writeTimeout = 15000;
/**
* 是否自动重连
*/
boolean retryOnConnectionFailure = true;
/**
* 最大空闲链接
*/
int maxIdleConnections = 10;
/**
* 默认保持5分钟
*/
long keepAliveDuration = 1000 * 60 * 5L;
}
/**
* okhttp feign配置
*/
@AutoConfiguration
@ConditionalOnClass({OkHttpClient.class})
@ConditionalOnMissingBean({okhttp3.OkHttpClient.class})
@ConditionalOnProperty(value = "feign.okhttp.enabled", havingValue = "true")
@EnableConfigurationProperties(OkHttpProperties.class)
class OkHttpFeignConfiguration {
private okhttp3.OkHttpClient okHttpClient;
@Bean
public okhttp3.OkHttpClient client(OkHttpClientFactory httpClientFactory, OkHttpProperties properties, OkHttpClientConnectionPoolFactory connectionPoolFactory) {
this.okHttpClient = httpClientFactory.createBuilder(properties.disableSslValidation)
// 链接超时时间
.connectTimeout(properties.getConnectTimeout(), TimeUnit.MILLISECONDS)
// 是否禁用重定向
.followRedirects(properties.isFollowRedirects())
//设置读超时
.readTimeout(properties.getReadTimeout(), TimeUnit.MILLISECONDS)
//设置写超时
.writeTimeout(properties.getWriteTimeout(), TimeUnit.MILLISECONDS)
// 链接失败是否重试
.retryOnConnectionFailure(properties.isRetryOnConnectionFailure())
//链接池
.connectionPool(connectionPoolFactory.create(properties.getMaxIdleConnections(), properties.getKeepAliveDuration(), TimeUnit.MILLISECONDS))
.addInterceptor(new OkHttpFeignLoggingInterceptor())
.build();
return this.okHttpClient;
}
@PreDestroy
public void destroy() {
if (this.okHttpClient != null) {
this.okHttpClient.dispatcher().executorService().shutdown();
this.okHttpClient.connectionPool().evictAll();
}
}
}
okhttp拦截器,用来打印feign请求日志
@Slf4j
public class OkHttpFeignLoggingInterceptor implements Interceptor {
private static final Charset UTF8 = StandardCharsets.UTF_8;
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
Response response;
StringBuilder builder = StrUtil.builder();
builder.append("================ Feign Start ================").append("\n");
try {
RequestBody requestBody = request.body();
boolean hasRequestBody = requestBody != null;
Connection connection = chain.connection();
String requestStartMessage = "==> "
+ request.method()
+ ' ' + request.url()
+ (connection != null ? " " + connection.protocol() : "");
builder.append(requestStartMessage).append("\n");
if (hasRequestBody) {
// Request body headers are only present when installed as a network interceptor. Force
// them to be included (when available) so there values are known.
if (requestBody.contentType() != null) {
builder.append("Content-Type: ").append(requestBody.contentType()).append("\n");
}
if (requestBody.contentLength() != -1) {
builder.append("Content-Length: ").append(requestBody.contentLength()).append("\n");
}
}
Headers headers = request.headers();
for (int i = 0, count = headers.size(); i < count; i++) {
String name = headers.name(i);
// Skip headers from the request body as they are explicitly logged above.
if (!"Content-Type".equalsIgnoreCase(name) && !"Content-Length".equalsIgnoreCase(name)) {
builder.append(name).append(": ").append(headers.value(i)).append("\n");
}
}
if (!hasRequestBody) {
builder.append("==> END ").append(request.method()).append("\n");
} else if (bodyHasUnknownEncoding(request.headers())) {
builder.append("==> END ").append(request.method()).append(" (encoded body omitted)").append("\n");
} else {
Buffer buffer = new Buffer();
requestBody.writeTo(buffer);
Charset charset = UTF8;
MediaType contentType = requestBody.contentType();
if (contentType != null) {
charset = contentType.charset(UTF8);
}
if (isPlaintext(buffer)) {
builder.append(buffer.readString(charset)).append("\n");
builder.append("==> END ").append(request.method()).append(" (").append(requestBody.contentLength()).append("-byte body)").append("\n");
} else {
builder.append("==> END ").append(request.method()).append(" (binary ").append(requestBody.contentLength()).append("-byte body omitted)").append("\n");
}
}
long startNs = System.nanoTime();
try {
response = chain.proceed(request);
} catch (Exception e) {
builder.append("<-- HTTP FAILED: ").append(e).append("\n");
throw e;
}
long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
ResponseBody responseBody = response.body();
long contentLength = responseBody.contentLength();
builder.append("<-- ").append(response.code()).append(response.message().isEmpty() ? "" : ' ' + response.message()).append(' ').append(response.request().url()).append(" (").append(tookMs).append("ms").append(')').append("\n");
Headers responseHeader = response.headers();
int count = responseHeader.size();
for (int i = 0; i < count; i++) {
builder.append(responseHeader.name(i)).append(": ").append(responseHeader.value(i)).append("\n");
}
if (!HttpHeaders.hasBody(response)) {
builder.append("<-- END HTTP").append("\n");
} else if (bodyHasUnknownEncoding(response.headers())) {
builder.append("<-- END HTTP (encoded body omitted)").append("\n");
} else {
BufferedSource source = responseBody.source();
// Buffer the entire body.
source.request(Long.MAX_VALUE);
Buffer buffer = source.getBuffer();
Long gzippedLength = null;
if ("gzip".equalsIgnoreCase(responseHeader.get("Content-Encoding"))) {
gzippedLength = buffer.size();
GzipSource gzippedResponseBody = null;
try {
gzippedResponseBody = new GzipSource(buffer.clone());
buffer = new Buffer();
buffer.writeAll(gzippedResponseBody);
} finally {
if (gzippedResponseBody != null) {
gzippedResponseBody.close();
}
}
}
Charset charset = UTF8;
MediaType contentType = responseBody.contentType();
if (contentType != null) {
charset = contentType.charset(UTF8);
}
if (!isPlaintext(buffer)) {
builder.append("<-- END HTTP (binary ").append(buffer.size()).append("-byte body omitted)").append("\n");
return response;
}
if (contentLength != 0) {
builder.append(buffer.clone().readString(charset)).append("\n");
}
if (gzippedLength != null) {
builder.append("<-- END HTTP (").append(buffer.size()).append("-byte, ").append(gzippedLength).append("-gzipped-byte body)").append("\n");
} else {
builder.append("<-- END HTTP (").append(buffer.size()).append("-byte body)").append("\n");
}
}
} finally {
builder.append("================ Feign End ================").append("\n");
log.info("\n\n" + builder.toString());
}
return response;
}
/**
* Returns true if the body in question probably contains human readable text. Uses a small sample
* of code points to detect unicode control characters commonly used in binary file signatures.
*/
private static boolean isPlaintext(Buffer buffer) {
try {
Buffer prefix = new Buffer();
long byteCount = buffer.size() < 64 ? buffer.size() : 64;
buffer.copyTo(prefix, 0, byteCount);
for (int i = 0; i < 16; i++) {
if (prefix.exhausted()) {
break;
}
int codePoint = prefix.readUtf8CodePoint();
if (Character.isISOControl(codePoint) && !Character.isWhitespace(codePoint)) {
return false;
}
}
return true;
} catch (EOFException e) {
// Truncated UTF-8 sequence.
return false;
}
}
private boolean bodyHasUnknownEncoding(Headers headers) {
String contentEncoding = headers.get("Content-Encoding");
return contentEncoding != null
&& !"identity".equalsIgnoreCase(contentEncoding)
&& !"gzip".equalsIgnoreCase(contentEncoding);
}
}
开启请求压缩功能
feign:
# 开启压缩功能
compression:
request:
enabled: true
mime-types: text/xml,application/xml,application/json
min-request-size: 2048
response:
enabled: true
添加LoadBalancerCacheManager
项目启动时出现
iguration$LoadBalancerCaffeineWarnLogger : Spring Cloud LoadBalancer is currently working with the default cache. While this cache implementation is useful for development and tests, it’s recommended to use Caffeine cache in production.You can switch to using Caffeine cache, by adding it and org.springframework.cache.caffeine.CaffeineCacheManager to the classpath.
配置超时时间
feign:
client:
config:
# 设置超时,囊括了okhttp的超时,okhttp属于真正执行的超时,openFeign属于服务间的超时
# 设置全局超时时间
default:
connectTimeout: 2000
readTimeout: 5000
# 针对特定contextId设置超时时间
xxx-server:
connectTimeout: 1000
readTimeout: 2000