代码之家  ›  专栏  ›  技术社区  ›  Michael Lihs Paul Kenjora

通过Avro使用Spring Boot应用程序中的Spark SQL

  •  2
  • Michael Lihs Paul Kenjora  · 技术社区  · 8 年前

    我正在尝试编写一个Spring Boot应用程序,对Spark主服务器运行Spark SQL查询。下面是应该运行查询的代码:

    public class SparkJob {
    
        public void run() {
            SparkConf conf = new SparkConf()
                .setMaster("spark://10.20.30.50:7077")
                .setAppName("show_avro_data");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);
    
            DataFrame df = sqlContext.read()
                .format("com.databricks.spark.avro")
                .load("hdfs://localhost:8020/test.avro");
    
            df.show();
        }
    
    }
    

    这个 run() 方法从Spring MVC控制器中调用。它应该“在线”执行Spark查询,即我不想将应用程序上传到Spark,例如 spark-submit .

    当我现在启动应用程序并运行该方法时,我得到以下stacktrace:

    2016-11-08 17:38:18.805 ERROR 43109 --- [nio-9003-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]: 
      Servlet.service() for servlet [dispatcherServlet] in context with path [] 
      threw exception [Request processing failed; nested exception is 
      org.apache.spark.SparkException: Job aborted due to stage failure: 
      Task 0 in stage 0.0 failed 4 times, most recent failure: 
      Lost task 0.3 in stage 0.0 (TID 3, 10.20.30.50): 
      java.lang.ClassNotFoundException:
      com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$3
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    

    因为我使用的是Spark版本 1.5.2 和Scala版本 2.10.4 我使用了以下Maven依赖项 according to the spark-avro documentation :

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>2.0.1</version>
    </dependency>
    

    我想,我必须在Spark master上提供Avro库-但我不知道如何做到这一点,我在互联网上找到的所有示例都没有提供任何解决方案。

    编辑

    当我使用JSON而不是Avro

    DataFrame df = sqlContext.read().json("hdfs://...");
    
    1 回复  |  直到 8 年前
        1
  •  1
  •   VladoDemcak    8 年前

    由于您正在从spring应用程序调用spark(您没有使用 spark-submit )您正在尝试使用MVC作为“驱动程序”,并在独立的spark上运行spark作业。

    所以你想用弹簧 MVC client 模式正当

    MVC spring project (在火花驱动器中 客户 因此您需要将此库添加到spark standalone(工作人员)。

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.10</artifactId>
        <version>2.0.1</version>
    </dependency>
    

    你可以 addJar spark-avro_2.10.jar 到HDFS中,并添加到 SparkContext 比如:

     JavaSparkContext sc = new JavaSparkContext(conf);
     sc.addJar("pathToAvroOnHDFS/spark-avro_2.10.jar")
    

    现在,spark意识到 .

    另一个选择是添加以下内容: jar 进入spark类路径。您可以检查以下属性(应在中支持这些属性 spark 1.5.2 此外):

    spark.driver.extraClassPath /avroPath.jar
    spark.executor.extraClassPath /avroPath.jar