代码之家  ›  专栏  ›  技术社区  ›  Alejandro Alcalde

Flink中函数调用之间的类状态获取丢失

  •  1
  • Alejandro Alcalde  · 技术社区  · 7 年前

    我有这门课:

    case class IDADiscretizer(
      nAttrs: Int,
      nBins: Int = 5,
      s: Int = 5) extends Serializable {
    
      private[this] val log = LoggerFactory.getLogger(this.getClass)
      private[this] val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
      private[this] val randomReservoir = SamplingUtils.reservoirSample((1 to s).toList.iterator, 1)
    
      def updateSamples(v: LabeledVector): Vector[IntervalHeapWrapper] = {
        val attrs = v.vector.map(_._2)
        val label = v.label
        // TODO: Check for missing values
        attrs
          .zipWithIndex
          .foreach {
            case (attr, i) =>
              if (V(i).getNbSamples < s) {
                V(i) insertValue attr // insert
              } else {
                if (randomReservoir(0) <= s / (i + 1)) {
                  //val randVal = Random nextInt s
                  //V(i) replace (randVal, attr)
                  V(i) insertValue attr
                }
              }
          }
        V
      }
    
      /**
       * Return the cutpoints for the discretization
       *
       */
      def cutPoints: Vector[Vector[Double]] = V map (_.getBoundaries.toVector)
    
      def discretize(data: DataSet[LabeledVector]): (DataSet[Vector[IntervalHeapWrapper]], Vector[Vector[Double]]) = {
        val r = data map (x => updateSamples(x))
        val c = cutPoints
    
        (r, c)
      }
    }
    

    使用Flink,我想在 discretize ,但似乎存储在 V 得到损失。我必须用吗 Broadcast 就像在 this question ?有没有更好的方法来访问类的状态?

    我试过打电话 cutpoints 有两种方式,一种是:

    def discretize(data: DataSet[LabeledVector]) = data map (x => updateSamples(x))
    

    然后,从外面呼叫:

    val a = IDADiscretizer(nAttrs = 4)
    val r = a.discretize(dataSet)
    r.print
    val cuts = a.cutPoints
    

    这里,切割是空的,所以我试图计算离散化以及内部的切割点。 离散化 :

    def discretize(data: DataSet[LabeledVector]) = {
        val r = data map (x => updateSamples(x))
        val c = cutPoints
    
        (r, c)
      }
    

    像这样使用:

    val a = IDADiscretizer(nAttrs = 4)
    val (d, c) = a.discretize(dataSet)
    c foreach println
    

    但同样的事情发生了。

    最后,我也试着 V 完全公开:

    val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
    

    仍然是空的

    我做错什么了?

    相关问题:

    回答

    多亏了@tillrohrmann,我终于做到了:

    private[this] def computeCutPoints(x: LabeledVector) = {
        val attrs = x.vector.map(_._2)
        val label = x.label
        attrs
          .zipWithIndex
          .foldLeft(V) {
            case (iv, (v, i)) =>
              iv(i) insertValue v
              iv
          }
      }
    
      /**
       * Return the cutpoints for the discretization
       *
       */
      def cutPoints(data: DataSet[LabeledVector]): Seq[Seq[Double]] =
        data.map(computeCutPoints _)
          .collect
          .last.map(_.getBoundaries.toVector)
    
      def discretize(data: DataSet[LabeledVector]): DataSet[LabeledVector] =
    data.map(updateSamples _)
    

    然后像这样使用:

    val a = IDADiscretizer(nAttrs = 4)
    val d = a.discretize(dataSet)
    val cuts = a.cutPoints(dataSet)
    d.print
    cuts foreach println
    

    我不知道这是不是最好的方法,但至少现在在工作。

    1 回复  |  直到 7 年前
        1
  •  1
  •   Till Rohrmann    7 年前

    Flink的工作方式是用户定义运算符/用户定义函数,这些函数对来自源函数的输入数据进行操作。为了执行一个程序,用户代码被发送到Flink集群并在那里执行。计算结果必须通过接收函数输出到某些存储系统。

    因此,在尝试解决方案时,不可能轻松地混合本地和分布式计算。什么 discretize 是要定义一个 map 转换输入的运算符 DataSet data . 调用后将执行此操作 ExecutionEnvironment#execute DataSet#print 例如。现在,用户代码和的定义 IDADiscretizer 发送到集群,在那里对它们进行实例化。Flink将更新 IDA离散化器 它与您在客户机上拥有的实例不同。

    推荐文章