代码之家  ›  专栏  ›  技术社区  ›  nomadic_squirrel

如何访问中的管道选项Par.Do 变换?

  •  0
  • nomadic_squirrel  · 技术社区  · 7 年前

    热释光;DR:如何访问在我的系统中创建作业时传递给作业的参数Par.Do 变换?

    我有两个模板,一个用于Dev,一个用于Prod,它们都可以正常工作,只是每个模板中有一个值需要不同。到目前为止,我一直在“硬编码”这个值,然后我“运行”java程序来构建模板(使用DataflowRunner)。但这是容易出错的,如果我不是真的很小心,我会尝试更新dev模板中的一些代码,无意中仍然从prod模板设置了这个值。不好的。

    如果我使用默认的运行程序并在本地运行管道,它可以正常工作,但是当我切换并构建模板时,值总是 null . 我可以用下面的代码来重现这一点:

    /* 
    imports...
    */
    
    @SuppressWarnings("serial")
    public class StarterPipeline {
      private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
    
      static String orgId;
    
      public interface MyOptions extends PipelineOptions {
    
    
         @Description("Org Id")
         @Default.String("123-984-a")
         String getOrgId();
         void setOrgId( String orgID );
    
      }
    
      public static void main(String[] args) {
    
    
         PipelineOptionsFactory.register(MyOptions.class);
    
    
         final MyOptions options = PipelineOptionsFactory.fromArgs( args ).withValidation().create()
            .as( MyOptions.class );
    
    
         orgId = options.getOrgId();
    
         LOG.info( "orgId: " + orgId );
    
         Pipeline p = Pipeline.create( options );
    
    
         PCollection<String> someDataRows = p.apply("Get data from BQ", Create.of(
    
          "string 1", "string2", "string 3"
    
         ) );
    
    
         someDataRows.apply( "Package into a list", ParDo.of( new DoFn<String, String>() {
    
               @ProcessElement
               public void processElement( ProcessContext c ) {
    
                  LOG.info( "Hello? " );
                  LOG.info( "ORG ID: " + orgId );
               }
    
               }));
    
    
        p.run();
      }
    }
    

     2018-09-20 (16:16:49) Hello?
     2018-09-20 (16:16:49) ORG ID: null
     2018-09-20 (16:16:51) Hello?
     2018-09-20 (16:16:51) ORG ID: null
     2018-09-20 (16:16:53) Hello?
     2018-09-20 (16:16:53) ORG ID: null
     ...
    

    但在当地:

    Sep 20, 2018 4:15:32 PM simplepipeline.StarterPipeline main
    INFO: orgId: jomama47
    Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
    INFO: Hello? 
    Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
    INFO: ORG ID: jomama47
    Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
    INFO: Hello? 
    Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
    INFO: Hello? 
    Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
    INFO: ORG ID: jomama47
    Sep 20, 2018 4:15:35 PM simplepipeline.StarterPipeline$1 processElement
    INFO: ORG ID: jomama47
    

    --project=the-project
    --stagingLocation=gs://staging.the-project.appspot.com/staging/
    --tempLocation=gs://staging.the-project.appspot.com/temp/
    --runner=DataflowRunner
    --region=us-west1
    --templateLocation=gs://staging.the-project.appspot.com/templates/NoobPipelineDev
    --orgId=jomama47
    

    对于本地:

    --project=the-project
    --tempLocation=gs://staging.the-project.appspot.com
    --orgId=jomama47
    

    当我在parameters字段的Dataflow控制台(浏览器)中创建作业时,我尝试将参数传递给该作业 orgId jomama77

    1 回复  |  直到 7 年前
        1
  •  1
  •   Guillem Xercavins    7 年前

    这里有两件事。首先,我建议使用 ValueProvider orgId :

    public interface MyOptions extends PipelineOptions {    
         @Description("Org Id")
         @Default.String("123-984-a")
         ValueProvider<String> getOrgId();
         void setOrgId(ValueProvider<String> orgID);   
    }
    

    然后使用以下选项阅读:

    ValueProvider<String> orgId = options.getOrgId();
    

    docs :

    someDataRows.apply( "Package into a list", ParDo.of( new CustomFn(orgId)));
    

    CustomFn 的构造函数将其作为参数并存储在 orgId.get() :

    static class CustomFn extends DoFn<String, String> {
        // access options from wihtin the ParDo
        ValueProvider<String> orgId;
        public CustomFn(ValueProvider<String> orgId) {
            this.orgId = orgId;
        }
    
        @ProcessElement
        public void processElement( ProcessContext c ) {
          LOG.info( "Hello? " );
          LOG.info( "ORG ID: " + orgId.get() );
        }
    }
    

    现在您可以暂存模板并使用以下方法调用它:

    gcloud dataflow jobs run $JOB_NAME \
        --gcs-location gs://$BUCKET/templates/$TEMPLATE_NAME \
        --parameters orgId=jomama47
    

    这应该如预期的那样起作用:

    enter image description here

    推荐文章