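I have a Kafka consumer that subscribes to a topic. The implementation works fine. However, when I try to write unit tests for it, I run into a problem because it is implemented via the `Runnable` interface.
Implementation: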
    @Override
    public void run() {
        kafkaConsumer.subscribe(kafkaTopics);
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            Map<String, InventoryStock> skuMap = new LinkedHashMap<>();
            try {
                // populating sku map with consumer record
                for (ConsumerRecord<String, String> record : records) {
                    populateMap(skuMap, record.value());
                }
                if (MapUtils.isNotEmpty(skuMap)) {
                    // writing sku inventory with populated sku map
                    inventoryDao.updateInventoryTable(INVENTORY_JOB_ID, skuMap);
                }
            } catch (Exception e) {
            }
            kafkaConsumer.commitAsync();
        }
    }
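I tried using `MockConsumer`, but it has to be assigned to the consumer inside the implementation, and the implementation does not expose its consumer to the outside. This is what I tried: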
    @Before
    public void onBefore() {
        MockitoAnnotations.initMocks(this);
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        skuInventoryConsumer = new SkuInventoryConsumer(consumerProps);
        KafkaConsumer kafkaConsumerMock = mock(KafkaConsumer.class);
        Whitebox.setInternalState(skuInventoryConsumer, "LOGGER", LOGGER);
        Whitebox.setInternalState(skuInventoryConsumer, "kafkaConsumer", kafkaConsumerMock);
    }
    @Test
    public void should_subscribe_on_topic() {
        consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
        consumer.updateBeginningOffsets(beginningOffsets);
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 0L, "mykey", "myvalue0"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 1L, "mykey", "myvalue1"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 2L, "mykey", "myvalue2"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 3L, "mykey", "myvalue3"));
        consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 4L, "mykey", "myvalue4"));
    }
Because this is a `Runnable` and the consumer is not exposed, this test does not work as expected. How can I fix it?
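One possible direction, sketched with only the JDK so that it runs stand-alone: pass the consumer in through the constructor instead of building it from `Properties` inside the class, and replace `while (true)` with a flag so the loop can end. `RecordSource`, `InventorySink`, `SkuWorker`, `runWithBatches`, and the `sku=quantity` record format are hypothetical stand-ins and not part of the code above. In the real class the injected type would presumably be Kafka's `Consumer<String, String>` interface, which both `KafkaConsumer` and `MockConsumer` implement.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InjectableConsumerSketch {

    /** Hypothetical stand-in for Kafka's Consumer interface. */
    interface RecordSource {
        List<String> poll(); // next batch of record values, possibly empty
    }

    /** Hypothetical stand-in for inventoryDao. */
    interface InventorySink {
        void update(Map<String, Integer> skuMap);
    }

    static class SkuWorker implements Runnable {
        private final RecordSource source;
        private final InventorySink sink;
        private volatile boolean running = true;

        // Dependencies are injected, so a test can pass in fakes or mocks.
        SkuWorker(RecordSource source, InventorySink sink) {
            this.source = source;
            this.sink = sink;
        }

        /** Lets a test (or a shutdown hook) end the loop. */
        void stop() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                Map<String, Integer> skuMap = new LinkedHashMap<>();
                for (String value : source.poll()) {
                    // Assumed record format: "sku=quantity".
                    String[] parts = value.split("=", 2);
                    skuMap.put(parts[0], Integer.parseInt(parts[1]));
                }
                if (!skuMap.isEmpty()) {
                    sink.update(skuMap);
                }
            }
        }
    }

    /** Runs the worker on the calling thread against canned batches. */
    static List<Map<String, Integer>> runWithBatches(List<List<String>> batches) {
        Deque<List<String>> queue = new ArrayDeque<>(batches);
        List<Map<String, Integer>> written = new ArrayList<>();
        SkuWorker[] worker = new SkuWorker[1];
        RecordSource fake = () -> {
            List<String> next = queue.poll();
            if (next == null) {
                // No batches left: stop the loop so run() returns.
                worker[0].stop();
                return Collections.emptyList();
            }
            return next;
        };
        worker[0] = new SkuWorker(fake, written::add);
        worker[0].run();
        return written;
    }

    public static void main(String[] args) {
        List<List<String>> batches =
                Arrays.asList(Arrays.asList("sku-1=5", "sku-2=7"));
        System.out.println(runWithBatches(batches)); // prints [{sku-1=5, sku-2=7}]
    }
}
```

With the real client, the same shape should let a test hand a `MockConsumer` to the constructor and call `run()` directly on the test thread, with no `Whitebox` reflection. How the loop is stopped from the mock, for example from a task scheduled on it, is an assumption to check against the Kafka version in use.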