Skip to content

SpringCloud_组件基本使用_Stream

Stream 组件是用于封装消息队列来使用的,Bus这个组件一般会结合 Stream 使用;

看一般性文档解释,主要是围绕 Bus 可以做到配置的动态刷新、事件的传递。具体使用的时候可以看一下相关内容。

下面内容是引用 ChatGpt 的内容,分别是 Stream 和 Bus 的基本使用。

1、Spring Cloud Stream的使用

Spring Cloud Stream 是一个用于构建消息驱动微服务架构的框架,它基于Spring Boot和Spring Integration,提供了一种简单的方式来实现各个微服务组件之间的消息通信。

以下是Spring Cloud Stream的基本使用步骤:

  1. 引入依赖

在项目的 pom.xml 文件中,添加 Spring Cloud Stream 的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-{binder}</artifactId>
</dependency>

{binder} 可以是不同的消息中间件的名称,比如 RabbitMQ、Kafka 等。

  1. 配置消息中间件

application.propertiesapplication.yml 文件中配置消息中间件的连接信息,以 RabbitMQ 为例:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-output-topic
          binder: rabbit # RabbitMQ的binder
      rabbit:
        bindings:
          output:
            exchangeType: topic
            routingKeyExpression: 'my-output-topic'
  1. 发送消息

在应用程序中使用 Source 接口定义消息发送的输出通道:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    String OUTPUT = "output";

    @Output(OUTPUT)
    MessageChannel output();
}

然后在需要发送消息的地方注入 MySource,并使用 output() 方法发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class MessageSenderService {

    @Autowired
    private MySource mySource;

    public void sendMessage(String payload) {
        Message<String> message = MessageBuilder.withPayload(payload).build();
        mySource.output().send(message);
    }
}
  1. 接收消息

在应用程序中使用 @StreamListener 注解定义消息接收的输入通道:

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class MessageReceiverService {

    @StreamListener(MySource.INPUT)
    public void receiveMessage(@Payload String payload) {
        System.out.println("Received message: " + payload);
        // 处理接收到的消息
    }
}
  1. 运行时绑定

Spring Cloud Stream 会在运行时将消息通道绑定到消息中间件上,使得消息的发送和接收可以与底层的消息中间件解耦。根据配置的 {binder} 不同,可以支持 RabbitMQ、Kafka、ActiveMQ 等多种消息中间件。

通过以上步骤,你就可以使用 Spring Cloud Stream 在微服务中实现消息的发送和接收,从而构建消息驱动的微服务架构。

2、SpringCloud Bus 的使用

Spring Cloud Bus是一个用于在分布式系统中传播事件的工具,它建立在Spring Cloud Stream之上。它允许微服务架构中的组件之间进行消息推送,实现配置的动态刷新、事件的传递等功能。

以下是Spring Cloud Bus的基本使用步骤:

  1. 引入依赖

在项目的pom.xml文件中添加Spring Cloud Bus的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

上述示例中使用了AMQP(Advanced Message Queuing Protocol)作为消息传递的中间件,你也可以选择其他消息中间件,如Kafka等。

  1. 配置消息中间件

application.propertiesapplication.yml中配置消息中间件的连接信息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  1. 配置Bus

在微服务的application.propertiesapplication.yml中,添加以下配置:

spring:
  cloud:
    bus:
      enabled: true

4.发送消息

在任意微服务中,可以使用Spring Cloud Bus发送消息来触发事件。可以使用HTTP POST请求向/actuator/bus-refresh端点发送刷新请求:

curl -X POST http://localhost:8080/actuator/bus-refresh

这样就会触发Spring Cloud Bus发送一个刷新事件,然后各个微服务都会接收到这个事件,从而触发配置的刷新操作。

  1. 接收消息

在微服务中,可以使用@RefreshScope注解标记一个类,使得该类中的配置在接收到刷新事件时能够重新加载:

import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RefreshScope
public class MyController {

    @Value("${my.property}")
    private String myProperty;

    @GetMapping("/property")
    public String getProperty() {
        return myProperty;
    }
}

上述示例中,@RefreshScope注解使得MyController类中的配置在接收到刷新事件时会重新加载。

通过这些步骤,你就可以使用Spring Cloud Bus实现在分布式系统中传播事件,例如动态刷新配置、传递消息等。

to be contined.....