嗨,我有卡夫卡消费者在消费数据。下面的命令给我消费者组命令超时。
kafka-consumer-groups.sh --bootstrap-server b1:9092,b2:9092,b3:9092,b4:9092,b5:9092,b6:9092,b7:9092,b8:9092,b9:9092,b10:9092,b11:9092,b12:9092,b13:9092 --describe --group testgroup
错误:由于消费者组命令在等待组初始化时超时,执行消费者组命令失败:
所有消费者已经使用这些数据超过26小时。由于制作人在这6个小时内不再制作数据,所以出现了超过6个小时的间隙。
我怀疑有一些空闲时间可能切断了消费者群体与消费者之间的联系。所有消费者都在以100毫秒的投票间隔消费
poll(100)
.
这种情况已经观察了3次以上。感谢卡夫卡专家的任何帮助。谢谢
代码:
@Service
public class DedupeConsumerService {
final Logger logger = LoggerFactory.getLogger(DedupeConsumerService.class);
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private PropertyConfig config;
@Autowired
private ApplicationContext applicationContext;
public void consume() {
String topic = config.getDedupServiceConsumerTopic();
String consGroup = config.getDedupServiceConsGroup();
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "20000");
props.put("max.poll.records", "10000");
KafkaConsumer<String, AvroSyslogMessage> consumer = new GenericConsumer<String, AvroSyslogMessage>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.AVRODESER, props);
logger.info("Dedupe Kafka Consumer Initialized......");
try {
while (true) {
ConsumerRecords<String, AvroSyslogMessage> records = consumer.poll(100);
if (records.count() > 0) {
}
logger.info("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
}
}
} catch (Throwable e) {
logger.error("Error occured while processing message", e);
e.printStackTrace();
} finally {
logger.debug("dedupe kafka consume is closing");
consumer.close();
}
}
}
我试着把投票时间定为
Integer.MAX_VALUE
这没有帮助。