代码之家  ›  专栏  ›  技术社区  ›  wipman

如何处理从状态中删除的数据

  •  2
  • wipman  · 技术社区  · 8 年前

    我有一个 sessionization 用例。我坚持我的训练 in-memory 幸亏 mapWithstate() 并为每个传入日志更新它们。当会话结束时,用特定日志发出信号,我想检索它并将其从我的 State .

    我遇到的问题是我无法检索和删除( remove() )每次结束时我的会话 batch ,因为检索发生在 updateFunction() 并且它内部的删除,即一旦删除,就无法检索会话,如果会话结束,就不应该再有日志了 key s

    我仍然可以检索已结束的会话,但“死机”会话的数量将增加,从而产生整体异常(“ 状态 -溢出”),如果不选中,将威胁系统本身。此解决方案是不可接受的。

    因为这似乎是一个常见的用例,我想知道是否有人想出了一个解决方案?


    编辑

    示例代码如下:

    def mapWithStateContainer(iResultParsing: DStream[(String, SessionEvent)]) = {
      val lStateSpec = StateSpec.function(stateUpdateFunction _).timeout(Seconds(TIMEOUT)
    
      val lResultMapWithState: DStream[(String, Session)] = 
            iResultParsing.mapWithState(lStateSpec).stateSnapshots()
    
      val lClosedSession: DStream[(String, Session)] = 
            lResultMapWithState.filter(_._2.mTimeout)
    
        //ideally remove here lClosedSession from the state
    }
    
    private def stateUpdateFunction(iKey: String,
                                    iValue: Option[SessionEvent],
                                    iState: State[Session]): Option[(String, Session)] = {
      var lResult = None: Option[(String, Session)]
    
      if (iState.isTimingOut()) {
        val lClosedSession = iState.get()
        lClosedSession.mTimeout = true
    
        lResult = Some(iKey, lClosedSession)
      } else if (iState.exists) {
          val lUpdatedSession = updateSession(lCurrentSession, iValue)
          iState.update(lUpdatedSession)
    
          lResult = Some(iKey, lUpdatedSession)
    
          // we wish to remove the lUpdatedSession from the state once retrieved with lResult
          /*if (lUpdatedSession.mTimeout) {
             iState.remove()
             lResult = None
           }*/
        } else {
           val lInitialState = initSession(iValue)
           iState.update(lInitialState)
    
           lResult = Some(iKey, lInitialState)
        }
    
        lResult
    }
    
    private def updateSession(iCurrentSession: Session, 
                              iNewData: Option[SessionEvent]): Session = {
        //user disconnects manually
        if (iNewData.get.mDisconnection) {
            iCurrentSession.mTimeout = true
        }
    
        iCurrentSession
    }
    
    1 回复  |  直到 8 年前
        1
  •  0
  •   Yuval Itzchakov    8 年前

    而不是打电话 MapWithStateRDD.stateSnapshot ,可以将更新后的状态作为 返回值 您的 mapWithState 活动这样,最终状态在有状态DStream之外始终可用。

    这意味着您可以:

    else if (iState.exists) {
      val lUpdatedSession = updateSession(lCurrentSession, iValue)
      iState.update(lUpdatedSession)
    
      if (lUpdatedSession.mTimeout) {
        iState.remove()
      }
    
      Some(iKey, lUpdatedSession)
    }
    

    现在将图形更改为:

    val lResultMapWithState = iResultParsing
                                .mapWithState(lStateSpec)
                                .filter { case (_, session) => session.mTimeout }
    

    现在发生的情况是,该州正在被取消 内部 ,但因为您要从您的 StateSpec 功能,您可以在外部进行进一步处理。