Java服务_Redis实现限流与锁策略

Java服务_Redis实现限流与锁策略

Redis实现分布式锁的7种方案

常开发中,秒杀下单、抢红包等等业务场景,都需要用到分布式锁,而Redis非常适合作为分布式锁使用。

1.概要的说一下七种方法:

  • 方案一:SETNX + EXPIRE。使用SETNX命令来抢占锁,如果成功,再使用EXPIRE命令给锁设置一个过期时间。这种方案的缺点是SETNX和EXPIRE不是原子操作,可能导致锁无法释放。
  • 方案二:SETNX + value值是(系统时间+过期时间)。使用SETNX命令来抢占锁,如果失败,再获取锁的过期时间,如果已经过期,就用GETSET命令更新锁的过期时间。这种方案的缺点是需要客户端的时间同步,而且可能导致锁的过期时间被覆盖或者被其他客户端删除。
  • 方案三:使用Lua脚本(包含SETNX + EXPIRE两条指令)。使用Lua脚本来保证SETNX和EXPIRE的原子性,如果抢占锁成功,就设置锁的过期时间。这种方案的优点是避免了方案一的问题,但是还是存在方案二的问题。
  • 方案四:SET的扩展命令(SET EX PX NX)。使用SET命令的扩展参数来实现分布式锁,EX表示过期时间,PX表示毫秒单位,NX表示只有当键不存在时才设置值。这种方案的优点是简单高效,可以一条命令完成加锁和设置过期时间。
  • 方案五:SET EX PX NX + 校验唯一随机值,再释放锁。在方案四的基础上,增加了一个唯一随机值作为锁的持有者标识,只有持有者才能释放锁。这种方案的优点是增加了安全性,避免了其他客户端误删或者覆盖锁。
  • 方案六: 开源框架Redisson。Redisson是一个基于Redis的Java分布式对象和服务框架,提供了多种分布式锁的实现,如可重入锁、公平锁、读写锁等。这种方案的优点是功能丰富,易于使用和集成。
  • 方案七:多机实现的分布式锁Redlock。Redlock是Redis作者提出的一种基于多个Redis节点实现分布式锁的算法,可以保证在任何时刻只有一个客户端持有锁,并且能够容忍一定数量的节点故障。这种方案的优点是高可用性和高可靠性。

2.什么是分布式锁

分布式锁就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。

一把靠谱的分布式锁需要具备的特性:

  • 「互斥性」: 任意时刻,只有一个客户端能持有锁。
  • 「锁超时释放」:持有锁超时,可以释放,防止不必要的资源浪费,也可以防止死锁。
  • 「可重入性」:一个线程如果获取了锁之后,可以再次对其请求加锁。
  • 「高性能和高可用」:加锁和解锁需要开销尽可能低,同时也要保证高可用,避免分布式锁失效。
  • 「安全性」:锁只能被持有的客户端删除,不能被其他客户端删除

方案一:SETNX + EXPIRE

SETNX 是SET IF NOT EXISTS的简写.日常命令格式是SETNX key value,如果 key不存在,则SETNX成功返回1,如果这个key已经存在了,则返回0。key可以设置为key_resource_id,value设置任意值,伪代码如下:

1
2
3
4
5
6
7
8
9
10
if(jedis.setnx(key_resource_id,lock_value) == 1){ //加锁
expire(key_resource_id,100); //设置过期时间
try {
do something //业务请求
}catch(){
}
finally {
jedis.del(key_resource_id); //释放锁
}
}

但是这个方案中,setnxexpire两个命令分开了,「不是原子操作」。如果执行完setnx加锁,正要执行expire设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁啦」

锁方案二:SETNX + value值是(系统时间+过期时间)

为了解决方案一,「发生异常锁得不到释放的场景」,有小伙伴认为,可以把过期时间放到setnx的value值里面。如果加锁失败,再拿出value值校验一下即可。加锁代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
long expires = System.currentTimeMillis() + expireTime; //系统时间+设置的过期时间
String expiresStr = String.valueOf(expires);

// 如果当前锁不存在,返回加锁成功
if (jedis.setnx(key_resource_id, expiresStr) == 1) {
return true;
}
// 如果锁已经存在,获取锁的过期时间
String currentValueStr = jedis.get(key_resource_id);

// 如果获取到的过期时间,小于系统当前时间,表示已经过期
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {

// 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间(不了解redis的getSet命令的小伙伴,可以去官网看下哈)
String oldValueStr = jedis.getSet(key_resource_id, expiresStr);

if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才可以加锁
return true;
}
}

//其他情况,均返回加锁失败
return false;
}

这个方案的优点是,巧妙移除expire单独设置过期时间的操作,把「过期时间放到setnx的value值」里面来。解决了方案一发生异常,锁得不到释放的问题。但是这个方案还有别的缺点:(1)过期时间是客户端自己生成的(System.currentTimeMillis()是当前系统的时间),必须要求分布式环境下,每个客户端的时间必须同步;(2)如果锁过期的时候,并发多个客户端同时请求过来,都执行jedis.getSet(),最终只能有一个客户端加锁成功,但是该客户端锁的过期时间,可能被别的客户端覆盖;(3)该锁没有保存持有者的唯一标识,可能被别的客户端释放/解锁。

方案三:使用Lua脚本(包含SETNX + EXPIRE两条指令)

实际上,我们还可以使用Lua脚本来保证原子性(包含setnx和expire两条指令),lua脚本如下:

1
2
3
4
5
if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then
redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end;

加锁代码如下:

1
2
3
4
5
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
" redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
Object result = jedis.eval(lua_scripts, Collections.singletonList(key_resource_id), Collections.singletonList(values));
//判断是否成功
return result.equals(1L);

方案四:SET的扩展命令(SET EX PX NX)

除了使用,使用Lua脚本,保证SETNX + EXPIRE两条指令的原子性,我们还可以巧用Redis的SET指令扩展参数(SET key value[EX seconds][PX milliseconds][NX|XX]),它也是原子性的。

SET key value[EX seconds][PX milliseconds][NX|XX],其参数含义分别为:

EX seconds :设定key的过期时间,时间单位是秒;

PX milliseconds: 设定key的过期时间,单位为毫秒;

NX :表示key不存在的时候,才能set成功,也即保证只有第一个客户端请求才能获得锁,而其他客户端请求只能等其释放锁,才能获取;

XX: 仅当key存在时设置值。

伪代码如下:

1
2
3
4
5
6
7
8
9
if(jedis.set(key_resource_id, lock_value, "NX", "EX", 100s) == 1){ //加锁
try {
do something //业务处理
}catch(){
}
finally {
jedis.del(key_resource_id); //释放锁
}
}

但是这个方案还是可能存在问题:

问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行的啦。

问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完呢。

方案五:SET EX PX NX + 校验唯一随机值,再删除

既然锁可能被别的线程误删,那我们给value值设置一个标记当前线程唯一的随机数,在删除的时候,校验一下。伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
if(jedis.set(key_resource_id, uni_request_id, "NX", "EX", 100s) == 1){ //加锁
try {
do something //业务处理
}catch(){
}
finally {
//判断是不是当前线程加的锁,是才释放
if (uni_request_id.equals(jedis.get(key_resource_id))) {
jedis.del(lockKey); //释放锁
}
}
}

在这里,「判断是不是当前线程加的锁」「释放锁」不是一个原子操作。如果调用jedis.del()释放锁的时候,可能这把锁已经不属于当前客户端,会解除他人加的锁。

为了更严谨,一般也是用lua脚本代替。lua脚本如下:

1
2
3
4
5
if redis.call('get',KEYS[1]) == ARGV[1] then 
return redis.call('del',KEYS[1])
else
return 0
end;

方案六:Redisson框架

方案五还是可能存在「锁过期释放,业务没执行完」的问题。有些小伙伴认为,稍微把锁过期时间设置长一些就可以啦。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。(也就是看门狗策略)

当前开源框架Redisson解决了这个问题,Redisson底层原理图:

只要线程一加锁成功,就会启动一个watch dog看门狗,它是一个后台线程,会每隔10秒检查一下,如果线程1还持有锁,那么就会不断的延长锁key的生存时间。因此,Redisson就是使用Redisson解决了「锁过期释放,业务没执行完」问题。

方案七:多机实现的分布式锁Redlock+Redisson

前面六种方案都只是基于单机版Redis的讨论,还不是很完美,其实Redis一般都是集群部署的:

如果线程一在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点,恰好这时,master节点发生故障,一个slave节点就会升级为master节点,线程二就可以获取同个key的锁啦,但线程一也已经拿到锁了,锁的安全性就没了。

为了解决这个问题,Redis作者antirez提出一种高级的分布式锁算法:Redlock,其核心思想是这样的:搞多个Redis master部署,以保证它们不会同时宕掉,并且这些master节点是完全相互独立的,相互之间不存在数据同步,同时需要确保在这多个master实例上,是与在Redis单实例,使用相同方法来获取和释放锁。

RedLock的实现步骤如下:

  • 获取当前时间,以毫秒为单位。
  • 按顺序向5个master节点请求加锁。客户端设置网络连接和响应超时时间,并且超时时间要小于锁的失效时间。(假设锁自动失效时间为10秒,则超时时间一般在5-50毫秒之间,我们就假设超时时间是50ms吧)。如果超时,跳过该master节点,尽快去尝试下一个master节点。
  • 客户端使用当前时间减去开始获取锁时间(即步骤1记录的时间),得到获取锁使用的时间。当且仅当超过一半(N/2+1,这里是5/2+1=3个节点)的Redis master节点都获得锁,并且使用的时间小于锁失效时间时,锁才算获取成功。(如上图,10s> 30ms+40ms+50ms+4m0s+50ms)
  • 如果取到了锁,key的真正有效时间就变啦,需要减去获取锁所使用的时间。
  • 如果获取锁失败(没有在至少N/2+1个master实例取到锁,有或者获取锁时间已经超过了有效时间),客户端要在所有的master节点上解锁(即便有些master节点根本就没有加锁成功,也需要解锁,以防止有些漏网之鱼)。

选择哪种方案

主要根据以下几个因素来选择合适的方案:

  • 性能和效率。如果你需要一个简单高效的分布式锁,可以选择方案四或者方案五,它们只需要一条命令就可以完成加锁和设置过期时间,而且可以避免锁无法释放的问题。
  • 安全性和可靠性。如果你需要一个安全可靠的分布式锁,可以选择方案五或者方案七,它们可以保证锁只能被持有者释放,而且可以容忍一定数量的节点故障。
  • 功能和易用性。如果你需要一个功能丰富易于使用的分布式锁,可以选择方案六,它提供了多种分布式锁的实现,如可重入锁、公平锁、读写锁等,而且可以方便地集成到Java项目中。

当然,这些方案并不是完美的,它们可能存在一些潜在的问题或者局限性,你需要根据自己的业务需求和场景来权衡利弊,选择最适合你的方案。

Redis实现分布式锁的7种方案

Redis实现限流

三种Redis限流策略

Redis是一款高性能的key-value数据存储系统,越来越多的应用需要利用Redis实现限流功能,以避免应用遭受突发高流量而导致服务不可用等问题,常见的有三种策略:

1.计数器算法

计数器算法是Redis实现限流的常见手段,其核心思路是利用redis提供的key过期时间作为限流窗口期,key的值记录该窗口期内已经产生的访问资源次数,key本身记录限流的资源范围。当请求到来时,将统计单位时间内的请求数量与阈值进行比较,当达到阈值时就拒绝后续访问,从而起到限制流量的目的。具体实现方法如下:

1.1 使用Redis的原子操作incr操作,实现计数器的自增。

1.2 通过Redis对key设置过期时间,例如设置一分钟后过期。

1.3 当计算器的值超过限制阈值时,拒绝访问,否则可以继续访问并执行incr操作。

需要注意的是,由于计数器算法只记录请求数量,无法区分不同类型的请求,可能会存在被恶意用户绕过的可能性。因此,这种方法适用于单一请求的场景,如接口限流。

2.漏桶算法

漏桶算法也是一种流量控制算法,和计数器算法相比,漏桶算法会对请求进行一个统一的速率限制,而非单纯地限制访问量。

具体实现方法如下:

2.1 将漏桶看作一个固定大小的容器,以固定的速率漏出水。

2.2 使用Redis的List数据类型,将每个请求按照时间顺序加入List中,即水流进入水桶的过程。

2.3 使用Redis的过期机制,将List中已经达到一定时间的请求移出,即水从桶中漏出的过程。

2.4 当请求加入List时,判断List的长度是否达到桶的最大限制,如果超过限制,就拒绝请求,否则可以正常处理。

漏桶算法可用于应对各种请求,由于限制速率而非请求数量,不容易被恶意用户绕过,常用于对整个应用的限流控制。

3.令牌桶算法

令牌桶算法也属于流量控制算法,其主要思想为固定速率向令牌桶中添加令牌,一个请求需要获取令牌才能执行,当令牌桶中没有令牌时,请求将被拒绝。具体实现方法如下:

3.1 使用Redis的List数据类型,将一定数量的令牌添加到List中,表示令牌桶的容量。

3.2 使用Redis过期机制,每当有请求到来时,如果List中还有令牌,则可以正常处理请求,并从List中移除一个令牌,否则拒绝请求。

3.3 当令牌生成速度过快或者请求到来速度过慢时,可能会出现令牌桶溢出的情况。因此,可使用Redis的有序集合数据类型,记录每次执行的时间和执行次数,用于在下一次添加令牌时,调整添加令牌的数量,以适应实际情况。

令牌桶算法不仅能够限制并发数,而且可以控制请求速率,比较适合对底层资源进行保护,比如数据库连接池、磁盘IO等。

计数器算法限流实例

具体实现方案如下:

  • 首先规定资源限制范围,一般都是限制对某个接口的调用频率,因此key使用接口方法名即可;
  • 第一次访问资源时,key不存在,那么新创建一个key,并将值设置为1,最后设置key的过期时间,表示开启限流窗口期;
  • 每一次访问资源,会首先判断当前是否存在限流窗口期,如果存在,将访问次数加一,并判断是否达到最大资源访问次数限制;
  • 如果达到了,则抛出异常,告诉用户访问频繁,请稍后再试;
  • 如果没达到,则放行请求;
  • 在不是第一次访问资源的前提下,如果发现限流窗口期过了,那么重新开启一个。

1.准备工作

添加依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

添加配置信息

1
2
3
4
5
6
7
8
9
10
11
12
spring:
redis:
host: xxx
port: 6379
password: xxx
lettuce:
#只有自动配置连接池的依赖,连接池才会生效
pool:
max-active: 8 #最大连接
max-idle: 8 #最大空闲连接
min-idle: 0 #最小空闲连接
max-wait: 100 #连接等待时间

修改redisTemplate的序列化方式为JSON

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@ConditionalOnMissingBean
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
{
//创建template
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
//设置连接工厂
redisTemplate.setConnectionFactory(redisConnectionFactory);
//设置序列化工具
GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
//key和hashKey采用String序列化
redisTemplate.setKeySerializer(RedisSerializer.string());
redisTemplate.setHashKeySerializer(RedisSerializer.string());
//value和hashValue用JSON序列化
redisTemplate.setValueSerializer(jsonRedisSerializer);
redisTemplate.setHashValueSerializer(jsonRedisSerializer);
return redisTemplate;
}

2.限流核心类实现

定义一个顶层的流量控制接口实现,pass方法返回true,表示方向请求,否则表示请求被拦截了:

1
2
3
4
5
6
7
public interface RateLimiter {
/**
* @param requestInfo 请求信息
* @return 当前请求是否允许通过
*/
boolean pass(RequestInformation requestInfo);
}

requestInfo提供当前请求的相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Data
public class RequestInformation {
/**
* 限流key
*/
private String key;
/**
* 限流时间
*/
private int time;
/**
* time时间内最大请求资源次数
*/
private int count;
/**
* 限流类型
*/
private int limitType;
/**
* 请求的方法信息
*/
private Method method;
/**
* 方法参数信息
*/
private Object[] arguments;
/**
* 客户端IP地址
*/
private String ip;
private HttpServletRequest httpServletRequest;
private HttpServletResponse httpServletResponse;

public RequestInformation() {
}
}

提供一个限流注解,该注解可以标注在方法或者类上,标注在类上,则表示当前类所有方法都需要流量控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Limiter {
/**
* @return 限流key--默认为rate_limit:业务名:类名.方法名 ,如果限制了IP类型,则为: rate_limit:业务名:ip:类名.方法名
*/
String key() default "";
/**
* @return 限流时间,单位为s
*/
int time() default 60;
/**
* @return time时间内限制的资源请求次数
*/
int count() default 100;
/**
* @return 限流类型
*/
int limitType() default LimitType.DEFAULT;
}

redis作为限流器的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class RedisRateLimiterImpl implements RateLimiter{
private static final String RATE_LIMITER_KEY_PREFIX="rate_limiter";
/**
* 使用redis做限流处理使用的lua脚本
*/
private static final String LIMITER_LUA=
"local key = KEYS[1]\n" +
"local count = tonumber(ARGV[1])\n" +
"local time = tonumber(ARGV[2])\n" +
"local current = redis.call('get', key)\n" +
"if current and tonumber(current) > count then\n" +
" return 1\n" +
"end\n" +
"current = redis.call('incr', key)\n" +
"if tonumber(current) == 1 then\n" +
" redis.call('expire', key, time)\n" +
"end\n" +
"return 0\n";
private RedisTemplate<String, Object> redisTemplate;

public RedisRateLimiterImpl(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

/**
* @param requestInfo 请求信息
* @return 当前请求是否允许通过
*/
@Override
public boolean pass(RequestInformation requestInfo) {
//拿到限流key
String limiterKey=getRateLimiterKey(requestInfo);
//执行lua脚本
Long limiterRes = redisTemplate.execute(RedisScript.of(LIMITER_LUA,Long.class), List.of(limiterKey), requestInfo.getCount(), requestInfo.getTime());
//判断限流结果
return limiterRes==0L;
}


private String getRateLimiterKey(RequestInformation requestInfo) {
return combineKey(RATE_LIMITER_KEY_PREFIX,
requestInfo.getKey(),
requestInfo.getIp(),
requestInfo.getMethod().getClass().getName(),
requestInfo.getMethod().getName());
}

private String combineKey(String ... keys) {
StringBuilder keyBuilder=new StringBuilder();
for (int i = 0; i < keys.length; i++) {
if(StringUtils.isEmpty(keys[i])){
continue;
}
keyBuilder.append(keys[i]);
if(i==keys.length-1){
continue;
}
keyBuilder.append(":");
}
return keyBuilder.toString();
}
}

lua脚本解释:

KEYS 和 ARGV 都是一会调用时候传进来的参数,tonumber 就是把字符串转为数字,redis.call 就是执行具体的 redis 指令,具体流程是这样:

  • 首先获取到传进来的 key 以及 限流的 count 和时间 time;
  • 通过 get 获取到这个 key 对应的值,这个值就是当前时间窗内这个接口可以访问多少次;
  • 如果是第一次访问,此时拿到的结果为 nil,否则拿到的结果应该是一个数字,所以接下来就判断,如果拿到的结果是一个数字,并且这个数字还大于 count,那就说明已经超过流量限制了,那么返回1表示请求拦截;
  • 如果拿到的结果为 nil,说明是第一次访问,此时就给当前 key 自增 1,然后设置一个过期时间;
  • 最后返回0表示请求放行。

另外lua脚本也可以定义在文件在,然后通过加载文件获取:

1
2
3
4
5
6
7
@Bean
public DefaultRedisScript<Long> limitScript() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/limit.lua")));
redisScript.setResultType(Long.class);
return redisScript;
}

3.aop相关逻辑实现

我们需要将限流逻辑在需要流量管控的方法执行前先执行,因此需要拦截目标方法,有两个思路:

  • 通过@Aspect注解标注一个切面类,用@Before或者@Around注解标注在切面方法上,里面填写限流管控逻辑;
  • 手动编写一个advisor增强器,注入容器,并提供相关拦截器和pointcut实现。

手动编写advisor方式的实现具体步骤如下:

编写拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Slf4j
public class RateLimiterMethodInterceptor implements MethodInterceptor {
private final RateLimiter rateLimiter;

public RateLimiterMethodInterceptor(RateLimiter rateLimiter) {
this.rateLimiter=rateLimiter;
}

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
try{
RequestInformation requestInformation = new RequestInformation();
buildMethodInfo(requestInformation,invocation);
buildLimitInfo(requestInformation);
buildRequestInfo(requestInformation);
if (rateLimiter.pass(requestInformation)){
return invocation.proceed();
}
logWarn(requestInformation);
}catch (Exception e){
e.printStackTrace();
throw e;
}
throw new RateLimiterException("访问过于频繁,请稍后再试!");
}

private void logWarn(RequestInformation requestInformation) {
if(requestInformation.getHttpServletRequest()!=null){
log.warn("rateLimiter拦截了一个请求,该请求信息如下: URI: {} ,IP: {} ,方法名: {} ,方法参数信息: {} ",
requestInformation.getHttpServletRequest().getRequestURI(),requestInformation.getIp(),requestInformation.getMethod().getName(),
Arrays.toString(requestInformation.getArguments()));
}else {
log.warn("rateLimiter拦截了一个请求,该请求信息如下: 方法名: {} ,方法参数信息: {} ",
requestInformation.getMethod().getName(), Arrays.toString(requestInformation.getArguments()));
}
}

private void buildLimitInfo(RequestInformation requestInformation) throws RateLimiterException {
Method method = requestInformation.getMethod();
Limiter limiter;
if(method.isAnnotationPresent(Limiter.class)){
limiter = method.getAnnotation(Limiter.class);
}else {
limiter=method.getClass().getAnnotation(Limiter.class);
}
if(limiter==null){
throw new RateLimiterException("无法在当前方法"+method.getName()+"或者类"+method.getClass().getName()+"上寻找到@Limiter注解");
}
requestInformation.setKey(limiter.key());
requestInformation.setCount(limiter.count());
requestInformation.setTime(limiter.time());
requestInformation.setLimitType(limiter.limitType());
}

private void buildMethodInfo(RequestInformation requestInformation, MethodInvocation invocation) {
requestInformation.setMethod(invocation.getMethod());
if(invocation instanceof ReflectiveMethodInvocation){
ReflectiveMethodInvocation reflectiveMethodInvocation = (ReflectiveMethodInvocation) invocation;
requestInformation.setArguments(reflectiveMethodInvocation.getArguments());
}
}

/**
* 从线程上下文中取出请求和响应相关信息
*/
private void buildRequestInfo(RequestInformation requestInformation) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
if(requestAttributes instanceof ServletRequestAttributes){
ServletRequestAttributes sra = (ServletRequestAttributes) requestAttributes;
requestInformation.setHttpServletRequest(sra.getRequest());
requestInformation.setHttpServletResponse(sra.getResponse());
}
if(requestInformation.getHttpServletRequest()!=null && requestInformation.getLimitType()==LimitType.IP){
requestInformation.setIp(IPUtils.getIpAddress(requestInformation.getHttpServletRequest()));
}
}
}

编写advisor增强器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class RateLimiterAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RateLimiterAdvisor rateLimiterAdvisor(RateLimiter rateLimiter) {
return new RateLimiterAdvisor(rateLimiter);
}

@Bean
@ConditionalOnMissingBean
public RateLimiter rateLimiter(RedisTemplate<String, Object> redisTemplate) {
return new RedisRateLimiterImpl(redisTemplate);
}
...
}

采用@Aspect注解切面的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Aspect
@Component
public class RateLimiterAspect {
private static final Logger log = LoggerFactory.getLogger(RateLimiterAspect.class);

@Autowired
private RedisTemplate<Object, Object> redisTemplate;

@Autowired
private RedisScript<Long> limitScript;

@Before("@annotation(rateLimiter)")
public void doBefore(JoinPoint point, RateLimiter rateLimiter) throws Throwable {
String key = rateLimiter.key();
int time = rateLimiter.time();
int count = rateLimiter.count();

String combineKey = getCombineKey(rateLimiter, point);
List<Object> keys = Collections.singletonList(combineKey);
try {
Long number = redisTemplate.execute(limitScript, keys, count, time);
if (number==null || number.intValue() > count) {
throw new ServiceException("访问过于频繁,请稍候再试");
}
log.info("限制请求'{}',当前请求'{}',缓存key'{}'", count, number.intValue(), key);
} catch (ServiceException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("服务器限流异常,请稍候再试");
}
}

public String getCombineKey(RateLimiter rateLimiter, JoinPoint point) {
StringBuffer stringBuffer = new StringBuffer(rateLimiter.key());
if (rateLimiter.limitType() == LimitType.IP) {
stringBuffer.append(IpUtils.getIpAddr(((ServletRequestAttributes) RequestContextHolder.currentRequestAttributes()).getRequest())).append("-");
}
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
Class<?> targetClass = method.getDeclaringClass();
stringBuffer.append(targetClass.getName()).append("-").append(method.getName());
return stringBuffer.toString();
}
}

利用Redis实现限流