forked from RabbitBio/RabbitBAM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BamCompress.cpp
101 lines (69 loc) · 2.6 KB
/
BamCompress.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include "BamCompress.h"
BamCompress::BamCompress() {
};
void BamCompress::resize(int BufferSize, int threadNumber) {
blockNum = 0;
compress_bg = 0;
compress_ed = BufferSize - 1;
compress_size = BufferSize + 1;
compress_data = new bam_block *[compress_size];
for (int i = compress_bg; i <= compress_ed; i++) compress_data[i] = new bam_block;
consumer_bg = 1;
consumer_ed = 0;
consumer_size = BufferSize + 5;
consumer_data = new bam_block *[consumer_size];
is_ok = new bool[consumer_size];
for (int i = 0; i < consumer_size; i++) is_ok[i] = false;
compressThread = threadNumber;
wait_num = 0;
}
BamCompress::BamCompress(int BufferSize, int threadNumber) {
blockNum = 0;
compress_bg = 0;
compress_ed = BufferSize - 1;
compress_size = BufferSize + 1;
compress_data = new bam_block *[compress_size];
for (int i = compress_bg; i <= compress_ed; i++) compress_data[i] = new bam_block;
consumer_bg = 1;
consumer_ed = 0;
consumer_size = BufferSize + 5;
consumer_data = new bam_block *[consumer_size];
compressThread = threadNumber;
wait_num = 0;
}
bam_block *BamCompress::getEmpty() {
mtx_compress.lock();
while ((compress_ed + 1) % compress_size == compress_bg) {
mtx_compress.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
mtx_compress.lock();
}
int num = compress_bg;
compress_bg = (compress_bg + 1) % compress_size;
bam_block *res = compress_data[num];
mtx_compress.unlock();
return res;
}
void BamCompress::inputUnCompressData(bam_block *data, int block_num) {
while (block_num != blockNum.load(std::memory_order_acq_rel)) {
wait_num += 1;
std::this_thread::sleep_for(std::chrono::nanoseconds(block_num - blockNum) / 8);
}
consumer_data[(consumer_ed + 1) % consumer_size] = data;
consumer_ed = (consumer_ed + 1) % consumer_size;
blockNum.store(blockNum.load(std::memory_order_acq_rel) + 1, std::memory_order_acq_rel);
}
bam_block *BamCompress::getUnCompressData() {
while ((consumer_ed + 1) % consumer_size == consumer_bg) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (compressThread == 0 && (consumer_ed + 1) % consumer_size == consumer_bg) return nullptr;
}
int num = consumer_bg;
bam_block *res = consumer_data[consumer_bg];
consumer_bg = (consumer_bg + 1) % consumer_size;
return res;
}
void BamCompress::backEmpty(bam_block *data) {
compress_data[(compress_ed + 1) % compress_size] = data;
compress_ed = (compress_ed + 1) % compress_size;
}