代码之家  ›  专栏  ›  技术社区  ›  Alejandro Alcalde

有效计算Flink中的属性数

  •  2
  • Alejandro Alcalde  · 技术社区  · 7 年前

    我想知道打火石的属性数量 DataSet 第一次尝试是:

    dataset.input
        .map(_.vector.size)
        .reduce((_, b) => b)
        .collect
        .head
    

    然后,看看 Solver 我看到它是这样做的:

    // TODO: Faster way to do this?
    dataset.map(_.vector.size)
        .reduce((a, b) => b)
    

    但是 托多 评论说明了一切。

    因此,我提出了这个实现:

    dataset.first(1)
        .map(_.vector.size)
        .reduce((_, b) => b)
        .collect
    

    是否有更有效的实施?

    1 回复  |  直到 7 年前
        1
  •  2
  •   Fabian Hueske    7 年前

    最快的是

    dataset
      // only forward first vector of each partition
      .mapPartition(in => if (in.hasNext) Seq(in.next) else Seq())
      // move all remaining vectors to a single partition, compute size of the first and forward it
      .mapPartition(in => if (in.hasNext) Seq(in.next.vector.size) else Seq()).setParallelism(1)
      .collect
    

    使用 reduce groupReduce 效率较低,因为可能会在不首先减少数据的情况下将数据移动到单个计算机,并导致每个输入记录的函数调用。 mapPartition 每个分区调用一次,并且只转发输入迭代器的第一个元素。