Skip to content

Commit

Permalink
File tranferring implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DinikaSen committed Mar 14, 2019
1 parent f885900 commit c368b28
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 151 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ add_executable(JasmineGraph main.cpp src/server/JasmineGraphServer.cpp src/serve
src/partitioner/local/MetisPartitioner.cpp src/partitioner/local/MetisPartitioner.h src/partitioner/local/RDFPartitioner.cpp
src/partitioner/local/RDFPartitioner.h src/backend/JasmineGraphBackend.cpp src/backend/JasmineGraphBackend.h
src/backend/JasmineGraphBackendProtocol.cpp src/backend/JasmineGraphBackendProtocol.h src/server/JasmineGraphInstanceProtocol.cpp
src/server/JasmineGraphInstanceProtocol.h src/server/JasmineGraphInstanceServiceSession.h src/server/JasmineGraphInstanceServiceSession.cpp)
src/server/JasmineGraphInstanceProtocol.h src/server/JasmineGraphInstanceServiceSession.h src/server/JasmineGraphInstanceServiceSession.cpp
src/server/JasmineGraphInstanceFileTransferServiceSession.h src/server/JasmineGraphInstanceFileTransferServiceSession.cpp)

#file(GLOB_RECURSE Dir1_Sources "*.cpp")
#add_executable(JesminGraph ${Dir1_Sources})
Expand Down
1 change: 1 addition & 0 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ int main(int argc, char *argv[]) {
int serverPort = atoi(argv[2]);
int serverDataPort = atoi(argv[3]);

std::cout << "In worker mode" << std::endl;
instance = new JasmineGraphInstance();
instance->start_running(serverPort,serverDataPort);

Expand Down
8 changes: 7 additions & 1 deletion src/partitioner/local/MetisPartitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
#include "MetisPartitioner.h"

thread_local std::vector<string> partitionFileList;
thread_local std::vector<string> centralStoreFileList;

MetisPartitioner::MetisPartitioner(SQLiteDBInterface *sqlite) {
this->sqlite = *sqlite;
Expand Down Expand Up @@ -316,10 +317,15 @@ void MetisPartitioner::createPartitionFiles(idx_t *part) {
this->utils.compressFile(outputFilePart);
partitionFileList.push_back(outputFilePart+".gz");
this->utils.compressFile(outputFilePartMaster);
partitionFileList.push_back(outputFilePartMaster+".gz");
centralStoreFileList.push_back(outputFilePartMaster+".gz");
}
}

std::vector<string> MetisPartitioner::getPartitionFiles() {
return partitionFileList;
}

std::vector<string> MetisPartitioner::getCentalStoreFiles() {
return centralStoreFileList;
}

3 changes: 3 additions & 0 deletions src/partitioner/local/MetisPartitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class MetisPartitioner {
//return list of partition files
static std::vector<string> getPartitionFiles();

//return list of centralStore files
static std::vector<string> getCentalStoreFiles();

MetisPartitioner(SQLiteDBInterface *);

private:
Expand Down
9 changes: 7 additions & 2 deletions src/server/JasmineGraphInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ limitations under the License.

#include "JasmineGraphInstance.h"
#include "JasmineGraphInstanceServiceSession.h"
#include "JasmineGraphInstanceFileTransferServiceSession.h"

#include <iostream>
#include <unistd.h>
Expand All @@ -22,8 +23,10 @@ limitations under the License.


int JasmineGraphInstance::start_running(int serverPort, int serverDataPort) {
std::cout << "Worker started" << std::endl;
std::cout << "Running the server..." << std::endl;
// std::cout << "Worker started" << std::endl;
// std::cout << "Running the server..." << std::endl;
fprintf(stdout, "Worker started\n");
fprintf(stdout, "Running the server...\n");

this->sqlite = *new SQLiteDBInterface();
this->sqlite.init();
Expand Down Expand Up @@ -80,6 +83,8 @@ int JasmineGraphInstance::start_running(int serverPort, int serverDataPort) {
// TODO : multi-threading needs to be applied?
JasmineGraphInstanceServiceSession *serviceSession = new JasmineGraphInstanceServiceSession();
serviceSession->start_session(connFd, serverDataPort);
std::cout << "END>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" << std::endl;

}

}
Expand Down
5 changes: 5 additions & 0 deletions src/server/JasmineGraphInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
#include <map>
#include "../localstore/JasmineGraphLocalStore.h"
#include "../metadb/SQLiteDBInterface.h"
#include "JasmineGraphInstanceFileTransferServiceSession.h"
#include "JasmineGraphInstanceServiceSession.h"

using std::map;

Expand All @@ -36,6 +38,9 @@ class JasmineGraphInstance {
void shutdown();

bool isRunning();

JasmineGraphInstanceServiceSession *instanceService;
JasmineGraphInstanceFileTransferServiceSession *ftpService;
};

struct JasmineGraphInstanceRecord {
Expand Down
108 changes: 108 additions & 0 deletions src/server/JasmineGraphInstanceFileTransferServiceSession.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
Copyright 2019 JasmineGraph Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "JasmineGraphInstanceFileTransferServiceSession.h"
#include "../util/Utils.h"
#include "JasmineGraphInstanceProtocol.h"

#include <iostream>
#include <unistd.h>

using namespace std;

int JasmineGraphInstanceFileTransferServiceSession::startFileTransferSession(int serverDataPort) {
std::cout << "New file transfer service session started" << std::endl;
fprintf(stderr, "New file transfer service session started\n");
Utils utils;

int listenFd;
socklen_t len;
struct sockaddr_in svrAdd;
struct sockaddr_in clntAdd;

//create socket
listenFd = socket(AF_INET, SOCK_STREAM, 0);
if (listenFd < 0) {
std::cerr << "Cannot open socket" << std::endl;
return 0;
}

bzero((char *) &svrAdd, sizeof(svrAdd));

svrAdd.sin_family = AF_INET;
svrAdd.sin_addr.s_addr = INADDR_ANY;
svrAdd.sin_port = htons(serverDataPort);

int yes = 1;

if (setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) {
perror("setsockopt");
exit(1);
}


//bind socket
if (bind(listenFd, (struct sockaddr *) &svrAdd, sizeof(svrAdd)) < 0) {
std::cerr << "Cannot bind" << std::endl;
return 0;
}

listen(listenFd, 5);

len = sizeof(clntAdd);

while (true) {
std::cout << "Worker listening on port " << serverDataPort << std::endl;
int connFd = accept(listenFd, (struct sockaddr *) &clntAdd, &len);

if (connFd < 0) {
std::cerr << "Cannot accept connection" << std::endl;
} else {
std::cout << "Connection successful" << std::endl;
}

char data[300];
bzero(data, 301);
read(connFd, data, 300);
string fileName = (data);
fileName = utils.trim_copy(fileName, " \f\n\r\t\v");
string filePathWithName =
utils.getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder") + "/" + fileName;

write(connFd, JasmineGraphInstanceProtocol::SEND_FILE.c_str(), JasmineGraphInstanceProtocol::SEND_FILE.size());

char recvBUFF[256];
int bytesReceived = 0;
//memset(recvBUFF, '0', sizeof(recvBUFF));

FILE *fp;
fp = fopen(filePathWithName.c_str(), "w");
if (NULL == fp) {
printf("Error opening file");
return 1;
}

while((bytesReceived = read(connFd, recvBUFF, 256)) > 0)
{
printf("Bytes received %d\n",bytesReceived);
fwrite(recvBUFF, 1,bytesReceived,fp);
}
if(bytesReceived < 0)
{
printf("\n Read Error \n");
}
close(connFd);
std::cout << "Connection to the FTP closed" << std::endl;
return 0;
}
}
29 changes: 29 additions & 0 deletions src/server/JasmineGraphInstanceFileTransferServiceSession.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
Copyright 2019 JasmineGraph Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef JASMINEGRAPH_JASMINEGRAPHINSTANCEFILETRANSFERSERVICESESSION_H
#define JASMINEGRAPH_JASMINEGRAPHINSTANCEFILETRANSFERSERVICESESSION_H

#include <string>

class JasmineGraphInstanceFileTransferServiceSession {
private:

public:

int startFileTransferSession(int serverDataPort);

};


#endif //JASMINEGRAPH_JASMINEGRAPHINSTANCEFILETRANSFERSERVICESESSION_H
88 changes: 47 additions & 41 deletions src/server/JasmineGraphInstanceServiceSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ limitations under the License.
*/

#include "JasmineGraphInstanceServiceSession.h"
#include "JasmineGraphInstanceFileTransferServiceSession.h"
#include "../util/Utils.h"

#include <iostream>
Expand All @@ -32,7 +33,6 @@ void JasmineGraphInstanceServiceSession::start_session(int connFd, int serverDat
read(connFd, data, 300);

string line(data);
cout << line << endl;

Utils utils;
line = utils.trim_copy(line, " \f\n\r\t\v");
Expand All @@ -47,22 +47,22 @@ void JasmineGraphInstanceServiceSession::start_session(int connFd, int serverDat
line = utils.trim_copy(line, " \f\n\r\t\v");
server_hostname = line;
std::cout << "ServerName : " << server_hostname << std::endl;
} else if (line.compare(JasmineGraphInstanceProtocol::CLOSE)) {
} else if (line.compare(JasmineGraphInstanceProtocol::CLOSE)==0) {
write(connFd, JasmineGraphInstanceProtocol::CLOSE_ACK.c_str(),
JasmineGraphInstanceProtocol::CLOSE_ACK.size());
close(connFd);
} else if (line.compare(JasmineGraphInstanceProtocol::SHUTDOWN)) {
} else if (line.compare(JasmineGraphInstanceProtocol::SHUTDOWN)==0) {
write(connFd, JasmineGraphInstanceProtocol::SHUTDOWN_ACK.c_str(),
JasmineGraphInstanceProtocol::SHUTDOWN_ACK.size());
close(connFd);
break;
} else if (line.compare(JasmineGraphInstanceProtocol::READY)) {
} else if (line.compare(JasmineGraphInstanceProtocol::READY)==0) {
write(connFd, JasmineGraphInstanceProtocol::OK.c_str(), JasmineGraphInstanceProtocol::OK.size());
}

// TODO :: INSERT_EDGES,TRUNCATE,COUNT_VERTICES,COUNT_EDGES,DELETE,LOADPG etc should be implemented

else if (line.compare(JasmineGraphInstanceProtocol::BATCH_UPLOAD)) {
else if (line.compare(JasmineGraphInstanceProtocol::BATCH_UPLOAD)==0) {
write(connFd, JasmineGraphInstanceProtocol::OK.c_str(), JasmineGraphInstanceProtocol::OK.size());
bzero(data, 301);
read(connFd, data, 300);
Expand All @@ -89,58 +89,64 @@ void JasmineGraphInstanceServiceSession::start_session(int connFd, int serverDat
JasmineGraphInstanceProtocol::SEND_FILE_CONT.size());

// TODO :: Check with Acacia code
JasmineGraphInstanceFileTransferServiceSession *ftpSession = new JasmineGraphInstanceFileTransferServiceSession();
std::cout << "going to start a ftp session" << std::endl;
ftpSession->startFileTransferSession(serverDataPort);

string fullFilePath =
utils.getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder") + "/" + fileName;

while (utils.fileExists(fullFilePath) && utils.getFileSize(fullFilePath) < fileSize) {
bzero(data, 301);
read(connFd, data, 300);
string response = (data);
response = utils.trim_copy(response, " \f\n\r\t\v");

if (response.compare(JasmineGraphInstanceProtocol::FILE_RECV_CHK) == 0) {
write(connFd, JasmineGraphInstanceProtocol::FILE_RECV_WAIT.c_str(),
JasmineGraphInstanceProtocol::FILE_RECV_WAIT.size());
}
}

bzero(data, 301);
read(connFd, data, 300);
string response = (data);
response = utils.trim_copy(response, " \f\n\r\t\v");

if (line.compare(JasmineGraphInstanceProtocol::FILE_RECV_CHK) == 0) {
write(connFd, JasmineGraphInstanceProtocol::FILE_ACK.c_str(),
JasmineGraphInstanceProtocol::FILE_ACK.size());
}
// while (utils.fileExists(fullFilePath) && utils.getFileSize(fullFilePath) < fileSize) {
// bzero(data, 301);
// read(connFd, data, 300);
// string response = (data);
// response = utils.trim_copy(response, " \f\n\r\t\v");
//
// if (response.compare(JasmineGraphInstanceProtocol::FILE_RECV_CHK) == 0) {
// write(connFd, JasmineGraphInstanceProtocol::FILE_RECV_WAIT.c_str(),
// JasmineGraphInstanceProtocol::FILE_RECV_WAIT.size());
// }
// }
//
// bzero(data, 301);
// read(connFd, data, 300);
// string response = (data);
// response = utils.trim_copy(response, " \f\n\r\t\v");
//
// if (line.compare(JasmineGraphInstanceProtocol::FILE_RECV_CHK) == 0) {
// write(connFd, JasmineGraphInstanceProtocol::FILE_ACK.c_str(),
// JasmineGraphInstanceProtocol::FILE_ACK.size());
// }

std::cout << "File Received" << std::endl;
loop = true;

// TODO :: Check with Acacia

utils.unzipFile(fullFilePath);
//utils.unzipFile(fullFilePath);

//TODO:: Check with Acacia

while (!utils.fileExists(fullFilePath)) {
bzero(data, 301);
read(connFd, data, 300);
string response = (data);
response = utils.trim_copy(response, " \f\n\r\t\v");
if (line.compare(JasmineGraphInstanceProtocol::BATCH_UPLOAD_CHK) == 0) {
write(connFd, JasmineGraphInstanceProtocol::BATCH_UPLOAD_WAIT.c_str(),
JasmineGraphInstanceProtocol::BATCH_UPLOAD_WAIT.size());
}
}

write(connFd, JasmineGraphInstanceProtocol::BATCH_UPLOAD_ACK.c_str(),
JasmineGraphInstanceProtocol::BATCH_UPLOAD_ACK.size());
// while (!utils.fileExists(fullFilePath)) {
// bzero(data, 301);
// read(connFd, data, 300);
// string response = (data);
// response = utils.trim_copy(response, " \f\n\r\t\v");
// if (line.compare(JasmineGraphInstanceProtocol::BATCH_UPLOAD_CHK) == 0) {
// write(connFd, JasmineGraphInstanceProtocol::BATCH_UPLOAD_WAIT.c_str(),
// JasmineGraphInstanceProtocol::BATCH_UPLOAD_WAIT.size());
// }
// }
//
// write(connFd, JasmineGraphInstanceProtocol::BATCH_UPLOAD_ACK.c_str(),
// JasmineGraphInstanceProtocol::BATCH_UPLOAD_ACK.size());

}

// TODO :: Implement the rest of the protocol
//else if ()
}

cout << "\nClosing thread " << pthread_self() << " and connection" << endl;
close(connFd);

}
Loading

0 comments on commit c368b28

Please sign in to comment.