注意事项
:我在这里添加了C++标记,因为a)代码是C++,b)使用C++的人可能已经使用了IO完成端口。所以请不要大喊大叫。
我正在使用IO完成端口,并最终在
RbMm
-的含义
NumberOfConcurrentThreads
内的参数
CreateIoCompletionPort()
。
我有下面的小程序,它创建了10个线程,所有线程都在完成端口上等待。我告诉我的完成端口一次只允许运行4个线程(我有四个CPU)。然后,我将8个数据包排队到端口。如果My thread函数将ID为gt;的数据包出列,则会输出一条消息;4.为了输出此消息,我必须停止当前运行的四个线程中的至少一个线程,这是在控制台输入“1”时发生的。
现在这是所有相当简单的代码。然而,我有一个很大的担忧,那就是
如果处理完成数据包的所有线程都陷入困境,这将意味着不再有数据包可以出列和处理
。
这就是我用无限循环模拟的
-在控制台输入“1”之前,不会有更多的数据包出列,这一事实突显了这一潜在问题!
一个更好的解决方案是不让我的四个线程将数据包(或CPU的线程数)出列,然后在其中一个线程出列时,将数据包的处理转移到一个工作线程
从另一个泳池
,从而消除IOCP中的所有线程陷入困境从而不再有数据包出列的风险?
我这样问是因为
全部的
我所看到的IO完成端口代码示例使用了一种类似于下面所示的方法,
不
我建议使用一个单独的线程池。这就是为什么我会这样想
我
我错过了一些东西,因为我寡不敌众!
注意:这是一个有点做作的例子,因为
Windows will allow
如果其中一个可运行线程进入等待状态,则要退出队列的额外数据包;我在代码中用注释掉的
cout
电话:
系统还允许线程在GetQueuedCompletionStatus中等待
如果另一个正在运行的线程关联
对于相同的I/O完成端口,其他端口进入等待状态
原因,例如SuspendThread函数。当螺纹插入时
等待状态再次开始运行,可能会有一段短暂的时间
活动线程数超过了并发值。
然而
系统通过不允许任何新的活动
直到活动线程数低于并发数
价值
。
但我不会打电话的
SuspendThread
在我的线程函数中
我不知道除了
库特
将导致线程进入等待状态
,因此我无法预测我的一个或多个线程是否会陷入困境!因此我想到了线程池;至少上下文切换意味着其他数据包有机会退出队列!
#define _CRT_SECURE_NO_WARNINGS
#include <windows.h>
#include <thread>
#include <vector>
#include <algorithm>
#include <atomic>
#include <ctime>
#include <iostream>
using namespace std;
int main()
{
HANDLE hCompletionPort1;
if ((hCompletionPort1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4)) == NULL)
{
return -1;
}
vector<thread> vecAllThreads;
atomic_bool bStop(false);
// Fill our vector with 10 threads, each of which waits on our IOCP.
generate_n(back_inserter(vecAllThreads), 10, [hCompletionPort1, &bStop] {
thread t([hCompletionPort1, &bStop]()
{
// Thread body
while (true)
{
DWORD dwBytes = 0;
LPOVERLAPPED pOverlapped = 0;
ULONG_PTR uKey;
if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) == 1)
{
if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
break; // Special completion packet; end processing.
//cout << uKey; // EVEN THIS WILL CAUSE A "wait" which causes MORE THAN 4 THREADS TO ENTER!
if (uKey >4)
cout << "Started processing packet ID > 4!" << endl;
while (!bStop)
; // INFINITE LOOP
}
}
});
return move(t);
}
);
// Queue 8 completion packets to our IOCP...only four will be processed until we set our bool
for (int i = 1; i <= 8; ++i)
{
PostQueuedCompletionStatus(hCompletionPort1, 0, i, new OVERLAPPED);
}
while (!bStop)
{
int nVal;
cout << "Enter 1 to cause current processing threads to end: ";
cin >> nVal;
bStop = (nVal == 1);
}
for (int i = 0; i < 10; ++i) // Tell all 10 threads to stop processing on the IOCP
{
PostQueuedCompletionStatus(hCompletionPort1, 0, 0, 0); // Special packet marking end of IOCP usage
}
for_each(begin(vecAllThreads), end(vecAllThreads), mem_fn(&thread::join));
return 0;
}
编辑#1
我所说的“独立线程池”的意思如下:
class myThread {
public:
void SetTask(LPOVERLAPPED pO) { /* start processing pO*/ }
private:
thread m_thread; // Actual thread object
};
// The threads in this thread pool are not associated with the IOCP in any way whatsoever; they exist
// purely to be handed a completion packet which they then process!
class ThreadPool
{
public:
void Initialise() { /* create 100 worker threads and add them to some internal storage*/}
myThread& GetNextFreeThread() { /* return one of the 100 worker thread we created*/}
} g_threadPool;
与IOCP关联的四个线程中的每一个都将更改为
if (::GetQueuedCompletionStatus(hCompletionPort1, &dwBytes, &uKey, &pOverlapped, INFINITE) == 1)
{
if (dwBytes == 0 && uKey == 0 && pOverlapped == 0)
break; // Special completion packet; end processing.
// Pick a new thread from a pool of pre-created threads and assign it the packet to process
myThread& thr = g_threadPool.GetNextFreeThread();
thr.SetTask(pOverlapped);
// Now, this thread can immediately return to the IOCP; it doesn't matter if the
// packet we dequeued would take forever to process; that is happening in the
// separate thread thr *that will not intefere with packets being dequeued from IOCP!*
}
这样,我就不可能在没有更多数据包排队的情况下结束!