代码之家  ›  专栏  ›  技术社区  ›  Evgenii

使用spark流媒体创建卡夫卡消费者

  •  0
  • Evgenii  · 技术社区  · 7 年前

    我已经开始卡夫卡,创造了一个主题和制片人。现在我想读那个制作人发来的消息。我的代码

    def main(args: Array[String]){
        val sparkConf = new SparkConf()
        val spark = new SparkContext(sparkConf)
    
        val streamingContext = new StreamingContext(spark, Seconds(5))
        val kafkaStream = KafkaUtils.createStream(streamingContext
          , "localhost:2181"
          , "test-group"
          , Map("test" -> 1))
        kafkaStream.print
        streamingContext.start
        streamingContext.awaitTermination
      }
    

    我使用的依赖项

      <properties>
        <spark.version>1.6.2</spark.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
      </dependencies>
    

    Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
        at org.apache.spark.util.Utils$.getSystemProperties(Utils.scala:1582)
        at org.apache.spark.SparkConf.<init>(SparkConf.scala:59)
        at org.apache.spark.SparkConf.<init>(SparkConf.scala:53)
        at com.mypackage.KafkaConsumer$.main(KafkaConsumer.scala:10)
        at com.mypackage.KafkaConsumer.main(KafkaConsumer.scala)
    

    这里的其他问题指向依赖关系之间的冲突。

    我使用scala 2.10.5和spark 1.6.2。我在其他项目中试用过,效果很好。

    本例中的第10行是 val sparkConf = new SparkConf()

    我试着在没有打包的情况下按想法运行应用程序。 这个问题的原因是什么?

    1 回复  |  直到 7 年前
        1
  •  0
  •   gasparms    7 年前

    这是Scala版本的错误。您在代码和依赖项中使用了不同版本的scala。

    您说您使用的是scala 2.10,但您导入了spark-XX_2.11依赖项。统一scala版本。