我在研究并发数据结构,并试图实现细粒度和粗粒度案例的链表,以比较时间。
然而,与我的预期相反,细粒度的情况表现更糟。我想知道这是由于开销还是我实现错误。
这是一个粗粒度的链表。
struct Node {
int data;
Node* next;
Node(int k) : data(k), next(nullptr) {}
};
class LinkedList {
private:
Node* head;
mutex lock;
public:
LinkedList() : head(nullptr) {}
void insert_node(int data) {
Node* new_node = new Node(data);
lock.lock();
if(head == nullptr) {
head = new_node;
} else if(head->data >= data) {
new_node->next = head;
head = new_node;
}
else {
Node* prev = nullptr;
Node* curr = head;
while(curr != nullptr and data > curr->data) {
prev = curr;
curr = curr->next;
}
prev->next = new_node;
new_node->next = curr;
}
lock.unlock();
}
void delete_node(int data) {
Node* prev = nullptr;
lock.lock();
Node* curr = head;
while(curr != nullptr) {
if(curr->data == data) {
if(prev == nullptr)
head = curr->next;
else
prev->next = curr->next;
delete curr;
break;
}
prev = curr;
curr = curr->next;
}
lock.unlock();
}
这是细粒度的链表。
struct HOHNode {
int data;
HOHNode* next;
mutex node_lock;
HOHNode(int k) : data(k), next(nullptr) {}
};
class HOHLinkedList {
private:
HOHNode* head;
mutex list_lock;
public:
HOHLinkedList() : head(nullptr) {}
void insert_node(int data) {
HOHNode* new_node = new HOHNode(data);
list_lock.lock();
if(head == nullptr or head->data >= data) {
if(head != nullptr)
head->node_lock.lock();
new_node->next = head;
head = new_node;
list_lock.unlock();
if(new_node->next != nullptr)
new_node->next->node_lock.unlock();
}
else {
HOHNode* prev = head;
HOHNode* curr = head->next;
prev->node_lock.lock();
list_lock.unlock();
if(curr != nullptr)
curr->node_lock.lock();
while(curr != nullptr and data > curr->data) {
HOHNode* temp = prev;
prev = curr;
curr = curr->next;
temp->node_lock.unlock();
if(curr != nullptr)
curr->node_lock.lock();
}
new_node->next = curr;
prev->next = new_node;
prev->node_lock.unlock();
if(curr != nullptr)
curr->node_lock.unlock();
}
}
void delete_node(int data) {
list_lock.lock();
HOHNode* prev = head;
prev->node_lock.lock();
if(prev->data == data) {
head = prev->next;
delete prev;
list_lock.unlock();
} else {
HOHNode* curr = head->next;
list_lock.unlock();
if(curr != nullptr) {
curr->node_lock.lock();
}
while(curr != nullptr and data >= curr->data) {
if(curr->data == data) {
prev->next = curr->next;
delete curr;
prev->node_lock.unlock();
return;
}
HOHNode* temp = prev;
prev = curr;
curr = curr->next;
temp->node_lock.unlock();
curr->node_lock.lock();
}
}
}
这是用1到1000插入和1到1000删除混合测试的结果。
[编辑]
谢谢你的建议。这是完整的代码。
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>
#include <algorithm>
#include <random>
// #include "matplotlibcpp.h"
using namespace std;
// namespace plt = matplotlibcpp;
struct Node {
int data;
Node* next;
Node(int k) : data(k), next(nullptr) {}
};
class LinkedList {
private:
Node* head;
mutex lock;
public:
LinkedList() : head(nullptr) {}
void insert_node(int data) {
Node* new_node = new Node(data);
lock.lock();
if(head == nullptr) {
head = new_node;
} else if(head->data >= data) {
new_node->next = head;
head = new_node;
}
else {
Node* prev = nullptr;
Node* curr = head;
while(curr != nullptr and data > curr->data) {
prev = curr;
curr = curr->next;
}
prev->next = new_node;
new_node->next = curr;
}
lock.unlock();
}
void delete_node(int data) {
Node* prev = nullptr;
lock.lock();
Node* curr = head;
while(curr != nullptr) {
if(curr->data == data) {
if(prev == nullptr)
head = curr->next;
else
prev->next = curr->next;
delete curr;
break;
}
prev = curr;
curr = curr->next;
}
lock.unlock();
}
void print() {
lock.lock();
Node* curr = head;
while(curr != nullptr) {
cout << curr->data << " ";
curr = curr->next;
}
cout << endl;
lock.unlock();
}
void free() {
lock.lock();
while(head != nullptr) {
Node* temp = head;
head = head->next;
delete temp;
}
lock.unlock();
}
};
struct HOHNode {
int data;
HOHNode* next;
mutex node_lock;
HOHNode(int k) : data(k), next(nullptr) {}
};
class HOHLinkedList {
private:
HOHNode* head;
mutex list_lock;
public:
HOHLinkedList() : head(nullptr) {}
void insert_node(int data) {
HOHNode* new_node = new HOHNode(data);
list_lock.lock();
if(head == nullptr or head->data >= data) {
if(head != nullptr)
head->node_lock.lock();
new_node->next = head;
head = new_node;
list_lock.unlock();
if(new_node->next != nullptr)
new_node->next->node_lock.unlock();
}
else {
HOHNode* prev = head;
HOHNode* curr = head->next;
prev->node_lock.lock();
list_lock.unlock();
if(curr != nullptr)
curr->node_lock.lock();
while(curr != nullptr and data > curr->data) {
HOHNode* temp = prev;
prev = curr;
curr = curr->next;
temp->node_lock.unlock();
if(curr != nullptr)
curr->node_lock.lock();
}
new_node->next = curr;
prev->next = new_node;
prev->node_lock.unlock();
if(curr != nullptr)
curr->node_lock.unlock();
}
}
void delete_node(int data) {
list_lock.lock();
HOHNode* prev = head;
prev->node_lock.lock();
if(prev->data == data) {
head = prev->next;
prev->node_lock.unlock(); // editted
delete prev;
list_lock.unlock();
} else {
HOHNode* curr = head->next;
list_lock.unlock();
if(curr != nullptr) {
curr->node_lock.lock();
}
while(curr != nullptr and data >= curr->data) {
if(curr->data == data) {
prev->next = curr->next;
curr->node_lock.unlock(); // editted
delete curr;
prev->node_lock.unlock();
return;
}
HOHNode* temp = prev;
prev = curr;
curr = curr->next;
temp->node_lock.unlock();
curr->node_lock.lock();
}
}
}
void print() {
HOHNode* curr = head;
while(curr != nullptr) {
cout << curr->data << " ";
curr = curr->next;
}
cout << endl;
}
void free() {
list_lock.lock();
while(head != nullptr) {
HOHNode* temp = head;
head = head->next;
delete temp;
}
list_lock.unlock();
}
};
void test_simple(LinkedList& list) {
random_device rd;
mt19937 rng(rd());
vector<int> numbers;
numbers.reserve(1000);
for(int i = 1; i <= 1000; i++)
numbers.push_back(i);
shuffle(numbers.begin(), numbers.end(), rng);
for(int number : numbers)
list.insert_node(number);
shuffle(numbers.begin(), numbers.end(), rng);
for(int number : numbers)
list.delete_node(number);
}
void test_HOH(HOHLinkedList& list) {
random_device rd;
mt19937 rng(rd());
vector<int> numbers;
numbers.reserve(1000);
for(int i = 1; i <= 1000; i++)
numbers.push_back(i);
shuffle(numbers.begin(), numbers.end(), rng);
for(int number : numbers)
list.insert_node(number);
shuffle(numbers.begin(), numbers.end(), rng);
for(int number : numbers)
list.delete_node(number);
}
int main() {
LinkedList simple_list;
HOHLinkedList HOH_list;
vector<thread> list_threads;
vector<int> x;
for(int i = 1; i <= 8; i++)
x.push_back(i);
vector<double> simple_y;
vector<double> HOH_y;
for(int num_threads : x) {
simple_list.free();
list_threads.clear();
auto start_time = chrono::high_resolution_clock::now();
for(int i = 0; i < num_threads; i++)
list_threads.push_back(thread(test_simple, ref(simple_list)));
for(thread& list_thread : list_threads)
list_thread.join();
auto end_time = chrono::high_resolution_clock::now();
chrono::duration<double> elapsed_time = end_time - start_time;
simple_y.push_back(elapsed_time.count());
HOH_list.free();
list_threads.clear();
start_time = chrono::high_resolution_clock::now();
for(int i = 0; i < num_threads; i++)
list_threads.push_back(thread(test_HOH, ref(HOH_list)));
for(thread& list_thread : list_threads)
list_thread.join();
end_time = chrono::high_resolution_clock::now();
elapsed_time = end_time - start_time;
HOH_y.push_back(elapsed_time.count());
}
cout << "Coarse grained" << endl;
for(double t : simple_y)
cout << t << endl;
cout << "Fine grained" << endl;
for(double t : HOH_y)
cout << t << endl;
// plt::named_plot("Simple", x, simple_y, "x-");
// plt::named_plot("Hand over Hand", x, HOH_y, "o-");
// plt::title("Performance of Simple vs. Hand over Hand Linked List");
// plt::xlabel("Threads");
// plt::ylabel("Time (seconds)");
// plt::xticks(x);
// plt::legend();
// plt::save("./linked_list.png");
}
用g++9.4.0在4芯8螺纹上运行。当我试图删除列表中不存在的数据时,我并不关心这种情况,因为我测试了插入1到1000和删除1到1000的随机顺序。我试着混淆顺序,以查看细粒度案例的好处。