代码之家  ›  专栏  ›  技术社区  ›  John Doe

在foreachRDD中使用变量时,是否需要使用广播变量?

  •  1
  • John Doe  · 技术社区  · 7 年前

    在我的代码中,我订阅了一个Kafka流,并在一个函数中处理每个RDD:

      val myStream = KafkaUtils.createDirectStream[K, V](
          streamingContext,
          PreferConsistent,
          Subscribe[K, V](topics, consumerConfig)
        )
    
      val myMap: Map[ObjA, ObjB] = getMyMap() // This is the variable I want to access in 'process'
    
      def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
    
         // Code that uses myMap.get("key")
    
      }
    
      myStream.foreachRDD((x, y) => process((x, y)))
    

    我读了这篇关于 Spark and Kafka integration patterns .据我所知, foreachRDD 在驱动程序上本地执行,但任何内部循环都会分布到集群节点。那是不是意味着我 应该 broadcast myMap 出于性能原因?

    2 回复  |  直到 7 年前
        1
  •  1
  •   user9658280 user9658280    7 年前

    这是否意味着出于性能原因,我应该广播myMap?

    嗯:

    因为变量可以跨多个任务重用,所以广播是有价值的,尤其是当数据足够大,会增加大量开销时。

    如果不是,那么最好还是坚持闭包序列化,以提高可读性(我承认这是一个偏好问题)。

        2
  •  1
  •   Alexey Romanov    7 年前

    但是,任何内部循环都会分布到集群节点

    分布式的不是“任何内部循环”,而是RDD上的操作。

    所以这取决于 myMap.get("key") 在内部使用 process 例如,这里有一个广播完全没有意义的例子:

    def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
      case (rdd, _) => rdd.take(10).filter(/* do something using myMap */)
    }