代码之家  ›  专栏  ›  技术社区  ›  rumdrums

可观察的ReactiveX批处理结果

  •  0
  • rumdrums  · 技术社区  · 8 年前

    rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])

    我希望最终能够将整数批处理到长度为5的列表中,并能够将其传递给函数,所以类似这样:

    batch_function([1,2,3,4,5])
    batch_function([6,7,8,9,10])
    

    实际上,传入的数据将是有限的(可能是空的)列表流。我只是想确保我以后的电话 batch_function 直到我累积了5个实际值才生成。谢谢你的帮助。

    1 回复  |  直到 3 年前
        1
  •  0
  •   rumdrums    8 年前

    以下代码段适用于我使用的 buffer_with_count . 不过,我不确定是否有更省钱的方法,也就是说,没有 flat_map

    BUFFER_COUNT=5
    rx.Observable.from_iterable(iter(get_items())) \
      .flat_map(lambda it: it) \
      .buffer_with_count(BUFFER_COUNT) \
      .map(lambda my_array: do_something_with(my_array)) \
      .subscribe(lambda it: print(it))