RocketMQ 延时级别配置方式

2025-05-29 0 52

RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。

其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。

如何配置

在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

描述了各级别与延时时间的对应映射关系。

这个配置配置了从1级开始各级延时的时间,如1表示延时1s,2表示延时5s,14表示延时10m,可以修改这个指定级别的延时时间;

时间单位支持:s、m、h、d,分别表示秒、分、时、天;

默认值就是上面声明的,可手工调整;

默认值已经够用,不建议调整【仅供参考,还是根据实际需要调整。调整默认值时注意同时要修改时间对应的level级别的值】

如何发送延时消息:

发送延时消息只需要在客户端(rocketmq-client端)待发送的消息( com.alibaba.rocketmq.common.message.Message )中设置延时级别delayLevel即可。

?

1

2

3
Message msg = new Message(topicName,"",keys,message.getBytes());

msg.setDelayTimeLevel(delayLevel);

SendResult sendResult = getMQProducer.send(msg);

RocketMQ定时(延迟)消息

RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息。

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

生产

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

public class ProducerDelay {

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.10.11:9876");

producer.start();

Message msg1 = new Message(

JmsConfig.TOPIC,

"订单001".getBytes());

msg1.setDelayTimeLevel(2);//延迟5秒

Message msg2 = new Message(

JmsConfig.TOPIC,

"订单001".getBytes());

msg2.setDelayTimeLevel(4);//延迟30秒

SendResult sendResult1 = producer.send(msg1);

SendResult sendResult2 = producer.send(msg2);

System.out.println("Product1-同步发送-Product信息={}" + sendResult1);

System.out.println("Product2-同步发送-Product信息={}" + sendResult2);

producer.shutdown();

}

}

消费

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31
package com.xin.rocketmq.demo.testrun;

import com.xin.rocketmq.demo.config.JmsConfig;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerDelay {

public static void main(String[] args) throws Exception {

// 实例化消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// 设置NameServer的地址

consumer.setNamesrvAddr("192.168.10.11:9876");

// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息

consumer.subscribe(JmsConfig.TOPIC, "*");

// 注册消息监听者

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {

for (MessageExt message : messages) {

// Print approximate delay time period

System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

// 启动消费者

consumer.start();

}

}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持快网idc。

原文链接:https://blog.csdn.net/weixin_38951207/article/details/79022875

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 RocketMQ 延时级别配置方式 https://www.kuaiidc.com/105982.html

相关文章

发表评论
暂无评论