SpringBoot+netty-socketio实现服务器端消息推送

2025-05-29 0 56

首先:因为工作需要,需要对接socket.io框架对接,所以目前只能使用netty-socketio。websocket是不支持对接socket.io框架的。

netty-socketio顾名思义他是一个底层基于netty'实现的socket。

在springboot项目中的集成,请看下面的代码

maven依赖

?

1

2

3

4

5
<dependency>

<groupId>com.corundumstudio.socketio</groupId>

<artifactId>netty-socketio</artifactId>

<version>1.7.11</version>

</dependency>

下面就是代码了

首先是配置参数

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18
#socketio配置

socketio:

host: localhost

port: 9099

# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器

maxFramePayloadLength: 1048576

# 设置http交互最大内容长度

maxHttpContentLength: 1048576

# socket连接数大小(如只监听一个端口boss线程组为1即可)

bossCount: 1

workCount: 100

allowCustomRequests: true

# 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间

upgradeTimeout: 1000000

# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件

pingTimeout: 6000000

# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔

pingInterval: 25000

上面的注释写的很清楚。下面是config代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67
import com.corundumstudio.socketio.Configuration;

import com.corundumstudio.socketio.SocketConfig;

import com.corundumstudio.socketio.SocketIOServer;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**

* kcm

*/

@Component

public class PushServer implements InitializingBean {

@Autowired

private EventListenner eventListenner;

@Value("${socketio.port}")

private int serverPort;

@Value("${socketio.host}")

private String serverHost;

@Value("${socketio.bossCount}")

private int bossCount;

@Value("${socketio.workCount}")

private int workCount;

@Value("${socketio.allowCustomRequests}")

private boolean allowCustomRequests;

@Value("${socketio.upgradeTimeout}")

private int upgradeTimeout;

@Value("${socketio.pingTimeout}")

private int pingTimeout;

@Value("${socketio.pingInterval}")

private int pingInterval;

@Override

public void afterPropertiesSet() throws Exception {

Configuration config = new Configuration();

config.setPort(serverPort);

config.setHostname(serverHost);

config.setBossThreads(bossCount);

config.setWorkerThreads(workCount);

config.setAllowCustomRequests(allowCustomRequests);

config.setUpgradeTimeout(upgradeTimeout);

config.setPingTimeout(pingTimeout);

config.setPingInterval(pingInterval);

SocketConfig socketConfig = new SocketConfig();

socketConfig.setReuseAddress(true);

socketConfig.setTcpNoDelay(true);

socketConfig.setSoLinger(0);

config.setSocketConfig(socketConfig);

SocketIOServer server = new SocketIOServer(config);

server.addListeners(eventListenner);

server.start();

System.out.println("启动正常");

}

}

在就是监听代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55
import com.corundumstudio.socketio.AckRequest;

import com.corundumstudio.socketio.SocketIOClient;

import com.corundumstudio.socketio.annotation.OnConnect;

import com.corundumstudio.socketio.annotation.OnDisconnect;

import com.corundumstudio.socketio.annotation.OnEvent;

import org.apache.commons.lang3.StringUtils;

import org.bangying.auth.JwtSupport;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.util.UUID;

@Component

public class EventListenner {

@Resource

private ClientCache clientCache;

@Resource

private JwtSupport jwtSupport;

/**

* 客户端连接

*

* @param client

*/

@OnConnect

public void onConnect(SocketIOClient client) {

String userId = client.getHandshakeData().getSingleUrlParam("userId");

// userId = jwtSupport.getApplicationUser().getId().toString();

// userId = "8";

UUID sessionId = client.getSessionId();

clientCache.saveClient(userId, sessionId, client);

System.out.println("建立连接");

}

/**

* 客户端断开

*

* @param client

*/

@OnDisconnect

public void onDisconnect(SocketIOClient client) {

String userId = client.getHandshakeData().getSingleUrlParam("userId");

if (StringUtils.isNotBlank(userId)) {

clientCache.deleteSessionClient(userId, client.getSessionId());

System.out.println("关闭连接");

}

}

//消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息

// 暂未使用

@OnEvent("messageevent")

public void onEvent(SocketIOClient client, AckRequest request) {

}

}

本地缓存信息

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50
import com.corundumstudio.socketio.SocketIOClient;

import org.apache.commons.lang3.StringUtils;

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.ConcurrentHashMap;

/**

* kcm

*/

@Component

public class ClientCache {

//本地缓存

private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();

/**

* 存入本地缓存

* @param userId 用户ID

* @param sessionId 页面sessionID

* @param socketIOClient 页面对应的通道连接信息

*/

public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){

if(StringUtils.isNotBlank(userId)){

HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);

if(sessionIdClientCache==null){

sessionIdClientCache = new HashMap<>();

}

sessionIdClientCache.put(sessionId,socketIOClient);

concurrentHashMap.put(userId,sessionIdClientCache);

}

}

/**

* 根据用户ID获取所有通道信息

* @param userId

* @return

*/

public HashMap<UUID, SocketIOClient> getUserClient(String userId){

return concurrentHashMap.get(userId);

}

/**

* 根据用户ID及页面sessionID删除页面链接信息

* @param userId

* @param sessionId

*/

public void deleteSessionClient(String userId,UUID sessionId){

concurrentHashMap.get(userId).remove(sessionId);

}

}

下面是存储客户端连接信息

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50
import com.corundumstudio.socketio.SocketIOClient;

import org.apache.commons.lang3.StringUtils;

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.ConcurrentHashMap;

/**

* kcm

*/

@Component

public class ClientCache {

//本地缓存

private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();

/**

* 存入本地缓存

* @param userId 用户ID

* @param sessionId 页面sessionID

* @param socketIOClient 页面对应的通道连接信息

*/

public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){

if(StringUtils.isNotBlank(userId)){

HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);

if(sessionIdClientCache==null){

sessionIdClientCache = new HashMap<>();

}

sessionIdClientCache.put(sessionId,socketIOClient);

concurrentHashMap.put(userId,sessionIdClientCache);

}

}

/**

* 根据用户ID获取所有通道信息

* @param userId

* @return

*/

public HashMap<UUID, SocketIOClient> getUserClient(String userId){

return concurrentHashMap.get(userId);

}

/**

* 根据用户ID及页面sessionID删除页面链接信息

* @param userId

* @param sessionId

*/

public void deleteSessionClient(String userId,UUID sessionId){

concurrentHashMap.get(userId).remove(sessionId);

}

}

控制层推送方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20
@RestController

@RequestMapping("/push")

public class PushController {

@Resource

private ClientCache clientCache;

@Autowired

private JwtSupport jwtSupport;

@GetMapping("/message")

public String pushTuUser(@Param("id") String id){

Integer userId = jwtSupport.getApplicationUser().getId();

HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(String.valueOf(userId));

userClient.forEach((uuid, socketIOClient) -> {

//向客户端推送消息

socketIOClient.sendEvent("chatevent","服务端推送消息");

});

return "success";

}

}

到此这篇关于SpringBoot+netty-socketio实现服务器端消息推送的文章就介绍到这了,更多相关SpringBoot netty-socketio服务器端推送内容请搜索快网idc以前的文章或继续浏览下面的相关文章希望大家以后多多支持快网idc!

原文链接:https://blog.csdn.net/kang649882/article/details/104840441/

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 SpringBoot+netty-socketio实现服务器端消息推送 https://www.kuaiidc.com/107549.html

相关文章

发表评论
暂无评论