<返回更多

Java分布式锁看这篇就够了

2020-09-01    
加入收藏

作者 | seesun2012

来源 | urlify.cn/uY36R3

 

Java分布式锁看这篇就够了

 

什么是锁?


什么是分布式?


分布式的 CAP 理论告诉我们:

任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。

目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。基于 CAP理论,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证最终一致性。

分布式场景


此处主要指集群模式下,多个相同服务同时开启.

在许多的场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。很多时候我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,通过 Java 提供的并发 API 我们可以解决,但是在分布式环境下,就没有那么简单啦。

什么是分布式锁?


我们需要怎样的分布式锁?


基于数据库做分布式锁


基于乐观锁

基于表主键唯一做分布式锁

思路:利用主键唯一的特性,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,当方法执行完毕之后,想要释放锁的话,删除这条数据库记录即可。

上面这种简单的实现有以下几个问题:

当然,我们也可以有其他方式解决上面的问题。

基于表字段版本号做分布式锁


这个策略源于 mysql 的 mvcc 机制,使用这个策略其实本身没有什么问题,唯一的问题就是对数据表侵入较大,我们要为每个表设计一个版本号字段,然后写一条判断 sql 每次进行判断,增加了数据库操作的次数,在高并发的要求下,对数据库连接的开销也是无法忍受的。

基于悲观锁

基于数据库排他锁做分布式锁


在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁 (注意:InnoDB 引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给要执行的方法字段名添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
我们可以认为获得排他锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,通过connection.commit()操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。

但是还是无法直接解决数据库单点和可重入问题。

这里还可能存在另外一个问题,虽然我们对方法字段名使用了唯一索引,并且显示使用 for update 来使用行级锁。但是,MySQL 会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。

还有一个问题,就是我们要使用排他锁来进行分布式锁的 lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。

优缺点


优点:简单,易于理解
缺点:会有各种各样的问题(操作数据库需要一定的开销,使用数据库的行级锁并不一定靠谱,性能不靠谱)

基于 Redis 做分布式锁


基于 REDIS 的 SETNX()、EXPIRE() 方法做分布式锁


setnx()


setnx 的含义就是 SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。

expire()


expire 设置过期时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能通过 expire() 来对 key 设置。

使用步骤


1、setnx(lockkey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功

2、expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。

3、执行完业务代码后,可以通过 delete 命令删除 key。

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步 setnx 执行成功后,在 expire() 命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用 redis 的 setnx()、get() 和 getset() 方法来实现分布式锁。

基于 REDIS 的 SETNX()、GET()、GETSET()方法做分布式锁


这个方案的背景主要是在 setnx() 和 expire() 的方案上针对可能存在的死锁问题,做了一些优化。

getset()


这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对 key 设置 newValue 这个值,并且返回 key 原来的旧值。假设 key 原来是不存在的,那么多次执行这个命令,会出现下边的效果:

使用步骤


import cn.com.tpig.cache.redis.RedisService;
import cn.com.tpig.utils.SpringUtils;//redis分布式锁public final class RedisLockUtil {    private static final int defaultExpire = 60;
    private RedisLockUtil() {        //    }    /**     * 加锁     * @param key redis key     * @param expire 过期时间,单位秒     * @return true:加锁成功,false,加锁失败
     */    public static boolean lock(String key, int expire) {        RedisService redisService = SpringUtils.getBean(RedisService.class);        long status = redisService.setnx(key, "1");
        if(status == 1) {
            redisService.expire(key, expire);            return true;
        }        return false;
    }    public static boolean lock(String key) {        return lock2(key, defaultExpire);
    }    /**     * 加锁     * @param key redis key     * @param expire 过期时间,单位秒     * @return true:加锁成功,false,加锁失败
     */    public static boolean lock2(String key, int expire) {        RedisService redisService = SpringUtils.getBean(RedisService.class);        long value = System.currentTimeMillis() + expire;        long status = redisService.setnx(key, String.valueOf(value));
        if(status == 1) {
            return true;
        }        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
        if(oldExpireTime < System.currentTimeMillis()) {
            //超时            long newExpireTime = System.currentTimeMillis() + expire;            long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));            if(currentExpireTime == oldExpireTime) {
                return true;
            }        }        return false;
    }    public static void unLock1(String key) {        RedisService redisService = SpringUtils.getBean(RedisService.class);        redisService.del(key);    }    public static void unLock2(String key) {        RedisService redisService = SpringUtils.getBean(RedisService.class);        long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
        if(oldExpireTime > System.currentTimeMillis()) {
            redisService.del(key);        }   }}

 

public void drawRedPacket(long userId) {
    String key = "draw.redpacket.userid:" + userId;
    boolean lock = RedisLockUtil.lock2(key, 60);
    if(lock) {
        try {
            //领取操作
        } finally {
            //释放锁
            RedisLockUtil.unLock(key);
        }
    } else {
        new RuntimeException("重复领取奖励");
    }
}

基于 REDLOCK 做分布式锁


Redlock 是 Redis 的作者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个完全独立的 Redis 节点(通常情况下 N 可以设置成 5)。

算法的步骤如下:

使用 Redlock 算法,可以保证在挂掉最多 2 个节点的时候,分布式锁服务仍然能工作,这相比之前的数据库锁和缓存锁大大提高了可用性,由于 redis 的高效性能,分布式缓存锁性能并不比数据库锁差。

但是,有一位分布式的专家写了一篇文章《How to do distributed locking》,质疑 Redlock 的正确性。

https://mp.weixin.qq.com/s/1bPLk_VZhZ0QYNZS8LkviA

https://blog.csdn.net/jek123456/article/details/72954106

优缺点


优点: 性能高

缺点:

失效时间设置多长时间为好?如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。

基于 REDISSON 做分布式锁


redisson 是 redis 官方的分布式锁组件。GitHub 地址:https://github.com/redisson/redisson

上面的这个问题 ——> 失效时间设置多长时间为好?这个问题在 redisson 的做法是:每获得一个锁时,只设置一个很短的超时时间,同时起一个线程在每次快要到超时时间时去刷新锁的超时时间。在释放锁的同时结束这个线程。

基于 ZooKeeper 做分布式锁


ZOOKEEPER 锁相关基础知识


ZK 基本锁


ZK 锁优化


步骤:

import java.io.IOException;
import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import org.Apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;public class DistributedLock implements Lock, Watcher{    private ZooKeeper zk;    private String root = "/locks";//根
    private String lockName;//竞争资源的标志    private String waitNode;//等待前一个锁    private String myZnode;//当前锁    private CountDownLatch latch;//计数器    private int sessionTimeout = 30000;
    private List<Exception> exception = new ArrayList<Exception>();    /**     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
     * @param config 127.0.0.1:2181
     * @param lockName 竞争资源标志,lockName中不能包含单词lock     */    public DistributedLock(String config, String lockName){
        this.lockName = lockName;        // 创建一个与服务器的连接        try {            zk = new ZooKeeper(config, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 创建根节点                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }        } catch (IOException e) {            exception.add(e);        } catch (KeeperException e) {            exception.add(e);        } catch (InterruptedException e) {            exception.add(e);        }    }    /**     * zookeeper节点的监视器     */    public void process(WatchedEvent event) {        if(this.latch != null) {
            this.latch.countDown();        }    }    public void lock() {        if(exception.size() > 0){
            throw new LockException(exception.get(0));
        }        try {            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }            else{
                waitForLock(waitNode, sessionTimeout);//等待锁            }        } catch (KeeperException e) {            throw new LockException(e);        } catch (InterruptedException e) {            throw new LockException(e);        }    }    public boolean tryLock() {        try {            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \u000B");
            //创建临时子节点            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出所有子节点            List<String> subNodes = zk.getChildren(root, false);
            //取出所有lockName的锁            List<String> lockObjNodes = new ArrayList<String>();            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);                }            }            Collections.sort(lockObjNodes);
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的节点,则表示取得锁                return true;
            }            //如果不是最小的节点,找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {            throw new LockException(e);        } catch (InterruptedException e) {            throw new LockException(e);        }        return false;
    }    public boolean tryLock(long time, TimeUnit unit) {
        try {            if(this.tryLock()){
                return true;
            }            return waitForLock(waitNode,time);
        } catch (Exception e) {            e.printStackTrace();        }        return false;
    }    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);            this.latch = null;        }        return true;
    }    public void unlock() {        try {            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;            zk.close();
        } catch (InterruptedException e) {            e.printStackTrace();        } catch (KeeperException e) {            e.printStackTrace();        }    }    public void lockInterruptibly() throws InterruptedException {        this.lock();    }    public Condition newCondition() {        return null;
    }    public class LockException extends RuntimeException {        private static final long serialVersionUID = 1L;
        public LockException(String e){            super(e);        }        public LockException(Exception e){            super(e);        }    }}

优缺点


优点:

有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。

缺点:

性能上可能并没有缓存服务那么高,因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后将数据同步到所有的 Follower 机器上。还需要对 ZK的原理有所了解。

基于 Consul 做分布式锁


DD 写过类似文章,其实主要利用 Consul 的 Key / Value 存储 API 中的 acquire 和 release 操作来实现。

文章地址:http://blog.didispace.com/spring-cloud-consul-lock-and-semphore/

使用分布式锁的注意事项


1、注意分布式锁的开销

2、注意加锁的粒度

3、加锁的方式

总结


无论你身处一个什么样的公司,最开始的工作可能都需要从最简单的做起。不要提阿里和腾讯的业务场景 qps 如何大,因为在这样的大场景中你未必能亲自参与项目,亲自参与项目未必能是核心的设计者,是核心的设计者未必能独自设计。希望大家能根据自己公司业务场景,选择适合自己项目的方案。

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>