一、什么是SpringCloudStream
SpringCloudStream是SpringCloud的一个子项目,他提供了一套更加通用的操作MQ的解决方案
- Destination Binder(目标绑定器) :微服务与消息中间件通信的组件
- Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建
- Message:消息
二、为什么选择SpringCloudStream
现在的mq产品主流有4中:rabbitmq,rocketmq,activemq,kafka;有时候很意外的是:学的其中一个,公司用的又是另外一个,导致学习成本提高。有或者是 业务服务使用rabbitmq,而数据库后台使用kafka,整个项目使用了2种mq,可能会导致切换困难,维护成本高等因素。
我们希望能够像学习 hibernate时那样,不管底层是oracle还是mysql或其他数据库,只要给我一组统一的API操作即可;而springcloud-stream就相当于mq的统一接口。
Tip:input表示微服务接收消息,output表示微服务发送消息
二、SpringBoot整合SpringCloudStream
2.1、创建项目
父工程:stream-mq-demo
子工程(消息生产者):producer
子工程(消息消费者):consumer
2.1、pom.xml
配置父工程的pom.xml
- springboot版本
- 各组件的版本号
- 配置SpringCloud和SpringCloudAlibaba
- 添加依赖(RocketMQ、SpringCloudStream等)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<groupId>org.example</groupId>
<artifactId>stream-mq-demo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>producer</module>
<module>consumer</module>
</modules>
<properties>
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
<spring-cloud-alibaba.version>0.9.0.RELEASE</spring-cloud-alibaba.version>
<java.version>1.8</java.version>
<lombok.version>1.18.8</lombok.version>
<rocketmq.version>2.0.3</rocketmq.version>
</properties>
<dependencies>
<!-- RocketMQ坐标 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- SpringCloudStream坐标 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<!-- SpringWeb坐标 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok坐标 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!--整合spring cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--整合spring cloud alibaba-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
2.2、开发消息生产者
在子工程producer中进行开发
2.2.1、application.yml
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
#消费者
output:
#用来指定topic
destination: stream-test-topic
server:
port: 8081
2.2.2、启动类
在启动类中配置生产者
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
2.2.3、在controller中测试
package demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private Source source;
@GetMapping("/send")
public String testSend(){
source.output()
.send(
MessageBuilder
.withPayload("这是一条测试的消息")
.build());
return "消息发送完成,请到MQ控制台查看";
}
}
2.2.4、测试
使用postman发送请求 http://localhost:8081/send
在MQ的控制台可以看到有新的消息
2.3、开发消费者
在子工程consumer中进行开发
2.3.1、application.yml
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
#消息消费者
input:
#用来指定topic,要和消息生产者的的topic匹配
destination: stream-test-topic
#一定要设置,必填项,如果用其他MQ,该属性可以不设置
group: test
server:
port: 8082
2.3.2、启动类
在启动类中配置消费者
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
2.3.3、监听消息
在消费者中监听生产者发送的消息
package demo.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody){
log.info("通过stream收到了消息:messageBody={}", messageBody);
// throw new RuntimeException();
}
/**
* 全局异常处理
*
* @param message 发生异常的消息
*/
@StreamListener("errorChannel")
public void error(Message<?> message) {
ErrorMessage errorMessage = (ErrorMessage) message;
log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage);
}
}
2.3.4、测试
使用postman发送请求 http://localhost:8081/send
在消费者控制台可以看到该条消息被消费了