一、Gateway基于Redis+Lua脚本限流使用
在 Spring Cloud Gateway中,限流作为网关最基本的功能,官方提供了 RequestRateLimiterGatewayFilterFactory
过滤器工厂,基于 Redis+Lua脚本方式采用令牌桶算法实现了限流。
官方文档:https://docs.spring.io/spring-cloud-gateway/docs/current/reference/html/#the-requestratelimiter-gatewayfilter-factory
1、引入依赖在Gateway服务中,添加 redis-reactive依赖。
org.springframework.boot
spring-boot-starter-data-redis-reactive
org.apache.commons
commons-pool2
2、添加配置
在配置文件中,添加 Redis配置信息,这里针对 app-user服务添加限流策略。
server:
port: 18088
spring:
application:
name: app-gateway
#配置redis地址
redis:
host: 192.168.xxx.xxx
port: 6379
database: 0
timeout: 5000
lettuce:
pool:
max-active: 200
max-wait: 10000
max-idle: 100
min-idle: 10
#配置 nacos注册中心
cloud:
nacos:
discovery:
server-addr: 192.168.xxx.xxx:8848
#配置 gateway网关
gateway:
#设置路由:路由id、路由到微服务的uri、断言
routes:
# app-order服务路由配置
- id: app-order #路由ID,全局唯一,建议配置服务名。
uri: lb://app-order #lb 整合负载均衡器ribbon,loadbalancer
predicates:
- Path=/order/** # 断言,路径相匹配的进行路由
# app-user服务路由配置
- id: app-user
uri: lb://app-user
predicates:
- Path=/app-api/app-user/user/**
filters:
# 去除url前缀:1代表去除第一个路径
- StripPrefix=1
- AddRequestHeader=app-Request-arg, myHeaderValue #添加请求头
- AddRequestParameter=app-RequestParam-arg, zhaoZilong # 添加请求参数
- name: RequestRateLimiter #限流过滤器
args:
redis-rate-limiter.replenishRate: 1 #令牌桶每秒填充速率
redis-rate-limiter.burstCapacity: 2 #令牌桶的总容量
key-resolver: "#{@pathKeyResolver}" #使用SpEL表达式,从Spring容器中获取Bean对象(pathKeyResolver)
3、配置keyResolver
key-resolver: “#{@pathKeyResolver}” 。可以指定限流策略,比如:path限流,请求参数限流,IP限流等。
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Mono;
/**
* 请求限流配置类。可以实现基于三个维度的限流
*/
@Configuration
@Slf4j
public class AppRequestRateLimiterConfig {
/**
* 基于请求url的维度限流,超出请求将返回429
* @return
*/
@Bean
public KeyResolver pathKeyResolver() {
log.info("---------基于请求url限流----------");
//path限流
return exchange -> Mono.just(exchange.getRequest().getURI().getPath());
}
/**
* 基于用户参数的维度限流
* @return
*/
//@Bean
//public KeyResolver userKeyResolver() {
// // user参数限流
// return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
//}
/**
* 基于IP的维度限流
* @return
*/
//@Bean
//public KeyResolver ipKeyResolver() {
// //IP限流
// return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
//}
}
4、测试
启动项目。根据限流策略访问。
- url限流:http://localhost:18088/app-api/app-user/user/findOrderByUserId/1
- user参数限流:http://localhost:18088/app-api/app-user/user/findOrderByUserId/1?user=zhaoyun
- IP限流:http://localhost:18088/app-api/app-user/user/findOrderByUserId/1
上面我配置的是每秒产生的令牌数量是1,当请求速度大于这个数值时,就会返回429的状态码。
到此,在Gateway中的限流就实现了。
二、源码了解Gateway基于Redis+Lua脚本限流,具体实现逻辑在 RequestRateLimiterGatewayFilterFactory类中。
1、RequestRateLimiterGatewayFilterFactory类具体代码如下:
@ConfigurationProperties("spring.cloud.gateway.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory {
public static final String KEY_RESOLVER_KEY = "keyResolver";
private static final String EMPTY_KEY = "____EMPTY_KEY__";
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
private boolean denyEmptyKey = true;
private String emptyKeyStatusCode;
public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
super(RequestRateLimiterGatewayFilterFactory.Config.class);
this.emptyKeyStatusCode = HttpStatus.FORBIDDEN.name();
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}
public KeyResolver getDefaultKeyResolver() {
return this.defaultKeyResolver;
}
public RateLimiter getDefaultRateLimiter() {
return this.defaultRateLimiter;
}
public boolean isDenyEmptyKey() {
return this.denyEmptyKey;
}
public void setDenyEmptyKey(boolean denyEmptyKey) {
this.denyEmptyKey = denyEmptyKey;
}
public String getEmptyKeyStatusCode() {
return this.emptyKeyStatusCode;
}
public void setEmptyKeyStatusCode(String emptyKeyStatusCode) {
this.emptyKeyStatusCode = emptyKeyStatusCode;
}
public GatewayFilter apply(RequestRateLimiterGatewayFilterFactory.Config config) {
KeyResolver resolver = (KeyResolver)this.getOrDefault(config.keyResolver, this.defaultKeyResolver);
RateLimiter limiter = (RateLimiter)this.getOrDefault(config.rateLimiter, this.defaultRateLimiter);
boolean denyEmpty = (Boolean)this.getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder.parse((String)this.getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> {
return resolver.resolve(exchange).defaultIfEmpty("____EMPTY_KEY__").flatMap((key) -> {
if ("____EMPTY_KEY__".equals(key)) {
if (denyEmpty) {
ServerWebExchangeUtils.setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
} else {
return chain.filter(exchange);
}
} else {
String routeId = config.getRouteId();
if (routeId == null) {
Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
return limiter.isAllowed(routeId, key).flatMap((response) -> {
Iterator var4 = response.getHeaders().entrySet().iterator();
while(var4.hasNext()) {
Entry header = (Entry)var4.next();
exchange.getResponse().getHeaders().add((String)header.getKey(), (String)header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
} else {
ServerWebExchangeUtils.setResponseStatus(exchange, config.getStatusCode());
return exchange.getResponse().setComplete();
}
});
}
});
};
}
private T getOrDefault(T configValue, T defaultValue) {
return configValue != null ? configValue : defaultValue;
}
public static class Config implements HasRouteId {
private KeyResolver keyResolver;
private RateLimiter rateLimiter;
private HttpStatus statusCode;
private Boolean denyEmptyKey;
private String emptyKeyStatus;
private String routeId;
public Config() {
this.statusCode = HttpStatus.TOO_MANY_REQUESTS;
}
public KeyResolver getKeyResolver() {
return this.keyResolver;
}
public RequestRateLimiterGatewayFilterFactory.Config setKeyResolver(KeyResolver keyResolver) {
this.keyResolver = keyResolver;
return this;
}
public RateLimiter getRateLimiter() {
return this.rateLimiter;
}
public RequestRateLimiterGatewayFilterFactory.Config setRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
return this;
}
public HttpStatus getStatusCode() {
return this.statusCode;
}
public RequestRateLimiterGatewayFilterFactory.Config setStatusCode(HttpStatus statusCode) {
this.statusCode = statusCode;
return this;
}
public Boolean getDenyEmptyKey() {
return this.denyEmptyKey;
}
public RequestRateLimiterGatewayFilterFactory.Config setDenyEmptyKey(Boolean denyEmptyKey) {
this.denyEmptyKey = denyEmptyKey;
return this;
}
public String getEmptyKeyStatus() {
return this.emptyKeyStatus;
}
public RequestRateLimiterGatewayFilterFactory.Config setEmptyKeyStatus(String emptyKeyStatus) {
this.emptyKeyStatus = emptyKeyStatus;
return this;
}
public void setRouteId(String routeId) {
this.routeId = routeId;
}
public String getRouteId() {
return this.routeId;
}
}
}
Lua脚本在 Scripts文件夹下:request_rate_limitter.lua
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }
– 求知若饥,虚心若愚。