Skip to content

Stream 详解:消息驱动

Spring Cloud Stream 是一个用于构建消息驱动微服务的框架。它屏蔽了底层消息中间件的差异,让开发者只需面向统一的编程模型编程,无需关心底层是 Kafka、RocketMQ 还是 RabbitMQ。

一、核心架构

┌─────────────────────────────────────────────────────────────────┐
│                    Spring Cloud Stream 架构                      │
│                                                                 │
│  ┌──────────────────┐        ┌──────────────────┐               │
│  │   Application    │        │   Application    │               │
│  │   (业务代码)      │        │   (业务代码)      │               │
│  │                  │        │                  │               │
│  │  @Bean Supplier  │───────→│  @Bean Consumer  │               │
│  │  @Bean Function  │        │  @Bean Function  │               │
│  │  @Bean Consumer  │        │                  │               │
│  └────────┬─────────┘        └────────┬─────────┘               │
│           │                           │                         │
│  ┌────────▼───────────────────────────▼─────────┐               │
│  │              Binder (适配层)                   │               │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐       │               │
│  │  │ RabbitMQ │ │  Kafka   │ │ RocketMQ │  ...  │               │
│  │  │  Binder  │ │  Binder  │ │  Binder  │       │               │
│  │  └──────────┘ └──────────┘ └──────────┘       │               │
│  └───────────────────────────────────────────────┘               │
│                                                                 │
│  开发者只操作 Supplier/Function/Consumer,不关心底层 MQ            │
└─────────────────────────────────────────────────────────────────┘

二、编程模型

Spring Cloud Stream 3.x 引入了函数式编程模型,替代了旧版的 @EnableBinding / @StreamListener

java
@SpringBootApplication
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

    // 生产者:发送消息
    @Bean
    public Supplier<OrderMessage> orderCreated() {
        return () -> {
            // 从某个数据源获取消息(如数据库轮询、HTTP 请求触发等)
            OrderMessage message = messageQueue.poll();
            return message;
        };
    }

    // 消费者:接收消息
    @Bean
    public Consumer<OrderMessage> orderCreatedConsumer() {
        return message -> {
            log.info("收到订单创建消息: {}", message.getOrderId());
            // 发送短信、推送通知等后续处理
        };
    }

    // 处理函数:接收 + 转换 + 返回
    @Bean
    public Function<OrderMessage, PaymentMessage> orderToPayment() {
        return order -> {
            PaymentMessage payment = new PaymentMessage();
            payment.setOrderId(order.getOrderId());
            payment.setAmount(order.getAmount());
            return payment;
        };
    }
}

三、配置示例

yaml
spring:
  cloud:
    stream:
      # 绑定配置
      bindings:
        # 生产者绑定
        orderCreated-out-0:           # 方法名-out-索引
          destination: order-topic    # 对应的 Topic/Exchange
          content-type: application/json
        # 消费者绑定
        orderCreatedConsumer-in-0:    # 方法名-in-索引
          destination: order-topic
          group: order-consumer-group # 消费者组(同一组内竞争消费)
          content-type: application/json
      # 绑定器配置
      binders:
        rocketmq-binder:
          type: rocketmq
          environment:
            spring.cloud.stream.rocketmq.binder.name-server: 127.0.0.1:9876

四、消息分组与分区

消费者组

同一组内的消费者竞争消费(一条消息只被一个实例消费),不同组的消费者广播消费:

Topic: order-topic
├── Group: sms-sender (消费者组)
│   ├── sms-service-instance-1  ← 竞争关系
│   └── sms-service-instance-2  ← 竞争关系
└── Group: email-sender (消费者组)
    └── email-service-instance-1

消息分区

保证同一特征的消息发到同一个分区,实现顺序消费:

yaml
spring:
  cloud:
    stream:
      bindings:
        orderCreated-out-0:
          producer:
            partition-key-expression: payload.orderId  # 按 orderId 分区
            partition-count: 4

五、适用场景

场景说明
异步解耦下单后异步发送短信、推送通知,不阻塞主流程
流量削峰秒杀请求先写入 MQ,后端慢慢消费,避免后端被打垮
数据同步订单状态变更后,通过 MQ 通知其他服务同步数据
事件驱动领域事件发布/订阅,实现跨服务的事件通知

六、Stream vs 直接使用 MQ SDK

维度Spring Cloud Stream直接使用 MQ SDK
学习成本低(Spring 风格)高(需学习各 MQ 特有 API)
迁移成本低(改 Binder 即可)高(需重写大量代码)
灵活性较低(抽象层限制)高(可用全部特性)
适用场景简单的生产/消费需要 MQ 高级特性