文章目录
该文章已同步收录到我的博客网站,欢迎浏览我的博客网站,xhang’s blog
1.MQ(Message Queue)简介
MQ(message queue),消息队列,遵循FIFO 先入先出原则,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
2.MQ的功能
2.1流量削峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消订单数量限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是性能有所影响。
2.2应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,==如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。==当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,用户感受不到物流系统的故障,提升系统的可用性。
2.3异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题, A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
3.RabbitMQ简介
RabbitMQ是由erlang语言编写的一个消息中间件:它负责接收,存储和转发消息数据。
我们通常谈到消息队列,就会联想到这其中的三者:生产者、消费者和消息队列,生产者将消息发送到消息队列,消费者从消息队列中获取消息进行处理。对于RabbitMQ,它在此基础上做了一层抽象,引入了交换机exchange的概念,交换机是作用于生产者和消息队列之间的中间桥梁,它起了一种消息路由的作用,也就是说生产者并不和消息队列直接关联,而是先发送给交换机,再由交换机路由到对应的队列,至于它是根据何种规则路由到消息队列的,就是我们下面需要介绍的内容了。这里的生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange(交换器)的Channel(信道),将消息发送给Exchange,Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息,消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。
4.RabbitMQ架构模型(4大核心组件)
- 生产者
生产者就是投递消息的一方。生产者创建消息,然后发布到 RabbitMQ 中。
消息一般可以包含 2 个部分:消息体和标签(Label)。消息体也可以称之为 payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者(Consumer)。
生产者发送消息的流程:
-
生产者连接 RabbitMQ ,建立 TCP 连接 ( Connection) ,开启信道/通道( Channel )
-
生产者声明一个 Exchange (交换器),并设置相关属性,比如交换器类型、是否持久化等
-
生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
-
生产者通过 bindingKey (绑定 Key )将交换器和队列绑定( binding )起来
-
生产者发送消息至 RabbitMQ Broker ,其中包含 routingKey (路由键)、交换器等信息
-
相应的交换器根据接收到的 routingKey 查找相匹配的队列。
-
如果找到,则将从生产者发送过来的消息存入相应的队列中。 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
-
关闭信道。
-
关闭连接。
-
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。
交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
- 队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
- 消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序
。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
消费者获取消息的流程:
- 消费者连接到RabbitMQ Broker,建立一个连接(Connection ),开启一个信道(Channel) 。
- 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及 做一些准备工作
- 等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
- 消费者确认 ( ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
5.RabbitMQ的工作原理
- Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
- Producer(消息的生产者):向消息队列发布消息的客户端应用程序。
- Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。
- Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
- Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。
- Queue(消息队列):
存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。
一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收。 - Exchange(交换机|路由器):
提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。
交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略,分别是fanout, direct, topic, headers。 - BindingKey(绑定):
用于建立Exchange和Queue之间的关联。
一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
routingKey和bindingKey的关系
routingkey和 bindingKey是进行相互匹配的关系,bindinKey是queue和exchange绑定的关系,routingkey是发消息带来的路由。然后发消息的时候,根据消息带的routingKey 和 bindingKey做精确匹配或模糊匹配。最后,确定消息投递到哪个queue中
6.RabbitMQ的安装
Downloading and Installing RabbitMQ — RabbitMQ
6.1安装docker环境
- 搭建gcc环境(gcc是编程语言译器)
yum -y install gcc
yum -y install gcc-c++
- 安装需要的软件包
yum install -y yum-utils
- 安装镜像仓库
官网上的是
但是因为docker的服务器是在国外,所以有时候从仓库中下载镜像的时候会连接被拒绝或者连接超时的情况,所以可以使用阿里云镜像仓库
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
- 更新yum软件包索引
yum makecache fast
- 安装docker引擎
yum install docker-ce docker-ce-cli containerd.io docker-compose-plugin
- 启动docker
systemctl start docker
6.2安装RabbitMQ
- 拉取RabbitMQ镜像
使用这种镜像rabbitmq中无需安装管理插件就能实现Channels节点的UI统计信息功能。
docker pull rabbitmq:management
- 开发15672端口
15672端口是rabbitmq管理界面ui端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
- 在命令行交互模式下,根据镜像创建容器实例
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq1.0 rabbitmq:latest
- 访问ip + 15672端口
默认Username和Password都是guest
7.Rabbitmq的常用命令
命令 | 说明 |
---|---|
rabbitmqctl version | 查看rabbitmq的版本 |
rabbitmqctl status | 查看rabbitmq的服务状态 |
rabbitmqctl list_bindings | 查看绑定情况 |
rabbitmqctl list_channels | 查看信道情况 |
rabbitmqctl list_connections | 查看连接信息 |
rabbitmqctl list_consumers | 查看消费者 |
rabbitmqctl list_exchanges | 查看交换机 |
rabbitmqctl list_queues | 查看队列 |
rabbitmqctl delete_queue 队列名 | 删除队列 |
rabbitmqctl add_user 用户名 密码 | 添加用户名和密码 |
rabbitmqctl set_user_tags 用户名 administrator | 赋予普通用户管理员权限 |
rabbitmqctl list_users | 查看所有用户 |
rabbitmqctl list_user_permissions 用户名 | 查看用户权限 |
rabbitmqctl delete_user 用户名 | 删除用户 |
rabbitmqctl change_password admin 用户名 | 修改用户密码 |
rabbitmqctl join_cluster --ram 主节点name | 加入集群[–ram添加内存模式 默认disk模式] |
rabbitmqctl cluster_status | 查看集群状态 |
rabbitmqctl stop_app | 关闭应用(关闭当前启动的节点) |
rabbitmqctl start_app | 启动应用,和上述关闭命令配合使用,达到清空队列的目的 |
rabbitmqctl reset | 从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息(这个命令要在rabbitmqctl stop_app之后使用) |
8.Rabbitmq的六种工作模式
-
simple简单模式
simple简单模式为一个队列中一条消息,只能被一个消费者消费。 -
Work工作模式
Work工作模式为一个生产者,多个消费者,每个消费者获取到的消息唯一。 -
publish/subscribe订阅模式
publish/subscribe订阅模式为一个生产者发送的消息被多个消费者获取。 -
routing路由模式
routing路由模式为生产者发送的消息主要根据定义的路由规则决定往哪个队列发送。 -
topic主题模式
topic 主题模式为生产者,一个交换机(topicExchange),模糊匹配路由规则,多个队列,多个消费者。 -
RPC模式
RPC模式为客户端 Client 先发送消息到消息队列,远程服务端 Server 获取消息,然后再写入另一个消息队列,向原始客户端 Client 响应消息处理结果。
9.simple简单模式
9.1simple简单模式概念
最简单的消息发送。
特点:
- 点对点模式
9.1生产者
- 创建项目,引入相应插件和依赖
<!-- rabbitmq的相关依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<!-- 操作文件流的依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
- 生产者代码
public static final String QUEUE_NAME = "hello";
@ResponseBody
@RequestMapping("/productor")
public String productor() {
// 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 工厂ip连接RabbitMQ的队列
connectionFactory.setHost("192.168.26.142");
// 用户名
connectionFactory.setUsername("guest");
// 密码
connectionFactory.setPassword("guest");
// 创建连接
try {
Connection connection = connectionFactory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/*
* 创建一个队列(下面是参数说明)
* 1.队列名称
* 2.durable:队列中的消息是否持久化(存在磁盘当中),默认情况消息存储在内存当中
* 3.exclusive:是否排外的。如果不是排外的,可以使用两个消费者都访问同一个队列。
* 如果是排外的,会对当前队列加锁,其他连接connection是不能访问的,同一个连接的不同channel是可以访问的。
* 如果强制访问会报异常
* 4.autoDelete:是否自动删除,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
* 5.arguments:设置队列的其他一些参数
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello world";
/*
* 发送一个消息
* 1.交换机
* 2.路由的key是哪个(队列名称)
* 3.其他参数信息
* 4.发送消息的消息体
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
return "消息发送完毕";
}
测试创建队列,发送消息
9.2消费者
消费者代码
@ResponseBody
@RequestMapping("/consumer")
public String consumer(){
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.26.142");
// 用户名
connectionFactory.setUsername("guest");
// 密码
connectionFactory.setPassword("guest");
try {
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 接收消息时的回调
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消费消息被中断");
};
// 消费者接收消息
/*
* 1.消费个队列
* 2.消费成功之后是否要自动应答,true表示自动应答
* 3.消费者接收消费的回调
* 4.消费者取消消费的回调
* */
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
} catch (Exception e) {
e.printStackTrace();
}
return "消息接收完毕";
}
10.work工作模式
10.1work工作模式的概念
在多个消费者之间分配任务
特点:
工作模式
和简单模式
差不多,只需要生产端、消费端、队列。- 一个生产者、一个队列对应
多个消费者
,也就是一对多的关系。 - 在多个消费者之间分配消息(竞争消费者模式 ),类似轮询发送消息,每个消息都只发给一个消费者。
10.2工作队列模式的原理
工作队列的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
10.3工作队列的实现
- 创建工具类
public class RabbitmqUtils {
public static Channel getChannel() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.26.142");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
- 创建两个工作线程(消费者)
public class WorkQueuePro {
public static void main(String[] args) {
try {
// 创建连接,获取信道
Channel channel = RabbitmqUtils.getChannel();
// 接收消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消费消息被中断");
};
// 接收消息
System.out.println("C1等待接收消息......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
} catch (Exception e) {
}
}
}
- 创建生产者
public class WorkQueueCon {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接,获取信道
Channel channel = RabbitmqUtils.getChannel();
/*
* 创建一个队列(下面是参数说明)
* 1.队列名称
* 2.durable:队列中的消息是否持久化(存在磁盘当中),默认情况消息存储在内存当中
* 3.exclusive:是否排外的。如果不是排外的,可以使用两个消费者都访问同一个队列。
* 如果是排外的,会对当前队列加锁,其他连接connection是不能访问的,同一个连接的不同channel是可以访问的。
* 如果强制访问会报异常
* 4.autoDelete:是否自动删除,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
* 5.arguments:设置队列的其他一些参数
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 从控制台获取到信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
/*
* 发送一个消息
* 1.交换机
* 2.路由的key是哪个(队列名称)
* 3.其他参数信息
* 4.发送消息的消息体
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完成:"+message);
}
}
}
- 测试工作队列的轮询分发消息
生产者控制台
工作线程控制台(消费者)
11.消息应答机制
11.1消息应答的概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它死亡了,那么该消息就会丢失。
RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
11.2消息应答的两种模式
11.2.1自动应答
默认情况下,rabbitmq开启了消息的自动应答。此时,一旦rabbitmq将消息分发给了消费者,就会将消息从内存中删除。这种情况下,如果正在执行的消费者被“杀死”或“崩溃”,就会丢失正在处理的消息。
11.2.2手动应答
rabbitmq将消息发送给消费者,消费者接受并处理完一个消息后,会发送应答给rabbitmq,rabbitmq收到应答后,会将该条消息从内存中删除。如果一个消费者在处理消息的过程中“崩溃”,rabbitmq没有收到应答,那么”崩溃“前正在处理的这条消息会重新被分发到别的消费者。
11.2.3手动应答的方法
使用手动应答时,需要把autoAck属性设置为false,然后进行手动应答。
消息手动应答 有如下几个方法
方法 | 说明 |
---|---|
Channel.basicAck | 用于肯定确认(RabbitMQ已知道该消息并且成功的处理消息, 可以将其丢弃了) |
Channel.basicNack | 用于否定确认 |
Channel.basicReject | 用于否定确认(与Channel.basicNack相比少一个Multiple参数不处理该消息了直接拒绝,可以将其丢弃了) |
参数Multiple说明:
手动应答的好处是可以批量应发并且减少网络阻塞
multiple 的 true 和 false 代表不同意思
true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答 。
false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答。
11.3消息重新入队
==如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。==如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
11.4消息重新入队-手动应答的实现
- 生产者代码
public class handOperatePro {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建信道
Channel channel = RabbitmqUtils.getChannel();
// 声明一个队列
channel.queueDeclare(ACK_QUEUE_NAME,false,false,false,null);
// 从控制台输入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 发布信息
channel.basicPublish("",ACK_QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}
- 工作线程1(消费者1)
public class handOperateCon1 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();
System.out.println("C1等待接收消息处理时间较短");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
// 让当前线程休眠1秒
SleepUtils.sleep(1);
System.out.println("C1接受到的消息:"+new String(message.getBody(),"UTF-8"));
/*
* 手动应答
* 1.消息的标记
* 2.是否批量应答
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// 手动应答
channel.basicConsume(ACK_QUEUE_NAME,deliverCallback,(consumerTag -> {
System.out.println("消费者取消消费接口的回调");
}));
}
}
- 工作线程2(消费者2)
public class handOperateCon2 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();
System.out.println("C2等待接收消息处理时间较长");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, message)->{
// 让当前线程休眠10秒
SleepUtils.sleep(10);
System.out.println("C2接受到的消息:"+new String(message.getBody(),"UTF-8"));
/*
* 手动应答
* 1.消息的标记
* 2.是否批量应答
* */
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// 手动应答
channel.basicConsume(ACK_QUEUE_NAME,deliverCallback,(consumerTag -> {
System.out.println("消费者取消消费接口的回调");
}));
}
}
进行测试:
生产者依次在信道中发送信息,并由生产者1和生产者2进行接收。在C2等待接收消息D时,使消费者2死亡,可以发现消息D由消费者1接收。即实现了消息重新入队。
12.Rabbitmq持久化
12.1持久化的概念
如何保障当 Rabbitmq 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 Rabbitmq 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
11.2队列持久化
之前我们创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化。
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
未持久化之前:
删除此队列,重新创建此队列,并设置持久化
12.3消息持久化
队列是存放消息的容器,要想让消息实现持久化需要在消息生产者添加消息的时候添加属性MessageProperties.PERSISTENT_TEXT_PLAIN
。
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。
生产者完整代码:
public class handOperatePro {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建信道
Channel channel = RabbitmqUtils.getChannel();
// 声明一个队列
channel.queueDeclare(ACK_QUEUE_NAME,true,false,false,null);
// 从控制台输入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 发布信息
channel.basicPublish("",ACK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}
13.不公平分发
在Rabbitmq的工作模式之一—work工作模式中, RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在接收处理消息,这种分配方式在这种情况下其实就不太好。
但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。 为了避免这种情况,我们可以设置参数 channel.basicQos(1)
;
在消费者代码设置不公平分发
测试:
生产者发送消息:
C1消费者:
C2消费者:
14.预取值
14.1预取值的概念
预取值就是设置消费者信道最大传输信息数,实现不公平分发。当消息由消费者处理完之后就再次从队列中获取消息,达到预取值。
14.2预取值的设置方式
在设置不公平分发时,信道Channel有方法basicQos,其参数PrefetchCount为0时表示轮询分发,为1时表示不公平分发,当PerfetchCount的值大于1时,就表示设置不公平分发并设置预取值。
测试:
消费者C1处理消息的时间短,设置其预取值为2
消费者C1处理消息的时间长,设置其预取值为5
生产者在短时间内向队列中存入7条消息
消费者C1:
消费者C2:
查看队列中两个消费者的预取值
15.发布确认模式
15.1发布确认的原理
在数据持久化中,生产者设置了队列持久化、消息持久化,但依然存在消息被传送到队列上,还没来得及存储在磁盘上,队列就宕机了,这种情况下消息也是会丢失的。所以在之前两步的基础上还是进行第三步:发布确认。队列持久化、消息持久化、发布确认三步操作加一起才能保证消息是不丢失的。
发布确认的原理:生产者将信道设置成 confirm (发布确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker(代理) 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
15.2开启确认发布的方法
发布确认默认是没有开启的,如果要开启需要信道Channel调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法。
15.3发布确认的方式
15.3.1单个发布确认
这是一种简单的确认方式,它是一种同步发布确认的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirms() 与 waitForConfirmsOrDie() ,可以指定时间参数,这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。只是waitForConfirmsOrDie异常后信道被关闭,生产者发布不能继续发布消息。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
waitForConfirms和waitForConfirmsOrDie作用和区别
发布消息后通过执行channel.waitForConfirmsOrDie(long)方法或者channel.waitForConfirms(long)等待代理的确认,都具有阻塞性,只是waitForConfirmsOrDie异常后信道被关闭,生产者发布不能继续发布消息,这两个个方法的参数就是确认的超时时间。如果未在超时时间内消息代理确认该消息,则该方法将引发超时的异常。
代码实现:
/*
* 单个确定
* */
public static void individualConfirm() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtils.getChannel();
// UUID生成队列名称
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确定
channel.confirmSelect();
long start = System.currentTimeMillis();
for (Integer i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
// 等待单个消息的发布确定
boolean flag = channel.waitForConfirms();
if (flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发送" + MESSAGE_COUNT + "条数据共耗时" + (end - start) + "ms");
}
15.3.2批量发布确认
与单个发布确认消息相比,批量发布确认先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。
/*
* 批量发布确认
* */
public static void multipleConfirm() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtils.getChannel();
// UUID生成队列名称
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确定
channel.confirmSelect();
long start = System.currentTimeMillis();
for (Integer i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
// 发送100条消息的时候,批量发布确认一次
if (i % 100 == 0){
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("批量发布确认发送" + MESSAGE_COUNT + "条数据共耗时" + (end - start) + "ms");
}
15.3.3异步发布确认
异步发布确认相较于单个发布确定和批量发布确认编程逻辑要复杂,但是可靠性和效率都是最好的。 他是利用ConfirmCallback回调函数来达到消息可靠性传递的。
// 异步发布确认
public static void asynConform() throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();
// UUID生成队列名称
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
// 开启发布确定
channel.confirmSelect();
long start = System.currentTimeMillis();
// 消息确定成功的回调函数
ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> {
System.out.println("确认的消息编号:" + deliveryTag);
};
// 消息确定失败的回调函数
ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
System.out.println("未确认的消息编号:" + deliveryTag);
};
// 消息监听器(异步通知)
channel.addConfirmListener(ackCallback,nackCallback);
for (Integer i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("异步发布确认发送" + MESSAGE_COUNT + "条数据共耗时" + (end - start) + "ms");
}
15.3.4处理异步未确认消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue(并发链路队列) 这个队列在ConfirmCallbacks (确认回调)与发布线程之间进行消息的传递。
15.3.5三种发布确认方式的比较
-
单独发布确认
同步等待确认,实现简单,但是吞吐量十分有限。
-
批量发布确认
批量同步等待确认,实现简单,吞吐量较大,但是很难找出未确认的消息。
-
异步发布确认
可靠性和性能最好,在出现未确认消息时容易处理,但是实现困难。
16.交换机
16.1交换机的概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是把他们放到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
16.2交换机的类型
- 直连交换机:
Direct exchange
- 扇出交换机:
Fanout exchange
- 主题交换机:
Topic exchange
- 首部交换机:
Headers exchange(比较少用)
16.3无名交换机
在创建队列时,第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能由路由发送到队列中其实是由 routingKey(bindingkey)
绑定 key 指定的。
16.4临时队列
临时队列:一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
16.5绑定(bindings)
binding是exchange(交换机)和queue(队列)之间的桥梁,由binding确定交换机和队列之间的绑定关系。
测试:
创建一个新的交换机和队列,然后将双方进行绑定
16.6Fanout交换机(发布/订阅模式)
16.6.1Fanout交换机简介
Fanout类型的交换机(发布/订阅模式)routingKey是空串,是将接收到的所有消息发送到它绑定的所有队列中。
16.6.2Fanout实现发布/订阅
生产者:
public class PublishSubscribePro {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();
System.out.println("生产者准备发出消息......");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 发布消息(routingKey为空)
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("生产者发出消息:" + message);
}
}
}
消费者C1:
public class PublishSubscribeFanoutCon1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitmqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 声明临时队列:临时队列在与消费者断开连接后会自动删除
String queueName = channel.queueDeclare().getQueue();
// 将交换机和队列绑定
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("消费者C1等待接收消息......");
// 接收消息回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者C1接收到得消息:" + new String(message.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
}
}
消费者C2:
public class PublishSubscribeFanoutCon2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitmqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 声明临时队列:临时队列在与消费者断开连接后会自动删除
String queueName = channel.queueDeclare().getQueue();
// 将交换机和队列绑定
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("消费者C2等待接收消息......");
// 接收消息回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者C2接收到得消息:" + new String(message.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
}
}
测试:
生产则:
消费者C1:
消费者C2:
16.7Direct交换机(路由模式)
16.7.1Direct交换机简介
交换机可以通过路由(routingKey)与队列进行绑定,在接收到生产者发来消息后,通过路由发送给指定队列,从而达到指定消费者消费。,与fanout交换机不同的是,direct交换机的routingKey是不同的。
16.7.2多重绑定
使用相同的routingKey绑定多个队列是完全合法的。在下面的示例中,我们可以在 X 和 Q1 之间添加一个routingKey—black。在这种情况下,Direct交换机的行为类似于Fanout交换机,并将消息发送到所有绑定的队列。路由routingKey为black的消息将同时传递到 Q1 和 Q2。
16.7.3Direct交换机实现路由模式
交换机和队列之间的关系
消费者console的代码:
public class RoutingCon1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitmqUtils.getChannel();
// 声明direct交换机
channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明队列
channel.queueDeclare("console",false,false,false,null);
// 交换机绑定队列,路由模式routingKey不同
channel.queueBind("console",ROUTING_EXCHANGE_NAME,"info");
channel.queueBind("console",ROUTING_EXCHANGE_NAME,"warning");
// 接收消息回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("console_info_warning接收到得消息:" + new String(message.getBody()));
};
// 接收消息
channel.basicConsume("console",true,deliverCallback,consumerTag -> {});
}
}
消费者disk代码:
public class RoutingCon2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitmqUtils.getChannel();
// 声明direct交换机
channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明队列
channel.queueDeclare("disk",false,false,false,null);
// 交换机绑定队列
channel.queueBind("disk",ROUTING_EXCHANGE_NAME,"error");
// 接收消息回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("disk_error接收到得消息:" + new String(message.getBody()));
};
// 接收消息
channel.basicConsume("disk",true,deliverCallback,consumerTag -> {});
}
}
生产者代码:
public class RoutingPro {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();
System.out.println("生产者准备发出消息......");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
// 发布消息
channel.basicPublish(ROUTING_EXCHANGE_NAME,"info",null,message.getBytes());
System.out.println("生产者发出消息:" + message);
}
}
}
测试1:生产者通过交换机direct_logs
,并且routingKey为info
发送信息。由消费者C1获取到消息,因为交换机direct_logs
绑定的其中一个routingKey为info
。
生产者:
消费者C1:
测试2:同上,测试routingKey为error
的情况
生产者:
消费者C2:
16.8 Topic交换机(主题模式)
16.8.1Topic交换机的概念
发送到 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
binding key也必须采用相同的形式。topic交换机背后的逻辑类似于direct交换机——使用特定 routing key 发送的消息将被传递到与匹配binding key绑定的所有队列。但是binding key有两个重要的特殊情况:
- *表示匹配任意的一个单词
- #表示匹配0个或人一个单词
对于上面的交换机,有以下测试:
routingKey | 说明 |
---|---|
quick.orange.rabbit | 被队列 Q1Q2 接收到 |
lazy.orange.elephant | 被队列 Q1Q2 接收到 |
quick.orange.fox | 被队列 Q1 接收到 |
lazy.brown.fox | 被队列 Q2 接收到 |
lazy.pink.rabbit | 虽然满足两个绑定但只被队列 Q2 接收一次 |
quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 |
quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 |
lazy.orange.male.rabbit | 是四个单词但匹配 Q2 |
注意点:
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 交换机。
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 交换机。
16.8.2Topic交换机实现主题模式
要求如下:
交换机名为topic_logs,有两个队列,分别为Q1,Q2。交换机和队列之间通过bindingKey进行绑定。首先创建两个消费者C1、C2,在创建消费者的同时创建交换机和队列,并将交换机和队列进行绑定。最后创建生产者,生产者向路由中发送消息,发送的消息就是上面的测试。
消费者C1:
public class TopicCon1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitmqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明队列
channel.queueDeclare("Q1",false,false,false,null);
// 将交换机和队列通过bindingKey进行绑定
channel.queueBind("Q1",TOPIC_EXCHANGE_NAME,"*.orange.*");
System.out.println("消费者C1等待接收消息......");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者C1接收到消息:" + new String(message.getBody()) + "当前的routingKey为:" + message.getEnvelope().getRoutingKey());
};
// 消费消息
channel.basicConsume("Q1",true,deliverCallback,consumerTag -> {});
}
消费者C2:
public class TopicCon2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitmqUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明队列
channel.queueDeclare("Q2",false,false,false,null);
// 将交换机和队列通过bindingKey进行绑定
channel.queueBind("Q2",TOPIC_EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind("Q2",TOPIC_EXCHANGE_NAME,"lazy.#");
System.out.println("消费者C2等待接收消息......");
// 接收消息的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者C2接收到消息:" + new String(message.getBody()) + "当前的routingKey为:" + message.getEnvelope().getRoutingKey());
};
// 消费消息
channel.basicConsume("Q2",true,deliverCallback,consumerTag -> {});
}
}
生产者:
public class TopicPro {
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitmqUtils.getChannel();
Map<String,String> routingKeyMap = new HashMap<>();
routingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
routingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
routingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
routingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
routingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
routingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
routingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
routingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
// 在信道中存入消息
for (Map.Entry<String, String> routingKeys : routingKeyMap.entrySet()) {
String routingKey = routingKeys.getKey();
String message = routingKeys.getValue();
channel.basicPublish(TOPIC_EXCHANGE_NAME,routingKey,null,message.getBytes());
System.out.println("生产者发出消息:" + message);
}
}
}
生产者控制台:
消费者C1控制台:
消费者C2控制台:
17.死信队列
17.1死信队列的概念
死信就是无法被消费的消息。producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
17.2死信产生的原因
- 消息 TTL(存活时间) 过期
- 队列达到最大长度(队列满了,无法再添加数据到队列中)
- 消息被拒绝(basic.reject 或 basic.nack)并且不放回队列中(requeue=false)
17.3死信队列工作原理
正常情况下消费者通过交换机发送信息到队列当中,队列中的消息再由消费者所处理。
而正常消息队列当中的消息如果出现了死信,那就会通过死信交换机到达死信队列,最后由异常处理消费者所处理。
17.4死信队列实现过程(消息TTL过期)
消费者:
public class deadLetterC1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取信道
Channel channel = RabbitmqUtils.getChannel();
// 2.声明常规交换机
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 3.声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 4.声明常规队列
Map<String, Object> arguments = new HashMap<>();
// 1.在常规队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 2.设置死信routingKey
arguments.put("x-dead-letter-routing-key", "dead");
// 3.设置过期时间10s
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, arguments);
// 8.声明死信队列
channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);
// 9.绑定普通队列和交换机
channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "normal");
// 10.绑定死信队列和交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "dead");
System.out.println("消费者等待接收消息.....");
// 接收消息回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者C1接收的消息为:" + new String(message.getBody(), "UTF-8"));
};
// 处理消息
channel.basicConsume(NORMAL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
查看交换机信息:
常规交换机绑定的队列为常规队列,routingKey为normal。
死信交换机绑定的队列是死信队列,routingKey为dead
查看队列信息:
normal_queue队列中设置的有TTL(过期时间)、DLX(死信交换机)和DLK(死信routingKey)
生产者代码:
/**
* 死信队列生产者代码
*/
public class deadLetterPro {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();
// 死信消息,设置TTL过期时间
AMQP.BasicProperties props =
new AMQP.BasicProperties()
.builder()
.expiration("10000")
.build();
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE_NAME,"normal",props,message.getBytes());
}
}
}
测试:
启动消费者,生成交换机和队列,并等待接收消息。模拟消费者宕机,等待消息的10s过期时间,观察死信消息是否会到达死信队列。
消息到达常规队列,等待消息的TTL过期
消息TTL过期,成为死信消息,死信消息达到死信队列:
由于现在已经有死信队列,就需要有处理死信消息的消费者。
创建处理死信消息的消费者:
public class deadLetterC2 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println("处理死信消息的消费者开始处理死信消息:" + new String(message.getBody()));
};
channel.basicConsume(DEAD_QUEUE_NAME,true,deliverCallback,consumerTag -> {});
}
}
死信队列消息变为空:
17.5死信队列实现过程(队列达到最大长度)
死信队列的产生原因之一消息TTL过期已经演示过了,下面模拟达到队列的最大长度,当达到队列的最大长度后,剩下的消息就会被变为死信消息,被放到死信队列当中去。
要设置队列的最大长度,只需要在声明常规队列时执行队列的最大长度即可
public class deadLetterC1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取信道
Channel channel = RabbitmqUtils.getChannel();
// 2.声明常规交换机
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 3.声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 4.声明常规队列
Map<String, Object> arguments = new HashMap<>();
// 1.在常规队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 2.设置死信routingKey
arguments.put("x-dead-letter-routing-key", "dead");
// 3.限制常规队列的长度
arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, arguments);
// 8.声明死信队列
channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);
// 9.绑定普通队列和交换机
channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "normal");
// 10.绑定死信队列和交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "dead");
System.out.println("消费者等待接收消息.....");
// 接收消息回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("消费者C1接收的消息为:" + new String(message.getBody(), "UTF-8"));
};
// 处理消息
channel.basicConsume(NORMAL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
17.6死信队列实现过程(消息被拒绝)
要拒绝某些消息,就需要在消费者的接收消息时的回调函数中对消息进行拒绝
模拟消费者C1拒绝接收消息info0
public class deadLetterC1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取信道
Channel channel = RabbitmqUtils.getChannel();
// 2.声明常规交换机
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 3.声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 4.声明常规队列
Map<String, Object> arguments = new HashMap<>();
// 1.在常规队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 2.设置死信routingKey
arguments.put("x-dead-letter-routing-key", "dead");
/*// 3.限制常规队列的长度
arguments.put("x-max-length",6);*/
channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, arguments);
// 8.声明死信队列
channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);
// 9.绑定普通队列和交换机
channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "normal");
// 10.绑定死信队列和交换机
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "dead");
System.out.println("消费者等待接收消息.....");
// 接收消息回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(),"UTF-8");
if ("info0".equals(msg)){
System.out.println("消费者拒绝接收当前消息:" + msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("消费者C1接收的消息为:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
// 处理消息
channel.basicConsume(NORMAL_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}
}
死信队列中含有一条死信消息:
消费者C2处理死信消息:
18.SpringBoot整合Rabbitmq
18.1引入相应依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
18.2编写配置文件
spring:
rabbitmq:
host: 192.168.26.142
port: 5672
username: guest
password: guest
19.延迟队列
19.1延迟队列的概述
延迟队列内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列就是给队列设置了一个过期时间,延迟队列一般都是配合TTL和死信队列来实现的
19.2延迟队列的使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
例如下面是用户购票的实例:
用户购买过票后创建订单,提醒用户付款并将订单信息放入rabbitmq的延迟队列中,在设定的延迟时间过期后查询订单状态,若用户未付款,则取消订单并更新数据库。
19.3延迟队列实现案例(基于死信)
在下面的案例中,普通交换机绑定了两个普通队列,一个队列的过期时间为10s,另一个队列的过期时间为40s。
在两个普通队列中指定死信交换机,再将死信交换机和死信队列进行绑定。
- 配置文件类代码,在配置类中声明交换机和队列,并对交换机和队列进行绑定。
/**
* 延迟队列配置类
*/
@Configuration
public class TTLQueueConfig {
// 声明普通交换机
@Bean
public DirectExchange xExchange(){
return new DirectExchange(NORMAL_EXCHANGE_NAME);
}
// 声明死信交换机
@Bean
public DirectExchange yExchange(){
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
// 声明两个普通队列,并在普通队列里指定死信交换机
@Bean
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
// 设置routingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(NORMAL_QUEUE1_NAME).withArguments(arguments).build();
}
@Bean
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
// 设置routingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(NORMAL_QUEUE2_NAME).withArguments(arguments).build();
}
// 声明死信队列
@Bean
public Queue queueD(){
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
// 绑定普通交换机和普通队列
@Bean
public Binding queueABindingXExchange(){
return BindingBuilder.bind(queueA()).to(xExchange()).with(NORMAL_ROUTING_KEY);
}
@Bean
public Binding queueBBindingXExchange(){
return BindingBuilder.bind(queueB()).to(xExchange()).with(NORMAL_ROUTING_KEY);
}
// 绑定死信交换机和死信队列
@Bean
public Binding queueDBindingYExchange(){
return BindingBuilder.bind(queueD()).to(yExchange()).with(DEAD_ROUTING_KEY);
}
}
- 在生产者中发送消息
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message){
log.info("当前时间是{},发送一条消息给两个普通队列,消息为:{}", new Date().toString(),message);
rabbitTemplate.convertAndSend(
NORMAL_EXCHANGE_NAME,
NORMAL1_ROUTING_KEY,"过期时间为10s的队列发送的消息为:" + message);
rabbitTemplate.convertAndSend(
NORMAL_EXCHANGE_NAME,
NORMAL2_ROUTING_KEY,"过期时间为40s的队列发送的消息为:" + message);
}
- 消费者接收消息
@Slf4j
@Component
public class DeadQueueConsumer {
// 接收消息
@RabbitListener(queues = DEAD_QUEUE_NAME)
public void receiveMessage(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间是{},收到死信队列的消息为:{}", new Date().toString(),msg);
}
}
19.4延迟队列案例优化
19.4.1先前的延迟队列存在的问题
在之前延迟队列的案例中,一共有两个带有TTL过期时间的队列,但是如果这种情况下每次增加一个新的时间需求就需要创建一个新的TTL队列,当时间需求数量很大的时候就需要创建很多TTL队列。
为了解决设置过多队列的问题,采用以下方案。
取消在队列中设置TTL过期时间,而在发送信息的时候设置消息的过期时间,从而实现延迟队列。
19.4.2优化实现
- 在配置类中声明队列QC,并且与交换机X进行绑定。在队列QC中声明死信交换机,和routingKey,但不设置TTL过期时间。
// 发送带过期时间的消息
@GetMapping("/sendTTLMessage/{message}/{TTL}")
public void sendTTLMessage(@PathVariable String message,@PathVariable String TTL){
log.info("当前时间是{},发送一条时长{}毫秒消息给TTL普通队列,消息为:{}", new Date().toString(),TTL,message);
rabbitTemplate.convertAndSend(
NORMAL_EXCHANGE_NAME,
NORMAL_ROUTING3_KEY,
message,msg -> {
// 发送消息的时候,延迟时长
msg.getMessageProperties().setExpiration(TTL);
return msg;
});
}
测试发送请求1:
http://localhost:8080/ttl/sendTTLMessage/第一条消息/20000
http://localhost:8080/ttl/sendTTLMessage/第二条消息/2000
可以发现第一条消息的TTL过期时间为20s,而第二条消息的TTL过期时间为2s,按理说应该是第二条消息首先到达死信队列,当时第二条消息却和第一条消息同时到达。
这是因为队列是先进先出的,在第一条消息未被发送完的时候,队列处于阻塞状态,即使第二条消息执行完了也不能发出。
19.5延迟交换机插件实现延迟队列
19.5.1问题描述
之前我们关于消息设置过期时间都是在消息本身以及队列的维度上来进行设置,这两个维度都在不同程度上有一些问题。
问题一:当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, name必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;
问题二:就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的;
要解决以上问题,就需要使用延迟交换机插件来实现。
19.5.2延迟交换机的原理
之前设置TTL过期时间是在消息本身和队列当中,现在延迟的实现是在交换机阶段。
该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
19.5.3安装延迟交换机插件
在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange
)来实现延迟队列。
插件GitHub地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange (github.com)
- 下载与Rabbitmq版本相对应的延迟交换机
- 将插件发送到宿主机的指定目录下
- 将宿主机上的插件发送到容器中的指定路径下
docker cp /root/plugins a82402ac8852:/plugins
- 进入容器在插件目录下查看插件
- 在插件目录中启动延迟队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重启Rabbitmq服务
docker restart 容器ID
- 检验延迟队列插件是否安装成功
在rabbitmq的UI界面,创建交换机选项中多了一种交换机类型
19.5.4基于延迟交换机的延迟队列
- 创建配置类,定义延迟交换机和队列,再进行绑定
@Configuration
public class DelayExchangeConfig {
// 声明自定义延迟交换机
@Bean
public CustomExchange delayExchange(){
Map<String, Object> arguments = new HashMap<>();
// 指定发消息的方式为direct
arguments.put("x-delayed-type","direct");
return new CustomExchange(
DELAY_EXCHANGE_NAME,
"x-delayed-message",
true,
false,
arguments);
}
// 声明队列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
}
// 绑定交换机和队列
@Bean
public Binding delayExchangeBindingDelayQueue(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();
}
}
- 生产者代码,发送消息的时候指定延迟时间
// 向延迟交换机中发送消息
@GetMapping("/sendTTLMessage/{message}/{delayTime}")
public void sendDelayTimeMsg(@PathVariable String message,@PathVariable Integer delayTime){
log.info("当前时间是{},发送一条时长{}毫秒消息给延迟队列delay.queue,消息为:{}", new Date().toString(),delayTime,message);
rabbitTemplate.convertAndSend(
DELAY_EXCHANGE_NAME,
DELAY_ROUTING_KEY,message,msg -> {
// 发送消息的时候设置延迟时长
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
- 消费者代码,指定监听的延迟队列,接收消息
@Slf4j
@Component
/**
* 延迟交换机消费者
*/
public class DelayQueueConsumer {
// 指定监听延迟队列
@RabbitListener(queues = DELAY_QUEUE_NAME)
public void receiveDelayMessage(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
}
}
- 测试延迟交换机
http://localhost:8080/ttl/sendMessageToDelayExchange/第一条消息/20000
http://localhost:8080/ttl/sendMessageToDelayExchange/第二条消息/2000
19.6延迟队列总结
延迟队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延迟队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。
另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。 当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。
20.发布确定高级
20.1问题引入
在之前的发布确定学习中,我们学习到了单个确认、批量确认、异步确认三种确认方式,也通过实操来比较三种确认方式的性能,其中异步确认是性能最好的。
现在我们考虑以下这种情况,在生产者发送消息的过程中,可能会出现交换机宕机、队列宕机、或者两者一同宕机的情况。这时候消息就会丢失,在不使用集群的情况下,来看看如何解决这一问题。
20.2消息回调(生产者-交换机)
-
与之前异步发布确定的方式相同,都是使用ConfirmCallback回调函数来实现消息回调。
-
设置配置文件,在消息发送成功或消息发送失败的时候都触发回调函数
spring.rabbitmq.publisher.confirm.type的参数讲解:
- correlated:发布消息成功或失败到交换机后会触发回调方法
- none:禁止发布确定模式,是默认值
- simple:和单个发布确定的模式相同
spring:
rabbitmq:
host: 192.168.26.142
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated
- 交换机和队列配置
@Configuration
public class ConfirmConfig {
// 声明交换机
@Bean
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
// 声明队列
@Bean
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 绑定交换机和队列
@Bean
public Binding confirmExchangeBindingConfirmQueue(){
return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
}
}
- 生产者发送消息
// 发布确认高级
@GetMapping("/confirmAdvanced/{message}")
public void confirmAdvanced(@PathVariable String message){
//设置消息的ID
CorrelationData correlationData = new CorrelationData("01");
rabbitTemplate.convertAndSend(
CONFIRM_EXCHANGE_NAME,
CONFIRM_ROUTING_KEY,
message,
correlationData);
log.info("生产者发布消息:" + message);
}
- 实现回调函数接口,编写消息发送成功/失败时的业务逻辑
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
// 将类中的接口指向所实现接口的类
@Resource
private RabbitTemplate rabbitTemplate;
// 在spring容器初始化的时候执行该方法,进行实现类注入
@PostConstruct
public void init(){
// 注入
rabbitTemplate.setConfirmCallback(this);
}
/*
* 交换机确定回调方法
* 1.correlationData 保存回调消息的ID及相关信息
* 2.ack 交换机收到消息为ture
* 3.cause null
*
* 交换机接受失败
* 1.correlationData 保存回调消息的ID及相关信息
* 2.ack 交换机收到消息为false
* 3.cause 发送失败原因
* */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData.getId() != null ? correlationData.getId() : "";
if (ack){
log.info("交换机已经接受到id为‘{}’的信息",msgId);
}else{
log.info("交换机未接受到id为‘{}’的信息,原因是:{}",msgId,cause);
}
}
}
- 消费者
@Slf4j
@Component
public class ConfirmAdvancedConsumer {
// 监听rabbitmq队列
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message){
log.info("从队列中接收到消息:" + new String(message.getBody()));
}
}
-
测试
一:测试消息正常发送时的消息回调
二:测试在发送消息的时候,指定消息发送到未定义的交换机(模拟交换机宕机的情况)
20.3回退消息(交换机-队列)
20.3.1Mandatory(强制)参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由(在交换机中不能发送到队列),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下?通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
20.3.2在springBoot配置文件中设置回退消息机制
spring:
rabbitmq:
host: 192.168.26.142
port: 5672
username: guest
password: guest
# 交换机确定回调
publisher-confirm-type: correlated
# 在交换机发送的消息不可达时回退消息
publisher-returns: true
20.3.3实现回退消息
- 在配置类中实现回退接口
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
// 将类中的接口指向所实现接口的类
@Resource
private RabbitTemplate rabbitTemplate;
// 在spring容器初始化的时候执行该方法,进行实现类注入
@PostConstruct
public void init(){
// 注入确认回退接口
rabbitTemplate.setConfirmCallback(this);
// 注入回退消息接口
rabbitTemplate.setReturnsCallback(this);
/*
* true:
* 交换机无法将消息进行路由时,会将该消息返回给生产者
* false:
* 如果发现消息无法进行路由,则直接丢弃
*/
rabbitTemplate.setMandatory(true);
}
/*
* 交换机确定回调方法
* 1.correlationData 保存回调消息的ID及相关信息
* 2.ack 交换机收到消息为ture
* 3.cause null
*
* 交换机接受失败
* 1.correlationData 保存回调消息的ID及相关信息
* 2.ack 交换机收到消息为false
* 3.cause 发送失败原因
* */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData.getId() != null ? correlationData.getId() : "";
if (ack){
log.info("交换机已经接受到id为‘{}’的信息",msgId);
}else{
log.info("交换机未接受的信息,原因是:{}",cause);
}
}
/*
* 在消息发送过程中,消息出现不可达的时候将消息回退给生产者
* */
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息’{}‘被交换机’{}‘退回,退回的原因是:{},当前routingKey为:{}",
returned.getMessage(),
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey());
}
}
- 生产者
// 发布确认高级
@GetMapping("/confirmAdvanced/{message}")
public void confirmAdvanced(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("01");
rabbitTemplate.convertAndSend(
CONFIRM_EXCHANGE_NAME,
CONFIRM_ROUTING_KEY + "连接断开",
message,
correlationData);
log.info("生产者发布消息:" + message);
}
- 消费者
@Slf4j
@Component
public class ConfirmAdvancedConsumer {
// 监听rabbitmq队列
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message){
log.info("从队列中接收到消息:" + new String(message.getBody()));
}
}
21.备份交换机
21.1备份交换机的概念
有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增 加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的 复杂性,该怎么做呢?
前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些无法处理的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理。
通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
21.2备份交换机架构
在普通交换机中,若消息因队列宕机或者routingKey匹配错误,在之前的操作中,普通交换机中的消息就会被会退给生产者,再由生产者重新发送。
在备份交换机架构中,无法发送的消息会被普通交换机发送到备份交换机,备份交换机是Fanout交换机,其与警告队列和备份队列进行绑定。
21.3备份交换机实现
- 在配置文件中声明备份交换机、备份队列、警告队列,并将交换机和队列进行绑定
重要的一点是在普通交换机中指定备份交换机
@Configuration
public class ConfirmConfig {
// 声明交换机
@Bean
public DirectExchange confirmExchange() {
// 在发布确定交换机中指定备份交换机
return ExchangeBuilder.
directExchange(CONFIRM_EXCHANGE_NAME).
durable(true).
withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
}
// 声明队列
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 绑定交换机和队列
@Bean
public Binding confirmExchangeBindingConfirmQueue() {
return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
}
// 声明备份交换机
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
// 声明备份队列
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// 声明警报队列
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
// 将备份交换机和备份队列、报警队列进行绑定
@Bean
public Binding backupBinding(){
return BindingBuilder.bind(backupQueue()).to(backupExchange());
}
@Bean
public Binding warningBinding(){
return BindingBuilder.bind(warningQueue()).to(backupExchange());
}
}
- 报警消费者
@Slf4j
@Component
public class WarningQueueConsumer {
// 监听报警队列
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message){
log.info("报警:发现不可路由的消息:" + message);
}
}
这里可以发现打印的日志中,只有报警消费者处理了不可路由的消息,而没有进行回退消息。
回退消息和备份交换机如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。
22.优先级队列
22.1优先级队列的概念
拥有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的权力。
优先级的指定范围是0~255(但是一般的设置范围是0-10),设置优先级的消息会在队列中重新排序,数值越大越优先被消费。
在rabbitmq中,优先队列有两种概念:
- 队列优先级
- 队列中的消息优先级
队列和队列中的消息要同时设置优先级,并且消息的优先级要比队列的优先级低
22.2优先级队列和优先级消息实现
在rabbitmq的UI界面创建队列的时候有优先级参数
消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
- 配置类
@Configuration
public class PriorityConfig {
// 声明交换机
@Bean
public DirectExchange priorityExchange(){
return new DirectExchange(PRIORITY_EXCHANGE_NAME);
}
// 声明队列
@Bean
public Queue priorityQueue(){
return QueueBuilder.durable(PRIORITY_QUEUE_NAME).maxPriority(10).build();
}
// 绑定交换机和队列
@Bean
public Binding pxBindingPq(){
return BindingBuilder.bind(priorityQueue()).to(priorityExchange()).with(PRIORITY_ROUTING_KEY);
}
}
- 通过接口向优先级队列中发送消息
// 队列优先级生产者
@GetMapping("/queuePriorityPro")
public void queuePriorityPro() {
for (int i = 1; i <= 10; i++) {
String message = "第" + i + "条消息。";
if (i == 5) {
// 第5条消息设置消息优先级参数为5
rabbitTemplate.convertAndSend(
PRIORITY_EXCHANGE_NAME,
PRIORITY_ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setPriority(5);
return msg;
});
} else {
rabbitTemplate.convertAndSend(
PRIORITY_EXCHANGE_NAME,
PRIORITY_ROUTING_KEY, message);
}
}
log.info("向优先级队列中发送消息成功。");
}
- 通过单元测试消费消息,查看消息的消费顺序
@Test
public void test1(){
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.26.142");
// 用户名
connectionFactory.setUsername("guest");
// 密码
connectionFactory.setPassword("guest");
try {
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 接收消息时的回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消费消息被中断");
};
// 消费者接收消息
/*
* 1.消费个队列
* 2.消费成功之后是否要自动应答,true表示自动应答
* 3.消费者接收消费的回调
* 4.消费者取消消费的回调
* */
channel.basicConsume(PRIORITY_QUEUE_NAME,true,deliverCallback,cancelCallback);
} catch (Exception e) {
e.printStackTrace();
}
}
-
测试结果:
因为第5条消息的优先级最高,所以优先消费第5条消息。
23.惰性队列
23.1惰性队列的概念
惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的 时间,也会阻塞队列的操作,进而无法接收新的消息。
23.2懒惰队列的设置
队列具备两种模式:default 和 lazy。默认的为default 模式,在3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以在生命队列的时候在 channel.queueDeclare 方法的中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
23.3内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB
24.Rabbitmq集群
24.1Rabbitmq集群
24.1.1Rabbitmq集群概述
当单台 RabbitMQ 服务器的处理消息的能力达到瓶颈时,此时可以通过 RabbitMQ 集群来进行扩展,从而达到提升吞吐量的目的。RabbitMQ 集群是一个或多个节点的逻辑分组,集群中的每个节点都是对等的,每个节点共享所有的用户,虚拟主机,队列,交换器,绑定关系,运行时参数和其他分布式状态等信息。
24.1.2普通集群和镜像集群
- 普通集群
对于普通模式,集群中各节点有相同的队列结构,但消息只会存在于集群中的一个节点,对于消费者来说,若消息进入A节点的Queue中,当从B节点拉取时,RabbitMQ会将消息从A中取出,并经过B发送给消费者。
**应用场景:**该模式更适合于消息无需持久化的场景,如日志队列。当队列非持久化,且创建该队列的节点宕机,客户端才可以重连集群其他节点,并重新创建队列。若为持久化,只能等故障节点恢复。缺点:无法解决单点故障问题。
- 镜像集群
与普通模式不同之处时消息实体会主动在镜像节点见同步,而不是在取数据时临时拉取,高可用;该模式下 镜像队列(mirror queue)有一套选举算法,即1个master、n个slaver。 生产者、消费者的请求都会转至master。
**应用场景:**可靠性要求较高场合,如下单、库存队列。缺点:若镜像队列过多,且消息体量大,集群内部网络带宽将会被此种同步通讯所消耗。
24.2Docker搭建Rabbitmq集群
下面将采用3个Rabbitmq服务节点,一个节点作为主节点,另外两个作为从节点。
rabbitmq集群的常用命令:
命令 | 说明 |
---|---|
rabbitmqctl join_cluster --ram 主节点name | 加入集群[–ram添加内存模式 默认disk模式] |
rabbitmqctl cluster_status | 查看集群状态 |
rabbitmqctl stop_app | 关闭应用(关闭当前启动的节点) |
rabbitmqctl start_app | 启动应用,和上述关闭命令配合使用,达到清空队列的目的 |
rabbitmqctl reset | 从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息(这个命令要在rabbitmqctl stop_app之后使用) |
24.2.1普通集群
-
开放对应端口
-
使用镜像创建3个rabbitmq节点
docker run -d --hostname rabbitmq01 --name rabbitmqNode01 -v /root/rabbitmq01:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:management
docker run -d --hostname rabbitmq02 --name rabbitmqNode02 -v /root/rabbitmq01:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' --link rabbitmqNode01:rabbitmq01 rabbitmq:management
docker run -d --hostname rabbitmq03 --name rabbitmqNode03 -v /root/rabbitmq01:/var/lib/rabbitmq -p 15674:15672 -p 5674:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' --link rabbitmqNode01:rabbitmq01 --link rabbitmqNode02:rabbitmq02 rabbitmq:management
注意点:
- -e 指定环境变量,RABBITMQ_ERLANG_COOKIE=‘rabbitcookie’ 必须设置为相同,因为 Erlang节点间是通过认证Erlang cookie的方式来允许互相通信的。
- –link命令的作用:链接两个容器,使得源容器(被链接的容器)和接收容器(主动去链接的容器)之间可以互相通信
三个rabbitmq节点已经准备完毕:
-
内存节点和磁盘节点的选择
每个RabbitMQ节点,要么是内存节点,要么是磁盘节点。内存节点将所有的队列、交换器、绑定、用户等元数据定义都存储在内存中;而磁盘节点将元数据存储在磁盘中。单节点系统只允许磁盘类型的节点,否则当节点重启以后,所有的配置信息都会丢失。如果采用集群的方式,可以选择至少配置一个节点为磁盘节点,其余部分配置为内存节点,,这样可以获得更快的响应。所以本集群中配置节点1位磁盘节点,节点2和节点3位内存节点。
集群中的第一个节点将初始元数据代入集群中,并且无须被告知加入。而第2个和之后加入的节点将加入它并获取它的元数据。要加入节点,需要进入Docker容器,重启RabbitMQ。
-
将RabbitMQ节点加入到集群
#进入rabbitmq01容器,重新初始化一下,如果是新安装则reset可以忽略重置。
docker exec -it rabbitmqNode01 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
#进入rabbitmq02容器,重新初始化一下,将02节点加入到集群中
docker exec -it rabbitmqNode02 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01 #参数“--ram”表示设置为内存节点,忽略该参数默认为磁盘节点。
rabbitmqctl start_app
exit
#进入rabbitmq03容器,重新初始化一下,将03节点加入到集群中
docker exec -it rabbitmqNode03 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit
未成功,原因未知
24.2.1镜像模式
管理界面配置策略
登录 rabbitmq 管理页面 ——> Admin ——> Policies ——> Add / update a policy
-
name:策略名称
-
Pattern:匹配符,只有一个代表匹配所有。message指同步“message”开头的队列名称
-
Definition:ha-mode=all 为匹配类型,分为3种模式:all(表示所有的queue)
-
Priority:优先级,首先根据priority排序,值越大的优先级越高;相同priority则根据创建时间排序,越晚创建的优先级越高。
Operator Policy 和 User Policy 的区别:
- Operator Policy 是给服务提供商或公司基础设施部门用来设置某些需要强制执行的通用规则
- User Policy 是给业务应用用来设置的规则