Hello, I have created an Apache Beam pipeline, tested it, and run it from inside Eclipse, both locally and with the Dataflow runner. I can see in the Eclipse console that the pipeline is running, and I can also see its details, i.e. the logs, on the console.
Now, how do I deploy this pipeline to GCP so that it keeps running regardless of the state of my machine? For example, when I run it with mvn compile exec:java, the console shows that it is running, but I cannot find the job in the Dataflow UI.
Also, what happens if I kill the local process? Does the job on the GCP infrastructure stop as well? How can I tell whether the job has been launched on GCP infrastructure independently of my machine's state?
The output of mvn compile exec:java with the arguments is as follows:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/ThakurG/.m2/repository/org/slf4j/slf4j-jdk14/1.7.14/slf4j-jdk14-1.7.14.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/ThakurG/.m2/repository/org/slf4j/slf4j-nop/1.7.25/slf4j-nop-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.JDK14LoggerFactory]
Jan 08, 2018 5:33:22 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ main
INFO: starting the process...
Jan 08, 2018 5:33:25 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ createStream
INFO: pipeline created::Pipeline#73387971
Jan 08, 2018 5:33:27 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ main
INFO: pie crated::Pipeline#73387971
Jan 08, 2018 5:54:57 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ$1 apply
INFO: Message received::1884408,16/09/2017,A,2007156,CLARK RUBBER FRANCHISING PTY LTD,A ,5075,6,Y,296,40467910,-34.868095,138.683535,66 SILKES RD,,,PARADISE,5075,0,7.4,5.6,18/09/2017 2:09,0.22
Jan 08, 2018 5:54:57 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ$1 apply
INFO: Payload from msg::1884408,16/09/2017,A,2007156,CLARK RUBBER FRANCHISING PTY LTD,A ,5075,6,Y,296,40467910,-34.868095,138.683535,66 SILKES RD,,,PARADISE,5075,0,7.4,5.6,18/09/2017 2:09,0.22
Jan 08, 2018 5:54:57 PM com.trial.apps.gcp.df.ReceiveAndPersistToBQ$1 apply
This is the maven command I am using from the cmd prompt:
`mvn compile exec:java -Dexec.mainClass=com.trial.apps.gcp.df.ReceiveAndPersistToBQ -Dexec.args="--project=analyticspoc-XXX --stagingLocation=gs://analytics_poc_staging --runner=DataflowRunner --streaming=true"`
This is the code I use to create the pipeline and set its options:
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dfOptions = options.as(DataflowPipelineOptions.class);
dfOptions.setRunner(DataflowRunner.class);
dfOptions.setJobName("gcpgteclipse");
dfOptions.setStreaming(true);
// Then create the pipeline from these options.
Pipeline pipeL = Pipeline.create(dfOptions);
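For reference, my understanding is that the Beam quickstart examples build the options from the command-line arguments rather than from an empty PipelineOptionsFactory.create(), so that flags such as --project, --stagingLocation, --runner and --streaming actually reach the pipeline; is that the piece I am missing? A minimal sketch of that pattern, using only the standard Beam/Dataflow APIs and a hypothetical class name, would be:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StarterPipeline { // hypothetical class name, for illustration only
    public static void main(String[] args) {
        // Parse --project, --stagingLocation, --runner, --streaming from the
        // arguments supplied via -Dexec.args and narrow them to Dataflow options.
        DataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(DataflowPipelineOptions.class);
        options.setJobName("gcpgteclipse");

        Pipeline pipeline = Pipeline.create(options);
        // ... apply the transforms here, then submit the job.
        pipeline.run();
    }
}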