// SWMR.cpp -- single-writer / multiple-reader fixed-size ring buffer queue and a small demo driver.
//#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <memory>
#include <random>
#include <thread>
#include <type_traits>
#include <vector>
using namespace std;
//multiple reader single writer first in first out fixed length ring buffer queue
//compatible with x86/x86_64 GCC
template<typename T,uint32_t N> class RnW1FifoFixed
{
private:
static constexpr uint32_t QSIZE= 2<<N;
static constexpr uint32_t MASK = QSIZE-1;
public:
RnW1FifoFixed()
:m_array{new T[QSIZE]},m_read{0},m_write{0}
{
static_assert(std::is_default_constructible<T>::value,"T does not have a default constructor.");
static_assert(std::is_copy_assignable<T>::value,"T does not support copy assignment.");
static_assert(N!=0,"N is too small.");
cout << "Queue size = " << QSIZE << endl;
}
//one thread
bool Write(const T& t)
{
//full
if(m_write-m_read==QSIZE)
return false;
cout << "m_write = " << m_write << endl;
m_array[m_write++&MASK] = t;
//CPU does not reorder writes
//prevent compiler from reordering writes
//asm volatile("":::"memory");
//m_write++;
return true;
}
//multiple threads
bool Read(T& t)
{
while(true)
{
//use a constant m_read each loop
uint32_t read = m_read;
//empty
if(read==m_write)
return false;
t = m_array[read&MASK];
if(__sync_bool_compare_and_swap(&m_read,read,read+1))
return true;
}
}
private:
std::unique_ptr<T[]> m_array;
uint32_t m_read;
uint32_t m_write;
};
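/* Note: std::unique_ptr<T[]> manages arrays out of the box, but std::shared_ptr<T>
does not (before C++17's std::shared_ptr<T[]>): it releases with `delete`, not `delete[]`.
std::shared_ptr<int> p(new int[10]); // WRONG: compiles, but destruction uses `delete` on an array
std::shared_ptr<int> p(new int[10],  // OK: custom deleter calls delete[]
[](int* p) {
delete[] p;
});
A C++11 stand-in for std::make_unique (available since C++14), from
https://herbsutter.com/gotw/_102/
template<typename T, typename ...Args>
std::unique_ptr<T> make_unique( Args&& ...args )
{
return std::unique_ptr<T>( new T( std::forward<Args>(args)... ) );
}
make_unique<int>(5);
*/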
main() {
auto N = std::thread::hardware_concurrency();
std::cout << "Number of threads = " << N << std::endl;
RnW1FifoFixed<int, 10> dataPipe;
int d[N-2] = {};
for( auto i : d)
cout << ", " << i << endl;
auto rt = [&](int i) {
// random-number generator(1-5) (use i as seed to get different sequences)
thread_local int ME = i;
std::default_random_engine dre(i);
std::uniform_int_distribution<int> id(1,5);
cout << "starting reader " << ME << endl;
while( d[ME] <= 5000 - N + 2) {
if (dataPipe.Read(d[ME]))
cout << "T " << ME << ": " << d[ME] << endl;
std::this_thread::yield(); // hint to reschedule to the next thread
this_thread::sleep_for(chrono::milliseconds(id(dre)));
}
cout << "exiting reader " << ME << endl;
};
for(int i = 0; i < N-2; ++i) {
thread r(rt, i);
/*
thread r([&]() {
// random-number generator(1-5) (use i as seed to get different sequences)
thread_local int ME = i;
std::default_random_engine dre(i);
std::uniform_int_distribution<int> id(1,5);
cout << "starting reader " << ME << endl;
while( d[ME] < 5000 - N + 2) {
if (dataPipe.Read(d[ME]))
cout << "T " << ME << ": " << d[ME] << endl;
std::this_thread::yield(); // hint to reschedule to the next thread
this_thread::sleep_for(chrono::milliseconds(id(dre)));
}
cout << "exiting reader " << ME << endl;
});
*/
r.detach(); // run in background; not r.join() --- main thread will wait after the first thread starts
}
cout << "starting writer " << endl;
thread w([&]() {
std::default_random_engine dre(200);
std::uniform_int_distribution<int> id(1,10);
for (int i = 1; i <= 5000; ) {
int loop = id(dre);
for (int j = 0; j < loop && i <= 5000; )
if(dataPipe.Write(i)) {
++i;
++j;
}
this_thread::sleep_for(chrono::milliseconds(1));
}
for(int i = 0; i < N-2; ) {
if (d[i] > 5000 - N + 2) {
cout << "DONE reader " << i << " = " << d[i] << endl;
++i;
}
}
cout << "exiting writer" << endl;
cout << "Press Enter to exit\n";
cin.get();
});
w.join();
cout << "done" << endl;
}