<返回更多

DelayQueue 源码剖析-细化深入带你理解

2022-10-05  今日头条  小心程序猿QAQ
加入收藏

JAVA中的DelayQueue

DelayQueue类是Java集合框架中的成员。在java.util.concurrent包下,实现了BlockingQueue接口。 他属于一种优先级队列,根据元素的延迟时间进行排序,这就意味着,你只能从队列里取出时间已经到期的元素。 如果没有延迟过期,则轮询将返回 null。 另外,DelayQueue 仅接受属于延迟类型的类或实现
java.util.concurrent.Delayed的那些元素。DelayQueue 在内部阻塞元素,直到某个元素延迟到期。DelayQueue 通过实现 getDelay(TimeUnit.NANOSECONDS) 方法返回剩余延迟时间。传递给 getDelay() 方法的 TimeUnit 实例是一个 Enum,它告诉应以哪个时间单位返回延迟。 TimeUnit 枚举可以采用 DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。此队列不允许空元素。此类及其迭代器实现了Collection和Iterator接口的所有可选方法。方法iterator()中提供的 Iterator 不能保证以任何特定顺序遍历 DelayQueue 的元素。 因为前面说了,他是一种优先级队列。

DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>
复制代码

DelayQueue的层次结构

 

可以看到 DelayQueue 实现了 Iterable、Collection、BlockingQueue、Queue接口。

DelayQueue的成员变量

private final transient ReentrantLock lock = new ReentrantLock();
    
private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();

复制代码

从四个属性中可以看出,它本质上就是一个优先级队列! 并且一个线程变量作为 “leader” , 还有个 lock ,都是什么用的呢?我们来分析一下

源码剖析

首先我们看到它有基本的添加元素、修改元素等实现,那么我们从 添加元素 方法看起

add、put、offer

public boolean add(E e) {
    // 调用offer方法
    return offer(e);
}

public void put(E e) {
    // 调用offer方法
    offer(e);
}
    
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

复制代码
  1. 首先使用属性中定义的锁
  2. 将元素添加到优先级队列中
  3. 判断如果peek出来的就是添加的元素,则leader线程置为null,同时唤醒一个正在等待的线程
  4. 返回成功
  5. 最后解锁

看到添加元素的代码比较简单,那么我们拿取的时候,它是如何决定元素的拿取时机的呢?是否是阻塞的呢?我们看下拿去元素的方法,poll和take

poll

/**
 * 检索并移除队列的头元素,如果队列中没有元素达到延迟时间则返回null
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
复制代码

poll() 方法同样是加锁进行拿取,只不过 peek() 出来的元素如果getDelay(NANOSECONDS) 发现当前时间没有达到延迟时间的话,那么不进行返回元素,如果达到了,则把元素 poll() 出来。可以看到,他增加了一个 first.getDelay(NANOSECONDS) > 0 的判断,这就是为什么所有的元素需要实现
java.util.concurrent.Delayed。重写getDelay(TimeUnit unit) 方法,不然它怎么知道怎么才算延迟。

take

take() 方法就有些复杂点了

/**
 * 检索检索并移除此队列的头,如有必要,等待,
 * 直到有元素达到延迟时间。
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取可中断锁
    lock.lockInterruptibly();
    try {
        // 开始进行轮询
        for (;;) {
            // 获取队列头的元素(最早过期的)
            E first = q.peek();
            if (first == null)
                // 如果队列头元素为null,说明队列中没有元素,当前线程进入等待状态
                available.await();
            else {
                // 这里和上面获取过期元素的判断是一样的
                // 获取头元素的过期时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 对比如果过期,则取出元素
                    return q.poll();
                // 如果没有过期,将头元素置为null
                first = null;
                if (leader != null)
                    // 如果有线程在争抢leader线程,进入等待并让出锁
                    available.await();
                else {
                    // 没有争抢发生则将自己set进去
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 并且等待 剩余等待的时间
                        available.awaitNanos(delay);
                    } finally {
                        // 最终释放自己
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果当前没有leader线程在工作的话,并且队列内部有元素,那么唤醒那些正在等待的线程
        if (leader == null && q.peek() != null)
            available.signal();
        // 解锁
        lock.unlock();
    }
}
复制代码
  1. 首先看一下队列中是否含有元素
  2. 如果没有元素,那就take操作就会处于等待状态
  3. 如果有元素,并且元素已达到过期时间,就返回该元素(元素从队列移出)
  4. 如果元素没有达到过期时间,将first元素置为null,此时再次走向分支条件:如果当前已经有leader线程了,那么就进入等待,直到被唤醒,否则就把自己设为leader线程,因为先前元素还没到过期时间,那么就等待剩余过期时间的时间,最后释放leader字段。
  5. 最后如果leader为null并且队列中含有元素,那么唤醒其他正在等待的线程
  6. 解锁

poll timeout

// 检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 获取可中断锁
    lock.lockInterruptibly();
    try {
        // 开始进行轮询
        for (;;) {
            // 获取队列头的元素(最早过期的)
            E first = q.peek();
            if (first == null) {
                // 如果队列头元素为null,说明队列中没有元素,
                // 如果设置的TimeUnit 小于等于0,说明不等待直接返回 null
                if (nanos <= 0)
                    return null;
                else
                    // 如果队列头元素为null,说明队列中没有元素
                    // 当前线程进入等待状态
                    // 并重新赋值 nanos
                    nanos = available.awaitNanos(nanos);
            } else {
                // 如果 队列中有元素存在的话
                // 那么就检查一下头元素的过期时间
                // 已经到期了的话,就弹出该元素
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // 如果 nanos 比 delay 小,说明等待的太短了,再等待一下吧
                // 如果 nanos 比 delay 大,但是 leader 已经有线程占了,再等待一下吧
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // 进入这里的条件是 nanos 比 delay 大,并且 leader 没有线程占用
                    Thread thisThread = Thread.currentThread();
                    // 把自己设置为 leader
                    leader = thisThread;
                    try {
                        // 等待 delay 时间
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果当前没有leader线程在工作的话,并且队列内部有元素,那么唤醒那些正在等待的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
复制代码

take()方法相比poll()方法最大的区别就是当延迟未过期时,take 方法会一直等待,而 poll 方法则会返回空,或者只等待你指定的时间。

思考

1. 为什么设置了一个leader字段

先看一段官方解释
Thread designated to wait for the element at the head of the queue. 
This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. 
When a thread becomes the leader, it waits only for the next delay to elapse, 
but other threads await indefinitely. 
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. 
Whenever the head of the queue is replaced with an element with an earlier expiration time, 
the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled. 
So waiting threads must be prepared to acquire and lose leadership while waiting.
复制代码

说了,这个线程被用来等待队列中头部元素。 可以从如下代码体现

    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
        // 并且等待 剩余等待的时间
        available.awaitNanos(delay);
    } finally {
        // 最终释放自己
        if (leader == thisThread)
            leader = null;
    }
复制代码

Leader-Follower 可以最小化不必要的等待时间,我理解的leader-follower模式就是:选取一个线程作为leader线程,leader线程负责监听网络请求,其它线程为follower线程并且处于waiting状态,当leader线程接受到一个请求后,会释放自己给其他follower机会,众多的follower中会产生一个新的leader,剩下没有抢到机会的继续做follower。

而leadler在delayqueue里目的就是为了避免不必要的唤醒和睡眠。如果所有线程都可用,那么它们就会同时被调用,并且只会有一个线程可以真正从队列里获取元素,其他的线程会再度进行睡眠,浪费cpu资源。

2. 多个线程进行take操作,不同线程处于什么状态

打个比方

同一时刻只有一个人上前去领盒饭,而其他人因为已经有人在窗口等饭了就会处于等待状态,并不是一直在窗口问食堂阿姨问个不停,队列就是食堂,队列里的元素就是一个一个的盒饭,每个盒饭都有自己的制作时间,最快做好的盒饭就会优先放在前面的流水线上,时间一到,上去领盒饭的人领到自己的盒饭,就下去了( leader == null ),然后一看,食堂还有饭!就冲着下面那些人喊,还有饭,快上来拿( available.signal() )! 然后下面的人纷纷上来(争抢锁),那么抢到锁的线程就会持有锁,有机会询问阿姨是否还有饭( peek() ),这时,阿姨有可能说

“有,但还没做好”

“啥时候好呀?”

"再等五分钟吧,别着急"

然后这时,当前线程就会设置自己为leader,并等待五分钟( available.awaitNanos(delay) ),这时!锁被释放,然后就会有人上来问阿姨,但是一看,这leader有人把持着呢,那再等等吧……( if (leader != null) )再然后呢,刚才那个等五分钟的人判断自己是leader,会释放leader。

直到饭好了,有人领到盒饭。 那么等五分钟那老哥会不会领到他刚才问的饭呢?答案是不一定,毕竟follower并没有排队。

3. 为什么first会置为null

看了上面的分析不知道有没有理解, 那么我们看一下简略代码是这样的:

    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    for (;;) {
        E first = q.peek();
        long delay = first.getDelay(NANOSECONDS);
        if (delay <= 0) {
            first = null;
            available.await();
        }
    }
复制代码

for循环里首先peek出来队列中的头元素,如果这个元素没有过期,假设这里不置为null的话,当前线程就持有一个first,然后 await,那么下次再 进入循环还会 peek 一次,而之前的peek依然在内存中,所以没必要多持有一个,所以如果不满足条件(没有到期)则置为null,然后再 await。就像注释里说的 don't retain ref while waiting

原文链接:
https://juejin.cn/post/7150517172987494413

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>