RabbitMQ原理、整合,面试题
1. 原理
RabbitMQ:提起消息队列,就会关联到这其中的三者:生产者、消费者和消息队列。
生产者:将消息发送到消息队列。
消费者:从队列中获取消息进行处理。
而RabbitMQ在此基础上做了一层抽象,引入了交换器exchange的概念,交换器作用于生产者和消息队列之间的中间桥梁,它起到了一种消息路由的作用。就是说生产者并不直接和消息队列关联,而是先发给交换器,再由交换器路由到对应的队列。生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange(交换器)的Channel(信道),将消息发送给Exchange,Exchange根据路由规则,将消息转发给指定的消息队列。消息队列存储消息,等待消费者取出消息,消费者通过建立与消息队列相连的Channel,从消费队列中获取消息。
通俗来讲,你可以把它想象成一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确的传递你的信件。
RabbitMQ就是一个邮箱,一个邮局和一个邮递员。
实现原理图:
基本概念:
-
Channel(信道) :多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
-
Producer(消息的生产者) :向消息队列发布消息的客户端应用程序。
-
Consumer(消息的消费者) :从消息队列取得消息的客户端应用程序。
-
Message(消息) :消息由消息头和消息体组成。消息体是不透明的,而消息头则是有一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
-
Routing Key(路由键) :消息头的一个属性,用于标记消息的路由规则,决定交换机的转发路径。最大长度 255 字节。
-
Queue(消息队列) :存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。他是消息容器,也是消息终点。一个消息可以投入一个或者多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意的是:当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收。
-
Exchange(交换器|路由器) :提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,他不会存储消息,如果没有Queue绑定到Exchange的话,它会直接丢弃掉Producer发送过来的消息。交换器有四种消息调度策略,分别是:fanout、direct、topic、headers。
-
Binding(绑定) :用于建立Exchange与Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
-
Binding Key(绑定键) :用于建立Exchange和Queue的绑定关系,用于匹配Routing Key。最大长度 255 字节。
-
Broker :RabbitMQ Server,服务器实体。
调度策略是指Exchange在收到生产者发送的消息后依据什么规则把消息转发到一个或多个队列中保存。调度策略与三个因素相关:Exchange Type(Exchange的类型),Binding Key(Exchange和Queue的绑定关系),消息的标记信息(Routing Key和headers)。Exchange根据消息的Routing Key和Exchange绑定Queue的Binding Key分配消息。生产者将消息发送给Exchange时,一般会指定一个Routing Key,来指定这个消息的路由规则,而这个Routing Key需要与Exchange Type及Binding Key联合使用才能最终生效。在Exchange Type与Binding Key固定的情况下(一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定Routing Key来决定消息流向哪里。
2. 作用
应用解耦、弹性伸、冗余存储、流量削峰、异步通信、数据同步
3.RabbitMQ五种工作模式
3.1 简单队列
一个生产者对应一个消费者!
/**
* 生产者 发送消息
*/
public class Send {
private static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException, TimeoutException{
//获取连接
Connection connection = RabbitUtil.getCon();
//返回一个通道
Channel channel = connection.createChannel();
//第二个参数是否持久化
//第三个参数是否排他 如果一个队列声明为排他队列 该队列仅能对首次声明它的连接可见,并在断开连接时自动删除
//第四个参数 是否自动删除 自动删除的前提是:至少有一个消费者连接到这个队列,之后所有的与这个队列连接的消费者都断开时 才会自动删除
//第五个参数 设置一些其他参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("sent'"+ message +"'");
channel.close();
connection.close();
}
}
public class Receive1 {
private final static String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException, TimeoutException{
Connection connection = RabbitUtil.getCon();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("received1:"+msg);
}
};
//监听队列,第二个参数 是否自动进行消息确认。(ACK消息确认机制 true 自动进行)
channel.basicConsume(QUEUE_NAME,true,consumer);
channel.close();
connection.close();
}
}
/**
* 消息确认机制(ACK)
* 自动ACK:消息一旦接收 消费者自动发送ACK
* 手动ACK: 消息接收后 不会发送ACK 需要手动调用
*/
public class Receive2 {
private final static String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException,TimeoutException {
Connection connection = RabbitUtil.getCon();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
int i = 6/0;
System.out.println("received1:"+msg);
//手动进行ACK
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列,第二个参数 是否自动进行消息确认。(ACK消息确认机制 true 自动进行)
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
3.2 work模式
一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!!
轮询分发就是将消息队列中的消息,依次发送给所有消费者。一个消息只能被一个消费者获取。
public class Send {
private static final String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接
Connection con = RabbitUtil.getCon();
//获取通道
Channel channel = con.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//循环发布任务
for(int i=0;i<100;i++){
// 消息内容
String message = "hello zhangpan。。。"+i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("Sent'"+ message +"'");
}
//关闭通道和连接
channel.close();
con.close();
}
}
/**
* channel.basicQos(1);
* 能者多劳
* 消费者1比消费者2的效率要低,一次任务的耗时较长
* 然而两人最终消费的消息数量是一样的
* 消费者2大量时间处于空闲状态,消费者1一直忙碌
* 现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
* 怎么实现呢?
* 通过BasicQos方法设置 prefetchCount = 1。这样RabbitMQ就hi使得每个Consumer在同一个时
间点最多处理一个Message。换句话说,在接收到该Consumer
*的ack前,他不会将新的Message分发给它。相反,他会将其他分派给不是忙碌的笑一个Consumer。
* 值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。
*/
public class Receive1 {
private final static String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException{
//创建连接
Connection con = RabbitUtil.getCon();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = con.createChannel();
channel.basicQos(1); //指该消费着在接收到队列的消息但没有返回确认结果之前,不会将新的消息分发给他
//声明队列名称
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body 即消息体
String msg = new String(body);
System.out.println("消费者1 receive1:"+msg);
try {
//模拟消费耗时
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听队列,第二个参数:是否自动进行消息确认.
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
public class Receive2 {
private final static String QUEUE_NAME="work_queue";
public static void main(String[] args) throws IOException, TimeoutException{
Connection con = RabbitUtil.getCon();
Channel channel = con.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者2 receive2:"+msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
3.3 发布/订阅模式
一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。
ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers、这里的交换器是fanout。
两个消费者获得了同一条消息。即就是,一个消息从交换机同时发送给了两个队列中。监听这两个队列的消费者消费了这个消息。
如果没有队列绑定交换机,则消息将丢失。因为交换机没有存储能力,消息只能存储在队列中。
3.4 路由模式
生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的就收消息。
路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费。
3.5 主题模式
上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的讲就是模糊匹配。
符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
与路由模式相似,但是,主题模式是一种模糊的匹配方式。
3.6 工作模式总结
这五种工作模式,可以归为三类:
生产者,消息队列,一个消费者;
生产者,消息队列,多个消费者;
生产者,交换机,多个消息队列,多个消费者;
4.Exchange(交换器)类型介绍
4.1 Fanout(订阅模式|广播模式)
Fanout交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
public class RabbitUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getCon() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.1.103");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/kavito");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
factory.setUsername("kavito");
factory.setPassword("123456");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
/**
* 定义生产者
* 广播模式 (交换机模式:Fanout,也成为广播)
* fanout与work两种模式有什么区别?
* 区别:
* ( 1 )work不用定义交换机,而fanout需要定义交换机
* ( 2 )fanout的生产是面向交换机发送消息,work的生产方是面向队列发送消息(底层使用默认交换机)。
* ( 3 )fanout需要设置队列和交换机的绑定,work不需要设置队列和交换机,实际上work会将队列绑定到默认的交换机上。
* 相同点:
* 两者实现发布订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
* 实际工作中用fanout还是work模式?
* 建议使用fanout广播模式,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。
*/
public class Send {
private static final String EXCHANGE_NAME="fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = RabbitUtil.getCon();
//获取通道
Channel channel = connection.createChannel();
//声明exchange 指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//消息内容
String msg = "hello everyone";
//发布消息到Exchange
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
channel.close();
connection.close();
}
}
/**
* 定义消费者1(注册成功发给短信服务)
* 广播模式
*/
public class Recv1 {
//短信队列
private final static String QUEUE_NAME = "fanout_exchange_queue_sms";
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接
Connection connection = RabbitUtil.getCon();
//获取到通道
Channel channel = connection.createChannel();
//生命队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body 即消息体
String msg = new String(body);
System.out.println("消费者1 【短线服务】:"+msg);
}
};
//监听队列 自动返回完成
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
/**
* 定义消费者2
* 广播模式
*/
public class Recv2 {
//邮件队列
private final static String QUEUE_NAME = "fanout_exchange_queue_email";
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接
Connection connection = RabbitUtil.getCon();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取消息,并且处理,这个方法类似事件监听,如果有消息的时候会被调用。
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body 即消息体
String msg = new String(body);
System.out.println("消费者2:"+msg);
}
};
//监听队列 自动返回完成
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
4.2 Direct(路由模式)
Direct交换器需要消息的Routing Key与Exchange和Queue之间的Binding Key完全匹配,如果匹配成功,将消息分发给该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct Exchange的默认模式。RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但实际上使用了RabbitMQ默认提供的Exchange。
/**
* 订阅者模式 Direct
* 生产者
*/
public class Send {
private final static String EXCHANGE_NAME ="direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接
Connection connection = RabbitUtil.getCon();
//获取通道
Channel channel = connection.createChannel();
//声明exchange,指定类型为direct
// channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//消息内容
String msg = "插入商品";
//发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
channel.basicPublish(EXCHANGE_NAME,"sms",null,msg.getBytes());
System.out.println("【X】send:"+msg);
channel.close();
connection.close();
}
}
public class Receive1 {
//短信队列
private final static String QUEUE_NAME = "direct_exchange_queue_sms";
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接
Connection connection = RabbitUtil.getCon();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机,同时指定需要订阅的routing key. 可以指定多个
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"sms");//指定接收发送方指定
routing key为sms的消息
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"email");
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body 即消息体
String msg = new String(body);
System.out.println("消费者1接收 【短信服务】:"+msg);
}
};
//监听队列,自动ACK
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
public class Receive2 {
//邮件队列
private final static String QUEUE_NAME = "direct_queue_2";
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接
Connection connection = RabbitUtil.getCon();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"email");//指定接收方指定
routing key为email的消息
//将三种队列绑定到交换机
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用。
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//body 即消息体
String msg = new String(body);
System.out.println("消费者1接收 【邮件服务】:"+msg);
}
};
//监听队列,自动ACK
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
4.3 Topic(通配符模式)
Topic交换器按照正则表达式模糊匹配:用消息的Routing Key与Exchange的Queue之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。Routing Key是一个句点号”.“分割的字符串(我们将被句点号”.“分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号”.“分隔的字符串。Binding Key中可以存在两种特殊字符”*“与”#“,用于做模糊匹配,其中 双引号中的星用于匹配一个单词,“#”用于匹配多个单词(也可以是零个或一个)。
例如下面这个消息队列场景,用的是topic类型交换器:
/**
* 通配符模式(交换机类型:topics)
* 每个消费者监听自己的队列,并且设置带通配符的routingKey,生产者将消息发给broker,由交换机根
据routingKey来转发消息到指定的队列。
* 通配符规则:
* # :匹配一个或多个词
* * :匹配不多不少恰好1个词
*/
public class Send {
private static final String EXCHANGE_NAME="topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接
Connection con = RabbitUtil.getCon();
//获取通道
Channel channel = con.createChannel();
/*交换机持久化 在S1后面加上 b:true*/
channel.exchangeDeclare(EXCHANGE_NAME,"topic");//声明exchange:指定类型为topic
String message = "商品新增";
//发送消息,并且指定routing key为:item.insert
channel.basicPublish(EXCHANGE_NAME,"item.insert",null,message.getBytes());
//消息持久化
// channel.basicPublish(EXCHANGE_NAME,"item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("send【信息描述】:"+message);
channel.close();
con.close();
}
}
public class Receive1 {
private final static String QUEUE_NAME = "topic_queue_1";
private final static String EXCHANGE_NAME = "topic_exchange_1";
public static void main(String[] args) throws IOException, TimeoutException {
//
Connection connection = RabbitUtil.getCon();
Channel channel = connection.createChannel();
/*b:true 队列持久化*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.insert");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.delete");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者1接收:"+message);
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
public class Receive2 {
private final static String QUEUE_NAME = "topic_queue_2";
private final static String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitUtil.getCon();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.*");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者2接收:"+message);
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
RPC机制实现:
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或失败,甚至连有没有消费者来处理这条消息都不知道。但实际的应用场景中,我们很可能需要做一些同步处理需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(远程调用)。
RabbitMQ中实现RPC的机制是:
1.生产者发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了 14 个属性,这些属性会随着消息一起发送)中设置两个属性值replyTo(一个Queue名称,用于告诉消费者处理完成后将通知我的消息发送到这个Queue中)和correlationld(此次请求的标识号,消费者处理完成后需要将此属性返还,生产者将根据这个id了解哪条请求被成功执行了或执行失败了)。
2.消费者收到消息并处理。
3.消费者处理完消息后,将生成一条应答消息到replyTo指定到Queue,同时带上correlationld属性。
4.生产者之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationld属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
消息确认机制:
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。这里会产生另一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。如果我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ会立即把这个Message标记为完成,然后从queue中删除。
消息持久化机制:
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依旧解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到 事务 。
事务:
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器接收。当然RabbitMQ也提供了txRollback()命令用于回滚某一个事务。
消息分发机制:
我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据。因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里,这时,就得使用工作队列了。一个队列有多个消费者同时消费数据。工作队列有两种分发数据的方式: 轮询分发 (Round-robin)和 公平分发 (Fairdispatch)。 轮询分发 :队列给每一个消费者发送数量一样的数据。 公平分发: 消费者设置没次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。
1.轮询分发
如果工作队列中有两个消费者,两个消费者得到的数据量一样的,并不会因为两个消费者处理数据速度不一样使得两个消费者取得不一样数量的数据。但是这种分发方式存在着一些隐患,消费者虽然得到了消息,但是如果消费者没能成功处理业务逻辑,在RabbitMQ中也不存在这条消息。就会出现消息丢失并且业务逻辑没能成功处理的情况。
2.公平分发
消费者设置每次从队列里取一条数据,并且关闭自动回复机制,每次取完一条数据后,手动回复并继续取下一条数据。与轮询分发不同的是,当每个消费都设置了每次只会从队列取一条数据时,并且关闭自动应答,在每次处理完数据后手动给队列发送确认收到数据。这样队列就会公平给每个消费者发送数据,消费一条再发第二条,而且可以在管理界面中看到数据是一条条随着消费者消费完,从而减少的。并不是一下子全部分发完了。采用公平分发方式就不会出现消息丢失并且业务逻辑没能成功处理的情况。
4.4 四种交换器总结
1 、direct 如果路由键完全匹配的话,消息才会被投放到相应的队列。
2 、fanout 当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。
3 、topic 设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。
4 、header headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器
5. SpringBoot整合RabbitMQ
5.1 消息推送到接收的流程
示意图
生产者就是消息推送服务,将消息推送到中间方框里面也就是rabbitMQ的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终消费者获取对应监听的消息。
Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键routing key。然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X区寻找绑定值也是X的队列。
Fanout Exchange
扇形交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
*(星号)用来表示一个单词(必须出现的)
#(并号)用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举一个小例子
队列Q1绑定键为 * .TT. * 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为A.T.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 “#” (井号) 的时候,这个队列将无视消息的路由键,接收所有的消息。
当*(星号)和 #(井号)这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能和直连交换机的功能。
另外还有Header Exchange头交换机,Default Exchange默认交换机,Dead Letter Exchange 死信交换机。
5.2 SpringBoot整合RabbitMQ代码案例
SpringBoot整合RabbitMQ,实现了work模型,发布订阅模型,topic模型,confirm消息确认机制,return消息机制,TTL队列,死信队列等。整合的方式有很多,比如使用RabbitAdmin进行整合,本文介绍一个最简单的整合方式,只需要在RabbitmqConfig.java配置文件中进行简单配置,便可以方便的使用。
5.2.1 基本配置信息
pom.xml里用到的依赖
<!--rabbitmq依赖-->
<dependency
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
然后application.yml
#应用名
spring:
application:
name: springboot-rabbitmq
rabbitmq:
host: 192.168.45.128
port: 5672
username: zhangpan
password: 123456
virtual-host: /zhangpan
#消息开启手动确认
listener:
direct:
acknowledge-mode: manual
#开启消息确认机制 confirm 异步
publisher-confirm-type: correlated
#之前旧版本 开启消息确认机制的方式
# publisher-confirms: true
#开启return机制
publisher-returns: true
对象User类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serialzable {
private static final long serialVersionUID = 8142836626401616290L;
private Integer id;
private String name;
private String password;
}
5.2.2 work模型
RabbitmqConfig.java
@SpringBootConfiguration
public class RabbitmqConfig{
//配置一个工作模型队列
@Bean
public Queue queueWork1() {
return new Queue("queue_work");
}
}
RabbitmqMapper.java
@Componet
public class RabbitmqMapper{
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendWork() {
for(int i = 0 ; i < 10 ;i++) {
rabbitTemplate.convertAndSend("queue_work","测试work模型:"+i);
}
}
}
RabbitmqServiceImpl.java
@Service
public class RabbitmqServiceImpl implements RabbitmqService {
@Autowired
private RabbitmqMapper rabbitmqMapper;
@Override
public void sendWork(){
rabbitmqMapper.sendWork();
}
}
RabbitmqController.java
@RestController
public class RabbitmqController {
@Autowired
private RabbitmqService rabbitmqService;
@RequestMapping("/sendWork")
public Object sendWork(){
rabbitmqService.sendWork();
}
}
WorkReceiveListener.java
//2个消费者
@Component
public class WorkReceiveListener {
@RabbitListener(queues = "queue_work")
public void receiveMessage(String msg,Channel channel,Message message) {
//只包含发送消息
System.out.println("1.接收到消息:"+ msg);
//channel 通道信号
//message 附加的参数信号
}
@RabbitListener(queues = "queue_work")
public void receiveMessage2(Object object,Channel channel,Message message) {
//包含所有的信息
System.out.println("2.接收到消息:"+object);
}
}
访问 http://127.0.0.1:8080/sendWork,输出如下数据
2接收到的消息:(Body:'测试work类型:1' MessageProperties [headers={},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=, receivedRoutingKey=queue_work, deliveryTag=1,
consumerTag=amq.ctag-mcL5nglbYgV3sz16fgnAnQ, consumerQueue=queue_work])
2接收到的消息:(Body:'测试work类型:3' MessageProperties [headers={},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=, receivedRoutingKey=queue_work, deliveryTag=2,
consumerTag=amq.ctag-mcL5nglbYgV3sz16fgnAnQ, consumerQueue=queue_work])
2接收到的消息:(Body:'测试work类型:5' MessageProperties [headers={},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=, receivedRoutingKey=queue_work, deliveryTag=3,
consumerTag=amq.ctag-mcL5nglbYgV3sz16fgnAnQ, consumerQueue=queue_work])
2接收到的消息:(Body:'测试work类型:8' MessageProperties [headers={},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=, receivedRoutingKey=queue_work, deliveryTag=4,
consumerTag=amq.ctag-mcL5nglbYgV3sz16fgnAnQ, consumerQueue=queue_work])
1接收到消息:测试work类型:0
1接收到消息:测试work类型:2
1接收到消息:测试work类型:4
1接收到消息:测试work类型:6
1接收到消息:测试work类型:7
2接收到的消息:(Body:'测试work类型:9' MessageProperties [headers={},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0,
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
receivedExchange=, receivedRoutingKey=queue_work, deliveryTag=5,
consumerTag=amq.ctag-mcL5nglbYgV3sz16fgnAnQ, consumerQueue=queue_work])
5.2.3 发布订阅模型
RabbitmqConfig.java
//发布订阅模式
//声明两个队列
@Bean
public Queue queueFanout1() {
return new Queue("queue_fanout1");
}
@Bean
public Queue queueFanout2() {
return new Queue(queue_fanout2);
}
//准备一个交换机
@Bean
public FanoutExchange exchangeFanout() {
return new FanoutExchange("exchange_fanout");
}
//将交换机和队列进行绑定
@Bean
public Binding bindingExchange1() {
return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
}
@Bean
public Binding bindingExchange2() {
return BindingBuilder.bind(queueFanout2()).to(ExchangeFanout());
}
RabbitmqMapper.java
//向发布订阅模式里面发送消息
public void sendPublish() {
for(int i = 0;i < 5;i++) {
//rabbitTemplate.convertAndSendReceive("exchange_fanout","","测试发布订阅
模型:"+ i);
rabbitTemplate.convertAndSend("exchange_fanout","","测试发布订阅模
型:"+i);
}
}
RabbitmqServiceImpl.java
@Override
public void sendPublish() {
rabbitmqMapper.sendPublish();
}
RabbitmqController.java
@RequestMapping
public String sendPublish() {
rabbitmqService.sendPublish();
return "发送成功";
}
PublishReceiveListener.java
@Component
public class PublishReceiveListener {
@RabbitListener(queues = "queue_fanout1")
public void receiveMsg1(String msg) {
System.out.println("队列1接收到消息:"+msg);
}
@RabbitListener(queues = "queue_fanout2")
public void receiveMsg2(String msg) {
System.out.println("队列2接收到的消息:"+ msg);
}
}
注:使用convertAndSend方法时的结果:输出时没有顺序,不需要等待,直接运行。
使用convertSendAndReceive方法时的结果:按照一定的顺序,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间。
访问 http://localhost:8080/sendPublish,输出如下:
队列1接收到消息:测试发布订阅模型:0
队列1接收到消息:测试发布订阅模型:1
队列1接收到消息:测试发布订阅模型:2
队列1接收到消息:测试发布订阅模型:3
队列1接收到消息:测试发布订阅模型:4
队列2接收到消息:测试发布订阅模型:0
队列2接收到消息:测试发布订阅模型:1
队列2接收到消息:测试发布订阅模型:2
队列2接收到消息:测试发布订阅模型:3
队列2接收到消息:测试发布订阅模型:4
5.2.4 topic模型
RabbitmqConfig.java
//topic 模型
@Bean
public Queue queueTopic1() {
return new Queue("queue_topic1");
}
@Bean
public Queue queueTopic2() {
return new Queue("queue_topic2");
}
@Bean
public TopicExchange exchangeTopic() {
return new TopicExchange("exchange_topic");
}
@Bean
public Binding bindingTopic1(){
return BindingBuilder.bind(queueTopic1()).to(exchangeTopic()).with("topic.#");
}
@Bean
public Binding bindingTopic2(){
return BindingBuilder.bind(queueTopic2()).to(exchangeTopic()).with("topic.*");
}
RabbitmqMapper.java
//向topic模型发送数据
public void sendTopic() {
for(int i = 0;i < 10; i++) {
if(i%2==0){
rabbitTemplate.convertSendAndReceive("echange_topic","topic.km.topic","测试发布订阅模型:"+i);
} else {
rabbitTemplate.convertSendAndReceive("exchange_topic","topic.km","测试发布订阅模型:"+ i);
}
}
}
RabbitmqServiceImpl.java
@Override
public void sendTopic() {
rabbitmqMapper.sendTopic();
}
RabbitmqController.java
@RequestMapping("/sendTopic")
public String sendTopic() {
rabbitmqService.sendTopic();
return "发送成功。。。";
}
TopicReceiveListener.java
@Component
public class Topic ReceiveListener{
@RabbitListener(queues = "queue_topic1")
public void receiveMsg1(String msg) {
System.out.println("消费者1接收到:"+msg);
}
@RabbitListener(queues = "queue_topic2")
public void receiveMsg2(String msg) {
System.out.println("消费者2接收到:"+ msg);
}
}
结论: 消费者 1 接收到了所有的数据,消费者 2 只接收到了一半(奇数)的数据
消费者1接收到:测试发布订阅模型:0
消费者1接收到:测试发布订阅模型:1
消费者2接收到:测试发布订阅模型:1
消费者1接收到:测试发布订阅模型:2
消费者2接收到:测试发布订阅模型:3
消费者1接收到:测试发布订阅模型:3
消费者1接收到:测试发布订阅模型:4
消费者2接收到:测试发布订阅模型:5
消费者1接收到:测试发布订阅模型:5
消费者1接收到:测试发布订阅模型:6
消费者1接收到:测试发布订阅模型:7
消费者2接收到:测试发布订阅模型:7
消费者1接收到:测试发布订阅模型:8
消费者1接收到:测试发布订阅模型:9
消费者2接收到:测试发布订阅模型:9
confirm机制
application.properties
#开启消息确认机制 confirm 异步
spring.rabbitmq.publisher-confirm-type=correlated
#之前的旧版本 开启消息确认机制的方式
#spring.rabbitmq.publisher-confirms=true
#消息开启手动确认
spring.rabbit.listener.direct.acknowledge-mode=manual
RabbitmqConfig.java
//测试confirm机制,专门创建了一个队列
@Bean
public Queue queueConfirm() {
return new Queue("queue_confirm");
}
RabbitmqMapper.java
//配置confirm机制
private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
// @param correlationData 消息相关的数据,一般用于获取唯一标识id
// @param b true 消息确认成功, false 失败
// @param s 确认失败的原因
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if(b){
System.out.println("confirm 消息确认成功..."+correlationData.getId());
}else{
System.out.println("confirm 消息确认失败..."+ correlationData.getId()+ "cause:"+ s);
}
}
};
//测试 confirm机制
public void sendConfirm() {
rabbitTemplate.convertAndSend("queue_confirm",new User(1,"km","km123"),new
CorrelationData(""+System.currentTimeMillis()));
rabbitTemplate.setConfirmCallBack(confirmCallback);
}
confirmRabbitmqServiceImpl.java
@RequestMapping("/sendConfirm")
public String sendConfirm() {
rabbitmqService.sendConfirm();
return "发送成功";
}
ConfirmReceiveListener.java
@Component
public class ConfirmReceiveListener {
@RabbitListener(queues = "queue_confirm")
public void receiveMsg(User user) {
System.out.println("接收到的消息为:"+ user);
}
}
注意:
使用confirm机制时,发送消息时最好把 CorrelationData 加上,因此如果出错了,使用CorrelationData 可以更快的定位到错误信息
访问 http://localhost:8080/sendConfirm,输出如下:
confirm 消息确认成功...1663137940862
接收到的消息为:User{id=1, name='km', password='km123'}
return机制
application.properties
#开启return机制
spring.rabbitmq.publisher-returns=true
#消息开启手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
RabbitmqConfig.java
//测试return机制
@Bean
public Queue queueReturn() {
return new Queue("queue_return");
}
@Bean
public TopicExchange exchangeReturn(){
return new TopicExchange("exchange_return");
}
@Bean
public Binding bindingReturn(){
return BindingBuilder.bind(queueReturn()).to(exchangeReturn()).with("return.*");
}
RabbitmqMapper.java
//配置return 消息机制
private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback(){
//return 的回调方法(找不到路由才会触发)
//@param message 消息的相关信息
//@param i 错误状态码
//@param s 错误状态码对应的文本信息
//@param s1 交换机的名字
//@param s2 路由的key
@Override
public void returnMessage(Message message,int i,String s,String s1,String
s2){
System.out.println(message);
System.out.println(new String(message.getBody()));
System.out.println(i);
System.out.println(s);
System.out.println(s1);
System.out.println(s2);
}
};
//测试return机制
public void sendReturn() {
rabbitTemplate.setReturnsCallback(returnsCallback);
/**
* convertAndSend(String exchange,String routingKey,Object message);
* 使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程
* 参数代表如下:
* 参数 1 :交换机名称
* 参数 2 :路由键,如果没有使用到路由键,可以为空。
* 参数 3 :发送的消息内容
*/
//正确的路由
//rabbitTemplate.convertAndSend("exchange_return","return.km","测试return机制");
//错误的路由
rabbitTemplate.convertAndSend("exchange_return","return.km.km","测试return机制");
}
RabbitmqServiceImpl.java
@Override
public void sendReturn() {
rabbitmqMapper.sendReturn();
}
RabbitmqController.java
@RequestMapping("/sendReturn")
public String sendReturn(){
rabbitmqService.sendReturn();
return "发送成功";
}
ReturnReceiveListener.java
@Component
public class ReturnReceiveListener{
@RabbitListener(queues = "queue_return")
public void receiveMsg(String msg) {
System.out.println("接收的消息为:"+msg);
}
}
测试结果
使用正确的路由:return.km
使用错误的路由打印的信息:return.km.km
(Body:'测试return机制' MessageProperties [headers={}, contentType=text/plain,
contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT,
priority=0, deliveryTag=0])
[B@3f30e8be
312
NO_ROUTE
exchange_return
return.km.km
TTL队列、死信队列
TTL队列、死信队列和普通队列的用法是一致的,这里说明其创建方式
RabbitmqConfig.java
//TTL队列
@Bean
public Queue queueTTL(){
Map<String,Object> map = new HashMap<String,Object>(1);
map.put("x-message-ttl",10000);
return new Queue("queue_ttl",true,false,false,map);
}
//产生死信的队列
@Bean
public Queue queueDLX() {
Map<String,Object> map = new HashMap<>(2);
//5秒后,消息自动变为死信
map.put("x-message-ttl",5000);
map.put("x-dead-letter-exchange","exchange_receive");
map.put("x-dead-letter-routing-key","receive_key");
return new Queue("queue_dlx",true,false,false,map);
}
//死信交换机
@Bean
public DirectExchange exchangeDLX(){
return new DirectExchange("exchange_dlx");
}
//给死信队列绑定交换机
@Bean
public Binding bindingDLX(){
return BindingBuilder.bind(queueDLX()).to(exchangeDLX()).with("receive_key");
}
//死信接收交换机
@Bean
public DirectExchange exchangeReceive() {
return new DirectExchange("exchange_receive");
}
//接收死信的队列
public Queue queueReceive(){
return BindingBuilder.bind(queueReceive()).to(exchangeReceive()).with("receive_key");
}
//将交换机和死信队列绑定
@Bean
public Binding bindingReceive(){
return BindingBuilder.bind(queueReceive()).to(exchangeReceive()).with("exchange_receive
");
}
测试
启动项目后,队列和交换机已经初始化完毕,可前往 http://192.168.5.128:15672/自行验证死信队列以及TTL队列
点击dlx交换机
输入key和要发送的值,publish
刚开始,queue_dlx中有一条数据, 5 秒后,消息变成死信,自动转移到queue——receive中
6. RabbitMQ面试题
6.1 什么是消息队列
6.1.1 消息队列的优点:
( 1 )解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把 消息发布到MQ中 而不用管谁来取,消息消费者只管从MQ中 取消息 而不管是谁发布的。消息生产者和消费者都不知道对方的存在;
( 2 )异步:主流程只需要完成业务的核心功能;对于业务非核心功能,将消息放入到消息队列之中进行异步处理,减少请求的等待,提高系统的总体性能;
( 3 )削峰/限流:将所有请求都写到消息队列中,消息服务器按照自身能够处理的请求从队列中拿到请求,防止请求并发过高将系统搞崩溃;
6.1.2 消息队列的缺点:
( 1 )系统的可用性降低:系统引用的外部依赖越多,越容易挂掉,如果MQ服务器挂掉,那么可能会导致整套系统崩溃。这时就要考虑如何保证消息队列的高可用了
( 2 )系统复杂度提高:加入消息队列之后,需要保证消息没有重复消费,如何处理消息丢失的情况,如何保证消息传递的有序性等问题;
( 3 )数据一致性问题:A系统处理完了直接返回成功了,使用者都以为你这个请求就成功了;但是问题是,要是BCD三个系统那里,BD两个系统写库成功了,结果C系统写库失败了,就会导致数据不一致了
6.1.3 Kafka、ActiveMQ、RabbitMQ、RocketMQ消息队列的选型:
每种MQ没有绝对的好坏,主要依据使用场景,扬长避短,利用其优势,规避其劣势。
( 1 )中小型软件公司,技术实力较为一般,建议选RabbitMQ:一方面,erlang语言天生具备高并发的特性,而且管理界面用起来十分方便。代码是开源的,而且社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。
不考虑rocketmq的原因是,rocketmq是阿里出品,如果阿里放弃维护rocketmq,中小型公司一般抽不出人
来进行rocketmq的定制化开发,因此不推荐。
不考虑kafka的原因是:中小型软件公司不如互联网公司,数据量没那么大,选消息中间件应首选功能比较完
备的,所以kafka排除
( 2 )大型软件公司:根据具体使用场景在rocketmq和kafka之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对rocketMQ,大型软件公司有能力对rocketMQ进行定制化开发。至于kafka,如果是大数据领域的实时计算、日志采集功能,肯定是首选kafka了。
6.2 RabbitMQ的构造
RabbitMQ是AMQP协议的一个开源实现,所以其内部实际上也是AMQP中的基本概念:
~ (1)生产者Publisher:生产消息,就是投递消息的一方。消息一般包含两个部分:消息体(payload)
和标签(Label)
~ (2)消费者Consumer:消费消息,也就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队
列上。消费消息时只消费消息体,丢弃标签。
~ (3)Broker服务节点:表示消息队列服务器实体。一般情况下一个Broker可以看做一个RabbitMQ服务
器。
~ (4)Queue:消息队列,用来存放消息。一个消息可投入一个或多个队列,多个消费者可以订阅同一队
列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
~ (5)Exchange:交换器,接受生产者发送的消息,根据路由键将消息路由到绑定的队列上。
~ (6)Routing Key:路由关键字,用于指定这个消息的路由规则,需要与交换器类型和绑定键
(Binding Key)联合使用才能最终生效。
~ (7)Binding:绑定,通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,通过
BindingKey,交换器就知道将消息路由给哪个队列了。
~ (8)Connection:网络连接,比如一个TCP连接,用于连接到具体broker
~ (9)Channel:信道,AMQP命令都是在信道中进行的,不管是发布消息,订阅队列还是接收消息,这些动
作都是通过信道完成。因为建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连
接,一个TCP连接可以用多个信道。客户端可以建立多个channel,每个channel表示一个会话任务。
~ (10)Message:消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组
成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、deliverymode(指出该消息可能需要持久性存储)等。
~ (11)Virtual host:虚拟主机,用于逻辑隔离,表示一批独立的交换器、消息队列和相关对象。一个
Virtual host可以由若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或
Queue。最重要的是,其拥有独立的权限系统,可以做到vhost范围的用户控制。当然,从RabbitMQ的全局
角度,vhost可以作为把不同权限隔离的手段
6.3 Exchange交换器的类型:
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers
( 1 )direct:消息中的路由键(RoutingKey)如果和Bingding中的bindingKey完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
( 2 )fanout:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout类型转发消息是最快的。
( 3 )topic:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
匹配规则:
1. RoutingKey和BindingKey为一个点号‘.’分割的字符串。比如:java.xiaoka.show
2. BindingKey可使用*和#用于做模糊匹配:*匹配一个单词,#匹配多个或者0个单词
( 4 )headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外headers交换器和direct交换器完全一致,但性能差很多,目前几乎用不到了
6.4 生产者消息的过程:
(1)Producer先连接到Broker,建立连接Connection,开启一个信道channel
(2)Producer声明一个交换器并设置好相关属性
(3)Producer声明一个队列并设置好相关属性
(4)Producer通过绑定键将交换器和队列绑定起来
(5)Producer发送消息到Broker,其中包含路由键交换器等信息
(6)交换器根据接收到的路由键查找匹配的队列
(7)如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
(8)关闭信道
6.5 消费者接收消息过程
(1)Producer先连接到Broker,建立连接Connection,开启一个信道channel
(2)向Broker请求消费相应队列中的消息,可能会设置响应的回调函数。
(3)等待Broker回应并投递相应队列中的消息,接收消息。
(4)消费者确认收到的消息,ack。
(5)RabbitMQ从队列中删除已经确定的消息。
(6)关闭信道
6.6 如何保证消息不被重复消费?
正常情况下,消费者在消费消息后,会给消息队列发送一个确认,消息队列接收后就知道消息已经被成功消费了,然后就从队列中删除该消息,也就不会将该消息再发送给其他消费者了。不同消息队列发出的确认消息形式不同,RabbitMQ是通过发送一个ACK确认消息。但是因为网络故障,消费者发出的确认并没有传到消息队列,导致消息队列不知道该消息已经被消费,然后就再次将消息发送给了其他消费者,从而导致重复消费的情况。
重复消费问题的解决思路是:保证消息的唯一性,即使多次传输,也不让消息的多次消费带来影响,也就是保证消息等幂性;幂等性指一个操作执行任意多次所产生的影响均与一次执行的影响相同。具体
解决方案如下:
( 1 )改造业务逻辑,使得再重复消费时也不影响最终的结果。例如对SQL语句:update t1 set money = 150 where id = 1 anyd money = 100;做了个前置条件判断,即money = 100的情况下才会做出更新,更通用的是做一个version即版本号控制,对比消息中的版本号和数据库中的版本号。
( 2 )基于数据的唯一键进行约束。消费完消息之后,到数据库中做一个insert操作,如果出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
( 3 )通过记录关键的key,当重复消息过来时,先判断这个key是否已经被处理过了,如果没处理再进行下一步。
1.通过数据库:比如处理订单时,记录订单ID,在消费前,去数据库中进行查询该记录是否存在,如果存在则
直接返回。
2.使用全局唯一ID,在配合第三组主键做消费记录,比如使用redis的set结构,生产者发送消息时给消息分
配一个全局ID,在每次消费者开始消费前,先去redis中查询有没有消费记录,如果消费过则不处理,如果没
消费过,则进行处理,消费完之后,就将这个ID以k-v的形式存入redis中(过期时间根据具体情况设置)。
6.7 如何保证消息不丢失,进行可靠性传输?
对于消息的可靠性传输,每种MQ都要从三个角度来分析:生产者丢数据、消息队列丢数据、消费者丢数据。以rabbitMQ为例:
6.7.1 生产者丢数据
RabbitMQ提供事务机制(transaction)和确认机制(confirm)两种模式来确保生产者不丢消息。
( 1 )事务机制:
发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())
该方式的缺点是生产者发送消息会同步阻塞等待发送结果是成功还是失败,导致生产者发送消息的吞吐量降下来。
//开启事务
channel.txSelect
try{
//发送消息
}catch(Exception e){
//回滚事务
channel.txRollback;
//再次重试发送这条消息
....
}
//提交事务
channel.txCommit;
( 2 )确认机制:
生产环境常用的是confirm模式。生产者将信道channel设置成confirm模式,一旦channel进入confirm模式,所有在该信道上发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,rabbitmq就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确的到达目的队列了。如果rabbitmq没能处理该消息,也会发送一个Nack消息给你,这时就可以进行重试操作。
Confirm模式最大的好处在于它是异步的,一旦发布消息,生产者就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息。
处理ACK和Nack的代码,如下所示:
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws
IOException {
System.out.println("nack: deliveryTag = "+deliveryTag+" multiple:"+multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack: deliveryTag = "+deliveryTag+" multiple:"+multiple);
}
});
6.7.2 消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘。持久化配置可以和生产者的confirm机制配合使用,在消息持久化磁盘后,再给生产者发送一个ACK信号。这样的话,如果消息持久化磁盘之前,即使RabbitMQ挂掉了,生产者也会因为收不到Ack信息而再次重发消息。
持久化设置如下(必须同时设置以下2个配置):
(1)创建queue的时候,将queue的持久化标志durable在设置为true,代表是一个持久化的队列。这样就
可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;
(2)发送消息的时候将deliveryMode设置为2,将消息设置为持久化的,此时Rabbitmq就会将消息持久化
到磁盘上去。
这样设置以后,RabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时时,可能服务已经死掉,这种情况可以通过引入镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
6.7.3 消费者丢数据:
消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费者会自动发送一个确认,通知RabbitMQ已经收到消息了,这时RabbitMQ就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。
解决方案就是采用手动确认消息,设置autoAck = false,等到消息被真正消费后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那RabbitMQ就收不到发的ack,然后RabbitMQ就会将这条消息重新分配给其他的消费者去处理。
但是RabbitMQ并没有使用超时机制,RabbitMQ仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:
如果消费者接收到消息,再确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给消费者分发更多的消息
需要注意的点:
1、消息可靠性增强了,性能就下降了,因为写磁盘比写RAM慢的多,两者的吞吐量可能有10倍的差距。所
以,是否要对消息进行持久化,需要综合考虑业务场景、性能需求、以及可能遇到的问题。若想达到单
RabbitMQ服务器10w条/秒以上的消息吞吐量,则要么使用其他的方式来确保消息的可靠传递,要么使用非常
快速的存储系统以支持全持久化,例如使用SSD。或者仅对关键消息做持久化处理,且应该保证关键消息的量
不会导致性能瓶颈。
2、当设置autoAck = False时,如果忘记手动ack,那么将会导致大量任务都处于Unacked状态,造成队
列堆积,直至消费者断开才会重新回到队列。解决方法是及时ack,确保异常时ack或者拒绝消息。
3、启用消息拒绝或者发送nack后导致死循环的问题:如果再消息处理异常时,直接拒绝消息,消息会重新进
入队列。这时候如果消息再次被处理时又被拒绝。这样就会形成死循环。
6.8 如何保证消息的有序性?
针对保证消息有序性的问题,解决方法就是保证生产者入队的顺序是有序的,出队后的顺序消费则交给消费者去保证。
( 1 )方法一:拆分queue,使得一个queue只对应一个消费者。由于MQ一般都能保证内部队列是先进先出的,所以把需要保持先后顺序的一组消息使用某种算法都分配到同一个消息队列中。然后只用一个消费者单线程去消费该队列,这样就能保证消费者是按照顺序进行消费的了。但是消费者的吞吐量会出现瓶颈。如果多个消费者同时消费一个队列,还是可能会出现顺序错乱的情况,这就相当于是多线程消费了。
( 2 )方法二:对于多线程的消费同一个队列的情况,可以使用重试机制:比如有一个微博业务场景的操作,发微博、写评论、删除微博,这三个异步操作。如果一个消费者先执行了写评论的操作,但是这时微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,等一段时间。等另一个消费者,先执行发微博的操作后,再执行,就可以成功。
6.9 如何处理消息堆积情况?
场景题:几千万条数据再MQ里积压了七八个小时。
6.9.1 出现该问题的原因:
消息堆积往往是生产者的生产速度与消费者的消费速度不匹配导致。有可能就是消费者消费能力弱,渐渐地消息就积压了,也有可能是因为消息消费失败反反复复重试造成的,也有可能是消费者端出了问题,导致不消费了或者消费极其慢。比如,消费端每次消费之后要写mysql,结果mysql挂了,消费端卡住不动了,或者消费者本地依赖的一个东西挂了,导致消费者挂了。
所以如果是bug则处理bug;如果是因为本身消费能力较弱,则优化消费逻辑,比如优化前是一条一条消息消费处理的,那么就可以批量处理进行优化。
6.9.2 临时扩容,快速处理积压的消息
( 1 )先修复consumer的问题,确保其恢复消费速度,然后将现有的consumer都停掉;
( 2 )临时创建原先N倍数量的queue,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中积压的数据,消费之后不做任何耗时处理,直接均匀轮询写入临时建立好的N倍数量的queue中;
( 3 )接着,临时征用N倍的机器来部署consumer,每个consumer消费一个临时queue的数据
( 4 )等快速消费完积压数据之后,恢复原先部署架构,重新用原先的consumer机器消费消息。
这种做法相当于临时将queue资源和consumer资源扩大N倍,以正常N倍速度消费。
6.9.3 恢复队列中丢失的数据
如果使用的是rabbitMQ,并且设置了过期时间,消息再queue里积压超过一定的时间会被rabbitmq清理掉,导致数据丢失。这种情况下,实际上队列中没有什么消息挤压,而是丢了大量的消息。所以就不能说增加consumer消费积压的数据了,这种情况可以采取 “批量重导” 的方案来进行解决。再流量低峰期,写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。
6.9.4 MQ长时间未处理导致MQ写满的情况下如何处理
如果消息积压再MQ里,并且长时间都没处理掉,导致MQ都快写满了,这种情况肯定是临时扩容方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。首先,临时写个程序,连接到mq里面消费数据,消费一个丢弃一个,快速消费掉积压的消息,降低MQ的压力,然后再流量低峰期时去手动查询导致重导丢失的这部分数据。
6.9.5 如何避免消息堆积?
( 1 )采用workqueue,多个消费者监听同一队列。
( 2 )接收消息以后,而是通过线程池,异步消费。
6.9.6 如何避免消息丢失?
( 1 )消费者的ACK机制。可以防止消费者丢失消息。
但是,如果在消息消费之前,MQ就宕机了消息没了?
( 2 )可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化
交换机持久化
Channel channel = connection.createChannel();
//声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC,true);
//消息内容
String message = "这是一只行动迅速的橙色的兔子";
队列持久化
Channel channel = conection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//绑定队列到交换机,同时需要订阅routing key。订阅关于兔子以及懒惰动物的消息
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
6.10 如何保证消息队列的高可用?
RabbitMQ是基于主从(非分布式)做高可用性的,RabbitMQ有三种模式:单机模式,普通集群模式,镜像集群模式
6.10.1 单机模式 一般没人生产用单机模式
6.10.2 普通集群模式
普通集群模式用于提高系统的吞吐量,通过添加节点来线性扩展消息队列的吞吐量。也就是在多台机器上启动多个RabbitMQ实例,而队列queue的消息只会存放在其中一个RabbitMQ实例上,但是每个实例都同步queue元数据(元数据是queue的一些配置信息,通过元数据,可以找到queue所在实例)。消费的时候,如果连接到了另外的实例,那么该实例就会从数据实际所在的实例上的queue拉取消息过来,就是说让集群中多个节点来服务某个queue的读写操作
但普通集群模式的缺点在于:无高可用性,queue所在的节点宕机了,其他实例就无法从那个实例拉去数据;RabbitMQ内部也会产生大量的数据传输。
6.10.3 镜像队列集群模式
镜像队列集群是RabbitMQ真正的高可用模式,集群中一般会包含一个主节点master和若干个从节点slave,如果master由于某种原因失效,那么按照slave加入的时间排序,“资历最老” 的slave会被提升为新的master。镜像队列下,所有的消息只会向master发送,再由master将命令的执行结果广播给slave,所以master与slave节点的状态是相同的。比如,每次写消息到queue时,master会自动将消息同步到各个slave实例的queue;如果消费者与slave建立连接并进行订阅消费,其实质也是从master上获取消息,只不过看似是从slave上消费而已,比如消费者与slave建立了TCP连接并执行Basic.Get的操作,那么也是由slave将Basic.Get请求发往master,再由master准备好数据返回给slave,最后由slave投递给消费者。
从上面可以看出。队列的元素据和消息会存在于多个实例上,也就是说每个RabbitMQ节点都有这个queue的完整镜像,任何一个机器宕机了,其他机器节点还包含了这个queue的完整数据,其它消费者都可以到其他节点上去消费数据。
( 1 )缺点:
① 性能开销大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重
② 非分布式,没有扩展性,如果queue的数据量大到这个机器上的容量无法容纳了,此时该方案就会出现问题了
( 2 )如何开启镜像集群模式呢?
在rabbitmq的管理控制台Admin页面下,新增一个镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
6.11 其他
( 1 )交换器无法根据自身类型和路由键找到符合条件队列时,有哪些处理方式:设置mandatory = true,代表返回消息给生产者;设置mandatory = false,代表直接丢弃
( 2 )消费者得到消息队列中的数据的方式:push 和 pull
( 3 )消息基于什么传输:由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。所以RabbitMQ使用信道channel的方式来传输数据,信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
( 4 )死信队列DLX:
DLX也是一个正常的Exchange,和一般的Exchange没有任何区别。能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列出现死信(dead message,就是没有任何消费者消费)的时候,RabbitMQ就会自动将这条消息重新发布到Exchange上去,进而被路由到另一个队列。可以监听这个队列中的消息做相应的处理。消息变为死信的几种情况:
消息被拒绝(basic.reject/basic.nack)同时requeue=false(不重回队列)
TTL过期
队列达到最大长度,无法再添加
( 5 )延迟队列:存储对应的延迟消息,当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。再RabbitMQ中并不存在延迟队列,但我们可以通过设置消息的过期时间和死信队列来实现延迟队列,消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
( 6 )优先级队列:优先级高的队列会先被消费,可以通过x-max-priority参数来实现。但是当消费速度大于生产速度且Broker没有堆积的情况下,优先级显得没有意义。
( 7 )RabbitMQ要求集群中至少有一个磁盘节点,其他节点可以是内存结点,当节点加入或离开集群时,必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列,创建交换器,创建绑定,添加用户,更改权限,添加或删除集群节点。也就是说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但直到该节点恢复前,无法更改任何东西。