Skip to content

Commit

Permalink
Added performance metric retrieval logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishad-M-I-M authored and thevindu-w committed Feb 29, 2024
1 parent c6a4f4c commit c93a155
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 118 deletions.
2 changes: 2 additions & 0 deletions conf/hosts.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ localhost

# TODO: Replace this
192.168.8.150
192.168.43.135
10.10.15.244

#[email protected]
#192.168.1.8
Expand Down
2 changes: 1 addition & 1 deletion conf/jasminegraph-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ org.jasminegraph.scheduler.enabled=true
org.jasminegraph.scheduler.performancecollector.timing=120

#pushgateway address
org.jasminegraph.collector.pushgateway=http://10.43.150.139:9091/metrics/job/
org.jasminegraph.collector.pushgateway=http://192.168.8.150:9091/

#--------------------------------------------------------------------------------
#MetaDB information
Expand Down
4 changes: 3 additions & 1 deletion main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ int main(int argc, char *argv[]) {
main_logger.log("Using JASMINE_GRAPH_HOME", "info");
std::cout << JASMINEGRAPH_HOME << std::endl;

StatisticCollector::init();
thread schedulerThread(SchedulerService::startScheduler);

if (mode == Conts::JASMINEGRAPH_RUNTIME_PROFILE_MASTER) {
std::string masterIp = argv[3];
int numberOfWorkers = atoi(argv[4]);
std::string workerIps = argv[5];
enableNmon = argv[6];
server = JasmineGraphServer::getInstance();
thread schedulerThread(SchedulerService::startScheduler);

if (profile == Conts::PROFILE_K8S) {
K8sInterface *interface = new K8sInterface();
Expand Down
134 changes: 45 additions & 89 deletions src/performance/metrics/PerformanceUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ limitations under the License.

using namespace std::chrono;
std::map<std::string, std::vector<ResourceUsageInfo>> resourceUsageMap;
std::string pushGatewayAddr = Utils::getJasmineGraphProperty("org.jasminegraph.collector.pushgateway");

static size_t write_callback(void* contents, size_t size, size_t nmemb, std::string* output);

Expand All @@ -35,59 +34,33 @@ void PerformanceUtil::init() {
}
}

int PerformanceUtil::collectPerformanceStatistics() {
vector<std::string> hostList = Utils::getHostListFromProperties();
int hostListSize = hostList.size();
int counter = 0;
std::vector<std::future<long>> intermRes;
PlacesToNodeMapper placesToNodeMapper;
int PerformanceUtil::collectPerformanceStatistics() {
long memoryUsage = StatisticCollector::getMemoryUsageByProcess();
Utils::send_job("WorkerPerfData", "memory_usage", std::to_string(memoryUsage));

std::string placeLoadQuery =
"select ip, user, server_port, is_master, is_host_reporter,host_idhost,idplace from place";
std::vector<vector<pair<string, string>>> placeList = perfDb->runSelect(placeLoadQuery);
std::vector<vector<pair<string, string>>>::iterator placeListIterator;
double cpuUsage = StatisticCollector::getCpuUsage();
Utils::send_job("WorkerPerfData", "cpu_usage", std::to_string(cpuUsage));

for (placeListIterator = placeList.begin(); placeListIterator != placeList.end(); ++placeListIterator) {
vector<pair<string, string>> place = *placeListIterator;
std::string host;
std::string requestResourceAllocation = "false";
int threadCount = StatisticCollector::getThreadCount();
Utils::send_job("HostPerfData", "thread_count", std::to_string(threadCount));

std::string ip = place.at(0).second;
std::string user = place.at(1).second;
std::string serverPort = place.at(2).second;
std::string isMaster = place.at(3).second;
std::string isHostReporter = place.at(4).second;
std::string hostId = place.at(5).second;
std::string placeId = place.at(6).second;
long totalSwapSpace = StatisticCollector::getTotalSwapSpace();
Utils::send_job("HostPerfData", "total_swap_space", std::to_string(totalSwapSpace));

if (ip.find("localhost") != std::string::npos) {
host = "localhost";
} else {
host = user + "@" + ip;
}
long usedSwapSpace = StatisticCollector::getUsedSwapSpace();
Utils::send_job("HostPerfData", "used_swap_space", std::to_string(usedSwapSpace));

if (isHostReporter.find("true") != std::string::npos) {
std::string hostSearch = "select total_cpu_cores,total_memory from host where idhost='" + hostId + "'";
std::vector<vector<pair<string, string>>> hostAllocationList = perfDb->runSelect(hostSearch);
long rxBytes = StatisticCollector::getRXBytes();
Utils::send_job("WorkerPerfData", "rx_bytes", std::to_string(rxBytes));

vector<pair<string, string>> firstHostAllocation = hostAllocationList.at(0);
long txBytes = StatisticCollector::getTXBytes();
Utils::send_job("WorkerPerfData", "tx_bytes", std::to_string(txBytes));

std::string totalCPUCores = firstHostAllocation.at(0).second;
std::string totalMemory = firstHostAllocation.at(1).second;

if (totalCPUCores.empty() || totalMemory.empty()) {
requestResourceAllocation = "true";
}
}

if (isMaster.find("true") != std::string::npos) {
collectLocalPerformanceData(isHostReporter, requestResourceAllocation, hostId, placeId);
} else {
collectRemotePerformanceData(host, atoi(serverPort.c_str()), isHostReporter, requestResourceAllocation,
hostId, placeId);
}
}
int socketCount = StatisticCollector::getSocketCount();
Utils::send_job("HostPerfData", "socket_count", std::to_string(socketCount));

long totalMemoryUsage = StatisticCollector::getTotalMemoryUsage();
Utils::send_job("HostPerfData", "total_memory", std::to_string(totalMemoryUsage));
return 0;
}

Expand Down Expand Up @@ -206,12 +179,6 @@ std::vector<ResourceConsumption> PerformanceUtil::retrieveCurrentResourceUtiliza
return placeResourceConsumptionList;
}

static size_t write_callback(void* contents, size_t size, size_t nmemb, std::string* output) {
size_t totalSize = size * nmemb;
output->append(static_cast<char*>(contents), totalSize);
return totalSize;
}

void PerformanceUtil::collectRemotePerformanceData(std::string host, int port, std::string isVMStatManager,
std::string isResourceAllocationRequired, std::string hostId,
std::string placeId) {
Expand All @@ -220,7 +187,7 @@ void PerformanceUtil::collectRemotePerformanceData(std::string host, int port, s
bool loop = false;
socklen_t len;
struct sockaddr_in serv_addr;
struct hostent* server;
struct hostent *server;

sockfd = socket(AF_INET, SOCK_STREAM, 0);

Expand All @@ -235,11 +202,11 @@ void PerformanceUtil::collectRemotePerformanceData(std::string host, int port, s
return;
}

bzero((char*)&serv_addr, sizeof(serv_addr));
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
bcopy((char*)server->h_addr, (char*)&serv_addr.sin_addr.s_addr, server->h_length);
bcopy((char *) server->h_addr, (char *) &serv_addr.sin_addr.s_addr, server->h_length);
serv_addr.sin_port = htons(port);
if (Utils::connect_wrapper(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) {
if (Utils::connect_wrapper(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
std::cerr << "ERROR connecting" << std::endl;
return;
}
Expand Down Expand Up @@ -301,7 +268,7 @@ void PerformanceUtil::collectRemotePerformanceData(std::string host, int port, s
std::string cpuUsage = strArr[5];
string vmPerformanceSql =
"insert into host_performance_data (date_time, memory_usage, cpu_usage, idhost) values ('" +
processTime + "','" + memoryConsumption + "','" + cpuUsage + "','" + hostId + "')";
processTime + "','" + memoryConsumption + "','" + cpuUsage + "','" + hostId + "')";

perfDb->runInsert(vmPerformanceSql);
Utils::send_job("hostPerfData", "memory_consumption", memoryConsumption);
Expand All @@ -311,32 +278,36 @@ void PerformanceUtil::collectRemotePerformanceData(std::string host, int port, s
std::string totalMemory = strArr[6];
std::string totalCores = strArr[7];
string allocationUpdateSql = "update host set total_cpu_cores='" + totalCores +
"',total_memory='" + totalMemory + "' where idhost='" + hostId +
"'";

perfDb->runUpdate(allocationUpdateSql);
"',total_memory='" + totalMemory + "' where idhost='" + hostId +
"'";

if (isResourceAllocationRequired == "true") {
std::string totalMemory = strArr[6];
std::string totalCores = strArr[7];
string allocationUpdateSql =
"update host set total_cpu_cores='" + totalCores + "',total_memory='" +
totalMemory + "' where idhost='" + hostId + "'";

Utils::send_job("placePerfData", "memory_usage", memoryUsage);
Utils::send_job("placePerfData", "cpu_usage", cpuUsage);
}
}
}

string placePerfSql =
"insert into place_performance_data (idplace, memory_usage, cpu_usage, date_time) values ('" +
placeId + "','" + memoryUsage + "','" + cpuUsage + "','" + processTime + "')";
placeId + "','" + memoryUsage + "','" + cpuUsage + "','" + processTime + "')";

perfDb->runInsert(placePerfSql);
Utils::send_job("placePerfData", "memory_usage", memoryUsage);
Utils::send_job("placePerfData", "cpu_usage", cpuUsage);
}
}
}
}

void PerformanceUtil::collectLocalPerformanceData(std::string isVMStatManager, std::string isResourceAllocationRequired,
std::string hostId, std::string placeId) {
StatisticCollector statisticCollector;
statisticCollector.init();

int memoryUsage = statisticCollector.getMemoryUsageByProcess();
double cpuUsage = statisticCollector.getCpuUsage();
int memoryUsage = StatisticCollector::getMemoryUsageByProcess();
double cpuUsage = StatisticCollector::getCpuUsage();

auto executedTime = std::chrono::system_clock::now();
std::time_t reportTime = std::chrono::system_clock::to_time_t(executedTime);
Expand All @@ -345,7 +316,7 @@ void PerformanceUtil::collectLocalPerformanceData(std::string isVMStatManager, s

if (isVMStatManager.find("true") != std::string::npos) {
std::string vmLevelStatistics =
statisticCollector.collectVMStatistics(isVMStatManager, isResourceAllocationRequired);
StatisticCollector::collectVMStatistics(isVMStatManager, isResourceAllocationRequired);
std::vector<std::string> strArr = Utils::split(vmLevelStatistics, ',');

string totalMemoryUsed = strArr[0];
Expand All @@ -357,9 +328,6 @@ void PerformanceUtil::collectLocalPerformanceData(std::string isVMStatManager, s

perfDb->runInsert(vmPerformanceSql);

Utils::send_job("localPerfDataO_" + hostId, "total_memory_usage", totalMemoryUsed);
Utils::send_job("localPerfDataO_" + hostId, "total_CPU_usage", totalCPUUsage);

if (isResourceAllocationRequired == "true") {
std::string totalMemory = strArr[2];
std::string totalCores = strArr[3];
Expand All @@ -375,11 +343,6 @@ void PerformanceUtil::collectLocalPerformanceData(std::string isVMStatManager, s
reportTimeString + "')";

perfDb->runInsert(placePerfSql);

Utils::send_job("placePerfDataLocalO_" + hostId, "memory_usage",
to_string(memoryUsage));
Utils::send_job("placePerfDataLocalO_" + hostId, "cpu_usage",
to_string(cpuUsage));
}

int PerformanceUtil::collectRemoteSLAResourceUtilization(std::string host, int port, std::string isVMStatManager,
Expand Down Expand Up @@ -501,11 +464,9 @@ int PerformanceUtil::collectRemoteSLAResourceUtilization(std::string host, int p

void PerformanceUtil::collectLocalSLAResourceUtilization(std::string graphId, std::string placeId, std::string command,
std::string category, int elapsedTime, bool autoCalibrate) {
StatisticCollector statisticCollector;
statisticCollector.init();
string graphSlaId;

double loadAverage = statisticCollector.getLoadAverage();
double loadAverage = StatisticCollector::getLoadAverage();

auto executedTime = std::chrono::system_clock::now();
std::time_t reportTime = std::chrono::system_clock::to_time_t(executedTime);
Expand Down Expand Up @@ -534,11 +495,8 @@ void PerformanceUtil::collectLocalSLAResourceUtilization(std::string graphId, st
ResourceConsumption PerformanceUtil::retrieveLocalResourceConsumption(std::string host, std::string placeId) {
ResourceConsumption placeResourceConsumption;

StatisticCollector statisticCollector;
statisticCollector.init();

int memoryUsage = statisticCollector.getMemoryUsageByProcess();
double cpuUsage = statisticCollector.getCpuUsage();
int memoryUsage = StatisticCollector::getMemoryUsageByProcess();
double cpuUsage = StatisticCollector::getCpuUsage();

placeResourceConsumption.memoryUsage = memoryUsage;
placeResourceConsumption.host = host;
Expand Down Expand Up @@ -915,9 +873,7 @@ void PerformanceUtil::adjustAggregateLoadMap(std::map<std::string, std::vector<d
}

void PerformanceUtil::logLoadAverage() {
StatisticCollector statisticCollector;

double currentLoadAverage = statisticCollector.getLoadAverage();
double currentLoadAverage = StatisticCollector::getLoadAverage();

std::cout << "###PERF### CURRENT LOAD: " + std::to_string(currentLoadAverage) << std::endl;
Utils::send_job("loadAverage", "load_average", std::to_string(currentLoadAverage));
Expand Down
14 changes: 2 additions & 12 deletions src/performance/metrics/StatisticCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ limitations under the License.

static clock_t lastCPU, lastSysCPU, lastUserCPU;
static int numProcessors;
std::string pushGatewayJobAddr = Utils::getJasmineGraphProperty("");

static long parseLine(char* line);
static long getSwapSpace(const char* type);
Expand Down Expand Up @@ -103,16 +102,12 @@ static long getSwapSpace(int field) {
long StatisticCollector::getUsedSwapSpace() {
long result = getSwapSpace(4);
std::cout << "Used swap space: " + std::to_string(result) << std::endl;
std::string response_used_swap = Utils::send_job("usedSwap", "used_swap_space",
std::to_string(result));
return result;
}

long StatisticCollector::getTotalSwapSpace() {
long result = getSwapSpace(3);
std::cout << "Total swap space: " + std::to_string(result) << std::endl;
std::string response_total_swap = Utils:: send_job("totalSwap", "total_swap_space",
std::to_string(result));
return result;
}

Expand All @@ -122,8 +117,6 @@ long StatisticCollector::getRXBytes() {
fscanf(file, "%li", &result);
fclose(file);
std::cout << "Total read bytes: " + std::to_string(result) << std::endl;
std::string response_rx = Utils::send_job("totalRead", "total_read_bytes",
std::to_string(result));
return result;
}

Expand All @@ -134,9 +127,6 @@ long StatisticCollector::getTXBytes() {
fclose(file);
std::cout << "Total sent bytes: " + std::to_string(result) << std::endl;

std::string response_tx = Utils::send_job("totalSent", "total_sent_bytes",
std::to_string(result));

return result;
}

Expand All @@ -161,6 +151,8 @@ int StatisticCollector::getSocketCount() {
}
}
(void)closedir(d);

std::cout << "Total sockets: " + std::to_string(count) << std::endl;
return count;
}

Expand Down Expand Up @@ -268,8 +260,6 @@ long StatisticCollector::getTotalMemoryUsage() {
}
memUsage = memTotal - (memFree + buffers + cached + sReclaimable);

std::string response_total_memory = Utils::send_job("totalMemory", "total_memory",
std::to_string(memTotal));
return memUsage;
}

Expand Down
2 changes: 1 addition & 1 deletion src/performance/metrics/StatisticCollector.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StatisticCollector {
static const int BUFFER_SIZE = 128;

public:
int init();
static int init();
static long getMemoryUsageByProcess();
static int getThreadCount();
static long getUsedSwapSpace();
Expand Down
Loading

0 comments on commit c93a155

Please sign in to comment.