我有一个spring boot项目,它运行了几个kafka消费者(@kafkalistener),他们使用8个分区来处理汇合的kakfa主题。每个使用者的并发设置为1。这些主题加载了来自文件的大约一百万行消息,用户将成批使用这些消息来验证、处理和更新数据库。
使用者工厂具有以下设置-max.poll.records=10000,fetch.min.bytes=100000,fetch.max.wait.ms=1000,session.timeout.ms=240000。
更新06/04
这是消费者工厂的设置。它是spring-kafka-1.3.1.release。融合的卡夫卡经纪人是版本
@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(ListingMessage.class));
}
@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public concurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
ConsumerFactory<String, ListingMessage> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(listingConsumerFactory);
factory.setConcurrency(1);
factory.setAutoStartup(false);
factory.setBatchListener(true);
return factory;
}
注:
容器工厂已将自动启动设置为false。这是在加载大文件时手动启动/停止使用者。
运行大约1小时(时间不同)后,即使主题有许多可用消息,消费者也会停止使用其主题中的消息。consume方法中有一个log语句,它停止在日志中打印。
我使用“../kafka consumer group s”命令跟踪消费者的状态,并在一段时间后查看此组中是否没有消费者。
$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name
此使用者失败的日志中没有错误。使用者方法包含在try catch块中,因此它将捕获在处理消息期间引发的任何异常。
我们如何设计Spring Kafka消费者,以便它在停止消费时重新启动消费者?有没有一个监听器可以记录消费者停下来的确切时间点?这是因为将并发性设置为1吗?我必须将并发性设置为1的原因是,如果这个消费者有更多的并发性,那么其他消费者的速度就会减慢。