代码之家  ›  专栏  ›  技术社区  ›  Daniel Eisenreich

Flink CEP Pojoerializer错误的多态性解析

  •  0
  • Daniel Eisenreich  · 技术社区  · 7 年前

    目前,当我打印CEP模式的结果时,我有一个非常奇怪的行为。

    数据模型如下:

    • 事件 :(类型:string,时间戳:long)
    • 车辆行驶的 扩展事件:(vehicleid:integer)
    • 位置 扩展车辆相关:(位置:整数,方向:整数)
    • 认识 扩展车辆相关:(pos:integer,id:integer,direction:integer)

    CEP部分如下:

    val pattern = Pattern
      .begin[VehicleRelated]("before")
      .subtype(classOf[Position])
      .next("recognize")
      .subtype(classOf[Recognize])
      .next("after")
      .subtype(classOf[Position])
      .within(Time.seconds(5))
    
    val patternStream = CEP.pattern(actionEvents, pattern)
    val recognitions = patternStream
      .select(pattern => {
        val s = pattern("recognize").head.asInstanceOf[Recognize]
        LOG.debug(s.toString)
        s
      })
    
    recognitions.print("RECO")
    

    日志输出如下:

    14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
    RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())
    

    现在最大的问题是,为什么 车辆标识 返回强制转换对象后属性为空?有什么建议吗?

    更新 我做了一些调查,发现Pojoserializer是问题所在。调用copy函数并在第151行 this.numFields 是错的…计数仅包括识别类本身的属性计数,但不包括继承的类,在本例中是事件和车辆相关的。属性类型和时间戳也为空。

    1 回复  |  直到 7 年前
        1
  •  0
  •   Daniel Eisenreich    7 年前

    问题是Flink内部POJO序列化程序无法正确解决多态性。

    因此,我将Kyro序列化程序设置为默认值:

    val config = env.getConfig
    config.enableForceKryo()