聊聊 CountDownLatch 闭锁源码分析

2025-05-29 0 89

聊聊 CountDownLatch 闭锁源码分析

功能简介

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态【CPJ 3.4.2】。闭锁的作用相当于一扇门∶ 在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如∶

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而所有需要 R 的操作都必须先在这个闭锁上等待。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S 时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖 S 的服务才能继续执行。
  • 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

聊聊 CountDownLatch 闭锁源码分析

CountDownLatch.jpg

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而 await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

使用案例

TestHarness 中给出了闭锁的两种常见用法。TestHarness 创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示"起始门(Starting Gate)"和"结束门(Ending Gate)"。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的 countDown 方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。

  1. publicclassTestHarness{
  2. publiclongtimeTasks(intnThreads,finalRunnabletask)throwsInterruptedException{
  3. finalCountDownLatchstartGate=newCountDownLatch(1);
  4. finalCountDownLatchendGate=newCountDownLatch(nThreads);
  5. for(inti=0;i<nThreads;i++){
  6. Threadt=newThread(()->{
  7. try{
  8. startGate.await();
  9. try{
  10. task.run();
  11. }finally{
  12. endGate.countDown();
  13. }
  14. }catch(InterruptedExceptionignored){
  15. }
  16. });
  17. t.start();
  18. }
  19. longstart=System.nanoTime();
  20. startGate.countDown();
  21. endGate.await();
  22. longend=System.nanoTime();
  23. returnend-start;
  24. }
  25. publicstaticvoidmain(String[]args)throwsInterruptedException{
  26. TestHarnesstestHarness=newTestHarness();
  27. AtomicIntegernum=newAtomicInteger(0);
  28. longtime=testHarness.timeTasks(10,()->System.out.println(num.incrementAndGet()));
  29. System.out.println("costtime:"+time+"ms");
  30. }
  31. }
  32. //输出结果
  33. 1
  34. 10
  35. 9
  36. 8
  37. 7
  38. 5
  39. 6
  40. 4
  41. 3
  42. 2
  43. costtime:2960900ms

为什么要在 TestHarness 中使用闭锁,而不是在线程创建后就立即启动? 或许,我们希望测试 n 个线程并发执行某个任务时需要的时间。如果在创建线程后立即启动它们,那么先启动的线程将"领先"后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够实时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。

使用总结

CountDownLatch 是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch 使用完毕后,它不能再次被使用。

源码分析

代码分析

CountDownLatch 在底层还是采用 AbstractQueuedSynchronizer 实现。

  1. CountDownLatchstartGate=**new**CountDownLatch(1);

我们先看它的构造方法, 创建了一个 sync 对象。

  1. publicCountDownLatch(intcount){
  2. if(count<0)thrownewIllegalArgumentException("count<0");
  3. this.sync=newSync(count);
  4. }

Sync 是 AbstractQueuedSynchronizer 的一个实现, 按照字面意思我们可以猜到它是公平方式实现。

  1. privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{
  2. privatestaticfinallongserialVersionUID=4982264981922014374L;
  3. //构造方法
  4. Sync(intcount){
  5. setState(count);
  6. }
  7. //获取资源数
  8. intgetCount(){
  9. returngetState();
  10. }
  11. //获取锁
  12. protectedinttryAcquireShared(intacquires){
  13. return(getState()==0)?1:-1;
  14. }
  15. //释放锁
  16. protectedbooleantryReleaseShared(intreleases){
  17. //Decrementcount;signalwhentransitiontozero
  18. for(;;){
  19. intc=getState();
  20. if(c==0)
  21. returnfalse;
  22. intnextc=c-1;
  23. //CAS解锁
  24. if(compareAndSetState(c,nextc))
  25. returnnextc==0;
  26. }
  27. }
  28. }

在 await 方法中如果存在计算值, 那么当前线程将进入 AQS 队列生成 Node 节点, 线程进入阻塞状态。

  1. publicvoidawait()throwsInterruptedException{
  2. sync.acquireSharedInterruptibly(1);
  3. }

其实主要是获取共享锁。

  1. publicfinalvoidacquireSharedInterruptibly(intarg)
  2. throwsInterruptedException{
  3. if(Thread.interrupted())
  4. thrownewInterruptedException();
  5. if(tryAcquireShared(arg)<0)
  6. doAcquireSharedInterruptibly(arg);
  7. }

CountDownLatch.Sync 实现了 tryAcquireShared 方法 ,如果 getState() == 0 返回 1 , 否则返回 -1. 也就是说创建 CountDownLatch 实例后再执行 await 方法将继续调用 doAcquireSharedInterruptibly(arg);

  1. //是否可获取共享锁
  2. protectedinttryAcquireShared(intacquires){
  3. return(getState()==0)?1:-1;
  4. }
  5. //尝试获取锁,或者入队
  6. privatevoiddoAcquireSharedInterruptibly(intarg)
  7. throwsInterruptedException{
  8. finalNodenode=addWaiter(Node.SHARED);
  9. booleanfailed=true;
  10. try{
  11. for(;;){
  12. finalNodep=node.predecessor();
  13. if(p==head){
  14. intr=tryAcquireShared(arg);
  15. if(r>=0){
  16. setHeadAndPropagate(node,r);
  17. p.next=null;//helpGC
  18. failed=false;
  19. return;
  20. }
  21. }
  22. if(shouldParkAfterFailedAcquire(p,node)&&
  23. parkAndCheckInterrupt())
  24. thrownewInterruptedException();
  25. }
  26. }finally{
  27. if(failed)
  28. cancelAcquire(node);
  29. }
  30. }

在 countDown 方法如果存在等待的线程, 将对其进行唤醒. 或者减少 CountDownLatch 资源数。

  1. publicvoidcountDown(){
  2. sync.releaseShared(1);
  3. }

通过 releaseShared 对共享锁进行解锁。

  1. publicfinalbooleanreleaseShared(intarg){
  2. if(tryReleaseShared(arg)){
  3. doReleaseShared();
  4. returntrue;
  5. }
  6. returnfalse;
  7. }

最终会调用 doReleaseShared 唤醒 AQS 中的头节点。

  1. privatevoiddoReleaseShared(){
  2. /*
  3. *Ensurethatareleasepropagates,evenifthereareother
  4. *in-progressacquires/releases.Thisproceedsintheusual
  5. *wayoftryingtounparkSuccessorofheadifitneeds
  6. *signal.Butifitdoesnot,statusissettoPROPAGATEto
  7. *ensurethatuponrelease,propagationcontinues.
  8. *Additionally,wemustloopincaseanewnodeisadded
  9. *whilewearedoingthis.Also,unlikeotherusesof
  10. *unparkSuccessor,weneedtoknowifCAStoresetstatus
  11. *fails,ifsorechecking.
  12. */
  13. for(;;){
  14. Nodeh=head;
  15. if(h!=null&&h!=tail){
  16. intws=h.waitStatus;
  17. if(ws==Node.SIGNAL){
  18. if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
  19. continue;//looptorecheckcases
  20. unparkSuccessor(h);
  21. }
  22. elseif(ws==0&&
  23. !compareAndSetWaitStatus(h,0,Node.PROPAGATE))
  24. continue;//looponfailedCAS
  25. }
  26. if(h==head)//loopifheadchanged
  27. break;
  28. }
  29. }

详细流程如下图:

源码流程图

聊聊 CountDownLatch 闭锁源码分析

CountDownLatch 闭锁源码分析.png

参考资料

《Java 并发编程实战》

https://www.cnblogs.com/Lee_xy_z/p/10470181.html

原文链接:https://mp.weixin.qq.com/s/7rn6NCPqIcGiDs3cVuVm2g

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 聊聊 CountDownLatch 闭锁源码分析 https://www.kuaiidc.com/108042.html

相关文章

发表评论
暂无评论