大家好呀,我是楼仔。
今天第一天开工,收拾心情,又要开始好好学习,好好工作了。
对于使用 JAVA 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 redisson 就行。
但是因为这些封装好的组建,让我们越来越懒。
我们使用一些封装好的开源组建时,可以了解其中的原理,或者自己动手写一个,可以更好提升你的技术水平。
今天我就教大家用原生的 Redis,手动撸一个 Redis 分布式锁,很有意思。
其实通过 Redis 实现分布式锁,经常会有面试官会问,很多同学都知道用 SetNx() 去获取锁,解决并发问题。
SetNx() 是什么?我简单解答一下。
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
对于下面 2 种问题,你知道如何解决么?
这个就是我们实现 Redis 分布式锁时,需要重点解决的 2 个问题。
刚才说过,通过 SetNx() 去获取锁,可以解决并发问题。
当获取到锁,处理完业务逻辑后,会将锁释放。
图片
但当机器宕机,或者重启时,没有执行 Del() 删除锁操作,会导致锁一直没有释放。
所以,我们还需要记录锁的超时时间,判断锁是否超时。
图片
这里我们通过 GetKey() 获取锁的超时时间 A,通过和当前时间比较,判断锁是否超时。
如果锁未超时,直接返回,如果锁超时,重新设置锁的超时时间,成功获取锁。
还有其它问题么?当然!
因为在并发场景下,会存在 A、B 两个线程同时执行 SetNx(),导致两个线程同时获取到锁。
那如何解决呢?将 SetNx() 用 GetSet() 替换。
图片
GetSet() 是什么?我简单解答一下。
Redis Getset 命令用于设置指定 key 的值,并返回 key 的旧值。
这里不太好理解,我举个例子。
假如 A、B 两个线程,A 先执行,B 后执行:
可能有同学会继续问,之前设置的超时是 Ta = 200,现在变成了 Tb = 201,延长或缩短了锁的超时时间,不会有问题么?
其实在现实并发场景中,能走到这一步,基本是“同时”进来的,两者的时间差非常小,可以忽略此影响。
这里给出 Go 代码,注释都写得非常详细,即使你不会 Go,读注释也能读懂。
// 获取分布式锁,需要考虑以下情况:
// 1. 机器A获取到锁,但是在未释放锁之前,机器挂掉或者重启,会导致其它机器全部hang住,这时需要根据锁的超时时间,判断该锁是否需要重置;
// 2. 当锁超时时,需要考虑两台机器同时去获取该锁,需要通过GETSET方法,让先执行该方法的机器获取锁,另外一台继续等待。
func GetDistributeLock(key string, expireTime int64) bool {
currentTime := time.Now().Unix()
expires := currentTime + expireTime
redisAlias := "jointly"
// 1.获取锁,并将value值设置为锁的超时时间
redisRet, err := redis.SetNx(redisAlias, key, expires)
if nil == err && utils.MustInt64(1) == redisRet {
// 成功获取到锁
return true
}
// 2.当获取到锁的机器突然重启&挂掉时,就需要判断锁的超时时间,如果锁超时,新的机器可以重新获取锁
// 2.1 获取锁的超时时间
currentLockTime, err := redis.GetKey(redisAlias, key)
if err != nil {
return false
}
// 2.2 当"锁的超时时间"大于等于"当前时间",证明锁未超时,直接返回
if utils.MustInt64(currentLockTime) >= currentTime {
return false
}
// 2.3 将最新的超时时间,更新到锁的value值,并返回旧的锁的超时时间
oldLockTime, err := redis.GetSet(redisAlias, key, expires)
if err != nil {
return false
}
// 2.4 当锁的两个"旧的超时时间"相等时,证明之前没有其它机器进行GetSet操作,成功获取锁
// 说明:这里存在并发情况,如果有A和B同时竞争,A会先GetSet,当B再去GetSet时,oldLockTime就等于A设置的超时时间
if utils.MustString(oldLockTime) == currentLockTime {
return true
}
return false
}
删除锁逻辑:
// 删除分布式锁
// @return bool true-删除成功;false-删除失败
func DelDistributeLock(key string) bool {
redisAlias := "jointly"
redisRet := redis.Del(redisAlias, key)
if redisRet != nil {
return false
}
return true
}
业务逻辑:
func DoProcess(processId int) {
fmt.Printf("启动第%d个线程n", processId)
redisKey := "redis_lock_key"
for {
// 获取分布式锁
isGetLock := GetDistributeLock(redisKey, 10)
if isGetLock {
fmt.Printf("Get Redis Key Success, id:%dn", processId)
time.Sleep(time.Second * 3)
// 删除分布式锁
DelDistributeLock(redisKey)
} else {
// 如果未获取到该锁,为了避免redis负载过高,先睡一会
time.Sleep(time.Second * 1)
}
}
}
最后起个 10 个多线程,去执行这个 DoProcess():
func mAIn() {
// 初始化资源
var group string = "group"
var name string = "name"
var host string
// 初始化资源
host = "http://ip:port"
_, err := xrpc.NewXRpcDefault(group, name, host)
if err != nil {
panic(fmt.Sprintf("initRpc when init rpc failed, err:%v", err))
}
redis.SetRedis("louzai", "redis_louzai")
// 开启10个线程,去抢Redis分布式锁
for i := 0; i <= 9; i ++ {
go DoProcess(i)
}
// 避免子线程退出,主线程睡一会
time.Sleep(time.Second * 100)
return
}
程序跑了100 s,我们可以看到,每次都只有 1 个线程获取到锁,分别是 2、1、5、9、3,执行结果如下:
启动第0个线程
启动第6个线程
启动第9个线程
启动第4个线程
启动第5个线程
启动第2个线程
启动第1个线程
启动第8个线程
启动第7个线程
启动第3个线程
Get Redis Key Success, id:2
Get Redis Key Success, id:2
Get Redis Key Success, id:1
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
这个代码,其实是我很久之前写的,因为当时 Go 没有开源的分布式锁,但是我又需要通过单机去执行某个任务,所以就自己手动撸了一个,后来在线上跑了 2 年,一直都没有问题。
不过期间也遇到过一个坑,就是我们服务迁移时,忘了将旧机器的分布式锁停掉,导致锁经常被旧机器抢占,当时觉得很奇怪,我的锁呢?
写这篇文章时,又让我想到当时工作的场景。
最后再切回正题,本文由浅入深,详细讲解了 Redis 实现的详细过程,以及锁超时、并发场景下,如何保证锁能正常释放,且只有一个线程去获取锁。