
Spring Boot KafkaListener stops consuming messages after running for a while

  •  1
  •  Shankar  ·  7 years ago

    I have a Spring Boot project that runs several Kafka consumers (@KafkaListener) consuming from Confluent Kafka topics with 8 partitions. The concurrency of each consumer is set to 1. The topics are loaded with about a million messages from a file, and the consumers consume them in batches to validate, process, and update a database.

    The consumer factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.

    Update 06/04: here are the consumer factory settings. It is spring-kafka-1.3.1.RELEASE. The Confluent Kafka broker is version

    @Bean
    public ConsumerFactory<String, ListingMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
    
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(ListingMessage.class));
    }
    
    @Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
    public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
        ConsumerFactory<String, ListingMessage> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.setAutoStartup(false);
        factory.setBatchListener(true);
        return factory;
    }
    

    Note: the container factory has auto-startup set to false. The consumers are started and stopped manually when a large file is loaded.
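    Manually starting and stopping listeners (a sketch, not from the original post) is usually done through Spring's `KafkaListenerEndpointRegistry`; the listener id `"listingListener"` is a hypothetical placeholder that would have to match the `id` on the `@KafkaListener`:

    ```java
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.stereotype.Service;

    @Service
    public class ListenerLifecycle {

        @Autowired
        private KafkaListenerEndpointRegistry registry;

        // "listingListener" is a hypothetical id matching @KafkaListener(id = "listingListener", ...)
        public void startListener() {
            registry.getListenerContainer("listingListener").start();
        }

        public void stopListener() {
            registry.getListenerContainer("listingListener").stop();
        }
    }
    ```

    The registry holds every container created for an annotated listener, so the same pattern works for starting all of them at once via `registry.getListenerContainers()`.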

    After running for about an hour (the exact time varies), the consumers stop consuming messages from their topics even though the topics have many messages available. There is a log statement in the consume method that stops printing in the logs.

    I track the consumers' status with the kafka-consumer-groups command, and after a while I see that there are no consumers in this group.

    $ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group  group_name
    

    There are no errors in the logs for this consumer failure. The consumer method is wrapped in a try-catch block, so it would catch any exception thrown while processing the messages.

    How can we design the Spring Kafka consumer so that it restarts itself if it stops consuming? Is there a listener that can log the exact point at which a consumer stops? Is this caused by setting the concurrency to 1? The reason I had to set the concurrency to 1 is that other consumers were slowed down when this consumer was given more concurrency.
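    On the "is there a listener" question: Spring Kafka can publish a `ListenerContainerIdleEvent` when no records arrive for a configured interval, which can be used to log (or react to) a stalled consumer. A minimal sketch, assuming idle events are enabled on the container factory with `factory.getContainerProperties().setIdleEventInterval(60_000L)` (the interval is an assumption):

    ```java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.event.ListenerContainerIdleEvent;
    import org.springframework.stereotype.Component;

    @Component
    public class IdleConsumerMonitor {

        private static final Logger log = LoggerFactory.getLogger(IdleConsumerMonitor.class);

        @EventListener
        public void onIdle(ListenerContainerIdleEvent event) {
            // Fires each time the container has received no records for the configured interval
            log.warn("Consumer idle for {} ms; partitions: {}",
                    event.getIdleTime(), event.getTopicPartitions());
        }
    }
    ```

    An idle event alone does not distinguish "no messages available" from "consumer kicked out of the group", but combined with the lag shown by kafka-consumer-groups it pins down when the stall began.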

    1 Answer  |  7 years ago
        1
  •  3
  •   Gary Russell    7 years ago

    I just ran a test with max.poll.interval.ms=30000, suspended the listener, and it resumed after 30 seconds; I see this in the logs...

    2018-06-04 18:35:59.361  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
    foo
    
    2018-06-04 18:37:07.347 ERROR 4191 --- [      foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null
    
    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
    
    2018-06-04 18:37:07.350  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
    2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so50687794-0]
    2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
    2018-06-04 18:37:10.400  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
    2018-06-04 18:37:10.401  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
    2018-06-04 18:37:10.445  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
    foo
    

    You can see that after the rebalance the consumer was re-added to the group and the same message was redelivered; that is exactly what I would expect.

    I got the same results; even with 1.3.1.
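    Following the advice in the CommitFailedException message above, the usual remedy (a sketch, not part of the original answer; the values are illustrative assumptions) is to raise `max.poll.interval.ms` and/or lower `max.poll.records` in the consumer factory so that a batch can always be processed between two polls:

    ```java
    // Added to the props map in consumerFactory(); values are illustrative
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // allow up to 10 min per batch
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);       // smaller batches finish sooner
    ```

    With the original settings (max.poll.records=10000 and the broker-default max.poll.interval.ms of 300000), the listener has to average under 30 ms per record or the group coordinator will rebalance the consumer out, silently from the application's point of view.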