
Modifying the MinimalWordCount example to read from BigQuery

  •  2
  • reese0106  ·  8 years ago

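    I am trying to modify Apache Beam's MinimalWordCount Python example to read from a BigQuery table. I have made the modifications below; the query itself appears to work, but the example then fails.

    The original example is here: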

    with beam.Pipeline(options=pipeline_options) as p:
    
        # Read the text file[pattern] into a PCollection.
        lines = p | ReadFromText(known_args.input)
    
        # Count the occurrences of each word.
        counts = (
            lines
            | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                          .with_output_types(unicode))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))
    
        # Format the counts into a PCollection of strings.
        output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))
    
        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        output | WriteToText(known_args.output)
    

    Instead of ReadFromText, I am trying to adapt this to read from a column in a BigQuery table. To do that, I replaced lines = p | ReadFromText(known_args.input) with the following:

    query = 'SELECT text_column FROM `bigquery.table.goes.here` '
    lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
    

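    When I rerun the pipeline, I get this error: "WARNING:root:A task failed with exception. expected string or buffer [while running 'Split']"

    I understand that the 'Split' operation expects a string and is clearly not getting one. How can I modify 'ReadFromBigQuery' so that it passes a string/buffer? Do I need to supply a table schema or something similar to convert the results of 'ReadFromBigQuery' into a buffer of strings?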

    1 answer  |  last activity 8 years ago
  •  2
  •   Marcin Zablocki    8 years ago

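    This happens because BigQuerySource returns a PCollection of dictionaries (dict), where each key in the dictionary represents a column. In your case, the simplest fix is to apply beam.Map right after beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)), like this: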

    lines = (p 
    |"ReadFromBigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 
    | "Extract text column" >>  beam.Map(lambda row: row.get("text_column"))
             )
    

    If you run into problems with the column name, try changing it to u"text_column".
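
    To see why the original 'Split' step fails, here is a minimal sketch in plain Python 3, without Beam. The sample row is made up for illustration; it only mimics the dict shape of an element read by the query above. On Python 3 the message reads "expected string or bytes-like object" rather than "expected string or buffer", but the cause is the same.

```python
import re

# Hypothetical row, shaped like one element read from BigQuery:
# a dict keyed by column name.
row = {"text_column": "the cat's hat and the cat"}

pattern = r"[A-Za-z']+"

# Passing the whole dict to re.findall reproduces the failure,
# because the regex functions only accept str or bytes.
try:
    re.findall(pattern, row)
    failed = False
except TypeError:
    failed = True

# Extracting the column first yields a plain string that the regex accepts.
words = re.findall(pattern, row.get("text_column"))
```

    The extraction on the last line is the same operation the "Extract text column" step performs on every element of the PCollection.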

    Alternatively, you can modify the Split transform to extract the column value there:

    'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x.get("text_column")))
                          .with_output_types(unicode))
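
    One case the lambda above does not cover: when a row has no value for the column, row.get("text_column") returns None and re.findall raises the same kind of type error. The following is a hedged sketch of a named function that tolerates this; split_words and its column parameter are names invented here, and whether NULL values actually arrive as None depends on the source, so treat that as an assumption.

```python
import re


def split_words(row, column="text_column"):
    """Return the words found in one column of a dict-shaped row."""
    text = row.get(column)
    if text is None:
        # Nothing to split: return an empty list so FlatMap emits no elements.
        return []
    return re.findall(r"[A-Za-z']+", text)


# Intended use inside the pipeline (not executed here):
#   lines | 'Split' >> beam.FlatMap(split_words)
```

    A named function is also easier to unit test than an inline lambda, since it can be called on plain dicts without building a pipeline.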
    