代码之家  ›  专栏  ›  技术社区  ›  RedNay8080

卡夫卡至火花流至HDFS

  •  0
  • RedNay8080  · 技术社区  · 10 年前

    我使用createDirectStream来集成SparkStreaming和Kafka。下面是我使用的代码:

    val ssc = new StreamingContext(new SparkConf, Seconds(10))
        val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
        val topics = Set("topic1")
    
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)
    

    现在我想将消息存储到HDFS中。这样做对吗?

    messages.saveAsTextFiles("/tmp/spark/messages")
    
    1 回复  |  直到 10 年前
        1
  •  0
  •   Sumit    10 年前

    saveAsTextFiles("/tmp/spark/messages") -这将把您的数据保存在本地文件系统中,如果提供的文件夹结构(“/tmp/spark/messages”)是本地HDFS的一部分,那么它也将显示在HDFS目录中,因为 saveAsTextFiles 利用相同的MapeReduce API来编写输出。

    上述方法适用于Spark Executors和HDFS位于同一物理机器上的场景,但如果您的HDFS目录或URL不同,并且不在运行执行程序的同一机器上,则这将不起作用。

    如果您需要确保数据在HDFS中持久化,那么作为一种良好的做法,您应该始终提供完整的HDFS URL。像这样的- saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages")

    或者您也可以利用以下方法之一:-

    1. DStream.saveAsNewAPIHadoopFiles(<HDFS URL with Location>)
    2. DStream.saveAsHadoopFiles(<HDFS URL with Location>)