Stream 详解:消息驱动
Spring Cloud Stream 是一个用于构建消息驱动微服务的框架。它屏蔽了底层消息中间件的差异,让开发者只需面向统一的编程模型编程,无需关心底层是 Kafka、RocketMQ 还是 RabbitMQ。
一、核心架构
┌─────────────────────────────────────────────────────────────────┐
│ Spring Cloud Stream 架构 │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Application │ │ Application │ │
│ │ (业务代码) │ │ (业务代码) │ │
│ │ │ │ │ │
│ │ @Bean Supplier │───────→│ @Bean Consumer │ │
│ │ @Bean Function │ │ @Bean Function │ │
│ │ @Bean Consumer │ │ │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ ┌────────▼───────────────────────────▼─────────┐ │
│ │ Binder (适配层) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ RabbitMQ │ │ Kafka │ │ RocketMQ │ ... │ │
│ │ │ Binder │ │ Binder │ │ Binder │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └───────────────────────────────────────────────┘ │
│ │
│ 开发者只操作 Supplier/Function/Consumer,不关心底层 MQ │
└─────────────────────────────────────────────────────────────────┘二、编程模型
Spring Cloud Stream 3.x 引入了函数式编程模型,替代了旧版的 @EnableBinding / @StreamListener:
java
@SpringBootApplication
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
// 生产者:发送消息
@Bean
public Supplier<OrderMessage> orderCreated() {
return () -> {
// 从某个数据源获取消息(如数据库轮询、HTTP 请求触发等)
OrderMessage message = messageQueue.poll();
return message;
};
}
// 消费者:接收消息
@Bean
public Consumer<OrderMessage> orderCreatedConsumer() {
return message -> {
log.info("收到订单创建消息: {}", message.getOrderId());
// 发送短信、推送通知等后续处理
};
}
// 处理函数:接收 + 转换 + 返回
@Bean
public Function<OrderMessage, PaymentMessage> orderToPayment() {
return order -> {
PaymentMessage payment = new PaymentMessage();
payment.setOrderId(order.getOrderId());
payment.setAmount(order.getAmount());
return payment;
};
}
}三、配置示例
yaml
spring:
cloud:
stream:
# 绑定配置
bindings:
# 生产者绑定
orderCreated-out-0: # 方法名-out-索引
destination: order-topic # 对应的 Topic/Exchange
content-type: application/json
# 消费者绑定
orderCreatedConsumer-in-0: # 方法名-in-索引
destination: order-topic
group: order-consumer-group # 消费者组(同一组内竞争消费)
content-type: application/json
# 绑定器配置
binders:
rocketmq-binder:
type: rocketmq
environment:
spring.cloud.stream.rocketmq.binder.name-server: 127.0.0.1:9876四、消息分组与分区
消费者组
同一组内的消费者竞争消费(一条消息只被一个实例消费),不同组的消费者广播消费:
Topic: order-topic
├── Group: sms-sender (消费者组)
│ ├── sms-service-instance-1 ← 竞争关系
│ └── sms-service-instance-2 ← 竞争关系
└── Group: email-sender (消费者组)
└── email-service-instance-1消息分区
保证同一特征的消息发到同一个分区,实现顺序消费:
yaml
spring:
cloud:
stream:
bindings:
orderCreated-out-0:
producer:
partition-key-expression: payload.orderId # 按 orderId 分区
partition-count: 4五、适用场景
| 场景 | 说明 |
|---|---|
| 异步解耦 | 下单后异步发送短信、推送通知,不阻塞主流程 |
| 流量削峰 | 秒杀请求先写入 MQ,后端慢慢消费,避免后端被打垮 |
| 数据同步 | 订单状态变更后,通过 MQ 通知其他服务同步数据 |
| 事件驱动 | 领域事件发布/订阅,实现跨服务的事件通知 |
六、Stream vs 直接使用 MQ SDK
| 维度 | Spring Cloud Stream | 直接使用 MQ SDK |
|---|---|---|
| 学习成本 | 低(Spring 风格) | 高(需学习各 MQ 特有 API) |
| 迁移成本 | 低(改 Binder 即可) | 高(需重写大量代码) |
| 灵活性 | 较低(抽象层限制) | 高(可用全部特性) |
| 适用场景 | 简单的生产/消费 | 需要 MQ 高级特性 |