配置文件
首先需要在配置文件添加:
#确认消息已发送到交换机
publisher-confirm-type: correlated
配置文件整体为:
配置类代码
/**
* 配置类 发布确认
*/
@Configuration
public class ConfirmConfig {
//普通交换机
public static final String CONFIRM_EXCHANGE_NAME="confirm_exchange";
//普通队列
public static final String CONFIRM_QUEUE_NAME="confirm_queue";
//RoutingKey
public static final String CONFIRM_EXCHANGE_ROUTING_KEY="key1";
//声明普通交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//声明普通队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//绑定普通交换机和普通队列
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange")DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_EXCHANGE_ROUTING_KEY);
}
回调接口配置
/**
*回调接口
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
//内部接口注入类中
rabbitTemplate.setConfirmCallback(this);
}
/**
*交换机确定回调方法
* 1.发消息 交换机接收到消息 回调
* 1.1 correlationData 保存回调消息的ID及相关信息
* 1.2 交换机收到消息 ack=true
* 1.3 cause null
* 2.发消息 交换机接受失败 回调
* 2.1 correlationData 保存回调消息的ID及相关信息
* 2.2 交换机收到消息 ack=false
* 2.3 cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if(ack){
log.info("交换机已经收到id为:{}的消息",id);
}
else {
log.info("交换机未经收到id为:{}的消息,原因为:{}",id,cause);
}
}
生产者代码
@Slf4j
@RestController
@RequestMapping("confirm")
public class ProduceController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData1 = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_EXCHANGE_ROUTING_KEY,message,correlationData1);
log.info("发送的消息内容为:{}",message);
}
消费者代码
/**
* 接收消息
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message){
String msg = new String(message.getBody());
log.info("接受到的队列confirm.queue消息:{}",msg);
}
}
测试
先测试一下正确的情况
消息已发出,交换机正常接收转发消息,消费者正常接收到消息
现在将发消息时的交换机名改错,测试一下
在发消息选择的交换机名字后面拼接1,这样交换机名字就变成了CONFIRM_EXCHANGE_NAME1,我们之前没有声明过,现在测试发消息
交换机未收到消息。
测试结束
但是到现在为止只能确认交换机是否收到信息,如队列出现问题,如不可路由等情况,消息会被直接丢弃,且生产者不知情,就会出现问题
所以可以继续学习:消息回报(队列确认),备份交换机
同系列文章
原理部分
MQ(消息队列)简介
RabbitMQ简介
RabbitMQ 四大核心概念及工作原理
操作部分
Windows版Docker安装RabbitMq
Maven整合RabbitMQ实现生产消费消息
SpringBoot整合RabbitMQ实现生产消费消息
RabbitMQ延迟队列及实战
RabbitMQ-消息回报(队列确认)
RabbitMQ-备份交换机
RabbitMQ-优先级队列