环境:springboot2.3.9RELEASE + RocketMQ4.8.0
依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.0</version>
- </dependency>
配置文件
- server:
- port:8080
- —
- rocketmq:
- nameServer:localhost:9876
- producer:
- group:demo-mq
普通消息
发送
- @Resource
- privateRocketMQTemplaterocketMQTemplate;
- publicvoidsend(Stringmessage){
- rocketMQTemplate.convertAndSend("test-topic:tag2",MessageBuilder.withPayload(message).build());
- }
接受
- @RocketMQMessageListener(topic="test-topic",consumerGroup="consumer01-group",selectorExpression="tag1||tag2")
- @Component
- publicclassConsumerListenerimplementsRocketMQListener<String>{
- @Override
- publicvoidonMessage(Stringmessage){
- System.out.println("接收到消息:"+message);
- }
- }
顺序消息
发送
- @Resource
- privateRocketMQTemplaterocketMQTemplate;
- publicvoidsendOrder(Stringtopic,Stringmessage,Stringtags,intid){
- rocketMQTemplate.asyncSendOrderly(topic+":"+tags,MessageBuilder.withPayload(message).build(),
- "order-"+id,newSendCallback(){
- @Override
- publicvoidonSuccess(SendResultsendResult){
- System.err.println("msg-id:"+sendResult.getMsgId()+":"+message+"\\tqueueId:"+sendResult.getMessageQueue().getQueueId());
- }
- @Override
- publicvoidonException(Throwablee){
- e.printStackTrace();
- }
- });
- }
这里是根据hashkey将消息发送到不同的队列中
- @RocketMQMessageListener(topic="order-topic",consumerGroup="consumer02-group",
- selectorExpression="tag3||tag4",consumeMode=ConsumeMode.ORDERLY)
- @Component
- publicclassConsumerOrderListenerimplementsRocketMQListener<String>{
- @Override
- publicvoidonMessage(Stringmessage){
- System.out.println(Thread.currentThread().getName()+"接收到Order消息:"+message);
- }
- }
consumeMode = ConsumeMode.ORDERLY,指明了消息模式为顺序模式,一个队列,一个线程。
结果
当consumeMode = ConsumeMode.CONCURRENTLY执行结果如下:
集群/广播消息模式
发送端
- @Resource
- privateRocketMQTemplaterocketMQTemplate;
- publicvoidsend(Stringtopic,Stringmessage,Stringtags){
- rocketMQTemplate.send(topic+":"+tags,MessageBuilder.withPayload(message).build());
- }
集群消息模式
消费端
- @RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",
- selectorExpression="tag6||tag7",messageModel=MessageModel.CLUSTERING)
- @Component
- publicclassConsumerBroadListenerimplementsRocketMQListener<String>{
- @Override
- publicvoidonMessage(Stringmessage){
- System.out.println("ConsumerBroadListener1接收到消息:"+message);
- }
- }
messageModel = MessageModel.CLUSTERING
测试
启动两个服务分别端口是8080,8081
8080服务
8081服务
集群消息模式下,每个服务分别接收一部分消息,实现了负载均衡
广播消息模式
消费端
- @RocketMQMessageListener(topic="broad-topic",consumerGroup="consumer03-group",
- selectorExpression="tag6||tag7",messageModel=MessageModel.BROADCASTING)
- @Component
- publicclassConsumerBroadListenerimplementsRocketMQListener<String>{
- @Override
- publicvoidonMessage(Stringmessage){
- System.out.println("ConsumerBroadListener1接收到消息:"+message);
- }
- }
messageModel = MessageModel.BROADCASTING
测试
启动两个服务分别端口是8080,8081
8080服务
8081服务
集群消息模式下,每个服务分别都接受了同样的消息。
事务消息
RocketMQ事务的3个状态
TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。
RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程 整体流程为:
正常事务发送与提交阶段
1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)
2、服务端响应消息写入结果,半消息发送成功
3、开始执行本地事务
4、根据本地事务的执行状态执行Commit或者Rollback操作
事务信息的补偿流程
1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求
2、生产者收到确认回查请求后,检查本地事务的执行状态
3、根据检查后的结果执行Commit或者Rollback操作
补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。
发送端
- @Resource
- privateRocketMQTemplaterocketMQTemplate;
- publicvoidsendTx(Stringtopic,Longid,Stringtags){
- rocketMQTemplate.sendMessageInTransaction(topic+":"+tags,MessageBuilder.withPayload(
- newUsers(id,UUID.randomUUID().toString().replaceAll("-",""))).
- setHeader("BID",UUID.randomUUID().toString().replaceAll("-","")).build(),
- UUID.randomUUID().toString().replaceAll("-",""));
- }
生产者对应的监听器
- @RocketMQTransactionListener
- publicclassProducerTxListenerimplementsRocketMQLocalTransactionListener{
- @Resource
- privateBusinessServicebs;
- @Override
- publicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){
- //这里执行本地的事务操作,比如保存数据。
- try{
- //创建一个日志记录表,将这唯一的ID存入数据库中,在下面的check方法中可以根据这个id查询是否有数据
- Stringid=(String)msg.getHeaders().get("BID");
- Usersusers=newJsonMapper().readValue((byte[])msg.getPayload(),Users.class);
- System.out.println("消息内容:"+users+"\\t参与数据:"+arg+"\\t本次事务的唯一编号:"+id);
- bs.save(users,newUsersLog(users.getId(),id));
- }catch(Exceptione){
- e.printStackTrace();
- returnRocketMQLocalTransactionState.ROLLBACK;
- }
- returnRocketMQLocalTransactionState.COMMIT;
- }
- @Override
- publicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){
- //这里检查本地事务是否执行成功
- Stringid=(String)msg.getHeaders().get("BID");
- System.out.println("执行查询ID为:"+id+"的数据是否存在");
- UsersLogusersLog=bs.queryUsersLog(id);
- if(usersLog==null){
- returnRocketMQLocalTransactionState.ROLLBACK;
- }
- returnRocketMQLocalTransactionState.COMMIT;
- }
- }
消费端
- @RocketMQMessageListener(topic="tx-topic",consumerGroup="consumer05-group",selectorExpression="tag10")
- @Component
- publicclassConsumerTxListenerimplementsRocketMQListener<Users>{
- @Override
- publicvoidonMessage(Usersusers){
- System.out.println("TX接收到消息:"+users);
- }
- }
Service
- @Transactional
- publicbooleansave(Usersusers,UsersLogusersLog){
- usersRepository.save(users);
- usersLogRepository.save(usersLog);
- if(users.getId()==1){
- thrownewRuntimeException("数据错误");
- }
- returntrue;
- }
- publicUsersLogqueryUsersLog(Stringbid){
- returnusersLogRepository.findByBid(bid);
- }
Controller
- @GetMapping("/tx/{id}")
- publicObjectsendTx(@PathVariable("id")Longid){
- ps.sendTx("tx-topic",id,"tag10");
- return"sendtransactionsuccess";
- }
测试
调用接口后,控制台输出:
从打印日志看出来都保存完毕了后 消费端才接受到消息。
删除数据,再测试ID为1会报错的。
数据库中没有数据。。。
是不是也不是很复杂,2个阶段来处理。
完毕!!!
原文地址:https://www.toutiao.com/i6944980898676769317/