前言
PriorityBlockingQueue 优先级队列,线程安全(添加、读取都进行了加锁)、无界、读阻塞的队列,底层采用的堆结构实现(二叉树),默认是小根堆,最小的或者最大的元素会一直置顶,每次获取都取最顶端的数据
队列创建
小根堆
PriorityBlockingQueue<Integer>concurrentLinkedQueue=newPriorityBlockingQueue<Integer>();
大根堆
PriorityBlockingQueue<Integer>concurrentLinkedQueue=newPriorityBlockingQueue<Integer>(10,newComparator<Integer>(){
@Override
publicintcompare(Integero1,Integero2){
returno2-o1;
}
});
应用场景
有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。
我们来看一个具体例子,例子中定义了一个将要放入“优先阻塞队列”的任务类,并且定义了一个任务工场类和一个任务执行类,在任务工场类中产生了各种不同优先级的任务,将其添加到队列中,在任务执行类中,任务被一个个取出并执行。
packagecom.niuh.queue.priority;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Queue;
importjava.util.Random;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.PriorityBlockingQueue;
importjava.util.concurrent.TimeUnit;
/**
*<p>
*</p>
*/
publicclassPriorityBlockingQueueDemo{
publicstaticvoidmain(String[]args)throwsException{
Randomrandom=newRandom(47);
ExecutorServiceexec=Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable>queue=newPriorityBlockingQueue<>();
exec.execute(newPrioritizedTaskProducer(queue,exec));//这里需要注意,往PriorityBlockingQueue中添加任务和取出任务的
exec.execute(newPrioritizedTaskConsumer(queue));//步骤是同时进行的,因而输出结果并不一定是有序的
}
}
classPrioritizedTaskimplementsRunnable,Comparable<PrioritizedTask>{
privateRandomrandom=newRandom(47);
privatestaticintcounter=0;
privatefinalintid=counter++;
privatefinalintpriority;
protectedstaticList<PrioritizedTask>sequence=newArrayList<>();
publicPrioritizedTask(intpriority){
this.priority=priority;
sequence.add(this);
}
@Override
publicintcompareTo(PrioritizedTasko){
returnpriority<o.priority?1:(priority>o.priority?-1:0);//定义优先级计算方式
}
@Override
publicvoidrun(){
try{
TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
}catch(InterruptedExceptione){
}
System.out.println(this);
}
@Override
publicStringtoString(){
returnString.format("[%1$-3d]",priority)+"Task"+id;
}
publicStringsummary(){
return"("+id+":"+priority+")";
}
publicstaticclassEndSentinelextendsPrioritizedTask{
privateExecutorServiceexec;
publicEndSentinel(ExecutorServiceexec){
super(-1);
this.exec=exec;
}
@Override
publicvoidrun(){
intcount=0;
for(PrioritizedTaskpt:sequence){
System.out.print(pt.summary());
if(++count%5==0){
System.out.println();
}
}
System.out.println();
System.out.println(this+"CallingshutdownNow()");
exec.shutdownNow();
}
}
}
classPrioritizedTaskProducerimplementsRunnable{
privateRandomrandom=newRandom(47);
privateQueue<Runnable>queue;
privateExecutorServiceexec;
publicPrioritizedTaskProducer(Queue<Runnable>queue,ExecutorServiceexec){
this.queue=queue;
this.exec=exec;
}
@Override
publicvoidrun(){
for(inti=0;i<20;i++){
queue.add(newPrioritizedTask(random.nextInt(10)));//往PriorityBlockingQueue中添加随机优先级的任务
Thread.yield();
}
try{
for(inti=0;i<10;i++){
TimeUnit.MILLISECONDS.sleep(250);
queue.add(newPrioritizedTask(10));//往PriorityBlockingQueue中添加优先级为10的任务
}
for(inti=0;i<10;i++){
queue.add(newPrioritizedTask(i));//往PriorityBlockingQueue中添加优先级为1-10的任务
}
queue.add(newPrioritizedTask.EndSentinel(exec));
}catch(InterruptedExceptione){
}
System.out.println("FinishedPrioritizedTaskProducer");
}
}
classPrioritizedTaskConsumerimplementsRunnable{
privatePriorityBlockingQueue<Runnable>queue;
publicPrioritizedTaskConsumer(PriorityBlockingQueue<Runnable>queue){
this.queue=queue;
}
@Override
publicvoidrun(){
try{
while(!Thread.interrupted()){
queue.take().run();//任务的消费者,从PriorityBlockingQueue中取出任务执行
}
}catch(InterruptedExceptione){
}
System.out.println("FinishedPrioritizedTaskConsumer");
}
}
工作原理
PriorityBlockingQueue 是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:
- PriorityBlockingQueue 和 ArrayBlockingQueue 一样是基于数组实现的,但后者在初始化时需要指定长度,前者默认长度是 11。
- 该队列可以说是真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)
- 该队列属于权重队列,可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构(具体如何排下面会进行分析)
- 出队方式和前面的也不同,是根据权重来进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。
- 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator。
注意:
- 堆结构实际上是一种完全二叉树。关于二叉树可以查看 《树、二叉树、二叉搜索树的实现和特性》
- 堆又分为大顶堆和小顶堆 。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。关于二叉堆可以查看《堆和二叉堆的实现和特性》
源码分析
定义
PriorityBlockingQueue的类继承关系如下:
其包含的方法定义如下:
成员属性
从下面的字段我们可以知道,该队列可以排序,使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。
/**
*默认数组长度
*/
privatestaticfinalintDEFAULT_INITIAL_CAPACITY=11;
/**
*最大达容量,分配时超出可能会出现OutOfMemoryError异常
*/
privatestaticfinalintMAX_ARRAY_SIZE=Integer.MAX_VALUE-8;
/**
*队列,存储我们的元素
*/
privatetransientObject[]queue;
/**
*队列长度
*/
privatetransientintsize;
/**
*比较器,入队进行权重的比较
*/
privatetransientComparator<?superE>comparator;
/**
*显示锁
*/
privatefinalReentrantLocklock;
/**
*空队列时进行线程阻塞的Condition对象
*/
privatefinalConditionnotEmpty;
构造函数
/**
*默认构造,使用长度为11的数组,比较器为空
*/
publicPriorityBlockingQueue(){
this(DEFAULT_INITIAL_CAPACITY,null);
}
/**
*自定义数据长度构造,比较器为空
*/
publicPriorityBlockingQueue(intinitialCapacity){
this(initialCapacity,null);
}
/**
*自定义数组长度,可以自定义比较器
*/
publicPriorityBlockingQueue(intinitialCapacity,
Comparator<?superE>comparator){
if(initialCapacity<1)
thrownewIllegalArgumentException();
this.lock=newReentrantLock();
this.notEmpty=lock.newCondition();
this.comparator=comparator;
this.queue=newObject[initialCapacity];
}
/**
*构造函数,带有初始内容的队列
*/
publicPriorityBlockingQueue(Collection<?extendsE>c){
this.lock=newReentrantLock();
this.notEmpty=lock.newCondition();
booleanheapify=true;//trueifnotknowntobeinheaporder
booleanscreen=true;//trueifmustscreenfornulls
if(cinstanceofSortedSet<?>){
SortedSet<?extendsE>ss=(SortedSet<?extendsE>)c;
this.comparator=(Comparator<?superE>)ss.comparator();
heapify=false;
}
elseif(cinstanceofPriorityBlockingQueue<?>){
PriorityBlockingQueue<?extendsE>pq=
(PriorityBlockingQueue<?extendsE>)c;
this.comparator=(Comparator<?superE>)pq.comparator();
screen=false;
if(pq.getClass()==PriorityBlockingQueue.class)//exactmatch
heapify=false;
}
Object[]a=c.toArray();
intn=a.length;
//Ifc.toArrayincorrectlydoesn'treturnObject[],copyit.
if(a.getClass()!=Object[].class)
a=Arrays.copyOf(a,n,Object[].class);
if(screen&&(n==1||this.comparator!=null)){
for(inti=0;i<n;++i)
if(a[i]==null)
thrownewNullPointerException();
}
this.queue=a;
this.size=n;
if(heapify)
heapify();
}
入队方法
入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。
offer(E e)
publicvoidput(Ee){
offer(e);//neverneedtoblock
}
publicbooleanoffer(Ee){
//判断是否为空
if(e==null)
thrownewNullPointerException();
//显示锁
finalReentrantLocklock=this.lock;
lock.lock();
//定义临时对象
intn,cap;
Object[]array;
//判断数组是否满了
while((n=size)>=(cap=(array=queue).length))
//数组扩容
tryGrow(array,cap);
try{
//拿到比较器
Comparator<?superE>cmp=comparator;
//判断是否有自定义比较器
if(cmp==null)
//堆上浮
siftUpComparable(n,e,array);
else
//使用自定义比较器进行堆上浮
siftUpUsingComparator(n,e,array,cmp);
//队列长度+1
size=n+1;
//唤醒休眠的出队线程
notEmpty.signal();
}finally{
//释放锁
lock.unlock();
}
returntrue;
}
siftUpComparable(int k, T x, Object[] array)
上浮调整比较器方法的实现
privatestatic<T>voidsiftUpComparable(intk,Tx,Object[]array){
Comparable<?superT>key=(Comparable<?superT>)x;
while(k>0){
//无符号向左移,目的是找到放入位置的父节点
intparent=(k-1)>>>1;
//拿到父节点的值
Objecte=array[parent];
//比较是否大于该元素,不大于就没比较交换
if(key.compareTo((T)e)>=0)
break;
//以下都是元素位置交换
array[k]=e;
k=parent;
}
array[k]=key;
}
根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。
入队图解
例子:85 添加到二叉堆中(大顶堆)
packagecom.niuh.queue.priority;
importjava.util.Comparator;
importjava.util.concurrent.PriorityBlockingQueue;
/**
*<p>
*PriorityBlockingQueue简单演示demo
*</p>
*/
publicclassTestPriorityBlockingQueue{
publicstaticvoidmain(String[]args)throwsInterruptedException{
//大顶堆
PriorityBlockingQueue<Integer>concurrentLinkedQueue=newPriorityBlockingQueue<Integer>(10,newComparator<Integer>(){
@Override
publicintcompare(Integero1,Integero2){
returno2-o1;
}
});
concurrentLinkedQueue.offer(90);
concurrentLinkedQueue.offer(80);
concurrentLinkedQueue.offer(70);
concurrentLinkedQueue.offer(60);
concurrentLinkedQueue.offer(40);
concurrentLinkedQueue.offer(30);
concurrentLinkedQueue.offer(20);
concurrentLinkedQueue.offer(10);
concurrentLinkedQueue.offer(50);
concurrentLinkedQueue.offer(85);
//输出元素排列
concurrentLinkedQueue.stream().forEach(e->System.out.print(e+""));
//取出元素
Integertake=concurrentLinkedQueue.take();
System.out.println();
concurrentLinkedQueue.stream().forEach(e->System.out.print(e+""));
}
}
操作的细节分为两步:
- 第一步:首先把新元素插入到堆的尾部再说;(新的元素可能是特别大或者特别小,那么要做的一件事情就是重新维护一下堆的所有元素,把新元素挪到这个堆的相应的位置)
- 第二步:依次向上调整整个堆的结构,就叫 HeapifyUp
85 按照上面讲的先插入到堆的尾部,也就是一维数组的尾部,一维数组的尾部的话就上图的位置,因为这是一个完全二叉树,所以它的尾部就是50后面这个结点。插进来之后这个时候就破坏了堆,它的每一个结点都要大于它的儿子的这种属性了,接下来要做的事情就是要把 85 依次地向上浮动,怎么浮动?就是 85 大于它的父亲结点,那么就和父亲结点进行交换,直到走到根如果大于根的话,就和根也进行交换。
85 再继续往前走之后,它要和 80 再进行比较,同理可得:也就是说这个结点每次和它的父亲比,如果它大于它的父亲的话就交换,直到它不再大于它的父亲。
出队方法
入队列的方法说完后,我们来说说出队列的方法。PriorityBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:
- E take();
- E poll();
- E poll(long timeout, TimeUnit unit);
- E peek()
poll 和 peek 与上面类似,这里不做说明
take()
出队方法,该方法会阻塞
publicEtake()throwsInterruptedException{
//显示锁
finalReentrantLocklock=this.lock;
//可中断锁
lock.lockInterruptibly();
//结果接收对象
Eresult;
try{
//判断队列是否为空
while((result=dequeue())==null)
//线程阻塞
notEmpty.await();
}finally{
lock.unlock();
}
returnresult;
}
dequeue()
我们再来看看具体出队方法的实现,dequeue方法
privateEdequeue(){
//长度减少1
intn=size-1;
//判断队列中是否有元素
if(n<0)
returnnull;
else{
//队列对象
Object[]array=queue;
//取出第一个元素
Eresult=(E)array[0];
//拿出最后一个元素
Ex=(E)array[n];
//置空
array[n]=null;
Comparator<?superE>cmp=comparator;
if(cmp==null)
//下沉调整
siftDownComparable(0,x,array,n);
else
siftDownUsingComparator(0,x,array,n,cmp);
//成功则减少队列中的元素数量
size=n;
returnresult;
}
总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。
siftDownComparable(int k, T x, Object[] array, int n)
再来看看下沉比较器方法的实现
privatestatic<T>voidsiftDownComparable(intk,Tx,Object[]array,
intn){
//判断队列长度
if(n>0){
Comparable<?superT>key=(Comparable<?superT>)x;
//找到队列最后一个元素的父节点的索引。
inthalf=n>>>1;//loopwhileanon-leaf
while(k<half){
//拿到k节点下的左子节点
intchild=(k<<1)+1;//assumeleftchildisleast
//取得子节点对应的值
Objectc=array[child];
//取得k右子节点的索引
intright=child+1;
//比较右节点的索引是否小于队列长度和左右子节点的值进行比较
if(right<n&&
((Comparable<?superT>)c).compareTo((T)array[right])>0)
c=array[child=right];
//比较父节点值是否大于子节点
if(key.compareTo((T)c)<=0)
break;
//下面都是元素替换
array[k]=c;
k=child;
}
array[k]=key;
}
}
出队图解
将堆尾元素替换到顶部(即堆顶被替代删除掉)
依次从根部向下调整整个堆的结构(一直到堆尾即可) HeapifyDown
例子:90 从二叉堆中删除(大顶堆)
总结
PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。