我必须使用Kafka流获取交易信息,从交易结果主题中绘制每个特定持续时间的烛台图,它有交易id、金额、价格、交易时间,关键是交易id,每个记录都完全不同,
我要做的是根据事务结果进行计算,以获得
每个持续时间的最高价格、最低价格、开盘价、收盘价、tx close\u时间,并使用它创建烛台图。
我使用kafka stream窗口完成了以下操作:
final KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);
KTable<Windowed<String>, InfoRecord> kTableRecords= groupedStream.aggregate(
InfoRecord::new, /* initializer */
(k, v, aggregate) -> aggregate.add(k,v), /* adder */
TimeWindows.of(TimeUnit.SECONDS.toMillis(5)).until(TimeUnit.SECONDS.toMillis(5)),
infoRecordSerde);
正如在源主题中一样,每个记录都有txId作为键,并且txId从不重复,因此,在进行聚合时,结果K-table将具有与K-stream相同的记录,但我可以使用该窗口获取
具体持续时间。
我认为kTableRecords应该包含特定持续时间内的所有记录,即5秒,
因此,我可以在5秒内循环查看所有记录,以获得高、低、打开(窗口中第一个记录价格)、关闭(窗口中最后一个记录价格)、关闭时间(窗口中最后一个记录的发送时间),
因此,我只能获取此窗口的一条记录,并将此结果输出到sink kafka主题,但我不知道如何在这些窗口持续时间内执行此操作。
我认为代码如下:
K表格记录。foreach((键,值)->{
//TODO:在此处添加逻辑
})
IDE显示此foreach已被弃用,
但我不知道如何区分此窗口或下一窗口中的记录
或者我需要一个窗口记录保留时间,直到在上面的示例代码中使用为止。
我已经在这方面挣扎了好几天,但我仍然不知道如何正确地完成我的工作,感谢任何人的帮助,让我走上正确的道路,谢谢
卡夫卡版本为:0.11.0.0
更新时间:
根据Michal在帖子中的提示,我修改了代码
聚合器实例中的高、低、开、关价格计算,
但结果使我对SPCIIC中的每个不同键都有了认识
窗口中,该逻辑为该键创建一个新实例,并仅对当前键执行添加执行,而不与其他键的值交互,
我真正想要的是计算
每个记录的高、低、开盘、收盘价格都有不同的键
窗口持续时间,所以我不需要为每个键创建新实例,
应该为每个特定窗口只创建一个聚合实例
并对工期中的所有记录值进行计算,每个工期窗口得到一组(高、低、开盘、收盘价)。
我读过这个主题:
如何在连续增加的时间窗口上计算窗口聚合?
所以,我怀疑,我不确定,这是否是我的正确解决方案,谢谢。
顺便说一下,K线是指烛台图。
更新II:
根据您的更新,我创建了如下所示的代码:
KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);
KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupBy((k,v)-> "constkey", keySerde, valueSerde);
KTable<Windowed<String>, MarketInfoRecord> kTable =
groupedStream.aggregate(
MarketInfoRecord::new, /* initializer */
(k, v, aggregate) -> aggregate.add(k,v), /* adder */
TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100)),
infoRecordSerde, "test-state-store");
KStream<String, MarketInfoRecord> newS = kTable.toStream().map(
(k,v) -> {
System.out.println("key: "+k+", value:"+v);
return KeyValue.pair(k.window().start() + "_" + k.window().end(), v);
}
);
newS.to(Serdes.String(),infoRecordSerde, "OUTPUT_NEW_RESULT");
如果在执行组时使用静态字符串作为键,那么在执行窗口聚合时,
只为窗口创建了一个聚合器实例,我们可以得到(高、低、打开、关闭)
对于该窗口中的所有记录,但是
由于所有记录的键a相同,此窗口将更新多次,并为一个窗口生成多条记录,如下所示:
key: [constkey@1521304400000/1521304500000], value:MarketInfoRecord{high=11, low=11, openTime=1521304432205, closeTime=1521304432205, open=11, close=11, count=1}
key: [constkey@1521304600000/1521304700000], value:MarketInfoRecord{high=44, low=44, openTime=1521304622655, closeTime=1521304622655, open=44, close=44, count=1}
key: [constkey@1521304600000/1521304700000], value:MarketInfoRecord{high=44, low=33, openTime=1521304604182, closeTime=1521304622655, open=33, close=44, count=2}
key: [constkey@1521304400000/1521304500000], value:MarketInfoRecord{high=22, low=22, openTime=1521304440887, closeTime=1521304440887, open=22, close=22, count=1}
key: [constkey@1521304600000/1521304700000], value:MarketInfoRecord{high=55, low=55, openTime=1521304629943, closeTime=1521304629943, open=55, close=55, count=1}
key: [constkey@1521304800000/1521304900000], value:MarketInfoRecord{high=77, low=77, openTime=1521304827181, closeTime=1521304827181, open=77, close=77, count=1}
key: [constkey@1521304800000/1521304900000], value:MarketInfoRecord{high=77, low=66, openTime=1521304817079, closeTime=1521304827181, open=66, close=77, count=2}
key: [constkey@1521304800000/1521304900000], value:MarketInfoRecord{high=88, low=66, openTime=1521304817079, closeTime=1521304839047, open=66, close=88, count=3}
key: [constkey@1521304800000/1521304900000], value:MarketInfoRecord{high=99, low=66, openTime=1521304817079, closeTime=1521304848350, open=66, close=99, count=4}
key: [constkey@1521304800000/1521304900000], value:MarketInfoRecord{high=100.0, low=66, openTime=1521304817079, closeTime=1521304862006, open=66, close=100.0, count=5}
所以我们需要按照您在“38945277/7897191”中发布的链接进行重复数据消除,对吗?
所以,我想知道我是否可以做如下事情:
KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupByKey();
// as key was unique txId, so this group is just for doing next window operation, the record number is not changed.
KTable<Windowed<String>, MarketInfoRecord> kTable =
groupedStream.SOME_METHOD(
// just use some method to deliver the records in different windows,
// no sure if this is possible?
TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100))
// use until here to let the record purged if out of the window,
// please correct me if i am wrong?
我们可以将基于时间的输入记录序列转换为几个窗口组,
每个组都有窗口(或使用窗口开始时间、结束时间组合为字符串键),
因此,对于每个组,键是不同的,但它有几个记录,这些记录具有不同的值,
然后我们进行聚合(此处无需使用窗口聚合),计算出的值
从每个键:值对,即我们可以得到一个结果记录,
并且下一个窗口有不同的基于窗口的键名,所以这样,执行下游应该有多个线程(
随着关键点的变化)