1 导入maven依赖jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 配置application.properties
#mq
spring.rabbitmq.host=192.168.43.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vhost
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 发送者开启 return 机制(消息无法路由到任何队列时退回给发送者)
spring.rabbitmq.publisher-returns=true
3 配置普通交换机,队列,消息生成者,消费者
配置交换机和队列进行绑定
package com.example.springbootdemo.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
 * Declares the {@code station-exchange} topic exchange, the two station queues
 * and the bindings between them.
 *
 * <p>{@code @Configuration} is itself a component stereotype, so no additional
 * {@code @Component} annotation is required for this class to be scanned.
 */
@Slf4j
@Configuration
public class RabbitmqTest {

    /** Name of the topic exchange the station queues are bound to. */
    public static final String STATION_EXCHANGE = "station-exchange";

    /** Name of the queue bound with routing key {@code 11}. */
    public static final String STATION_QUEUE_11 = "station-queue-11";

    /** Name of the queue bound with routing key {@code 197}. */
    public static final String STATION_QUEUE_197 = "station-queue-197";

    /**
     * Binds the injected queue to the injected topic exchange with the routing pattern {@code #}.
     *
     * <p>NOTE(review): this method needs a {@code Queue} bean that resolves for the parameter
     * name {@code testQueue}; no such bean is declared in this class, while several other
     * {@code Queue} beans are. Confirm that a {@code testQueue} bean exists elsewhere,
     * otherwise the injection is ambiguous.
     *
     * @param exchange  topic exchange to bind to
     * @param testQueue queue to bind
     * @return the binding declaration
     */
    @Bean
    public Binding testBinging(TopicExchange exchange, Queue testQueue) {
        return BindingBuilder.bind(testQueue).to(exchange).with("#");
    }

    /**
     * Declares the durable, non-auto-delete topic exchange {@link #STATION_EXCHANGE}.
     *
     * @return the exchange declaration
     */
    @Bean
    public TopicExchange stationExchange() {
        return new TopicExchange(STATION_EXCHANGE, true, false);
    }

    /**
     * Declares the durable queue {@link #STATION_QUEUE_11}.
     *
     * @return the queue declaration
     */
    @Bean
    public Queue stationQueue() {
        return durableQueue(STATION_QUEUE_11);
    }

    /**
     * Declares the durable queue {@link #STATION_QUEUE_197}.
     *
     * @return the queue declaration
     */
    @Bean
    public Queue stationQueue197() {
        return durableQueue(STATION_QUEUE_197);
    }

    /**
     * Binds {@link #STATION_QUEUE_11} to the station exchange with routing key {@code 11}.
     *
     * @param stationExchange the station topic exchange
     * @param stationQueue    the queue declared by {@link #stationQueue()}
     * @return the binding declaration
     */
    @Bean
    public Binding stationBinding(TopicExchange stationExchange, Queue stationQueue) {
        return BindingBuilder.bind(stationQueue).to(stationExchange).with("11");
    }

    /**
     * Binds {@link #STATION_QUEUE_197} to the station exchange with routing key {@code 197}.
     *
     * @param stationExchange the station topic exchange
     * @param stationQueue197 the queue declared by {@link #stationQueue197()}
     * @return the binding declaration
     */
    @Bean
    public Binding stationBinding2(TopicExchange stationExchange, Queue stationQueue197) {
        return BindingBuilder.bind(stationQueue197).to(stationExchange).with("197");
    }

    /** Builds a durable, non-exclusive, non-auto-delete queue without extra arguments. */
    private static Queue durableQueue(String name) {
        return new Queue(name, true, false, false);
    }
}
配置消费者
package com.example.springbootdemo.listener;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumes the two station queues and logs each message body.
 *
 * <p>NOTE(review): {@code RabbitmqConsumerAck} registers its own listener container on
 * {@code STATION_QUEUE_197}; with both active, messages on that queue are shared
 * between the two consumers — confirm that this is intended.
 */
@Slf4j
@Component
public class StationListener {

    /**
     * Logs every message received on {@code RabbitmqTest.STATION_QUEUE_11}.
     *
     * @param message the raw AMQP message
     */
    @RabbitListener(queues = RabbitmqTest.STATION_QUEUE_11)
    public void station11(Message message) {
        log.info("11监听到消息: {}", bodyAsText(message));
    }

    /**
     * Logs every message received on {@code RabbitmqTest.STATION_QUEUE_197}.
     *
     * @param message the raw AMQP message
     */
    @RabbitListener(queues = RabbitmqTest.STATION_QUEUE_197)
    public void station197(Message message) {
        log.info("197监听到消息: {}", bodyAsText(message));
    }

    /**
     * Decodes the message body as UTF-8 text.
     *
     * <p>The charset is given explicitly so the result does not depend on the
     * platform default charset of the consuming JVM.
     */
    private static String bodyAsText(Message message) {
        return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
消息生产者
package com.example.springbootdemo.rabbitmq.production;
import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class TestMessageProduction {
@Resource
private RabbitmqTemplateConfirm rabbitmqTemplateConfirm;
public void testMessage(String key){
log.info("消息发送确认");
RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();
rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE,key,"消息发送者确认");
}
}
4 配置延时交换机和队列,生产者,消费者
配置延时交换机和队列
package com.example.springbootdemo.rabbitmq.config;
import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Declares the delayed exchanges, queues and bindings used for the
 * online-pay and refund messages.
 */
@Component
public class PayMessageMq {

    public static final String ONLINE_PAY_EXCHANGE = "online-pay-exchange";
    public static final String ONLINE_PAY_QUEUE = "online-pay-queue";
    public static final String REFUND_EXCHANGE = "refund-exchange";
    public static final String REFUND_QUEUE = "refund-queue";

    /**
     * Declares the delayed exchange for online-pay messages.
     *
     * @return the exchange declaration
     */
    @Bean
    public CustomExchange onlinePayExchange() {
        return delayedTopicExchange(ONLINE_PAY_EXCHANGE);
    }

    /**
     * Declares the durable queue {@link #ONLINE_PAY_QUEUE}.
     *
     * @return the queue declaration
     */
    @Bean
    public Queue onlinePayQueue() {
        return new Queue(ONLINE_PAY_QUEUE, true, false, false);
    }

    /**
     * Binds the online-pay queue to the online-pay exchange with routing pattern {@code #}.
     *
     * @param onlinePayQueue    queue declared by {@link #onlinePayQueue()}
     * @param onlinePayExchange exchange declared by {@link #onlinePayExchange()}
     * @return the binding declaration
     */
    @Bean
    public Binding onlineBinding(Queue onlinePayQueue, CustomExchange onlinePayExchange) {
        return BindingBuilder
                .bind(onlinePayQueue)
                .to(onlinePayExchange)
                .with("#")
                .noargs();
    }

    /**
     * Declares the delayed exchange for refund messages.
     *
     * @return the exchange declaration
     */
    @Bean
    public CustomExchange refundExchange() {
        return delayedTopicExchange(REFUND_EXCHANGE);
    }

    /**
     * Declares the durable queue {@link #REFUND_QUEUE}.
     *
     * @return the queue declaration
     */
    @Bean
    public Queue refundQueue() {
        return new Queue(REFUND_QUEUE, true, false, false);
    }

    /**
     * Binds the refund queue to the refund exchange with routing pattern {@code #}.
     *
     * @param refundExchange exchange declared by {@link #refundExchange()}
     * @param refundQueue    queue declared by {@link #refundQueue()}
     * @return the binding declaration
     */
    @Bean
    public Binding refundBinding(CustomExchange refundExchange, Queue refundQueue) {
        return BindingBuilder
                .bind(refundQueue)
                .to(refundExchange)
                .with("#")
                .noargs();
    }

    /**
     * Builds a durable, non-auto-delete exchange of type {@code x-delayed-message}
     * whose {@code x-delayed-type} argument selects topic-style distribution.
     */
    private static CustomExchange delayedTopicExchange(String exchangeName) {
        Map<String, Object> arguments = Maps.newHashMap();
        // Distribution type used by the custom (delayed) exchange.
        arguments.put("x-delayed-type", "topic");
        // The exchange type itself marks this as a delayed exchange.
        return new CustomExchange(exchangeName, "x-delayed-message", true, false, arguments);
    }
}
配置消费者
package com.example.springbootdemo.listener;
import com.example.springbootdemo.application.pay.OnlinePay;
import com.example.springbootdemo.application.pay.ali.response.QueryAliPayResponse;
import com.example.springbootdemo.entity.Order;
import com.example.springbootdemo.entity.PaymentRecord;
import com.example.springbootdemo.entity.dict.PaymentRecordEnumStatus;
import com.example.springbootdemo.rabbitmq.config.PayMessageMq;
import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import com.example.springbootdemo.rabbitmq.production.MessageProduction;
import com.example.springbootdemo.service.PaymentRecordService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Objects;
@Slf4j
@Component
public class OnlinePayListener {
@Resource
private OnlinePay onlinePay;
@Resource
private ObjectMapper objectMapper;
@Resource
private PaymentRecordService paymentRecordService;
@Resource
private MessageProduction messageProduction;
@RabbitListener(queues = PayMessageMq.ONLINE_PAY_QUEUE)
public void queryPay(@Payload QueryPayMessageEvent event) throws JsonProcessingException {
if(Objects.nonNull(event) && event.getQueryNum() <= 16) {
log.info("查询支付结果orderCod: {}", event.getOrderCode());
String payStatus = onlinePay.queryPayStatus(Order.builder().code(event.getOrderCode()).build());
QueryAliPayResponse queryAliPayResponse = objectMapper.readValue(payStatus, QueryAliPayResponse.class);
QueryAliPayResponse.AlipayTradeQueryResponse alipayTradeQueryResponse = queryAliPayResponse.getAlipay_trade_query_response();
if(Objects.nonNull(alipayTradeQueryResponse) && alipayTradeQueryResponse.getCode().equals("10000") && alipayTradeQueryResponse.getTrade_status().equals("TRADE_SUCCESS")){
PaymentRecord paymentRecord = paymentRecordService.lambdaQuery().eq(PaymentRecord::getOrderCode, event.getOrderCode()).one();
paymentRecord.setTradeNo(alipayTradeQueryResponse.getTrade_no());
paymentRecord.setStatus(PaymentRecordEnumStatus.SUCCESS);
paymentRecordService.updateById(paymentRecord);
}else {
event.setQueryNum(event.getQueryNum()+1);
messageProduction.queryPayRecord(event,59*1000);
}
log.info("支付结果: {}", payStatus);
}
}
public void queryRefund(@Payload RefundMessageEvent event){
if(Objects.nonNull(event) && event.getQueryNum() <= 16) {
log.info("查询退款结果orderCode: {}", event.getOrderCode());
String queryRefund = onlinePay.queryRefund(event.getOrderCode(), event.getOutRequestNo(),event.getTradeNo());
System.out.println(queryRefund);
}else {
}
}
}
生产者
package com.example.springbootdemo.rabbitmq.production;
import com.example.springbootdemo.rabbitmq.config.PayMessageMq;
import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Publishes delayed query events for payments and refunds.
 */
@Slf4j
@Component
public class MessageProduction {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a payment query event to the online-pay exchange with routing key {@code pay}.
     *
     * @param event     the event to send
     * @param delayTime value set as the message's delay property
     */
    public void queryPayRecord(QueryPayMessageEvent event, Integer delayTime) {
        log.info("发送查询支付结果事件:订单号: {} ", event.getOrderCode());
        publishDelayed(PayMessageMq.ONLINE_PAY_EXCHANGE, "pay", event, delayTime);
        log.info("发送查询支付结果事件完成");
    }

    /**
     * Publishes a refund query event to the refund exchange with routing key {@code hello}.
     *
     * @param event     the event to send
     * @param delayTime value set as the message's delay property
     */
    public void queryRefund(RefundMessageEvent event, Integer delayTime) {
        log.info("发送查询退款事件: {}", event.getOrderCode());
        publishDelayed(PayMessageMq.REFUND_EXCHANGE, "hello", event, delayTime);
        log.info("发送查询退款事件完成");
    }

    /** Converts and sends the payload, setting the delay property on the outgoing message. */
    private void publishDelayed(String exchange, String routingKey, Object payload, Integer delayTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, payload, outgoing -> {
            outgoing.getMessageProperties().setDelay(delayTime);
            return outgoing;
        });
    }
}
5 配置某些交换机,队列进行消息发布的确认
- 在application.properties配置:
发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirm-type=correlated
发送者开启 return 机制(消息无法路由到任何队列时退回给发送者)
spring.rabbitmq.publisher-returns=true
- 代码配置如下
package com.example.springbootdemo.config.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
 * Publisher-side confirm and return callbacks, plus a lazily created
 * {@link RabbitTemplate} that has those callbacks registered.
 *
 * <p>NOTE(review): {@code RabbitTemplate.ReturnCallback} / {@code setReturnCallback} are
 * superseded by {@code ReturnsCallback} in newer Spring AMQP releases — confirm the
 * project's Spring AMQP version before upgrading.
 */
@Slf4j
@Component
public class RabbitmqTemplateConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /** Lazily created template; guarded by this instance's monitor. */
    private RabbitTemplate rabbitTemplate;

    /** Connection factory used to build the template. */
    @Resource
    private ConnectionFactory connectionFactory;

    /**
     * Logs the broker's confirm result: error level when not acknowledged, info level otherwise.
     *
     * @param correlationData correlation data supplied with the send, if any
     * @param ack             {@code true} if the broker acknowledged the message
     * @param cause           failure cause reported for a negative acknowledgement
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("confirm==>发送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
            return;
        }
        log.error("confirm==>发送到broker失败\r\n" +
                        "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                correlationData, ack, cause);
    }

    /**
     * Logs a message that was returned to the publisher.
     *
     * @param message    the returned message
     * @param replyCode  reply code of the return
     * @param replyText  reply text of the return
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                message, replyCode, replyText, exchange, routingKey);
    }

    /**
     * Returns the shared template, creating it on first use.
     *
     * <p>The template is fully configured (mandatory flag, confirm and return
     * callbacks) on a local variable before it is stored in the field, and the
     * method is synchronized, so concurrent callers never observe a template that
     * is missing its callbacks and only one instance is ever created.
     *
     * @return the template with this object registered as confirm and return callback
     */
    public synchronized RabbitTemplate getRabbitTemplate() {
        if (Objects.isNull(rabbitTemplate)) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMandatory(true);
            template.setConfirmCallback(this);
            template.setReturnCallback(this);
            rabbitTemplate = template;
        }
        return rabbitTemplate;
    }
}
package com.example.springbootdemo.rabbitmq.production;
import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class TestMessageProduction {
@Resource
private RabbitmqTemplateConfirm rabbitmqTemplateConfirm;
public void testMessage(String key){
log.info("消息发送确认");
RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();
rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE,key,"消息发送者确认");
}
}
6 对某些队列进行手动ack
设置消费端 ack 模式:none 不确认,auto 自动确认,manual 手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
该配置是全局的;如果只对某些队列进行手动 ack,需要进行如下配置
package com.example.springbootdemo.config.rabbitmq;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Configures manual acknowledgement for the consumer of a single queue.
 */
@Slf4j
@Configuration
public class RabbitmqConsumerAck {

    /**
     * Creates a listener container for {@code RabbitmqTest.STATION_QUEUE_197} that
     * uses manual acknowledge mode.
     *
     * <p>Each message is logged; it is acknowledged when it carries no {@code error}
     * header and rejected without requeue otherwise.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RabbitmqTest.STATION_QUEUE_197);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            // Decode explicitly as UTF-8 so the text does not depend on the platform charset.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            log.info("{}get msg:{}", RabbitmqTest.STATION_QUEUE_197, body);

            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            if (message.getMessageProperties().getHeaders().get("error") == null) {
                // Manually acknowledge this single delivery.
                channel.basicAck(deliveryTag, false);
                log.info("消息确认");
            } else {
                // Reject without requeue. To put the message back on the queue instead,
                // pass requeue=true to basicReject (or use basicNack with requeue=true).
                channel.basicReject(deliveryTag, false);
                log.info("消息拒绝");
            }
        });
        return container;
    }
}
7 rabbitmq其他配置
##发送重试配置
#启用发送重试
#spring.rabbitmq.template.retry.enabled=true
#最大重试次数
#spring.rabbitmq.template.retry.max-attempts=5
#第一次和第二次尝试发布或传递消息之间的间隔
#spring.rabbitmq.template.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数 步长
#spring.rabbitmq.template.retry.multiplier=2
#最大重试时间间隔
#spring.rabbitmq.template.retry.max-interval=10000ms