spring-cloud-gateway過濾器實踐

概述

這裡是 SpringCloud Gateway 實踐的第一篇,主要講過濾器的相關實現。Spring-Cloud-Gateway 是以 WebFlux 為基礎的響應式架構設計, 是非同步非阻塞式的,它能夠充分利用多核 CPU 的硬體資源去處理大量的並發請求。

本篇將基於 spring-cloud-gateway 簡介 基礎環境進行改造。

工作原理

Spring-Cloud-Gateway 基於過濾器實現,同 zuul 類似,有prepost兩種方式的 filter,分別處理前置邏輯後置邏輯。客戶端的請求先經過pre類型的 filter,然後將請求轉發到具體的業務服務,收到業務服務的響應之後,再經過post類型的 filter 處理,最後返迴響應到客戶端。

過濾器執行流程如下,order 越大,優先順序越低

接下來我們來驗證下 filter 執行順序。

這裡創建 3 個過濾器,分別配置不同的優先順序

@Slf4j  public class AFilter implements GlobalFilter {      @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          log.info("AFilter前置邏輯");          return chain.filter(exchange).then(Mono.fromRunnable(() -> {              log.info("AFilter後置邏輯");          }));      }  }    @Slf4j  public class BFilter implements GlobalFilter {      @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          log.info("BFilter前置邏輯");          return chain.filter(exchange).then(Mono.fromRunnable(() -> {              log.info("BFilter後置邏輯");          }));      }  }    @Slf4j  public class CFilter implements GlobalFilter {        @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          log.info("CFilter前置邏輯");          return chain.filter(exchange).then(Mono.fromRunnable(() -> {              log.info("CFilter後置邏輯");          }));      }  }    @Configuration  public class FilterConfig {        @Bean      @Order(-1)      public GlobalFilter a() {          return new AFilter();      }        @Bean      @Order(0)      public GlobalFilter b() {          return new BFilter();      }        @Bean      @Order(1)      public GlobalFilter c() {          return new CFilter();      }  }  
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1    curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1  

查看網關輸出日誌

2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter前置邏輯  2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter前置邏輯  2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter前置邏輯    2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter後置邏輯  2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter後置邏輯  2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter後置邏輯  

自定義過濾器

現在假設我們要統計某個服務的響應時間,我們可以在程式碼中

long beginTime = System.currentTimeMillis();  // do something...  long elapsed = System.currentTimeMillis() - beginTime;  log.info("elapsed: {}ms", elapsed);  

每次都要這麼寫是不是很煩?Spring 告訴我們有個東西叫 AOP。但是我們是微服務啊,在每個服務里都寫也很煩。這時候就該網關的過濾器登台表演了。

自定義過濾器需要實現 GatewayFilterOrdered 。其中 GatewayFilter 中的這個方法就是用來實現你的自定義的邏輯的

Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);  

Ordered 中的 int getOrder() 方法是來給過濾器設定優先順序別的,值越大則優先順序越低。

好了,讓我們來擼程式碼吧.

/**   * 此過濾器功能為計算請求完成時間   */  public class ElapsedFilter implements GatewayFilter, Ordered {        private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";        @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());          return chain.filter(exchange).then(                  Mono.fromRunnable(() -> {                      Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);                      if (startTime != null) {                          System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");                      }                  })          );      }        /*       *過濾器存在優先順序,order越大,優先順序越低       */      @Override      public int getOrder() {          return Ordered.LOWEST_PRECEDENCE;      }  }    

我們在請求剛剛到達時,往 ServerWebExchange 中放入了一個屬性 elapsedTimeBegin,屬性值為當時的毫秒級時間戳。然後在請求執行結束後,又從中取出我們之前放進去的那個時間戳,與當前時間的差值即為該請求的耗時。因為這是與業務無關的日誌所以將 Ordered 設為 Integer.MAX_VALUE 以降低優先順序。

現在再來看我們之前的問題:怎麼來區分是 「pre」 還是 「post」 呢?其實就是 chain.filter(exchange) 之前的就是 「pre」 部分,之後的也就是 then 裡邊的是 「post」 部分。

創建好 Filter 之後我們將它添加到我們的 Filter Chain 裡邊

@Configuration  public class FilterConfig {          /**       * http://localhost:8100/filter/provider       * @param builder       * @return       */      @Bean      public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {          // @formatter:off          // 可以對比application.yml中關於路由轉發的配置          return builder.routes()                  .route(r -> r.path("/filter/**")                          .filters(f -> f.stripPrefix(1)                                  .filter(new ElapsedFilter()))                          .uri("lb://idc-cloud-provider")                          .order(0)                          .id("filter")                  )                  .build();          // @formatter:on      }    }  

基於全局過濾器實現審計功能

// AdaptCachedBodyGlobalFilter    @Component  public class LogFilter implements GlobalFilter, Ordered {        private Logger log = LoggerFactory.getLogger(LogFilter.class);        private final ObjectMapper objectMapper = new ObjectMapper();      private static final String START_TIME = "startTime";      private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();        @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {            ServerHttpRequest request = exchange.getRequest();          // 請求路徑          String path = request.getPath().pathWithinApplication().value();          // 請求schema: http/https          String scheme = request.getURI().getScheme();          // 請求方法          HttpMethod method = request.getMethod();          // 路由服務地址          URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);          // 請求頭          HttpHeaders headers = request.getHeaders();          // 設置startTime          exchange.getAttributes().put(START_TIME, System.currentTimeMillis());          // 獲取請求地址          InetSocketAddress remoteAddress = request.getRemoteAddress();              MultiValueMap<String, String> formData = null;                AccessRecord accessRecord = new AccessRecord();          accessRecord.setPath(path);          accessRecord.setSchema(scheme);          accessRecord.setMethod(method.name());          accessRecord.setTargetUri(targetUri.toString());          accessRecord.setRemoteAddress(remoteAddress.toString());          accessRecord.setHeaders(headers);            if (method == HttpMethod.GET) {              formData = request.getQueryParams();              accessRecord.setFormData(formData);              writeAccessRecord(accessRecord);          }            if (method == HttpMethod.POST) {              Mono<Void> voidMono = null;              if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {                  // JSON                  voidMono = readBody(exchange, chain, accessRecord);              }                if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) {                  // x-www-form-urlencoded                  voidMono = readFormData(exchange, chain, accessRecord);              }                if (voidMono != null) {                  return voidMono;              }            }            return chain.filter(exchange);      }        private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {          return null;      }        private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {            return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {                byte[] bytes = new byte[dataBuffer.readableByteCount()];              dataBuffer.read(bytes);              DataBufferUtils.release(dataBuffer);              Flux<DataBuffer> cachedFlux = Flux.defer(() -> {                  DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);                  DataBufferUtils.retain(buffer);                  return Mono.just(buffer);              });                  // 重寫請求體,因為請求體數據只能被消費一次              ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {                  @Override                  public Flux<DataBuffer> getBody() {                      return cachedFlux;                  }              };                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();                return ServerRequest.create(mutatedExchange, messageReaders)                      .bodyToMono(String.class)                      .doOnNext(objectValue -> {                          accessRecord.setBody(objectValue);                          writeAccessRecord(accessRecord);                      }).then(chain.filter(mutatedExchange));          });      }        @Override      public int getOrder() {          return Ordered.LOWEST_PRECEDENCE;      }        /**       * TODO 非同步日誌       * @param accessRecord       */      private void writeAccessRecord(AccessRecord accessRecord) {            log.info("nn start------------------------------------------------- n " +                          "請求路徑:{}n " +                          "scheme:{}n " +                          "請求方法:{}n " +                          "目標服務:{}n " +                          "請求頭:{}n " +                          "遠程IP地址:{}n " +                          "表單參數:{}n " +                          "請求體:{}n " +                          "end------------------------------------------------- n ",                  accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody());      }  }  
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1    curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1  

輸出結果

 start-------------------------------------------------   請求路徑:/provider1   scheme:http   請求方法:POST   目標服務:http://192.168.124.5:2001/provider1   請求頭:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"]   遠程IP地址:/192.168.124.5:49969   表單參數:null   請求體:{"name":"admin"}   end-------------------------------------------------  

接下來,我們來配置日誌,方便日誌系統提取日誌。SpringBoot 默認的日誌為 logback。

<?xml version="1.0" encoding="UTF-8"?>  <configuration>        <property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" />        <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">          <layout class="ch.qos.logback.classic.PatternLayout">              <Pattern>                  %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable              </Pattern>          </layout>      </appender>        <appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">          <file>${LOGS}/spring-boot-logger.log</file>          <encoder                  class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">              <Pattern>%d %p %C{1.} [%t] %m%n</Pattern>          </encoder>            <rollingPolicy                  class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">              <!-- rollover daily and when the file reaches 10 MegaBytes -->              <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log              </fileNamePattern>              <timeBasedFileNamingAndTriggeringPolicy                      class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">                  <maxFileSize>10MB</maxFileSize>              </timeBasedFileNamingAndTriggeringPolicy>          </rollingPolicy>      </appender>        <!-- LOG everything at INFO level -->      <root level="info">          <!--<appender-ref ref="RollingFile" />-->          <appender-ref ref="Console" />      </root>        <!-- LOG "cn.idea360*" at TRACE level additivity:是否向上級loger傳遞列印資訊。默認是true-->      <logger name="cn.idea360.gateway" level="info" additivity="false">          <appender-ref ref="RollingFile" />          <appender-ref ref="Console" />      </logger>    </configuration>  

這樣 console 和日誌目錄下就都有日誌了。

自定義過濾器工廠

如果你看過靜態路由的配置,你應該對如下配置有印象。

filters:    - StripPrefix=1    - AddResponseHeader=X-Response-Default-Foo, Default-Bar  

StripPrefixAddResponseHeader 這兩個實際上是兩個過濾器工廠(GatewayFilterFactory),用這種配置的方式更靈活方便。

我們就將之前的那個 ElapsedFilter 改造一下,讓它能接收一個 boolean 類型的參數,來決定是否將請求參數也列印出來。

public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> {        private static final Log log = LogFactory.getLog(GatewayFilter.class);      private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";      private static final String KEY = "withParams";          public List<String> shortcutFieldOrder() {          return Arrays.asList(KEY);      }        public ElapsedGatewayFilterFactory() {          super(Config.class);      }          public GatewayFilter apply(Config config) {          return (exchange, chain) -> {              exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());              return chain.filter(exchange).then(                      Mono.fromRunnable(() -> {                          Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);                          if (startTime != null) {                              StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())                                      .append(": ")                                      .append(System.currentTimeMillis() - startTime)                                      .append("ms");                              if (config.isWithParams()) {                                  sb.append(" params:").append(exchange.getRequest().getQueryParams());                              }                              log.info(sb.toString());                          }                      })              );          };      }          public static class Config {            private boolean withParams;            public boolean isWithParams() {              return withParams;          }            public void setWithParams(boolean withParams) {              this.withParams = withParams;          }        }  }  

過濾器工廠的頂級介面是 GatewayFilterFactory,我們可以直接繼承它的兩個抽象類來簡化開發 AbstractGatewayFilterFactoryAbstractNameValueGatewayFilterFactory,這兩個抽象類的區別就是前者接收一個參數(像 StripPrefix 和我們創建的這種),後者接收兩個參數(像 AddResponseHeader)。

GatewayFilter apply(Config config) 方法內部實際上是創建了一個 GatewayFilter 的匿名類,具體實現和之前的幾乎一樣,就不解釋了。

靜態內部類 Config 就是為了接收那個 boolean 類型的參數服務的,裡邊的變數名可以隨意寫,但是要重寫 List shortcutFieldOrder() 這個方法。

這裡注意一下,一定要調用一下父類的構造器把 Config 類型傳過去,否則會報 ClassCastException

public ElapsedGatewayFilterFactory() {      super(Config.class);  }  

工廠類我們有了,再把它註冊到 Spring 當中

@Bean  public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() {      return new ElapsedGatewayFilterFactory();  }  

然後添加配置(主要改動在 default-filters 配置)

server:    port: 2000  spring:    application:      name: idc-gateway    redis:      host: localhost      port: 6379      timeout: 6000ms  # 連接超時時長(毫秒)      jedis:        pool:          max-active: 1000  # 連接池最大連接數(使用負值表示沒有限制)          max-wait: -1ms      # 連接池最大阻塞等待時間(使用負值表示沒有限制)          max-idle: 10      # 連接池中的最大空閑連接          min-idle: 5       # 連接池中的最小空閑連接    cloud:      consul:        host: localhost        port: 8500      gateway:        discovery:          locator:            enabled: true            # 修改在這裡。gateway可以通過開啟以下配置來打開根據服務的serviceId來匹配路由,默認是大寫        default-filters:          - Elapsed=true        routes:          - id: provider  # 路由 ID,保持唯一            uri: lb://idc-provider1 # uri指目標服務地址,lb代表從註冊中心獲取服務            predicates: # 路由條件。Predicate 接受一個輸入參數,返回一個布爾值結果。該介面包含多種默認方法來將 Predicate 組合成其他複雜的邏輯(比如:與,或,非)              - Path=/p/**            filters:              - StripPrefix=1 # 過濾器StripPrefix,作用是去掉請求路徑的最前面n個部分截取掉。StripPrefix=1就代表截取路徑的個數為1,比如前端過來請求/test/good/1/view,匹配成功後,路由到後端的請求路徑就會變成http://localhost:8888/good/1/view  

結語

本文到此結束。關於 Webflux 的學習剛入門,覺得可以像 Rxjava 那樣在 onNext 中拿到非同步數據,然而在 post 獲取 body 中沒生效。經測試可知 getBody 獲得的數據輸出為 null,而自己通過 Flux.create 創建的數據可以在訂閱者中獲取到。此處還有待研究,希望拋磚引玉,大家有研究出來的不吝賜教。同時,希望大家關注公眾號【當我遇上你】。

參考