代码之家  ›  专栏  ›  技术社区  ›  Naveen Cotha

如何在一个Spark应用程序中启动多个流查询?

  •  0
  • Naveen Cotha  · 技术社区  · 7 年前

    我构建了一些Spark结构化流式查询来在EMR上运行,它们是长时间运行的查询,并且需要一直运行,因为它们都是ETL类型的查询,当我在EMR上向YARN cluster提交作业时,我可以提交一个Spark应用程序。所以spark应用程序应该有多个流式查询。

    我对如何以编程方式在同一个submit中构建/启动多个流式查询感到困惑。

    例如:我有这个代码:

    case class SparkJobs(prop: Properties) extends Serializable {
      def run() = {
          Type1SparkJobBuilder(prop).build().awaitTermination()
          Type1SparkJobBuilder(prop).build().awaitTermination()
      }
    }
    

    我在我的主课上用 SparkJobs(new Properties()).run()

    当我在spark history server中看到时,只有第一个spark流作业(Type1SparkJob)正在运行。

    在同一个spark中以编程方式触发多个流式查询的推荐方法是什么,我也找不到合适的文档。

    2 回复  |  直到 7 年前
        1
  •  23
  •   Silvio    7 年前

    awaitTermination 在第一个查询中,它将被阻塞,直到它完成,然后再开始第二个查询。所以您希望启动这两个查询,但是使用 StreamingQueryManager.awaitAnyTermination .

    val query1 = df.writeStream.start()
    val query2 = df.writeStream.start()
    
    spark.streams.awaitAnyTermination()
    

    除此之外,默认情况下Spark使用FIFO调度程序。这意味着第一个查询在执行时获取集群中的所有资源。因为您试图同时运行多个查询,所以应该切换到 FAIR scheduler

    如果您有一些查询应该比其他查询拥有更多的资源,那么您还可以调优各个调度程序池。

        2
  •  0
  •   marc_s MisterSmith    5 年前

    如果退出任何查询,请使用 spark.streams.awaitAnyTermination() .

    如果退出所有查询:

    方案1:

    val query1=ds.writeSteam.{...}.start()
    val query2=ds.writeSteam.{...}.start()
    query1.awaitTermination();
    query2.awaitTermination();
    

    方案2:

    val query1=ds.writeSteam.{...}.start()
    val query2=ds.writeSteam.{...}.start()
    spark.streams.active.foreach(x => x.awaitTermination())
    

    方案3:

    while (!spark.streams.active.isEmpty) {
      println("Queries currently still active: " + spark.streams.active.map(x => x.name).mkString(","))
      spark.streams.awaitAnyTermination()
      spark.streams.resetTerminated()
    }
    
        3
  •  -3
  •   dunlu_98k    7 年前

    val query1=ds.writeSteam.{…}.start()

    val query2=ds.writeSteam.{…}.start()

    val query3=ds.writeSteam.{…}.start()

    查询3.awaitTermination()

    AwaitTermination()将阻止进程直到完成,这在流应用程序中永远不会发生,在最后一次查询时调用它可以解决问题