在从Kafka主题读取流之后,在将其写入任何登录或表之前,我需要在某些列上应用一个函数。
这是在azure数据库中完成的。
CREATE FUNCTION encrypt AS 'com.encrypt.EncryptJava' using JAR 'hdfs:/.../jars/encryption_1.0.0.jar';
select encrypt(123,'key');
var streamingSelectDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootStrapServers)
.option("subscribe", topicName)
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)").withColumn("jsonData",from_json($"value",schema)).select($"jsonData.*")
上面的代码创建了一个函数,从kafka流中读取json数据,并将其分解为多列。
接下来,需要在少数列上应用上述函数,并在将其保存到平台或表之前对其进行转换。
我在尝试时遇到了各种不同的错误。
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
.option("mergeschema",true)
.option("checkpointLocation", checkPointPath)
.format("delta")
.trigger(Trigger.ProcessingTime("5 seconds"))
.table("raw_data")
command-3006790186109139:29: error: not found: value encrypt
streamingSelectDF.withColumn("encrypted_col",encrypt($"acntnum","b1")).writeStream.outputMode("append")
有人能帮我实现同样的目标吗。