淘先锋技术网

首页 1 2 3 4 5 6 7

一、什么是SpringCloudStream

SpringCloudStream是SpringCloud的一个子项目,他提供了一套更加通用的操作MQ的解决方案

在这里插入图片描述

  1. Destination Binder(目标绑定器) :微服务与消息中间件通信的组件
  2. Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建
  3. Message:消息

二、为什么选择SpringCloudStream

​ 现在的mq产品主流有4中:rabbitmq,rocketmq,activemq,kafka;有时候很意外的是:学的其中一个,公司用的又是另外一个,导致学习成本提高。有或者是 业务服务使用rabbitmq,而数据库后台使用kafka,整个项目使用了2种mq,可能会导致切换困难,维护成本高等因素。

我们希望能够像学习 hibernate时那样,不管底层是oracle还是mysql或其他数据库,只要给我一组统一的API操作即可;而springcloud-stream就相当于mq的统一接口。

在这里插入图片描述

Tip:input表示微服务接收消息,output表示微服务发送消息

二、SpringBoot整合SpringCloudStream

2.1、创建项目

父工程:stream-mq-demo

子工程(消息生产者):producer

子工程(消息消费者):consumer

2.1、pom.xml

配置父工程的pom.xml

  • springboot版本
  • 各组件的版本号
  • 配置SpringCloud和SpringCloudAlibaba
  • 添加依赖(RocketMQ、SpringCloudStream等)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
    </parent>

    <groupId>org.example</groupId>
    <artifactId>stream-mq-demo</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>producer</module>
        <module>consumer</module>
    </modules>

    <properties>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
        <spring-cloud-alibaba.version>0.9.0.RELEASE</spring-cloud-alibaba.version>
        <java.version>1.8</java.version>
        <lombok.version>1.18.8</lombok.version>
        <rocketmq.version>2.0.3</rocketmq.version>
    </properties>

    <dependencies>
        <!-- RocketMQ坐标 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <!-- SpringCloudStream坐标 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <!-- SpringWeb坐标 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- lombok坐标 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!--整合spring cloud-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--整合spring cloud alibaba-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

2.2、开发消息生产者

在子工程producer中进行开发

2.2.1、application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        #消费者
        output:
          #用来指定topic
          destination: stream-test-topic
server:
  port: 8081

2.2.2、启动类

在启动类中配置生产者

package demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

2.2.3、在controller中测试

package demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    @Autowired
    private Source source;
    
    @GetMapping("/send")
    public String testSend(){
        source.output()
                .send(
                        MessageBuilder
                                .withPayload("这是一条测试的消息")
                                .build());
        return "消息发送完成,请到MQ控制台查看";
    }
}

2.2.4、测试

使用postman发送请求 http://localhost:8081/send

在MQ的控制台可以看到有新的消息

在这里插入图片描述

2.3、开发消费者

在子工程consumer中进行开发

2.3.1、application.yml

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        #消息消费者
        input:
          #用来指定topic,要和消息生产者的的topic匹配
          destination: stream-test-topic
          #一定要设置,必填项,如果用其他MQ,该属性可以不设置
          group: test
server:
  port: 8082

2.3.2、启动类

在启动类中配置消费者

package demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

2.3.3、监听消息

在消费者中监听生产者发送的消息

package demo.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive(String messageBody){
        log.info("通过stream收到了消息:messageBody={}", messageBody);
//        throw new RuntimeException();
    }

    /**
     * 全局异常处理
     *
     * @param message 发生异常的消息
     */
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        ErrorMessage errorMessage = (ErrorMessage) message;
        log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage);
    }
}

2.3.4、测试

使用postman发送请求 http://localhost:8081/send

在消费者控制台可以看到该条消息被消费了

在这里插入图片描述