Contents

spring-cloud-stream 3.x 开发

spring-cloud-stream 3.x 开发

基本使用

基本配置

  1. 依赖

    1
    
    implementation 'org.springframework.cloud:spring-cloud-stream:3.2.3'
    
  2. 配置

    1
    2
    3
    4
    5
    6
    
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: username
        password: password
    

生产者

配置

注意:analysePdf-out-0,analysePdf发送消息,接收消息都需要使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          analysePdf-out-0:
            producer:
              destination: analysePdf
              content-type: application/json
      bindings:
        analysePdf-out-0:
          destination: analysePdf

代码

1
2
3
4
5
6
7
    @Autowired
    private StreamBridge streamBridge;

    public void sendMethod() {
        //与配置文件中相同
        streamBridge.send("analysePdf-out-0", "pdf"+i);
    }

消费者

配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          analysePdf-in-0:
            consumer:
              destination: analysePdf
              content-type: application/json
      bindings:
        analysePdf-in-0:
          destination: analysePdf

代码

注意:方法名需要和配置中心中相同

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Configuration
public class Test {
    @Bean("analysePdf")
  	//方法名与配置中相同
    public Consumer<String> analysePdf() {
        return str -> {
            System.out.println("analysePdf: " + str);
        };
    }
}

消息分组(多实例下避免重复消费)

消费者配置:group: in,配置后同组消费者下只能消费一次

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          analysePdf-in-0:
            consumer:
              destination: analysePdf
              content-type: application/json
      bindings:
        analysePdf-in-0:
        	group: in
          destination: analysePdf

ack配置

消费者配置:acknowledge-mode: auto

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
spring:
  cloud:
    stream:
      rabbit:
        bindings:
          analysePdf-in-0:
            consumer:
              destination: analysePdf
              content-type: application/json
              #ack模式
              acknowledge-mode: auto
              #重试次数
              max-attempts: 5
      bindings:
        analysePdf-in-0:
        	group: in
          destination: analysePdf

多消息使用

注意配置,其他配置重复一遍

1
2
3
4
spring:
	cloud:
  	function:
    	definition: 

完整配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
spring:
  cloud:
    function:
      definition: analysePdf;jsh
    stream:
      rabbit:
        bindings:
          analysePdf-in-0:
            consumer:
              destination: analysePdf
              content-type: application/json
              #ack模式
              acknowledge-mode: auto
              #重试次数
              max-attempts: 5
          analysePdf-out-0:
            producer:
              destination: analysePdf
              content-type: application/json
          jsh-in-0:
            consumer:
              destination: jsh
              content-type: application/json
          jsh-out-0:
            producer:
              destination: jsh
              content-type: application/json
      bindings:
        analysePdf-in-0:
          group: in
          destination: analysePdf
        analysePdf-out-0:
          destination: analysePdf
        jsh-in-0:
          group: in
          destination: jsh
        jsh-out-0:
          destination: jsh

Spring Cloud Stream 官网

源码地址