前言
CountDownLatch是多线程中一个比较重要的概念,它可以使得一个或多个线程等待其他线程执行完毕之后再执行。它内部有一个计数器和一个阻塞队列,每当一个线程调用countDown()方法后,计数器的值减少1。当计数器的值不为0时,调用await()方法的线程将会被加入到阻塞队列,一直阻塞到计数器的值为0。
常用方法
- publicclassCountDownLatch{
- //构造一个值为count的计数器
- publicCountDownLatch(intcount);
- //阻塞当前线程直到计数器为0
- publicvoidawait()throwsInterruptedException;
- //在单位为unit的timeout时间之内阻塞当前线程
- publicbooleanawait(longtimeout,TimeUnitunit);
- //将计数器的值减1,当计数器的值为0时,阻塞队列内的线程才可以运行
- publicvoidcountDown();
- }
下面给一个简单的示例:
- packagecom.yang.testCountDownLatch;
- importjava.util.concurrent.CountDownLatch;
- publicclassMain{
- privatestaticfinalintNUM=3;
- publicstaticvoidmain(String[]args)throwsInterruptedException{
- CountDownLatchlatch=newCountDownLatch(NUM);
- for(inti=0;i<NUM;i++){
- newThread(()->{
- try{
- Thread.sleep(2000);
- System.out.println(Thread.currentThread().getName()+"运行完毕");
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }finally{
- latch.countDown();
- }
- }).start();
- }
- latch.await();
- System.out.println("主线程运行完毕");
- }
- }
输出如下:
看得出来,主线程会等到3个子线程执行完毕才会执行。
原理解析
类图
可以看得出来,CountDownLatch里面有一个继承AQS的内部类Sync,其实是AQS来支持CountDownLatch的各项操作的。
CountDownLatch(int count)
new CountDownLatch(int count)用来创建一个AQS同步队列,并将计数器的值赋给了AQS的state。
- publicCountDownLatch(intcount){
- if(count<0)thrownewIllegalArgumentException("count<0");
- this.sync=newSync(count);
- }
- privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{
- Sync(intcount){
- setState(count);
- }
- }
countDown()
countDown()方法会对计数器进行减1的操作,当计数器值为0时,将会唤醒在阻塞队列中等待的所有线程。其内部调用了Sync的releaseShared(1)方法
- publicvoidcountDown(){
- sync.releaseShared(1);
- }
- publicfinalbooleanreleaseShared(intarg){
- if(tryReleaseShared(arg)){
- //此时计数器的值为0,唤醒所有被阻塞的线程
- doReleaseShared();
- returntrue;
- }
- returnfalse;
- }
tryReleaseShared(arg)内部使用了自旋+CAS操将计数器的值减1,当减为0时,方法返回true,将会调用doReleaseShared()方法。对CAS机制不了解的同学,可以先参考我的另外一篇文章浅探CAS实现原理
- protectedbooleantryReleaseShared(intreleases){
- //自旋
- for(;;){
- intc=getState();
- if(c==0)
- //此时计数器的值已经为0了,其他线程早就执行完毕了,当前线程也已经再执行了,不需要再次唤醒了
- returnfalse;
- intnextc=c-1;
- //使用CAS机制,将state的值变为state-1
- if(compareAndSetState(c,nextc))
- returnnextc==0;
- }
- }
doReleaseShared()是AQS中的方法,该方法会唤醒队列中所有被阻塞的线程。
- privatevoiddoReleaseShared(){
- for(;;){
- Nodeh=head;
- if(h!=null&&h!=tail){
- intws=h.waitStatus;
- if(ws==Node.SIGNAL){
- if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
- continue;//looptorecheckcases
- unparkSuccessor(h);
- }
- elseif(ws==0&&
- !compareAndSetWaitStatus(h,0,Node.PROPAGATE))
- continue;//looponfailedCAS
- }
- if(h==head)//loopifheadchanged
- break;
- }
- }
这段方法比较难理解,会另外篇幅介绍。这里只要认为该段方法会唤醒所有因调用await()方法而阻塞的线程。
await()
当计数器的值不为0时,该方法会将当前线程加入到阻塞队列中,并把当前线程挂起。
- publicvoidawait()throwsInterruptedException{
- sync.acquireSharedInterruptibly(1);
- }
同样是委托内部类Sync,调用其
acquireSharedInterruptibly()方法
- publicfinalvoidacquireSharedInterruptibly(intarg)
- throwsInterruptedException{
- if(Thread.interrupted())
- thrownewInterruptedException();
- if(tryAcquireShared(arg)<0)
- doAcquireSharedInterruptibly(arg);
- }
接着看Sync内的tryAcquireShared()方法,如果当前计数器的值为0,则返回1,最终将导致await()不会将线程阻塞。如果当前计数器的值不为0,则返回-1。
- protectedinttryAcquireShared(intacquires){
- return(getState()==0)?1:-1;
- }
tryAcquireShared方法返回一个负值时,将会调用AQS中的
doAcquireSharedInterruptibly()方法,将调用await()方法的线程加入到阻塞队列中,并将此线程挂起。
- privatevoiddoAcquireSharedInterruptibly(intarg)
- throwsInterruptedException{
- //将当前线程构造成一个共享模式的节点,并加入到阻塞队列中
- finalNodenode=addWaiter(Node.SHARED);
- booleanfailed=true;
- try{
- for(;;){
- finalNodep=node.predecessor();
- if(p==head){
- intr=tryAcquireShared(arg);
- if(r>=0){
- setHeadAndPropagate(node,r);
- p.next=null;//helpGC
- failed=false;
- return;
- }
- }
- if(shouldParkAfterFailedAcquire(p,node)&&
- parkAndCheckInterrupt())
- thrownewInterruptedException();
- }
- }finally{
- if(failed)
- cancelAcquire(node);
- }
- }
同样,以上的代码位于AQS中,在没有了解AQS结构的情况下去理解上述代码,有些困难,关于AQS源码,会另开篇幅介绍。
使用场景
CountDownLatch的使用场景很广泛,一般用于分头做某些事,再汇总的情景。例如:
数据报表:当前的微服务架构十分流行,大多数项目都会被拆成若干的子服务,那么报表服务在进行统计时,需要向各个服务抽取数据。此时可以创建与服务数相同的线程数,交由线程池处理,每个线程去对应服务中抽取数据,注意需要在finally语句块中进行countDown()操作。主线程调用await()阻塞,直到所有数据抽取成功,最后主线程再进行对数据的过滤组装等,形成直观的报表。
风险评估:客户端的一个同步请求查询用户的风险等级,服务端收到请求后会请求多个子系统获取数据,然后使用风险评估规则模型进行风险评估。如果使用单线程去完成这些操作,这个同步请求超时的可能性会很大,因为服务端请求多个子系统是依次排队的,请求子系统获取数据的时间是线性累加的。此时可以使用CountDownLatch,让多个线程并发请求多个子系统,当获取到多个子系统数据之后,再进行风险评估,这样请求子系统获取数据的时间就等于最耗时的那个请求的时间,可以大大减少处理时间。
原文地址:https://www.toutiao.com/i6919384508672295436/?group_id=6919384508672295436