代码之家  ›  专栏  ›  技术社区  ›  mb7155

用Polars模拟没有循环的多节点服务队列的惯用方法?

  •  0
  • mb7155  · 技术社区  · 1 年前

    我正在模拟一个先进先出的服务队列,该队列具有参数化数量的服务节点。我想创建一个用户定义的函数或Polars表达式(在Python中或作为Rust插件也可以),用于计算每个对象的服务的模拟开始和结束时间。是否有使用Polars表达式的惯用方法或最推荐的方法?

    我的polars DataFrame的每一行都对应于一个在特定时间到达队列的对象,一旦其服务时间开始,就需要特定分钟的服务时间。我的输入数据帧如下所示:

    import polars as pl
    df = pl.DataFrame({
        'id': [1,2,3,4],
        'servicing_time_requirement': [30, 5, 30, 5],
        'arrival_time': [0, 15, 16, 17],
    })
    df
    shape: (5, 3)
    ┌─────┬────────────────────────────┬──────────────┐
    │ id  ┆ servicing_time_requirement ┆ arrival_time │
    │ --- ┆ ---                        ┆ ---          │
    │ i64 ┆ i64                        ┆ i64          │
    ╞═════╪════════════════════════════╪══════════════╡
    │ 1   ┆ 30                         ┆ 0            │
    │ 2   ┆ 5                          ┆ 15           │
    │ 3   ┆ 30                         ┆ 16           │
    │ 4   ┆ 5                          ┆ 17           │
    └─────┴────────────────────────────┴──────────────┘
    

    例如,如果服务节点的数量=5(一次最多可以处理五个对象),则所需的输出为:

    df.pipe(simulateQueue, nodes=5)
    shape: (4, 5)
    ┌─────┬────────────────────────────┬──────────────┬──────────────────────┬────────────────────┐
    │ id  ┆ servicing_time_requirement ┆ arrival_time ┆ servicing_start_time ┆ servicing_end_time │
    │ --- ┆ ---                        ┆ ---          ┆ ---                  ┆ ---                │
    │ i64 ┆ i64                        ┆ i64          ┆ i64                  ┆ i64                │
    ╞═════╪════════════════════════════╪══════════════╪══════════════════════╪════════════════════╡
    │ 1   ┆ 30                         ┆ 0            ┆ 0                    ┆ 30                 │
    │ 2   ┆ 5                          ┆ 15           ┆ 15                   ┆ 20                 │
    │ 3   ┆ 30                         ┆ 16           ┆ 16                   ┆ 46                 │
    │ 4   ┆ 5                          ┆ 17           ┆ 17                   ┆ 22                 │
    └─────┴────────────────────────────┴──────────────┴──────────────────────┴────────────────────┘
    

    但是,如果服务节点的数量=2(一次只处理两个对象),则在本例中需要等待,因此所需的输出为:

    df.pipe(simulateQueue, nodes=2)
    shape: (4, 5)
    ┌─────┬────────────────────────────┬──────────────┬──────────────────────┬────────────────────┐
    │ id  ┆ servicing_time_requirement ┆ arrival_time ┆ servicing_start_time ┆ servicing_end_time │
    │ --- ┆ ---                        ┆ ---          ┆ ---                  ┆ ---                │
    │ i64 ┆ i64                        ┆ i64          ┆ i64                  ┆ i64                │
    ╞═════╪════════════════════════════╪══════════════╪══════════════════════╪════════════════════╡
    │ 1   ┆ 30                         ┆ 0            ┆ 0                    ┆ 30                 │
    │ 2   ┆ 5                          ┆ 15           ┆ 15                   ┆ 20                 │
    │ 3   ┆ 30                         ┆ 16           ┆ 20                   ┆ 50                 │
    │ 4   ┆ 5                          ┆ 17           ┆ 30                   ┆ 35                 │
    └─────┴────────────────────────────┴──────────────┴──────────────────────┴────────────────────┘
    

    我当前循环浏览的行 df ,计算 servicing_start_time 使用前几行的值 servicing_end_time ,但似乎必须有更有效的方法。Polars滚动表达式是我想到的一个想法,但它们似乎不太适合这个用例。

    Python中的当前实现:

    nodes = 2
    arrival_times = df.get_column("arrival_times")
    servicing_end_times = df.get_column("servicing_end_time")
    servicing_time_requirements = df.get_column("servicing_time_requirement")
    for i in range(0, servicing_end_times.len()):
        if servicing_end_times[i] is not None: continue
        next_done = servicing_end_times.filter(
            (servicing_end_times.is_not_null()) & 
            (servicing_end_times.rank(method='ordinal', descending = True).eq(nodes)))
        if next_done.len() == 0: next_done = arrival_times[i]
        else: next_done = max(next_done[0], arrival_times[i])
        servicing_end_times[i] = next_done + servicing_time_requirements[i]
    
    0 回复  |  直到 1 年前
        1
  •  1
  •   Dean MacGregor    1 年前

    您的算法需要在每一步进行逻辑检查,这并不能真正转化为polar方法。你也许可以/也许可以用 cumulative_eval 但它指出它可能是O(n^2)。相反,您可以使用numba来编译一个ufunc,这将非常快。

    运行一个需要两个(或多个)矢量输入的函数有点麻烦,但这绝对是可行的,而且你可以制作一个辅助表达式,这样更容易使用。一般来说,您必须将这些输入放入一个结构中,然后使用该结构作为输入。在创建结构时,helper函数会对字段进行别名,因为使用它们需要有它们的名称。

    第一件事是使用numba的guvectorize装饰器创建ufunc,如下所示:

    @nb.guvectorize([(nb.int64[:], nb.int64[:], nb.int64, nb.int64[:])], '(n),(n),()->(n)', nopython=True)
    def effective_start(dur, arrival, num_workers, res):
        workers = np.zeros(num_workers,dtype=np.int64)
        for i in range(len(dur)):
            next_avail = np.min(workers)
            next_worker = np.where(workers==next_avail)[0][0]
            res[i]=max(arrival[i], next_avail)
            workers[next_worker]=res[i]+dur[i]
    

    下一步是可选的,但它为ufunc创建了一个表达式包装器,您可以在 with_columns select

    def eff_start(dur, arrival, num_workers):
        if isinstance(dur, str):
            dur=pl.col(dur)       
        if isinstance(arrival, str):
           arrival=pl.col(arrival)
        return (
            pl.struct(dur.alias('___dur'), arrival.alias('___arrival'))
                .map_batches(
                    lambda x, num_workers = num_workers: (
                    effective_start(
                        x.struct.field('___dur'),
                        x.struct.field('___arrival'), 
                        num_workers)
                    )
                )
        )
    

    定义了这些函数后,您可以简单地执行

    df.with_columns(
        z=eff_start('servicing_time_requirement','arrival_time',2)
    )
    

    或者你甚至可以使用生成器一次获得所有的可能性

    (df.with_columns(
        eff_start('servicing_time_requirement','arrival_time',x).alias(f"workers={x}")
        for x in range(1,df.shape[0]+1)
    ))
    shape: (5, 8)
    ┌─────┬─────────────────┬──────────────┬───────────┬───────────┬───────────┬───────────┬───────────┐
    │ id  ┆ servicing_time_ ┆ arrival_time ┆ workers=1 ┆ workers=2 ┆ workers=3 ┆ workers=4 ┆ workers=5 │
    │ --- ┆ requirement     ┆ ---          ┆ ---       ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
    │ i64 ┆ ---             ┆ i64          ┆ i64       ┆ i64       ┆ i64       ┆ i64       ┆ i64       │
    │     ┆ i64             ┆              ┆           ┆           ┆           ┆           ┆           │
    ╞═════╪═════════════════╪══════════════╪═══════════╪═══════════╪═══════════╪═══════════╪═══════════╡
    │ 1   ┆ 20              ┆ 0            ┆ 0         ┆ 0         ┆ 0         ┆ 0         ┆ 0         │
    │ 2   ┆ 3               ┆ 15           ┆ 20        ┆ 15        ┆ 15        ┆ 15        ┆ 15        │
    │ 3   ┆ 8               ┆ 16           ┆ 23        ┆ 18        ┆ 16        ┆ 16        ┆ 16        │
    │ 4   ┆ 30              ┆ 25           ┆ 31        ┆ 25        ┆ 25        ┆ 25        ┆ 25        │
    │ 5   ┆ 5               ┆ 30           ┆ 61        ┆ 30        ┆ 30        ┆ 30        ┆ 30        │
    └─────┴─────────────────┴──────────────┴───────────┴───────────┴───────────┴───────────┴───────────┘
    
    推荐文章