写点什么

深入浅出:ConcurrentLinkedQueue 源码分析与实战

  • 2023-10-26
    湖南
  • 本文字数:4919 字

    阅读完需:约 16 分钟

如下是 Java 集合体系架构图,近期几期内容都是围绕该体系进行知识讲解,以便于同学们学习 Java 集合篇知识能够系统化而不零散。


前言

  在多线程编程中,由于线程之间的竞争,导致多线程访问数据时容易出现数据不一致的问题,为了解决这个问题,Java 提供了一些线程安全的数据结构,其中之一就是 ConcurrentLinkedQueue,它是一个非阻塞的线程安全队列。

摘要

  本文主要介绍 ConcurrentLinkedQueue 的源代码解析、应用场景案例、优缺点分析、类代码方法介绍

ConcurrentLinkedQueue

简介

  ConcurrentLinkedQueue 是一个线程安全的队列,它的特点是非阻塞,也就是说当队列为空时,出队操作不会阻塞线程,而是立即返回 null。同时,它也不允许插入 null 元素。

  ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列。它采用了先进先出的原则,对于并发访问,它采取了一种无锁算法(lock-free),实现了高效率的并发操作。它通过 CAS 操作实现了“原子操作”,保证了线程安全。

源代码解析

  ConcurrentLinkedQueue 的源代码中,最重要的是 Node 类和 head、tail 两个节点。每个节点代表队列中的一个元素,节点中除了存储元素外,还包含了指向下一个节点的指针。


private static class Node<E> {    volatile E item;    volatile Node<E> next;
Node(E item) { UNSAFE.putObject(this, itemOffset, item); }
boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); }
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset;
static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } }}
复制代码

 在队列中 head 和 tail 是两个 Node 节点,其中 head 代表队列中最先入队的元素,tail 代表队列中最后入队的元素。head 和 tail 节点中的 next 指针指向队列中的下一个元素,通过这样的方式将整个队列串起来,实现了队列的操作。



public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>        implements Queue<E>, java.io.Serializable {    private transient volatile Node<E> head;    private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
private void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
private void updateTail(Node<E> t, Node<E> p) { if (t != p && casTail(t, p)) t.lazySetNext(p); }
boolean casHead(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); }
boolean casTail(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); }
private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset;
static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } }}
复制代码

应用场景案例

  ConcurrentLinkedQueue 的应用场景很广泛,它可以作为多线程环境下的任务队列,也可以作为消息队列、日志队列等。下面以一个简单的任务队列为例进行说明。

public class TaskQueue {    private ConcurrentLinkedQueue<Task> queue;
public TaskQueue() { queue = new ConcurrentLinkedQueue<>(); }
public void addTask(Task task) { queue.offer(task); }
public void executeTasks() { if (queue.isEmpty()) { return; } Task task = null; while ((task = queue.poll()) != null) { task.execute(); } }}
复制代码

在上述代码中,我们定义了一个 TaskQueue 类,它包含了一个 ConcurrentLinkedQueue 对象,用于存储任务。addTask()方法用于向队列中添加任务,executeTasks()方法用于执行队列中的任务。当队列为空时,直接返回。否则,使用 poll()方法从队列中取出一个任务执行,直到队列中的任务全部被执行完成。

优缺点分析

优点

  • 高并发性:ConcurrentLinkedQueue 的实现采用了无锁算法,相比于同步队列的加锁操作,它在高并发场景下的性能更优;

  • 无阻塞:当队列为空时,出队操作不会阻塞线程,而是立即返回 null;

  • 线程安全:ConcurrentLinkedQueue 是线程安全的,不需要我们手动进行同步。

缺点

  • 不支持随机访问:由于 ConcurrentLinkedQueue 是基于链表实现的,所以它不支持随机访问操作,只能从队头或队尾进行插入、删除和访问操作。如果应用场景中需要随机访问,建议使用其他数据结构;

  • 不支持元素排序:ConcurrentLinkedQueue 是一个队列,它不支持对元素进行排序。如果应用场景中需要对元素排序,建议使用其他数据结构。

类代码方法介绍

offer(E e)

  插入指定元素作为此队列的末尾(最后一个元素)。如果队列为空,则插入位于队头(第一个元素)。

public boolean offer(E e) {    checkNotNull(e);    final Node<E> newNode = new Node<E>(e);    for (Node<E> t = tail, p = t;;) {        Node<E> q = p.next;        if (q == null) {            if (p.casNext(null, newNode)) {                if (p != t)                    casTail(t, newNode);  // Failure is OK.                return true;            }        } else if (p == q)            p = (t != (t = tail)) ? t : head;        else            p = (p != t && t != (t = tail)) ? t : q;    }}
复制代码

poll()

获取并移除此队列的头。

public E poll() {    restartFromHead:    for (;;) {        for (Node<E> h = head, p = h, q;;) {            E item = p.item;
if (item != null && p.casItem(item, null)) { if (p != h) // Hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } }}
复制代码

size()

返回队列中的元素数量。

public int size() {    int count = 0;    for (Node<E> p = first(); p != null; p = succ(p))        if (p.item != null)            ++count;    return count;}
复制代码

isEmpty()

判断队列是否为空。

public boolean isEmpty() {    return first() == null;}
复制代码

contains(Object o)

判断队列中是否包含指定元素。

public boolean contains(Object o) {    if (o == null) return false;    for (Node<E> p = first(); p != null; p = succ(p)) {        E item = p.item;        if (item != null && o.equals(item))            return true;    }    return false;}
复制代码

add(E e)

插入指定元素作为此队列的末尾。与 offer()方法相同。

public boolean add(E e) {    return offer(e);}
复制代码

remove()

获取并移除此队列的头。与 poll()方法相同。

public E remove() {    E x = poll();    if (x != null)        return x;    else        throw new NoSuchElementException();}
复制代码

element()

获取但不移除此队列的头。与 peek()方法相同。

public E element() {    E x = peek();    if (x != null)        return x;    else        throw new NoSuchElementException();}
复制代码

测试用例

  我们可以编写如下测试用例来验证ConcurrentLinkedQueue的正确性。

测试代码

  下面是一个简单的示例代码,使用了ConcurrentLinkedQueue创建了一个线程安全的队列,并对其进行了读写测试:

package com.example.javase.collection;
import java.util.concurrent.ConcurrentLinkedQueue;
/** * @Author ms * @Date 2023-10-22 18:57 */public class ConcurrentLinkedQueueTest { public static void main(String[] args) { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// 添加元素 queue.offer("Java"); queue.offer("Python"); queue.offer("C++");
// 输出队列元素 System.out.println("Queue elements: " + queue);
// 获取并移除队列头部元素 String headElement = queue.poll(); System.out.println("Head element: " + headElement);
// 输出队列元素 System.out.println("Queue elements after polling: " + queue);
// 获取队列头部元素 String peekElement = queue.peek(); System.out.println("Peek element: " + peekElement);
// 输出队列元素 System.out.println("Queue elements after peeking: " + queue); }}
复制代码

期望输出结果如下:

Queue elements: [Java, Python, C++]Head element: JavaQueue elements after polling: [Python, C++]Peek element: PythonQueue elements after peeking: [Python, C++]
复制代码

接下来我们可以在本地执行一下这个测试用例,以作为检验是否能够将其预期结果正确输出。

测试结果

  根据如上测试用例,本地测试结果如下,仅供参考,你们也可以自行修改测试用例或者添加更多的测试数据或测试方法,进行熟练学习以此加深理解。

如上测试用例执行后,经肉眼验证与预期结果是一致!

测试代码分析

  根据如上测试用例,在此我给大家进行深入详细的解读一下测试代码,以便于更多的同学能够理解并加深印象。  如上代码是一个使用ConcurrentLinkedQueue实现的队列的示例代码。ConcurrentLinkedQueue是一个线程安全的无界队列,它采用了无锁算法来实现高效的并发操作。在该示例中,首先创建了一个ConcurrentLinkedQueue对象,并通过调用 offer()方法向队列中添加了 3 个元素。然后,分别演示了 poll()方法和 peek()方法的使用,它们分别用于获取并移除队列头部元素和获取队列头部元素,最终输出了操作后的队列元素。

小结

  ConcurrentLinkedQueue 是一个线程安全的队列,它采用了先进先出的原则,对于并发访问,它采取了一种无锁算法(lock-free),实现了高效率的并发操作。它通过 CAS 操作实现了“原子操作”,保证了线程安全。

  ConcurrentLinkedQueue 的应用场景很广泛,它可以作为多线程环境下的任务队列,也可以作为消息队列、日志队列等。

总结

  1. ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列;

  2. 支持先进先出原则,采用无锁算法实现高效的并发操作;

  3. 不支持随机访问和元素排序;

  4. 适用于多线程环境下的任务队列、消息队列等;

  5. 具有高并发性、无阻塞、线程安全等优点。

... ...


用户头像

只要码不死,就往死里码 2021-11-19 加入

还未添加个人简介

评论

发布
暂无评论
深入浅出:ConcurrentLinkedQueue源码分析与实战_#java_程序员万金游_InfoQ写作社区