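Rate Limiting in Gateway with Redis + Lua Scripts

Charge8, published 2022-10-03 16:22:59

I. Using Redis + Lua script rate limiting in Gateway

In Spring Cloud Gateway, rate limiting is one of the gateway's most basic functions. The framework provides the RequestRateLimiterGatewayFilterFactory filter factory, which implements rate limiting with the token bucket algorithm on top of Redis and a Lua script.

Official documentation: https://docs.spring.io/spring-cloud-gateway/docs/current/reference/html/#the-requestratelimiter-gatewayfilter-factory

1. Add the dependencies

In the Gateway service, add the redis-reactive dependency, plus commons-pool2, which the Lettuce connection pool configured below requires.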

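
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
    </dependency>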
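
2. Add the configuration

In the configuration file, add the Redis connection settings. Here the rate-limiting policy is applied to the app-user service.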

server:
  port: 18088
spring:
  application:
    name: app-gateway

  # Redis connection settings
  redis:
    host: 192.168.xxx.xxx
    port: 6379
    database: 0
    timeout: 5000
    lettuce:
      pool:
        max-active: 200
        max-wait: 10000
        max-idle: 100
        min-idle: 10
  # Nacos registry settings
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.xxx.xxx:8848

    # Gateway settings
    gateway:
      # Routes: route id, target microservice URI, predicates
      routes:
        # Route for the app-order service
        - id: app-order  # Route ID, globally unique; using the service name is recommended.
          uri: lb://app-order  # lb: resolve the service through the load balancer (Ribbon, LoadBalancer)
          predicates:
            - Path=/order/**   # Predicate: route requests whose path matches
        # Route for the app-user service
        - id: app-user
          uri: lb://app-user
          predicates:
            - Path=/app-api/app-user/user/**
          filters:
            # Strip the URL prefix: 1 removes the first path segment
            - StripPrefix=1
            - AddRequestHeader=app-Request-arg, myHeaderValue # Add a request header
            - AddRequestParameter=app-RequestParam-arg, zhaoZilong # Add a request parameter
            - name: RequestRateLimiter # Rate-limiting filter
              args:
                redis-rate-limiter.replenishRate: 1  # Tokens added to the bucket per second
                redis-rate-limiter.burstCapacity: 2  # Total capacity of the token bucket
                key-resolver: "#{@pathKeyResolver}" # SpEL expression that looks up the pathKeyResolver bean in the Spring container

3. Configure the keyResolver

key-resolver: "#{@pathKeyResolver}" selects the rate-limiting strategy, for example limiting by path, by request parameter, or by IP.

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;

/**
 * Request rate-limiting configuration. Supports limiting along three dimensions.
 */
@Configuration
@Slf4j
public class AppRequestRateLimiterConfig {

    /**
     * Limit by request URL path; requests over the limit receive 429.
     */
    @Bean
    public KeyResolver pathKeyResolver() {
        // Logged once, when the bean is created
        log.info("--------- rate limiting by request URL ----------");
        // Key = request path
        return exchange -> Mono.just(exchange.getRequest().getURI().getPath());
    }

    /**
     * Limit by the "user" query parameter.
     */
    //@Bean
    //public KeyResolver userKeyResolver() {
    //    // Key = "user" parameter; justOrEmpty avoids a NullPointerException when it is absent
    //    return exchange -> Mono.justOrEmpty(exchange.getRequest().getQueryParams().getFirst("user"));
    //}

    /**
     * Limit by client IP.
     */
    //@Bean
    //public KeyResolver ipKeyResolver() {
    //    // Key = remote address of the client
    //    return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
    //}

}
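
To make the three strategies concrete without the Spring types, here is a minimal plain-Java sketch of what each resolver extracts from a request. KeyStrategySketch, Strategy, and resolveKey are hypothetical names used only for illustration and are not part of Spring Cloud Gateway; an empty Optional stands for the case where a resolver yields no key.

```java
import java.util.Optional;

public class KeyStrategySketch {

    /** The three dimensions described above (hypothetical enum). */
    enum Strategy { PATH, USER_PARAM, IP }

    /**
     * Returns the rate-limit key for a request, or empty when the chosen
     * dimension is missing from the request (e.g. no "user" parameter).
     */
    static Optional<String> resolveKey(Strategy strategy, String path, String userParam, String remoteIp) {
        switch (strategy) {
            case PATH:
                return Optional.ofNullable(path);
            case USER_PARAM:
                return Optional.ofNullable(userParam);
            case IP:
                return Optional.ofNullable(remoteIp);
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String path = "/app-api/app-user/user/findOrderByUserId/1";
        // Same request, different keys depending on the strategy
        System.out.println(resolveKey(Strategy.PATH, path, "zhaoyun", "127.0.0.1"));
        System.out.println(resolveKey(Strategy.USER_PARAM, path, "zhaoyun", "127.0.0.1"));
        // No "user" parameter: the key is empty
        System.out.println(resolveKey(Strategy.USER_PARAM, path, null, "127.0.0.1"));
    }
}
```

Requests that resolve to the same key share one token bucket, so the choice of key decides what is being limited: an endpoint, a user, or a client address.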
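
4. Test

Start the project and send requests that match the chosen rate-limiting strategy.

  • URL limiting: http://localhost:18088/app-api/app-user/user/findOrderByUserId/1
  • user parameter limiting: http://localhost:18088/app-api/app-user/user/findOrderByUserId/1?user=zhaoyun
  • IP limiting: http://localhost:18088/app-api/app-user/user/findOrderByUserId/1

The configuration above replenishes 1 token per second with a bucket capacity of 2, so a burst of up to 2 requests is let through. Once requests arrive faster than the replenish rate and the bucket is empty, the gateway returns status code 429.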
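
One way to observe the limit is to fire a short burst of requests and count the status codes. The following is a minimal sketch using the JDK's java.net.http.HttpClient (Java 11+). RateLimitProbe, probe, and tally are hypothetical names, and the URL assumes the gateway from the configuration above is running locally.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RateLimitProbe {

    /** Counts how often each HTTP status code occurs. */
    static Map<Integer, Integer> tally(List<Integer> statusCodes) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int code : statusCodes) {
            counts.merge(code, 1, Integer::sum);
        }
        return counts;
    }

    /** Sends n sequential GET requests and returns the status codes in order. */
    static List<Integer> probe(String url, int n) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        List<Integer> codes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
            codes.add(response.statusCode());
        }
        return codes;
    }

    public static void main(String[] args) throws Exception {
        // Assumed local gateway address, taken from the test URLs above
        String url = "http://localhost:18088/app-api/app-user/user/findOrderByUserId/1";
        System.out.println(tally(probe(url, 10)));
    }
}
```

With replenishRate 1 and burstCapacity 2, a burst like this would be expected to show a few 200 responses and mostly 429, though the exact split depends on timing.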

That completes the rate-limiting setup in Gateway.

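II. A look at the source code

For Redis + Lua rate limiting in Gateway, the filter logic lives in the RequestRateLimiterGatewayFilterFactory class, which delegates the allow/deny decision to a RateLimiter.

1. The RequestRateLimiterGatewayFilterFactory class

The code is as follows: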

@ConfigurationProperties("spring.cloud.gateway.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {
    public static final String KEY_RESOLVER_KEY = "keyResolver";
    private static final String EMPTY_KEY = "____EMPTY_KEY__";
    private final RateLimiter defaultRateLimiter;
    private final KeyResolver defaultKeyResolver;
    private boolean denyEmptyKey = true;
    private String emptyKeyStatusCode;

    public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
        super(RequestRateLimiterGatewayFilterFactory.Config.class);
        this.emptyKeyStatusCode = HttpStatus.FORBIDDEN.name();
        this.defaultRateLimiter = defaultRateLimiter;
        this.defaultKeyResolver = defaultKeyResolver;
    }

    public KeyResolver getDefaultKeyResolver() {
        return this.defaultKeyResolver;
    }

    public RateLimiter getDefaultRateLimiter() {
        return this.defaultRateLimiter;
    }

    public boolean isDenyEmptyKey() {
        return this.denyEmptyKey;
    }

    public void setDenyEmptyKey(boolean denyEmptyKey) {
        this.denyEmptyKey = denyEmptyKey;
    }

    public String getEmptyKeyStatusCode() {
        return this.emptyKeyStatusCode;
    }

    public void setEmptyKeyStatusCode(String emptyKeyStatusCode) {
        this.emptyKeyStatusCode = emptyKeyStatusCode;
    }

    public GatewayFilter apply(RequestRateLimiterGatewayFilterFactory.Config config) {
        KeyResolver resolver = (KeyResolver)this.getOrDefault(config.keyResolver, this.defaultKeyResolver);
        RateLimiter limiter = (RateLimiter)this.getOrDefault(config.rateLimiter, this.defaultRateLimiter);
        boolean denyEmpty = (Boolean)this.getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
        HttpStatusHolder emptyKeyStatus = HttpStatusHolder.parse((String)this.getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
        return (exchange, chain) -> {
            return resolver.resolve(exchange).defaultIfEmpty("____EMPTY_KEY__").flatMap((key) -> {
                if ("____EMPTY_KEY__".equals(key)) {
                    if (denyEmpty) {
                        ServerWebExchangeUtils.setResponseStatus(exchange, emptyKeyStatus);
                        return exchange.getResponse().setComplete();
                    } else {
                        return chain.filter(exchange);
                    }
                } else {
                    String routeId = config.getRouteId();
                    if (routeId == null) {
                        Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                        routeId = route.getId();
                    }

                    return limiter.isAllowed(routeId, key).flatMap((response) -> {
                        Iterator var4 = response.getHeaders().entrySet().iterator();

                        while(var4.hasNext()) {
                            Entry header = (Entry)var4.next();
                            exchange.getResponse().getHeaders().add((String)header.getKey(), (String)header.getValue());
                        }

                        if (response.isAllowed()) {
                            return chain.filter(exchange);
                        } else {
                            ServerWebExchangeUtils.setResponseStatus(exchange, config.getStatusCode());
                            return exchange.getResponse().setComplete();
                        }
                    });
                }
            });
        };
    }

    private <T> T getOrDefault(T configValue, T defaultValue) {
        return configValue != null ? configValue : defaultValue;
    }

    public static class Config implements HasRouteId {
        private KeyResolver keyResolver;
        private RateLimiter rateLimiter;
        private HttpStatus statusCode;
        private Boolean denyEmptyKey;
        private String emptyKeyStatus;
        private String routeId;

        public Config() {
            this.statusCode = HttpStatus.TOO_MANY_REQUESTS;
        }

        public KeyResolver getKeyResolver() {
            return this.keyResolver;
        }

        public RequestRateLimiterGatewayFilterFactory.Config setKeyResolver(KeyResolver keyResolver) {
            this.keyResolver = keyResolver;
            return this;
        }

        public RateLimiter getRateLimiter() {
            return this.rateLimiter;
        }

        public RequestRateLimiterGatewayFilterFactory.Config setRateLimiter(RateLimiter rateLimiter) {
            this.rateLimiter = rateLimiter;
            return this;
        }

        public HttpStatus getStatusCode() {
            return this.statusCode;
        }

        public RequestRateLimiterGatewayFilterFactory.Config setStatusCode(HttpStatus statusCode) {
            this.statusCode = statusCode;
            return this;
        }

        public Boolean getDenyEmptyKey() {
            return this.denyEmptyKey;
        }

        public RequestRateLimiterGatewayFilterFactory.Config setDenyEmptyKey(Boolean denyEmptyKey) {
            this.denyEmptyKey = denyEmptyKey;
            return this;
        }

        public String getEmptyKeyStatus() {
            return this.emptyKeyStatus;
        }

        public RequestRateLimiterGatewayFilterFactory.Config setEmptyKeyStatus(String emptyKeyStatus) {
            this.emptyKeyStatus = emptyKeyStatus;
            return this;
        }

        public void setRouteId(String routeId) {
            this.routeId = routeId;
        }

        public String getRouteId() {
            return this.routeId;
        }
    }
}
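
The branching inside apply can be summarized in a few lines. The sketch below is a plain-Java paraphrase of that decision flow and not the gateway's own code: LimiterDecisionSketch, decide, and the PASS marker are hypothetical, a null key stands for a KeyResolver that produced no value, and the two status codes are the defaults visible in the class above.

```java
public class LimiterDecisionSketch {

    static final int PASS = 0;                 // hypothetical marker: request continues down the filter chain
    static final int FORBIDDEN = 403;          // default emptyKeyStatusCode (HttpStatus.FORBIDDEN)
    static final int TOO_MANY_REQUESTS = 429;  // default Config.statusCode (HttpStatus.TOO_MANY_REQUESTS)

    /**
     * @param key            resolved key, or null when the KeyResolver produced nothing
     * @param denyEmptyKey   mirrors the denyEmptyKey setting
     * @param limiterAllowed what the RateLimiter would report for this key
     * @return PASS, or the HTTP status the filter would set before completing the response
     */
    static int decide(String key, boolean denyEmptyKey, boolean limiterAllowed) {
        if (key == null) {
            // Empty key: reject with the empty-key status, or skip limiting entirely
            return denyEmptyKey ? FORBIDDEN : PASS;
        }
        // Non-empty key: the limiter's answer decides
        return limiterAllowed ? PASS : TOO_MANY_REQUESTS;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, true, true));
        System.out.println(decide("/user/1", true, false));
        System.out.println(decide("/user/1", true, true));
    }
}
```

Note that in the real filter the limiter is only consulted when a key is present, and the headers returned by the limiter are copied onto the response in both the allowed and the rejected case.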


2. The Lua script

The Lua script is in the scripts folder: request_rate_limiter.lua

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

if ttl > 0 then
  redis.call("setex", tokens_key, ttl, new_tokens)
  redis.call("setex", timestamp_key, ttl, now)
end

-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }
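
To follow the arithmetic of the script step by step, here is a minimal in-memory Java port of the same token bucket logic. It is a sketch under stated assumptions, not the gateway's implementation: TokenBucketSketch and tryAcquire are hypothetical names, the two fields stand in for the values stored under tokens_key and timestamp_key, the timestamp is assumed to be in seconds, and key expiry (the ttl) is not modeled.

```java
public class TokenBucketSketch {

    private final long rate;      // tokens added per second (ARGV[1])
    private final long capacity;  // bucket size (ARGV[2])

    // null models a missing Redis key, as on the first request
    private Long lastTokens = null;
    private Long lastRefreshed = null;

    TokenBucketSketch(long rate, long capacity) {
        this.rate = rate;
        this.capacity = capacity;
    }

    /** Mirrors the script: refill by elapsed time, then try to take `requested` tokens. */
    boolean tryAcquire(long nowSeconds, long requested) {
        long tokens = (lastTokens == null) ? capacity : lastTokens;      // a new bucket starts full
        long refreshed = (lastRefreshed == null) ? 0 : lastRefreshed;

        long delta = Math.max(0, nowSeconds - refreshed);                // seconds since last call
        long filled = Math.min(capacity, tokens + delta * rate);         // refill, capped at capacity
        boolean allowed = filled >= requested;
        long newTokens = allowed ? filled - requested : filled;

        // The script stores both values whether or not the request was allowed
        lastTokens = newTokens;
        lastRefreshed = nowSeconds;
        return allowed;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(1, 2); // replenishRate 1, burstCapacity 2
        System.out.println(bucket.tryAcquire(100, 1)); // first request in second 100
        System.out.println(bucket.tryAcquire(100, 1)); // second request, same second
        System.out.println(bucket.tryAcquire(100, 1)); // third request, same second
        System.out.println(bucket.tryAcquire(101, 1)); // one second later
    }
}
```

With rate 1 and capacity 2, the first two calls in the same second succeed, the third is rejected because the bucket is empty, and one second later a single token has been refilled. Leaving out the ttl should not change these results: the keys only expire after twice the time needed to refill the bucket, at which point a fresh bucket would start full anyway.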

– Stay hungry, stay foolish.
