代码之家  ›  专栏  ›  技术社区  ›  Jacek Laskowski

为什么id和runId在流查询的每次开始都会更改?

  •  4
  • Jacek Laskowski  · 技术社区  · 8 年前

    id runId .

    enter image description here

    为什么当我 stop start

    // Reading datasets with records from a Kafka topic
    val idsPerBatch = spark.
      readStream.
      format("kafka").
      option("subscribe", "topic1").
      option("kafka.bootstrap.servers", "localhost:9092").
      load.
      withColumn("tokens", split('value, ",")).
      withColumn("seconds", 'tokens(0) cast "long").
      withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
      withColumn("id", 'tokens(1)).
      withColumn("batch", 'tokens(2) cast "int").
      withWatermark(eventTime = "event_time", delayThreshold = "10 seconds"). // <-- define watermark (before groupBy!)
      groupBy($"event_time"). // <-- use event_time for grouping
      agg(collect_list("batch") as "batches", collect_list("id") as "ids").
      withColumn("event_time", to_timestamp($"event_time")) // <-- convert to human-readable date
    
    // start the query and display results to console
    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}
    val sq = idsPerBatch.
      writeStream.
      format("console").
      option("truncate", false).
      trigger(Trigger.ProcessingTime(5.seconds)).
      outputMode(OutputMode.Append). // <-- Append output mode
      start
    
    1 回复  |  直到 7 年前
        1
  •  4
  •   Jacek Laskowski    8 年前

    id 作为检查点元数据的一部分,在运行中是持久的。

    ConsoleSink (即。 console 输出),其中 doesn't support checkpointing 并且不提供检查点位置 身份证件

    返回此查询的唯一id,该id在重新启动期间持续存在 第一次开始,每次都是一样的

    runId

    runId . 因此,每次从检查点重新启动查询时, runIds .

    推荐文章