SpringCloud升級之路2020.0.x版-26.OpenFeign的組件

本系列代碼地址://github.com/JoJoTec/spring-cloud-parent

首先,我們給出官方文檔中的組件結構圖:
MindMap overview

官方文檔中的組件,是以實現功能為維度的,我們這裡是以源碼實現為維度的(因為之後我們使用的時候,需要根據需要定製這些組件,所以需要從源碼角度去拆分分析),可能會有一些小差異。

負責解析類元數據的 Contract

OpenFeign 是通過代理類元數據來自動生成 HTTP API 的,那麼到底解析哪些類元數據,哪些類元數據是有效的,是通過指定 Contract 來實現的,我們可以通過實現這個 Contract 來自定義一些類元數據的解析,例如,我們自定義一個註解:

//僅可用於方法上
@java.lang.annotation.Target(METHOD)
//指定註解保持到運行時
@Retention(RUNTIME)
@interface Get {
    //請求 uri
    String uri();
}

這個註解很簡單,標註了這個註解的方法會被自動封裝成 GET 請求,請求 uri 為 uri() 的返回。

然後,我們自定義一個 Contract 來處理這個註解。由於 MethodMetadata 是 final 並且是 package private 的,所以我們只能繼承 Contract.BaseContract 去自定義註解解析:

//外部自定義必須繼承 BaseContract,因為裏面生成的 MethodMetadata 的構造器是 package private 的
static class CustomizedContract extends Contract.BaseContract {
    @Override
    protected void processAnnotationOnClass(MethodMetadata data, Class<?> clz) {
        //處理類上面的註解,這裡沒用到
    }
    @Override
    protected void processAnnotationOnMethod(MethodMetadata data, Annotation annotation, Method method) {
        //處理方法上面的註解
        Get get = method.getAnnotation(Get.class);
        //如果 Get 註解存在,則指定方法 HTTP 請求方式為 GET,同時 uri 指定為註解 uri() 的返回
        if (get != null) {
            data.template().method(Request.HttpMethod.GET);
            data.template().uri(get.uri());
        }
    }
    @Override
    protected boolean processAnnotationsOnParameter(MethodMetadata data, Annotation[] annotations, int paramIndex) {
        //處理參數上面的註解,這裡沒用到
        return false;
    }
}

然後,我們來使用這個 Contract:

interface HttpBin {
    @Get(uri = "/get")
    String get();
}

public static void main(String[] args) {
    HttpBin httpBin = Feign.builder()
            .contract(new CustomizedContract())
            .target(HttpBin.class, "//www.httpbin.org");
    //實際上就是調用 //www.httpbin.org/get
    String s = httpBin.get();
}

一般的,我們不會使用這個 Contract,因為我們業務上一般不會自定義註解。這是底層框架需要用的功能。比如在 spring-mvc 環境下,我們需要兼容 spring-mvc 的註解,這個實現類就是 SpringMvcContract

編碼器 Encoder 與解碼器 Decoder

編碼器與解碼器接口定義:

public interface Decoder {
  Object decode(Response response, Type type) throws IOException, DecodeException, FeignException;
}
public interface Encoder {
  void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException;
}

OpenFeign 可以自定義編碼解碼器,我們這裡使用 FastJson 自定義實現一組編碼與解碼器,來了解其中使用的原理。

/**
 * 基於 FastJson 的反序列化解碼器
 */
static class FastJsonDecoder implements Decoder {
    @Override
    public Object decode(Response response, Type type) throws IOException, DecodeException, FeignException {
        //讀取 body
        byte[] body = response.body().asInputStream().readAllBytes();
        return JSON.parseObject(body, type);
    }
}

/**
 * 基於 FastJson 的序列化編碼器
 */
static class FastJsonEncoder implements Encoder {
    @Override
    public void encode(Object object, Type bodyType, RequestTemplate template) throws EncodeException {
        if (object != null) {
            //編碼 body
            template.header(CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
            template.body(JSON.toJSONBytes(object), StandardCharsets.UTF_8);
        }
    }
}

然後,我們通過 //httpbin.org/anything 來測試,這個鏈接會返回我們發送的請求的一切元素。

interface HttpBin {
    @RequestLine("POST /anything")
    Object postBody(Map<String, String> body);
}

public static void main(String[] args) {
    HttpBin httpBin = Feign.builder()
            .decoder(new FastJsonDecoder())
            .encoder(new FastJsonEncoder())
            .target(HttpBin.class, "//www.httpbin.org");
    Object o = httpBin.postBody(Map.of("key", "value"));
}

查看響應,可以看到我們發送的 json body 被正確的接收到了。

目前,OpenFeign 項目中的編碼器以及解碼器主要實現包括:

序列化 需要額外添加的依賴 實現類
直接轉換成字符串,默認的編碼解碼器 feign.codec.Encoder.Defaultfeign.codec.Decoder.Default
gson feign-gson feign.gson.GsonEncoderfeign.gson.GsonDecoder
xml feign-jaxb feign.jaxb.JAXBEncoderfeign.jaxb.JAXBDecoder
json (jackson) feign-jackson feign.jackson.JacksonEncoderfeign.jackson.JacksonDecoder

我們在 Spring Cloud 環境中使用的時候,在 Spring MVC 中是有統一的編碼器以及解碼器的,即 HttpMessageConverters,並且通過膠水項目做了兼容,所以我們統一用 HttpMessageConverters 指定自定義編碼解碼器就好。

請求攔截器 RequestInterceptor

RequestInterceptor 的接口定義:

public interface RequestInterceptor {
  void apply(RequestTemplate template);
}

可以從接口看出,RequestInterceptor 其實就是對於 RequestTemplate 進行額外的操作。對於每次請求,都會經過所有的 RequestInterceptor 處理。

舉個例子,我們可以對於每個請求加上特定的 Header:

interface HttpBin {
    //發到這個鏈接的所有請求,響應會返回請求中的所有元素
    @RequestLine("GET /anything")
    String anything();
}

static class AddHeaderRequestInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate template) {
        //添加 header
        template.header("test-header", "test-value");
    }
}

public static void main(String[] args) {
    HttpBin httpBin = Feign.builder()
            .requestInterceptor(new AddHeaderRequestInterceptor())
            .target(HttpBin.class, "//www.httpbin.org");
    String s = httpBin.anything();
}

執行程序,可以在響應中看到我們發送請求中添加的 header。

Http 請求客戶端 Client

OpenFeign 底層的 Http 請求客戶端是可以自定義的,OpenFeign 針對不同的 Http 客戶端都有封裝,默認的是通過 Java 內置的 Http 請求 API。我們來看下 Client 的接口定義源碼:

public interface Client {
  /**
   * 執行請求
   * @param request HTTP 請求
   * @param options 配置選項
   * @return
   * @throws IOException
   */
  Response execute(Request request, Options options) throws IOException;
}

Request 是 feign 中對於 Http 請求的定義,Client 的實現需要將 Request 轉換成對應底層的 Http 客戶端的請求並調用合適的方法進行請求。Options 是一些請求通用配置,包括:

public static class Options {
    //tcp 建立連接超時
    private final long connectTimeout;
    //tcp 建立連接超時時間單位
    private final TimeUnit connectTimeoutUnit;
    //請求讀取響應超時
    private final long readTimeout;
    //請求讀取響應超時時間單位
    private final TimeUnit readTimeoutUnit;
    //是否跟隨重定向
    private final boolean followRedirects;
}

目前,Client 的實現包括以下這些:

底層 HTTP 客戶端 需要添加的依賴 實現類
Java HttpURLConnection feign.Client.Default
Java 11 HttpClient feign-java11 feign.http2client.Http2Client
Apache HttpClient feign-httpclient feign.httpclient.ApacheHttpClient
Apache HttpClient 5 feign-hc5 feign.hc5.ApacheHttp5Client
Google HTTP Client feign-googlehttpclient feign.googlehttpclient.GoogleHttpClient
Google HTTP Client feign-googlehttpclient feign.googlehttpclient.GoogleHttpClient
jaxRS feign-jaxrs2 feign.jaxrs2.JAXRSClient
OkHttp feign-okhttp feign.okhttp.OkHttpClient
Ribbon feign-ribbon feign.ribbon.RibbonClient

錯誤解碼器相關

可以指定錯誤解碼器 ErrorDecoder,同時還可以指定異常拋出策略 ExceptionPropagationPolicy.

ErrorDecoder 是讀取 HTTP 響應判斷是否有錯誤需要拋出異常使用的:

public interface ErrorDecoder {
    public Exception decode(String methodKey, Response response);
}

只有響應碼不為 2xx 的時候,才會調用配置的 ErrorDecoderdecode 方法。默認的 ErrorDecoder 的實現是:

public static class Default implements ErrorDecoder {

    @Override
    public Exception decode(String methodKey, Response response) {
      //將不同響應碼包裝成不同的異常
      FeignException exception = errorStatus(methodKey, response);
      //提取 Retry-After 這個 HTTP 響應頭,如果存在這個響應頭則將異常封裝為 RetryableException
      //對於 RetryableException,在後面的分析我們會知道如果拋出這個異常會觸發重試器的重試
      Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
      if (retryAfter != null) {
        return new RetryableException(
            response.status(),
            exception.getMessage(),
            response.request().httpMethod(),
            exception,
            retryAfter,
            response.request());
      }
      return exception;
    }
  }

可以看出, ErrorDecoder 是可能給異常封裝一層異常的,這有時候對於我們在外層捕捉會造成影響,所以可以通過指定 ExceptionPropagationPolicy 來拆開這層封裝。ExceptionPropagationPolicy 是一個枚舉類:

public enum ExceptionPropagationPolicy {
  //什麼都不做
  NONE, 
  //是否將 RetryableException 的原始 exception 提取出來作為異常拋出
  //目前只針對 RetryableException 生效,調用 exception 的 getCause,如果不為空就返回這個 cause,否則返回原始 exception
  UNWRAP,
  ;
}

接下來看個例子:

interface TestHttpBin {
    //請求一定會返回 500
    @RequestLine("GET /status/500")
    Object get();
}

static class TestErrorDecoder implements ErrorDecoder {
    @Override
    public Exception decode(String methodKey, Response response) {
        //獲取錯誤碼對應的 FeignException
        FeignException exception = errorStatus(methodKey, response);
        //封裝為 RetryableException
        return new RetryableException(
                response.status(),
                exception.getMessage(),
                response.request().httpMethod(),
                exception,
                new Date(),
                response.request());
    }
}

public static void main(String[] args) {
    TestHttpBin httpBin = Feign.builder()
            .errorDecoder(new TestErrorDecoder())
            //如果這裡沒有指定為 UNWRAP 那麼下面拋出的異常就是 RetryableException,否則就是 RetryableException 的 cause 也就是 FeignException
            .exceptionPropagationPolicy(ExceptionPropagationPolicy.UNWRAP)
            .target(TestHttpBin.class, "//httpbin.org");
    httpBin.get();
}

執行後可以發現拋出了 feign.FeignException$InternalServerError: [500 INTERNAL SERVER ERROR] during [GET] to [//httpbin.org/status/500] [TestHttpBin#get()]: [] 這個異常。

針對 RetryableException 的重試器 Retryer

在調用發生異常的時候,我們可能希望按照一定策略進行重試,抽象這種重試策略一般包括:

  • 對於哪些異常會重試
  • 什麼時候重試,什麼時候結束重試,例如重試 n 次以後

對於那些異常會重試,這個由 ErrorDecoder 決定。如果異常需要被重試,就把它封裝成 RetryableException,這樣 Feign 就會使用 Retryer 進行重試。對於什麼時候重試,什麼時候結束重試,這些就是 Retryer 需要考慮的事情:

public interface Retryer extends Cloneable {
  /**
    * 判斷繼續重試,或者拋出異常結束重試
    */
  void continueOrPropagate(RetryableException e);
  /**
    * 對於每次請求,都會調用這個方法創建一個新的同樣配置的 Retryer 對象
    */
  Retryer clone();
}

我們來看一下 Retryer 的默認實現:

class Default implements Retryer {
	//最大重試次數
    private final int maxAttempts;
	//初始重試間隔
    private final long period;
	//最大重試間隔
    private final long maxPeriod;
	//當前重試次數
    int attempt;
	//當前已經等待的重試間隔時間和
    long sleptForMillis;

    public Default() {
	  //默認配置,初始重試間隔為 100ms,最大重試間隔為 1s,最大重試次數為 5
      this(100, SECONDS.toMillis(1), 5);
    }

    public Default(long period, long maxPeriod, int maxAttempts) {
      this.period = period;
      this.maxPeriod = maxPeriod;
      this.maxAttempts = maxAttempts;
	  //當前重試次數從 1 開始,因為第一次進入 continueOrPropagate 之前就已經發生調用但是失敗了並拋出了 RetryableException
      this.attempt = 1;
    }

    // visible for testing;
    protected long currentTimeMillis() {
      return System.currentTimeMillis();
    }

    public void continueOrPropagate(RetryableException e) {
	  //如果當前重試次數大於最大重試次數則
      if (attempt++ >= maxAttempts) {
        throw e;
      }

      long interval;
	  //如果指定了 retry-after,則以這個 header 為準決定等待時間
      if (e.retryAfter() != null) {
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
          interval = maxPeriod;
        }
        if (interval < 0) {
          return;
        }
      } else {
		//否則,通過 nextMaxInterval 計算
        interval = nextMaxInterval();
      }
      try {
        Thread.sleep(interval);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
        throw e;
      }
	  //記錄一共等待的時間
      sleptForMillis += interval;
    }

	//每次重試間隔增長 50%,直到最大重試間隔
    long nextMaxInterval() {
      long interval = (long) (period * Math.pow(1.5, attempt - 1));
      return interval > maxPeriod ? maxPeriod : interval;
    }

    @Override
    public Retryer clone() {
	  //複製配置
      return new Default(period, maxPeriod, maxAttempts);
    }
}

默認的 Retryer 功能也比較豐富,用戶可以參考這個實現更適合自己業務場景的重試器。

每個 HTTP 請求的配置 Options

無論是哪種 HTTP 客戶端,都需要如下幾個配置:

  • 連接超時:這個是 TCP 連接建立超時時間
  • 讀取超時:這個是收到 HTTP 響應之前的超時時間
  • 是否跟隨重定向
    OpenFeign 可以通過 Options 進行配置:
public static class Options {
    private final long connectTimeout;
    private final TimeUnit connectTimeoutUnit;
    private final long readTimeout;
    private final TimeUnit readTimeoutUnit;
    private final boolean followRedirects;
}

例如我們可以這麼配置一個連接超時為 500ms,讀取超時為 6s,跟隨重定向的 Feign:

Feign.builder().options(new Request.Options(
    500, TimeUnit.MILLISECONDS, 6, TimeUnit.SECONDS, true
))

我們這一節詳細介紹了 OpenFeign 的各個組件,有了這些知識,其實我們自己就能實現 Spring-Cloud-OpenFeign 裏面的膠水代碼。其實 Spring-Cloud-OpenFeign 就是將這些組件以 Bean 的形式註冊到 NamedContextFactory 中,供不同微服務進行不同的配置。

微信搜索「我的編程喵」關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer