JUC之Semaphore源码分析

2025-05-29 0 21

Semaphore 主要用于限量控制并发执行代码的工具类, 其内部通过 一个 permit 来进行定义并发执行的数量。

?

1

2

3

4

5

6

7

8

9

10

11

12

13
/**

* 使用非公平版本构件 Semaphore

*/

public KSemaphore(int permits){

sync = new NonfairSync(permits);

}

/**

* 指定版本构件 Semaphore

*/

public KSemaphore(int permits, boolean fair){

sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72
/** AQS 的子类主要定义获取释放 lock */

abstract static class Sync extends KAbstractQueuedSynchronizer{

private static final long serialVersionUID = 1192457210091910933L;

/**

* 指定 permit 初始化 Semaphore

*/

Sync(int permits){

setState(permits);

}

/**

* 返回剩余 permit

*/

final int getPermits(){

return getState();

}

/**

* 获取 permit

*/

final int nonfairTryAcquireShared(int acquires){

for(;;){

int available = getState();

int remaining = available - acquires; // 判断获取 acquires 的剩余 permit 数目

if(remaining < 0 ||

compareAndSetState(available, remaining)){ // cas改变 state

return remaining;

}

}

}

/**

* 释放 lock

*/

protected final boolean tryReleaseShared(int releases){

for(;;){

int current = getState();

int next = current + releases;

if(next < current){ // overflow

throw new Error(" Maximum permit count exceeded");

}

if(compareAndSetState(current, next)){ // cas改变 state

return true;

}

}

}

final void reducePermits(int reductions){ // 减少 permits

for(;;){

int current = getState();

int next = current - reductions;

if(next > current){ // underflow

throw new Error(" Permit count underflow ");

}

if(compareAndSetState(current, next)){

return;

}

}

}

/** 将 permit 置为 0 */

final int drainPermits(){

for(;;){

int current = getState();

if(current == 0 || compareAndSetState(current, 0)){

return current;

}

}

}

}

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67
/**

* 调用 acquireSharedInterruptibly 响应中断的方式获取 permit

*/

public void acquire() throws InterruptedException{

sync.acquireSharedInterruptibly(1);

}

/**

* 调用 acquireUninterruptibly 非响应中断的方式获取 permit

*/

public void acquireUninterruptibly(){

sync.acquireShared(1);

}

/**

* 尝试获取 permit

*/

public boolean tryAcquire(){

return sync.nonfairTryAcquireShared(1) >= 0;

}

/**

* 尝试的获取 permit, 支持超时与中断

*/

public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException{

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

/**

* 支持中断的获取permit

*/

public void acquire(int permits) throws InterruptedException{

if(permits < 0){

throw new IllegalArgumentException();

}

sync.acquireSharedInterruptibly(permits);

}

/**

* 不响应中断的获取 permit

*/

public void acquireUninterruptibly(int permits){

if(permits < 0) throw new IllegalArgumentException();

sync.acquireShared(permits);

}

/**

* 尝试获取 permit

*/

public boolean tryAcquire(int permits){

if(permits < 0) throw new IllegalArgumentException();

return sync.nonfairTryAcquireShared(permits) >= 0;

}

/**

* 尝试 支持超时机制, 支持中断 的获取 permit

*/

public boolean tryAcquire(int permits, long timout, TimeUnit unit) throws InterruptedException{

if(permits < 0) throw new IllegalArgumentException();

return sync.tryAcquireSharedNanos(permits, unit.toNanos(timout));

}

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14
/**

* 释放 permit

*/

public void release(){

sync.releaseShared(1);

}

/**

* 释放 permit

*/

public void release(int permits){

if(permits < 0) throw new IllegalArgumentException();

sync.releaseShared(permits);

}

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54
/**

* 返回可用的 permit

*/

public int availablePermits(){

return sync.getPermits();

}

/**

* 消耗光 permit

*/

public int drainPermits(){

return sync.drainPermits();

}

/**

* 减少 reduction 个permit

*/

protected void reducePermits(int reduction){

if(reduction < 0) throw new IllegalArgumentException();

sync.reducePermits(reduction);

}

/**

* 判断是否是公平版本

*/

public boolean isFair(){

return sync instanceof FairSync;

}

/**

* 返回 AQS 中 Sync Queue 里面的等待线程

*/

public final boolean hasQueuedThreads(){

return sync.hasQueuedThreads();

}

/**

* 返回 AQS 中 Sync Queue 里面的等待线程长度

*/

public final int getQueueLength(){

return sync.getQueueLength();

}

/**

* 返回 AQS 中 Sync Queue 里面的等待线程

*/

protected Collection<Thread> getQueueThreads(){

return sync.getQueuedThreads();

}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持快网idc。

原文链接:https://blog.csdn.net/m0_37039331/article/details/87870587

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 JUC之Semaphore源码分析 https://www.kuaiidc.com/112270.html

相关文章

发表评论
暂无评论