代码之家  ›  专栏  ›  技术社区  ›  Henrik

Apache Beam每用户会话窗口

  •  0
  • Henrik  · 技术社区  · 6 年前

    我们有一个有用户的应用程序;每个用户每次使用我们的应用程序大约10-40分钟,我想根据已经发生的特定事件(例如,“this user converted”、“this user had a problem last session”、“this user had a successful last session”),计算每个会话发生的事件的分布/发生次数。

    (在这之后,我想每天统计这些更高级别的事件,但这是一个单独的问题)

    docs 似乎面向全局会话窗口,但我想按用户创建它们(这也是一种自然分区)。

    我很难找到关于如何做到这一点的文档(python首选)。你能给我指一下正确的方向吗?

    或者换句话说:

    我所拥有的

    class DebugPrinter(beam.DoFn):
      """Just prints the element with logging"""
      def process(self, element, window=beam.DoFn.WindowParam):
        _, x = element
        logging.info(">>> Received %s %s with window=%s", x['jsonPayload']['value'], x['timestamp'], window)
        yield element
    
    def sum_by_event_type(user_session_events):
      logging.debug("Received %i events: %s", len(user_session_events), user_session_events)
      d = {}
      for key, group in groupby(user_session_events, lambda e: e['jsonPayload']['value']):
        d[key] = len(list(group))
      logging.info("After counting: %s", d)
      return d
    
    # ...
    
    by_user = valid \
      | 'keyed_on_user_id'      >> beam.Map(lambda x: (x['jsonPayload']['userId'], x))
    
    session_gap = 5 * 60 # [s]; 5 minutes
    
    user_sessions = by_user \
      | 'user_session_window'   >> beam.WindowInto(beam.window.Sessions(session_gap),
                                                   timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW) \
      | 'debug_printer'         >> beam.ParDo(DebugPrinter()) \
      | beam.CombinePerKey(sum_by_event_type)
    

    INFO:root:>>> Received event_1 2019-03-12T08:54:29.200Z with window=[1552380869.2, 1552381169.2)
    INFO:root:>>> Received event_2 2019-03-12T08:54:29.200Z with window=[1552380869.2, 1552381169.2)
    INFO:root:>>> Received event_3 2019-03-12T08:54:30.400Z with window=[1552380870.4, 1552381170.4)
    INFO:root:>>> Received event_4 2019-03-12T08:54:36.300Z with window=[1552380876.3, 1552381176.3)
    INFO:root:>>> Received event_5 2019-03-12T08:54:38.100Z with window=[1552380878.1, 1552381178.1)
    

    如您所见,Session()窗口不会展开窗口,而是将非常接近的事件分组在一起。。。哪里做错了?

    0 回复  |  直到 6 年前
        1
  •  1
  •   Guillem Xercavins    6 年前

    您可以通过在窗口化之后添加一个groupbykey转换来让它工作。您已经为记录分配了键,但实际上还没有按键将它们组合在一起,而会话窗口(按键工作)不知道这些事件需要合并在一起。

    为了证实这一点,我用一些内存中的虚拟数据做了一个可复制的示例(将Pub/Sub与问题隔离开来,并能够更快地测试它)。所有五个事件都将具有相同的键或 user_id 但它们会依次“到达”彼此相隔1、2、4和8秒。就像我用的那样 session_gap 在5秒钟内,我希望前4个元素合并到同一个会话中。第五场比赛将在第四场比赛后8秒进行,因此必须降级到下一场比赛(差距超过5秒)。数据的创建方式如下:

    data = [{'user_id': 'Thanos', 'value': 'event_{}'.format(event), 'timestamp': time.time() + 2**event} for event in range(5)]
    

    beam.Create(data) beam.window.TimestampedValue 分配“假”时间戳。同样,我们只是用这个来模拟流行为。之后,我们创建键-值对,这要归功于 用户id 菲尔德,我们打开窗户 window.Sessions 我们加上失踪的 beam.GroupByKey() 步骤。最后,我们使用稍微修改的 DebugPrinter :. 管道现在如下所示:

    events = (p
      | 'Create Events' >> beam.Create(data) \
      | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
      | 'keyed_on_user_id'      >> beam.Map(lambda x: (x['user_id'], x))
      | 'user_session_window'   >> beam.WindowInto(window.Sessions(session_gap),
                                                 timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW) \
      | 'Group' >> beam.GroupByKey()
      | 'debug_printer'         >> beam.ParDo(DebugPrinter()))
    

    调试打印机 是:

    class DebugPrinter(beam.DoFn):
      """Just prints the element with logging"""
      def process(self, element, window=beam.DoFn.WindowParam):
        for x in element[1]:
          logging.info(">>> Received %s %s with window=%s", x['value'], x['timestamp'], window)
    
        yield element
    

    INFO:root:>>> Received event_0 1554117323.0 with window=[1554117323.0, 1554117328.0)
    INFO:root:>>> Received event_1 1554117324.0 with window=[1554117324.0, 1554117329.0)
    INFO:root:>>> Received event_2 1554117326.0 with window=[1554117326.0, 1554117331.0)
    INFO:root:>>> Received event_3 1554117330.0 with window=[1554117330.0, 1554117335.0)
    INFO:root:>>> Received event_4 1554117338.0 with window=[1554117338.0, 1554117343.0)
    

    但是在添加它之后,窗口现在可以正常工作了。事件0到3合并在一个扩展的12s会话窗口中。事件4属于单独的5s会话。

    INFO:root:>>> Received event_0 1554118377.37 with window=[1554118377.37, 1554118389.37)
    INFO:root:>>> Received event_1 1554118378.37 with window=[1554118377.37, 1554118389.37)
    INFO:root:>>> Received event_3 1554118384.37 with window=[1554118377.37, 1554118389.37)
    INFO:root:>>> Received event_2 1554118380.37 with window=[1554118377.37, 1554118389.37)
    INFO:root:>>> Received event_4 1554118392.37 with window=[1554118392.37, 1554118397.37)
    

    完整代码 here

    另外两件事值得一提。第一个问题是,即使使用DirectRunner在一台机器上本地运行,记录也可能是无序的(在我的例子中,事件3是在事件2之前处理的)。这样做的目的是模拟分布式处理 here .

    TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'Write Results/Write/WriteImpl/WriteBundles']
    

    从2.10.0/2.11.0 SDK降级到2.9.0。看到这个了吗 answer 例如。