前言
ArrayBlockingQueue 由数组支持的有界阻塞队列,队列基于数组实现,容量大小在创建 ArrayBlockingQueue 对象时已经定义好。 此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁,默认采用非公平锁。其数据结构如下图:
- 注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程和请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁
队列创建
BlockingQueue<String>blockingQueue=newArrayBlockingQueue<>(5);
应用场景
在线程池中有比较多的应用,生产者消费者场景。
- 先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素)
- 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
- 队列不支持空元素
- 公平性 (fairness)可以在构造函数中指定。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过在构造函数将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
工作原理
ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。
基于 ReentrantLock 保证线程安全,根据 Condition 实现队列满时的阻塞。
finalReentrantLocklock;
privatefinalConditionnotEmpty;
privatefinalConditionnotFull;
Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细地对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。
notEmpty表示“锁的非空条件”。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒“之前通过notEmpty.await()进入等待状态的线程”。 同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。
- 试图向已满队列中放入元素会导致放入操作受阻塞,直到BlockingQueue里有新的唤空间才会被醒继续操作; 试图从空队列中检索元素将导致类似阻塞,直到BlocingkQueue进了新货才会被唤醒。
源码分析
以下源码分析基于JDK1.8
定义
ArrayBlockingQueue的类继承关系如下:
其包含的方法定义如下:
成员属性
/**真正存入数据的数组*/
finalObject[]items;
/**take,poll,peekorremove的下一个索引*/
inttakeIndex;
/**put,offer,oradd下一个索引*/
intputIndex;
/**队列中元素个数*/
intcount;
/**可重入锁*/
finalReentrantLocklock;
/**如果数组是空的,在该Condition上等待*/
privatefinalConditionnotEmpty;
/**如果数组是满的,在该Condition上等待*/
privatefinalConditionnotFull;
/**遍历器实现*/
transientItrsitrs=null;
构造函数
/**
*构造函数,设置队列的初始容量
*/
publicArrayBlockingQueue(intcapacity){
this(capacity,false);
}
/**
*构造函数,
*capacityandthespecifiedaccesspolicy.
*
*@paramcapacity设置数组大小
*@paramfair设置是否为公平锁
*@throwsIllegalArgumentExceptionif{@codecapacity<1}
*/
publicArrayBlockingQueue(intcapacity,booleanfair){
if(capacity<=0)
thrownewIllegalArgumentException();
this.items=newObject[capacity];
//是否为公平锁,如果是的话,那么先到的线程先获得锁对象
//否则,由操作系统调度由哪个线程获得锁,一般为false,性能会比较高
lock=newReentrantLock(fair);
notEmpty=lock.newCondition();
notFull=lock.newCondition();
}
/**
*构造函数,带有初始内容的队列
*/
publicArrayBlockingQueue(intcapacity,booleanfair,
Collection<?extendsE>c){
this(capacity,fair);
finalReentrantLocklock=this.lock;
//加锁的目的是为了其他CPU能够立即看到修改
//加锁和解锁底层都是CAS,会强制修改写回主存,对其他CPU可见
lock.lock();//要给数组设置内容,先上锁
try{
inti=0;
try{
for(Ee:c){
checkNotNull(e);
items[i++]=e;//依次拷贝内容
}
}catch(ArrayIndexOutOfBoundsExceptionex){
thrownewIllegalArgumentException();
}
count=i;
putIndex=(i==capacity)?0:i;//如果putIndex大于数组大小,那么从0重写开始
}finally{
lock.unlock();//最后一定要释放锁
}
}
入队方法
add / offer / put,这三个方法都是往队列中添加元素,说明如下:
- add方法依赖于offer方法,如果队列满了则抛出异常,否则添加成功返回true;
- offer方法有两个重载版本,只有一个参数的版本,如果队列满了就返回false,否则加入到队列中,返回true,add方法就是调用此版本的offer方法;另一个带时间参数的版本,如果队列满了则等待,可指定等待的时间,如果这期间中断了则抛出异常,如果等待超时了则返回false,否则加入到队列中返回true;
- put方法跟带时间参数的offer方法逻辑一样,不过没有等待的时间限制,会一直等待直到队列有空余位置了,再插入到队列中,返回true
/**
*添加一个元素,其实super.add里面调用了offer方法
*/
publicbooleanadd(Ee){
returnsuper.add(e);
}
/**
*加入成功返回true,否则返回false
*/
publicbooleanoffer(Ee){
//创建插入的元素是否为null,是的话抛出NullPointerException异常
checkNotNull(e);
//获取“该阻塞队列的独占锁”
finalReentrantLocklock=this.lock;
lock.lock();//上锁
try{
//如果队列已满,则返回false。
if(count==items.length)//超过数组的容量
returnfalse;
else{
//如果队列未满,则插入e,并返回true。
enqueue(e);
returntrue;
}
}finally{
//释放锁
lock.unlock();
}
}
/**
*如果队列已满的话,就会等待
*/
publicvoidput(Ee)throwsInterruptedException{
checkNotNull(e);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();//和lock方法的区别是让它在阻塞时可以抛出异常跳出
try{
while(count==items.length)
notFull.await();//这里就是阻塞了,要注意:如果运行到这里,那么它会释放上面的锁,一直等到notify
enqueue(e);
}finally{
lock.unlock();
}
}
/**
*带有超时事件的插入方法,unit表示是按秒、分、时哪一种
*/
publicbooleanoffer(Ee,longtimeout,TimeUnitunit)
throwsInterruptedException{
checkNotNull(e);
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(count==items.length){
if(nanos<=0)
returnfalse;
nanos=notFull.awaitNanos(nanos);//带有超时等待的阻塞方法
}
enqueue(e);//入队
returntrue;
}finally{
lock.unlock();
}
}
出队方法
poll / take / peek,这几个方法都是获取队列顶的元素,具体说明如下:
- poll方法有两个重载版本,第一个版本,如果队列是空的,返回null,否则移除并返回队列头部元素;另一个带时间参数的版本,如果栈为空则等待,可以指定等待的时间,如果等待超时了则返回null,如果被中断了则抛出异常,否则移除并返回栈顶元素
- take方法同带时间参数的poll方法,但是不能指定等待时间,会一直等待直到队列中有元素为止,然后移除并返回栈顶元素
- peek方法只是返回队列头部元素,不移除
//实现的方法,如果当前队列为空,返回null
publicEpoll(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
return(count==0)?null:dequeue();
}finally{
lock.unlock();
}
}
//实现的方法,如果当前队列为空,一直阻塞
publicEtake()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(count==0)
notEmpty.await();//队列为空,阻塞方法
returndequeue();
}finally{
lock.unlock();
}
}
//带有超时事件的取元素方法,否则返回null
publicEpoll(longtimeout,TimeUnitunit)throwsInterruptedException{
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(count==0){
if(nanos<=0)
returnnull;
nanos=notEmpty.awaitNanos(nanos);//超时等待
}
returndequeue();//取得元素
}finally{
lock.unlock();
}
}
//只是看一个队列最前面的元素,取出是不擅长队列中原来的元素,队列为空时返回null
publicEpeek(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnitemAt(takeIndex);//队列为空时返回null
}finally{
lock.unlock();
}
}
删除元素方法
remove / clear /drainT,这三个方法用于从队列中移除元素,具体说明如下:
- remove方法用于移除某个元素,如果栈为空或者没有找到该元素则返回false,否则从栈中移除该元素;移除时,如果该元素位于栈顶则直接移除,如果位于栈中间,则需要将该元素后面的其他元素往前面挪动,移除后需要唤醒因为栈满了而阻塞的线程
- clear方法用于整个栈,同时将takeIndex置为putIndex,保证栈中的元素先进先出;最后会唤醒最多count个线程,因为正常一个线程插入一个元素,如果唤醒超过count个线程,可能导致部分线程因为栈满了又再次被阻塞
- drainTo方法有两个重载版本,一个是不带个数,将所有的元素都移除并拷贝到指定的集合中;一个带个数,将指定个数的元素移除并拷贝到指定的集合中,两者的底层实现都是同一个方法。移除后需要重置takeIndex和count,并唤醒最多移除个数的因为栈满而阻塞的线程。
/**
*从队列中删除一个元素的方法。删除成功返回true,否则返回false
*/
publicbooleanremove(Objecto){
if(o==null)returnfalse;
finalObject[]items=this.items;
finalReentrantLocklock=this.lock;
lock.lock();
try{
if(count>0){
finalintputIndex=this.putIndex;
inti=takeIndex;
//从takeIndex开始往后遍历直到等于putIndex
do{
if(o.equals(items[i])){
removeAt(i);//真正删除的方法
returntrue;
}
//走到数组末尾了又从头开始,put时也按照这个规则来
if(++i==items.length)
i=0;
}while(i!=putIndex);//一直不断的循环取出来做判断
}
//如果数组为空,返回false
returnfalse;
}finally{
lock.unlock();
}
}
/**
*指定删除索引上的元素.
*/
voidremoveAt(finalintremoveIndex){
//assertlock.getHoldCount()==1;
//assertitems[removeIndex]!=null;
//assertremoveIndex>=0&&removeIndex<items.length;
finalObject[]items=this.items;
if(removeIndex==takeIndex){
//如果移除的就是栈顶的元素
items[takeIndex]=null;
if(++takeIndex==items.length)
takeIndex=0;
//元素个数减1
count–;
if(itrs!=null)
itrs.elementDequeued();
}else{
//an"interior"remove
//如果移除的是栈中间的某个元素,需要将该元素后面的元素往前挪动
finalintputIndex=this.putIndex;
for(inti=removeIndex;;){
intnext=i+1;
//到数组末尾了,从头开始
if(next==items.length)
next=0;
if(next!=putIndex){
//将后面一个元素复制到前面来
items[i]=items[next];
i=next;
}else{
//如果下一个元素的索引等于putIndex,说明i就是栈中最后一个元素了,直接将该元素置为null
items[i]=null;
//重置putIndex为i
this.putIndex=i;
break;
}
}
count–;
if(itrs!=null)
//通知itrs节点移除了
itrs.removedAt(removeIndex);
}
//唤醒因为栈满了而等待的线程
notFull.signal();//有一个元素删除成功,那肯定队列不满
}
/**
*清空队列
*/
publicvoidclear(){
finalObject[]items=this.items;
finalReentrantLocklock=this.lock;
lock.lock();
try{
intk=count;
if(k>0){
finalintputIndex=this.putIndex;
inti=takeIndex;
//从takeIndex开始遍历直到i等于putIndex,将数组元素置为null
do{
items[i]=null;
if(++i==items.length)
i=0;
}while(i!=putIndex);
//注意此处没有将这两个index置为0,只是让他们相等,因为只要相等就可以实现栈先进先出了
takeIndex=putIndex;
count=0;
if(itrs!=null)
itrs.queueIsEmpty();
//如果有因为栈满了而等待的线程,则将其唤醒
//注意这里没有使用signalAll而是通过for循环来signal多次,单纯从唤醒线程来看是可以使用signalAll的,效果跟这里的for循环是一样的
//如果有等待的线程,说明count就是当前线程的最大容量了,这里清空了,最多只能putcount次,一个线程只能put1次,只唤醒最多count个线程就避免了
//线程被唤醒后再次因为栈满了而阻塞
for(;k>0&&lock.hasWaiters(notFull);k–)
notFull.signal();
}
}finally{
lock.unlock();
}
}
/**
*取出所有元素到集合
*/
publicintdrainTo(Collection<?superE>c){
returndrainTo(c,Integer.MAX_VALUE);
}
/**
*取出所有元素到集合
*/
publicintdrainTo(Collection<?superE>c,intmaxElements){
//校验参数合法
checkNotNull(c);
if(c==this)
thrownewIllegalArgumentException();
if(maxElements<=0)
return0;
finalObject[]items=this.items;
finalReentrantLocklock=this.lock;
lock.lock();
try{
//取两者之间的最小值
intn=Math.min(maxElements,count);
inttake=takeIndex;
inti=0;
try{
//从takeIndex开始遍历,取出元素然后添加到c中,直到满足个数要求为止
while(i<n){
@SuppressWarnings("unchecked")
Ex=(E)items[take];
c.add(x);
items[take]=null;
if(++take==items.length)
take=0;
i++;
}
returnn;
}finally{
//Restoreinvariantsevenifc.add()threw
if(i>0){
//取完了,修改count减去i
count-=i;
takeIndex=take;
if(itrs!=null){
if(count==0)
//通知itrs栈空了
itrs.queueIsEmpty();
elseif(i>take)
//说明take中间变成0了,通知itrs
itrs.takeIndexWrapped();
}
//唤醒在因为栈满而等待的线程,最多唤醒i个,同上避免线程被唤醒了因为栈又满了而阻塞
for(;i>0&&lock.hasWaiters(notFull);i–)
notFull.signal();
}
}
}finally{
lock.unlock();
}
}
iterator / Itr / Itrs
Itr和Itrs都是ArrayBlockingQueue的两个内部类,如下:
iterator方法返回一个迭代器实例,用于实现for循环遍历和部分Collection接口,该方法的实现如下:
publicIterator<E>iterator(){
returnnewItr();
}
Itr(){
//assertlock.getHoldCount()==0;
lastRet=NONE;
finalReentrantLocklock=ArrayBlockingQueue.this.lock;
lock.lock();
try{
if(count==0){
//NONE和DETACHED都是常量
cursor=NONE;
nextIndex=NONE;
prevTakeIndex=DETACHED;
}else{
//初始化各属性
finalinttakeIndex=ArrayBlockingQueue.this.takeIndex;
prevTakeIndex=takeIndex;
nextItem=itemAt(nextIndex=takeIndex);
cursor=incCursor(takeIndex);
if(itrs==null){
itrs=newItrs(this);
}else{
//初始化Itrs,将当前线程注册到Itrs
itrs.register(this);//inthisorder
itrs.doSomeSweeping(false);
}
prevCycles=itrs.cycles;
//asserttakeIndex>=0;
//assertprevTakeIndex==takeIndex;
//assertnextIndex>=0;
//assertnextItem!=null;
}
}finally{
lock.unlock();
}
}
Itrs(Itrinitial){
register(initial);
}
//根据index计算cursor
privateintincCursor(intindex){
//assertlock.getHoldCount()==1;
if(++index==items.length)
index=0;
if(index==putIndex)
index=NONE;
returnindex;
}
/**
*创建一个新的Itr实例时,会调用此方法将该实例添加到Node链表中
*/
voidregister(Itritr){
//创建一个新节点将其插入到head节点的前面
head=newNode(itr,head);
}
小结
ArrayBlockingQueue是一个阻塞队列,内部由ReentrantLock来实现线程安全,由Condition的await和signal来实现等待唤醒的功能。它的数据结构是数组,准确地说是一个循环数组(可以类比一个圆环),所有的下标在到达最大长度时自动从0继续开始。
PS:以上代码提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git