阻塞队列—PriorityBlockingQueue源码分析

2025-05-29 0 103

阻塞队列—PriorityBlockingQueue源码分析

前言

阻塞队列—PriorityBlockingQueue源码分析

PriorityBlockingQueue 优先级队列,线程安全(添加、读取都进行了加锁)、无界、读阻塞的队列,底层采用的堆结构实现(二叉树),默认是小根堆,最小的或者最大的元素会一直置顶,每次获取都取最顶端的数据

队列创建

小根堆

PriorityBlockingQueue<Integer>concurrentLinkedQueue=newPriorityBlockingQueue<Integer>();

大根堆

PriorityBlockingQueue<Integer>concurrentLinkedQueue=newPriorityBlockingQueue<Integer>(10,newComparator<Integer>(){

@Override

publicintcompare(Integero1,Integero2){

returno2-o1;

}

});

应用场景

有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

我们来看一个具体例子,例子中定义了一个将要放入“优先阻塞队列”的任务类,并且定义了一个任务工场类和一个任务执行类,在任务工场类中产生了各种不同优先级的任务,将其添加到队列中,在任务执行类中,任务被一个个取出并执行。

packagecom.niuh.queue.priority;

importjava.util.ArrayList;

importjava.util.List;

importjava.util.Queue;

importjava.util.Random;

importjava.util.concurrent.ExecutorService;

importjava.util.concurrent.Executors;

importjava.util.concurrent.PriorityBlockingQueue;

importjava.util.concurrent.TimeUnit;

/**

*<p>

*PriorityBlockingQueue使用示例

*</p>

*/

publicclassPriorityBlockingQueueDemo{

publicstaticvoidmain(String[]args)throwsException{

Randomrandom=newRandom(47);

ExecutorServiceexec=Executors.newCachedThreadPool();

PriorityBlockingQueue<Runnable>queue=newPriorityBlockingQueue<>();

exec.execute(newPrioritizedTaskProducer(queue,exec));//这里需要注意,往PriorityBlockingQueue中添加任务和取出任务的

exec.execute(newPrioritizedTaskConsumer(queue));//步骤是同时进行的,因而输出结果并不一定是有序的

}

}

classPrioritizedTaskimplementsRunnable,Comparable<PrioritizedTask>{

privateRandomrandom=newRandom(47);

privatestaticintcounter=0;

privatefinalintid=counter++;

privatefinalintpriority;

protectedstaticList<PrioritizedTask>sequence=newArrayList<>();

publicPrioritizedTask(intpriority){

this.priority=priority;

sequence.add(this);

}

@Override

publicintcompareTo(PrioritizedTasko){

returnpriority<o.priority?1:(priority>o.priority?-1:0);//定义优先级计算方式

}

@Override

publicvoidrun(){

try{

TimeUnit.MILLISECONDS.sleep(random.nextInt(250));

}catch(InterruptedExceptione){

}

System.out.println(this);

}

@Override

publicStringtoString(){

returnString.format("[%1$-3d]",priority)+"Task"+id;

}

publicStringsummary(){

return"("+id+":"+priority+")";

}

publicstaticclassEndSentinelextendsPrioritizedTask{

privateExecutorServiceexec;

publicEndSentinel(ExecutorServiceexec){

super(-1);

this.exec=exec;

}

@Override

publicvoidrun(){

intcount=0;

for(PrioritizedTaskpt:sequence){

System.out.print(pt.summary());

if(++count%5==0){

System.out.println();

}

}

System.out.println();

System.out.println(this+"CallingshutdownNow()");

exec.shutdownNow();

}

}

}

classPrioritizedTaskProducerimplementsRunnable{

privateRandomrandom=newRandom(47);

privateQueue<Runnable>queue;

privateExecutorServiceexec;

publicPrioritizedTaskProducer(Queue<Runnable>queue,ExecutorServiceexec){

this.queue=queue;

this.exec=exec;

}

@Override

publicvoidrun(){

for(inti=0;i<20;i++){

queue.add(newPrioritizedTask(random.nextInt(10)));//往PriorityBlockingQueue中添加随机优先级的任务

Thread.yield();

}

try{

for(inti=0;i<10;i++){

TimeUnit.MILLISECONDS.sleep(250);

queue.add(newPrioritizedTask(10));//往PriorityBlockingQueue中添加优先级为10的任务

}

for(inti=0;i<10;i++){

queue.add(newPrioritizedTask(i));//往PriorityBlockingQueue中添加优先级为1-10的任务

}

queue.add(newPrioritizedTask.EndSentinel(exec));

}catch(InterruptedExceptione){

}

System.out.println("FinishedPrioritizedTaskProducer");

}

}

classPrioritizedTaskConsumerimplementsRunnable{

privatePriorityBlockingQueue<Runnable>queue;

publicPrioritizedTaskConsumer(PriorityBlockingQueue<Runnable>queue){

this.queue=queue;

}

@Override

publicvoidrun(){

try{

while(!Thread.interrupted()){

queue.take().run();//任务的消费者,从PriorityBlockingQueue中取出任务执行

}

}catch(InterruptedExceptione){

}

System.out.println("FinishedPrioritizedTaskConsumer");

}

}

工作原理

PriorityBlockingQueue 是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:

  • PriorityBlockingQueue 和 ArrayBlockingQueue 一样是基于数组实现的,但后者在初始化时需要指定长度,前者默认长度是 11。
  • 该队列可以说是真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)
  • 该队列属于权重队列,可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构(具体如何排下面会进行分析)
  • 出队方式和前面的也不同,是根据权重来进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。
  • 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator。

注意:

  1. 堆结构实际上是一种完全二叉树。关于二叉树可以查看 《树、二叉树、二叉搜索树的实现和特性》
  2. 堆又分为大顶堆和小顶堆 。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。关于二叉堆可以查看《堆和二叉堆的实现和特性》

源码分析

定义

PriorityBlockingQueue的类继承关系如下:

阻塞队列—PriorityBlockingQueue源码分析

其包含的方法定义如下:

阻塞队列—PriorityBlockingQueue源码分析

成员属性

从下面的字段我们可以知道,该队列可以排序,使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。

/**

*默认数组长度

*/

privatestaticfinalintDEFAULT_INITIAL_CAPACITY=11;

/**

*最大达容量,分配时超出可能会出现OutOfMemoryError异常

*/

privatestaticfinalintMAX_ARRAY_SIZE=Integer.MAX_VALUE-8;

/**

*队列,存储我们的元素

*/

privatetransientObject[]queue;

/**

*队列长度

*/

privatetransientintsize;

/**

*比较器,入队进行权重的比较

*/

privatetransientComparator<?superE>comparator;

/**

*显示锁

*/

privatefinalReentrantLocklock;

/**

*空队列时进行线程阻塞的Condition对象

*/

privatefinalConditionnotEmpty;

构造函数

/**

*默认构造,使用长度为11的数组,比较器为空

*/

publicPriorityBlockingQueue(){

this(DEFAULT_INITIAL_CAPACITY,null);

}

/**

*自定义数据长度构造,比较器为空

*/

publicPriorityBlockingQueue(intinitialCapacity){

this(initialCapacity,null);

}

/**

*自定义数组长度,可以自定义比较器

*/

publicPriorityBlockingQueue(intinitialCapacity,

Comparator<?superE>comparator){

if(initialCapacity<1)

thrownewIllegalArgumentException();

this.lock=newReentrantLock();

this.notEmpty=lock.newCondition();

this.comparator=comparator;

this.queue=newObject[initialCapacity];

}

/**

*构造函数,带有初始内容的队列

*/

publicPriorityBlockingQueue(Collection<?extendsE>c){

this.lock=newReentrantLock();

this.notEmpty=lock.newCondition();

booleanheapify=true;//trueifnotknowntobeinheaporder

booleanscreen=true;//trueifmustscreenfornulls

if(cinstanceofSortedSet<?>){

SortedSet<?extendsE>ss=(SortedSet<?extendsE>)c;

this.comparator=(Comparator<?superE>)ss.comparator();

heapify=false;

}

elseif(cinstanceofPriorityBlockingQueue<?>){

PriorityBlockingQueue<?extendsE>pq=

(PriorityBlockingQueue<?extendsE>)c;

this.comparator=(Comparator<?superE>)pq.comparator();

screen=false;

if(pq.getClass()==PriorityBlockingQueue.class)//exactmatch

heapify=false;

}

Object[]a=c.toArray();

intn=a.length;

//Ifc.toArrayincorrectlydoesn'treturnObject[],copyit.

if(a.getClass()!=Object[].class)

a=Arrays.copyOf(a,n,Object[].class);

if(screen&&(n==1||this.comparator!=null)){

for(inti=0;i<n;++i)

if(a[i]==null)

thrownewNullPointerException();

}

this.queue=a;

this.size=n;

if(heapify)

heapify();

}

入队方法

入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。

offer(E e)

publicvoidput(Ee){

offer(e);//neverneedtoblock

}

publicbooleanoffer(Ee){

//判断是否为空

if(e==null)

thrownewNullPointerException();

//显示锁

finalReentrantLocklock=this.lock;

lock.lock();

//定义临时对象

intn,cap;

Object[]array;

//判断数组是否满了

while((n=size)>=(cap=(array=queue).length))

//数组扩容

tryGrow(array,cap);

try{

//拿到比较器

Comparator<?superE>cmp=comparator;

//判断是否有自定义比较器

if(cmp==null)

//堆上浮

siftUpComparable(n,e,array);

else

//使用自定义比较器进行堆上浮

siftUpUsingComparator(n,e,array,cmp);

//队列长度+1

size=n+1;

//唤醒休眠的出队线程

notEmpty.signal();

}finally{

//释放锁

lock.unlock();

}

returntrue;

}

siftUpComparable(int k, T x, Object[] array)

上浮调整比较器方法的实现

privatestatic<T>voidsiftUpComparable(intk,Tx,Object[]array){

Comparable<?superT>key=(Comparable<?superT>)x;

while(k>0){

//无符号向左移,目的是找到放入位置的父节点

intparent=(k-1)>>>1;

//拿到父节点的值

Objecte=array[parent];

//比较是否大于该元素,不大于就没比较交换

if(key.compareTo((T)e)>=0)

break;

//以下都是元素位置交换

array[k]=e;

k=parent;

}

array[k]=key;

}

根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。

入队图解

例子:85 添加到二叉堆中(大顶堆)

packagecom.niuh.queue.priority;

importjava.util.Comparator;

importjava.util.concurrent.PriorityBlockingQueue;

/**

*<p>

*PriorityBlockingQueue简单演示demo

*</p>

*/

publicclassTestPriorityBlockingQueue{

publicstaticvoidmain(String[]args)throwsInterruptedException{

//大顶堆

PriorityBlockingQueue<Integer>concurrentLinkedQueue=newPriorityBlockingQueue<Integer>(10,newComparator<Integer>(){

@Override

publicintcompare(Integero1,Integero2){

returno2-o1;

}

});

concurrentLinkedQueue.offer(90);

concurrentLinkedQueue.offer(80);

concurrentLinkedQueue.offer(70);

concurrentLinkedQueue.offer(60);

concurrentLinkedQueue.offer(40);

concurrentLinkedQueue.offer(30);

concurrentLinkedQueue.offer(20);

concurrentLinkedQueue.offer(10);

concurrentLinkedQueue.offer(50);

concurrentLinkedQueue.offer(85);

//输出元素排列

concurrentLinkedQueue.stream().forEach(e->System.out.print(e+""));

//取出元素

Integertake=concurrentLinkedQueue.take();

System.out.println();

concurrentLinkedQueue.stream().forEach(e->System.out.print(e+""));

}

}

阻塞队列—PriorityBlockingQueue源码分析

操作的细节分为两步:

  • 第一步:首先把新元素插入到堆的尾部再说;(新的元素可能是特别大或者特别小,那么要做的一件事情就是重新维护一下堆的所有元素,把新元素挪到这个堆的相应的位置)
  • 第二步:依次向上调整整个堆的结构,就叫 HeapifyUp

阻塞队列—PriorityBlockingQueue源码分析

85 按照上面讲的先插入到堆的尾部,也就是一维数组的尾部,一维数组的尾部的话就上图的位置,因为这是一个完全二叉树,所以它的尾部就是50后面这个结点。插进来之后这个时候就破坏了堆,它的每一个结点都要大于它的儿子的这种属性了,接下来要做的事情就是要把 85 依次地向上浮动,怎么浮动?就是 85 大于它的父亲结点,那么就和父亲结点进行交换,直到走到根如果大于根的话,就和根也进行交换。

阻塞队列—PriorityBlockingQueue源码分析

85 再继续往前走之后,它要和 80 再进行比较,同理可得:也就是说这个结点每次和它的父亲比,如果它大于它的父亲的话就交换,直到它不再大于它的父亲。

阻塞队列—PriorityBlockingQueue源码分析

出队方法

入队列的方法说完后,我们来说说出队列的方法。PriorityBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:

  • E take();
  • E poll();
  • E poll(long timeout, TimeUnit unit);
  • E peek()

poll 和 peek 与上面类似,这里不做说明

take()

出队方法,该方法会阻塞

publicEtake()throwsInterruptedException{

//显示锁

finalReentrantLocklock=this.lock;

//可中断锁

lock.lockInterruptibly();

//结果接收对象

Eresult;

try{

//判断队列是否为空

while((result=dequeue())==null)

//线程阻塞

notEmpty.await();

}finally{

lock.unlock();

}

returnresult;

}

dequeue()

我们再来看看具体出队方法的实现,dequeue方法

privateEdequeue(){

//长度减少1

intn=size-1;

//判断队列中是否有元素

if(n<0)

returnnull;

else{

//队列对象

Object[]array=queue;

//取出第一个元素

Eresult=(E)array[0];

//拿出最后一个元素

Ex=(E)array[n];

//置空

array[n]=null;

Comparator<?superE>cmp=comparator;

if(cmp==null)

//下沉调整

siftDownComparable(0,x,array,n);

else

siftDownUsingComparator(0,x,array,n,cmp);

//成功则减少队列中的元素数量

size=n;

returnresult;

}

总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。

siftDownComparable(int k, T x, Object[] array, int n)

再来看看下沉比较器方法的实现

privatestatic<T>voidsiftDownComparable(intk,Tx,Object[]array,

intn){

//判断队列长度

if(n>0){

Comparable<?superT>key=(Comparable<?superT>)x;

//找到队列最后一个元素的父节点的索引。

inthalf=n>>>1;//loopwhileanon-leaf

while(k<half){

//拿到k节点下的左子节点

intchild=(k<<1)+1;//assumeleftchildisleast

//取得子节点对应的值

Objectc=array[child];

//取得k右子节点的索引

intright=child+1;

//比较右节点的索引是否小于队列长度和左右子节点的值进行比较

if(right<n&&

((Comparable<?superT>)c).compareTo((T)array[right])>0)

c=array[child=right];

//比较父节点值是否大于子节点

if(key.compareTo((T)c)<=0)

break;

//下面都是元素替换

array[k]=c;

k=child;

}

array[k]=key;

}

}

出队图解

将堆尾元素替换到顶部(即堆顶被替代删除掉)

依次从根部向下调整整个堆的结构(一直到堆尾即可) HeapifyDown

例子:90 从二叉堆中删除(大顶堆)

阻塞队列—PriorityBlockingQueue源码分析

总结

PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 阻塞队列—PriorityBlockingQueue源码分析 https://www.kuaiidc.com/115806.html

相关文章

发表评论
暂无评论