代码之家  ›  专栏  ›  技术社区  ›  dvlcis

如何使用卡夫卡流窗口为烛台图生成创建一条记录

  •  1
  • dvlcis  · 技术社区  · 8 年前

    我必须使用Kafka流获取交易信息,从交易结果主题中绘制每个特定持续时间的烛台图,它有交易id、金额、价格、交易时间,关键是交易id,每个记录都完全不同, 我要做的是根据事务结果进行计算,以获得 每个持续时间的最高价格、最低价格、开盘价、收盘价、tx close\u时间,并使用它创建烛台图。 我使用kafka stream窗口完成了以下操作:

    final KStreamBuilder builder = new KStreamBuilder();
    KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);
    KTable<Windowed<String>, InfoRecord> kTableRecords= groupedStream.aggregate(
     InfoRecord::new, /* initializer */
     (k, v, aggregate) -> aggregate.add(k,v), /* adder */
     TimeWindows.of(TimeUnit.SECONDS.toMillis(5)).until(TimeUnit.SECONDS.toMillis(5)),
     infoRecordSerde);
    

    正如在源主题中一样,每个记录都有txId作为键,并且txId从不重复,因此,在进行聚合时,结果K-table将具有与K-stream相同的记录,但我可以使用该窗口获取 具体持续时间。

    我认为kTableRecords应该包含特定持续时间内的所有记录,即5秒, 因此,我可以在5秒内循环查看所有记录,以获得高、低、打开(窗口中第一个记录价格)、关闭(窗口中最后一个记录价格)、关闭时间(窗口中最后一个记录的发送时间), 因此,我只能获取此窗口的一条记录,并将此结果输出到sink kafka主题,但我不知道如何在这些窗口持续时间内执行此操作。

    我认为代码如下:

    K表格记录。foreach((键,值)->{

    //TODO:在此处添加逻辑

    })

    IDE显示此foreach已被弃用,

    但我不知道如何区分此窗口或下一窗口中的记录 或者我需要一个窗口记录保留时间,直到在上面的示例代码中使用为止。

    我已经在这方面挣扎了好几天,但我仍然不知道如何正确地完成我的工作,感谢任何人的帮助,让我走上正确的道路,谢谢

    卡夫卡版本为:0.11.0.0

    更新时间:

    根据Michal在帖子中的提示,我修改了代码 聚合器实例中的高、低、开、关价格计算, 但结果使我对SPCIIC中的每个不同键都有了认识 窗口中,该逻辑为该键创建一个新实例,并仅对当前键执行添加执行,而不与其他键的值交互, 我真正想要的是计算 每个记录的高、低、开盘、收盘价格都有不同的键 窗口持续时间,所以我不需要为每个键创建新实例, 应该为每个特定窗口只创建一个聚合实例 并对工期中的所有记录值进行计算,每个工期窗口得到一组(高、低、开盘、收盘价)。 我读过这个主题: 如何在连续增加的时间窗口上计算窗口聚合? 所以,我怀疑,我不确定,这是否是我的正确解决方案,谢谢。

    顺便说一下,K线是指烛台图。


    更新II:

    根据您的更新,我创建了如下所示的代码:

    KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);
    
    KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupBy((k,v)-> "constkey", keySerde, valueSerde);
    
    KTable<Windowed<String>, MarketInfoRecord> kTable =
            groupedStream.aggregate(
            MarketInfoRecord::new, /* initializer */
            (k, v, aggregate) -> aggregate.add(k,v), /* adder */
            TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100)),
            infoRecordSerde, "test-state-store");
    
    KStream<String, MarketInfoRecord> newS = kTable.toStream().map(
            (k,v) -> {
                System.out.println("key: "+k+",  value:"+v);
                return KeyValue.pair(k.window().start() + "_" + k.window().end(), v);
    
            }
    
    );
    
    newS.to(Serdes.String(),infoRecordSerde, "OUTPUT_NEW_RESULT");
    

    如果在执行组时使用静态字符串作为键,那么在执行窗口聚合时, 只为窗口创建了一个聚合器实例,我们可以得到(高、低、打开、关闭) 对于该窗口中的所有记录,但是 由于所有记录的键a相同,此窗口将更新多次,并为一个窗口生成多条记录,如下所示:

    key: [constkey@1521304400000/1521304500000],  value:MarketInfoRecord{high=11, low=11, openTime=1521304432205, closeTime=1521304432205, open=11, close=11, count=1}
    key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=44, low=44, openTime=1521304622655, closeTime=1521304622655, open=44, close=44, count=1}
    key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=44, low=33, openTime=1521304604182, closeTime=1521304622655, open=33, close=44, count=2}
    key: [constkey@1521304400000/1521304500000],  value:MarketInfoRecord{high=22, low=22, openTime=1521304440887, closeTime=1521304440887, open=22, close=22, count=1}
    key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=55, low=55, openTime=1521304629943, closeTime=1521304629943, open=55, close=55, count=1}
    key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=77, low=77, openTime=1521304827181, closeTime=1521304827181, open=77, close=77, count=1}
    key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=77, low=66, openTime=1521304817079, closeTime=1521304827181, open=66, close=77, count=2}
    key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=88, low=66, openTime=1521304817079, closeTime=1521304839047, open=66, close=88, count=3}
    key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=99, low=66, openTime=1521304817079, closeTime=1521304848350, open=66, close=99, count=4}
    key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=100.0, low=66, openTime=1521304817079, closeTime=1521304862006, open=66, close=100.0, count=5}
    

    所以我们需要按照您在“38945277/7897191”中发布的链接进行重复数据消除,对吗?

    所以,我想知道我是否可以做如下事情:

    KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupByKey();
    // as key was unique txId, so this group is just for doing next window operation, the record number is not changed.
    
    KTable<Windowed<String>, MarketInfoRecord> kTable =
       groupedStream.SOME_METHOD(
    // just use some method to deliver the records in different windows,
    // no sure if this is possible?
    TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100))
    // use until here to let the record purged if out of the window, 
    // please correct me if i am wrong?
    

    我们可以将基于时间的输入记录序列转换为几个窗口组, 每个组都有窗口(或使用窗口开始时间、结束时间组合为字符串键), 因此,对于每个组,键是不同的,但它有几个记录,这些记录具有不同的值, 然后我们进行聚合(此处无需使用窗口聚合),计算出的值 从每个键:值对,即我们可以得到一个结果记录, 并且下一个窗口有不同的基于窗口的键名,所以这样,执行下游应该有多个线程( 随着关键点的变化)

    1 回复  |  直到 8 年前
        1
  •  2
  •   Michal Borowiecki    8 年前

    我建议您不要在foreach中进行所有计算,而是直接在聚合器中进行计算,即在加法器中:

    (k, v, aggregate) -> aggregate.add(k,v), /* adder */
    

    add方法可以完成您提到的所有事情(我建议您首先将JsonNode映射到Java对象,我们称之为事务),请考虑以下伪代码:

    private int low = Integer.MAX; // whatever type you use to represent prices
    private int high = Integer.MIN;
    private long openTime = Long.MAX; // whatever type you use to represent time
    private long closeTime = Long.MIN;
    ...
    public InfoRecord add(String key, Transaction tx) {
      if(tx.getPrice() > this.high) this.high = tx.getPrice();
      if(tx.getPrice() < this.low) this.low = tx.getPrice();
      if(tx.getTime() < this.openTime) {
        this.openTime = tx.getTime();
        this.open = tx.getPrice();
      }
      if(tx.getTime() > this.closeTime) {
        this.closeTime = tx.getTime();
        this.close = tx.getPrice();
      }
      return this;
    }
    

    请记住,实际上,您可能会在每个窗口的输出上获得多条记录,因为这些窗口可以多次更新(它们永远不是最终的),这里将对此进行更详细的解释: https://stackoverflow.com/a/38945277/7897191

    我不知道什么是K线,但如果您想要多个持续时间不断增加的窗口,则可以概述该模式 here

    更新时间: 要聚合窗口中的所有记录,只需在进行聚合之前将键更改为某个静态值。因此,要创建分组流,您可以使用 groupBy(KeyValueMapper) ,类似于:

    KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupBy( (k, v) -> ""); // give all records the same key (empty string)
    

    请注意,这将导致重新分区(因为分区由键决定,我们正在更改键),并且下游执行将变为单线程(因为现在只有一个分区)。

    推荐文章