Skip to content

Commit

Permalink
mpi comm pre 64 chunks, reduce mpi comm time
Browse files Browse the repository at this point in the history
  • Loading branch information
yanlifeng committed Apr 17, 2024
1 parent 3b63db0 commit 480442f
Show file tree
Hide file tree
Showing 12 changed files with 867 additions and 565 deletions.
48 changes: 30 additions & 18 deletions slave/slave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,29 @@ struct OverlapRes {
};


struct dupInfo{
uint32_t key;
uint kmer32;
uint8_t gc;
//struct dupInfo{
// uint32_t key;
// uint kmer32;
// uint8_t gc;
//};

struct qc_data_tgs {
ThreadInfo **thread_info_;
CmdInfo *cmd_info_;
std::vector<neoReference> *data1_;
int *cnt;
int bit_len;
};

struct qc_data {
ThreadInfo **thread_info_;
CmdInfo *cmd_info_;
//std::vector <dupInfo> *dups;
Duplicate *duplicate_;
std::vector <neoReference> *data1_;
std::vector <neoReference> *data2_;
std::vector <neoReference> *pass_data1_;
std::vector <neoReference> *pass_data2_;
//std::vector<dupInfo> *dups;
std::vector<neoReference> *data1_[64];
std::vector<neoReference> *data2_[64];
std::vector<neoReference> *pass_data1_[64];
std::vector<neoReference> *pass_data2_[64];
int *cnt;
int bit_len;
};
Expand Down Expand Up @@ -105,7 +113,7 @@ void updateTop5(int rlen, double avgQual, TGSStats *tgs_state) {
}
}

extern "C" void tgsfunc(qc_data *para) {
extern "C" void tgsfunc(qc_data_tgs *para) {
std::vector <neoReference> *data = para->data1_;
ThreadInfo *thread_info = para->thread_info_[_PEN];
CmdInfo *cmd_info_ = para->cmd_info_;
Expand Down Expand Up @@ -944,12 +952,13 @@ extern "C" void ngsfunc(qc_data *para){
unsigned long start_t, end_t;
unsigned long start_tt, end_tt;
rtc_(&start_tt);
std::vector <neoReference> *data = para->data1_;
std::vector <neoReference> *pass_data = para->pass_data1_;
std::vector <neoReference> *data = para->data1_[_PEN];
std::vector <neoReference> *pass_data = para->pass_data1_[_PEN];
ThreadInfo *thread_info = para->thread_info_[_PEN];
CmdInfo *cmd_info_ = para->cmd_info_;
int bit_len = para->bit_len;
int data_num = data->size();
if(data_num == 0) return;

int pre_pos_seq_len = thread_info->pre_state1_->malloc_seq_len_ * 4;
int *pre_pos_cnt_ = (int*)ldm_malloc(pre_pos_seq_len * sizeof(int));
Expand Down Expand Up @@ -1004,7 +1013,8 @@ extern "C" void ngsfunc(qc_data *para){
int aft_gc_bases_ = 0;
int aft_real_seq_len_ = 0;
int aft_lines_ = 0;
for(int id = _PEN * BATCH_SIZE; id < data_num; id += 64 * BATCH_SIZE) {
// for(int id = _PEN * BATCH_SIZE; id < data_num; id += 64 * BATCH_SIZE) {
for(int id = 0; id < data_num; id += BATCH_SIZE) {


rtc_(&start_t);
Expand Down Expand Up @@ -1329,16 +1339,17 @@ extern "C" void ngspefunc(qc_data *para){
unsigned long start_t, end_t;
unsigned long start_tt, end_tt;
rtc_(&start_tt);
std::vector <neoReference> *data1 = para->data1_;
std::vector <neoReference> *data2 = para->data2_;
std::vector <neoReference> *pass_data1 = para->pass_data2_;
std::vector <neoReference> *pass_data2 = para->pass_data1_;
std::vector <neoReference> *data1 = para->data1_[_PEN];
std::vector <neoReference> *data2 = para->data2_[_PEN];
std::vector <neoReference> *pass_data1 = para->pass_data2_[_PEN];
std::vector <neoReference> *pass_data2 = para->pass_data1_[_PEN];
ThreadInfo *thread_info = para->thread_info_[_PEN];
CmdInfo *cmd_info_ = para->cmd_info_;
int bit_len = para->bit_len;

int data_num1 = data1->size();
int data_num2 = data2->size();
if(data_num1 == 0) return;


int pre_pos_seq_len1 = thread_info->pre_state1_->malloc_seq_len_ * 4;
Expand Down Expand Up @@ -1457,7 +1468,8 @@ extern "C" void ngspefunc(qc_data *para){



for(int id = _PEN * BATCH_SIZE; id < data_num1; id += 64 * BATCH_SIZE) {
// for(int id = _PEN * BATCH_SIZE; id < data_num1; id += 64 * BATCH_SIZE) {
for(int id = 0; id < data_num1; id += BATCH_SIZE) {


rtc_(&start_t);
Expand Down
2 changes: 1 addition & 1 deletion src/DataPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ namespace rabbit {
th::unique_lock<th::mutex> lock(mutex);

while (partNum >= maxPartNum) {
partsAvailableCondition.wait(lock);
usleep(1000000);
partsAvailableCondition.wait(lock);
}
ASSERT(availablePartsPool.size() > 0);

Expand Down
54 changes: 27 additions & 27 deletions src/FastxStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,33 +1070,33 @@ namespace rabbit {
//--------------read right chunk end---------------------//
if (eof1 && eof2) eof = true;
if (!eof) {
left_line_count = count_line(data, chunkEnd);
right_line_count = count_line(data_right, chunkEnd_right);
int64 difference = left_line_count - right_line_count;
if(difference) printf("difference.......\n");
if (difference > 0) {
while (chunkEnd >= 0) {
if (data[chunkEnd] == '\n') {
difference--;
if (difference == -1) {
chunkEnd++;
break;
}
}
chunkEnd--;
}
} else if (difference < 0) {
while (chunkEnd_right >= 0) {
if (data_right[chunkEnd_right] == '\n') {
difference++;
if (difference == 1) {
chunkEnd_right++;
break;
}
}
chunkEnd_right--;
}
}
// left_line_count = count_line(data, chunkEnd);
// right_line_count = count_line(data_right, chunkEnd_right);
// int64 difference = left_line_count - right_line_count;
// if(difference) printf("difference.......\n");
// if (difference > 0) {
// while (chunkEnd >= 0) {
// if (data[chunkEnd] == '\n') {
// difference--;
// if (difference == -1) {
// chunkEnd++;
// break;
// }
// }
// chunkEnd--;
// }
// } else if (difference < 0) {
// while (chunkEnd_right >= 0) {
// if (data_right[chunkEnd_right] == '\n') {
// difference++;
// if (difference == 1) {
// chunkEnd_right++;
// break;
// }
// }
// chunkEnd_right--;
// }
// }
leftPart->size = chunkEnd - 1;
if (usesCrlf) leftPart->size -= 1;
std::copy(data + chunkEnd, data + cbufSize, swapBuffer.Pointer());
Expand Down
6 changes: 3 additions & 3 deletions src/FastxStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ namespace rabbit {
* @param isZippedNew if true, it will use gzopen to read fileName_ and fileName2_
*/
FastqFileReader(const std::string &fileName_, FastqDataPool &pool_,
std::string fileName2_ = "", bool isZippedNew = false, uint32 mxLen_ = 1 << 20, int start_line = 0, int end_line = 0)
std::string fileName2_ = "", bool isZippedNew = false, uint32 mxLen_ = 1 << 20, int start_line = 0, int end_line = 0, bool in_mem = 0)
: swapBuffer(SwapBufferSize),
swapBuffer2(SwapBufferSize),
bufferSize(0),
Expand All @@ -334,9 +334,9 @@ namespace rabbit {
recordsPool(pool_) {
GetNxtBuffSize = mxLen_;
tot_read_size2 = 0;
mFqReader = new FileReader(fileName_, isZipped, start_line, end_line);
mFqReader = new FileReader(fileName_, isZipped, start_line, end_line, in_mem);
if (fileName2_ != "") {
mFqReader2 = new FileReader(fileName2_, isZipped, start_line, end_line);
mFqReader2 = new FileReader(fileName2_, isZipped, start_line, end_line, in_mem);
}
}

Expand Down
Loading

0 comments on commit 480442f

Please sign in to comment.