
How can a Dask process not be finished when the Dask diagnostics report that it has finished?

  • lo tolmencre  · asked 6 years ago

    The HDF store was written with complevel=9 and complib='blosc:lz4', using format='table'. The file is 1.1 gigabytes on disk.
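
    For reference, this is roughly how such a store would be produced (a minimal sketch; the DataFrame contents and file name are placeholders, only the to_hdf parameters are the ones actually used):

    import pandas as pd
    
    # Placeholder 5-gram frame; the real data is far larger (roughly 60 chunks
    # of a million rows each, according to the code further down).
    df = pd.DataFrame({f'w{i}': ['some', 'words', 'here'] for i in range(5)})
    
    # The store was written with exactly these compression settings and format='table'.
    df.to_hdf('5grams.h5', key='ngrams', complevel=9,
              complib='blosc:lz4', format='table')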

    I mention all of this because I do not know whether it is related to my problem.

    here

    I eventually managed to put together some code with Dask that seems to do the job in a reasonable amount of time (if the problem described here can be solved, I will answer my own question with that code). There is one big problem, though: the code does not terminate. It runs fine up to the compute() call, and the progress bar even reports that call as completed, but it never reaches print(f'{time_to_display()}: dropped rows with low frequency words')

    Now... I really do not understand why this happens. I am practically pulling my hair out over it. Can you explain why the compute() call is reported as completed, yet does not actually finish?

    import time
    import dask.dataframe as ddf
    import dask
    import numpy as np
    import pandas as pd
    
    
    def time_to_display():
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    
    
    
    hdfstore_filename =  '<the HDF file>'
    threshold = 100
    
    print(f'{time_to_display()}: filtering data for low frequency words')
    print(f'file = {hdfstore_filename}.h5')
    
    store = pd.HDFStore(f'{hdfstore_filename}.h5')
    df = store.select(key='/ngrams')
    
    print(f'{time_to_display()}: data loaded')
    
    unique, counts = np.unique(df.values.ravel(), return_counts=True)
    print(f'{time_to_display()}: gathered frequencies of words [1]')
    
    d = dict(zip(unique, counts))
    print(f'{time_to_display()}: gathered frequencies of words [2]')
    
    to_remove = [k for k, v in d.items() if v < threshold]
    print(f'{time_to_display()}: gathered indices of values to remove')
    
    
    df_dask = ddf.from_pandas(df, chunksize=1000000) # results in 60 chunks for this data
    print(f'{time_to_display()}: df_dask created')
    
    mask = df_dask.isin(to_remove)
    print(f'{time_to_display()}: mask created')
    
    column_mask = (~mask).all(axis=1)
    print(f'{time_to_display()}: column_mask created')
    
    df_dask = df_dask[column_mask]
    print(f'{time_to_display()}: df_dask filtered')
    
    df_dask.visualize(filename='log/df_dask', format='pdf')
    print(f'{time_to_display()}: computation graph rendered')
    
    from dask.diagnostics import ProgressBar
    
    with ProgressBar():
        df_out = dask.compute(df_dask)[0]
    print(f'{time_to_display()}: dropped rows with low frequency words')
    
    df_out.to_hdf(f'{hdfstore_filename}_filtered_complete.h5', 'ngrams', complevel=9,
              complib='blosc:lz4', format='table')
    print(f'{time_to_display()}: store written')
    
    store.close()
    

    2018-12-04 22:35:46: filtering data for low frequency words
    file = data/5grams_wiki_00/5grams_wiki_00_cat_final.h5
    2018-12-04 22:36:31: data loaded
    2018-12-04 22:45:33: gathered frequencies of words [1]
    2018-12-04 22:45:34: gathered frequencies of words [2]
    2018-12-04 22:45:35: gathered indices of values to remove
    2018-12-04 22:45:52: df_dask created
    2018-12-04 22:46:12: mask created
    2018-12-04 22:46:12: column_mask created
    2018-12-04 22:46:12: df_dask filtered
    2018-12-04 22:46:13: computation graph rendered
    [########################################] | 100% Completed | 12min  2.4s
    

    I let the program run for another 30 minutes, but no new messages appeared.
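
    In case it helps to pin down where the time goes, this is how I imagine the same compute() call could be instrumented further (just a sketch with a tiny stand-in DataFrame; Profiler is the task-level diagnostic from dask.diagnostics, and the timing comparison is only meant to show whether the extra time is spent inside the task graph or after it):

    import time
    import dask
    import dask.dataframe as ddf
    import pandas as pd
    from dask.diagnostics import ProgressBar, Profiler
    
    # Tiny stand-in for the real df_dask built in the code above.
    df_dask = ddf.from_pandas(pd.DataFrame({'w0': ['a', 'b', 'c'] * 1000}),
                              npartitions=4)
    
    with Profiler() as prof, ProgressBar():
        t0 = time.perf_counter()
        df_out = dask.compute(df_dask)[0]
        t1 = time.perf_counter()
    
    # Wall-clock time of the whole compute() call ...
    print(f'compute() took {t1 - t0:.1f} s in total')
    
    # ... versus the span actually covered by the profiled graph tasks; a large
    # difference would mean the extra time is spent after the tasks finish.
    start = min(r.start_time for r in prof.results)
    end = max(r.end_time for r in prof.results)
    print(f'graph tasks ran for {end - start:.1f} s')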

    I am now running the program on a version of the same dataset with one difference: every column has its own distinct categorical encoding (a minimal sketch of what I mean by that is at the end of this post). This time, the gap between the progress bar reporting completion and the next progress message being printed is "only" about 15 minutes. So the crucial factor seems to be the categorical encoding. Any idea why?

    2018-12-05 13:00:46: filtering data for low frequency words
    file = data/5grams_wiki_00/5grams_wiki_00_cat.h5
    2018-12-05 13:01:34: data loaded
    2018-12-05 13:11:00: gathered frequencies of words [1]
    2018-12-05 13:11:01: gathered frequencies of words [2]
    2018-12-05 13:11:02: gathered indices of values to remove
    2018-12-05 13:11:19: df_dask created
    2018-12-05 13:11:31: mask created
    2018-12-05 13:11:31: column_mask created
    2018-12-05 13:11:31: df_dask filtered
    2018-12-05 13:11:31: computation graph rendered
    [########################################] | 100% Completed |  5min 52.3s
    2018-12-05 13:28:49: dropped rows with low frequency words
    2018-12-05 13:31:05: store written
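
    To clarify what I mean by giving every column its own categorical encoding, here is a minimal sketch (the column names and values are placeholders, not my actual schema):

    import pandas as pd
    
    # Hypothetical 5-gram frame with plain string (object) columns.
    df = pd.DataFrame({f'w{i}': ['some', 'words', 'here'] for i in range(5)})
    
    # Convert each column to its own pandas categorical dtype, so every column
    # stores integer codes plus its own table of category values.
    for col in df.columns:
        df[col] = df[col].astype('category')
    
    print(df.dtypes)  # every column now reports dtype 'category'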
    
    0 replies  |  6 years ago