首先下载并安装rabbitmq,与springboot整合:
pom依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="rabbitConnectionFactory"
username="${rabbit_username}"
password="${rabbit_password}"
host="${rabbit_host}"
port="${rabbit_port}" />
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
exchange="exchangeTest" />
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<!--定义queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
<!-- 定义direct exchange,绑定queueTest -->
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"> </rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
消息生产者:
@RequestMapping(value = "/send",method = RequestMethod.GET)
public void send(@RequestParam String message) {
amqpTemplate.convertAndSend("queueTestKey",message);
}
消息消费者
@Component
public class testListener implements MessageListener {
@Override
@RabbitListener(queues = "queueTest")
public void onMessage(Message message) {
System.out.println("------消费者处理消息------");
System.out.println("receive message" + message);
}
}
其中Spring AMQP提供了一个发送和接收消息的操作模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一个实现。
消息的可靠性在四个步骤中都有所体现:
1.生产者到交换机
2.交换机到对应队列
3.消息在队列上存储
4.消费者成功消费
对应的解决:
1.事务/confirm机制
(confirm可以异步发送消息,不必等待消费者的返回)
2.mandatory/备份交换机
(mandatory需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂化)
3.声明队列时将durable参数置为true
4.autoAck参数,分为了自动确认和手工确认
下文将介绍其中confirm机制和手动ack机制