代码之家  ›  专栏  ›  技术社区  ›  Evgeniy Zhurenko

为什么卡夫卡的消费者没有收到经纪人的记录

  •  0
  • Evgeniy Zhurenko  · 技术社区  · 1 年前

    我的消费者没有从主题中获得任何记录。 消费者和生产者的主题是一样的。我可以在broker的UI上看到我的主题。

    对于消费者,我还将group.id设置为与主题名称相同。 我可以在broker UI中看到生产者发送的消息,但消费者无法获得任何记录。

    我的ConsumerReblanceLister似乎不起作用。我看不出有人叫它。

    这是我的制作人:

    public class BotKafkaProducer implements Runnable {
    
        private BotLogger log;
        private Set<NewsLetter> newsLetters;
        private String topicName;
        private Bot bot;
        private News news;
    
        @Setter
        private NewsDao newsDao;
        @Setter
        private AtomicInteger sentNews;
    
        private KafkaProducer<String, String> kafkaProducer;
    
        public BotKafkaProducer(BotLogger log, Set<NewsLetter> newsLetters, String topicName,
                                String pathPropKafka, Bot bot, News news) {
            this.log = log;
            this.newsLetters = newsLetters;
            this.topicName = topicName;
            this.bot = bot;
            this.news = news;
            try {
                Properties props = PropertiesHelper.getProperties(pathPropKafka);
                log.info("Success creating Properties for kafka producer.");
                this.kafkaProducer = new KafkaProducer<String, String>(props);
            } catch (Exception e){
                log.error("Error creating Properties for kafka producer.");
                log.error(e.getMessage());
            }
        }
    
        @Override
        public void run() {
            log.info("producer start, queue name : " + topicName);
            try {
                log.info("Success creating KafkaProducer .");
                for (NewsLetter newsLetter : newsLetters) {
                    String value = new Gson().toJson(newsLetter);
                    ProducerRecord<String, String> producerRecord =
                            new ProducerRecord<>(topicName, value);
    
                    kafkaProducer.send(producerRecord, (recordMetadata, e) -> {
                        // executes every time a record is successfully sent or an exception is thrown
                        if (e == null) {
                            // the record was successfully sent
                            log.info("Received new metadata. \n" +
                                    "Topic:" + recordMetadata.topic() + "\n" +
                                    "Partition: " + recordMetadata.partition() + "\n" +
                                    "Offset: " + recordMetadata.offset() + "\n" +
                                    "Timestamp: " + recordMetadata.timestamp());
                        } else {
                            log.error("Error while producing" + e.getMessage());
                        }
                    });
                    if(sentNews.get()%2000 == 0) {
                        this.kafkaProducer.flush();
                    }
                    sentNews.getAndIncrement();
                }
                this.kafkaProducer.flush();
                log.info(sentNews.get() + " news letters committed.");
                //update news
                log.info("updating news status...");
                news.setBotStatus(bot.toString(), false);
                log.info(bot.toString() + " set bot status " + news.isBotStatus(bot.toString()));
                newsDao.updateStatus(bot.toString(), news);
                log.info("News status updated for " + bot);
            }catch (Exception e){
                if(kafkaProducer != null)
                    this.kafkaProducer.close();
            } finally {
                if(kafkaProducer != null)
                    this.kafkaProducer.close(Duration.ofMillis(500));
            }
        }
    
    }
    

    这是我的消费者:

    public final class BotKafkaConsumer implements Runnable, ExceptionListener {
    
        private final BotLogger log;
        private final String topicName;
    
        private FacebookSender facebookSender;
    
        private TelegramSender telegramSender;
    
        private ViberSender viberSender;
    
        private volatile LocalTime startSendTime;
        private volatile LocalTime endSendTime;
        private AtomicInteger send;
        private AtomicInteger notSend;
        private AtomicInteger producerSent;
        private LocalDateTime startProcess;
    
        private KafkaConsumer<String, String> kafkaConsumer;
    
        private BotKafkaConsumer(Bot bot, BotLogger log, String topicName, String pathPropKafka) {
            this.log = log;
            this.topicName = topicName;
            Properties props;
            try {
                props = PropertiesHelper.getProperties(pathPropKafka);
                props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-id-" + topicName);
                log.info("Success creating Properties for kafka consumer.");
                this.kafkaConsumer = new KafkaConsumer<String, String>(props);
                this.startSendTime = bot.getStartSendTime();
                this.endSendTime = bot.getEndSendTime();
            } catch (Exception e){
                log.error("Error creating Properties for kafka consumer.");
                log.error(e.getMessage());
            }
        }
    
        public BotKafkaConsumer(Bot bot, BotLogger log, String topicName, String pathPropKafka, Object sender,
                                AtomicInteger send, AtomicInteger notSend, AtomicInteger producerSent, LocalDateTime startProcess){
            this(bot, log, topicName, pathPropKafka);
            this.send = send;
            this.notSend = notSend;
            this.producerSent = producerSent;
            this.startProcess = startProcess;
    
            if(sender instanceof FacebookSender)
                this.facebookSender = (FacebookSender)sender;
            if(sender instanceof TelegramSender)
                this.telegramSender = (TelegramSender)sender;
            if(sender instanceof ViberSender)
                this.viberSender = (ViberSender)sender;
        }
    
        public LocalTime getStartSendTime() {
            return startSendTime;
        }
    
        public LocalTime getEndSendTime() {
            return endSendTime;
        }
    
        private boolean isNotTimeToSend() {
            return ((startSendTime != null && startSendTime.isAfter(LocalTime.now())) ||
                    (endSendTime != null && endSendTime.isBefore(LocalTime.now())));
        }
    
        @Override
        public void run() {
            final Thread mainThread = Thread.currentThread();
            // Registering a shutdown hook so we can exit cleanly
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("Starting exit...");
                // Note that shutdownhook runs in a separate thread, so the only thing we can safely do to a consumer is wake it up
                this.kafkaConsumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
            try{
                subscribeConsumer();
                if (facebookSender != null) {
                    send(this.kafkaConsumer, facebookSender, log);
                } else if (telegramSender != null) {
                    send(this.kafkaConsumer, telegramSender, log);
                } else if (viberSender != null) {
                    send(this.kafkaConsumer, viberSender, log);
                }
            } catch (Exception e){
                log.error("BotKafkaConsumer error : " + e.getMessage());
            } finally {
                log.warn("BotKafkaConsumer closed .");
                if(this.kafkaConsumer != null) {
                    this.kafkaConsumer.close(Duration.ofMillis(500));
                }
            }
        }
    
        @Override
        public void onException(JMSException e) {
            log.error("consumer : " + this.toString() + " error : " + e.getMessage());
        }
    
        private void send(KafkaConsumer<String, String> consumer, TelegramSender telegramSender, BotLogger log) throws JMSException {
            ConsumerRecords<String, String> records;
            Gson gson = new Gson();
            TelegramDto telegramBody = new TelegramDto();
            // Creating inline Keyboard with one button
            InlineKeyboard inlineKeyboard = null;
    
            synchronized(this) {
                while (true) {
                    records = consumer.poll(Duration.ofMillis(500));
                    records.partitions().forEach(tp -> {
                        log.info("Topic : " + tp.topic() + " TP : " + tp.partition());
                    });
                    for (ConsumerRecord<String, String> record : records) {
                        NewsLetter newsLetter = getNewsLetter(record, gson);
                        String id;
                        News news;
                        try {
                            id = newsLetter.getUserId();
                            news = newsLetter.getNews();
                            String text = replaceForTelegram(news.getText());
                            String buttonText = news.getButtonText();
                            if (StringUtils.isNotBlank(buttonText)) {
                                String buttonUrl = news.getButtonUrl();
                                if (StringUtils.isNotBlank(buttonUrl)) {
                                    inlineKeyboard = new InlineKeyboard().addRow()
                                            .addButton(0, new InlineKeyboardButton(buttonText).setUrl(buttonUrl));
                                    telegramBody.setReplyMarkup(inlineKeyboard);
                                }
                            }
                            if (news.getImageUrl() != null) {
                                String imageUrl = "(" + news.getImageUrl() + ")";
                                text = "[\u200B]" + imageUrl + text;
                            }
                            telegramBody.setChatId(id)
                                    .setText(text)
                                    .setMessageType(TelegramSendType.SEND_MESSAGE);
    
                            boolean result = telegramSender.sendMessage(telegramBody);
                            processResult(result);
                            log.info("send.get() + notSend.get()" + (send.get() + notSend.get()) +
                                    "    producerSent.get()" + producerSent.get());
                            if((send.get() + notSend.get()) >= producerSent.get()) {
                                break;
                            }
                        }catch (Exception e){
                            log.error(e.getMessage());
                        }
                    }
                    if (isNotTimeToSend()) {
                        log.info("is not time to send is : " + isNotTimeToSend());
                        producerSent.set(0);
                        break;
                    }
                    if((send.get() + notSend.get()) >= producerSent.get()) {
                        LocalDateTime endProcess = LocalDateTime.now();
                        log.info("TELEGRAM PRODUCER TOTAL NEWS are sent : " + producerSent.get() +
                                " by Producer: " + topicName);
                        log.info("TELEGRAM CONSUMER TOTAL NEWS are sent : " + send + ", TOTAL NOT sent : " + notSend +
                                " by Consumer: " + topicName);
                        log.info("START consumerNews process : " + startProcess.toString());
                        log.info("END consumeNews process : " + endProcess.toString());
                        log.info("TOTAL TIME FOR SENDING : " + ChronoUnit.MINUTES.between(startProcess, endProcess) +
                                " min " + ChronoUnit.SECONDS.between(startProcess, endProcess) + " sec " +
                                ChronoUnit.MILLIS.between(startProcess, endProcess) + " ms");
                        producerSent.set(0);
                        break;
                    }
                }
            }
        }
    
        private void send(KafkaConsumer<String, String> consumer, ViberSender viberSender, BotLogger log) throws JMSException {
            // some code of process
        }
    
        private void send(KafkaConsumer<String, String> consumer, FacebookSender facebookSender, BotLogger log) throws JMSException {
        // some code of process
        }
    
    
        private void subscribeConsumer() {
    //        this.kafkaConsumer.subscribe(Collections.singletonList(topicName));
            this.kafkaConsumer.subscribe(Collections.singletonList(topicName), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    log.info("Revoke partitions : " + formatPartitions(collection));
                }
    
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    collection.forEach(tp -> {
                        long offset = kafkaConsumer.position(tp);
                        log.info("Topic : " + tp.topic() + " Partition : " + tp.partition() +
                                " Offset : " + offset);
                    });
                    kafkaConsumer.resume(collection);
                }
    
                @Override
                public void onPartitionsLost(Collection<TopicPartition> collection){
                    collection.forEach(tp -> {
                        long offset = kafkaConsumer.position(tp);
                        log.info("Topic : " + tp.topic() + " Partition : " + tp.partition() +
                                " Offset : " + offset);
                    });
                    kafkaConsumer.resume(collection);
                }
            });
        }
    
        private void processResult(boolean result){
            if (result) {
                send.getAndIncrement();
            } else {
                notSend.getAndIncrement();
            }
        }
    
        private NewsLetter getNewsLetter(ConsumerRecord<String, String> record, Gson gson) throws JMSException {
            String text = record.value();
            return gson.fromJson(text, NewsLetter.class);
        }
    
        private String replaceForTelegram(String text) {
            return text.replace("_", "\\_")
                    .replace("`", "\\`")
                    .replace("*", "\\*");
        }
    
        private static List<String> formatPartitions(Collection<TopicPartition> partitions) {
            return partitions.stream().map(topicPartition ->
                    String.format("topic: %s, partition: %s", topicPartition.topic(), topicPartition.partition()))
                    .collect(Collectors.toList());
        }
    }
    

    这是我用来设置经纪人、生产者和消费者的属性:

    # The location of the Kafka server
    bootstrap.servers=50-kafka-a:9092
    #count of replicas of brokers
    replication.factor=3
    #max size of message on broker side
    #max.message.bytes=10485880
    #replica.fetch.max.bytes=10485880
    #max size of message on consumer side
    #max.partition.fetch.bytes=10485880
    broker.id=1
    zookeeper.connect=50-kafka-a:2181
    
    ##### PRODUCER SETTINGS ######
    
    #to show how many confirmations have to get producer - in our case it is all
    acks=all
    #it means that two brokers which are async replicas include leader have to answer that they have data
    #min.insync.replicas=2
    #type of compression message on side of producer
    compression.type=snappy
    #the count of retry for trying to send message
    retries=3
    #timeout of producer after that message will be failed
    delivery.timeout.ms=120000
    #max size of message on the producer side
    #max.request.size=10485880
    # classes for serializing ...
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    
    
    ##### CONSUMER SETTINGS ######
    
    #max size of messages which will be return after calling one call poll()
    max.poll.records=1000
    # Make Kafka keep track of record reads by the consumer
    enable.auto.commit=true
    # The time in milliseconds to Kafka write the offset of the last message read
    auto.commit.interval.ms=500
    #it is need when the start offset is absent or if current offset is not exist more
    auto.offset.reset=earliest
    #auto.offset.reset=latest
    #max size of message on the consumer side
    #fetch.max.bytes=10485880
    # ... and deserializing messages
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    并且依赖性是:

    <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.4.0</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    0 回复  |  直到 1 年前
        1
  •  0
  •   Evgeniy Zhurenko    1 年前

    我非常感谢@OneCricketer的时间和支持。 问题的原因是在我的卡夫卡属性文件。一些物业或他们中的一些人扰乱了卡夫卡消费者的正常工作。 所以现在我使用kafka的这些属性:

    # The location of the Kafka server
    bootstrap.servers=50-kafka-a:9092
    
    ##### PRODUCER SETTINGS ######
    
    #to show how many confirmations have to get producer - in our case it is all
    acks=all
    # classes for serializing ...
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    
    
    ##### CONSUMER SETTINGS ######
    
    #max size of messages which will be return after calling one call poll()
    max.poll.records=500
    #it is need when the start offset is absent or if current offset is not exist more
    auto.offset.reset=latest
    # ... and deserializing messages
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer