
Writing JUnit tests for a Kafka consumer

  • codebot  · Tech Community  · 7 years ago

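    I have a Kafka consumer that subscribes to a topic. The implementation works fine. However, I run into a problem when I try to unit test it, because it is implemented through the Runnable interface.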

    Implementation

    @Override
    public void run() {
        kafkaConsumer.subscribe(kafkaTopics);
    
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            Map<String, InventoryStock> skuMap = new LinkedHashMap<>();
    
            try {
                // populating sku map with consumer record
                for (ConsumerRecord<String, String> record : records) {
                    populateMap(skuMap, record.value());
                }
    
                if (MapUtils.isNotEmpty(skuMap)) {
                    // writing sku inventory with populated sku map
                    inventoryDao.updateInventoryTable(INVENTORY_JOB_ID, skuMap);
                }
            } catch (Exception e) {
    
            }
            kafkaConsumer.commitAsync();
        }
    }
    

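    I tried using MockConsumer, but it has to be assigned to the consumer inside the implementation, and that consumer is not exposed to the outside. This is what I tried: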

    @Before
    public void onBefore() {
        MockitoAnnotations.initMocks(this);
    
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        skuInventoryConsumer = new SkuInventoryConsumer(consumerProps);
    
        KafkaConsumer kafkaConsumerMock = mock(KafkaConsumer.class);
    
        Whitebox.setInternalState(skuInventoryConsumer, "LOGGER", LOGGER);
        Whitebox.setInternalState(skuInventoryConsumer, "kafkaConsumer", kafkaConsumerMock);
    
    }
    
    @Test
    public void should_subscribe_on_topic() {
        consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));
    
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
        consumer.updateBeginningOffsets(beginningOffsets);
    
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 0L, "mykey", "myvalue0"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 1L, "mykey", "myvalue1"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 2L, "mykey", "myvalue2"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 3L, "mykey", "myvalue3"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 4L, "mykey", "myvalue4"));
    }
    

    Because this is a Runnable and the consumer is not exposed, the test does not work as expected. How can I fix this?

  • Utkarsh M  · 5 years ago

    I suggest using Mockito, as in the example below.

        Consumer<String, String> kafkaConsumerLocal = mock(Consumer.class);
        KafkaConsumer kafkaConsumer = spy(new KafkaConsumer("topic-name"));
    
        ReflectionTestUtils.setField(kafkaConsumer, "threadPoolCount", 1);
        ReflectionTestUtils.setField(kafkaConsumer, "consumer", kafkaConsumerLocal);
    
        doNothing().when(kafkaConsumer).runConsumer();
        doNothing().when(kafkaConsumer).addShutDownHook();
        doReturn(kafkaConsumerLocal).when(consumerInit).getKafkaConsumer();
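
An alternative to reflection is to pass the consumer in through the constructor and give the loop a stop flag, so a test can drive `run()` on its own thread and let it return. The sketch below shows only that shape and uses no Kafka classes: `RecordSource`, `InventoryWriter`, `SkuLoop`, `InjectedLoopSketch`, and the `"sku:quantity"` value format are all hypothetical stand-ins, not names from the question. With the real client, the injected type would be Kafka's `Consumer<String, String>` interface, which both `KafkaConsumer` and `MockConsumer` implement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Kafka's Consumer<String, String>; not a Kafka type.
interface RecordSource {
    List<String> poll();   // values of the next batch, possibly empty
    void commit();
}

// Hypothetical stand-in for the question's inventoryDao.
interface InventoryWriter {
    void update(Map<String, Integer> skuMap);
}

class SkuLoop implements Runnable {
    private final RecordSource source;
    private final InventoryWriter writer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    // Both collaborators are injected, so a test can supply fakes.
    SkuLoop(RecordSource source, InventoryWriter writer) {
        this.source = source;
        this.writer = writer;
    }

    // Lets a test (or a shutdown hook) end the loop after the current pass.
    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        while (running.get()) {
            Map<String, Integer> skuMap = new LinkedHashMap<>();
            for (String value : source.poll()) {
                // Assumed value format "sku:quantity", for illustration only.
                String[] parts = value.split(":");
                skuMap.put(parts[0], Integer.parseInt(parts[1]));
            }
            if (!skuMap.isEmpty()) {
                writer.update(skuMap);
            }
            source.commit();
        }
    }
}

class InjectedLoopSketch {
    // Runs the loop on the calling thread against a scripted source that
    // stops the loop once every batch has been handed out.
    static List<Map<String, Integer>> runWith(List<List<String>> batches) {
        List<Map<String, Integer>> written = new ArrayList<>();
        SkuLoop[] loop = new SkuLoop[1];
        RecordSource fake = new RecordSource() {
            private int next = 0;

            @Override
            public List<String> poll() {
                if (next >= batches.size()) {
                    loop[0].stop();
                    return Collections.emptyList();
                }
                return batches.get(next++);
            }

            @Override
            public void commit() {
                // Nothing to commit in the fake.
            }
        };
        loop[0] = new SkuLoop(fake, written::add);
        loop[0].run();
        return written;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> written = runWith(Arrays.asList(
                Arrays.asList("sku-1:5", "sku-2:7"),
                Collections.<String>emptyList(),
                Arrays.asList("sku-1:9")));
        System.out.println(written); // prints [{sku-1=5, sku-2=7}, {sku-1=9}]
    }
}
```

Because the loop only sees the injected interfaces, the test needs neither `Whitebox` nor `ReflectionTestUtils`, and it can assert on what reached the writer once `run()` returns. Whether a stop flag like this fits the original class depends on how the consumer is shut down in production, which the question does not show.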
    