代码之家  ›  专栏  ›  技术社区  ›  Gatusko

从Apache Camel中的特定偏移量开始读取Kafka主题

  •  5
  • Gatusko  · 技术社区  · 7 年前

    我读了Camel Kafka的所有文档,我读到的唯一方法就是这个 from git 以及指定

        public void configure() throws Exception {
                        from("kafka:" + TOPIC
                                     + "?groupId=A"
                                     + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
                                     + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
                                     + "&offsetRepository=#offset")            // Keep the offset in our repository
                                .to("mock:result");
    
    }
    

    但是对于客户的订单,我需要使用Spring,所以我的kafka端点是

    <!--DEFINE KAFKA'S TOPCIS AS ENDPOINT-->
            <endpoint id="tagBlink" uri="kafka:10.0.0.165:9092">
                <property key="topic" value="tagBlink"/>
                <property key="brokers" value="10.0.0.165:9092"/>
                <property key="offsetRepository" value="100"/>
            </endpoint>
    

    但是有个例外

    找不到适合的属性设置程序:OffsetReportory为 没有相同类型的setter方法:java.lang.String或type 可能的转换:没有可从类型转换的类型转换器: 所需类型的java.lang.String: 值为100的org.apache.camel.spi.statepository

    在我当前的配置下,这可能吗如何从特定偏移恢复??

    2 回复  |  直到 6 年前
        1
  •  1
  •   Vasyl Sarzhynskyi    7 年前

    在这之后,我设法解决了这个问题。为此,我遵循了spring bean的创建过程,并检查了 FileStateRepository 我需要一个文件,所以我创建了一个文件Bean并添加为构造函数arg在那之后我加了一个 init-method="doStart" . 此方法加载文件,如果存在,如果没有,它将创建文件。

         <endpoint id="event" uri="kafka:localhost:9092">
            <property key="topic" value="eventTopic4"/>
            <property key="brokers" value="localhost:9092"/>
            <property key="autoOffsetReset" value="earliest"/>
            <property key="offsetRepository" value="#myRepo2"/>
        </endpoint>
    
        <bean id="myFileOfMyRepo" class="java.io.File">
            <constructor-arg type="java.lang.String" value="C:\repoDat\repo.dat"/>
        </bean>
    
        <bean id="myRepo2" class="org.apache.camel.impl.FileStateRepository " factory-method="fileStateRepository" init-method="doStart">
            <constructor-arg ref="myFileOfMyRepo"/>
        </bean>
    

    在这之后,我在吉特看到了骆驼卡夫卡的密码。

        offsetRepository.getState(serializeOffsetKey(topicPartition));
        if (offsetState != null && !offsetState.isEmpty()) {
            // The state contains the last read offset so you need to seek from the next one
            long offset = deserializeOffsetValue(offsetState) + 1;
            log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
            consumer.seek(topicPartition, offset);
        } 
    

    有了这个,我设法从最后一个偏移量读取我希望camel文档为kafka添加了这些额外的步骤。

        2
  •  -1
  •   TacheDeChoco    7 年前

    重要的词是“repository”而不是“offset”:它不是一个整数值,但它是一个对bean的引用,该bean指定了保存偏移量的位置。

    (非弹簧)示例

    // Create the repository in which the Kafka offsets will be persisted
    FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
    
    // Bind this repository into the Camel registry
    JndiRegistry registry = new JndiRegistry();
    registry.bind("offsetRepo", repository);
    
    // Configure the camel context
    DefaultCamelContext camelContext = new DefaultCamelContext(registry);
    camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                         "&groupId=A" +                            //
                         "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
                         "&offsetRepository=#offsetRepo")          // Keep the offsets in the previously configured repository
                    .to("mock:result");
        }
    });