
No current assignment for partition occurs even after poll() in Kafka

  •  Score: 0
  • Praytic · Technical Community · 6 years ago

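    I have a Java 8 application working with Apache Kafka 2.11-0.10.1.0. I need to use seek() to poll old messages from a partition. However, every time I try to seekByOffset I get the exception "No current assignment for partition". Here is my class, which is responsible for seeking topics to a specified timestamp: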

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.CollectionUtils;
    
    import java.time.Instant;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    
    /**
     * The main purpose of this class is to move fetching point for each partition of the {@link KafkaConsumer}
     * to some offset which is determined either by timestamp or by offset number.
     */
    public class KafkaSeeker {
        public static final long APP_STARTUP_TIME = Instant.now().toEpochMilli();
    
        private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
        private final KafkaConsumer<String, String> kafkaConsumer;
        private ConsumerRecords<String, String> polledRecords;
    
        public KafkaSeeker(KafkaConsumer<String, String> kafkaConsumer) {
            this.kafkaConsumer = kafkaConsumer;
            this.polledRecords = new ConsumerRecords<>(Collections.emptyMap());
        }
    
        /**
         * For each assigned or subscribed topic {@link org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, long)}
         * fetching pointer to the specified {@code timestamp}.
         * If no messages were found in each partition for a topic,
         * then {@link org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(Collection)} will be called.
         *
         * Due to {@link KafkaConsumer#subscribe(Pattern)} and {@link KafkaConsumer#assign(Collection)} laziness
         * method needs to execute dummy {@link KafkaConsumer#poll(long)} method. All {@link ConsumerRecords} which were
         * polled from buffer are swallowed and produce warning logs.
         *
         * @param timestamp is used to find proper offset to seek to
         * @param topics are used to seek only specific topics. If not specified or empty, all subscribed topics are used.
         */
        public Map<TopicPartition, OffsetAndTimestamp> seek(long timestamp, Collection<String> topics) {
            this.polledRecords = kafkaConsumer.poll(0);
            Collection<TopicPartition> topicPartitions;
            if (CollectionUtils.isEmpty(topics)) {
                topicPartitions = kafkaConsumer.assignment();
            } else {
                topicPartitions = topics.stream()
                        .map(it -> {
                            List<Integer> partitions = kafkaConsumer.partitionsFor(it).stream()
                                    .map(PartitionInfo::partition).collect(Collectors.toList());
                            return partitions.stream().map(partition -> new TopicPartition(it, partition));
                        })
                        .flatMap(it -> it)
                        .collect(Collectors.toList());
            }
    
            if (topicPartitions.isEmpty()) {
                throw new IllegalStateException("Kafka consumer doesn't have any subscribed topics.");
            }
    
            Map<TopicPartition, Long> timestampsByTopicPartitions = topicPartitions.stream()
                    .collect(Collectors.toMap(Function.identity(), topicPartition -> timestamp));
            Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
            Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsByTopicPartitions);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                if (entry.getValue() != null) {
                    LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to [{} offset].",
                            topicPartition.topic(),
                            topicPartition.partition(),
                            beginningOffsets.get(topicPartition),
                            entry.getValue().offset());
                    kafkaConsumer.seek(topicPartition, entry.getValue().offset());
                } else {
                    LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to the end of partition.",
                            topicPartition.topic(),
                            topicPartition.partition(),
                            beginningOffsets.get(topicPartition));
                    kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
                }
            }
            return offsets;
        }
    
        public ConsumerRecords<String, String> getPolledRecords() {
            return polledRecords;
        }
    }
    

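    The consumer is subscribed via consumer.subscribe(singletonList(kafkaTopic)). When I call kafkaConsumer.assignment(), it returns zero assigned TopicPartitions. But if I specify the topics and get their partitions, I do get valid TopicPartitions, although seek() fails on them.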
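    For context: seek() only accepts partitions that are in assignment(). partitionsFor() merely lists a topic's partitions from cluster metadata and does not assign them, so seeking such a partition before the group has assigned it raises the exception above. Below is a minimal sketch of a guard that makes this constraint explicit; it is not a fix from the thread, the class AssignedSeek and its method seekAssigned are hypothetical names, and it assumes the target offsets are already known.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical helper (not part of the Kafka API): seeks only the partitions that are
 * currently assigned to this consumer and returns the ones it had to skip.
 */
public class AssignedSeek {

    public static Set<TopicPartition> seekAssigned(Consumer<?, ?> consumer,
                                                   Map<TopicPartition, Long> targetOffsets) {
        // Snapshot of what the group coordinator (or assign()) has given this consumer
        Set<TopicPartition> assigned = consumer.assignment();
        Set<TopicPartition> skipped = new HashSet<>();
        for (Map.Entry<TopicPartition, Long> e : targetOffsets.entrySet()) {
            if (assigned.contains(e.getKey())) {
                consumer.seek(e.getKey(), e.getValue());
            } else {
                // seek() here would throw IllegalStateException("No current assignment for partition ...")
                skipped.add(e.getKey());
            }
        }
        return skipped;
    }
}
```

    Skipped partitions could be handled later, once they actually show up in the assignment, for example from a rebalance callback.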

        1
  •  Score: 20
  •  Mickael Maison · 6 years ago

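    The correct way to reliably seek, and to check the current assignment, is to wait for the onPartitionsAssigned() callback after subscribing. On a newly created (not yet connected) consumer, calling poll() once does not guarantee that the consumer has already joined the group and received its partitions. For example, the following subscribes to testtopic and seeks to the desired offset inside that callback: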

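    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    // Needs Java 9+ (Map.of) and kafka-clients 2.0+ (poll(Duration)); the class name is illustrative.
    public class SeekOnAssignment {
    
        // Target offset for each partition we want to rewind
        public static final Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("testtopic", 0), 5L);
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    
                consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {
    
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
    
                    // Invoked from inside poll() once the group has assigned partitions to this consumer
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        System.out.println("Assigned " + partitions);
                        for (TopicPartition tp : partitions) {
                            OffsetAndMetadata oam = consumer.committed(tp);
                            if (oam != null) {
                                System.out.println("Current offset is " + oam.offset());
                            } else {
                                System.out.println("No committed offsets");
                            }
                            Long offset = offsets.get(tp);
                            if (offset != null) {
                                // Safe here: tp is part of the current assignment
                                System.out.println("Seeking to " + offset);
                                consumer.seek(tp, offset);
                            }
                        }
                    }
                });
    
                for (int i = 0; i < 10; i++) {
                    System.out.println("Calling poll");
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
                    for (ConsumerRecord<String, String> r : records) {
                        System.out.println("record from " + r.topic() + "-" + r.partition() + " at offset " + r.offset());
                    }
                }
            }
        }
    }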
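    The question's actual goal was to seek by timestamp rather than to a fixed offset. Combining that goal with the callback approach gives something like the sketch below. It assumes kafka-clients 0.10.1 or later (for offsetsForTimes()), and the class name TimestampSeekListener is hypothetical. Partitions with no record at or after the timestamp are moved to the end, mirroring the seekToEnd() fallback in the question's class.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical listener: when partitions are assigned, move each one to the earliest offset
 * whose timestamp is >= timestampMs, or to the end if no such record exists.
 */
public class TimestampSeekListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final long timestampMs;

    public TimestampSeekListener(Consumer<?, ?> consumer, long timestampMs) {
        this.consumer = consumer;
        this.timestampMs = timestampMs;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do: positions are recomputed when partitions are assigned again
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return;
        }
        // Look up the same timestamp for every newly assigned partition
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) {
            query.put(tp, timestampMs);
        }
        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);

        List<TopicPartition> noMatch = new ArrayList<>();
        for (TopicPartition tp : partitions) {
            OffsetAndTimestamp oat = found.get(tp);
            if (oat != null) {
                // tp is assigned at this point, so seek() is legal
                consumer.seek(tp, oat.offset());
            } else {
                // No record at or after the timestamp in this partition
                noMatch.add(tp);
            }
        }
        if (!noMatch.isEmpty()) {
            consumer.seekToEnd(noMatch);
        }
    }
}
```

    It would be registered with consumer.subscribe(topics, new TimestampSeekListener(consumer, timestamp)) and then driven by the normal poll() loop. Because the callback runs on every rebalance, partitions are sought back to the timestamp each time they are (re)assigned; if that is not wanted, the listener would need to remember which partitions it has already handled.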
    
        2
  •  Score: 2
  •  JMess · 4 years ago
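    import java.util.List;
    import java.util.stream.Collectors;
    
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    
    // props (bootstrap servers, deserializers) and topic are assumed to be defined already
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    // Get topic partitions
    List<TopicPartition> partitions = consumer
                        .partitionsFor(topic)
                        .stream()
                        .map(partitionInfo ->
                                new TopicPartition(topic, partitionInfo.partition()))
                        .collect(Collectors.toList());
    // Explicitly assign the partitions to our consumer
    consumer.assign(partitions);
    // The assignment takes effect immediately: seek, query offsets, or poll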
    

    Note that this disables consumer group management and rebalancing. If possible, use @Mickael Maison's approach.
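    With manual assignment, assign() takes effect immediately, so seek() can be called right after it, with no poll() and no rebalance callback. A minimal sketch of that pattern, assuming the caller already knows the target offset for some partitions (the class ManualAssignSeek and its method assignAndSeek are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/** Hypothetical helper for the manual-assignment approach (no consumer group management). */
public class ManualAssignSeek {

    /**
     * Assigns every partition of the topic to this consumer, then seeks the partitions listed in
     * offsetsByPartition. Other partitions keep their default starting position (the committed
     * offset if a group.id is configured, otherwise auto.offset.reset).
     */
    public static List<TopicPartition> assignAndSeek(Consumer<?, ?> consumer, String topic,
                                                     Map<Integer, Long> offsetsByPartition) {
        List<TopicPartition> partitions = new ArrayList<>();
        List<PartitionInfo> infos = consumer.partitionsFor(topic);
        if (infos != null) { // guard: some client versions return null for an unknown topic
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
        }
        // assign() is synchronous, so assignment() already contains these partitions
        consumer.assign(partitions);
        for (TopicPartition tp : partitions) {
            Long offset = offsetsByPartition.get(tp.partition());
            if (offset != null) {
                consumer.seek(tp, offset);
            }
        }
        return partitions;
    }
}
```

    Because no group coordinator is involved, nothing redistributes partitions between instances; if several processes run this, each one has to be given its own partitions explicitly.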