功能简介
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态【CPJ 3.4.2】。闭锁的作用相当于一扇门∶ 在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如∶
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而所有需要 R 的操作都必须先在这个闭锁上等待。
- 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S 时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖 S 的服务才能继续执行。
- 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。
CountDownLatch.jpg
CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而 await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
使用案例
TestHarness 中给出了闭锁的两种常见用法。TestHarness 创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示"起始门(Starting Gate)"和"结束门(Ending Gate)"。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的 countDown 方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。
- publicclassTestHarness{
- publiclongtimeTasks(intnThreads,finalRunnabletask)throwsInterruptedException{
- finalCountDownLatchstartGate=newCountDownLatch(1);
- finalCountDownLatchendGate=newCountDownLatch(nThreads);
- for(inti=0;i<nThreads;i++){
- Threadt=newThread(()->{
- try{
- startGate.await();
- try{
- task.run();
- }finally{
- endGate.countDown();
- }
- }catch(InterruptedExceptionignored){
- }
- });
- t.start();
- }
- longstart=System.nanoTime();
- startGate.countDown();
- endGate.await();
- longend=System.nanoTime();
- returnend-start;
- }
- publicstaticvoidmain(String[]args)throwsInterruptedException{
- TestHarnesstestHarness=newTestHarness();
- AtomicIntegernum=newAtomicInteger(0);
- longtime=testHarness.timeTasks(10,()->System.out.println(num.incrementAndGet()));
- System.out.println("costtime:"+time+"ms");
- }
- }
- //输出结果
- 1
- 10
- 9
- 8
- 7
- 5
- 6
- 4
- 3
- 2
- costtime:2960900ms
为什么要在 TestHarness 中使用闭锁,而不是在线程创建后就立即启动? 或许,我们希望测试 n 个线程并发执行某个任务时需要的时间。如果在创建线程后立即启动它们,那么先启动的线程将"领先"后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够实时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。
使用总结
CountDownLatch 是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch 使用完毕后,它不能再次被使用。
源码分析
代码分析
CountDownLatch 在底层还是采用 AbstractQueuedSynchronizer 实现。
- CountDownLatchstartGate=**new**CountDownLatch(1);
我们先看它的构造方法, 创建了一个 sync 对象。
- publicCountDownLatch(intcount){
- if(count<0)thrownewIllegalArgumentException("count<0");
- this.sync=newSync(count);
- }
Sync 是 AbstractQueuedSynchronizer 的一个实现, 按照字面意思我们可以猜到它是公平方式实现。
- privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{
- privatestaticfinallongserialVersionUID=4982264981922014374L;
- //构造方法
- Sync(intcount){
- setState(count);
- }
- //获取资源数
- intgetCount(){
- returngetState();
- }
- //获取锁
- protectedinttryAcquireShared(intacquires){
- return(getState()==0)?1:-1;
- }
- //释放锁
- protectedbooleantryReleaseShared(intreleases){
- //Decrementcount;signalwhentransitiontozero
- for(;;){
- intc=getState();
- if(c==0)
- returnfalse;
- intnextc=c-1;
- //CAS解锁
- if(compareAndSetState(c,nextc))
- returnnextc==0;
- }
- }
- }
在 await 方法中如果存在计算值, 那么当前线程将进入 AQS 队列生成 Node 节点, 线程进入阻塞状态。
- publicvoidawait()throwsInterruptedException{
- sync.acquireSharedInterruptibly(1);
- }
其实主要是获取共享锁。
- publicfinalvoidacquireSharedInterruptibly(intarg)
- throwsInterruptedException{
- if(Thread.interrupted())
- thrownewInterruptedException();
- if(tryAcquireShared(arg)<0)
- doAcquireSharedInterruptibly(arg);
- }
CountDownLatch.Sync 实现了 tryAcquireShared 方法 ,如果 getState() == 0 返回 1 , 否则返回 -1. 也就是说创建 CountDownLatch 实例后再执行 await 方法将继续调用 doAcquireSharedInterruptibly(arg);
- //是否可获取共享锁
- protectedinttryAcquireShared(intacquires){
- return(getState()==0)?1:-1;
- }
- //尝试获取锁,或者入队
- privatevoiddoAcquireSharedInterruptibly(intarg)
- throwsInterruptedException{
- finalNodenode=addWaiter(Node.SHARED);
- booleanfailed=true;
- try{
- for(;;){
- finalNodep=node.predecessor();
- if(p==head){
- intr=tryAcquireShared(arg);
- if(r>=0){
- setHeadAndPropagate(node,r);
- p.next=null;//helpGC
- failed=false;
- return;
- }
- }
- if(shouldParkAfterFailedAcquire(p,node)&&
- parkAndCheckInterrupt())
- thrownewInterruptedException();
- }
- }finally{
- if(failed)
- cancelAcquire(node);
- }
- }
在 countDown 方法如果存在等待的线程, 将对其进行唤醒. 或者减少 CountDownLatch 资源数。
- publicvoidcountDown(){
- sync.releaseShared(1);
- }
通过 releaseShared 对共享锁进行解锁。
- publicfinalbooleanreleaseShared(intarg){
- if(tryReleaseShared(arg)){
- doReleaseShared();
- returntrue;
- }
- returnfalse;
- }
最终会调用 doReleaseShared 唤醒 AQS 中的头节点。
- privatevoiddoReleaseShared(){
- /*
- *Ensurethatareleasepropagates,evenifthereareother
- *in-progressacquires/releases.Thisproceedsintheusual
- *wayoftryingtounparkSuccessorofheadifitneeds
- *signal.Butifitdoesnot,statusissettoPROPAGATEto
- *ensurethatuponrelease,propagationcontinues.
- *Additionally,wemustloopincaseanewnodeisadded
- *whilewearedoingthis.Also,unlikeotherusesof
- *unparkSuccessor,weneedtoknowifCAStoresetstatus
- *fails,ifsorechecking.
- */
- for(;;){
- Nodeh=head;
- if(h!=null&&h!=tail){
- intws=h.waitStatus;
- if(ws==Node.SIGNAL){
- if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
- continue;//looptorecheckcases
- unparkSuccessor(h);
- }
- elseif(ws==0&&
- !compareAndSetWaitStatus(h,0,Node.PROPAGATE))
- continue;//looponfailedCAS
- }
- if(h==head)//loopifheadchanged
- break;
- }
- }
详细流程如下图:
源码流程图
参考资料
《Java 并发编程实战》
https://www.cnblogs.com/Lee_xy_z/p/10470181.html
原文链接:https://mp.weixin.qq.com/s/7rn6NCPqIcGiDs3cVuVm2g