
Time-series decimation benchmark: Dask vs Vaex

DougR · asked 7 years ago

    I am benchmarking Vaex (reading an HDF5 file) against Dask (reading Parquet files) for decimating a time series, while keeping the data out of core memory.

    Update 3 (I have removed my previous updates):

    Dask is about 30% faster than Vaex on the first run, but on repeated runs Vaex is about 4.5x faster than Dask. I believe Vaex gets its speedup from memory mapping. Is there a way to improve Dask's execution time on repeated runs?
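
    For reference, a minimal sketch of the kind of caching I am asking about, assuming a local dask.distributed scheduler (persist() is the Dask call that keeps a collection's partitions in cluster memory; the path is the Parquet folder created by the script below):

    from dask.distributed import Client
    import dask.dataframe as dd

    client = Client()  # local distributed scheduler; its workers hold the cached partitions
    parquet_dd = dd.read_parquet('randomData.parquet')  # folder created by the script below
    parquet_dd = parquet_dd.persist()  # read the Parquet data into worker RAM once
    # every .compute() from here on reuses the in-memory partitions
    # instead of re-reading the files from disk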

    First, create some random data and write it out to files. Warning: this generates about 1.5 GB of data.

    import numpy as np
    import vaex as vx
    import pandas as pd
    import dask.dataframe as dd
    import os
    
    #cwd = os.getcwd() # Change this to your directory for path to save hdf and parquet files 
    cwd = r'F:\temp\DaskVaexx' # Write files to this directory.  Use a fast SSD for fast read calculations in Dask/Vaex
    
    ### Create random data
    size = 20000000 # number of rows
    scale = 1.
    scaleLocal = 20
    np.random.seed(0)
    x_data = np.arange(size)
    y_data = np.cumsum(np.random.randn(size)  * scale) + np.random.randn(size) * scaleLocal
    
    np.random.seed(1)
    scaleLocal2 = 3
    y_data2 = np.cumsum(np.random.randn(size)  * scale) + np.random.randn(size) * scaleLocal2
    df = pd.DataFrame({'t': x_data.astype(np.float32),'Channel1' : y_data.astype(np.float32),'Channel2' : y_data2.astype(np.float32)})
    # df
    
    #Create Dask dataframe
    dask_df = dd.from_pandas(df, npartitions=1)
    
    # Create a Vaex dataset from pandas and then export it to HDF5
    dataVX = vx.from_pandas(df)
    dataVX.export_hdf5(os.path.join(cwd, 'randomData.hdf'))
    
    # Create a parquet folder and files from dask dataframe
    dask_df.to_parquet(os.path.join(cwd, 'randomData.parquet'))
    
    # Create a hdf file from dask dataframe
    #dask_df.to_hdf(os.path.join(cwd, 'randomDataDask.hdf'), '/data')
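
    To confirm roughly how much data the script wrote (a minimal sketch using only the standard library; cwd is the output directory defined above):

    import os
    total = 0
    for root, dirs, files in os.walk(cwd):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    print('MB on disk: ' + str(total / 1e6))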
    

    Now run the Vaex and Dask processing:

    import dask.dataframe as dd
    import dask.array as da
    import vaex as vx
    import dask
    import time
    import os
    import numpy as np
    import pandas as pd
    
    #
    
    bins = 1000
    minLimit = 0
    maxLimit = 1000000
    timeArrayName = 't'
    column = 'Channel1'
    
    # filePath = os.getcwd() # location of hdf and parquet data
    filePath = r'F:\temp\DaskVaexx' # location of hdf and parquet data
    
    # ------------------------------
    # Vaex code
    
    startTime = time.time()
    
    dataVX = vx.open(os.path.join(filePath,r'randomData.hdf'))
    
    #Calculate the min & max of a columnar dataset for each bin
    minMaxVaexOutputArray = dataVX.minmax(column, binby=[timeArrayName],  shape=(bins,), limits=[minLimit,maxLimit])
    
    VaexResults_df = pd.DataFrame(data = minMaxVaexOutputArray, columns = ['min','max'])
    
    #Calculate the mean of a columnar dataset for each bin
    VaexResults_df['mean'] = dataVX.mean(column, binby=[timeArrayName],  shape=(bins,), limits=[minLimit, maxLimit]) 
    
    print('Vaex hdf computation time: ' + str(time.time() - startTime))
    
    # dataVX.close_files() # option to close down the opened Vaex dataset
    # ------------------------------
    
    # ------------------------------
    # Dask computation
    startTime = time.time()
    
    # Read parquet file or folder of files
    parquet_dd = dd.read_parquet(os.path.join(filePath,r'randomData.parquet'))  
    
    # Create a virtual column which assigns integers to the time signal according to its assigned bin
    parquet_dd['timeGroups'] = parquet_dd[timeArrayName].where((parquet_dd[timeArrayName] >= minLimit) & (parquet_dd[timeArrayName] < maxLimit)) // ((maxLimit - minLimit) / bins)
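    # Worked example of the line above: with minLimit=0, maxLimit=1000000 and
    # bins=1000 the bin width is (1000000 - 0) / 1000 = 1000.0, so a sample at
    # t = 1234.5 passes the mask (0 <= 1234.5 < 1000000) and gets
    # 1234.5 // 1000.0 == bin 1.0; samples outside the mask become NaN
    # and are dropped by the groupby below.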
    
    # Groupby using the virtual column
    df3 = parquet_dd[column].groupby(parquet_dd['timeGroups']).aggregate(['min', 'max', 'mean'])
    
    #Execute Dask and return results to a Pandas Dataframe
    DaskResults_df = dask.compute(df3)[0]
    
    print('Dask with parquet computation time: ' + str(time.time() - startTime))
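
    As a sanity check that both paths computed the same decimation (a minimal sketch; both result frames from above must still be in scope, and the tolerance absorbs any small bin-edge differences):

    check_df = DaskResults_df.sort_index()  # group labels 0.0 .. 999.0, one row per bin
    print(np.allclose(VaexResults_df['min'].values, check_df['min'].values, atol=1e-3))
    print(np.allclose(VaexResults_df['max'].values, check_df['max'].values, atol=1e-3))
    print(np.allclose(VaexResults_df['mean'].values, check_df['mean'].values, atol=1e-3))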
    
    0 replies | last activity 6 years ago