代码之家  ›  专栏  ›  技术社区  ›  wandermonk

卡夫卡消费者长期闲置后不消费

  •  1
  • wandermonk  · 技术社区  · 6 年前

    嗨,我有卡夫卡消费者在消费数据。下面的命令给我消费者组命令超时。

    kafka-consumer-groups.sh --bootstrap-server b1:9092,b2:9092,b3:9092,b4:9092,b5:9092,b6:9092,b7:9092,b8:9092,b9:9092,b10:9092,b11:9092,b12:9092,b13:9092 --describe --group testgroup
    

    错误:由于消费者组命令在等待组初始化时超时,执行消费者组命令失败:

    所有消费者已经使用这些数据超过26小时。由于制作人在这6个小时内不再制作数据,所以出现了超过6个小时的间隙。

    我怀疑有一些空闲时间可能切断了消费者群体与消费者之间的联系。所有消费者都在以100毫秒的投票间隔消费 poll(100) .

    这种情况已经观察了3次以上。感谢卡夫卡专家的任何帮助。谢谢

    代码:

    @Service
    public class DedupeConsumerService {
    
        final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);
    
        @Autowired
        private TaskExecutor taskExecutor;
    
        @Autowired
        private PropertyConfig config;
    
        @Autowired
        private ApplicationContext applicationContext;
    
        public void consume() {
    
            String topic = config.getDedupServiceConsumerTopic();
            String consGroup = config.getDedupServiceConsGroup();
    
            Properties props = new Properties();
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "20000");
            props.put("max.poll.records", "10000");
    
            KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);
    
            logger.info("Dedupe Kafka Consumer Initialized......");
    
            try {
                while (true) {
                    ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
                    if (records.count() > 0) {
    
                        }
    
    
                        logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
    
                    }
                }
    
            } catch (Throwable e) {
                logger.error("Error occured while processing message", e);
                e.printStackTrace();
            } finally {
                logger.debug("dedupe kafka consume is closing");
                consumer.close();
            }
    
        }
    
    }
    

    我试着把投票时间定为 Integer.MAX_VALUE 这没有帮助。

    0 回复  |  直到 6 年前
        1
  •  0
  •   Andrey Pokhilko    4 年前

    考虑为消费者配置心跳设置: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_heartbeat.interval.ms

    Heartbeat将确保连接不会自动关闭(就像保持活动状态一样)。