-
- 问题描述:
自动删除(auto-delete)的队列在网络断开并恢复后,不会被自动重新创建,也不会重新绑定到交换机。
-
- 解决办法:
在 RabbitMQConfig 中声明两个连接工厂:一个专供 RabbitAdmin 声明交换机、队列和绑定关系使用,另一个供生产者和消费者使用。RabbitAdmin 使用的连接工厂不注册连接、通道、恢复事件的监听器;生产者和消费者使用的连接工厂需要注册这三类监听器,并在通道监听器的 “onCreate(Channel channel, boolean b)” 方法中重新声明队列和绑定关系。
-
- 参考示例:
CalcRabbitMQConfig
package com.cdjdgm.calc.config.rabbitmq;

import com.cdjdgm.calc.component.rabbitmq.CalcRabbitEventListener;
import com.cdjdgm.calc.component.rabbitmq.CalcRabbitMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * RabbitMQ wiring for the "calc" module.
 *
 * <p>Two separate {@link CachingConnectionFactory} beans are declared:
 * <ul>
 *   <li>{@code calcRabbitConnectionFactory} - used by the {@link RabbitTemplate} (and by consumers that
 *       are handed this factory); it registers the {@link CalcRabbitEventListener} as connection,
 *       channel and recovery listener.</li>
 *   <li>{@code calcRabbitConnectionFactoryAdmin} - used only by the {@link RabbitAdmin}; it registers
 *       no listeners.</li>
 * </ul>
 * NOTE(review): the admin factory presumably stays listener-free so that declarations issued from the
 * channel listener do not trigger that listener again - confirm against CalcRabbitEventListener.
 */
@Configuration
@EnableRabbit
@ConditionalOnBean(name = "calcRabbitMQProperties")
public class CalcRabbitMQConfig {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Prefix used inside the generated connection names. */
    private final String prefix = "calc";

    /** Sequence number appended to names of connections opened by the primary factory. */
    private final AtomicInteger connectionNameStrategyCounter1 = new AtomicInteger();

    /** Sequence number appended to names of connections opened by the admin factory. */
    private final AtomicInteger connectionNameStrategyCounter2 = new AtomicInteger();

    /**
     * Creates the primary connection factory and registers the event listener on it.
     *
     * @param calcRabbitMQProperties  broker address, credentials and publisher settings
     * @param calcRabbitEventListener listener registered for connection, channel and recovery events
     * @return the configured factory
     */
    @Primary
    @Bean(name = "calcRabbitConnectionFactory")
    public CachingConnectionFactory calcRabbitConnectionFactory(CalcRabbitMQProperties calcRabbitMQProperties,
                                                                CalcRabbitEventListener calcRabbitEventListener) {
        CachingConnectionFactory connectionFactory = newConnectionFactory(calcRabbitMQProperties,
                "calc-rabbitmq-", "ConnectionFactory", connectionNameStrategyCounter1);
        connectionFactory.addConnectionListener(calcRabbitEventListener);
        connectionFactory.addChannelListener(calcRabbitEventListener);
        connectionFactory.setRecoveryListener(calcRabbitEventListener);
        return connectionFactory;
    }

    /**
     * Creates the connection factory dedicated to {@link RabbitAdmin}; no listeners are registered on it.
     *
     * @param calcRabbitMQProperties broker address, credentials and publisher settings
     * @return the configured factory
     */
    @Bean(name = "calcRabbitConnectionFactoryAdmin")
    public CachingConnectionFactory calcRabbitConnectionFactoryAdmin(CalcRabbitMQProperties calcRabbitMQProperties) {
        return newConnectionFactory(calcRabbitMQProperties,
                "calc-rabbitmq-admin-", "ConnectionFactoryAdmin", connectionNameStrategyCounter2);
    }

    /**
     * Creates the {@link RabbitAdmin} on top of the admin connection factory, with auto-startup enabled.
     *
     * @param connectionFactory the {@code calcRabbitConnectionFactoryAdmin} bean
     * @return the configured admin
     */
    @Primary
    @Bean(name = "calcRabbitAdmin")
    public RabbitAdmin calcRabbitAdmin(@Qualifier("calcRabbitConnectionFactoryAdmin") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * Creates the {@link RabbitTemplate} on top of the primary connection factory.
     *
     * @param connectionFactory the {@code calcRabbitConnectionFactory} bean
     * @param mandatory         value of {@code spring.rabbitmq.calc.template.mandatory} (defaults to {@code true})
     * @param messageConverter  converter installed on the template
     * @return the configured template, using UTF-8 encoding
     */
    @Primary
    @Bean(name = "calcRabbitTemplate")
    public RabbitTemplate calcRabbitTemplate(@Qualifier("calcRabbitConnectionFactory") ConnectionFactory connectionFactory,
                                             @Value("${spring.rabbitmq.calc.template.mandatory:true}") Boolean mandatory,
                                             CalcRabbitMessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(mandatory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }

    // @Primary
    // @Bean(name = "calcRabbitContainerFactory")
    // public SimpleRabbitListenerContainerFactory calcRabbitContainerFactory(@Qualifier("calcRabbitConnectionFactory") ConnectionFactory connectionFactory,
    //                                                                        CalcRabbitMessageConverter messageConverter) {
    //     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //     factory.setConnectionFactory(connectionFactory);
    //     factory.setMessageConverter(messageConverter);
    //     factory.setAcknowledgeMode(AcknowledgeMode.NONE);
    //     return factory;
    // }

    /**
     * Builds a connection factory with the settings shared by both beans, so the two factories cannot
     * drift apart on address, credentials or recovery configuration.
     *
     * @param properties       broker address, credentials and publisher settings
     * @param threadNamePrefix name prefix for the threads created for the connection
     * @param connectionLabel  label placed after {@code "[calc]"} in the connection name
     * @param counter          sequence appended to the connection name; incremented per new name
     * @return a factory without any listeners registered
     */
    private CachingConnectionFactory newConnectionFactory(CalcRabbitMQProperties properties,
                                                          String threadNamePrefix,
                                                          String connectionLabel,
                                                          AtomicInteger counter) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // A host value containing ':' is passed on as an address list; otherwise host and port are set separately.
        if (properties.getHost().contains(":")) {
            connectionFactory.setAddresses(properties.getHost());
        } else {
            connectionFactory.setHost(properties.getHost());
            connectionFactory.setPort(properties.getPort());
        }
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setVirtualHost(properties.getVirtualHost());
        connectionFactory.setPublisherReturns(properties.getPublisherReturns());
        connectionFactory.setPublisherConfirmType(properties.getPublisherConfirmType());
        // Client-level automatic connection and topology recovery, retried every 5 seconds.
        connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
        connectionFactory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true);
        connectionFactory.getRabbitConnectionFactory().setNetworkRecoveryInterval(5000L);
        connectionFactory.setConnectionThreadFactory(new CustomizableThreadFactory(threadNamePrefix));
        connectionFactory.setConnectionNameStrategy(ignored -> "[" + prefix + "]" + connectionLabel
                + ":" + counter.getAndIncrement());
        return connectionFactory;
    }
}
CalcRabbitComponent
package com.cdjdgm.calc.component.rabbitmq;

import com.cdjdgm.calc.config.common.CalcConstants;
import com.cdjdgm.calc.config.rabbitmq.CalcRabbitMQConfig;
import com.cdjdgm.calc.config.rabbitmq.CalcRabbitMQProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Declares the exchanges, the per-instance DC queue and its bindings, and owns the manually created
 * message listener container that consumes from that queue.
 *
 * <p>All declarations go through {@code calcRabbitAdmin}; the listener container uses
 * {@code calcRabbitConnectionFactory}.
 */
@Component
@ConditionalOnBean(CalcRabbitMQConfig.class)
public class CalcRabbitComponent {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Resource(name = "calcRabbitMQProperties")
    private CalcRabbitMQProperties calcRabbitMQProperties;

    @Resource(name = "calcRabbitConnectionFactory")
    private ConnectionFactory connectionFactory;

    @Resource(name = "calcRabbitAdmin")
    private RabbitAdmin calcRabbitAdmin;

    @Resource(name = "calcRabbitMessageListener")
    private CalcRabbitMessageListener calcRabbitMessageListener;

    private TopicExchange dcExchange = null;
    private TopicExchange healthExchange = null;
    private TopicExchange lifeExchange = null;
    private Queue dcQueue = null;
    private String dcQueueName = "";
    private SimpleMessageListenerContainer container = null;

    /**
     * Generates the queue name, declares the three exchanges and prepares (but does not start) the
     * message listener container.
     */
    @PostConstruct
    public void init() {
        logger.info("Init rabbitmq exchange and queue.");
        // Build a queue name from the configured prefix, host name, host address and a random suffix.
        dcQueueName = calcRabbitMQProperties.getQueueNameDcEquipData() + CalcConstants.HOST_NAME + "_"
                + CalcConstants.HOST_ADDRESS + "_"
                + (new Random().nextInt(999));

        // DC exchange: non-durable, not auto-delete.
        dcExchange = new TopicExchange(calcRabbitMQProperties.getExchangeDcEquipData(), false, false);
        calcRabbitAdmin.declareExchange(dcExchange);

        // HEALTH exchange: durable, not auto-delete.
        healthExchange = new TopicExchange(calcRabbitMQProperties.getExchangeCalcHealthData(), true, false);
        calcRabbitAdmin.declareExchange(healthExchange);

        // LIFE exchange: durable, not auto-delete.
        lifeExchange = new TopicExchange(calcRabbitMQProperties.getExchangeCalcLifeData(), true, false);
        calcRabbitAdmin.declareExchange(lifeExchange);

        // The queue and its bindings are intentionally NOT declared here; CalcRabbitEventListener
        // declares them once a channel has been created.
        // declareQueueAndBinding();

        logger.info("Init rabbitmq message listener.");
        // Register the consumer container manually.
        container = new SimpleMessageListenerContainer(connectionFactory);
        // Exactly one consumer.
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setExposeListenerChannel(true);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(calcRabbitMessageListener);
        container.addQueueNames(dcQueueName);
        // The container is intentionally NOT started here; it is started from CalcApplicationRunner
        // once application startup has completed.
        // container.start();
    }

    /** Stops the listener container on bean destruction if it is running. */
    @PreDestroy
    public void destroy() {
        if (container != null && container.isRunning()) {
            logger.info("Destroy rabbitmq message listener.");
            container.stop();
        }
    }

    /**
     * Declares the DC exchange, the auto-delete DC queue and one binding per configured routing key.
     *
     * <p>This method is called from a channel listener, so it may run at any time a channel is created.
     * If it runs before {@link #init()} has prepared the queue name and exchange, it does nothing:
     * declaring at that point would create a queue with an empty name and then fail on the missing
     * exchange.
     */
    public void declareQueueAndBinding() {
        if (dcExchange == null || dcQueueName == null || dcQueueName.isEmpty()) {
            logger.warn("Skip declaring queue and binding: exchange and queue name are not initialized yet.");
            return;
        }
        logger.info("Declare queue [{}] and binding.", dcQueueName);

        // The DC exchange is non-durable, so it does not survive a broker restart. Re-declaring it is
        // idempotent and guarantees the binding target exists.
        calcRabbitAdmin.declareExchange(dcExchange);

        Map<String, Object> args = new HashMap<>(2);
        args.put("x-max-length", calcRabbitMQProperties.getQueueMaxLengthDcEquipData());
        // Non-durable, non-exclusive, auto-delete queue.
        dcQueue = new Queue(dcQueueName, false, false, true, args);
        calcRabbitAdmin.declareQueue(dcQueue);

        String[] dcEquipRouteKeys = calcRabbitMQProperties.getRouteKeysDcEquipData();
        if (dcEquipRouteKeys != null && dcEquipRouteKeys.length > 0) {
            for (String dcEquipRouteKey : dcEquipRouteKeys) {
                calcRabbitAdmin.declareBinding(BindingBuilder.bind(dcQueue).to(dcExchange).with(dcEquipRouteKey));
            }
        }
    }

    /** Starts the listener container if it exists and is not already running. */
    public void startMessageListenerContainer() {
        if (container != null && !container.isRunning()) {
            logger.info("Start rabbitmq message listener.");
            container.start();
        }
    }

    /** Stops the listener container if it exists and is running. */
    public void stopMessageListenerContainer() {
        if (container != null && container.isRunning()) {
            logger.info("Stop rabbitmq message listener.");
            container.stop();
        }
    }
}
CalcRabbitEventListener
package com.cdjdgm.calc.component.rabbitmq; import com.cdjdgm.calc.config.rabbitmq.CalcRabbitMQConfig; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoveryListener; import com.rabbitmq.client.ShutdownSignalException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.ChannelListener; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component @ConditionalOnBean(CalcRabbitMQConfig.class) public class CalcRabbitEventListener implements ConnectionListener, ChannelListener, RecoveryListener { private final Logger logger = LoggerFactory.getLogger(getClass()); private final String prefix = "calc"; @Resource private CalcRabbitComponent calcRabbitComponent; @Override public void onCreate(Connection connection) { logger.info("[{}]Connection created: [{}]", prefix, connection.toString()); } @Override public void onClose(Connection connection) { logger.info("[{}]Connection closed: [{}]", prefix, connection.toString()); } @Override public void onCreate(Channel channel, boolean b) { logger.info("[{}]Channel created: [{}]", prefix, channel.getChannelNumber()); // 声明队列并绑定关系 calcRabbitComponent.declareQueueAndBinding(); } @Override public void onShutDown(ShutdownSignalException signal) { logger.error("[{}]Connection shutdowned: [{}]", prefix, signal.getMessage()); } @Override public void onFailed(Exception exception) { logger.error("[{}]Connection failed: [{}]", prefix, exception.getMessage()); } @Override public void handleRecovery(Recoverable recoverable) { logger.info("[{}]Connection recovery complete: [{}]", prefix, recoverable); } @Override public void handleRecoveryStarted(Recoverable recoverable) { 
logger.info("[{}]Connection recovery started: [{}]", prefix, recoverable); } @Override public void handleTopologyRecoveryStarted(Recoverable recoverable) { logger.info("[{}]Connection topology recovery started: [{}]", prefix, recoverable); } } |