
Expanding an array into columns in Spark with Structured Streaming

  •  ozw1z5rd  ·  asked 6 years ago

    Here is my problem:

     from pyspark.sql.functions import split
     sDF2 = sDF.selectExpr("CAST(value AS STRING)").select(split("value", ",").alias("csv"))
    

    With this I get a new dataframe in which value is cast to a string
    (the raw CSV row) and then split into an array of fields.

    Example: the CSV row is "abcd,123,frgh,1321".

    The schema of sDF, which holds the data read from Kafka, is
    key, value, topic, timestamp, etc.; value arrives as a raw byte
    sequence (binary), hence the cast.

    sDF2's schema has a single column: the array produced by split
    (aliased csv above).
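
    For context, a minimal sketch of the kind of Kafka source read that
    would produce sDF; the broker address and topic name below are
    placeholders, not taken from the question:

     from pyspark.sql import SparkSession

     spark = SparkSession.builder.appName("csv-split").getOrCreate()

     # The Structured Streaming Kafka source yields key, value, topic,
     # partition, offset, timestamp, timestampType; key and value are binary.
     sDF = (spark.readStream
                 .format("kafka")
                 .option("kafka.bootstrap.servers", "host:9092")  # placeholder
                 .option("subscribe", "events")                   # placeholder topic
                 .load())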
    

    I would like a new dataframe like this:

    sDF3.col1 = abcd
    sDF3.col2 = 123
    sDF3.col3 = frgh ...etc
    

     sDF3 = sDF2.select(
         # cast first, then alias: aliasing before cast loses the column name
         sDF2.csv[0].cast("string").alias("EventId"),
         sDF2.csv[1].cast("string").alias("DOEntitlementId"),
         sDF2.csv[3].cast("string").alias("AmazonSubscriptionId"),
         sDF2.csv[4].cast("string").alias("AmazonPlanId"),
         # ... etc ...
     )
    

    but it looks ugly.
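
    A sketch of one way to cut the repetition, building the select list
    with a comprehension; the (position, name) pairs below only mirror
    the columns shown in the question and are otherwise illustrative:

     from pyspark.sql.functions import col

     # (array index, output column name) pairs; the question skips
     # index 2, so positions are listed explicitly rather than enumerated.
     fields = [(0, "EventId"), (1, "DOEntitlementId"),
               (3, "AmazonSubscriptionId"), (4, "AmazonPlanId")]

     sDF3 = sDF2.select(*[col("csv")[i].cast("string").alias(name)
                          for i, name in fields])

    Adding a column then only means adding a pair to the list.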

    1 Answer  |  6 years ago
  •   Milan Cvejic  ·  6 years ago

    I have not tried it, but this approach should work.

     sDF2 = (sDF.selectExpr("CAST(value AS STRING)")
                # select() takes column names, so use selectExpr()
                # to evaluate the split expressions as SQL
                .selectExpr("split(value, ',')[0] AS DOEntitlementId",
                            "split(value, ',')[1] AS AmazonSubscriptionId",
                            "split(value, ',')[2] AS AmazonPlanId"))