spring-cloud-gateway过滤器实践

概述

这里是 SpringCloud Gateway 实践的第一篇,主要讲过滤器的相关实现。Spring-Cloud-Gateway 是以 WebFlux 为基础的响应式架构设计, 是异步非阻塞式的,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。

本篇将基于 spring-cloud-gateway 简介 基础环境进行改造。

工作原理

Spring-Cloud-Gateway 基于过滤器实现,同 zuul 类似,有prepost两种方式的 filter,分别处理前置逻辑后置逻辑。客户端的请求先经过pre类型的 filter,然后将请求转发到具体的业务服务,收到业务服务的响应之后,再经过post类型的 filter 处理,最后返回响应到客户端。

过滤器执行流程如下,order 越大,优先级越低

接下来我们来验证下 filter 执行顺序。

这里创建 3 个过滤器,分别配置不同的优先级

@Slf4j  public class AFilter implements GlobalFilter {      @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          log.info("AFilter前置逻辑");          return chain.filter(exchange).then(Mono.fromRunnable(() -> {              log.info("AFilter后置逻辑");          }));      }  }    @Slf4j  public class BFilter implements GlobalFilter {      @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          log.info("BFilter前置逻辑");          return chain.filter(exchange).then(Mono.fromRunnable(() -> {              log.info("BFilter后置逻辑");          }));      }  }    @Slf4j  public class CFilter implements GlobalFilter {        @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          log.info("CFilter前置逻辑");          return chain.filter(exchange).then(Mono.fromRunnable(() -> {              log.info("CFilter后置逻辑");          }));      }  }    @Configuration  public class FilterConfig {        @Bean      @Order(-1)      public GlobalFilter a() {          return new AFilter();      }        @Bean      @Order(0)      public GlobalFilter b() {          return new BFilter();      }        @Bean      @Order(1)      public GlobalFilter c() {          return new CFilter();      }  }  
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1    curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1  

查看网关输出日志

2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter前置逻辑  2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter前置逻辑  2020-03-29 16:23:22.832  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter前置逻辑    2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter       : CFilter后置逻辑  2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter       : BFilter后置逻辑  2020-03-29 16:23:22.836  INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter       : AFilter后置逻辑  

自定义过滤器

现在假设我们要统计某个服务的响应时间,我们可以在代码中

long beginTime = System.currentTimeMillis();  // do something...  long elapsed = System.currentTimeMillis() - beginTime;  log.info("elapsed: {}ms", elapsed);  

每次都要这么写是不是很烦?Spring 告诉我们有个东西叫 AOP。但是我们是微服务啊,在每个服务里都写也很烦。这时候就该网关的过滤器登台表演了。

自定义过滤器需要实现 GatewayFilterOrdered 。其中 GatewayFilter 中的这个方法就是用来实现你的自定义的逻辑的

Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);  

Ordered 中的 int getOrder() 方法是来给过滤器设定优先级别的,值越大则优先级越低。

好了,让我们来撸代码吧.

/**   * 此过滤器功能为计算请求完成时间   */  public class ElapsedFilter implements GatewayFilter, Ordered {        private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";        @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {          exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());          return chain.filter(exchange).then(                  Mono.fromRunnable(() -> {                      Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);                      if (startTime != null) {                          System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");                      }                  })          );      }        /*       *过滤器存在优先级,order越大,优先级越低       */      @Override      public int getOrder() {          return Ordered.LOWEST_PRECEDENCE;      }  }    

我们在请求刚刚到达时,往 ServerWebExchange 中放入了一个属性 elapsedTimeBegin,属性值为当时的毫秒级时间戳。然后在请求执行结束后,又从中取出我们之前放进去的那个时间戳,与当前时间的差值即为该请求的耗时。因为这是与业务无关的日志所以将 Ordered 设为 Integer.MAX_VALUE 以降低优先级。

现在再来看我们之前的问题:怎么来区分是 “pre” 还是 “post” 呢?其实就是 chain.filter(exchange) 之前的就是 “pre” 部分,之后的也就是 then 里边的是 “post” 部分。

创建好 Filter 之后我们将它添加到我们的 Filter Chain 里边

@Configuration  public class FilterConfig {          /**       * http://localhost:8100/filter/provider       * @param builder       * @return       */      @Bean      public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {          // @formatter:off          // 可以对比application.yml中关于路由转发的配置          return builder.routes()                  .route(r -> r.path("/filter/**")                          .filters(f -> f.stripPrefix(1)                                  .filter(new ElapsedFilter()))                          .uri("lb://idc-cloud-provider")                          .order(0)                          .id("filter")                  )                  .build();          // @formatter:on      }    }  

基于全局过滤器实现审计功能

// AdaptCachedBodyGlobalFilter    @Component  public class LogFilter implements GlobalFilter, Ordered {        private Logger log = LoggerFactory.getLogger(LogFilter.class);        private final ObjectMapper objectMapper = new ObjectMapper();      private static final String START_TIME = "startTime";      private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();        @Override      public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {            ServerHttpRequest request = exchange.getRequest();          // 请求路径          String path = request.getPath().pathWithinApplication().value();          // 请求schema: http/https          String scheme = request.getURI().getScheme();          // 请求方法          HttpMethod method = request.getMethod();          // 路由服务地址          URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);          // 请求头          HttpHeaders headers = request.getHeaders();          // 设置startTime          exchange.getAttributes().put(START_TIME, System.currentTimeMillis());          // 获取请求地址          InetSocketAddress remoteAddress = request.getRemoteAddress();              MultiValueMap<String, String> formData = null;                AccessRecord accessRecord = new AccessRecord();          accessRecord.setPath(path);          accessRecord.setSchema(scheme);          accessRecord.setMethod(method.name());          accessRecord.setTargetUri(targetUri.toString());          accessRecord.setRemoteAddress(remoteAddress.toString());          accessRecord.setHeaders(headers);            if (method == HttpMethod.GET) {              formData = request.getQueryParams();              accessRecord.setFormData(formData);              writeAccessRecord(accessRecord);          }            if (method == HttpMethod.POST) {              Mono<Void> voidMono = null;              if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {                  // JSON                  voidMono = readBody(exchange, chain, accessRecord);              }                if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) {                  // x-www-form-urlencoded                  voidMono = readFormData(exchange, chain, accessRecord);              }                if (voidMono != null) {                  return voidMono;              }            }            return chain.filter(exchange);      }        private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {          return null;      }        private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {            return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {                byte[] bytes = new byte[dataBuffer.readableByteCount()];              dataBuffer.read(bytes);              DataBufferUtils.release(dataBuffer);              Flux<DataBuffer> cachedFlux = Flux.defer(() -> {                  DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);                  DataBufferUtils.retain(buffer);                  return Mono.just(buffer);              });                  // 重写请求体,因为请求体数据只能被消费一次              ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {                  @Override                  public Flux<DataBuffer> getBody() {                      return cachedFlux;                  }              };                ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();                return ServerRequest.create(mutatedExchange, messageReaders)                      .bodyToMono(String.class)                      .doOnNext(objectValue -> {                          accessRecord.setBody(objectValue);                          writeAccessRecord(accessRecord);                      }).then(chain.filter(mutatedExchange));          });      }        @Override      public int getOrder() {          return Ordered.LOWEST_PRECEDENCE;      }        /**       * TODO 异步日志       * @param accessRecord       */      private void writeAccessRecord(AccessRecord accessRecord) {            log.info("nn start------------------------------------------------- n " +                          "请求路径:{}n " +                          "scheme:{}n " +                          "请求方法:{}n " +                          "目标服务:{}n " +                          "请求头:{}n " +                          "远程IP地址:{}n " +                          "表单参数:{}n " +                          "请求体:{}n " +                          "end------------------------------------------------- n ",                  accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody());      }  }  
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1    curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1  

输出结果

 start-------------------------------------------------   请求路径:/provider1   scheme:http   请求方法:POST   目标服务:http://192.168.124.5:2001/provider1   请求头:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"]   远程IP地址:/192.168.124.5:49969   表单参数:null   请求体:{"name":"admin"}   end-------------------------------------------------  

接下来,我们来配置日志,方便日志系统提取日志。SpringBoot 默认的日志为 logback。

<?xml version="1.0" encoding="UTF-8"?>  <configuration>        <property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" />        <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">          <layout class="ch.qos.logback.classic.PatternLayout">              <Pattern>                  %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable              </Pattern>          </layout>      </appender>        <appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">          <file>${LOGS}/spring-boot-logger.log</file>          <encoder                  class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">              <Pattern>%d %p %C{1.} [%t] %m%n</Pattern>          </encoder>            <rollingPolicy                  class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">              <!-- rollover daily and when the file reaches 10 MegaBytes -->              <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log              </fileNamePattern>              <timeBasedFileNamingAndTriggeringPolicy                      class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">                  <maxFileSize>10MB</maxFileSize>              </timeBasedFileNamingAndTriggeringPolicy>          </rollingPolicy>      </appender>        <!-- LOG everything at INFO level -->      <root level="info">          <!--<appender-ref ref="RollingFile" />-->          <appender-ref ref="Console" />      </root>        <!-- LOG "cn.idea360*" at TRACE level additivity:是否向上级loger传递打印信息。默认是true-->      <logger name="cn.idea360.gateway" level="info" additivity="false">          <appender-ref ref="RollingFile" />          <appender-ref ref="Console" />      </logger>    </configuration>  

这样 console 和日志目录下就都有日志了。

自定义过滤器工厂

如果你看过静态路由的配置,你应该对如下配置有印象。

filters:    - StripPrefix=1    - AddResponseHeader=X-Response-Default-Foo, Default-Bar  

StripPrefixAddResponseHeader 这两个实际上是两个过滤器工厂(GatewayFilterFactory),用这种配置的方式更灵活方便。

我们就将之前的那个 ElapsedFilter 改造一下,让它能接收一个 boolean 类型的参数,来决定是否将请求参数也打印出来。

public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> {        private static final Log log = LogFactory.getLog(GatewayFilter.class);      private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";      private static final String KEY = "withParams";          public List<String> shortcutFieldOrder() {          return Arrays.asList(KEY);      }        public ElapsedGatewayFilterFactory() {          super(Config.class);      }          public GatewayFilter apply(Config config) {          return (exchange, chain) -> {              exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());              return chain.filter(exchange).then(                      Mono.fromRunnable(() -> {                          Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);                          if (startTime != null) {                              StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())                                      .append(": ")                                      .append(System.currentTimeMillis() - startTime)                                      .append("ms");                              if (config.isWithParams()) {                                  sb.append(" params:").append(exchange.getRequest().getQueryParams());                              }                              log.info(sb.toString());                          }                      })              );          };      }          public static class Config {            private boolean withParams;            public boolean isWithParams() {              return withParams;          }            public void setWithParams(boolean withParams) {              this.withParams = withParams;          }        }  }  

过滤器工厂的顶级接口是 GatewayFilterFactory,我们可以直接继承它的两个抽象类来简化开发 AbstractGatewayFilterFactoryAbstractNameValueGatewayFilterFactory,这两个抽象类的区别就是前者接收一个参数(像 StripPrefix 和我们创建的这种),后者接收两个参数(像 AddResponseHeader)。

GatewayFilter apply(Config config) 方法内部实际上是创建了一个 GatewayFilter 的匿名类,具体实现和之前的几乎一样,就不解释了。

静态内部类 Config 就是为了接收那个 boolean 类型的参数服务的,里边的变量名可以随意写,但是要重写 List shortcutFieldOrder() 这个方法。

这里注意一下,一定要调用一下父类的构造器把 Config 类型传过去,否则会报 ClassCastException

public ElapsedGatewayFilterFactory() {      super(Config.class);  }  

工厂类我们有了,再把它注册到 Spring 当中

@Bean  public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() {      return new ElapsedGatewayFilterFactory();  }  

然后添加配置(主要改动在 default-filters 配置)

server:    port: 2000  spring:    application:      name: idc-gateway    redis:      host: localhost      port: 6379      timeout: 6000ms  # 连接超时时长(毫秒)      jedis:        pool:          max-active: 1000  # 连接池最大连接数(使用负值表示没有限制)          max-wait: -1ms      # 连接池最大阻塞等待时间(使用负值表示没有限制)          max-idle: 10      # 连接池中的最大空闲连接          min-idle: 5       # 连接池中的最小空闲连接    cloud:      consul:        host: localhost        port: 8500      gateway:        discovery:          locator:            enabled: true            # 修改在这里。gateway可以通过开启以下配置来打开根据服务的serviceId来匹配路由,默认是大写        default-filters:          - Elapsed=true        routes:          - id: provider  # 路由 ID,保持唯一            uri: lb://idc-provider1 # uri指目标服务地址,lb代表从注册中心获取服务            predicates: # 路由条件。Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非)              - Path=/p/**            filters:              - StripPrefix=1 # 过滤器StripPrefix,作用是去掉请求路径的最前面n个部分截取掉。StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view  

结语

本文到此结束。关于 Webflux 的学习刚入门,觉得可以像 Rxjava 那样在 onNext 中拿到异步数据,然而在 post 获取 body 中没生效。经测试可知 getBody 获得的数据输出为 null,而自己通过 Flux.create 创建的数据可以在订阅者中获取到。此处还有待研究,希望抛砖引玉,大家有研究出来的不吝赐教。同时,希望大家关注公众号【当我遇上你】。

参考