Extracting JSON data in Spark/Scala

Asked by Zmnako Awrahman beloncfy · 7 years ago

    root
     |-- labels: struct (nullable = true)
     |    |-- compute.googleapis.com/resource_name: string (nullable = true)
     |    |-- container.googleapis.com/namespace_name: string (nullable = true)
     |    |-- container.googleapis.com/pod_name: string (nullable = true)
     |    |-- container.googleapis.com/stream: string (nullable = true)
    

    I want to extract the four .....googleapis.com/... fields into four separate columns.

    I tried this:

    import org.apache.spark.sql.functions._
    // Fails with the AnalysisException shown below: each "." in the path is read as a nesting level
    val withCols = df.withColumn("resource_name", df("labels.compute.googleapis.com/resource_name"))
           .withColumn("namespace_name", df("labels.container.googleapis.com/namespace_name"))
           .withColumn("pod_name", df("labels.container.googleapis.com/pod_name"))
           .withColumn("stream", df("labels.container.googleapis.com/stream"))
    

    Turning labels into an array fixed the first error, which said that the child is not an array or a map:

    val df2 = df.withColumn("labels", explode(array(col("labels"))))
      .select(col("labels.compute.googleapis.com/resource_name").as("resource_name"),
        col("labels.container.googleapis.com/namespace_name").as("namespace_name"),
        col("labels.container.googleapis.com/pod_name").as("pod_name"), col("labels.container.googleapis.com/stream").as("stream"))
    

    but I still get this error:

    org.apache.spark.sql.AnalysisException: No such struct field compute in compute.googleapis.com/resource_name .....
    

    I understand that Spark treats compute.googleapis.com/resource_name as a multi-level path (splitting it at every dot) rather than as the name of a single field.

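    I also tried the answers to "How to get Apache spark to ignore dots in a query?", but they did not solve my problem either. My path is labels.compute.googleapis.com/resource_name, and adding backticks around compute.googleapis.com/resource_name still produces the same error.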

Answer by alexeipab · 7 years ago

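    You can use backticks ` to delimit names that contain special characters such as '.'. The backticks must come after the labels. prefix, around the child field name only, not around the whole path: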

    val extracted = df.withColumn("resource_name", df("labels.`compute.googleapis.com/resource_name`"))
        .withColumn("namespace_name", df("labels.`container.googleapis.com/namespace_name`"))
        .withColumn("pod_name", df("labels.`container.googleapis.com/pod_name`"))
        .withColumn("stream", df("labels.`container.googleapis.com/stream`"))
    
    extracted.show(10, false)
    

    Output:

    +--------------------+-------------+--------------+--------+------+
    |labels              |resource_name|namespace_name|pod_name|stream|
    +--------------------+-------------+--------------+--------+------+
    |[RN_1,NM_1,PM_1,S_1]|RN_1         |NM_1          |PM_1    |S_1   |
    +--------------------+-------------+--------------+--------+------+
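    To see why the backticks have to sit after labels. (and why quoting the whole path, as tried in the question, still fails), the sketch below tries the three spellings on a one-row DataFrame and records which ones Spark can resolve. It is an illustration rather than part of the original answer; the object name QuotingVariants and the local SparkSession are assumptions made so the code runs on its own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import scala.util.Try

object QuotingVariants {
  // Returns (unquotedFails, wholePathFails, childOnlyValue) for a one-row example.
  def check(): (Boolean, Boolean, String) = {
    // Local session for the sketch; in spark-shell you would reuse the existing `spark`.
    val spark = SparkSession.builder().master("local[1]").appName("quoting").getOrCreate()
    import spark.implicits._
    try {
      val json = """{"labels": {"compute.googleapis.com/resource_name": "RN_1"}}"""
      val df = spark.read.json(Seq(json).toDS)

      // 1. No backticks: each dot is a level separator, so Spark looks for a
      //    child named "compute" inside labels and throws AnalysisException.
      val unquoted = Try(df.select(col("labels.compute.googleapis.com/resource_name")))
      // 2. Backticks around the whole path: Spark looks for a top-level column
      //    literally named "labels.compute.googleapis.com/resource_name" and fails.
      val wholePath = Try(df.select(col("`labels.compute.googleapis.com/resource_name`")))
      // 3. Backticks around the child only: "labels" is the struct and the
      //    quoted part is read as a single field name, so this resolves.
      val childOnly = df.select(col("labels.`compute.googleapis.com/resource_name`"))
        .as[String].head()

      (unquoted.isFailure, wholePath.isFailure, childOnly)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(check())
}
```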
    

    Update 1: a complete working example.

    import org.apache.spark.sql.functions._
    import spark.implicits._  // needed for .toDS; `spark` is the SparkSession (predefined in spark-shell)

    val j_1 =
      """
        |{ "labels" : {
        |   "compute.googleapis.com/resource_name" : "RN_1",
        |   "container.googleapis.com/namespace_name" : "NM_1",
        |   "container.googleapis.com/pod_name" : "PM_1",
        |   "container.googleapis.com/stream" : "S_1"
        |             }
        |}
      """.stripMargin

    val df = spark.read.json(Seq(j_1).toDS)
    df.printSchema()

    // Backticks wrap only the child field name, after the "labels." prefix
    val extracted = df.withColumn("resource_name", df("labels.`compute.googleapis.com/resource_name`"))
      .withColumn("namespace_name", df("labels.`container.googleapis.com/namespace_name`"))
      .withColumn("pod_name", df("labels.`container.googleapis.com/pod_name`"))
      .withColumn("stream", df("labels.`container.googleapis.com/stream`"))

    extracted.show(10, false)
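    If the child names are not known in advance, a generic variant (not from the original answer) can read them from the schema and use Column.getField, which takes the field name as a literal string and does not split it at dots. The helper name extractLabels and the choice to name each output column after the text following the last '/' are assumptions, picked to match the column names wanted in the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object ExtractLabels {
  // Hypothetical helper: copies every child of the struct column `structCol`
  // into a top-level column named after the text following the last '/'.
  def extractLabels(df: DataFrame, structCol: String): DataFrame = {
    val fields = df.schema(structCol).dataType.asInstanceOf[StructType].fieldNames.toSeq
    // getField takes the child name literally, so its dots are not parsed as levels.
    val children = fields.map(f => col(structCol).getField(f).as(f.split("/").last))
    df.select((col(structCol) +: children): _*)
  }

  // Builds the one-row example from the question and returns (column names, extracted values).
  def demo(): (Seq[String], Seq[String]) = {
    val spark = SparkSession.builder().master("local[1]").appName("extract").getOrCreate()
    import spark.implicits._
    try {
      val json =
        """{"labels": {"compute.googleapis.com/resource_name": "RN_1",
          |"container.googleapis.com/namespace_name": "NM_1",
          |"container.googleapis.com/pod_name": "PM_1",
          |"container.googleapis.com/stream": "S_1"}}""".stripMargin
      val out = extractLabels(spark.read.json(Seq(json).toDS), "labels")
      val row = out.drop("labels").head()
      (out.columns.toSeq, row.toSeq.map(_.toString))
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

    Because getField bypasses the name parser entirely, a child name that itself contains a backtick would need no escaping either.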
    
Answer by Zmnako Awrahman beloncfy · 7 years ago

    Rename the child fields of labels by casting the struct to a schema with plain names, then use withColumn as usual:

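    import spark.implicits._  // for the $"..." column syntax
    // A struct-to-struct cast with the same number of fields renames the children by position
    val schema = "struct<resource_name:string, namespace_name:string, pod_name:string, stream:string>"
    val df1 = df.withColumn("labels", $"labels".cast(schema))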
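    Once labels has been cast to a struct whose children have plain names, ordinary dotted paths and labels.* work without backticks. The self-contained sketch below completes that route; it assumes the four children are in the same order as in the schema from the question, since a struct-to-struct cast matches fields by position, not by name. The object name CastRename and the local SparkSession are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CastRename {
  // Returns (column names, values) after renaming the children and flattening labels.
  def demo(): (Seq[String], Seq[String]) = {
    val spark = SparkSession.builder().master("local[1]").appName("cast").getOrCreate()
    import spark.implicits._
    try {
      val json =
        """{"labels": {"compute.googleapis.com/resource_name": "RN_1",
          |"container.googleapis.com/namespace_name": "NM_1",
          |"container.googleapis.com/pod_name": "PM_1",
          |"container.googleapis.com/stream": "S_1"}}""".stripMargin
      val df = spark.read.json(Seq(json).toDS)

      val schema = "struct<resource_name:string, namespace_name:string, pod_name:string, stream:string>"
      // Keeps the values and takes the child names from the target type, field by field.
      val renamed = df.withColumn("labels", col("labels").cast(schema))
      // The new child names contain no dots, so "labels.*" needs no backticks.
      val flat = renamed.select("labels.*")
      (flat.columns.toSeq, flat.head().toSeq.map(_.toString))
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```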