Springboot+Netty+Websocket实现消息推送实例

2025-05-29 0 51

前言

websocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 websocket api 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

netty框架的优势

1. api使用简单,开发门槛低;
2. 功能强大,预置了多种编解码功能,支持多种主流协议;
3. 定制能力强,可以通过channelhandler对通信框架进行灵活地扩展;
4. 性能高,通过与其他业界主流的nio框架对比,netty的综合性能最优;
5. 成熟、稳定,netty修复了已经发现的所有jdk nio bug,业务开发人员不需要再为nio的bug而烦恼

提示:以下是本篇文章正文内容,下面案例可供参考

一、引入netty依赖

?

1

2

3

4

5
<dependency>

<groupid>io.netty</groupid>

<artifactid>netty-all</artifactid>

<version>4.1.48.final</version>

</dependency>

二、使用步骤

1.引入基础配置类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22
package com.test.netty;

public enum cmd {

start("000", "连接成功"),

wmessage("001", "消息提醒"),

;

private string cmd;

private string desc;

cmd(string cmd, string desc) {

this.cmd = cmd;

this.desc = desc;

}

public string getcmd() {

return cmd;

}

public string getdesc() {

return desc;

}

}

2.netty服务启动监听器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61
package com.test.netty;

import io.netty.bootstrap.serverbootstrap;

import io.netty.channel.channelfuture;

import io.netty.channel.channeloption;

import io.netty.channel.eventloopgroup;

import io.netty.channel.nio.nioeventloopgroup;

import io.netty.channel.socket.nio.nioserversocketchannel;

import lombok.extern.slf4j.slf4j;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.beans.factory.annotation.value;

import org.springframework.boot.applicationrunner;

import org.springframework.context.annotation.bean;

import org.springframework.stereotype.component;

/**

* @author test

* <p>

* 服务启动监听器

**/

@slf4j

@component

public class nettyserver {

@value("${server.netty.port}")

private int port;

@autowired

private serverchannelinitializer serverchannelinitializer;

@bean

applicationrunner nettyrunner() {

return args -> {

//new 一个主线程组

eventloopgroup bossgroup = new nioeventloopgroup(1);

//new 一个工作线程组

eventloopgroup workgroup = new nioeventloopgroup();

serverbootstrap bootstrap = new serverbootstrap()

.group(bossgroup, workgroup)

.channel(nioserversocketchannel.class)

.childhandler(serverchannelinitializer)

//设置队列大小

.option(channeloption.so_backlog, 1024)

// 两小时内没有数据的通信时,tcp会自动发送一个活动探测数据报文

.childoption(channeloption.so_keepalive, true);

//绑定端口,开始接收进来的连接

try {

channelfuture future = bootstrap.bind(port).sync();

log.info("服务器启动开始监听端口: {}", port);

future.channel().closefuture().sync();

} catch (interruptedexception e) {

e.printstacktrace();

} finally {

//关闭主线程组

bossgroup.shutdowngracefully();

//关闭工作线程组

workgroup.shutdowngracefully();

}

};

}

}

3.netty服务端处理器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121
package com.test.netty;

import com.test.common.util.jsonutil;

import io.netty.channel.channel;

import io.netty.channel.channelhandler;

import io.netty.channel.channelhandlercontext;

import io.netty.channel.simplechannelinboundhandler;

import io.netty.handler.codec.http.websocketx.textwebsocketframe;

import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;

import lombok.data;

import lombok.extern.slf4j.slf4j;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.stereotype.component;

import java.net.urldecoder;

import java.util.*;

/**

* @author test

* <p>

* netty服务端处理器

**/

@slf4j

@component

@channelhandler.sharable

public class nettyserverhandler extends simplechannelinboundhandler<textwebsocketframe> {

@autowired

private serverchannelcache cache;

private static final string datakey = "test=";

@data

public static class channelcache {

}

/**

* 客户端连接会触发

*/

@override

public void channelactive(channelhandlercontext ctx) throws exception {

channel channel = ctx.channel();

log.info("通道连接已打开,id->{}......", channel.id().aslongtext());

}

@override

public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {

if (evt instanceof websocketserverprotocolhandler.handshakecomplete) {

channel channel = ctx.channel();

websocketserverprotocolhandler.handshakecomplete handshakecomplete = (websocketserverprotocolhandler.handshakecomplete) evt;

string requesturi = handshakecomplete.requesturi();

requesturi = urldecoder.decode(requesturi, "utf-8");

log.info("handshake_complete,id->{},uri->{}", channel.id().aslongtext(), requesturi);

string socketkey = requesturi.substring(requesturi.lastindexof(datakey) + datakey.length());

if (socketkey.length() > 0) {

cache.add(socketkey, channel);

this.send(channel, cmd.down_start, null);

} else {

channel.disconnect();

ctx.close();

}

}

super.usereventtriggered(ctx, evt);

}

@override

public void channelinactive(channelhandlercontext ctx) throws exception {

channel channel = ctx.channel();

log.info("通道连接已断开,id->{},用户id->{}......", channel.id().aslongtext(), cache.getcacheid(channel));

cache.remove(channel);

}

/**

* 发生异常触发

*/

@override

public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {

channel channel = ctx.channel();

log.error("连接出现异常,id->{},用户id->{},异常->{}......", channel.id().aslongtext(), cache.getcacheid(channel), cause.getmessage(), cause);

cache.remove(channel);

ctx.close();

}

/**

* 客户端发消息会触发

*/

@override

protected void channelread0(channelhandlercontext ctx, textwebsocketframe msg) throws exception {

try {

// log.info("接收到客户端发送的消息:{}", msg.text());

ctx.channel().writeandflush(new textwebsocketframe(jsonutil.tostring(collections.singletonmap("cmd", "100"))));

} catch (exception e) {

log.error("消息处理异常:{}", e.getmessage(), e);

}

}

public void send(cmd cmd, string id, object obj) {

hashmap<string, channel> channels = cache.get(id);

if (channels == null) {

return;

}

map<string, object> data = new linkedhashmap<>();

data.put("cmd", cmd.getcmd());

data.put("data", obj);

string msg = jsonutil.tostring(data);

log.info("服务器下发消息: {}", msg);

channels.values().foreach(channel -> {

channel.writeandflush(new textwebsocketframe(msg));

});

}

public void send(channel channel, cmd cmd, object obj) {

map<string, object> data = new linkedhashmap<>();

data.put("cmd", cmd.getcmd());

data.put("data", obj);

string msg = jsonutil.tostring(data);

log.info("服务器下发消息: {}", msg);

channel.writeandflush(new textwebsocketframe(msg));

}

}

4.netty服务端缓存类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48
package com.test.netty;

import io.netty.channel.channel;

import io.netty.util.attributekey;

import org.springframework.stereotype.component;

import java.util.hashmap;

import java.util.concurrent.concurrenthashmap;

@component

public class serverchannelcache {

private static final concurrenthashmap<string, hashmap<string, channel>> cache_map = new concurrenthashmap<>();

private static final attributekey<string> channel_attr_key = attributekey.valueof("test");

public string getcacheid(channel channel) {

return channel.attr(channel_attr_key).get();

}

public void add(string cacheid, channel channel) {

channel.attr(channel_attr_key).set(cacheid);

hashmap<string, channel> hashmap = cache_map.get(cacheid);

if (hashmap == null) {

hashmap = new hashmap<>();

}

hashmap.put(channel.id().asshorttext(), channel);

cache_map.put(cacheid, hashmap);

}

public hashmap<string, channel> get(string cacheid) {

if (cacheid == null) {

return null;

}

return cache_map.get(cacheid);

}

public void remove(channel channel) {

string cacheid = getcacheid(channel);

if (cacheid == null) {

return;

}

hashmap<string, channel> hashmap = cache_map.get(cacheid);

if (hashmap == null) {

hashmap = new hashmap<>();

}

hashmap.remove(channel.id().asshorttext());

cache_map.put(cacheid, hashmap);

}

}

5.netty服务初始化器

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33
package com.test.netty;

import io.netty.channel.channelinitializer;

import io.netty.channel.channelpipeline;

import io.netty.channel.socket.socketchannel;

import io.netty.handler.codec.http.httpobjectaggregator;

import io.netty.handler.codec.http.httpservercodec;

import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;

import io.netty.handler.stream.chunkedwritehandler;

import org.springframework.beans.factory.annotation.autowired;

import org.springframework.stereotype.component;

/**

* @author test

* <p>

* netty服务初始化器

**/

@component

public class serverchannelinitializer extends channelinitializer<socketchannel> {

@autowired

private nettyserverhandler nettyserverhandler;

@override

protected void initchannel(socketchannel socketchannel) throws exception {

channelpipeline pipeline = socketchannel.pipeline();

pipeline.addlast(new httpservercodec());

pipeline.addlast(new chunkedwritehandler());

pipeline.addlast(new httpobjectaggregator(8192));

pipeline.addlast(new websocketserverprotocolhandler("/test.io", true, 5000));

pipeline.addlast(nettyserverhandler);

}

}

6.html测试

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53
<!doctype html>

<html>

<head>

<meta charset="utf-8">

<title>test</title>

<script type="text/javascript">

function websockettest()

{

if ("websocket" in window)

{

alert("您的浏览器支持 websocket!");

// 打开一个 web socket

var ws = new websocket("ws://localhost:port/test.io");

ws.onopen = function()

{

// web socket 已连接上,使用 send() 方法发送数据

ws.send("发送数据");

alert("数据发送中...");

};

ws.onmessage = function (evt)

{

var received_msg = evt.data;

alert("数据已接收...");

};

ws.onclose = function()

{

// 关闭 websocket

alert("连接已关闭...");

};

}

else

{

// 浏览器不支持 websocket

alert("您的浏览器不支持 websocket!");

}

}

</script>

</head>

<body>

<div id="sse">

<a href="javascript:websockettest()" rel="external nofollow" >运行 websocket</a>

</div>

</body>

</html>

7.vue测试

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35
mounted() {

this.initwebsocket();

},

methods: {

initwebsocket() {

let websocket = new websocket('ws://localhost:port/test.io?test=123456');

websocket.onmessage = (event) => {

let msg = json.parse(event.data);

switch (msg.cmd) {

case "000":

this.$message({

type: 'success',

message: "建立实时连接成功!",

duration: 1000

})

setinterval(()=>{websocket.send("heartbeat")},60*1000);

break;

case "001":

this.$message.warning("收到一条新的信息,请及时查看!")

break;

}

}

websocket.onclose = () => {

settimeout(()=>{

this.initwebsocket();

},30*1000);

}

websocket.onerror = () => {

settimeout(()=>{

this.initwebsocket();

},30*1000);

}

},

},

![在这里插入图片描述](https://img-blog.csdnimg.cn/20210107160420568.jpg?x-oss-process=image/watermark,type_zmfuz3pozw5nagvpdgk,shadow_10,text_ahr0chm6ly9ibg9nlmnzzg4ubmv0l3d1x3fpbmdfc29uzw==,size_16,color_ffffff,t_70#pic_center)

8.服务器下发消息

?

1

2

3
@autowired

private nettyserverhandler nettyserverhandler;

nettyserverhandler.send(cmdweb.wmessage, id, message);

到此这篇关于springboot+netty+websocket实现消息推送实例的文章就介绍到这了,更多相关springboot websocket消息推送内容请搜索快网idc以前的文章或继续浏览下面的相关文章希望大家以后多多支持快网idc!

原文链接:https://blog.csdn.net/wu_qing_song/article/details/112311860

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 Springboot+Netty+Websocket实现消息推送实例 https://www.kuaiidc.com/109475.html

相关文章

发表评论
暂无评论