目录
SpringCloud Stream消息驱动为什么被引入
为什么被引用?解决的痛点是什么?
常见的消息中间件:
- ActiveMQ
- RabbitMQ
- RocketMQ
- kafaka
消息驱动概述
是什么
屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型.
官网:https://spring.io/projects/spring-cloud-stream#overview
API:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/
设计思想
标准MQ:
- 生产者/消费者之间靠消息媒介传递信息内容:Message
- 消息必须走特定的通道:消息通道MessageChannel
- 消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅
为什么用Cloud Stream
stream凭什么可以统一底层差异
Binder
- INPUT对应于消费者
- OUTPUT对应于生产者
Stream中的消息通信方式遵循了发布-订阅模式
Topic主题进行广播,在RabbitMQ就是Exchange,在kafka中就是Topic
Spring Cloud Stream标准流程套路
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
编码API和常用注解
消息驱动之生产者
新建Module:cloud-stream-rabbitmq-provider8801
pom:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.lzh.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件:
defaultRabbit
会爆红,不影响使用
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.63.131
port: 5672
username: user
password: password
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
启动类:
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
业务类
发送消息接口
public interface IMessageProvider {
String send();
}
发送消息接口实现类:
source
表示:从Stream
发布消息就是输出,生产者
//定义消息的推送管道
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
//消息的发送通道
@Autowired
private MessageChannel output;
@Override
public String send() {
String s = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(s).build());
System.out.println("*******s:"+s);
return null;
}
}
controller:
@RestController
public class SendMessageController {
@Autowired
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String send(){
return messageProvider.send();
}
}
测试:启动Eureka集群,启动RabbitMQ,启动消息生产者。
注意:启动消息生产者会报Connection refused: connect
的错误。但仔细看日志:
它先是尝试连接本地的RabbitMQ,连接不上,才尝试去连接配置文件里配置的MQ。emmm,这个情况还是第一次见
到RabbitMQ
界面查看,发现订阅了一个Exchanges->studyExchange
,就是在配置文件配置的destination
多次访问接口:http://localhost:8801/sendMessage
消息驱动之消费者
新建Module cloud-stream-rabbitmq-consumer8802
POM:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.lzh.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件:
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.63.131
port: 5672
username: user
password: password
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
启动类:
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
业务类:
@Component
//定义消息的推送管道
//Sink表示:从Stream接受消息就是输入,消费者
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
//监听队列,用于消费者的队列的消息接受
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者1号,接受:"+message.getPayload()+"\t port:"+serverPort);
}
}
测试:
启动Eureka,启动提供者,启动消费
注意:提供者和消费者都会出现异常:Connection refused: connect;不影响使用
访问提供者的接口:http://localhost:8801/sendMessage
提供者:
消费者:
RabbitMQ界面:
消息重复消费
再建一个子模块和8802一模一样,只是端口不一样。还有就是名字不一样。
启动Eureka,服务提供者8801,启动两个消费者8802/8803。
访问提供者接口:http://localhost:8801/sendMessage
提供者日志:
*******s:2a7717b2-70a4-4fef-b108-5841390d3b04
消费者8802:
消费者1号,接受:2a7717b2-70a4-4fef-b108-5841390d3b04 port:8802
消费者8803:
消费者1号,接受:2a7717b2-70a4-4fef-b108-5841390d3b04 port:8803
出现了一个消息,被两个消费者消费的情况!
如何解决?
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以重复消费消息,同一个组内会发生竞争关系,只有其中一个可以消费。
两个不同的分组,所以出现消息重复消费。默认分组。
分组
如何自定义分组?
修改8802配置文件:
修改8803配置文件:
重启。
到RabbitMQ
里查看自定义的分组:看以看到现在的分组是按照自定义的分组,而不是默认的
因为是不同的组,所以现在还是会出现消息重复消费的问题。
解决消息重复消费问题
将8802/8803分到同一组即可解决问题:
相同就行。
group: TestGroup
访问提供者接口:http://localhost:8801/sendMessage 访问两次
提供者:
*******s:ab9ec6b3-3f24-4997-b39e-b70cb20a84a4
*******s:819095db-b1a7-4675-a02f-1922a8d566d7
消费者8002
消费者1号,接受:819095db-b1a7-4675-a02f-1922a8d566d7 port:8802
消费者8003
消费者1号,接受:ab9ec6b3-3f24-4997-b39e-b70cb20a84a4 port:8803
结果:组内消费者不会重复消费消息,并且是竞争关系,去消费消息。
持久化
演示:把消费者8802/8803停了。
并且把8802的group注释了,8803的group保留
提供者连续访问接口:
重启服务消费者8802/8003:
发现8803启动之后就会去消费提供者的消息,但是8802就不会。
group是具备消息的持久化功能的。可以防止服务器宕机后消息丢失的问题,所以group属性是一定配置的。
分组group:可以解决消息重复消费的问题以及消息持久化。