代码之家  ›  专栏  ›  技术社区  ›  Idiot

Spark-处理后移动文件

  •  1
  • Idiot  · 技术社区  · 1 年前

    我将json文件读取到数据帧中: df = spark.read.option("multiline", "true").json(f"/mnt/bronze/{something}*")

    然后我进行一些处理,然后将其写入delta表: rows.write.format("delta").mode("overwrite").save(f"/mnt/silver/{something}")

    我想在处理后移动json文件,这样以后就不会再重新处理它们了。但我不知道如何移动它们。

    我尝试在rdd.foreach()中使用dbutils,但这是不可能的。我还在rdd.foreach()中尝试了shutil.move(),但由于我使用blob存储,所以找不到我的文件。

    1 回复  |  直到 1 年前
        1
  •  0
  •   Daniel Perez Efremova    1 年前

    让我提供一些解决方案。

    我想/mnt/b古铜/{something}是您着陆文件的暂存区,并且您希望防止读取以前着陆的文件。

    您在当前笔记本的末尾有一些选项:

    1. 如果不再需要原始文件,请删除这些文件:
    dbutils.fs.rm("/mnt/bronze/{something}*", True)
    

    dbutils.fs.rm 是删除文件的功能, "/mnt/bronze/{something}*" 是要删除的文件的路径和模式,第二个参数为True表示删除是递归的(如果您确定只删除文件,这是可选的)。

    1. 如果您将来可能需要这些文件,只需将它们移动到临时目录中,以便将来可以安全地删除它们
    dbutils.fs.mv("/mnt/bronze/{something}*", "/mnt/tmp/")
    

    PD:如果你选择2),我建议在tmp/dir的路径中添加一些基于时间戳的规则,以确保它保持有序和干净。例如 f"mnt/tmp/{today()}"