代码之家  ›  专栏  ›  技术社区  ›  hotmeatballsoup

从S3事件触发AWS EMR Spark作业

  •  0
  • hotmeatballsoup  · 技术社区  · 5 年前

    我正在考虑使用AWS EMR Spark对存储在S3上的非常大的拼花文件运行Spark应用程序。这里的总体流程是,一个Java进程将这些大文件上传到S3,我想自动触发在这些文件上运行Spark作业(注入上传文件的S3密钥名)。

    理想情况下,会有某种基于S3的EMR触发器可供连接;也就是说,我将EMR/Spark配置为“监听”S3存储桶,并在对该存储桶进行升级时启动Spark作业。

    如果不存在这样的触发器,我可能会混在一起,比如从S3事件中启动Lambda,并让Lambda以某种方式触发EMR Spark任务。

    然而我的 理解力 ( 如果我错了,请纠正我)启动火花工作的唯一方法是:

    1. 将作业打包为可执行的JAR文件;和
    2. 通过 spark-submit shell脚本

    因此,如果我必须做基于Lambda的乱七八糟的工作,我不确定触发EMR/Spark工作的最佳方式是什么,因为Lambda本身并不携带 火花提交 在运行时。即使我配置了自己的Lambda运行时(我 相信

    有人曾用S3触发器或 任何 你以前有过吗?

    0 回复  |  直到 5 年前
        1
  •  2
  •   mon    5 年前

    EMR Spark作业可以作为一个步骤执行,如中所示 Adding a Spark Step .步骤不仅仅是在引导后的EMR群集创建时。

    aws emr add-steps --cluster-id j-2AXXXXXXGAPLF --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10]
    

    因为它是AWS CLI,所以可以从Lambda调用它,在Lambda中还可以将jar文件上载到HDFS或S3,然后使用S3://或HDFS://指向它。

    该文档还有一个Java示例。

    AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
    AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
    
    StepFactory stepFactory = new StepFactory();
    AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);
    AddJobFlowStepsRequest req = new AddJobFlowStepsRequest();
    req.withJobFlowId("j-1K48XXXXXXHCB");
    
    List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
    
    HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
                .withJar("command-runner.jar")
                .withArgs("spark-submit","--executor-memory","1g","--class","org.apache.spark.examples.SparkPi","/usr/lib/spark/examples/jars/spark-examples.jar","10");            
    
    StepConfig sparkStep = new StepConfig()
                .withName("Spark Step")
                .withActionOnFailure("CONTINUE")
                .withHadoopJarStep(sparkStepConf);
    
    stepConfigs.add(sparkStep);
    req.withSteps(stepConfigs);
    AddJobFlowStepsResult result = emr.addJobFlowSteps(req);