Given:
import time
import pandas as pd
import numpy as np

def get_rnd_df(row: int = 10, col: int = 7):  # generate a random sparse pandas DataFrame
    np.random.seed(0)
    d = np.random.randint(low=0, high=10, size=(row, col)).astype(np.float32)
    d[d < 3] = np.nan  # values below 3 become NaN, the sparse fill value
    df = pd.DataFrame(
        data=d,
        index=[f"ip{i}" for i in np.random.choice(range(max(row, 10)), row, replace=False)],
        columns=[f"col_{c}" for c in np.random.choice(range(max(col, 10)), col, replace=False)],
        dtype=pd.SparseDtype(dtype=np.float32),  # sparse: memory efficient but SUPER SLOW
    )
    df.index.name = 'usr'
    return df
def get_df_concat(dfs):
    t = time.time()
    dfc = pd.concat(dfs, axis=0, sort=True)  # vstack dfs=[df1, df2, ..., dfN]; sort=True sorts the columns
    print(f"elapsed_time [concat]{time.time()-t:>{12}.{4}f} sec")
    t = time.time()
    dfc = dfc.groupby(level=0)  # group rows by index label
    print(f"elapsed_time [groupby]{time.time()-t:>{11}.{4}f} sec")
    t = time.time()
    dfc = dfc.sum()  # <<<========== Time Consuming ==========>>>
    print(f"elapsed_time [sum]{time.time()-t:>{15}.{4}f} sec")
    t = time.time()
    dfc = dfc.sort_index(key=lambda x: x.to_series().str[2:].astype(int))  # 'ipN' -> N, numeric row order
    print(f"elapsed_time [sort idx]{time.time()-t:>{10}.{4}f} sec")
    return dfc
I apply the concatenation with my helper function get_df_concat(dfs) to several sparse pandas DataFrames, with both the columns and the index sorted. For two small sample DataFrames it works fine:
df1=get_rnd_df(row=8, col=7) # small sample random dataframe
col_0 col_6 col_4 col_2 col_5 col_3 col_8
usr
ip3 5.0 NaN 3.0 3.0 7.0 9.0 3.0
ip5 5.0 NaN 4.0 7.0 6.0 8.0 8.0
ip0 NaN 6.0 7.0 7.0 8.0 NaN 5.0
ip6 9.0 8.0 9.0 4.0 3.0 NaN 3.0
ip2 5.0 NaN NaN 3.0 8.0 NaN 3.0
ip8 3.0 3.0 7.0 NaN NaN 9.0 9.0
ip9 NaN 4.0 7.0 3.0 NaN 7.0 NaN
ip7 NaN NaN 4.0 5.0 5.0 6.0 8.0
df2=get_rnd_df(row=5, col=11) # small sample random dataframe
col_4 col_10 col_5 col_0 col_9 col_2 col_8 col_6 col_3 col_7 col_1
usr
ip3 5.0 NaN 3.0 3.0 7.0 9.0 3.0 5.0 NaN 4.0 7.0
ip5 6.0 8.0 8.0 NaN 6.0 7.0 7.0 8.0 NaN 5.0 9.0
ip0 8.0 9.0 4.0 3.0 NaN 3.0 5.0 NaN NaN 3.0 8.0
ip6 NaN 3.0 3.0 3.0 7.0 NaN NaN 9.0 9.0 NaN 4.0
ip2 7.0 3.0 NaN 7.0 NaN NaN NaN 4.0 5.0 5.0 6.0
%%time
df_concat=get_df_concat(dfs=[df1, df2])
col_0 col_1 col_2 col_3 col_4 col_5 col_6 col_7 col_8 col_9 col_10
usr
ip0 3.0 8.0 10.0 0.0 15.0 12.0 6.0 3.0 10.0 0.0 9.0
ip2 12.0 6.0 3.0 5.0 7.0 8.0 4.0 5.0 3.0 0.0 3.0
ip3 8.0 7.0 12.0 9.0 8.0 10.0 5.0 4.0 6.0 7.0 0.0
ip5 5.0 9.0 14.0 8.0 10.0 14.0 8.0 5.0 15.0 6.0 8.0
ip6 12.0 4.0 4.0 9.0 9.0 6.0 17.0 0.0 3.0 7.0 3.0
ip7 0.0 0.0 5.0 6.0 4.0 5.0 0.0 0.0 8.0 0.0 0.0
ip8 3.0 0.0 0.0 9.0 7.0 0.0 3.0 0.0 9.0 0.0 0.0
ip9 0.0 0.0 3.0 7.0 7.0 0.0 4.0 0.0 0.0 0.0 0.0
The problem is that my real data is so large that the concatenation takes many hours to complete, with the sum() step in particular dominating, even when there is enough available memory:
df1=get_rnd_df(row=int(7e+5), col=int(2e+8)) # resembles my real data
df2=get_rnd_df(row=int(9e+6), col=int(1e+9)) # resembles my real data
%%time
df_concat=get_df_concat(dfs=[df1, df2]) # SUPER SLOW & time-consuming!!!
Is there a better alternative to perform this concatenation more efficiently? I wonder whether SciPy's csr_matrix could help me achieve it faster?
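Roughly, this is the kind of approach I have in mind (just a sketch of the idea, untested at scale; the name get_df_concat_scipy and the indicator-matrix trick are my own, and it assumes DataFrame.sparse.to_coo() applies once the fill value is switched from NaN to 0):

from scipy import sparse

def get_df_concat_scipy(dfs):
    # global row/column orders; the int(...) keys give numeric rather than
    # lexicographic order, matching the sort_index key used above
    all_cols = sorted(set().union(*(df.columns for df in dfs)), key=lambda s: int(s.split("_")[1]))
    all_rows = sorted(set().union(*(df.index for df in dfs)), key=lambda s: int(s[2:]))
    col_pos = {c: j for j, c in enumerate(all_cols)}
    row_pos = {r: i for i, r in enumerate(all_rows)}
    acc = sparse.csr_matrix((len(all_rows), len(all_cols)), dtype=np.float32)
    for df in dfs:
        # switch the fill value to 0 so the frame converts to a SciPy COO matrix
        m = df.astype(pd.SparseDtype(np.float32, fill_value=0.0)).sparse.to_coo().tocsr()
        rows = np.array([row_pos[r] for r in df.index])
        cols = np.array([col_pos[c] for c in df.columns])
        # indicator matrices scatter this frame's rows/columns into the global
        # orders; duplicate row labels are summed by the multiplication itself
        G = sparse.csr_matrix((np.ones(len(rows), dtype=np.float32), (rows, np.arange(len(rows)))),
                              shape=(len(all_rows), len(rows)))
        C = sparse.csr_matrix((np.ones(len(cols), dtype=np.float32), (np.arange(len(cols)), cols)),
                              shape=(len(cols), len(all_cols)))
        acc = acc + G @ m @ C
    return pd.DataFrame.sparse.from_spmatrix(acc, index=all_rows, columns=all_cols)

If that is sound, G @ m @ C would align each frame to the global row/column order and sum duplicate row labels in a single sparse multiplication, so no dense intermediate is ever materialized.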
PS. I made use of pd.SparseDtype("float32", fill_value=np.nan) to determine whether the data fits into my available memory.
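For reference, this is the kind of quick check I run on a smaller sample before scaling up (df.sparse.density and memory_usage(deep=True) are standard pandas; the sizes below are arbitrary):

df_sample = get_rnd_df(row=int(1e4), col=int(1e3))  # smaller stand-in for the real data
print(f"density: {df_sample.sparse.density:.3f}")  # fraction of stored (non-fill) values
print(f"memory:  {df_sample.memory_usage(deep=True).sum()/1e6:.1f} MB")  # actual bytes held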
UPDATE

Thanks to @RomanPerekhrest, I have now optimized the concatenation, and in particular the sum() step, using pandas' numba engine:
def get_df_concat_optimized(dfs):
    # dfs=[df1, df2, ..., dfN]; sort=True sorts the columns
    dfc = pd.concat(dfs, axis=0, sort=True).astype(pd.SparseDtype(dtype=np.float32))
    dfc = dfc.groupby(level=0)  # group rows by index label
    dfc = dfc.sum(
        engine="numba",  # <<=== saves time using the NUMBA engine!
        engine_kwargs={'nopython': True, 'parallel': True, 'nogil': False},
    ).astype(pd.SparseDtype(dtype=np.float32, fill_value=0.0))
    dfc = dfc.sort_index(key=lambda x: x.to_series().str[2:].astype(int)).astype(pd.SparseDtype(dtype=np.float32, fill_value=0.0))
    return dfc
Here is a timing comparison on some fairly large random sparse DataFrames between my inefficient get_df_concat(dfs) method and the optimized get_df_concat_optimized(dfs) method:
df1=get_rnd_df(row=int(6e2), col=int(9e2))
df2=get_rnd_df(row=int(2e2), col=int(7e2))
%%time
df_concat_opt=get_df_concat_optimized(dfs=[df1, df2])
CPU times: user 2.32 s, sys: 11.6 ms, total: 2.33 s
Wall time: 2.47 s
%%time
df_concat=get_df_concat(dfs=[df1, df2])
elapsed_time [concat] 0.2443 sec
elapsed_time [groupby] 0.0008 sec
elapsed_time [sum] 67.2486 sec <<< time consuming >>>
elapsed_time [sort idx] 0.2136 sec
CPU times: user 1min 6s, sys: 721 ms, total: 1min 7s
Wall time: 1min 7s