让我们从开始
your other question
如果我在磁盘上的数据保证按密钥进行预排序,该密钥将用于组聚合或缩减,那么Spark有没有办法利用这一点?
这要看情况。如果您应用的操作可以从映射端聚合中受益,那么您可以通过预排序数据而无需对代码进行任何进一步干预,从而获得很多好处。共享同一密钥的数据应该位于相同的分区上,并且可以在洗牌之前在本地聚合。
不幸的是,在这种特殊情况下,它不会有太大帮助。即使启用地图端聚合(
groupBy(Key)
不使用is,所以您需要自定义实现)或聚合特征向量(您可以在我的回答中找到一些示例
How to define a custom aggregation function to sum a column of Vectors?
)没有太多收获。您可以在这里和那里保存一些工作,但仍然需要在节点之间传输所有索引。
如果你想获得更多,你就必须做更多的工作。我可以看到两种利用现有订单的基本方法:
-
使用自定义Hadoop输入格式只生成完整的记录(标签、id、所有特性),而不是逐行读取数据。如果您的数据每个id有固定的行数,您甚至可以尝试使用
NLineInputFormat
并应用
mapPartitions
以在之后聚合记录。
这绝对是一个更详细的解决方案,但不需要在Spark中进行额外的洗牌。
-
照常读取数据,但使用自定义分区器
groupBy
。据我所知,使用
rangePartitioner
应该可以正常工作,但要确保您可以尝试以下步骤:
-
使用
mapPartitionsWithIndex
查找每个分区的最小/最大id。
-
创建保持最小值的分区器<=ids<最大电流(
第i个
)分区并将最大值推送到分区
i+1
-
使用此分区器
groupBy(键)
这可能是一个更友好的解决方案,但至少需要一些洗牌。如果要移动的预期记录数很低(每个分区<<#个记录),您甚至可以使用
映射分区
和
broadcast
*尽管在实践中进行分区可能更有用,也更便宜。
*您可以使用类似的方法:
https://stackoverflow.com/a/33072089/1560062