代码之家  ›  专栏  ›  技术社区  ›  mehere

从Kafka流式传输后对列值应用函数

  •  0
  • mehere  · 技术社区  · 5 年前

    在从Kafka主题读取流之后,在将其写入任何登录或表之前,我需要在某些列上应用一个函数。

    这是在azure数据库中完成的。

    CREATE FUNCTION encrypt AS 'com.encrypt.EncryptJava' using JAR 'hdfs:/.../jars/encryption_1.0.0.jar';
    
    select encrypt(123,'key');
    
    var streamingSelectDF = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootStrapServers)
        .option("subscribe", topicName)     
        .option("startingOffsets", "earliest")  
        .load()
     .selectExpr("CAST(value AS STRING)").withColumn("jsonData",from_json($"value",schema)).select($"jsonData.*")
    

    上面的代码创建了一个函数,从kafka流中读取json数据,并将其分解为多列。

    接下来,需要在少数列上应用上述函数,并在将其保存到平台或表之前对其进行转换。

    我在尝试时遇到了各种不同的错误。

    streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
    .option("mergeschema",true)
     .option("checkpointLocation", checkPointPath)
    .format("delta")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .table("raw_data")
    
    command-3006790186109139:29: error: not found: value encrypt
    streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
    
       
    

    有人能帮我实现同样的目标吗。

    0 回复  |  直到 5 年前
        1
  •  0
  •   mehere    5 年前

    已解决

    .withColumn("encrypted_acctnum",expr("encrypt(acctnum, 'b1')"))
    
    推荐文章