我正在使用KCL(v2)将Kafka消费者转换为AWS kinisis消费者。在Kafka中,补偿用于帮助消费者跟踪其最近消费的消息。如果我的Kafka应用程序死了,它将使用从重新启动时中断的位置开始的偏移量。
然而,这在动觉上是不一样的。我可以设定
kinesisClientLibConfiguration.withInitialPositionInStream(...)
但唯一的理由是
TRIM_HORIZON
,
LATEST
或
AT_TIMESTAMP
. 如果我的kinisis应用程序死掉了,它将不知道从何处恢复消费。
我的KCL用户非常简单。这个
main()
方法如下:
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
"testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
Worker worker = new Worker.Builder()
.recordProcessorFactory(new KCLRecordProcessorFactory())
.config(config)
.build();
以及
RecordProcessor
是一个简单的实现:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", records.size());
records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
LOGGER.info("Shutting down input");
}
如果我检查相应的DynamoDB表,则
checkpoint
设置为
修剪地平线
,并且在使用记录时不会使用sequenceid更新。
这里有什么解决方案来确保我使用所有消息?