代码之家  ›  专栏  ›  技术社区  ›  Wad

IO完成端口:单独的线程池来处理出列的数据包?

  •  0
  • Wad  · 技术社区  · 7 年前

    注意事项 :我在这里添加了C++标记,因为a)代码是C++,b)使用C++的人可能已经使用了IO完成端口。所以请不要大喊大叫。


    我正在使用IO完成端口,并最终在 RbMm -的含义 NumberOfConcurrentThreads 内的参数 CreateIoCompletionPort()

    我有下面的小程序,它创建了10个线程,所有线程都在完成端口上等待。我告诉我的完成端口一次只允许运行4个线程(我有四个CPU)。然后,我将8个数据包排队到端口。如果My thread函数将ID为gt;的数据包出列,则会输出一条消息;4.为了输出此消息,我必须停止当前运行的四个线程中的至少一个线程,这是在控制台输入“1”时发生的。

    现在这是所有相当简单的代码。然而,我有一个很大的担忧,那就是 如果处理完成数据包的所有线程都陷入困境,这将意味着不再有数据包可以出列和处理 这就是我用无限循环模拟的 -在控制台输入“1”之前,不会有更多的数据包出列,这一事实突显了这一潜在问题!

    一个更好的解决方案是不让我的四个线程将数据包(或CPU的线程数)出列,然后在其中一个线程出列时,将数据包的处理转移到一个工作线程 从另一个泳池 ,从而消除IOCP中的所有线程陷入困境从而不再有数据包出列的风险?

    我这样问是因为 全部的 我所看到的IO完成端口代码示例使用了一种类似于下面所示的方法, 我建议使用一个单独的线程池。这就是为什么我会这样想 我错过了一些东西,因为我寡不敌众!

    注意:这是一个有点做作的例子,因为 Windows will allow 如果其中一个可运行线程进入等待状态,则要退出队列的额外数据包;我在代码中用注释掉的 cout 电话:

    系统还允许线程在GetQueuedCompletionStatus中等待 如果另一个正在运行的线程关联 对于相同的I/O完成端口,其他端口进入等待状态 原因,例如SuspendThread函数。当螺纹插入时 等待状态再次开始运行,可能会有一段短暂的时间 活动线程数超过了并发值。 然而 系统通过不允许任何新的活动 直到活动线程数低于并发数 价值

    但我不会打电话的 SuspendThread 在我的线程函数中 我不知道除了 库特 将导致线程进入等待状态 ,因此我无法预测我的一个或多个线程是否会陷入困境!因此我想到了线程池;至少上下文切换意味着其他数据包有机会退出队列!

    #define _CRT_SECURE_NO_WARNINGS
    #include <windows.h>
    #include <thread>
    #include <vector>
    #include <algorithm>
    #include <atomic>
    #include <ctime>
    #include <iostream>
    
    using namespace std;
    
    int main()
    {
        HANDLE hCompletionPort1;
        if ((hCompletionPort1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4)) == NULL)
        {
            return -1;
        }
        vector<thread> vecAllThreads;
        atomic_bool bStop(false);
    
        // Fill our vector with 10 threads, each of which waits on our IOCP.
        generate_n(back_inserter(vecAllThreads), 10, [hCompletionPort1, &bStop] {
            thread t([hCompletionPort1, &bStop]()
            {
                // Thread body
                while (true)
                {
                    DWORD dwBytes = 0;
                    LPOVERLAPPED pOverlapped = 0;
                    ULONG_PTR uKey;
                    if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) == 1)
                    {
                        if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
                            break;  // Special completion packet; end processing.
    
                        //cout << uKey; // EVEN THIS WILL CAUSE A "wait" which causes MORE THAN 4 THREADS TO ENTER!
    
                        if (uKey >4) 
                            cout << "Started processing packet ID > 4!" << endl;
                        while (!bStop)
                            ;   // INFINITE LOOP
                    }
                }
            });
            return move(t);
        }
        );
    
        // Queue 8 completion packets to our IOCP...only four will be processed until we set our bool
        for (int i = 1; i <= 8; ++i)
        {
            PostQueuedCompletionStatus(hCompletionPort1, 0, i, new OVERLAPPED);
        }
    
        while (!bStop)
        {
            int nVal;
            cout << "Enter 1 to cause current processing threads to end: ";
            cin >> nVal;
            bStop = (nVal == 1);
        }
        for (int i = 0; i < 10; ++i)    // Tell all 10 threads to stop processing on the IOCP
        {
            PostQueuedCompletionStatus(hCompletionPort1, 0, 0, 0);  // Special packet marking end of IOCP usage
        }
        for_each(begin(vecAllThreads), end(vecAllThreads), mem_fn(&thread::join));
    
        return 0;
    }
    


    编辑#1

    我所说的“独立线程池”的意思如下:

    class myThread {
    public:
        void SetTask(LPOVERLAPPED pO) { /* start processing pO*/ }
    private:
        thread m_thread;    // Actual thread object
    };
    
    // The threads in this thread pool are not associated with the IOCP in any way whatsoever; they exist
    // purely to be handed a completion packet which they then process!
    class ThreadPool
    {
    public:
        void Initialise() { /* create 100 worker threads and add them to some internal storage*/}
        myThread& GetNextFreeThread() { /* return one of the 100 worker thread we created*/}
    } g_threadPool;
    

    与IOCP关联的四个线程中的每一个都将更改为

    if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) == 1)
    {
        if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
            break;  // Special completion packet; end processing.
    
        // Pick a new thread from a pool of pre-created threads and assign it the packet to process
        myThread& thr = g_threadPool.GetNextFreeThread();
        thr.SetTask(pOverlapped);
    
        // Now, this thread can immediately return to the IOCP; it doesn't matter if the
        // packet we dequeued would take forever to process; that is happening in the 
        // separate thread thr *that will not intefere with packets being dequeued from IOCP!*
    }
    

    这样,我就不可能在没有更多数据包排队的情况下结束!

    1 回复  |  直到 7 年前
        1
  •  0
  •   Wad    7 年前

    对于是否应该使用单独的线程池,似乎存在着相互矛盾的意见。显然,正如我发布的示例代码所示,如果数据包的处理没有进入等待状态,那么数据包就有可能停止从IOCP排队;鉴于此,无限循环可能是不现实的,但它确实证明了这一点。