代码之家  ›  专栏  ›  技术社区  ›  dev_jongtae

我用多线程发送Kafka生产者的消息,但是消息丢失了

  •  1
  • dev_jongtae  · 技术社区  · 8 年前

    我正在使用Kafka Producer,并将数据发送到由Kafka集群中的复制因子3和分区1组成的主题“测试主题”(由三个代理组成)。

    我创造了五线。每个线程发送10000条消息(每个消息大小为4000字节)。

    我预计最新的补偿是50000,但实际上是44993。

    大约有5000条消息丢失。

    为什么会发生信息丢失?在我的代码下面…(卡夫卡1.1.0版)

    Kafkamessagesender.类

    public class KafkaMessageSender {
        private final static Logger logger = 
     LoggerFactory.getLogger(KafkaMessageSender.class);
        private Properties props;
        private KafkaProducer<String, String> producer;
        private String topic;
        private AtomicInteger count;
    
        public KafkaMessageSender(AtomicInteger count, String bootstrapUrls, String topic) {
            logger.info("KafkaMessageSender initializing...");
            this.topic = topic;
            this.count = count;
            props = new Properties();
            props.put(ProducerConfig.ACKS_CONFIG, "1");
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //16384
            props.put(ProducerConfig.RETRIES_CONFIG, 3);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
            logger.info("KafkaMessageSender initializing end");     
    
        }
    
        public void sendMessages() {
            producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K)); //Messages.MSG_4K indicates 4000bytes message
            count.getAndIncrement();
            logger.info("count : "+count.get());
        }
    }
    

    Kafkamessagesendermain.class.类

    public class KafkaMessageSenderMain {
    
        private final static Logger logger = LoggerFactory.getLogger(KafkaMessageSenderMain.class);
    
        final static String bootstrap_url = "ism1.solulink.co.kr:9092,ism2.solulink.co.kr:9092,ism3.solulink.co.kr:9092";
        final static String topic = "test-topic"; //topic name
        final static AtomicInteger count = new AtomicInteger(0);
        final static int MAX_LOOP = 10000; //message sending count
        final static int MAX_THREAD = 5;  //created number of threads
    
        public static void main(String[] args) {        
            long startTime = System.currentTimeMillis();
            ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);
            for(int i = 0; i < MAX_THREAD; i++) {
                executorService.execute(() ->{
                        KafkaMessageSender sender = new KafkaMessageSender(count, bootstrap_url, topic);
                        for(int j = 0; j < MAX_LOOP; j++) {
                            sender.sendMessages(); //send message
                        }
                });
            }
    
            executorService.shutdown();
            try {
                boolean flag = executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 
                long endTime = System.currentTimeMillis();
                long procTime = (endTime - startTime);
                logger.info("all Threads is shutdown? : "+flag);
                logger.info("processTime : " + ((double)procTime/(double)1000L)+"sec");
            } catch (InterruptedException e) {
                logger.error("awaitTermination exception",e);
            }
        }
    }
    

    结果

    Result image

    1 回复  |  直到 8 年前
        1
  •  1
  •   Quang Vien    8 年前

    您能修改并运行下面的代码来查看错误是什么吗?

    producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    e.printStackTrace();
            }
    });
    
    推荐文章