springBoot整合RocketMQ及坑的示例代码

2025-05-29 0 24

版本:

  • jdk:1.8
  • springboot:1.5.10
  • rocketmq:4.2.0

pom 配置:

?

1

2

3

4

5

6

7

8

9

10
<parent>

<groupid>org.springframework.boot</groupid>

<artifactid>spring-boot-starter-parent</artifactid>

<version>1.5.10.release</version>

</parent>

<dependency>

<groupid>org.apache.rocketmq</groupid>

<artifactid>rocketmq-client</artifactid>

<version>4.2.0</version>

</dependency>

application.properties 配置:

?

1

2

3

4

5

6
# 消费者的组名

apache.rocketmq.consumer.pushconsumer=pushconsumer

# 生产者的组名

apache.rocketmq.producer.producergroup=producer

# nameserver地址

apache.rocketmq.namesrvaddr=localhost:9876

java代码:

生产者

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59
package test.config.rocketmq;

import org.apache.rocketmq.client.producer.defaultmqproducer;

import org.apache.rocketmq.client.producer.sendresult;

import org.apache.rocketmq.common.message.message;

import org.apache.rocketmq.remoting.common.remotinghelper;

import org.springframework.beans.factory.annotation.value;

import org.springframework.stereotype.component;

import org.springframework.util.stopwatch;

import javax.annotation.postconstruct;

@component

public class rocketmqclient {

/**

* 生产者的组名

*/

@value("${apache.rocketmq.producer.producergroup}")

private string producergroup;

/**

* nameserver 地址

*/

@value("${apache.rocketmq.namesrvaddr}")

private string namesrvaddr;

@postconstruct

public void defaultmqproducer() {

//生产者的组名

defaultmqproducer producer = new defaultmqproducer(producergroup);

//指定nameserver地址,多个地址以 ; 隔开

producer.setnamesrvaddr(namesrvaddr);

producer.setvipchannelenabled(false);

try {

/**

* producer对象在使用之前必须要调用start初始化,初始化一次即可

* 注意:切记不可以在每次发送消息时,都调用start方法

*/

producer.start();

//创建一个消息实例,包含 topic、tag 和 消息体

//如下:topic 为 "topictest",tag 为 "push"

message message = new message("topictest", "push", "发送消息----zhisheng-----".getbytes(remotinghelper.default_charset));

stopwatch stop = new stopwatch();

stop.start();

for (int i = 0; i < 1; i++) {

sendresult result = producer.send(message);

system.out.println("发送响应:msgid:" + result.getmsgid() + ",发送状态:" + result.getsendstatus());

}

stop.stop();

system.out.println("----------------发送一万条消息耗时:" + stop.gettotaltimemillis());

} catch (exception e) {

e.printstacktrace();

} finally {

producer.shutdown();

}

}

}

消费者:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63
import org.apache.rocketmq.client.consumer.defaultmqpushconsumer;

import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;

import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;

import org.apache.rocketmq.common.consumer.consumefromwhere;

import org.apache.rocketmq.common.message.messageext;

import org.apache.rocketmq.remoting.common.remotinghelper;

import org.springframework.beans.factory.annotation.value;

import org.springframework.stereotype.component;

import javax.annotation.postconstruct;

@component

public class rocketmqserver {

/**

* 消费者的组名

*/

@value("${apache.rocketmq.consumer.pushconsumer}")

private string consumergroup;

/**

* nameserver 地址

*/

@value("${apache.rocketmq.namesrvaddr}")

private string namesrvaddr;

@postconstruct

public void defaultmqpushconsumer() {

//消费者的组名

defaultmqpushconsumer consumer = new defaultmqpushconsumer(consumergroup);

//指定nameserver地址,多个地址以 ; 隔开

consumer.setnamesrvaddr(namesrvaddr);

consumer.setvipchannelenabled(false);

try {

//订阅pushtopic下tag为push的消息

consumer.subscribe("topictest", "push");

//设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

//如果非第一次启动,那么按照上次消费的位置继续消费

consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);

consumer.registermessagelistener((messagelistenerconcurrently) (list, context) -> {

try {

for (messageext messageext : list) {

system.out.println("messageext: " + messageext);//输出消息内容

string messagebody = new string(messageext.getbody(), remotinghelper.default_charset);

system.out.println("消费响应:msgid : " + messageext.getmsgid() + ", msgbody : " + messagebody);//输出消息内容

}

} catch (exception e) {

e.printstacktrace();

return consumeconcurrentlystatus.reconsume_later; //稍后再试

}

return consumeconcurrentlystatus.consume_success; //消费成功

});

consumer.start();

} catch (exception e) {

e.printstacktrace();

}

}

}

掉坑总结:

1.rocketmq启动时,命令不是mqbroker -n 127.0.0.1:9876

正确应该是:mqbroker -n 127.0.0.1:9876butiautocreatetopicenable=true

否则会抛出:no route info of this topic, topictest

2.客户端连接时抛出异常

org.apache.rocketmq.client.exception.mqclientexception:

send [3] times, still failed, cost [3180]ms, topic: topictest, brokerssent: \\

[win-93cgo0s5g25, win-93cgo0s5g25, win-93cgo0s5g25]

解决方式两种

1.producer.setvipchannelenabled(false); 生产者和消费者添加这行代买。

2.降rocketmq版本,降成3.2.6

关于spring.rocketmq.name-server的坑

看下图:

springBoot整合RocketMQ及坑的示例代码

注意:

如果你是springboot2.0+的框架,或者是jdk10。

你需要将你自己的项目配置文件中的,spring.rocketmq.name-server改成

spring.rocketmq.nameserver。注意是nameserver。

不然就会报各种稀奇古怪的bug。

关于启动报内存不足的错

在安装启动name server和broker的时候,一定要修改配置文件,不然内存会爆炸。

native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory

springBoot整合RocketMQ及坑的示例代码

将下面的配置文件根据你的需要改

我这里以前默认是xms4g,都是g,我修改到m就行了。

java_opt="${java_opt} -server -xms256m -xmx256m -xmn128m -xx:metaspacesize=128m -xx:maxmetaspacesize=320m"

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持快网idc。

原文链接:https://blog.csdn.net/qq_24853627/article/details/79443437

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 springBoot整合RocketMQ及坑的示例代码 https://www.kuaiidc.com/110855.html

相关文章

发表评论
暂无评论