浅谈spring-boot-rabbitmq动态管理的方法

2025-05-29 0 72

使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置

一. 关于rabbitmq监听的配置

  1. 配置属性类:RabbitProperties,包含rabbitmq的认证、监听、发送者以及其他的一些配置
  2. 自动配置类:RabbitAutoConfiguration,主要配置rabbitmq的连接工厂和发送者等,不包含监听的配置
  3. rabbitmq监听的配置是RabbitAnnotationDrivenConfiguration,是通过RabbitAutoConfiguration引入的
?

1

2

3

4

5

6

7
@Configuration

@ConditionalOnClass({ RabbitTemplate.class, Channel.class })

@EnableConfigurationProperties(RabbitProperties.class)

@Import(RabbitAnnotationDrivenConfiguration.class)

public class RabbitAutoConfiguration {

...

}

RabbitAnnotationDrivenConfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化

通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂SimpleRabbitListenerContainerFactory

?

1

2

3

4

5

6

7

8

9
@Bean

@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")

public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(

SimpleRabbitListenerContainerFactoryConfigurer configurer,

ConnectionFactory connectionFactory) {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

configurer.configure(factory, connectionFactory);

return factory;

}

既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializeContainer(SimpleMessageListenerContainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有创建监听容器和初始化的代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27
/**

* Create and start a new {@link MessageListenerContainer} using the specified factory.

* @param endpoint the endpoint to create a {@link MessageListenerContainer}.

* @param factory the {@link RabbitListenerContainerFactory} to use.

* @return the {@link MessageListenerContainer}.

*/

protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,

RabbitListenerContainerFactory<?> factory) {

MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

if (listenerContainer instanceof InitializingBean) {

try {

((InitializingBean) listenerContainer).afterPropertiesSet();

}

catch (Exception ex) {

throw new BeanInitializationException("Failed to initialize message listener container", ex);

}

}

int containerPhase = listenerContainer.getPhase();

if (containerPhase < Integer.MAX_VALUE) { // a custom phase value

if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {

throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +

this.phase + " vs " + containerPhase);

}

this.phase = listenerContainer.getPhase();

}

return listenerContainer;

}

继续找调用这个方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,发现调用的地方很多了

浅谈spring-boot-rabbitmq动态管理的方法

看看afterPropertiesSet方法,是InitializingBean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里创建的实例。原来是在RabbitListenerAnnotationBeanPostProcessor中的私有属性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了

二. 动态管理rabbitmq监听

回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31
/**

* @return the managed {@link MessageListenerContainer} instance(s).

*/

public Collection<MessageListenerContainer> getListenerContainers() {

return Collections.unmodifiableCollection(this.listenerContainers.values());

}

@Override

public void start() {

for (MessageListenerContainer listenerContainer : getListenerContainers()) {

startIfNecessary(listenerContainer);

}

}

/**

* Start the specified {@link MessageListenerContainer} if it should be started

* on startup or when start is called explicitly after startup.

* @see MessageListenerContainer#isAutoStartup()

*/

private void startIfNecessary(MessageListenerContainer listenerContainer) {

if (this.contextRefreshed || listenerContainer.isAutoStartup()) {

listenerContainer.start();

}

}

@Override

public void stop() {

for (MessageListenerContainer listenerContainer : getListenerContainers()) {

listenerContainer.stop();

}

}

写个controller,注入RabbitListenerEndpointRegistry,使用start()和stop()对监听进行启用停用的操作,并且RabbitListenerEndpointRegistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43
import java.util.Set;

import javax.annotation.Resource;

import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import com.itopener.framework.ResultMap;

/**

* Created by fuwei.deng on 2017年7月24日.

*/

@RestController

@RequestMapping("rabbitmq/listener")

public class RabbitMQController {

@Resource

private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RequestMapping("stop")

public ResultMap stop(){

rabbitListenerEndpointRegistry.stop();

return ResultMap.buildSuccess();

}

@RequestMapping("start")

public ResultMap start(){

rabbitListenerEndpointRegistry.start();

return ResultMap.buildSuccess();

}

@RequestMapping("setup")

public ResultMap setup(int consumer, int maxConsumer){

Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();

SimpleMessageListenerContainer container = null;

for(String id : containerIds){

container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);

if(container != null){

container.setConcurrentConsumers(consumer);

container.setMaxConcurrentConsumers(maxConsumer);

}

}

return ResultMap.buildSuccess();

}

}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持快网idc。

原文链接:https://my.oschina.net/dengfuwei/blog/1595044

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 浅谈spring-boot-rabbitmq动态管理的方法 https://www.kuaiidc.com/113408.html

相关文章

发表评论
暂无评论