代码之家  ›  专栏  ›  技术社区  ›  doplano

针对超大稀疏数据优化Pandas Dataframe concat方法

  •  0
  • doplano  · 技术社区  · 1 年前

    给定:

    import pandas as pd
    import numpy as np
    
    def get_rnd_df(row:int=10, col:int=7): # generate random Sparse Pandas dataframe
        np.random.seed(0)
        d=np.random.randint(low=0, high=10, size=(row,col)).astype(np.float32)
        d[d < 3] = np.nan
        df=pd.DataFrame(data=d,
                        index=[f"ip{i}" for i in np.random.choice(range(max(row, 10)), row, replace=False) ],
                        columns=[f"col_{c}" for c in np.random.choice(range(max(col, 10)), col, replace=False) ],
                        dtype=pd.SparseDtype(dtype=np.float32), # sparse: memory efficient xxx but SUPER SLOW xxx
                    )
        df.index.name='usr'
        return df
        
    def get_df_concat(dfs):
        t=time.time()
        dfc=pd.concat(dfs, axis=0, sort=True) # vstack dfs=[df1, df2,..., dfN], sort=True: sort columns
        print(f"elapsed_time [concat]{time.time()-t:>{12}.{4}f} sec")
    
        t=time.time()
        dfc=dfc.groupby(level=0) # groupby index
        print(f"elapsed_time [groupby]{time.time()-t:>{11}.{4}f} sec")
    
        t=time.time()
        dfc=dfc.sum() # <<<========== Time Consuming ==========>>> 
        print(f"elapsed_time [sum]{time.time()-t:>{15}.{4}f} sec")
    
        t=time.time()
        dfc=dfc.sort_index(key=lambda x: ( x.to_series().str[2:].astype(int) ))
        print(f"elapsed_time [sort idx]{time.time()-t:>{10}.{4}f} sec")
    
        return dfc
    

    我使用助手函数应用串联 get_df_concat(dfs) 对于几个熊猫数据帧[ Sparse ]其中对列和索引进行排序。对于两个小样本Panda数据帧,它可以正常工作:

    df1=get_rnd_df(row=8, col=7) # small sample random dataframe
            col_0   col_6   col_4   col_2   col_5   col_3   col_8
    usr                             
    ip3     5.0     NaN     3.0     3.0     7.0     9.0     3.0
    ip5     5.0     NaN     4.0     7.0     6.0     8.0     8.0
    ip0     NaN     6.0     7.0     7.0     8.0     NaN     5.0
    ip6     9.0     8.0     9.0     4.0     3.0     NaN     3.0
    ip2     5.0     NaN     NaN     3.0     8.0     NaN     3.0
    ip8     3.0     3.0     7.0     NaN     NaN     9.0     9.0
    ip9     NaN     4.0     7.0     3.0     NaN     7.0     NaN
    ip7     NaN     NaN     4.0     5.0     5.0     6.0     8.0
    
    df2=get_rnd_df(row=5, col=11) # small sample random dataframe
            col_4   col_10  col_5   col_0   col_9   col_2   col_8   col_6   col_3   col_7   col_1
    usr                                             
    ip3     5.0     NaN     3.0     3.0     7.0     9.0     3.0     5.0     NaN     4.0     7.0
    ip5     6.0     8.0     8.0     NaN     6.0     7.0     7.0     8.0     NaN     5.0     9.0
    ip0     8.0     9.0     4.0     3.0     NaN     3.0     5.0     NaN     NaN     3.0     8.0
    ip6     NaN     3.0     3.0     3.0     7.0     NaN     NaN     9.0     9.0     NaN     4.0
    ip2     7.0     3.0     NaN     7.0     NaN     NaN     NaN     4.0     5.0     5.0     6.0
    
    %%time
    df_concat=get_df_concat(dfs=[df1, df2])
            col_0   col_1   col_2   col_3   col_4   col_5   col_6   col_7   col_8   col_9   col_10
    usr                                             
    ip0     3.0     8.0     10.0    0.0     15.0    12.0    6.0     3.0     10.0    0.0     9.0
    ip2     12.0    6.0     3.0     5.0     7.0     8.0     4.0     5.0     3.0     0.0     3.0
    ip3     8.0     7.0     12.0    9.0     8.0     10.0    5.0     4.0     6.0     7.0     0.0
    ip5     5.0     9.0     14.0    8.0     10.0    14.0    8.0     5.0     15.0    6.0     8.0
    ip6     12.0    4.0     4.0     9.0     9.0     6.0     17.0    0.0     3.0     7.0     3.0
    ip7     0.0     0.0     5.0     6.0     4.0     5.0     0.0     0.0     8.0     0.0     0.0
    ip8     3.0     0.0     0.0     9.0     7.0     0.0     3.0     0.0     9.0     0.0     0.0
    ip9     0.0     0.0     3.0     7.0     7.0     0.0     4.0     0.0     0.0     0.0     0.0
    

    问题是我的实际数据太大了,需要很多小时才能完成连接 sum() 方法,特别是在给定足够的可用内存的情况下:

    df1=get_rnd_df(row=int(7e+5), col=int(2e+8)) # resembles my real data
    df2=get_rnd_df(row=int(9e+6), col=int(1e+9)) # resembles my real data
    
    %%time
    df_concat=get_df_concat(dfs=[df1, df2]) # SUPER SLOW & time-consuming!!!
    

    有没有更好的替代方案可以更有效地实现这种连接?我想知道是否会有SciPy csr_matrix 帮助我更快地实现?

    PS。我利用了 pd.SparseDtype("float32", fill_value=np.nan) 以确定它是否适合我的可用内存。

    更新

    感谢@RomanPerekhrest,我现在优化了连接,特别是 sum() 方法使用 numba Pandas的发动机:

    def get_df_concat_optimized(dfs):
        dfc=pd.concat(dfs, axis=0, sort=True).astype(pd.SparseDtype(dtype=np.float32)) # dfs=[df1, df2,..., dfN], sort=True: sort columns
        dfc=dfc.groupby(level=0) # groupby index
        dfc=dfc.sum(engine="numba", # <<=== saves time using NUMBA engine!
                    engine_kwargs={'nopython': True, 'parallel': True, 'nogil': False},
                    ).astype(pd.SparseDtype(dtype=np.float32, fill_value=0.0,))
        dfc=dfc.sort_index(key=lambda x: ( x.to_series().str[2:].astype(int) )).astype(pd.SparseDtype(dtype=np.float32, fill_value=0.0))
        return dfc
    

    这里是一些相当大的随机稀疏数据帧的时间比较,使用我的低效 get_df_conat(dfs) 方法和优化的`get_df_contact_optimized(dfs)方法:

    df1=get_rnd_df(row=int(6e2), col=int(9e2))
    df2=get_rnd_df(row=int(2e2), col=int(7e2))
    
    %%time
    df_concat_opt=get_df_concat_optimized(dfs=[df1, df2])
    CPU times: user 2.32 s, sys: 11.6 ms, total: 2.33 s
    Wall time: 2.47 s
    
    %%time
    df_concat=get_df_concat(dfs=[df1, df2])
    elapsed_time [concat]      0.2443 sec
    elapsed_time [groupby]     0.0008 sec
    elapsed_time [sum]        67.2486 sec <<< time consuming >>>
    elapsed_time [sort idx]    0.2136 sec
    CPU times: user 1min 6s, sys: 721 ms, total: 1min 7s
    Wall time: 1min 7s
    
    0 回复  |  直到 1 年前
        1
  •  1
  •   RomanPerekhrest    1 年前

    优化要点:

    • 一个小的:更喜欢 numpy.random.Generator ,这是推荐的,总的来说,我注意到它的工作速度比 np.random.<func>
    • 对数字索引/列标签进行操作。然后,立即(矢量化地)在末尾添加所需的前缀。这将总体上加快处理速度。
    • 回顾前一点,取消列的自定义排序,因为它们可以在具有设置的串联阶段正确排序 sort 参数: pd.concat(dfs, axis=0, sort=True)
    • 消除索引排序 sort_index pd.DataFrame.groupby 默认情况下将对组密钥进行排序
    • 的提升性能 groupby.DataFrameGroupBy.sum 通过选择 numba (作为 engine )用于具有并行计算的JIT编译代码( {'nopython': True, 'parallel': False} )

    我在两个数据帧上进行了测试,每个数据帧的大小都是( row=int(1e+3), col=int(2e+3) ),在我的机器上,前一个/初始解决方案在144秒内完成。
    而优化的方法运行了大约 8. sec(因为Numba将有一些函数编译开销)。但所有后续呼叫都在2.6秒内运行,即 55倍 加速。

    完整的优化版本:

    def get_rnd_df(row=10, col=7):
        rng = np.random.default_rng(0)
        a = rng.integers(low=0, high=10, size=(row, col)).astype("float32")
        a[a < 3] = np.nan
    
        df = pd.DataFrame(data=a,
                          index=rng.choice(range(max(row, 10)), row, replace=False),
                          columns=rng.choice(range(max(col, 10)), col, replace=False),
                          dtype=pd.SparseDtype("float32", fill_value=np.nan),
                          )
    
        df.index.name = 'usr'
        return df
    
    def get_df_concat(dfs):
        df = pd.concat(dfs, axis=0, sort=True)  # dfs=[df1, df2, df3, ..., dfN]
        df = df.groupby(level=0).sum(engine="numba",
                                     engine_kwargs={'nopython': True,
                                                    'parallel': True})
    
        df.index = 'ip' + df.index.astype(str)
        df.columns = 'col_' + df.columns.astype(str)
    
        return df
    
    t0 = time.time()
    
    df1=get_rnd_df(row=int(1e+3), col=int(2e+3)) # resembles my real data
    df2=get_rnd_df(row=int(1e+3), col=int(2e+3)) # resembles my real data
    df_concat=get_df_concat(dfs=[df1, df2])
    
    print('time spent: ', time.time() - t0)
    

    time spent:  2.636758327484131