代码之家  ›  专栏  ›  技术社区  ›  IInspectable

防止代码段在协同程序中同时执行

  •  1
  • IInspectable  · 技术社区  · 8 年前

    我需要保护代码的一部分不在协程中并发执行。在多线程环境中防止并发执行是使用 std::lock_guard 类模板。但是,我的协程是从一个线程调用的,因此该解决方案不适用。

    以下是我要完成的(伪)代码:

    future<http_response> send_req_async(http_request req) {
        while (true) {
            // Attempt to send an HTTP request
            auto const& access_token{ token_store::access_token() };
            auto const response{ co_await impl::send_req_async(req, access_token) };
            if (response.status_code() == http_code::ok) {
                co_return response;
            }
    
            // Attempt to refresh access token
            if (response.status_code() == http_code::unauthorized) {
                // The following scope needs to be guarded against concurrent execution.
                // To guard against concurrent execution from multiple threads I would use:
                // lock_guard<mutex> guard(refresh_token_mutex);
                if (access_token != token_store::access_token()) {
                    continue;
                }
                auto const& token{ co_await refresh_token(token_store::refresh_token()) };
                token_store::save_access_token(token);
                // End of section that needs to be guarded.
            }
        }
    }
    

    该代码旨在允许并行发出多个请求,同时只允许尝试刷新过期的访问令牌的单个协程调用。理想情况下,当令牌刷新操作正在运行时,解决方案应该挂起并发协程调用,然后自动恢复它(即 std::lock_guard 在多线程环境中)。

    在协同程序机器或C++标准库中有什么东西可以让我以一种干净的方式实现这一点,还是我必须自己滚动?


    注意:我使用VisualStudio 2017的7.7.2,所以可以完全支持C++ 17,再加上它的协同程序TS实现。

    1 回复  |  直到 8 年前
        1
  •  1
  •   IInspectable    8 年前

    没有C++或标准库提供的基础设施来获得所需的功能。但是, Coroutine TS 提供用于实现 co_await -能够异步互斥。

    一般的想法是实现一个waitable,在评估 await_suspend 表达式。如果无法获取锁,协程将被挂起并添加到等待队列中,否则执行将立即继续(锁被保持)。

    unlock 方法从队列中恢复等待者,除非等待者队列为空。

    网络上有预先构建的解决方案。我和刘易斯·贝克一起去的 async_mutex 实施的原因有很多:

    • 没有内部依赖项的外部。只需将编译单元和头文件放到项目中,就完成了。
    • 锁属于协同程序,而不是线程。实现允许协同程序在不同的线程上恢复。
    • 这是一个无锁的实现。

    此实现的使用与 std::lock_guard :

    #include <cppcoro/async_mutex.hpp>
    
    namespace {
        cppcoro::async_mutex refresh_mutex;
    }
    
    future<http_response> send_req_async(http_request req) {
        while (true) {
            // Attempt to send an HTTP request
            auto const& access_token{ token_store::access_token() };
            auto const response{ co_await impl::send_req_async(req, access_token) };
            if (response.status_code() == http_code::ok) {
                co_return response;
            }
    
            // Attempt to refresh access token
            if (response.status_code() == http_code::unauthorized) {
                // The following scope needs to be guarded against concurrent execution.
                auto const refresh_guard{ co_await refresh_mutex.scoped_lock_async() };
                if (access_token != token_store::access_token()) {
                    continue;
                }
                auto const& token{ co_await refresh_token(token_store::refresh_token()) };
                token_store::save_access_token(token);
                // refresh_guard falls out of scope, unlocking the mutex.
                // If there are any suspended coroutines, the oldest one gets resumed.
            }
        }
    }