您可以通过在窗口化之后添加一个groupbykey转换来让它工作。您已经为记录分配了键,但实际上还没有按键将它们组合在一起,而会话窗口(按键工作)不知道这些事件需要合并在一起。
为了证实这一点,我用一些内存中的虚拟数据做了一个可复制的示例(将Pub/Sub与问题隔离开来,并能够更快地测试它)。所有五个事件都将具有相同的键或
user_id
但它们会依次“到达”彼此相隔1、2、4和8秒。就像我用的那样
session_gap
在5秒钟内,我希望前4个元素合并到同一个会话中。第五场比赛将在第四场比赛后8秒进行,因此必须降级到下一场比赛(差距超过5秒)。数据的创建方式如下:
data = [{'user_id': 'Thanos', 'value': 'event_{}'.format(event), 'timestamp': time.time() + 2**event} for event in range(5)]
beam.Create(data)
beam.window.TimestampedValue
分配“假”时间戳。同样,我们只是用这个来模拟流行为。之后,我们创建键-值对,这要归功于
用户id
菲尔德,我们打开窗户
window.Sessions
我们加上失踪的
beam.GroupByKey()
步骤。最后,我们使用稍微修改的
DebugPrinter
:. 管道现在如下所示:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'keyed_on_user_id' >> beam.Map(lambda x: (x['user_id'], x))
| 'user_session_window' >> beam.WindowInto(window.Sessions(session_gap),
timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW) \
| 'Group' >> beam.GroupByKey()
| 'debug_printer' >> beam.ParDo(DebugPrinter()))
调试打印机
是:
class DebugPrinter(beam.DoFn):
"""Just prints the element with logging"""
def process(self, element, window=beam.DoFn.WindowParam):
for x in element[1]:
logging.info(">>> Received %s %s with window=%s", x['value'], x['timestamp'], window)
yield element
INFO:root:>>> Received event_0 1554117323.0 with window=[1554117323.0, 1554117328.0)
INFO:root:>>> Received event_1 1554117324.0 with window=[1554117324.0, 1554117329.0)
INFO:root:>>> Received event_2 1554117326.0 with window=[1554117326.0, 1554117331.0)
INFO:root:>>> Received event_3 1554117330.0 with window=[1554117330.0, 1554117335.0)
INFO:root:>>> Received event_4 1554117338.0 with window=[1554117338.0, 1554117343.0)
但是在添加它之后,窗口现在可以正常工作了。事件0到3合并在一个扩展的12s会话窗口中。事件4属于单独的5s会话。
INFO:root:>>> Received event_0 1554118377.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_1 1554118378.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_3 1554118384.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_2 1554118380.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_4 1554118392.37 with window=[1554118392.37, 1554118397.37)
完整代码
here
另外两件事值得一提。第一个问题是,即使使用DirectRunner在一台机器上本地运行,记录也可能是无序的(在我的例子中,事件3是在事件2之前处理的)。这样做的目的是模拟分布式处理
here
.
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'Write Results/Write/WriteImpl/WriteBundles']
从2.10.0/2.11.0 SDK降级到2.9.0。看到这个了吗
answer
例如。