
java.io.FileNotFoundException: Not found cos://mybucket.myservicename/checkpoint/offsets

  • Chris Snow  ·  7 years ago

    I am trying to use Spark Structured Streaming 2.3 to read data from Kafka (IBM Message Hub) and save it to IBM Cloud Object Storage on an IBM Analytics Engine 1.1 cluster. First, SSH into the cluster:

    $ ssh clsadmin@myclusterid.bi.services.eu-gb.bluemix.net
    

    Create the jaas.conf:

    $ cat << EOF > jaas.conf
    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        serviceName="kafka"
        username="<<MY_MESSAGEHUB_USERNAME>>"
        password="<<MY_MESSAGEHUB_PASSWORD>>";
    };
    EOF
    

    This creates the jaas.conf file in /home/wce/clsadmin.

    Create a utility script to start the spark shell (for now we only have a single executor):

    $ cat << EOF > start_spark.sh
    spark-shell --master local[1] \
           --files jaas.conf \
           --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
           --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
           --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
           --num-executors 1 --executor-cores 1 
    EOF
    $ chmod +x start_spark.sh
    

    Start a spark session using the utility script:

    $ ./start_spark.sh
    

    Now, in the spark shell, read the Kafka (Message Hub) stream. Make sure you change kafka.bootstrap.servers to match your service credentials:

    val df = spark.readStream.
                    format("kafka").
                    option("kafka.bootstrap.servers", "kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093").
                    option("subscribe", "transactions_load").
                    option("kafka.security.protocol", "SASL_SSL").
                    option("kafka.sasl.mechanism", "PLAIN").
                    option("kafka.ssl.protocol", "TLSv1.2").
                    option("kafka.ssl.enabled.protocols", "TLSv1.2").
                    load()
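
    As a quick sanity check you can print the schema of the streaming dataframe; the columns shown in the comments below are the standard ones exposed by the Kafka source:

    df.printSchema()
    // root
    //  |-- key: binary (nullable = true)
    //  |-- value: binary (nullable = true)
    //  |-- topic: string (nullable = true)
    //  |-- partition: integer (nullable = true)
    //  |-- offset: long (nullable = true)
    //  |-- timestamp: timestamp (nullable = true)
    //  |-- timestampType: integer (nullable = true)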
    

    We can test that our connection is working:

    df.writeStream.format("console").start()
    

    After a while, you should see some data printed to the console, e.g.:

    -------------------------------------------                                     
    Batch: 1
    -------------------------------------------
    +--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
    |                 key|               value|            topic|partition|offset|           timestamp|timestampType|
    +--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
    |[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84874|2018-08-22 15:42:...|            0|
    |[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84875|2018-08-22 15:42:...|            0|
    |[35 34 30 38 33 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84876|2018-08-22 15:42:...|            0|
    ...
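
    The key and value columns in that output are raw bytes. If you want human-readable records, you can cast them to strings before writing to the console (a minimal sketch; readable is just an arbitrary name):

    // cast the binary Kafka key/value columns to strings for readable console output
    val readable = df.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        "topic", "partition", "offset")

    readable.writeStream.format("console").start()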
    

    Set up the spark session so that it can access the COS instance:

    val accessKey = "MY_COS_ACCESS_KEY"
    val secretKey = "MY_COS_SECRET_KEY"
    val bucketName = "streamingdata"
    
    // arbitrary name for referring to the cos settings from this code
    val serviceName = "myservicename"
    
    sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.access.key", accessKey)
    sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.secret.key", secretKey)
    sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")
    

    import spark.implicits._
    
    val data = sc.parallelize(Array(1,2,3,4,5))
    data.toDF.write.format("csv").save(s"cos://${bucketName}.${serviceName}/data.txt")
    
    spark.read.csv(s"cos://${bucketName}.${serviceName}/data.txt").collect()
    

    If reading from and writing to COS were successful, the test above should output something like this:

    res7: Array[org.apache.spark.sql.Row] = Array([1], [2], [3], [4], [5])
    

    Now try writing the streaming dataframe to COS:

    df.
      writeStream.
      format("parquet").
      option("checkpointLocation", s"cos://${bucketName}.${serviceName}/checkpoint").
      option("path",               s"cos://${bucketName}.${serviceName}/data").
      start()
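
    Assigning the result of start() gives you a StreamingQuery handle, which is handy for checking progress or surfacing a streaming failure synchronously (a sketch; the val name query is mine):

    // keep a handle on the streaming query instead of discarding it
    val query = df.
      writeStream.
      format("parquet").
      option("checkpointLocation", s"cos://${bucketName}.${serviceName}/checkpoint").
      option("path",               s"cos://${bucketName}.${serviceName}/data").
      start()

    query.status                 // is the stream active / waiting for data?
    query.lastProgress           // metrics for the most recent micro-batch
    // query.awaitTermination()  // blocks and rethrows any streaming error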
    

    For me, this fails with:

    scala> 18/08/22 15:43:06 WARN COSAPIClient: file status checkpoint/offsets returned 404
    18/08/22 15:43:06 ERROR MicroBatchExecution: Query [id = 78c8c4af-f21d-457d-b5a7-56559e180634, runId = 50e8759e-0293-4fab-9b73-dd4811423b37] terminated with error
    java.io.FileNotFoundException: Not found cos://streamingdata.myservicename/checkpoint/offsets
        at com.ibm.stocator.fs.cos.COSAPIClient.getFileStatus(COSAPIClient.java:628)
        at com.ibm.stocator.fs.ObjectStoreFileSystem.getFileStatus(ObjectStoreFileSystem.java:486)
        at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:360)
        at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:336)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileSystemManager.list(HDFSMetadataLog.scala:412)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:180)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
    

    1 Answer  |  7 years ago
  •   cactus    7 years ago

    Switching to the S3AFileSystem seems to have fixed the issue:

    sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
    sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")
    
    val s3Url = s"s3a://${bucketName}/"
    
    ...
    
    df.
      writeStream.
      format("parquet").
      option("checkpointLocation", s"${s3Url}/checkpoint").
      option("path",               s"${s3Url}/data").
      start()
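
    Depending on the bucket name and endpoint, you may also need to enable path-style access so the bucket is addressed in the URL path rather than as a virtual host. This is an assumption on my part and was not required for the fix above:

    // assumption: some COS endpoints need path-style (bucket-in-path) addressing
    sc.hadoopConfiguration.set("fs.s3a.path.style.access", "true")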
    

    It looks like this problem is related to the stocator driver.


    Update 23 August 2018: this issue has been fixed in Stocator v1.0.24.
