代码之家  ›  专栏  ›  技术社区  ›  sdgfsdh

在F#中一次异步一个?

f#
  •  0
  • sdgfsdh  · 技术社区  · 11 月前

    包装的好方法是什么 Async<'t> 一次只能跑一个?

    我已经想出了一个解决方案,使用 MailboxProcessor 尽管我想知道是否有可能实现更轻量级的机制。

    open System
    open System.Threading.Tasks
    
    let oneAtATime (workflow : Async<'t>) =
      let rec logic (taskCache : Task<'t> option) (inbox : MailboxProcessor<AsyncReplyChannel<Task<'t>>>) =
        async {
          let! channel = inbox.Receive()
    
          match taskCache with
          | Some task when not task.IsCompleted ->
            channel.Reply(task)
    
            return! logic taskCache inbox
          | _ ->
            let task = Async.StartAsTask workflow
    
            channel.Reply(task)
    
            return! logic (Some task) inbox
        }
    
      let actor = MailboxProcessor.Start(logic None)
    
      async {
        let! t = actor.PostAndAsyncReply(id)
    
        return! Async.AwaitTask t
      }
    
    0 回复  |  直到 11 月前
        1
  •  1
  •   Ruben Bartelink    11 月前

    喜欢 this :

    type System.Threading.SemaphoreSlim with
    
        /// Wait for capacity to be available. Returns false if timeout elapsed before this as achieved
        member semaphore.Await(timeout: TimeSpan): Async<bool> = async {
            let! ct = Async.CancellationToken
            return! semaphore.WaitAsync(timeout, ct) |> Async.AwaitTaskCorrect
        }
    
        /// Wait indefinitely for capacity to be available on the semaphore
        member semaphore.Await(): Async<unit> = async {
            let! ct = Async.CancellationToken
            return! semaphore.WaitAsync(ct) |> Async.AwaitTaskCorrect
        }
    
        /// Throttling wrapper that waits asynchronously until the semaphore has available capacity
        member semaphore.Throttle(workflow: Async<'T>): Async<'T> = async {
            do! semaphore.Await()
            try return! workflow
            finally semaphore.Release() |> ignore
    

    如果你想要的DOP是1,那么你:

    let limiter = new System.Threading.SemaphoreSlim(1)
    Seq.replicate 5 asyncThing |> Seq.map limiter.Throttle |> Async.Parallel |> Async.RunSynchronously
    

    ^the asyncThing 体执行将被序列化,受限于任何执行都将被获取/释放信号量括起来的事实。如果你只需要一个,当然可以用比信号量更轻的权重锁原语来做同样的事情。

    (当然,你也可以通过 Seq.replicate 5 asyncThing |> Async.Sequential |> Async.RunSynchronously ,但我相信这种限制就是你想要的)

    (取决于你是什么 真正地 做,还有一个 TaskCell 在里面 Equinox.Core (参见使用示例 in the tests ))

        2
  •  0
  •   CoopC    11 月前

    我认为你可以使用for循环:

    async {
        for item in 1..10 do
            do! Async.Sleep 1000
            printfn $"item: {item}"
    }