
Kafka consumer does not receive messages after adding the spring-cloud-starter-bus-kafka dependency

  • Andrey P.  · Tech Community  · 1 year ago

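    Several services use Kafka (spring-cloud-stream-binder-kafka) to exchange messages. The services also use Spring Config Server. Everything worked fine. I then wanted to add configuration updates over Cloud Bus via /monitor. As soon as I added the spring-cloud-starter-bus-kafka dependency, the Kafka consumers stopped receiving messages. The question is: what am I doing wrong? What is the correct way to configure Cloud Bus with Kafka in a project that already uses the Spring Cloud Stream Kafka binder? I have no experience with Kafka, so please explain in detail.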

    Client pom.xml

    ...
        <properties>
            <java.version>19</java.version>
            <spring-cloud.version>2022.0.4</spring-cloud.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-security</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.security</groupId>
                <artifactId>spring-security-oauth2-authorization-server</artifactId>
                <version>1.0.2</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.security</groupId>
                <artifactId>spring-security-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
            </dependency>
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.5.4</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.26</version>
            </dependency>
            <dependency>
                <groupId>com.example</groupId>
                <artifactId>common</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.jobrunr</groupId>
                <artifactId>jobrunr-spring-boot-3-starter</artifactId>
                <version>6.2.3</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-config</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-bus-kafka</artifactId>
            </dependency>
        </dependencies>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

    Client application.yml

    spring:
    ...
    
      cloud:
        stream:
          bindings:
            signup-out-0:
              destination: signup
            signup-in-0:
              destination: signup
              group: user1
          source: signup
    
      kafka:
        bootstrap-servers:
          ${BROKER_HOST}:${BROKER_PORT}
    
    ...
    
    

    The source is available on GitHub if more detail is needed.

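    Removing the spring-cloud-starter-bus-kafka dependency fixes the consumer. However, I expected this dependency to add Cloud Bus configuration broadcasting without breaking the services' own messaging.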

    Update:

    Log differences

    Without the dependency:

    o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel signup-in-0
    ConsumerConfig values: client.id = consumer-user1-1
    group.id = user1
    

    With the dependency:

    Registering MessageChannel springCloudBusInput
    client.id = consumer-anonymous.3c0d339a-2479-46bc-b8ae-77d4e20929a2-1
    group.id = anonymous.3c0d339a-2479-46bc-b8ae-77d4e20929a2
    

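    It looks like my consumer's signup-in-0 message channel has been replaced by the springCloudBusInput message channel. How can I configure it so that the bus channel is added instead of replacing mine?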

    I tried the following changes to the yml properties:

      cloud:
        stream:
          bindings:
            signup-out-0:
              destination: signup
            signup-in-0:
              destination: signup
              group: user1
            springCloudBusInput:
              destination: my-bus-topic
              group: bus1

          source: signup;my-bus-topic
          function:
            definition: signup;my-bus-topic
          kafka:
            streams:
              binder:
                functions:
                  my-bus-topic:
                    applicationId: bus-consumer
                  signup:
                    applicationId: signup-consumer
    

    But the only effect was that the anonymous client.id and group.id were renamed to use the bus group.
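
    One possible explanation, offered as an assumption rather than a confirmed diagnosis: recent Spring Cloud Bus releases register their own functional consumer (named busConsumer) and add it to spring.cloud.function.definition. If that property is not set explicitly, Spring Cloud Stream only auto-binds a function when there is a single candidate, which would match the logs: signup-in-0 is bound without the bus starter and only springCloudBusInput with it. The sketch below names the application's function explicitly so that the bus consumer can be appended to the list instead of replacing it. It assumes the consumer bean is called signup (inferred from the signup-in-0 binding name), and it reuses my-bus-topic from the attempt above. The existing spring.cloud.stream bindings are meant to stay unchanged.

    ```yaml
    spring:
      cloud:
        function:
          # Assumption: the Consumer bean behind signup-in-0 is named "signup".
          # Entries here are function bean names, not topic names, and the
          # property sits under spring.cloud.function, not spring.cloud.stream.
          definition: signup
        bus:
          # Optional: topic used for bus events (the default is springCloudBus).
          destination: my-bus-topic
    ```

    If this assumption holds, the anonymous group.id on the bus binding is probably not a problem in itself: the bus is meant to deliver every refresh event to every instance, so a shared group such as bus1 would likely cause instances to split the events between them.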
