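I have several services that use Kafka (spring-cloud-stream-binder-kafka) to exchange messages. The services also use Spring Config Server. Everything works fine. I want to add Cloud Bus config updates via /monitor. When I add the spring-cloud-starter-bus-kafka dependency, the Kafka consumers stop receiving messages.
The question is: what am I doing wrong? What is the right way to configure Cloud Bus with Kafka for a project that already uses Spring Cloud Binder Kafka? I have no experience with Kafka, so please be detailed.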
Client pom.xml
...
    <properties>
        <java.version>19</java.version>
        <spring-cloud.version>2022.0.4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-oauth2-authorization-server</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.5.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>common</artifactId>
            <version>${project.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jobrunr</groupId>
            <artifactId>jobrunr-spring-boot-3-starter</artifactId>
            <version>6.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Client application.yml
spring:
  ...
  cloud:
    stream:
      bindings:
        signup-out-0:
          destination: signup
        signup-in-0:
          destination: signup
          group: user1
      source: signup
  kafka:
    bootstrap-servers: ${BROKER_HOST}:${BROKER_PORT}
...
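If more detail is needed, the source is available on GitHub.
Removing the spring-cloud-starter-bus-kafka dependency fixes the consumer problem. But I expected this dependency to add Cloud Bus config broadcasting without breaking the messaging between services.
Update:
Log differences
Without the dependency: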
o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel signup-in-0
ConsumerConfig values: client.id = consumer-user1-1
group.id = user1
With the dependency:
Registering MessageChannel springCloudBusInput
client.id = consumer-anonymous.3c0d339a-2479-46bc-b8ae-77d4e20929a2-1
group.id = anonymous.3c0d339a-2479-46bc-b8ae-77d4e20929a2
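It looks like my consumer's signup-in-0 message channel has been replaced by the springCloudBusInput message channel. How do I configure it so that the bus channel is added instead of replacing mine?
I tried the following changes to the yml properties: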
cloud:
  stream:
    bindings:
      signup-out-0:
        destination: signup
      signup-in-0:
        destination: signup
        group: user1
      springCloudBusInput:
        destination: my-bus-topic
        group: bus1
    source: signup;my-bus-topic
    function:
      definition: signup;my-bus-topic
    kafka:
      streams:
        binder:
          functions:
            my-bus-topic:
              applicationId: bus-consumer
            signup:
              applicationId: signup-consumer
But the only result was that the anonymous client.id and group.id were renamed to bus ones.
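One thing that may be worth checking here, offered as a sketch rather than a confirmed fix: spring.cloud.function.definition expects function bean names, not topic names, so my-bus-topic in the attempt above only has meaning if a bean with that name exists. The spring.cloud.stream.kafka.streams.binder.* properties belong to the Kafka Streams binder, which is not among the dependencies in the pom.xml above. Spring Cloud Bus also appears to contribute a consumer function of its own, which may stop Spring Cloud Stream from auto-detecting a single signup function. The fragment below assumes the consumer is a functional bean named signup (the bean is not shown in this question) and names it explicitly; my-bus-topic is reused from the attempt above purely as an illustrative bus topic.

```yaml
spring:
  cloud:
    function:
      # Assumption: the message consumer is a functional bean named "signup".
      # This property lists function bean names, not Kafka topic names.
      definition: signup
    bus:
      # Topic used by Spring Cloud Bus; when omitted it defaults to springCloudBus.
      # "my-bus-topic" is taken from the attempt above for illustration only.
      destination: my-bus-topic
    stream:
      bindings:
        signup-out-0:
          destination: signup
        signup-in-0:
          destination: signup
          group: user1
      # Kept from the original configuration (output binding for signup).
      source: signup
      # No springCloudBusInput binding is declared here; the bus starter is
      # assumed to set up its own input binding.
```

If this is the cause, the startup log should again show "Registering MessageChannel signup-in-0" and group.id = user1 next to the springCloudBusInput entries. The bus input is left without a group on purpose: as far as I understand, an anonymous group is what lets every instance receive each refresh event, whereas a shared group would deliver it to only one of them.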