代码之家  ›  专栏  ›  技术社区  ›  Daniel Eisenreich

多种事件类型的ApacheFlink CEP模式

  •  0
  • Daniel Eisenreich  · 技术社区  · 7 年前

    目前我正在做一个学期项目,我必须认识到三个事件的系列。喜欢 P -> R -> P

    我们有两种不同的事件类型,它们通过同一主题中的Kafka连接器使用。

    我创建了一个名为event的父类,其他两种类型都从中派生。

    Kafka连接器使用EventSchema将JSON反序列化为父类事件。

    val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
    val stream = env.addSource(consumer)
    

    图案如下:

    val pattern = Pattern
      .begin[Event]("before")
      .subtype(classOf[Position])
      .next("recognized")
      .subtype(classOf[Recognized])
      .next("after")
      .subtype(classOf[Position])
    

    当前的问题是,如果我以适当的格式发送三条消息,模式将无法识别。

    我还试过别的……我这样改变了模式:

    val pattern = Pattern
      .begin[Event]("before")
      .where(e => e.getType == "position")
      .next("recognized")
      .where(e => e.getType == "recognition")
      .next("after")
      .where(e => e.getType == "position")
    

    这个模式有效,但稍后我无法将事件类强制转换为位置或识别。

    我在这里想什么?

    1 回复  |  直到 7 年前
        1
  •  1
  •   Jiayi Liao    7 年前

    根据注释,我认为您应该返回子类型实例,而不是事件。以下是我为您提供的示例代码:

    val event = mapper.readValue(bytes, classOf[Event])
    event.getType match {
      case "position" => mapper.readValue(bytes, classOf[Position])
      case "recognition" => mapper.readValue(bytes, classOf[Recognized])
      case _ =>
    }
    

    我成功地从cepitcase.java中的一个测试用例中尝试了这个示例。

    DataStream<Event> input = env.fromElements(
      new Event(1, "foo", 4.0),
      new SubEvent(2, "foo", 4.0, 1.0),
      new SubEvent(3, "foo", 4.0, 1.0),
      new SubEvent(4, "foo", 4.0, 1.0),
      new Event(5, "middle", 5.0)
    );
    
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
    .followedByAny("middle").subtype(SubEvent.class)
    .followedByAny("end").subtype(SubEvent.class);