使用场景
由于公司业务需求,需要对接socket、MQTT等消息队列。
众所周知 socket 是双向通信,socket的回复是人为定义的,客户端推送消息给服务端,服务端的回复是两条线。无法像http请求有回复。
下发指令给硬件时,需要校验此次数据下发是否成功。
用户体验而言,点击按钮就要知道此次的下发成功或失败。
如上图模型,
第一种方案使用Tread.sleep
优点:占用资源小,放弃当前cpu资源
缺点: 回复速度快,休眠时间过长,仍然需要等待休眠结束才能返回,响应速度是固定的,无法及时响应第二种方案使用CountDownLatch
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package com.lzy.demo.delay;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CountDownLatchPool {
//countDonw池
private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
//延迟队列
private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>();
private volatile static boolean flag = false ;
//单线程池
private final static ExecutorService t = new ThreadPoolExecutor( 1 , 1 ,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>( 1 ));
public static void addCountDownLatch(Integer messageId) {
CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId, new CountDownLatch( 1 ) );
if (countDownLatch == null ){
countDownLatch = countDownLatchMap.get(messageId);
}
try {
addDelayQueue(messageId);
countDownLatch.await(3L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println( "阻塞等待结束~~~~~~" );
}
public static void removeCountDownLatch(Integer messageId){
CountDownLatch countDownLatch = countDownLatchMap.get(messageId);
if (countDownLatch == null )
return ;
countDownLatch.countDown();
countDownLatchMap.remove(messageId);
System.out.println( "清除Map数据" +countDownLatchMap);
}
private static void addDelayQueue(Integer messageId){
delayQueue.add( new MessageDelayQueueUtil(messageId));
clearMessageId();
}
private static void clearMessageId(){
synchronized (CountDownLatchPool. class ){
if (flag){
return ;
}
flag = true ;
}
t.execute(()->{
while (delayQueue.size() > 0 ){
System.out.println( "进入线程并开始执行" );
try {
MessageDelayQueueUtil take = delayQueue.take();
Integer messageId1 = take.getMessageId();
removeCountDownLatch(messageId1);
System.out.println( "清除队列数据" +messageId1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag = false ;
System.out.println( "结束end----" );
});
}
public static void main(String[] args) throws InterruptedException {
/*
测试超时清空map
new Thread(()->addCountDownLatch(1)).start();
new Thread(()->addCountDownLatch(2)).start();
new Thread(()->addCountDownLatch(3)).start();
*/
//提前创建线程,清空countdown
new Thread(()->{
try {
Thread.sleep(500L);
removeCountDownLatch( 1 );
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//开始阻塞
addCountDownLatch( 1 );
//通过调整上面的sleep我们发现阻塞市场取决于countDownLatch.countDown()执行时间
System.out.println( "阻塞结束----" );
}
}
class MessageDelayQueueUtil implements Delayed {
private Integer messageId;
private long avaibleTime;
public Integer getMessageId() {
return messageId;
}
public void setMessageId(Integer messageId) {
this .messageId = messageId;
}
public long getAvaibleTime() {
return avaibleTime;
}
public void setAvaibleTime( long avaibleTime) {
this .avaibleTime = avaibleTime;
}
public MessageDelayQueueUtil(Integer messageId){
this .messageId = messageId;
//avaibleTime = 当前时间+ delayTime
//重试3次,每次3秒+1秒的延迟
this .avaibleTime= 3000 * 3 + 1000 + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
long diffTime= avaibleTime- System.currentTimeMillis();
return unit.convert(diffTime,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//compareTo用在DelayedUser的排序
return ( int )( this .avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime());
}
}
|
由于socket并不确定每次都会有数据返回,所以map的数据会越来越大,最终导致内存溢出
需定时清除map内的无效数据。
可以使用DelayedQuene延迟队列来处理,相当于给对象添加一个过期时间
使用方法 addCountDownLatch 等待消息,异步回调消息清空removeCountDownLatch
到此这篇关于详解Java中CountDownLatch异步转同步工具类的文章就介绍到这了,更多相关CountDownLatch异步转同步工具类内容请搜索快网idc以前的文章或继续浏览下面的相关文章希望大家以后多多支持快网idc!
原文链接:https://blog.csdn.net/qq_37256345/article/details/117808156
相关文章
猜你喜欢
- ASP.NET自助建站系统的数据库备份与恢复操作指南 2025-06-10
- 个人网站服务器域名解析设置指南:从购买到绑定全流程 2025-06-10
- 个人网站搭建:如何挑选具有弹性扩展能力的服务器? 2025-06-10
- 个人服务器网站搭建:如何选择适合自己的建站程序或框架? 2025-06-10
- 64M VPS建站:能否支持高流量网站运行? 2025-06-10
TA的动态
- 2025-07-10 怎样使用阿里云的安全工具进行服务器漏洞扫描和修复?
- 2025-07-10 怎样使用命令行工具优化Linux云服务器的Ping性能?
- 2025-07-10 怎样使用Xshell连接华为云服务器,实现高效远程管理?
- 2025-07-10 怎样利用云服务器D盘搭建稳定、高效的网站托管环境?
- 2025-07-10 怎样使用阿里云的安全组功能来增强服务器防火墙的安全性?
快网idc优惠网
QQ交流群
您的支持,是我们最大的动力!
热门文章
-
2025-05-27 29
-
2025-05-29 16
-
2025-05-25 73
-
2025-06-04 21
-
2025-05-29 19
热门评论