Java 中的几种阻塞队列,kalilinux 渗透教程
remove: 删除数据时,如果队列中有此数据,删除成功返回
true
,否则返回false
。如果包含一个或者多个 object,那么只移除一个就返回true
。注意:remove(o)
是BlockingQueue
接口的方法,remove()
是Queue
接口的方法。element: 如果队列为空,那么抛出异常
NoSuchElementException
。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove()
,element
同样是Queue
接口的方法。
返回特殊值:
offer: 插入数据时,如果阻塞队列没满,那么插入成功返回
true
,否则返回false
。当使用有界(capacity-restricted queue)阻塞队列时,建议使用offer
方法,不建议会抛出异常的add
方法。poll: 此方法是
Queue
接口的。如果队列不为空,查询、移除并返回队列头部元素。如果队列为空,那么返回null
。peek: 此方法是
Queue
接口的。如果队列为空,返回null
,这点不同于poll
。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove()
。
一直阻塞:
put: 插入数据时,如果队列已满,那么阻塞等待队列可用,等待期间如果被中断,那么抛出
InterruptedException
。
take: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞等待队列可用,等待期间如果被中断,那么抛出
InterruptedException
。
超时退出:
offer: 插入数据时,如果队列已满,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出
InterruptedException
。如果插入成功,那么返回true
,如果在达到指定时间后仍然队列不可用,那么返回false
。poll: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出
InterruptedException
。如果删除成功,那么返回队列头部元素,如果在达到指定时间后仍然队列不可用,那么返回null
。
Queue
队列不能插入 null,否则会抛出NullPointerException
。
JDK7 提供了 7 个阻塞队列。分别是
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue
ArrayBlockingQueue
是基于数组(array-based)的先进先出(FIFO)有界(bounded)阻塞队列。
创建队列时,必须要指定队列容量(capacity),即数组大小。
创建队列时,可以传入
Collection
来初始化队列元素。队列一旦被创建,那么队列容量不能被改变。
队列支持公平模式和非公平模式,默认非公平模式。
队列中只有一把锁,写锁和读锁未分离,并发控制采用了经典的 two-condition(
notEmpty
、notFull
)算法。
LinkedBlockingQueue
LinkedBlockingQueue
是基于链表(linked nodes)的先进先出(FIFO)的可选界(optionally-bounded)的阻塞队列。
创建队列时,为了避免额外开销,可以指定队列容量(capacity);如果不指定队列容量,那么默认队列容量为
Integer.MAX_VALUE
。创建队列时,可以可以传入
Collection
来初始化队列元素,此时不能指定队列容量,默认为Integer.MAX_VALUE
。队列中的
count
即当前队列元素个数,采用AtomicInteger
,避免put
和take
的竞争。与
ArrayBlockingQueue
不同的是,LinkedBlockingQueue
队列中有两把锁,读锁和写锁是分离的。在使用
LinkedBlockingQueue
时,若队列大小为默认值,且生产速度大于消费速度时,可能会内存溢出。LinkedBlockingQueue
理论上来说比ArrayBlockingQueue
有更高的吞吐量,但是在大多数的实际应用场景中,却没有很好的表现。
PriorityBlockingQueue
PriorityBlockingQueue
是基于数组(array based)的支持优先级的无界(unbounded)的阻塞队列。此队列的数据结构是堆。
创建队列时,如果指定初始化容量(initialCapacity),那么默认初始化容量
DEFAULT_INITIAL_CAPACITY
为 11。创建队列时,可以指定队列初始化容量(initialCapacity),不是队列容量(capacity)。
PriorityBlockingQueue
的无界(unbounded)相对于LinkedBlockingQueue
的可选界(optionally-bounded)来说,无界是指不能在创建队列时,不能指定队列的最大容量(capacity),并不是说PriorityBlockingQueue
本身无界。LinkedBlockingQueue
默认(注意,这里指的是默认容量,即,你可以指定大于Integer.MAX_VALUE
的值)的最大容量是Integer.MAX_VALUE
,而PriorityBlockingQueue
的最大容量是MAX_ARRAY_SIZE=Integer.MAX_VALUE-8
。PriorityBlockingQueue
无界的另一个意思就是生产者线程不会因为队列满了就阻塞,因为队列是无界的,没有容量满了这一说。offer(E e, long timeout, TimeUnit unit)
的后两个参数没有任何作用查看源代码发现,其方法的实现直接是调用了offer(e)
。但是当队列为空时,take
仍然会阻塞。offer(e)
永远不会返回false
,offer(E e, long timeout, TimeUnit unit)
永远不会返回false
或者阻塞。PriorityBlockingQueue
通过数组来实现队列,在原有数组满了的情况下,通过复制数组来扩展队列容量,如果新扩展的数组容量大小超过MAX_ARRAY_SIZE
,那么抛出OutOfMemoryError
异常。默认情况下元素采取自然顺序排列,也可以通过比较器 comparator 来指定元素的排序规则。元素按照升序排列。
DelayQueue
DelayQueue
是基于PriorityQueue
实现的支持延时获取元素的阻塞队列。
DelayQueue
中存放的对象必须实现Delayed
接口。如果没有到期元素,那么就没有
head
,poll
方法返回 null。当一个元素的
getDelay(TimeUnit.NANOSECONDS)
返回值小于等于 0 时,该元素过期。虽然不能用
take
和poll
移除未过期的元素,但是这些未过期的元素仍然和过期元素一样同等对待。例如,size
方法返回的数量就是过期元素和未过期元素的之和。
DelayQueue
的适用场景:
关闭空闲链接。服务器中有很多空闲链接,在一定时间后,关闭他们。
删除过期缓存。在一定时间后,删除某些缓存的对象。
任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
生成订单后的 60 秒后,给用户发送短信通知。
下单 15 分钟后,如果用户不付款就关闭订单。
通过上面几条场景例子,可以看出来,DelayQueue
适用于_在一定时间后,做某些业务处理_。
SynchronousQueue
SynchronousQueue
是一个没有数据缓冲的BlockingQueue
。
一个线程的插入必须等待另一个线程的删除操作才能完成,反之亦然。
SynchronousQueue
适合传递性设计(handoff designs),即一个线程中运行的对象,需要将某些信息、任务或者事件等传递给另一个线程中运行的对象的场景。SynchronousQueue
支持公平和非公平模式。不能在同步队列上进行
peek
,因为仅在试图要移除元素时,该元素才存在。不能迭代队列,因为其中没有元素可用于迭代。
Executors.newCachedThreadPool()
中就使用了SynchronousQueue
队列。
评论