代码之家  ›  专栏  ›  技术社区  ›  rxmnnxfpvg

中间DASK计算批处理结果

  •  1
  • rxmnnxfpvg  · 技术社区  · 7 年前

    我有一个大的(10秒的GB)csv文件,我想载入其中 dask 对每一行执行一些计算。我还想将被操纵的csv的结果写入bigquery,但是最好是将网络请求批处理到bigquery,每组10000行,这样我就不会产生每行的网络开销。

    我一直在看 dask delayed 您可以创建一个任意的计算图,但我不确定这是否是正确的方法:如何根据某个组大小(或者可能经过的时间)收集和触发中间计算。有人能提供一个简单的例子吗?简单来说,我们有以下功能:

    def change_row(r):
        # Takes 10ms
        r = some_computation(r)
        return r
    
    def send_to_bigquery(rows): 
        # Ideally, in large-ish groups, say 10,000 rows at a time
        make_network_request(rows)
    
    # And here's how I'd use it
    import dask.dataframe as dd
    df = dd.read_csv('my_large_dataset.csv') # 20 GB
    # run change_row(r) for each r in df
    # run send_to_big_query(rows) for each appropriate size group based on change_row(r)
    

    谢谢!

    1 回复  |  直到 7 年前
        1
  •  1
  •   mdurant    7 年前

    最简单的方法是提供一个块大小参数 read_csv ,这将使您获得每个块大约正确的行数。你可能需要测量你的一些数据或实验来纠正这一点。

    您任务的其余部分将与任何其他“对数据帧块执行此常规操作”的工作方式相同:“map_partitions”方法( docs )。

    def alter_and_send(df):
        rows = [change_row(r) for r in df.iterrows()]
        send_to_big_query(rows)
        return df
    
    df.map_partitions(alter_and_send)
    

    基本上,您在逻辑DASK数据帧的每一块上运行函数,这是真正的熊猫数据帧。 您实际上可能需要在函数中使用map、apply或其他数据帧方法。

    这是一种方法-你不需要地图的“输出”,你可以使用 to_delayed() 相反。