一、Redis分布式锁的原理
Redis分布式锁的原理就是在Redis里面占住一个“坑位”,当有”其他人“也要占这个坑位的时候,发现已经”坑位“已经被占了,那就加锁失败,就能放弃或者重试。
“占坑”一般使用setex(set if not exists)命令,只允许一个客户端来设置键-值(占坑)。先到先得,用完之后,再使用del命令删除这个key。
关于setnx命令:
一般来说我们使用Redis分布式锁,设置了一个Key之后,需要给它设置一个过期时间。保证出现异常异常的话,可以通过过期时间来删除Key。
1
2
|
setnx lock:foo true # 抢锁
expire lock:foo 5 # 加过期时间
|
但是很明显设置键值和加过期时间是两条命令,不是原子命令,如果在这两条命令中间的时间里Redis发生了异常,导致expire
命令没有被执行,那么就会造成死锁,在Redis2.8开始,便在set命令添加了扩展参数。使得setnx命令和expire命令可以一起执行。
set命令的扩展参数:
1
2
3
4
5
6
7
8
|
EX seconds – Set the specified expire time, in seconds.
PX milliseconds – Set the specified expire time, in milliseconds.
NX – Only set the key if it does not already exist.
XX – Only set the key if it already exist.
EX seconds – 设置键key的过期时间,单位时秒
PX milliseconds – 设置键key的过期时间,单位时毫秒
NX – 只有键key不存在的时候才会设置key的值
XX – 只有键key存在的时候才会设置key的值
|
通过在set命令添加上述的参数,已经可以完全取代setnx,setex,psetex命令。
综上所述:
由于setnx命令的缺陷,所以使用set key value [EX seconds] [PX milliseconds] [NX]
来取代setnx命令。
二、分布式锁必备的条件
-
独占性
既然是锁,那就必须是独占性,任何时刻只能有且仅有一个线程持有。
-
高可用
如果Redis是集群的环境,不能因为某一个节点的的不可用而导致获取锁和释放失败。
-
防止死锁
必须要超时控制机制或者撤销锁的操作,防止不能异常情况不能正确释放锁而导致的死锁。
-
不可乱抢
防止”张冠李戴“,一个客户端设置的锁只能由客户端来释放,不能被其他的客户端释放掉。
-
可重入
锁一般都是具有可重入性,同一个节点的同一个线程获得了锁之后,可以再次去获取这个锁。例如我们常用的synchronizd和Lock,只不过这两个都是本地锁。
三、基于Spring Boot实现Redis分布式锁案例
这里使用Spring Boot进行快速开发,Redis使用6.08版本。关于Redis的安装可自行去搜索安装。
用到的依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
</dependencies>
|
启动类
1
2
3
4
5
6
|
@SpringBootApplication
public class LockApplication {
public static void main(String[] args) {
SpringApplication.run(LockApplication.class, args);
}
}
|
Redis的配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory) {
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setConnectionFactory(connectionFactory);
return redisTemplate;
}
}
|
配置文件的一些配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
server.port=8080
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=192.168.244.10
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0
|
定义controller,因为重点关注的是Redis分布式锁,这里代码便全都在controller里面编写。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
@RestController
@RequestMapping("lock")
public class LockTestController {
private final static String lockKey = "distributed-lock:goods:1";
private final Logger logger = LoggerFactory.getLogger(LockTestController.class);
private final int timeout = 10;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@GetMapping("test")
public Object distributedLockV1() {
String value = UUID.randomUUID() + Thread.currentThread().getName();
// 抢锁成功的话,并设置key的过期时间
try {
Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, TimeUnit.SECONDS);
if (gotLock) {
logger.info(Thread.currentThread().getName() + ":抢锁成功");
try {
// 这里模拟业务代码的执行时间
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "抢锁成功!";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放资源,即解锁
stringRedisTemplate.delete(lockKey);
}
// 对于抢锁失败的,可以重试抢锁,也可以直接返回友好的失败操作。
return "抢锁失败";
}
}
|
四、Redis分布式锁超时问题
4.1 正确释放锁
在开始的时候,说到为保证Redis异常,没能正确释放锁,应该给Key设置一个过期时间,让这个Key过期了而被删除。
现在讨论上一章节实现的分布式锁:
加入在抢锁成功之后,**在执行业务逻辑的时候时间过长,此时该线程(线程1)设置的Key过期被删除了,其他线程(线程2)会抢锁成功又重新设置了Key,线程1执行完,释放锁的时候,由于线程1设置的Key过期了,此时删除的线程2设置的Key。*这样线程2岂不是就懵!
解决方案:
所以在删除Key的时候,判断一下这个Key是不是该线程抢锁时设置的Key,所以在设置Key的时候也把Value也设置,在删除的时候,判断一下Value时候一致。自己释放自己加的锁。
改进代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@GetMapping("test2")
public Object distributedLockV2() {
// 生成随机值,在删除Key时判断
String value = UUID.randomUUID() + Thread.currentThread().getName();
// 抢锁成功的话,并设置key的过期时间
try {
Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, TimeUnit.SECONDS);
if (gotLock) {
logger.info(Thread.currentThread().getName() + ": 抢锁成功");
try {
// 这里模拟业务代码的执行时间
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "抢锁成功!";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (value.equals(stringRedisTemplate.opsForValue().get(lockKey))){
// 释放资源,即解锁
stringRedisTemplate.delete(lockKey);
}
}
// 对于抢锁失败的,可以重试抢锁,也可以直接返回友好的失败操作。
return "抢锁失败";
}
|
4.2 原子性保证
在这里呢,又仔细想想,上一小节distributedLockV2
方法里,finally代码里判断和删除会被分成两条命令,此时这两条命令会被分开执行,这就不是原子命令。如果看过Redis官网的同学,应该知道,官网其实是给出了解决方法:使用Lua脚本执行,Redis在解析Lua脚本时是原子的。
解决这个判断和删除原子性问题的Lua脚本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
@RestController
@RequestMapping("lock")
public class LockTestController {
private final static String lockKey = "distributed-lock:goods:1";
// 命令执行成功,返回1,失败返回0
private final static String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
private final Logger logger = LoggerFactory.getLogger(LockTestController.class);
private final int timeout = 50;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@GetMapping("test3")
public Object distributedLockV3() {
String value = UUID.randomUUID() + Thread.currentThread().getName();
// 抢锁成功的话,并设置key的过期时间
try {
Boolean gotLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, timeout, TimeUnit.SECONDS);
if (gotLock) {
logger.info(Thread.currentThread().getName() + ": 抢锁成功");
try {
// 这里模拟业务代码的执行时间
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "抢锁成功!";
}
} catch (Exception e) {
e.printStackTrace();
} finally {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = stringRedisTemplate.execute(script,
Collections.singletonList(lockKey),
value);
if (result != null) {
if (result.equals(1L)) {
logger.info("释放锁---" + lockKey + "---成功");
} else {
logger.error("释放Redis分布式锁---" + lockKey + "---失败");
}
}
logger.error("执行lua脚本异常!");
}
// 对于抢锁失败的,可以重试抢锁,也可以直接返回友好的失败操作。
return "抢锁失败";
}
}
|
4.3 小总结
1、加了Redis分布式锁,如果出现异常的话,可能无法释放锁,所以在代码层面在finally释放锁。
2、防止因为异常导致代码没有执行到finally,设置的Key需要添加过期时间。
3、释放锁时,需要给Key设置Value值,在释放锁的时候时候,校验一下Value是否一致,防止是放错别人设置的锁。
4、确保判断和删除两条Redis命令是原子的,通过Lua脚本完成。
到这里关于单机Redis下的分布式锁的实现,到这就了。其实还是会有问题:1、Key过期了,但是业务还没执行完,这就要对Key进行续期。2、单机Redis是CP的,集群Redis是AP。主从异步复制可能会导致锁丢失。
下篇就来说一下Redis集群环境下的分布式锁解决方案RedLock算法以及Java语言的实现Redisson