CountDownLatch源码解析之countDown()

2025-05-29 0 52

CountDownLatch 源码解析—— countDown()

上一篇文章从源码层面说了一下CountDownLatch 中 await() 的原理。这篇文章说一下countDown() 。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23
public void countDown() { //CountDownLatch

sync.releaseShared(1);

}

public final boolean releaseShared(int arg) { //AQS

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

protected boolean tryReleaseShared(int releases) { //CountDownLatch.Sync

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

通过构造器 CountDownLatch end = new CountDownLatch(2); state 被设置为2,所以c == 2,nextc = 2-1,

然后通过下面这个CAS操作将state设置为1。

?

1

2

3

4
protected final boolean compareAndSetState(int expect, int update) {

// See below for intrinsics setup to support this

return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

}

此时nextc还不为0,返回false。一直等到countDown() 方法被调用两次,state == 0,nextc ==0,此时返回true。

进入doReleaseShared()方法。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31
doReleaseShared();

private void doReleaseShared() {

/*

* Ensure that a release propagates, even if there are other

* in-progress acquires/releases. This proceeds in the usual

* way of trying to unparkSuccessor of head if it needs

* signal. But if it does not, status is set to PROPAGATE to

* ensure that upon release, propagation continues.

* Additionally, we must loop in case a new node is added

* while we are doing this. Also, unlike other uses of

* unparkSuccessor, we need to know if CAS to reset status

* fails, if so rechecking.

*/

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

回顾一下此时的等待队列模型。

?

1

2

3
+--------------------------+ prev +------------------+

head | waitStatus = Node.SIGNAL | <---- node(tail) | currentThread |

+--------------------------+ +------------------+

此时head 不为null,也不为tail,waitStatus == Node.SIGNAL,所以进入 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 这个判断。

?

1

2

3

4

5

6

7

8

9

10

11
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

/**

* CAS waitStatus field of a node.

*/

private static final boolean compareAndSetWaitStatus(Node node,

int expect,

int update) {

return unsafe.compareAndSwapInt(node, waitStatusOffset,

expect, update);

}

这个CAS 操作将 state 设置为 0 ,也就是说此时Head 中的 waitStatus 是0.此时队列模型如下所示

?

1

2

3
+----------------+ prev +------------------+

head | waitStatus = 0 | <---- node(tail) | currentThread |

+----------------+ +------------------+

该方法返回true。进入unparkSuccessor(h);

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28
unparkSuccessor(h);

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node. But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

s 就是head的后继结点,也就是装有当前线程的结点。s != null ,并且s.waitStatus ==0 ,所以进入 LockSupport.unpark(s.thread);

?

1

2

3

4
public static void unpark(Thread thread) {

if (thread != null)

UNSAFE.unpark(thread);

}

也就是unlock 被阻塞的线程。裁判被允许吹哨了!

countDown() 的原理就此就非常清晰了。

每执行一次countDown() 方法,state 就是减1,直到state == 0,则开始释放被阻塞在队列中的线程,根据前驱结点中waitStatus的状态,释放后续结点中的线程。

OK,回到上一篇文章的问题,什么时候跳出下面这个循环(await方法中的循环)

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15
for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

此时state == 0,所以进入 setHeadAndPropagate 方法。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34
setHeadAndPropagate(node, r);

private void setHeadAndPropagate(Node node, int propagate) {

Node h = head; // Record old head for check below

setHead(node);

/*

* Try to signal next queued node if:

* Propagation was indicated by caller,

* or was recorded (as h.waitStatus either before

* or after setHead) by a previous operation

* (note: this uses sign-check of waitStatus because

* PROPAGATE status may transition to SIGNAL.)

* and

* The next node is waiting in shared mode,

* or we don't know, because it appears null

*

* The conservatism in both of these checks may cause

* unnecessary wake-ups, but only when there are multiple

* racing acquires/releases, so most need signals now or soon

* anyway.

*/

if (propagate > 0 || h == null || h.waitStatus < 0 ||

(h = head) == null || h.waitStatus < 0) {

Node s = node.next;

if (s == null || s.isShared())

doReleaseShared();

}

}

private void setHead(Node node) {

head = node;

node.thread = null;

node.prev = null;

}

这个方法将head 的后继结点变为head。该方法过后,又将node的next结点设置为null,模型变成下图

?

1

2

3
prev +---------+ next

null <---- node(tail/head) | null | ----> null

+---------+

也就是node head tail 什么的都被置为null,等待GC回收了,这个时候return,跳出了for循环,队列被清空。

下面演示一下整个过程

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31
setHeadAndPropagate(node, r);

+----------------+

head(tail) | waitStatus=0 |

| thread =null |

+----------------+

+----------------+ +----------------+

| waitStatus=0 | prev | waitStatus=0 |

head(tail) | thread =null | <---- node | currentThread |

+----------------+ +----------------+

+----------------+ +----------------+

| waitStatus=0 | prev | waitStatus=0 |

head | thread =null | <---- node(tail) | currentThread |

+----------------+ +----------------+

+----------------+ +----------------+

| Node.SIGNAL | prev | waitStatus=0 |

head | thread =null | <---- node(tail) | currentThread |

+----------------+ +----------------+

+----------------+ +----------------+

| waitStatus=0 | prev | waitStatus=0 |

head | thread =null | <---- node(tail) | currentThread |

+----------------+ +----------------+

+----------------+

prev | waitStatus=0 | next

null <---- node(tail/head) | null | ----> null

+----------------+

CountDownLatch 的核心就是一个阻塞线程队列,这是由链表构造而成的队列,里面包含thread 和 waitStatus,其中waitStatus说明了后继结点线程状态。

state 是一个非常重要的标志,构造时,设置为对应的n值,如果n != 0,阻塞队列将一直阻塞,除非中断线程。

每次调用countDown() 方法,就是将state-1,而调用await() 方法就是将调用该方法的线程加入到阻塞队列,直到state==0,才能释放线程。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持快网idc。

原文链接:https://www.cnblogs.com/cuglkb/p/8686415.html

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 CountDownLatch源码解析之countDown() https://www.kuaiidc.com/112310.html

相关文章

发表评论
暂无评论