1.入门案例
生产者(Producer)示例
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("172.25.196.240");//默认为localhost
factory.setPort(5672);//端口默认也是5672
factory.setVirtualHost("/zzk");//设置虚拟机
factory.setUsername("zzk");//用户名 默认guest
factory.setPassword("200079111zzk");//密码 默认guest
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
queue:队列名称
durable:是否持久化,即mq重启之后是否还在
exclusive:
是否独占,即是否只有一个消费者监听
当connection关闭时,是否删除队列
autoDelete:当没有consumer时是否自动删除
arguments:参数
*/
channel.queueDeclare("hello",true,false,false,null);
// 6.发送消息
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
参数:
1.exchange:交换机名称.简单模式下交换机默认
2.routingKey:路由名称
3.props:配置信息
4.body:发送消息数据
*/
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
// 7.释放资源
channel.close();
connection.close();
消费者(Consumer)示例
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("172.25.196.240");//默认为localhost
factory.setPort(5672);//端口默认也是5672
factory.setVirtualHost("/zzk");//设置虚拟机
factory.setUsername("zzk");//用户名 默认guest
factory.setPassword("200079111zzk");//密码 默认guest
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
queue:队列名称
durable:是否持久化,即mq重启之后是否还在
exclusive:
是否独占,即是否只有一个消费者监听
当connection关闭时,是否删除队列
autoDelete:当没有consumer时是否自动删除
arguments:参数
*/
channel.queueDeclare("hello",true,false,false,null);
// 6.接收消息
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1.queue: 队列名称
2.autoAck: 是否自动确认
3.callback: 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/*回调方法
1.consumerTag: 标识
2.envelope: 获取一些信息,交换机
3.properties: 配置信息
4.body: 数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(consumerTag);
System.out.println(envelope.getExchange());
System.out.println(envelope.getRoutingKey());
System.out.println(properties);
System.out.println(new String(body));
}
};
channel.basicConsume("hello",true,consumer);
2.Work queues
该模式下,在一个队列中多个消费者之间对于同一个消息的关系是竞争的关系。
对于任务过重或者任务较多的情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功即可。
3.Publish/Subscribe
PubSub模式通过交换机(exchange
)对多个队列同时发送信息,达到一条生产者消息多个消费者共享的效果
生产者代码
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setPort(5672);//端口默认也是5672
factory.setVirtualHost("/zzk");//设置虚拟机
factory.setUsername("zzk");//用户名 默认guest
factory.setPassword("200079111zzk");//密码 默认guest
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建交换机
String exchangeName = "test_fanout";
//广播模式
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 6.创建队列
String queueName = "test_fanout_queue";
String queueName2 = "test_fanout_queue2";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
// 7.绑定队列和交换机
channel.queueBind(queueName,exchangeName,"");
channel.queueBind(queueName2,exchangeName,"");
// 8.发送消息
String body = "Hello PubSub";
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 9.释放资源
channel.close();
connection.close();
4.Routing
Routing
模式与Publish/Subscribe
模式的区别是Routing
模式下交换机与队列的绑定不再是任意绑定,而是指定一个RoutingKey
(路由key)
生产者将消息向交换机(Exchange
)发送消息时,携带了RoutingKey
,交换机(Exchange
)根据RoutingKey
发送给RoutingKey
相同的队列
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setPort(5672);//端口默认也是5672
factory.setVirtualHost("/zzk");//设置虚拟机
factory.setUsername("zzk");//用户名 默认guest
factory.setPassword("200079111zzk");//密码 默认guest
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建交换机
String exchangeName = "test_direct";
//广播模式
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 6.创建队列
String queueName = "test_direct_queue";
String queueName2 = "test_direct_queue2";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
// 7.绑定队列和交换机
channel.queueBind(queueName,exchangeName,"error");
channel.queueBind(queueName2,exchangeName,"info");
channel.queueBind(queueName2,exchangeName,"error");
// 8.发送消息
String body = "Hello Routing";
channel.basicPublish(exchangeName,"error",null,body.getBytes());
channel.basicPublish(exchangeName,"info",null,body.getBytes());
// 9.释放资源
channel.close();
connection.close();
5.Topics
Topic模式可以实现Pub/Sub
发布与订阅模式和Routing
路由模式的功能,Topic
在配置routingKey
时可以使用通配符,更加灵活
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setPort(5672);//端口默认也是5672
factory.setVirtualHost("/zzk");//设置虚拟机
factory.setUsername("zzk");//用户名 默认guest
factory.setPassword("200079111zzk");//密码 默认guest
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建交换机
String exchangeName = "test_topic";
//广播模式
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
// 6.创建队列
String queueName = "test_topic_queue";
String queueName2 = "test_topic_queue2";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
// 7.绑定队列和交换机
// 例:routingKey 系统的名称.日志级别
// 需求:所有error级别的日志都会走队列1,所有order系统的日志都会走队列2
channel.queueBind(queueName,exchangeName,"#.error");
channel.queueBind(queueName2,exchangeName,"order.*");
// 8.发送消息
String body = "Hello Topic";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
channel.basicPublish(exchangeName,"order.error",null,body.getBytes());
// 9.释放资源
channel.close();
connection.close();
SpringBoot整合RabbitMQ
发送方
创建交换机和队列
@Configuration
public class RabbitConfig {
public static final String Exchange_name = "boot_topic_exchange";
public static final String Queue_name = "boot_queue2";
//1.交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(Exchange_name).durable(true).build();
}
// 2. Queue 队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(Queue_name).build();
}
// 3.交换机和队列绑定
@Bean
public Binding binding(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("#.error").noargs();
}
}
发送消息
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend(RabbitConfig.Exchange_name,"order.error","SpringBoot-Rabbit_Topics_Test");
}
}
接收方
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println(message);
}
}