代码之家  ›  专栏  ›  技术社区  ›  totooooo Richard Dally

apachebeam with Python:如何计算会话窗口中的最小值,并将其应用于所有相关的pcollection

  •  1
  • totooooo Richard Dally  · 技术社区  · 7 年前

    我正在使用apachebeam的pythonsdk来处理字典,它表示流分析命中率。由于会话窗口的存在,点击率得以聚合。我的数据流真正要做的就是应用这些会话窗口,并为所有相关的点击分配一个会话ID。

    作为会话ID,我发现应该使用第一次命中的时间戳(与每个用户的cookie ID结合使用)。这是我的管道:

    msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
        topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
            PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
        id_label='hit_id',
        timestamp_attribute='time')
    
    hits = msgs | 'Parse' >> beam.Map(my_parser)
    
    windowed_hits = hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))
    
    visit_id = (windowed_hits | 'ExtractTimestamp' >> beam.Map(my_extracter)
        | 'GetMinimum' >> beam.CombineGlobally(my_min).without_defaults())
    
    windowed_hits | 'SetVisitId' >>
        beam.Map(set_visit_id, visit_id=beam.pvalue.AsSingleton(visit_id))
    

    这似乎不管用。调试时,我的visit\u id分支似乎工作正常,它在计算最小值之前等待会话结束。但是当用作侧输入时,我只得到一个pvalue.EmptySideInput。我怎样才能得到我想要的结果,为什么我的代码返回一个空边输入?

    编辑:

    • _包含一个WindowWalue的iterable。该值是我发送的唯一命中的时间戳(我们称之为TS1)。它与一个窗口相关联,从TS1到TS1+60。奇怪的是,这个WindowedValue的timestamp属性是TS1+60(.238),但我猜这是因为计算最小值的分支在计算最小值之前等待会话完成。
    • _目标窗口包含一个窗口,从TS+60(.24)到TS+120(.24)。

    所以我猜问题是这个窗口,但我不明白为什么它的范围是从TS+60到TS+120,这可能是因为WindowedValue的时间戳吗?似乎很有可能,因为\u target\u窗口的边界似乎是从其舍入值派生的。

    1 回复  |  直到 7 年前
        1
  •  1
  •   totooooo Richard Dally    7 年前

    我最终通过扔掉任何联合收割机并用一个GroupByKey代替它来实现我的目标。

    def my_parser(msg):
        result = literal_eval(msg)
        return result
    
    def set_key(hit):
        return (hit['cid'], hit)
    
    def set_vid2(keyed_hits):
        k, hits = keyed_hits
        visit_id = min([h['time'] for h in hits])
        for h in hits:
            h['visit_id'] = visit_id
        return hits
    
    def unpack_list(l):
        for d in l:
            yield d
    
    msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
        topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
            PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
        id_label='hit_id',
        timestamp_attribute='time')
    
    hits = msgs | 'Parse' >> beam.Map(my_parser)
    
    keyed_hits = hits | 'SetKey' >> beam.Map(set_key)
    
    windowed_hits = (keyed_hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))
        | 'Grouping' >> beam.GroupByKey())
    
    clean_hits = windowed_hits | 'ComputeMin' >> beam.Map(set_vid2)
    
    clean_hits | 'Unpack' >> beam.FlatMap(unpack_list)
    

    在GroupByKey之后,我有一个包含点击列表的PCollection(按CookieID+会话窗口分组)。然后,在每次点击时计算并设置访问ID后,我将点击列表的PCollection转换为点击列表的PCollection,并使用unpack\u list。

    我不确定这是正确的方法,如果它对性能有任何影响。