
PySpark JDBC partitioned read

  • Kevin Smeeks · asked 4 months ago

    I'm reading data from Postgres into PySpark over a JDBC connection. The table being read is large, about 240 million rows, and I'm trying to read it into 16 partitions. The read is done like this:

    query = f"""
    (select receiptid, itemindex, barcode, productnumberraw, itemdescription, itemdescriptionraw, itemextendedprice, itemdiscountedextendedprice, itemquantity, barcodemanufacturer, barcodebrand, barcodecategory1, barcodecategory2, barcodecategory3, isfetchpromo, ispartnerbrand, subscribeandsave, soldby, yyyymm, retailerid,
     MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), {num_partitions}) AS partition_id from {table}_bck) as subquery
    """
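As a side note, the hash bucketing in that query can be reproduced client-side to sanity-check how evenly receiptids spread across the 16 buckets. A minimal Python sketch, assuming the same md5/bit(32) semantics as the Postgres expression above:

```python
import hashlib

def pg_partition_id(receiptid: str, num_partitions: int = 16) -> int:
    # Mirrors MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), n):
    # Postgres takes the first 32 bits of the md5 hex string and
    # reinterprets them as a signed 32-bit integer.
    first_four = hashlib.md5(receiptid.encode()).digest()[:4]
    signed = int.from_bytes(first_four, byteorder="big", signed=True)
    return abs(signed) % num_partitions
```

Every id lands in [0, num_partitions), so the derived partitionColumn is dense and Spark's range predicates stay selective.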
    # Optimize JDBC read options
    df = spark.read \
       .format("jdbc") \
       .option("url", pg_url) \
       .option("dbtable", query) \
       .option("user", pg_properties["user"]) \
       .option("password", pg_properties["password"]) \
       .option("driver", pg_properties["driver"]) \
       .option("numPartitions", num_partitions) \
       .option("partitionColumn", "partition_id") \
       .option("lowerBound", 0) \
       .option("upperBound", num_partitions - 1) \
       .load()
       
    from pyspark.sql.functions import col, regexp_replace
    from pyspark.sql.types import LongType

    df = df.withColumn(
           "productnumberint",
           regexp_replace(col("productnumberraw"), "[#-]", "").cast(LongType())
       ).withColumn(
           "barcodeint",
           regexp_replace(col("barcode"), "[#-]", "").cast(LongType())
       )
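For context, Spark turns partitionColumn/lowerBound/upperBound/numPartitions into one WHERE range per task, which is exactly the predicate visible in the server log below. A simplified sketch of that splitting (not Spark's exact JDBCRelation code, which also attaches an IS NULL clause to the first slice and can merge slices when the stride rounds down to zero):

```python
def jdbc_partition_predicates(lower, upper, num_partitions, column="partition_id"):
    # Simplified sketch of Spark's JDBC range partitioning: each task
    # reads one stride-wide slice; the first and last slices are left
    # open-ended so no rows outside [lower, upper] are lost.
    stride = max((upper - lower) // num_partitions, 1)
    preds, bound = [], lower
    for i in range(num_partitions):
        lo = f'"{column}" >= {bound}' if i > 0 else None
        bound += stride
        hi = f'"{column}" < {bound}' if i < num_partitions - 1 else None
        preds.append(" AND ".join(p for p in (lo, hi) if p))
    return preds
```

With lowerBound=0, upperBound=15 and numPartitions=16 this yields the `"partition_id" >= 10 AND "partition_id" < 11` slice seen in the logged query.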
       
    

    Then I want to write the data back to Postgres:

    df.rdd.foreachPartition(write_partition)
    

    where write_partition simply iterates over the rows and performs batch inserts with psycopg2.
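write_partition itself isn't shown in the question; a hypothetical sketch of such a helper, assuming a placeholder DSN, a placeholder target table (`mytable_out`), and an abbreviated column list:

```python
def chunked(rows, size=1000):
    """Yield lists of at most `size` rows from an iterable."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def write_partition(rows):
    # Hypothetical implementation: one connection per Spark partition,
    # batched multi-row inserts via psycopg2's execute_values.
    import psycopg2
    from psycopg2.extras import execute_values
    conn = psycopg2.connect("dbname=mydb")  # placeholder DSN
    try:
        with conn, conn.cursor() as cur:
            for batch in chunked(rows, 1000):
                execute_values(
                    cur,
                    "insert into mytable_out (receiptid, barcode) values %s",
                    [(r["receiptid"], r["barcode"]) for r in batch],
                )
    finally:
        conn.close()
```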

    My problem is that I see each partition query doubled on the database:

    SELECT "receiptid","itemindex","barcode","productnumberraw","itemdescription","itemdescriptionraw","itemextendedprice","itemdiscountedextendedprice","itemquantity","barcodemanufacturer","barcodebrand","barcodecategory1","barcodecategory2","barcodecategory3","isfetchpromo","ispartnerbrand","subscribeandsave","soldby","yyyymm","retailerid","partition_id" FROM (select receiptid, itemindex, barcode, productnumberraw, itemdescription, itemdescriptionraw, itemextendedprice, itemdiscountedextendedprice, itemquantity, barcodemanufacturer, barcodebrand, barcodecategory1, barcodecategory2, barcodecategory3, isfetchpromo, ispartnerbrand, subscribeandsave, soldby, yyyymm, retailerid,
      MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), 16) AS partition_id from mytable) as subquery  WHERE "partition_id" >= 10 AND "partition_id" < 11  
    

    What is causing the data to be read twice?

    1 Answer

  •   Zegarek · answered 4 months ago

    If you see them duplicated in pg_stat_activity, some of the entries may show a leader_pid pointing at another backend's pid, which means the query is being handled by multiple worker processes.

    On a partitioned table it's especially likely that a query gets spread across multiple workers. The fact that you narrow it down to a single partition:

    WHERE "partition_id" >= 10 AND "partition_id" < 11
    

    does not prevent additional workers from being used, nor does it disqualify the default partition as a scan target. You can tweak the asynchronous behaviour settings to control this:

    select name, setting, short_desc 
    from pg_settings
    where name in ( 'max_worker_processes'
                   ,'max_parallel_workers_per_gather'
                   ,'max_parallel_maintenance_workers'
                   ,'max_parallel_workers'
                   ,'parallel_leader_participation');
    
    name                              setting  short_desc
    max_parallel_maintenance_workers  2        Sets the maximum number of parallel processes per maintenance operation.
    max_parallel_workers              8        Sets the maximum number of parallel workers that can be active at one time.
    max_parallel_workers_per_gather   4        Sets the maximum number of parallel processes per executor node.
    max_worker_processes              8        Maximum number of concurrent worker processes.
    parallel_leader_participation              Controls whether Gather and Gather Merge also run subplans.

    demo at db<>fiddle

    create table test (partition_id int, payload text)
      partition by list(partition_id);
    create table test1 partition of test for values in (1);
    create table test2 partition of test for values in (2);
    create table test_default partition of test default;
    
    select setseed(.42);
    insert into test
    select 1, md5(random()::text)
    from generate_series(1,5e5);
    

    If I now have a parallel dblink client query that table and watch pg_stat_activity, I also see the query twice, as a pid/leader_pid pair:

    create extension if not exists dblink;
    select dblink_connect('parallel_client','');
    select dblink_send_query('parallel_client',
      $q$ select*from test where partition_id>=1 and partition_id<2;
      $q$);
    select pid,leader_pid,query from pg_stat_activity;
    
    pid  leader_pid  query
    786  null        create extension if not exists dblink;
                     select dblink_connect('parallel_client','');
                     select dblink_send_query('parallel_client',
                       $q$ select*from test where partition_id>=1 and partition_id<2;
                       $q$);
                     select pid,leader_pid,query from pg_stat_activity;
    787  null        select*from test where partition_id>=1 and partition_id<2;
    788  787         select*from test where partition_id>=1 and partition_id<2;

    Explain also shows the query running on 2 workers:

    explain(analyze,verbose,settings)
    select*from test where partition_id>=1 and partition_id<2;
    
    QUERY PLAN
    Gather  (cost=1000.00..8766.49 rows=2652 width=36) (actual time=0.450..206.506 rows=500000 loops=1)
      Output: test.partition_id, test.payload
      Workers Planned: 2
      Workers Launched: 2
      ->  Parallel Append  (cost=0.00..7501.29 rows=1105 width=36) (actual time=0.013..48.702 rows=166667 loops=3)
            Worker 0:  actual time=0.015..35.38 rows=15360 loops=1
            Worker 1:  actual time=0.015..32.62 rows=15360 loops=1
            ->  Parallel Seq Scan on public.test1 test_1  (cost=0.00..7474.56 rows=1102 width=36) (actual time=0.012..35.920 rows=166667 loops=3)
                  Output: test_1.partition_id, test_1.payload
                  Filter: ((test_1.partition_id >= 1) AND (test_1.partition_id < 2))
                  Worker 0:  actual time=0.014..25.25 rows=15360 loops=1
                  Worker 1:  actual time=0.014..20.7 rows=15360 loops=1
            ->  Parallel Seq Scan on public.test_default test_2  (cost=0.00..21.21 rows=4 width=36) (actual time=0.001..0.001 rows=0 loops=1)
                  Output: test_2.partition_id, test_2.payload
                  Filter: ((test_2.partition_id >= 1) AND (test_2.partition_id < 2))
    Settings: max_parallel_workers_per_gather = '4', search_path = 'public'
    Planning Time: 0.232 ms
    Execution Time: 224.640 ms

    Even if you change the condition to target a specific partition and disqualify the default partition, you still get multiple workers:

    explain(analyze,verbose,settings)
    select*from test where partition_id=1;
    
    QUERY PLAN
    Gather  (cost=1000.00..8187.90 rows=2646 width=36) (actual time=0.206..240.995 rows=500000 loops=1)
      Output: test.partition_id, test.payload
      Workers Planned: 2
      Workers Launched: 2
      ->  Parallel Seq Scan on public.test1 test  (cost=0.00..6923.30 rows=1102 width=36) (actual time=0.017..33.86 rows=166667 loops=3)
            Output: test.partition_id, test.payload
            Filter: (test.partition_id = 1)
            Worker 0:  actual time=0.022..2.331 rows=13440 loops=1
            Worker 1:  actual time=0.018..2.389 rows=13440 loops=1
    Settings: max_parallel_workers_per_gather = '4', search_path = 'public'
    Planning Time: 0.227 ms
    Execution Time: 268.901 ms

    This can be changed at the session level with SET, or for a single transaction with SET LOCAL:

    set max_parallel_workers=0;
    set max_parallel_workers_per_gather=0;
    
    explain(analyze,verbose,settings)
    select*from test where partition_id>=1 and partition_id<2;
    
    QUERY PLAN
    Append  (cost=0.00..12147.44 rows=2652 width=36) (actual time=0.011..94.449 rows=500000 loops=1)
      ->  Seq Scan on public.test1 test_1  (cost=0.00..12105.13 rows=2646 width=36) (actual time=0.010..59.975 rows=500000 loops=1)
            Output: test_1.partition_id, test_1.payload
            Filter: ((test_1.partition_id >= 1) AND (test_1.partition_id < 2))
      ->  Seq Scan on public.test_default test_2  (cost=0.00..29.05 rows=6 width=36) (actual time=0.012..0.012 rows=0 loops=1)
            Output: test_2.partition_id, test_2.payload
            Filter: ((test_2.partition_id >= 1) AND (test_2.partition_id < 2))
    Settings: max_parallel_workers = '0', max_parallel_workers_per_gather = '0', search_path = 'public'
    Planning Time: 0.170 ms
    Execution Time: 109.770 ms

    explain(analyze,verbose,settings)
    select*from test where partition_id=1;
    
    QUERY PLAN
    Seq Scan on public.test1 test  (cost=0.00..10782.11 rows=2646 width=36) (actual time=0.025..52.193 rows=500000 loops=1)
      Output: test.partition_id, test.payload
      Filter: (test.partition_id = 1)
    Settings: max_parallel_workers = '0', max_parallel_workers_per_gather = '0', search_path = 'public'
    Planning Time: 0.073 ms
    Execution Time: 67.570 ms
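Back on the Spark side, if you'd rather not change these settings server-wide, Spark's JDBC source accepts a sessionInitStatement option (Spark 2.3+) that is executed on each JDBC connection before reading, so parallel workers can be disabled for just these sessions. A sketch reusing the question's variables:

```python
def jdbc_read_options(pg_url, query, num_partitions):
    # sessionInitStatement runs once per JDBC connection before Spark
    # issues its partition query, so each of the partitioned reads
    # runs without extra Postgres worker processes.
    return {
        "url": pg_url,
        "dbtable": query,
        "sessionInitStatement": "SET max_parallel_workers_per_gather = 0",
        "numPartitions": str(num_partitions),
        "partitionColumn": "partition_id",
        "lowerBound": "0",
        "upperBound": str(num_partitions - 1),
    }

# usage: spark.read.format("jdbc").options(**jdbc_read_options(pg_url, query, 16)).load()
```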