ConcurrentLinkedQueue
ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。
ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列。默认情况下 head 节点存储的元素为空,tail 节点等于 head 节点。
入队列
 public boolean offer(E e) {    checkNotNull(e);    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {        Node<E> q = p.next;        if (q == null) {            // p is last node            if (p.casNext(null, newNode)) {                // Successful CAS is the linearization point                // for e to become an element of this queue,                // and for newNode to become "live".                if (p != t) // hop two nodes at a time                    casTail(t, newNode);  // Failure is OK.                return true;            }            // Lost CAS race to another thread; re-read next        }        else if (p == q)            // We have fallen off list.  If tail is unchanged, it            // will also be off-list, in which case we need to            // jump to head, from which all live nodes are always            // reachable.  Else the new tail is a better bet.            p = (t != (t = tail)) ? t : head;        else            // Check for tail updates after two hops.            p = (p != t && t != (t = tail)) ? t : q;    }}
   复制代码
 
整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用 CAS 算法将入队节点设置成尾节点的 next 节点,如不成功则重试。
出队列
并不是每次出队时都更新 head 节点,当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点。只有当 head 节点里没有元素时,出队操作才会更新 head 节点。这种做法也是通过 hops 变量来减少使用 CAS 更新 head 节点的消耗,从而提高出队效率。
 public E poll() {    restartFromHead:    for (;;) {        for (Node<E> h = head, p = h, q;;) {            E item = p.item;
            if (item != null && p.casItem(item, null)) {                // Successful CAS is the linearization point                // for item to be removed from this queue.                if (p != h) // hop two nodes at a time                    updateHead(h, ((q = p.next) != null) ? q : p);                return item;            }            else if ((q = p.next) == null) {                updateHead(h, p);                return null;            }            else if (p == q)                continue restartFromHead;            else                p = q;        }    }}
   复制代码
 
首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用 CAS 的方式将头节点的引用设置成 null,如果 CAS 成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了 head 节点,导致元素发生了变化,需要重新获取头节点。
评论