I'm reading data into PySpark from Postgres over a JDBC connection. The table being read is large, about 240 million rows, and I'm trying to read it into 16 partitions. The read is done like this:
query = f"""
(select receiptid, itemindex, barcode, productnumberraw, itemdescription, itemdescriptionraw, itemextendedprice, itemdiscountedextendedprice, itemquantity, barcodemanufacturer, barcodebrand, barcodecategory1, barcodecategory2, barcodecategory3, isfetchpromo, ispartnerbrand, subscribeandsave, soldby, yyyymm, retailerid,
MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), {num_partitions}) AS partition_id from {table}_bck) as subquery
"""
# Optimize JDBC read options
df = spark.read \
    .format("jdbc") \
    .option("url", pg_url) \
    .option("dbtable", query) \
    .option("user", pg_properties["user"]) \
    .option("password", pg_properties["password"]) \
    .option("driver", pg_properties["driver"]) \
    .option("numPartitions", num_partitions) \
    .option("partitionColumn", "partition_id") \
    .option("lowerBound", 0) \
    .option("upperBound", num_partitions - 1) \
    .load()
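For background on what Spark does with these options: it turns partitionColumn/lowerBound/upperBound into one WHERE-clause predicate per partition. A simplified Python reconstruction of that logic (the real code is JDBCRelation.columnPartition in Spark's Scala source and may differ between versions) looks like this:

def jdbc_partition_predicates(column, lower, upper, requested):
    # If the bound range is narrower than the requested partition count,
    # Spark shrinks the partition count to (upper - lower).
    n = requested if (upper - lower) >= requested else (upper - lower)
    stride = upper // n - lower // n
    preds = []
    current = lower
    for i in range(n):
        lo = f'"{column}" >= {current}' if i != 0 else None
        current += stride
        hi = f'"{column}" < {current}' if i != n - 1 else None
        if hi is None:
            preds.append(lo)
        elif lo is None:
            preds.append(f'{hi} or "{column}" is null')
        else:
            preds.append(f'{lo} AND {hi}')
    return preds

# With lowerBound=0, upperBound=15, numPartitions=16 this yields 15
# predicates such as '"partition_id" >= 10 AND "partition_id" < 11',
# the same shape as the queries I see in the Postgres logs below.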
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import LongType

df = df.withColumn(
    "productnumberint",
    regexp_replace(col("productnumberraw"), "[#-]", "").cast(LongType())
).withColumn(
    "barcodeint",
    regexp_replace(col("barcode"), "[#-]", "").cast(LongType())
)
Then I want to write the data back to Postgres:
df.rdd.foreachPartition(write_partition)
where write_partition just iterates over the rows and does batched inserts using psycopg2.
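For reference, write_partition is roughly the sketch below (the target table name, column list, DSN, and batch size are placeholders for illustration; the real function just batch-inserts each partition's rows):

import psycopg2
from psycopg2.extras import execute_values

def write_partition(rows):
    # One connection per Spark partition; pg_dsn is a placeholder DSN
    # pointing at the same Postgres instance as the read side.
    conn = psycopg2.connect(pg_dsn)
    # Column list abbreviated here; the real insert covers all columns.
    insert_sql = "INSERT INTO mytable_out (receiptid, itemindex, barcode) VALUES %s"
    try:
        with conn.cursor() as cur:
            batch = []
            for row in rows:
                batch.append((row["receiptid"], row["itemindex"], row["barcode"]))
                if len(batch) >= 10000:
                    execute_values(cur, insert_sql, batch)
                    batch = []
            if batch:
                execute_values(cur, insert_sql, batch)
        conn.commit()
    finally:
        conn.close()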
My problem is that I'm seeing each partition query hit the database twice:
SELECT "receiptid","itemindex","barcode","productnumberraw","itemdescription","itemdescriptionraw","itemextendedprice","itemdiscountedextendedprice","itemquantity","barcodemanufacturer","barcodebrand","barcodecategory1","barcodecategory2","barcodecategory3","isfetchpromo","ispartnerbrand","subscribeandsave","soldby","yyyymm","retailerid","partition_id" FROM (select receiptid, itemindex, barcode, productnumberraw, itemdescription, itemdescriptionraw, itemextendedprice, itemdiscountedextendedprice, itemquantity, barcodemanufacturer, barcodebrand, barcodecategory1, barcodecategory2, barcodecategory3, isfetchpromo, ispartnerbrand, subscribeandsave, soldby, yyyymm, retailerid,
MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), 16) AS partition_id from mytable) as subquery WHERE "partition_id" >= 10 AND "partition_id" < 11
What is causing the data to be read twice?