
Google Cloud Dataflow: why does the pipeline run twice with DirectRunner?

  • zangw  ·  asked 6 years ago

    Suppose the dataset is as follows:

    {"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
    {"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}
    

    I am trying to filter out the records with `type:ba` and insert them into BigQuery using the Python SDK:

    import argparse
    import json

    import apache_beam as beam
    from apache_beam import pvalue
    from apache_beam.io import ReadFromText
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    ba_schema = 'slot:STRING,result:INTEGER,play_type:STRING,level:INTEGER'

    class ParseJsonDoFn(beam.DoFn):
        B_TYPE = 'tag_B'
        def process(self, element):
            text_line = element.strip()
            data = json.loads(text_line)

            if data['type'] == 'ba':
                ba = {'slot': data['slot'], 'result': data['result'], 'p_type': data['p_type'], 'level': data['level']}
                yield pvalue.TaggedOutput(self.B_TYPE, ba)

    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument('--input',
                            dest='input',
                            default='data/path/data',
                            help='Input file to process.')
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_args.extend([
            '--runner=DirectRunner',
            '--project=project-id',
            '--job_name=data-job',
        ])
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True
        with beam.Pipeline(options=pipeline_options) as p:
            lines = p | ReadFromText(known_args.input)

            multiple_lines = (
                lines
                | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                                      ParseJsonDoFn.B_TYPE)))

            ba_line = multiple_lines.tag_B

            (ba_line
             | "output_ba" >> beam.io.Write(
                    beam.io.BigQuerySink(
                        table='ba',
                        dataset='temp',
                        project='project-id',
                        schema=ba_schema,
                        # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                    )
               ))

            p.run().wait_until_finish()
    

    The output is:

    /usr/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:342: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will not be supported
      pipeline.replace_all(_get_transform_overrides(pipeline.options))
    INFO:root:Running pipeline with DirectRunner.
    INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
    INFO:oauth2client.client:Refreshing access_token
    INFO:root:Writing 2 rows to project-id:temp.ba table.
    INFO:root:Running pipeline with DirectRunner.
    INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
    INFO:oauth2client.client:Refreshing access_token
    INFO:root:Writing 2 rows to project-id:temp.ba table.
    

    Notice there are two lines of `INFO:root:Writing 2 rows to project-id:temp.ba table.`. Querying this table in BigQuery:

    select * from `temp.ba`;
    

    There are 4 records in this table — every row is duplicated.

    I would like to know why the pipeline runs the same job twice.

    1 Answer

  •   Pablo  ·  2 votes  ·  answered 6 years ago

    The `with` statement on `Pipeline` runs the pipeline itself. Specifically:

    with beam.Pipeline(...) as p:
      [...code...]
    

    is equivalent to:

    p = beam.Pipeline(...)
    [...code...]
    p.run().wait_until_finish()
    

    See the implementation for details.
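    Since the code in the question calls `p.run().wait_until_finish()` *inside* the `with` block, the pipeline executes once for the explicit call and once more when the `with` block exits, which is why every row is written twice. The mechanism can be sketched in plain Python without a Beam dependency (`FakePipeline` below is a made-up stand-in that mimics `beam.Pipeline`'s context-manager behavior, not Beam's actual class):

    ```python
    # Minimal sketch of the double-run: a context manager whose __exit__
    # runs the pipeline, mimicking what beam.Pipeline does on exit.
    class FakePipeline(object):
        def __init__(self):
            self.run_count = 0

        def run(self):
            self.run_count += 1
            return self  # stands in for the PipelineResult

        def wait_until_finish(self):
            pass

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Like beam.Pipeline, exiting the with block runs the pipeline.
            self.run().wait_until_finish()
            return False

    # Pattern from the question: explicit run() inside the with block.
    p = FakePipeline()
    with p:
        p.run().wait_until_finish()  # first run
                                     # __exit__ triggers the second run
    print(p.run_count)               # -> 2

    # Let the with block do the running instead: build only, run once.
    q = FakePipeline()
    with q:
        pass                         # construct the pipeline, nothing else
    print(q.run_count)               # -> 1
    ```

    So the fix for the code in the question is simply to delete the `p.run().wait_until_finish()` line inside the `with` block (or drop the `with` and call `run()` once explicitly, as in the equivalent form above).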
