代码之家  ›  专栏  ›  技术社区  ›  user1578872

Kafka consumerRebalanceListener不工作

  •  0
  • user1578872  · 技术社区  · 7 年前

    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377)
        at basics.KafkaConsumerExample$1.commitOffsets(KafkaConsumerExample.java:74)
        at basics.KafkaConsumerExample$1.onPartitionsRevoked(KafkaConsumerExample.java:61)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:465)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:408)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
        at basics.KafkaConsumerExample.run(KafkaConsumerExample.java:97)
        at basics.KafkaConsumerExample.main(KafkaConsumerExample.java:305)
    

    代码:-

    public void runConsumerWithRebalanceListener() throws Exception {
            final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
            final TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener(consumer);
    
            consumer.subscribe(Collections.singletonList(SIMPLE_CONSUMER_TEST_TOPIC), rebalanceListener);
    
            while (true) {
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    Thread.sleep(1000);
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(),
                                record.offset(), record.key(), record.value());
    
    
                    rebalanceListener.addOffset(record.topic(), record.partition(), record.offset());
                }
    
            }
        }
    

    重新平衡侦听器代码:-

    private static class TestConsumerRebalanceListener implements ConsumerRebalanceListener {
    
            final List<Future<Boolean>> futures = new ArrayList<>();
            private final KafkaConsumer<byte[], byte[]> consumer;
            private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    
            public TestConsumerRebalanceListener(final KafkaConsumer<byte[], byte[]> consumer) {
                this.consumer = consumer;
            }
    
            @Override
            public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                System.out.println(" Called onPartitionsRevoked with partitions: " + partitions);
                if(!futures.isEmpty())
                    futures.get(0).cancel(true);
                consumer.commitSync(currentOffsets);
                currentOffsets.clear();
            }
    
            public void addOffset(final String topic, final int partition, final long offset) {
                currentOffsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset));
    
            }
    
            @Override
            public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                System.out.println("Called onPartitionsAssigned with partitions: " + partitions);
            }
    
        }
    

    设置:-

    auto.commit.offset=true
    max.poll.records = 100 // Waiting for 1 sec for each msg
    max.poll.interval.ms = 60000
    

    ConsumerRebalanceListenre在新消费者或消费者死亡时用于重新平衡。

    0 回复  |  直到 6 年前
    推荐文章