org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377)
at basics.KafkaConsumerExample$1.commitOffsets(KafkaConsumerExample.java:74)
at basics.KafkaConsumerExample$1.onPartitionsRevoked(KafkaConsumerExample.java:61)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:465)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:408)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at basics.KafkaConsumerExample.run(KafkaConsumerExample.java:97)
at basics.KafkaConsumerExample.main(KafkaConsumerExample.java:305)
Code:
public void runConsumerWithRebalanceListener() throws Exception {
    final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
    final TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener(consumer);
    consumer.subscribe(Collections.singletonList(SIMPLE_CONSUMER_TEST_TOPIC), rebalanceListener);
    while (true) {
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            Thread.sleep(1000);
            System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            rebalanceListener.addOffset(record.topic(), record.partition(), record.offset());
        }
    }
}
Rebalance listener code:
private static class TestConsumerRebalanceListener implements ConsumerRebalanceListener {
    final List<Future<Boolean>> futures = new ArrayList<>();
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public TestConsumerRebalanceListener(final KafkaConsumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        System.out.println(" Called onPartitionsRevoked with partitions: " + partitions);
        if (!futures.isEmpty()) {
            futures.get(0).cancel(true);
        }
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }

    public void addOffset(final String topic, final int partition, final long offset) {
        currentOffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset));
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        System.out.println("Called onPartitionsAssigned with partitions: " + partitions);
    }
}
Settings:
auto.commit.offset=true
max.poll.records = 100 // Waiting for 1 sec for each msg
max.poll.interval.ms = 60000
ConsumerRebalanceListener is used to handle rebalances, which happen when a new consumer joins the group or an existing consumer dies.
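For what it's worth, the arithmetic behind these settings can be checked with a small sketch. It assumes that poll() returns a full batch of max.poll.records and that each record takes exactly the 1 second spent in Thread.sleep; the class and method names (PollBudget, worstCaseBatchMillis, exceedsPollInterval) are made up for illustration and are not part of the Kafka API.

```java
public class PollBudget {

    // Worst-case time spent processing one full batch, in milliseconds.
    // Assumption: every record costs the same fixed amount of time.
    public static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True when processing a full batch takes longer than max.poll.interval.ms,
    // i.e. the next poll() would come too late and the group may rebalance.
    public static boolean exceedsPollInterval(int maxPollRecords, long perRecordMillis,
                                              long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        int maxPollRecords = 100;        // max.poll.records from the settings above
        long perRecordMillis = 1000L;    // Thread.sleep(1000) per record
        long maxPollIntervalMs = 60000L; // max.poll.interval.ms from the settings above

        System.out.println("worst-case batch time (ms): "
                + worstCaseBatchMillis(maxPollRecords, perRecordMillis));
        System.out.println("exceeds max.poll.interval.ms: "
                + exceedsPollInterval(maxPollRecords, perRecordMillis, maxPollIntervalMs));
    }
}
```

Under those assumptions a full batch needs about 100 seconds, while the allowed gap between two poll() calls is 60 seconds, which matches the situation described in the exception message.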