SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

2025-05-29 0 18

自然,依赖是少不了的。除了spring-boot-starter-web依赖外。
就这个是最主要的依赖了,其他的看着办就是了。我用的是gradle,用maven的看着弄也一样的。无非就是包+包名+版本

?

1

2
//AMQP

compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE')

这里有一个坑。导致我后来发送消息时一直连不上去。报错: java.net.SocketException: socket closed。
我去网上寻找了许多方案。大致都是一个意思。没有设置远程连接权限。让我添加一个用户,并且设置最大权限。

下面是添加rabbitmq用户的命令

?

1

2

3

4

5

6
#rabbitmqctl add_user 账号 密码

rabbitmqctl add_user admin 614

#分配用户标签(admin为要赋予administrator权限的刚创建的那个账号的名字)

rabbitmqctl set_user_tags admin administrator

#设置权限,开启远程访问

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

我用完之后去管控台(http://ip:15672)看了一下用户列表。确实已经添加上去了,也是最大权限。
然鹅并没有什么卵用
后来强行摸索出来了,原来是版本差异的原因。我SpringBoot本来是使用的是2.0.3版本,然后AMQP我使用的是2.0.4。可能有什么不兼容的地方。
把Springboot和AMQP的版本给同步成一个就好了。别的版本差一点根本没啥问题,就AMQP特殊,也是醉了。

使用SpriongBoot的yml配置:重点是rabbitmq那一栏
设置好登录用户、密码、地址端口、虚拟地址、超时时间就可以了

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35
server:

port: 8080

servlet:

context-path: /

spring:

http:

encoding:

charset: UTF-8

jackson:

#前端页面传Date值时格式化

date-format: yyyy-MM-dd HH:mm:ss

time-zone: GMT+8

datasource:

driver-class-name: com.mysql.cj.jdbc.Driver

url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai

username: root

password: 614

rabbitmq:

port: 5672

host: 192.168.194.128

username: admin

password: 614

virtual-host: /

connection-timeout: 15s

#Redis配置

redis:

host: 192.168.194.128

port: 6379

#Redis连接池配置

jedis:

pool:

min-idle: 0

max-idle: 8

max-active: 8

max-wait: -1ms

这里又有个小坑,这个rabbitmq的超时时间(connection-timeout)配的我真的是醉了,我看的教程里写的是15000,表示15秒,我一输之后IDEA直接报红线啊。
网上一找,全特么用毫秒值配的,行吧,应该我们用的不是一个版本。
点开看下这参数接受一个java.time.Duration对象,百思不得其解。这玩意咋配?我不会啊。找了二十分钟的攻略才知道是这样子配的,使用数字+时间标志。比如1h、1M、1m、1d、1s、1ms这种格式就行了。


咳咳,配置文件弄好后也就差不多可以使用rabbitmq发消息了。
生产端发消息。只需要使用RabbitTemplate类就够了,看到这个名字,有没有一种很熟悉的感觉?
Redis也有个这玩意 叫RedisTemplate

关于发消息,在这儿最好还是先指定好exchange和routingKey,即交换机和路由键。
这样发过去的消息才能被发到指定的交换机上,然后交换机在通过你的routingKey来发送给绑定了该routingKey的所有队列。
所以首先登陆管控台(http://ip:15672),到Exchanges和Queues菜单下,创建好交换机和队列,还有他们之间的routingKey。这个步骤我就不详细描述了。单靠语言不怎么能够描述清楚。估计得配很多图,有需要的自行google把。

万事俱备。正式开始发送消息。
先准备一个要发的玩意。根据业务需求自己创个model就行。我这随便写一个。
关于这个messageId,及消息唯一ID。他的作用是将该条消息数据和RabbitMQ发送的消息绑定起来。不要也不是不行。只是最好还是设置一个这个参数。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32
package com.skypyb.rabbitmq.entity;

import java.io.Serializable;

public class User1 implements Serializable{

private Long id;

private String name;

private String messageId;//储存消息发送的唯一标识

public User1() {

}

public User1(Long id, String name, String messageId) {

this.id = id;

this.name = name;

this.messageId = messageId;

}

public Long getId() {

return id;

}

public void setId(Long id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public String getMessageId() {

return messageId;

}

public void setMessageId(String messageId) {

this.messageId = messageId;

}

}

要发送的数据模型已经准备好,接下来这个类是一个重点。即发送消息的类。
注入RabbbitTemplate,然后就可以通过他的 convertSendAndReceive() 方法进行消息的发送。
他有很多种重载,最好是选用我这种,比较可控。交换机、路由键、消息唯一ID全部指定好。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21
package com.skypyb.rabbitmq.producer;

import com.skypyb.rabbitmq.entity.User1;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.rabbit.support.CorrelationData;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component("user1Sender")

public class User1Sender {

@Autowired

private RabbitTemplate rabbitTemplate;//操作rabbitmq的模板

public void send(User1 user1){

CorrelationData correlationData= new CorrelationData();

correlationData.setId(user1.getMessageId());

rabbitTemplate.convertSendAndReceive(

"user1-exchange",//exchange

"user1.key1",//routingKey

user1,//消息体内容

correlationData//消息唯一ID

);

}

}

emmmm,是不是感觉还是挺简单的。一个方法调用,消息就过去了。就发送到指定的交换机了。交换机再通过你的routingKey转发给绑定在上边的队列。生产端这边就完事了。

写个测试类测试一下。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25
package com.skypyb.test;

import com.skypyb.rabbitmq.Application;

import com.skypyb.rabbitmq.entity.User1;

import com.skypyb.rabbitmq.producer.User1Sender;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

import java.util.UUID;

@RunWith(SpringRunner.class)

@SpringBootTest(classes = Application.class)

public class TestOne {

@Autowired

private User1Sender user1Sender;

@Test

public void testSend1(){

User1 user1 = new User1();

user1.setId(1L);

user1.setName("测试用户1");

user1.setMessageId("user1$"+System.currentTimeMillis()+"$"+ UUID.randomUUID().toString());

user1Sender.send(user1);

}

}

运行完毕后。登陆管控台(http://ip:15672),进入Queues菜单。即可发现消息队列中已接收到一条消息,会是一个等待消费的状态。
至于到底是哪个消息队列来处理嘛,那就得看你的exchange通过你的routingKey具体把消息转发到哪儿了。这个都是在管控台里边配置的。

生产端准备完毕。接下来是消费端。消费端也很简单,yml需要添加消费端的配置。签收模式最好选择手动签收。可控。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46
server:

port: 8081

servlet:

context-path: /

spring:

http:

encoding:

charset: UTF-8

jackson:

#前端页面传Date值时格式化

date-format: yyyy-MM-dd HH:mm:ss

time-zone: GMT+8

datasource:

driver-class-name: com.mysql.cj.jdbc.Driver

url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai

username: root

password: 614

#rabbitmq基本配置

rabbitmq:

addresses: 192.168.194.128:5672

username: admin

password: 614

virtual-host: /

connection-timeout: 15s

#rabbitmq消费端配置

listener:

simple:

#并发数

concurrency: 5

#最大并发数

max-concurrency: 10

#签收模式:手工签收、自动签收

acknowledge-mode: manual

#限流,在此消费端同一时间只有一条消息消费

prefetch: 1

#Redis配置

redis:

host: 192.168.194.128

port: 6379

#Redis连接池配置

jedis:

pool:

min-idle: 0

max-idle: 8

max-active: 8

max-wait: -1ms

具体的消费者,具体解释都写在注释中了。

关于@Exchange注解中设置的交换机的type属性,主要是用这些值:

  • fanout:会把所有发到Exchange的消息路由到所有和它绑定的Queue
  • direct:会把消息路由到routing key和binding key完全相同的Queue,不相同的丢弃
  • topic:direct是严格匹配,那么topic就算模糊匹配,routing key和binding key都用.来区分单词串,比如A.B.C,*匹配任意单词,#匹配任意多个或0个单词,比如。A.B.*可以匹配到A.B.C
  • headers:不依赖routing key和binding key,通过对比消息属性中的headers属性,对比Exchange和Queue绑定时指定的键值对,相同就路由过来

basicAck()方法可以确认消息消费。执行后,消息队列中这条消息就没了。multiple参数表示是否批量消费,一般都选false。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41
package com.skypyb.rabbitmq.controller;

import com.rabbitmq.client.Channel;

import com.skypyb.rabbitmq.entity.User1;

import org.springframework.amqp.rabbit.annotation.*;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.messaging.handler.annotation.Headers;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.Map;

@Component

public class User1Receiver {

/**

* @param user1 消息体,使用 @Payload 注解

* @param headers 消息头,使用 @Headers 注解

* @param channel

*/

/*@RabbitListener表示监听的具体队列.

bindings属性代表绑定。里边有几个值填写,填写好绑定的队列名字和交换机名字

指定好routingKey。若指定的这些参数不存在的话。则会自行给你创建好

durable代表是否持久化

*/

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "user1-queue", durable = "true"),

exchange = @Exchange(name = "user1-exchange", durable = "true", type = "topic"),

key = "user1.#"

)

)

@RabbitHandler//标识这个方法用于消费消息

public void onUser1Message(@Payload User1 user1,

@Headers Map<String, Object> headers,

Channel channel) throws IOException {

//消费者操作

System.out.println("-------收到消息辣!-----");

System.out.println("发过来的用户名为:" + user1.getName());

//basicAck()表示确认已经消费消息。通知一下mq,需要先得到 delivery tag

//delivery tag可以从消息头里边get出来

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

channel.basicAck(deliveryTag, false);

}

}

把消费端的服务打开后,就已经在监听了。若监听的队列中已有消息,则会立即处理。直到队列中没消息为止。
若队列为空,他就不会动,这个时候我启动一下生产者那边的测试,消息一发出去,立马就被消费。非常完美。就是这个效果。

呼,偶尔也不想咸鱼了啊,今天一天大概把RabbitMQ搞明白一些了,配置也会配了,消息也会发了。踩了一万个坑,有不少是那种比较SB的采坑方式,一般人应该踩不到,我就不打出来了。
还是感觉有很多收获的。就是累成麻瓜了。

以上就是SpringBoot整合RabbitMQ, 实现生产者消费者的功能的详细内容,更多关于SpringBoot整合RabbitMQ, 实现生产者消费者的功能的资料请关注快网idc其它相关文章!

原文链接:https://www.skypyb.com/2018/12/uncategorized/755/

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 SpringBoot整合RabbitMQ, 实现生产者与消费者的功能 https://www.kuaiidc.com/108595.html

相关文章

发表评论
暂无评论