SpringBoot整合RocketMQ事务/广播/顺序消息

2025-05-29 0 18

SpringBoot整合RocketMQ事务/广播/顺序消息

环境:springboot2.3.9RELEASE + RocketMQ4.8.0

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.apache.rocketmq</groupId>
  7. <artifactId>rocketmq-spring-boot-starter</artifactId>
  8. <version>2.2.0</version>
  9. </dependency>

配置文件

  1. server:
  2. port:8080
  3. rocketmq:
  4. nameServer:localhost:9876
  5. producer:
  6. group:demo-mq

普通消息

发送

  1. @Resource
  2. privateRocketMQTemplaterocketMQTemplate;
  3. publicvoidsend(Stringmessage){
  4. rocketMQTemplate.convertAndSend("test-topic:tag2",MessageBuilder.withPayload(message).build());
  5. }

接受

  1. @RocketMQMessageListener(topic="test-topic",consumerGroup="consumer01-group",selectorExpression="tag1||tag2")
  2. @Component
  3. publicclassConsumerListenerimplementsRocketMQListener<String>{
  4. @Override
  5. publicvoidonMessage(Stringmessage){
  6. System.out.println("接收到消息:"+message);
  7. }
  8. }

顺序消息

发送

  1. @Resource
  2. privateRocketMQTemplaterocketMQTemplate;
  3. publicvoidsendOrder(Stringtopic,Stringmessage,Stringtags,intid){
  4. rocketMQTemplate.asyncSendOrderly(topic+":"+tags,MessageBuilder.withPayload(message).build(),
  5. "order-"+id,newSendCallback(){
  6. @Override
  7. publicvoidonSuccess(SendResultsendResult){
  8. System.err.println("msg-id:"+sendResult.getMsgId()+":"+message+"\\tqueueId:"+sendResult.getMessageQueue().getQueueId());
  9. }
  10. @Override
  11. publicvoidonException(Throwablee){
  12. e.printStackTrace();
  13. }
  14. });
  15. }

这里是根据hashkey将消息发送到不同的队列中

  1. @RocketMQMessageListener(topic="order-topic",consumerGroup="consumer02-group",
  2. selectorExpression="tag3||tag4",consumeMode=ConsumeMode.ORDERLY)
  3. @Component
  4. publicclassConsumerOrderListenerimplementsRocketMQListener<String>{
  5. @Override
  6. publicvoidonMessage(Stringmessage){
  7. System.out.println(Thread.currentThread().getName()+"接收到Order消息:"+message);
  8. }
  9. }

consumeMode = ConsumeMode.ORDERLY,指明了消息模式为顺序模式,一个队列,一个线程。

结果

SpringBoot整合RocketMQ事务/广播/顺序消息

当consumeMode = ConsumeMode.CONCURRENTLY执行结果如下:

SpringBoot整合RocketMQ事务/广播/顺序消息

集群/广播消息模式

发送端

  1. @Resource
  2. privateRocketMQTemplaterocketMQTemplate;
  3. publicvoidsend(Stringtopic,Stringmessage,Stringtags){
  4. rocketMQTemplate.send(topic+":"+tags,MessageBuilder.withPayload(message).build());
  5. }

集群消息模式

消费端

  1. @RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",
  2. selectorExpression="tag6||tag7",messageModel=MessageModel.CLUSTERING)
  3. @Component
  4. publicclassConsumerBroadListenerimplementsRocketMQListener<String>{
  5. @Override
  6. publicvoidonMessage(Stringmessage){
  7. System.out.println("ConsumerBroadListener1接收到消息:"+message);
  8. }
  9. }

messageModel = MessageModel.CLUSTERING

测试

启动两个服务分别端口是8080,8081

8080服务

SpringBoot整合RocketMQ事务/广播/顺序消息

8081服务

SpringBoot整合RocketMQ事务/广播/顺序消息

集群消息模式下,每个服务分别接收一部分消息,实现了负载均衡

广播消息模式

消费端

  1. @RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",
  2. selectorExpression="tag6||tag7",messageModel=MessageModel.BROADCASTING)
  3. @Component
  4. publicclassConsumerBroadListenerimplementsRocketMQListener<String>{
  5. @Override
  6. publicvoidonMessage(Stringmessage){
  7. System.out.println("ConsumerBroadListener1接收到消息:"+message);
  8. }
  9. }

messageModel = MessageModel.BROADCASTING

测试

启动两个服务分别端口是8080,8081

8080服务

SpringBoot整合RocketMQ事务/广播/顺序消息

8081服务

SpringBoot整合RocketMQ事务/广播/顺序消息

集群消息模式下,每个服务分别都接受了同样的消息。

事务消息

RocketMQ事务的3个状态

TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息

TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。

TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:

正常事务发送与提交阶段

1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)

2、服务端响应消息写入结果,半消息发送成功

3、开始执行本地事务

4、根据本地事务的执行状态执行Commit或者Rollback操作

事务信息的补偿流程

1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求

2、生产者收到确认回查请求后,检查本地事务的执行状态

3、根据检查后的结果执行Commit或者Rollback操作

补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

发送端

  1. @Resource
  2. privateRocketMQTemplaterocketMQTemplate;
  3. publicvoidsendTx(Stringtopic,Longid,Stringtags){
  4. rocketMQTemplate.sendMessageInTransaction(topic+":"+tags,MessageBuilder.withPayload(
  5. newUsers(id,UUID.randomUUID().toString().replaceAll("-",""))).
  6. setHeader("BID",UUID.randomUUID().toString().replaceAll("-","")).build(),
  7. UUID.randomUUID().toString().replaceAll("-",""));
  8. }

生产者对应的监听器

  1. @RocketMQTransactionListener
  2. publicclassProducerTxListenerimplementsRocketMQLocalTransactionListener{
  3. @Resource
  4. privateBusinessServicebs;
  5. @Override
  6. publicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){
  7. //这里执行本地的事务操作,比如保存数据。
  8. try{
  9. //创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据
  10. Stringid=(String)msg.getHeaders().get("BID");
  11. Usersusers=newJsonMapper().readValue((byte[])msg.getPayload(),Users.class);
  12. System.out.println("消息内容:"+users+"\\t参与数据:"+arg+"\\t本次事务的唯一编号:"+id);
  13. bs.save(users,newUsersLog(users.getId(),id));
  14. }catch(Exceptione){
  15. e.printStackTrace();
  16. returnRocketMQLocalTransactionState.ROLLBACK;
  17. }
  18. returnRocketMQLocalTransactionState.COMMIT;
  19. }
  20. @Override
  21. publicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){
  22. //这里检查本地事务是否执行成功
  23. Stringid=(String)msg.getHeaders().get("BID");
  24. System.out.println("执行查询ID为:"+id+"的数据是否存在");
  25. UsersLogusersLog=bs.queryUsersLog(id);
  26. if(usersLog==null){
  27. returnRocketMQLocalTransactionState.ROLLBACK;
  28. }
  29. returnRocketMQLocalTransactionState.COMMIT;
  30. }
  31. }

消费端

  1. @RocketMQMessageListener(topic="tx-topic",consumerGroup="consumer05-group",selectorExpression="tag10")
  2. @Component
  3. publicclassConsumerTxListenerimplementsRocketMQListener<Users>{
  4. @Override
  5. publicvoidonMessage(Usersusers){
  6. System.out.println("TX接收到消息:"+users);
  7. }
  8. }

Service

  1. @Transactional
  2. publicbooleansave(Usersusers,UsersLogusersLog){
  3. usersRepository.save(users);
  4. usersLogRepository.save(usersLog);
  5. if(users.getId()==1){
  6. thrownewRuntimeException("数据错误");
  7. }
  8. returntrue;
  9. }
  10. publicUsersLogqueryUsersLog(Stringbid){
  11. returnusersLogRepository.findByBid(bid);
  12. }

Controller

  1. @GetMapping("/tx/{id}")
  2. publicObjectsendTx(@PathVariable("id")Longid){
  3. ps.sendTx("tx-topic",id,"tag10");
  4. return"sendtransactionsuccess";
  5. }

测试

调用接口后,控制台输出:

SpringBoot整合RocketMQ事务/广播/顺序消息

从打印日志看出来都保存完毕了后 消费端才接受到消息。

SpringBoot整合RocketMQ事务/广播/顺序消息

SpringBoot整合RocketMQ事务/广播/顺序消息

删除数据,再测试ID为1会报错的。

SpringBoot整合RocketMQ事务/广播/顺序消息

数据库中没有数据。。。

是不是也不是很复杂,2个阶段来处理。

完毕!!!

原文地址:https://www.toutiao.com/i6944980898676769317/

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 SpringBoot整合RocketMQ事务/广播/顺序消息 https://www.kuaiidc.com/112095.html

相关文章

发表评论
暂无评论