代码之家  ›  专栏  ›  技术社区  ›  addictedtohaskell

为什么卡夫卡消费者无视我在汽车上的“最早”指令。抵消重置参数,从而不从绝对第一个事件读取我的主题?

  •  7
  • addictedtohaskell  · 技术社区  · 8 年前

    我想从最早的事件中读一个卡夫卡主题。

    我想做的是从一个主题(从时间上最早的事件)获取所有数据,直到特定日期的事件。

    每个事件的结构都有一个名为 dateCliente 我将其用作筛选事件的阈值。到目前为止,我已经完成了读写操作。我正在写入一个临时拼花文件,将其用作配置单元表的分区。尽管我在 auto.offset.reset 参数,它不会从一开始就读取数据。

    每当我运行代码时,我都会得到从这个日期开始的所有事件。每次我再次执行代码时,它都会在上次代码执行中读取的Kafka事件之后继续读取。

    我用于配置卡夫卡消费者和订阅主题的代码如下:

      // Configurations for kafka consumer
      val conf = ConfigFactory.parseResources("properties.conf")
      val brokersip = conf.getString("enrichment.brokers.value")
      val topics_in = conf.getString("enrichment.topics_in.value")
      //
    
      // Crea la sesion de Spark
      val spark = SparkSession
        .builder()
        .master("yarn")
        .appName("XY")
        .getOrCreate()
    
      spark.sparkContext.setLogLevel("ERROR")
      import spark.implicits._
    
    
      val properties = new Properties
      properties.put("key.deserializer", classOf[StringDeserializer])
      properties.put("value.deserializer", classOf[StringDeserializer])
      properties.put("bootstrap.servers", brokersip)
      properties.put("auto.offset.reset", "earliest")
      properties.put("group.id", "XY")
    
      val consumer = new KafkaConsumer[String, String](properties)
      consumer.subscribe( util.Collections.singletonList("geoevents") )
    

    然而,每当我从命令行创建消费者以读取主题中的数据时,我都会得到前几天的所有事件。 我运行的命令行命令是:

    kafka-console-consumer --new-consumer --topic geoevents --from-beginning --bootstrap-server xx.yy.zz.xx 
    

    你知道为什么我的代码会这样,而忽略我的 "earliest" 在里面 汽车抵消重置 ?

    3 回复  |  直到 8 年前
        1
  •  12
  •   Mickael Maison    8 年前

    这是因为 auto.offset.reset 仅当组没有提交的偏移量时才应用。

    查看消费者配置 documentation :

    如果卡夫卡中没有初始偏移或当前 服务器上不再存在偏移量

    如果要从头开始重新启动,可以:

        2
  •  3
  •   Giorgos Myrianthous    8 年前

    财产 auto.offset.reset 仅当Kafka中存储的给定消费者没有偏移量时才使用。提交记录时,Kafka将记录的偏移量存储在特殊主题中,在下一次运行中,消费者将从上次提交的偏移量中读取主题。要从头开始阅读,你应该打电话 consumer.seekToBeginning 或使用唯一 group.id 所有物

        3
  •  -1
  •   Shrikant Awachar    5 年前

    如果要从所有分区的第一个事件中读取主题,则可以重置偏移量

    kafka-consumer-groups --bootstrap-server <host-ip>:<port> --group <group-name> --reset-offsets --execute --to-earliest --topic <topic>
    
    推荐文章