安装及配置RabbitMQ
使用docker安装:
//拉取镜像
docker pull rabbitmq:3.6.5-management
//创建RabbitMQ容器
docker run -id --name=rabbitmq --restart=always -p 5670:5670 -p 15670:15670 -p 5674:5674 -p 15674:15674 --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:3.6.5-management
//进入容器内部
docker exec -it rabbitmq /bin/bash
//开启相关的插件
rabbitmq-plugins enable rabbitmq_stomp rabbitmq_web_stomp rabbitmq_web_stomp_examples
//退出容器
exit
//重启容器
docker restart rabbitmq
1. MQ是什么?作用?
- 解耦,降低系统复杂度
- 异步,提高响应速度
- 削峰填谷 , 高并发访问
2. 什么情况下,系统采用MQ
- 不需要返回数据,增删改
- 允许短暂的不一致
- 大型的高并发,大数据量的应用
RabbitMQ的工作模式:
1.简单模式:
没有交换机,使用""默认交换机,只有一个消费者.一个生产者发送消息,被一个消费者消费.
2.工作队列模式:
一个生产者发送消息,多个消费者轮询接受(一人一次)
3.pub/sub订阅发布模式
群发的效果,交换机定义fanout(),所有绑定在fanout类型的交换机上的队列,都会收到消息
4.Route 路由模式
绑定:指定队列名称,交换机名称,routing key(匹配的规则)
一个交换机和队列,可以绑定多个routing key
生产者:定义交换机,指定名称,指定类型为 Direct 类型。发送消息(交换机名称,“routing key”,消息内容)
特点: 发送的消息了,根据规则routing key,可以选择性的发送到某些队列,或者是全部队列。
5.Topice 通配符模式
绑定: 可以使用通配符指定routing key,*表示一个任意字符串,#表示0个或者多个字符串
生产者:定义交换机,指定名称,指定类型为 TOPIC 类型。发送消息(交换机名称,“routing key”,消息内容) 注意: 发消息的时候,routing key要固定,根据发消息的时候routing key去匹配绑定的时候通配符。
配置类可以用通配符
特点:功能上和路由模式一样的,简化了绑定的流程,并且更加的灵活。
使用RabbitMQ
1.生产者导入POM坐标
<!--springboot工程需要继承的父工程-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2.创建springboot启动类
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class,args);
}
}
3.创建RabbitMQ配置类
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME ="boot_topic_exchange";
public static final String QUEUE_NAME ="boot_queue1";
//1.配置交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.配置Queue队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3.队列和交换机绑定关系 binding
/* 1.知道哪个队列
2.知道哪个交换机
3.routing key
* */
@Bean
public Binding bindingExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
4.配置文件yml
spring:
rabbitmq:
host: 192.168.200.129 # ip
port: 5672
username: guest
password: guest
virtual-host: /
5.测试类生产方发送信息
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//1.注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot..mq.hello~~~~~");
}
}
消费方
1.导入POM坐标
<!--springboot工程需要继承的父工程-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
2.yml配置文件同上
3.编写启动类
4.消费方消费生产方
@Component
public class RabbitmqConsumer {
@RabbitListener(queues = "boot_queue1")
public void ListenerQueue(Message message){
System.out.println(message);
}
}
RabbitMQ 高级
如何保证消息的高可靠性传输?
1.持久化
• exchange(交换机)要持久化
• queue(队列)要持久化
• message要持久化
2.生产方确认Confirm、return 回退模式
Confirm: 确保生产者能够将消息发送到交换机。通过ConfirmCallBack,其中的方法得知消息是否到达了交换机。
return :当消息从交换机发送到队列中的时候,失败了有回调。通过ReturnCallBack,只有当发送失败的时候才会调用。
3.消费方确认Ack
消费者接收消息的时候,能够在处理成功的时候,手动签收消息。如果处理失败,抛出异常等情况,可以拒签消息,消息会再次发送给消费者。
4.Broker高可用(集群):
搭建rabbitMQ集群
- 安装多个rabbitMQ节点(服务器)
- 配置镜像节点,同步所有节点之间的信息
- 安装HAProxy,配置所有节点的负载均衡,使用MQ连接HAProxy
如何进行消费端限流?为什么要限流?
- 开启ack消息确认机制
- 配置消费端限流,prefetch=“同时处理的消息个数”
- 如果消费端不进行消息的确认,就不在接收消息。
默认情况下消息的消费者会一次性将MQ中累积的大量的消息全部拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的。
消息成为死信队列的三种情况
1.队列消息长度到达限制
2.消费者拒接消费信息,并且不重回队列
3.原队列存在消息过期的设置,消息到达超时时间而未被消费
延迟队列
即消息进入队列后不会立即被消费,只用到达指定时间后才会被消费
提出需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。
TTL+死信队列组合实现
死信交换机,过期时间,路由key
RabbitMQ应用问题
消息可靠性保障
如何能够保证消息的 100% 发送成功?
消息业务方面的可靠性?
消息补偿机制
保证业务逻辑正常完成,除了保证能够正常的发送,接收消息以外,还能够保证基于消息队列的业务成功。
需求:用户下单,订单系统生成订单,发送消息,通知库存系统。库存系统接收消息,扣减库存。
实现:
-
订单系统完成本地订单生成的操作之后,发送消息(订单ID,商品ID,商品数量)
-
库存系统接收该消息,操作数据库,扣减库存。
-
库存系统完成扣减库存,立刻向 消息检查系统 发送已经完成扣减操作。消息检查系统就会将当前消息存入检测系统的数据库。
-
订单系统 延迟一段时间(预估库存系统能够完成操作的时间),向消息检查系统,发送一个和刚才扣减库存相同的消息。
-
消息检查系统 再次接收到 订单系统 延迟消息之后,去检测系统的数据库检查是否已经存在了该消息的处理结果。
-
如果存在当前的消息,说明操作正常。结束。
2. 如果没有当前的消息,说明操作失败。再次调用订单服务,再次发送调用库存的消息。 -
如果库存系统和延迟消息都失败的情况下,检测系统的数据库就没有消息。
设置定时任务: 订单数据库 和 检测数据库 的数据是否一致。再次调用订单服务,再次发送调用库存的消息。
- 如果存在一致不成功的信息,记录一个重发次数,如果达到了3次。通知管理员,人工干预。
问题: 如果消息检查系统消息出现了问题,就会导致消息不停的重发,但是业务已经执行成功了?怎么处理?
-
通过乐观锁,来保障消息的幂等性
扣减库存
库存表
id goods count version
1 手机 10 2
消息内容
orderid=101, goodsid=1,count=2,version=2 (要扣减前的库存表中的version一致)
库存系统,业务操作
update 库存表 set count = count-2,version=version+1 where id =1 and version=2;
库存表
id goods count version
1 手机 8 3
update 库存表 set count = count-2,version=version+1 where id =1 and version=2;
mybatis:
update 库存表 set count = count-?,version=version+1 where id =? and version=?;
思考题: 增删改。 改,删除,新增如何保证幂等性?
新增的幂等性,就是不要使用数据库的主键自增。UUID,雪花算法。