<返回更多

当 LinkedList 不是列表时,速度快的兔子都追不上

2022-10-11  微信公众号  小姐姐味道
加入收藏

原创:小姐姐味道(微信公众号ID:xjjdog),欢迎分享,转载请保留出处。

ArrayList和LinkedList有什么区别?

这种侮辱人的问题,默认就把这两者限定在了同一个场景之中,它甚至连八股文都算不上。

一旦你被问到这种问题,也证明面试基本上泡汤了--面试官已经实在是找不到其他问题与你交流了。

你Over了。

但当我们细看一下LinkedList的class定义,就会发现,它并不像是ArrayList的那样具有纯洁的列表精神。

public class ArrayList<E> extends AbstractList<E>
        implements List<E>, Randomaccess, Cloneable, JAVA.io.Serializable
//VS
public class LinkedList<E>
    extends AbstractSequentialList<E>
    implements List<E>, Deque<E>, Cloneable, java.io.Serializable

LinkedList除了能够当作普通的列表,它还是一个Deque。双向链表,听着就比较唬人,这就是一个既能当做队列、又能当做栈的存在。

有了这种双重Buff的叠加,LinkedList的应用场景比ArrayList丰富的多。除了能做最简单的LRU缓存,LinkedList在刷题的时候也是充满了正能量。

关于类似Deque的API,xjjdog以前有专门的文章来介绍这些爆炸性的方法。

《带你见识一下,JAVA中的方法爆炸!》

王者ConcurrentLinkedQueue,一个阻塞的双向队列,它的基本操作方法有:(3[基本]x2[异常与返回值]+4[阻塞加超时])x3[队头队尾]=5x2x3=30,足足有30个方法。看完上面的文章,这30个方法可以很快手到擒来。

不过我们今天要聊的一个重点,是使用Deque来实现更快的延迟队列。

延迟队列

如果你想要某些数据延迟一段时间再进行处理,或者要再某段时间内按照分组进行一些计算,那延迟队列无疑是非常合适的。

在Java的Concurrent包里,就静悄悄的躺着DelayQueue。只要你的数据实现了Delayed接口,那么就可以放心大胆的把它们往里面塞。

可惜的是,DelayQueue的底层存储,使用的是PriorityQueue。

PriorityQueue是堆实现的,offer和poll数据的时间复杂度是O(logN)。这就意味着,当DelayQueue中的数据比较多的时候,它的性能就会下降。

除了把数据分片,使用多个DelayQueue来完成工作,我们有没有速度更快的方法?比如把PriorityQueue使用LinkedList来替换?

这要看场景。

如果你向DelayQueue中添加数据,是与当前添加的时间相关的,那完全可以使用LinkedList来替换PriorityQueue。

关键代码

要了解DelayQueue的运行原理,我们可以需要先看一下Delayed接口。任何想要存储到DelayQueue中的数据,都需要实现这个接口。

其中,getDelay就是用来判断当前数据是否超时的方法。而compareTo,则是PriorityQueue用来排序的,如果我们是按照当前塞入数据的,则compareTo方法就不是必要的。

public long getDelay(@NotNull TimeUnit unit) {
    return MAX_CACHE_DUAL + this.enqueueTimeNs - System.nanoTime();
}
public int compareTo(@NotNull Delayed o) {
    long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
    return (d == 0) ? 0 : (d < 0 ? -1 : 1);
}

按照以上的思路,我们把DelayQueue的代码拷贝一份,仅保留关键代码,如下。

public class LightweightDelayedQueue<E extends Delayed> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final LinkedList<E> q = new LinkedList<>();
    private final Condition available = lock.newCondition();
    private Thread leader;
    public void put(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (; ; ) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E first;
                 n < maxElements
                         && (first = q.peek()) != null
                         && first.getDelay(NANOSECONDS) <= 0; ) {
                c.add(first);   // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
}

主要方法

轻量级的延迟队列,如果一旦采用了LinkedList,那么它的入队、出队方法,就都变成了O(1)的时间复杂度。在延迟队列中的数据增加时,时间复杂度也能维持不变,可以说是速度快的连兔子都追不上了。

一般,在java中,put和take方法,都是代表阻塞性方法。比如,take方法,我们可以将其安全的阻塞在某个线程上,而不用担心太多的资源浪费。

final Thread thread = Thread.currentThread();
while (!this.close && !thread.isInterrupted()) {
    Data data = q.take();
}

这一切都是Condition办到的,它和wait、notify的作用是一样的。

当我们通过put方法添加新的数据到队列中,会通过signal方法,来通知等待的线程获取数据。

相同的,如果take方法发现队列中的数据为空,它将进入等待状态。如果有数据,也并不是马上把这些数据取出来,因为数据还没到期。比如最老的数据还剩下5秒才到期,代码也对这种情况进行了处理,它会尝试awaitNanos对应的时间。

这样,我们就可以使用这种简单的轮询方式来实现延迟队列,而不需要单独的线程去检测队列中的数据。

增加take方法的效率

但是这样还不够。

当数据量比较大的时候,队列的数据可能有多条已经到期。如果我们通过take方法来一条一条获取的话,效率自然不如批量获取高。

drainTo方法,可以一股脑的把到期的数据转移到其他的集合中,但它并不是一个阻塞性的方法。

我们可以先使用take来阻塞线程,然后再批量取出所有数据。

下面代码,会一次性获取100条数据作为一个批量。

final Data takeItem = q.take();
final List<Data> buckets = new ArrayList<>(100);
q.drainTo(buckets, 99);
buckets.add(takeItem);

End

实际上,我们的某个业务,当采用LinkedList来替代PriorityQueue,并进行批量操作后,CPU的使用直接降低了1/3。

Deque是xjjdog最喜欢的一个接口。每当使用offerFirst、offerLast这样精准的API进行操作,都会体验到多巴胺的乐趣。LinkedList作为它的儿子,自然拥有了这些所有的功能。

当我们使用concurrent包里的基本API,对这些淳朴的工具进行封装,它们就会焕发出新的活力。

作者简介:小姐姐味道 (xjjdog),一个不允许程序员走弯路的公众号。聚焦基础架构linux。十年架构,日百亿流量,与你探讨高并发世界,给你不一样的味道。

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>