您可以使用
map
将文件名添加到每个
List[ByteString]
:
val fileName = "10002070.csv"
val source =
FileIO.fromPath(Path.get(fileName))
.via(CsvParsing.lineScanner())
.map(List(ByteString(fileName)) ++ _)
例如:
Source.single(ByteString("""header1,header2,header3
|1,2,3
|4,5,6""".stripMargin))
.via(CsvParsing.lineScanner())
.map(List(ByteString("myfile.csv")) ++ _)
.runForeach(row => println(row.map(_.utf8String)))
// The above code prints the following:
// List(myfile.csv, header1, header2, header3)
// List(myfile.csv, 1, 2, 3)
// List(myfile.csv, 4, 5, 6)
同样的方法也适用于更一般的情况,即您事先不知道文件名。如果您想读取目录中的所有文件(假设所有这些文件都是csv文件),将文件连接到单个流中,并在每个流元素中保留文件名,那么您可以使用Alpakka的
Directory
按以下方式使用:
val source =
Directory.ls(Paths.get("/my/dir")) // Source[Path, NotUsed]
.flatMapConcat { path =>
FileIO.fromPath(path)
.via(CsvParsing.lineScanner())
.map(List(ByteString(path.getFileName.toString)) ++ _)
}