代码之家  ›  专栏  ›  技术社区  ›  Ben Watson

AWS kinisis-如何从上一个检查点恢复消费

  •  0
  • Ben Watson  · 技术社区  · 7 年前

    我正在使用KCL(v2)将Kafka消费者转换为AWS kinisis消费者。在Kafka中,补偿用于帮助消费者跟踪其最近消费的消息。如果我的Kafka应用程序死了,它将使用从重新启动时中断的位置开始的偏移量。

    然而,这在动觉上是不一样的。我可以设定 kinesisClientLibConfiguration.withInitialPositionInStream(...) 但唯一的理由是 TRIM_HORIZON , LATEST AT_TIMESTAMP . 如果我的kinisis应用程序死掉了,它将不知道从何处恢复消费。

    我的KCL用户非常简单。这个 main() 方法如下:

    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
                "testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
    config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
    
    Worker worker = new Worker.Builder()
                .recordProcessorFactory(new KCLRecordProcessorFactory())
                .config(config)
                .build();
    

    以及 RecordProcessor 是一个简单的实现:

    @Override
    public void initialize(InitializationInput initializationInput) {
        LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
    }
    
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        List<Record> records = processRecordsInput.getRecords();
        LOGGER.info("Retrieved {} records", records.size());
        records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
    }
    
    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        LOGGER.info("Shutting down input");
    }
    

    如果我检查相应的DynamoDB表,则 checkpoint 设置为 修剪地平线 ,并且在使用记录时不会使用sequenceid更新。

    这里有什么解决方案来确保我使用所有消息?

    1 回复  |  直到 7 年前
        1
  •  2
  •   Ben Watson    7 年前

    正如@kdgregory所指出的,KCL要求用户设置自己的检查点工作代码:

    @Override
    public void initialize(InitializationInput initializationInput) {
        LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
    }
    
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        List<Record> records = processRecordsInput.getRecords();
        LOGGER.info("Retrieved {} records", records.size());
        records.forEach(r -> LOGGER.info("Record with sequenceId {} at date {} : {}", r.getSequenceNumber(),
                r.getApproximateArrivalTimestamp(), StandardCharsets.UTF_8.decode(r.getData())));
        try {
            processRecordsInput.getCheckpointer().checkpoint();
        } catch (InvalidStateException | ShutdownException e) {
            LOGGER.error("Unable to checkpoint");
        }
    }
    
    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        LOGGER.info("Shutting down input");
        try {
            shutdownInput.getCheckpointer().checkpoint();
        } catch (InvalidStateException | ShutdownException e) {
            LOGGER.error("Unable to checkpoint");
        }
    }