淘先锋技术网

首页 1 2 3 4 5 6 7

1. Exchange作用

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。

生产者--(创建消息)-->交换机--(路由键)-->队列--(pull/push)-->消费者

2. Exchange的类型

1)直连交换器: Direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的

什么是路由键?
每个消息都有一个称为路由键(routing key)的属性,它其实就是一个简单的字符串

直连交换机适用场景?
有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

直连交换机不适合的场景
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么直连交换机就不合适了

2)主题交换机: Topic Exchange(发布/订阅)
RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

主题交换机的routing_key定义规则:
交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:
*表示一个单词
#表示任意数量(零个或多个)单词

示例:

Q1:  *.TT.*
Q2:   TT.#

如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到 
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到

3)广播交换机: Fanout Exchange
用于广播消息,将发送到Exchange中的消息发送到与该交换器关联的所有队列中。

3. 死信队列

死信队列用于存储没匹配队列的消息,超时没有被处理的消息,如果没有配置死信队列这些消息会被丢弃。即当出现没有匹配的队列的消息,或是超时的消息则将消息转入到死信队列里去,等待重新处理或人工干预。

死信队列的应用场景:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

4. 队列参数说明

参数作用
exchange交换机名称
type交换机类型
durable是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
autoDelete是否自动删除,如果没有与之绑定的Queue,直接删除
internal是否内置的,如果为true,只能通过Exchange到Exchange
arguments结构化参数

示例:

Exchange.DeclareOk exchangeDeclare(String exchange,
           String type,
           boolean durable,
           boolean autoDelete,
           boolean internal,
           Map<String, Object> arguments) throws IOException;

5. 开发示例

准配虚拟机 开启一个Docker 拉取镜像rabbitmq 运行容器

具体步骤:有道云笔记

需要架包

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit-test</artifactId>
   <scope>test</scope>
</dependency>

配置文件 application 生产与消费者都可用 端口需要改动 还有RabbitMQ服务地址需要改动


server.port=8081
## rabbitmq config
spring.rabbitmq.host=192.168.164.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=xhz
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=my_vhost
## 消费者数量
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列中获取的消息数量
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用发送重试
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

完成演示图

所有消费消息类 模块

5.1 Direct交换机

1)配置直接交换机,队列,并将直接交换机和该队列绑定。(在RabbitMQConfig类中配置,该类使用了@Configuration注解)

package com.rabbitmq.provider.rabbitmqprovider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class DirectConfig {
@Bean
    public Queue directQueue(){
    return new Queue("zking-direct-queue");
}
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("zking-direct-exchange");
    }

    @Bean
    public Binding directBinding(){
    return BindingBuilder.bind(directQueue()).to(directExchange()).with("zking-direc");
    }

}

2)编写通过直接交换机发送消息的方法

package com.rabbitmq.provider.rabbitmqprovider.web;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

@RestController
public class SenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/sendDirect")
    public String sendDirect(String routing){
        Map msg=new HashMap<>();
        msg.put("code",200);
        msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
                .ofPattern("yyyy-MM-dd HH:mm:ss")));
        rabbitTemplate.convertAndSend("zking-direct-exchange",routing,msg);
        return "direct success";
    }
  
}

3.测试交换机发送消息

http://localhost:8081/sendDirect?routing=zking-direc

4.消费消息

创建模块

消费消息 我们运行这个项目

package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
//queues参数指定的是与直接交换机关联的队列名称
@RabbitListener(queues = "zking-direct-queue")
public class DirecReciewer {

    @RabbitHandler
    public void receive(Map msg) {
        log.info("接收通过直接交换机发送的消息: " + msg);
    }
}

打印结果

5.2 主题交换机

1) 配置主题交换机,队列,并将主题交换机和该队列绑定。

第一种方式 ---选一种即可

package com.rabbitmq.provider.rabbitmqprovider.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//@Configuration
public class TopicConfig {
    /**
     * 声明Topic类型的交换机,支持序列化,后面队列进行绑定(topic_queue_q1,topic_queue_q2)
     * @return
     */
    @Bean(name="topicExchange")
    public Exchange topicExchange() {

        return ExchangeBuilder
                .topicExchange("topic_exchange")
                .durable(true)
                .build();
    }


    /**
     * 声明队列,该队列与topic交换机绑定
     * @return
     */
    @Bean(name="topicQueue1")
    public Queue topicQueue1() {
        return QueueBuilder.durable("topic_queue_q1").build();
    }


    /**
     * 声明队列,该队列与topic交换机绑定
     * @return
     */
    @Bean(name="topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder.durable("topic_queue_q2").build();
    }


    /**
     * 将队列(topic_queue_q1)与topic型交换机进行绑定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding topicBindingQ1(
            @Qualifier("topicQueue1") Queue queue,
            @Qualifier("topicExchange") Exchange exchange)  {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("topic.queue.#")
                .noargs();
    }


    /**
     * 将队列(topic_queue_q2)与topic型交换机进行绑定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding topicBindingQ2(
            @Qualifier("topicQueue2") Queue queue,
            @Qualifier("topicExchange") Exchange exchange) {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("topic.queue.#")
                .noargs();
    }
}

 测试 发送消息

package com.rabbitmq.provider.rabbitmqprovider;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

@SpringBootTest
class RabbitmqProviderApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        Map msg=new HashMap<>();
        msg.put("code",200);
        msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
                .ofPattern("yyyy-MM-dd HH:mm:ss")));
        rabbitTemplate.convertAndSend("topic_exchange","topic.queue.ab",msg);
    }

}

第二种发送消息

package com.rabbitmq.provider.rabbitmqprovider.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig1 {

    @Bean(name="topicQueue1")
    public Queue topicQueue1() {
        return QueueBuilder.durable("topic_queue_q1").build();
    }
    @Bean(name="topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder.durable("topic_queue_q2").build();
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topic-exchange");
    }
@Bean
    public Binding topicBinding1(  @Qualifier("topicQueue1") Queue queue,
                                   @Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("person.yy");
}

    @Bean
    public Binding topicBinding2(  @Qualifier("topicQueue2") Queue queue,
                                   @Qualifier("topicExchange") TopicExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("person.*");
    }

}

测试发送消息

package com.rabbitmq.provider.rabbitmqprovider.web;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

@RestController
public class SenderController {

    @RequestMapping("/sendTopic")
    public String sendTopic(String routing){
        Map msg=new HashMap<>();
        msg.put("code",200);
        msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
                .ofPattern("yyyy-MM-dd HH:mm:ss")));
        rabbitTemplate.convertAndSend("topic-exchange",routing,msg);
        return "direct success";
    }
  
}

http://localhost:8081/sendTopic?routing=person.y 只有条件为 y 或者 *

http://localhost:8081/sendTopic?routing=person.yy

消费消息

package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component

public class TopicReciewer {
   @RabbitListener(queues={"topic_queue_q1"})
    @RabbitHandler
    public void handler(Map map){
       System.out.println(map);

 }

    @RabbitListener(queues={"topic_queue_q2"})
    @RabbitHandler
    public void handler1(Map map){
        System.out.println(map);

    }

}

 

 

5.3 广播交换机 (扇形)

1)配置广播交换机,队列,并将主题交换机和该队列绑定。

package com.rabbitmq.provider.rabbitmqprovider.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout-queue1");
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout-queue2");
    }

    @Bean
    public Queue fanoutQueue3() {
        return new Queue("fanout-queue3");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-exchange");
    }

    @Bean
    public Binding fanoutBInding1(@Qualifier("fanoutQueue1") Queue queue,
                                  @Qualifier("fanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding fanoutBInding2(@Qualifier("fanoutQueue2") Queue queue,
                                  @Qualifier("fanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding fanoutBInding3(@Qualifier("fanoutQueue3") Queue queue,
                                  @Qualifier("fanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}

 

生产的队列

向服务器发送消息

package com.rabbitmq.provider.rabbitmqprovider.web;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

@RestController
public class SenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendFanout")
    public String sendFanout(){
        Map msg=new HashMap<>();
        msg.put("code",200);
        msg.put("msg","this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
                .ofPattern("yyyy-MM-dd HH:mm:ss")));
        rabbitTemplate.convertAndSend("fanout-exchange",null,msg);
        return "direct success";
    }
}

 

发送与消费

消费消息

package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class FanoutRecevier {

    @RabbitListener(queues={"fanout-queue1"})
    @RabbitHandler
    public void fanout(Map map){
        System.out.println(map);

    }

    @RabbitListener(queues={"fanout-queue2"})
    @RabbitHandler
    public void fanout1(Map map){
        System.out.println(map);

    }
    @RabbitListener(queues={"fanout-queue3"})
    @RabbitHandler
    public void fanout2(Map map){
        System.out.println(map);

    }
}