From b72cffb2dde0975a6cf98c87f2dc654d19859e31 Mon Sep 17 00:00:00 2001 From: Patrick Schlangen Date: Wed, 1 Feb 2017 00:39:17 +0100 Subject: [PATCH] Initial import of current chronos version --- README.md | 45 +++- chronos/App.cpp | 214 ++++++++++++++++ chronos/App.h | 65 +++++ chronos/CMake/Modules/FindMySQLClient.cmake | 7 + chronos/CMake/Modules/FindSQLite.cmake | 7 + chronos/CMake/Modules/Findlibev.cmake | 7 + chronos/CMakeLists.txt | 45 ++++ chronos/Config.cpp | 68 +++++ chronos/Config.h | 38 +++ chronos/HTTPRequest.cpp | 246 ++++++++++++++++++ chronos/HTTPRequest.h | 66 +++++ chronos/JobResult.h | 60 +++++ chronos/MySQL.h | 85 +++++++ chronos/MySQL_DB.cpp | 201 +++++++++++++++ chronos/MySQL_Result.cpp | 45 ++++ chronos/Notification.h | 25 ++ chronos/SQLite.cpp | 168 ++++++++++++ chronos/SQLite.h | 82 ++++++ chronos/UpdateThread.cpp | 269 ++++++++++++++++++++ chronos/UpdateThread.h | 61 +++++ chronos/Utils.cpp | 124 +++++++++ chronos/Utils.h | 34 +++ chronos/WorkerThread.cpp | 269 ++++++++++++++++++++ chronos/WorkerThread.h | 92 +++++++ chronos/chronos.cfg | 29 +++ chronos/main.cpp | 34 +++ database/struct.sql | 109 ++++++++ 27 files changed, 2493 insertions(+), 2 deletions(-) create mode 100644 chronos/App.cpp create mode 100644 chronos/App.h create mode 100644 chronos/CMake/Modules/FindMySQLClient.cmake create mode 100644 chronos/CMake/Modules/FindSQLite.cmake create mode 100644 chronos/CMake/Modules/Findlibev.cmake create mode 100644 chronos/CMakeLists.txt create mode 100644 chronos/Config.cpp create mode 100644 chronos/Config.h create mode 100644 chronos/HTTPRequest.cpp create mode 100644 chronos/HTTPRequest.h create mode 100644 chronos/JobResult.h create mode 100644 chronos/MySQL.h create mode 100644 chronos/MySQL_DB.cpp create mode 100644 chronos/MySQL_Result.cpp create mode 100644 chronos/Notification.h create mode 100644 chronos/SQLite.cpp create mode 100644 chronos/SQLite.h create mode 100644 chronos/UpdateThread.cpp create mode 100644 chronos/UpdateThread.h create mode 100644 chronos/Utils.cpp create mode 100644 chronos/Utils.h create mode 100644 chronos/WorkerThread.cpp create mode 100644 chronos/WorkerThread.h create mode 100644 chronos/chronos.cfg create mode 100644 chronos/main.cpp create mode 100644 database/struct.sql diff --git a/README.md b/README.md index 30929900..8395c33e 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,43 @@ -# cron-job.org -cron-job.org Open Source project +cron-job.org +============ + +Structure +--------- +* `database` contains the MySQL database structure. +* `chronos` is cron-job.org's cron job execution daemon and is responsible for fetching the jobs. +* `web` contains the web interface (coming soon) + +chronos +------- +### Concept +chronos checks the MySQL database every minute to collect all jobs to execute. For every minute, a thread is spawned which processes all the jobs. Actual HTTP fetching is done using the excellent CURL multi library with libev as library used to provide the event loop. Together with the c-ares resovler this allows for thousands of parallel HTTP requests. + +cron-job.org supports storing the job results for the user's convenience. This can quickly lead to I/O bottleneck when storing the result data in a MySQL database. (Which also has the downside that cleaning up old entries is extremely expensive.) To solve this issue, chronos stores the results in per-user per-day SQLite databases. Cleaning up old entries is as easy as deleting the corresponding day's databases. + +The whole software is optimized on performance rather than on data integrity, i.e. when your server crashes or you have a power outage / hardware defect, the job history is most likely lost. Since this is volatile data anyway, it's not considered a big issue. + +### Prerequisites +In order to build chronos, you need development files of: +* curl (preferably with c-ares as resolver) +* libev +* mysqlclient +* sqlite3 +To build, you need a C++14 compiler and cmake. + +### Building +1. Create and enter a build folder: `mkdir build && cd build` +2. Run cmake: `cmake -DCMAKE_BUILD_TYPE=Release ..` +3. Build the project: `make` + +### Running +1. Ensure you've imported the DB scheme from the `database` folder +2. Customize `chronos.cfg` according to your system (especially add your MySQL login) +3. Execute `./chronos /path/to/chronos.cfg` + +General notes +------------- +* Web interface and jitter correction algorithm are still missing in this repository and will be added as soon as they've been refactored to a presentable state. +* We strongly recommend to build CURL using the c-ares resolver. Otherwise every request might spawn its own thread for DNS resolving and your machine will run out of resources *very* soon. +* Before running chronos, ensure that the limit of open files/sockets is not set too low. You might want to run `ulimit -n 65536` or similar first. +* If data integrity is not important for you, we highly recommend to set `innodb_flush_log_at_trx_commit=0` and `innodb_flush_method=O_DIRECT` in your MySQL config for best performance. Otherwise the update thread (which is responsible for storing the job resuls) might lag behind the actual job executions quite soon. +* Parts of the source are quite old and from early stages of the project and might require a refactoring sooner or later. diff --git a/chronos/App.cpp b/chronos/App.cpp new file mode 100644 index 00000000..b3c54aea --- /dev/null +++ b/chronos/App.cpp @@ -0,0 +1,214 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "App.h" + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include "UpdateThread.h" +#include "WorkerThread.h" +#include "Config.h" + +using namespace Chronos; + +App *App::instance = nullptr; + +App::App(int argc, char *argv[]) +{ + if(App::instance != nullptr) + throw std::runtime_error("App instance already exists"); + + if(argc != 2) + throw std::runtime_error(std::string("Usage: ") + std::string(argv[0]) + std::string(" [config-file]")); + + this->config = std::make_shared(argv[1]); + App::instance = this; +} + +App::~App() +{ + App::instance = nullptr; +} + +App *App::getInstance() +{ + if(App::instance == nullptr) + throw std::runtime_error("No app instance available"); + return(App::instance); +} + +void App::processJobs(int hour, int minute, int month, int mday, int wday, int year, time_t timestamp) +{ + std::cout << "App::processJobs(): Called for " + << "hour = " << hour << ", " + << "minute = " << minute << ", " + << "month = " << month << ", " + << "mday = " << mday << ", " + << "wday = " << wday << ", " + << "timestamp = " << timestamp + << std::endl; + + auto res = db->query("SELECT TRIM(`url`),`job`.`jobid`,`auth_enable`,`auth_user`,`auth_pass`,`notify_failure`,`notify_success`,`notify_disable`,`fail_counter`,`save_responses`,`userid` FROM `job` " + "INNER JOIN `job_hours` ON `job_hours`.`jobid`=`job`.`jobid` " + "INNER JOIN `job_mdays` ON `job_mdays`.`jobid`=`job`.`jobid` " + "INNER JOIN `job_wdays` ON `job_wdays`.`jobid`=`job`.`jobid` " + "INNER JOIN `job_minutes` ON `job_minutes`.`jobid`=`job`.`jobid` " + "INNER JOIN `job_months` ON `job_months`.`jobid`=`job`.`jobid` " + "WHERE (`hour`=-1 OR `hour`=%d) " + "AND (`minute`=-1 OR `minute`=%d) " + "AND (`mday`=-1 OR `mday`=%d) " + "AND (`wday`=-1 OR `wday`=%d) " + "AND (`month`=-1 OR `month`=%d) " + "AND `enabled`=1 " + "ORDER BY `fail_counter` ASC, `last_duration` ASC", + hour, minute, mday, wday, month); + + int jobCount = res->numRows(); + std::cout << "App::processJobs(): " << jobCount << " jobs found" << std::endl; + + if(jobCount > 0) + { + std::shared_ptr wt = std::make_shared(mday, month, year, hour, minute); + + MYSQL_ROW row; + while((row = res->fetchRow()) != nullptr) + { + HTTPRequest *req = HTTPRequest::fromURL(row[0], atoi(row[10]), wt); + req->result->jobID = atoi(row[1]); + req->result->datePlanned = (uint64_t)timestamp * 1000; + req->result->notifyFailure = strcmp(row[5], "1") == 0; + req->result->notifySuccess = strcmp(row[6], "1") == 0; + req->result->notifyDisable = strcmp(row[7], "1") == 0; + req->result->oldFailCounter = atoi(row[8]); + req->result->saveResponses = strcmp(row[9], "1") == 0; + if(atoi(row[2]) == 1) + { + req->useAuth = true; + req->authUsername = row[3]; + req->authPassword = row[4]; + } + + wt->addJob(req); + } + + std::cout << "App::processJobs(): Starting worker thread" << std::endl; + wt->run(); + } + + res.reset(); + + std::cout << "App::processJobs(): Finished" << std::endl; +} + +void App::signalHandler(int sig) +{ + if(sig == SIGINT) + App::getInstance()->stop = true; +} + +int App::run() +{ + curl_global_init(CURL_GLOBAL_ALL); + MySQL_DB::libInit(); + + db = createMySQLConnection(); + startUpdateThread(); + + signal(SIGINT, App::signalHandler); + + bool firstLoop = true; + struct tm lastTime = { 0 }; + int jitterCorrectionOffset = calcJitterCorrectionOffset(); + while(!stop) + { + time_t currentTime = time(nullptr) + jitterCorrectionOffset; + struct tm *t = localtime(¤tTime); + + if(t->tm_min > lastTime.tm_min + || t->tm_hour > lastTime.tm_hour + || t->tm_mday > lastTime.tm_mday + || t->tm_mon > lastTime.tm_mon + || t->tm_year > lastTime.tm_year) + { + // update last time + memcpy(&lastTime, t, sizeof(struct tm)); + + if(!firstLoop || t->tm_sec == 59 - jitterCorrectionOffset) + { + processJobs(t->tm_hour, t->tm_min, t->tm_mon+1, t->tm_mday, t->tm_wday, t->tm_year+1900, currentTime - t->tm_sec); + jitterCorrectionOffset = calcJitterCorrectionOffset(); + } + + firstLoop = false; + } + else + { + usleep(100*1000); + } + } + + this->stopUpdateThread(); + + MySQL_DB::libCleanup(); + curl_global_cleanup(); + + return(1); +} + +int App::calcJitterCorrectionOffset() +{ + return 1; //! @todo +} + +void App::updateThreadMain() +{ + try + { + updateThreadObj = std::make_unique(); + updateThreadObj->run(); + updateThreadObj.reset(); + } + catch(const std::runtime_error &ex) + { + std::cout << "Update thread runtime error: " << ex.what() << std::endl; + stop = true; + } +} + +void App::startUpdateThread() +{ + updateThread = std::thread(std::bind(&App::updateThreadMain, this)); +} + +void App::stopUpdateThread() +{ + updateThreadObj->stopThread(); + updateThread.join(); +} + +std::unique_ptr App::createMySQLConnection() +{ + return(std::make_unique(config->get("mysql_host"), + config->get("mysql_user"), + config->get("mysql_pass"), + config->get("mysql_db"), + config->get("mysql_sock"))); +} diff --git a/chronos/App.h b/chronos/App.h new file mode 100644 index 00000000..2dc4598b --- /dev/null +++ b/chronos/App.h @@ -0,0 +1,65 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _APP_H_ +#define _APP_H_ + +#include +#include + +#include + +#include "MySQL.h" +#include "Config.h" + +namespace Chronos +{ + class MySQL_DB; + class UpdateThread; + + class App + { + public: + App(int argc, char *argv[]); + ~App(); + + private: + App(const App &other) = delete; + App(App &&other) = delete; + App &operator=(const App &other) = delete; + App &operator=(App &&other) = delete; + + public: + static App *getInstance(); + static void signalHandler(int sig); + void updateThreadMain(); + int run(); + std::unique_ptr createMySQLConnection(); + + private: + void startUpdateThread(); + void stopUpdateThread(); + void processJobs(int hour, int minute, int month, int mday, int wday, int year, time_t timestamp); + int calcJitterCorrectionOffset(); + + public: + std::shared_ptr config; + + private: + bool stop = false; + static App *instance; + std::thread updateThread; + std::unique_ptr db; + std::unique_ptr updateThreadObj; + }; +}; + +#endif diff --git a/chronos/CMake/Modules/FindMySQLClient.cmake b/chronos/CMake/Modules/FindMySQLClient.cmake new file mode 100644 index 00000000..4548a596 --- /dev/null +++ b/chronos/CMake/Modules/FindMySQLClient.cmake @@ -0,0 +1,7 @@ +find_path(MySQLClient_INCLUDE_DIRS mysql.h PATH_SUFFIXES mysql) + +find_library(MySQLClient_LIBRARIES NAMES libmysqlclient mysqlclient PATH_SUFFIXES mysql) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(MySQLClient DEFAULT_MSG MySQLClient_LIBRARIES MySQLClient_INCLUDE_DIRS) +mark_as_advanced(MySQLClient_INCLUDE_DIRS MySQLClient_LIBRARIES) diff --git a/chronos/CMake/Modules/FindSQLite.cmake b/chronos/CMake/Modules/FindSQLite.cmake new file mode 100644 index 00000000..243a5d1d --- /dev/null +++ b/chronos/CMake/Modules/FindSQLite.cmake @@ -0,0 +1,7 @@ +find_path(SQLITE_INCLUDE_DIRS sqlite3.h) + +find_library(SQLITE_LIBRARIES NAMES libsqlite3 sqlite3) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(SQLite DEFAULT_MSG SQLITE_LIBRARIES SQLITE_INCLUDE_DIRS) +mark_as_advanced(SQLITE_INCLUDE_DIRS SQLITE_LIBRARIES) diff --git a/chronos/CMake/Modules/Findlibev.cmake b/chronos/CMake/Modules/Findlibev.cmake new file mode 100644 index 00000000..f3feb844 --- /dev/null +++ b/chronos/CMake/Modules/Findlibev.cmake @@ -0,0 +1,7 @@ +find_path(LIBEV_INCLUDE_DIRS ev.h) + +find_library(LIBEV_LIBRARIES NAMES libev ev) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(libev DEFAULT_MSG LIBEV_LIBRARIES LIBEV_INCLUDE_DIRS) +mark_as_advanced(LIBEV_INCLUDE_DIRS LIBEV_LIBRARIES) diff --git a/chronos/CMakeLists.txt b/chronos/CMakeLists.txt new file mode 100644 index 00000000..283eb48e --- /dev/null +++ b/chronos/CMakeLists.txt @@ -0,0 +1,45 @@ +cmake_minimum_required(VERSION 3.1) +project(chronos) + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/CMake/Modules") + +set(CMAKE_CXX_STANDARD 14) + +set(SOURCES + main.cpp + App.cpp + Config.cpp + HTTPRequest.cpp + MySQL_DB.cpp + MySQL_Result.cpp + SQLite.cpp + UpdateThread.cpp + Utils.cpp + WorkerThread.cpp +) + +add_executable(${PROJECT_NAME} ${SOURCES}) + +find_package(MySQLClient REQUIRED) +find_package(CURL REQUIRED) +find_package(libev REQUIRED) +find_package(SQLite REQUIRED) +find_package(Threads REQUIRED) + +include_directories(${MySQLClient_INCLUDE_DIRS} + ${CURL_INCLUDE_DIRS} + ${LIBEV_INCLUDE_DIRS} + ${SQLITE_INCLUDE_DIRS} +) +target_link_libraries(${PROJECT_NAME} + ${MySQLClient_LIBRARIES} + ${CURL_LIBRARIES} + ${LIBEV_LIBRARIES} + ${SQLITE_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} +) + +include_directories(${PROJECT_SOURCE_DIR}) + +install(TARGETS ${PROJECT_NAME} DESTINATION bin) + diff --git a/chronos/Config.cpp b/chronos/Config.cpp new file mode 100644 index 00000000..9b5bc8f7 --- /dev/null +++ b/chronos/Config.cpp @@ -0,0 +1,68 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "Config.h" + +#include +#include + +#include +#include + +#include "Utils.h" + +using namespace Chronos; + +Config::Config(const std::string &fileName) +{ + //! @todo This should really be done properly in a C++ way. + + char buffer[512]; + + FILE *fp = fopen(fileName.c_str(), "r"); + if(fp == nullptr) + throw std::runtime_error(std::string("Config::Config(): Failed to open config file: ") + fileName); + + while(!feof(fp) && fgets(buffer, sizeof(buffer)-2, fp) != nullptr) + { + if(strlen(buffer) < 3 || buffer[0] == '#') + continue; + + char *eqPos = strchr(buffer, '='); + if(eqPos != nullptr) + { + std::string key(buffer, (size_t)(eqPos-buffer)), value(eqPos+1, strlen(eqPos)-1); + this->data[Utils::trim(key)] = Utils::trim(value); + } + } + + fclose(fp); +} + +Config::~Config() +{ +} + +std::string Config::get(const std::string &key) +{ + std::lock_guard lg(this->lock); + return this->data[key]; +} + +int Config::getInt(const std::string &key) +{ + return std::stoi(get(key)); +} + +unsigned int Config::getUInt(const std::string &key) +{ + return std::stoul(get(key)); +} diff --git a/chronos/Config.h b/chronos/Config.h new file mode 100644 index 00000000..52453622 --- /dev/null +++ b/chronos/Config.h @@ -0,0 +1,38 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _CONFIG_H_ +#define _CONFIG_H_ + +#include +#include +#include + +namespace Chronos +{ + class Config + { + public: + Config(const std::string &fileName); + ~Config(); + + public: + std::string get(const std::string &key); + int getInt(const std::string &key); + unsigned int getUInt(const std::string &key); + + private: + std::map data; + std::mutex lock; + }; +}; + +#endif diff --git a/chronos/HTTPRequest.cpp b/chronos/HTTPRequest.cpp new file mode 100644 index 00000000..e78cf926 --- /dev/null +++ b/chronos/HTTPRequest.cpp @@ -0,0 +1,246 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "HTTPRequest.h" + +#include + +#include +#include + +#include "WorkerThread.h" +#include "JobResult.h" +#include "Utils.h" +#include "App.h" + +using namespace Chronos; + +namespace { + +size_t curlWriteFunction(char *buffer, size_t size, size_t nitems, void *userdata) +{ + size_t realSize = size * nitems; + std::string data(buffer, realSize); + if(!static_cast(userdata)->processData(data)) + return 0; + return realSize; +} + +size_t curlHeaderFunction(char *buffer, size_t size, size_t nitems, void *userdata) +{ + size_t realSize = size * nitems; + std::string headerData(buffer, realSize); + static_cast(userdata)->processHeaders(headerData); + return realSize; +} + +} + +HTTPRequest::HTTPRequest(CURLM *curlMultiHandle) + : multiHandle{curlMultiHandle}, + result{std::make_unique()} +{ + maxSize = App::getInstance()->config->getInt("request_max_size"); + memset(curlError, 0, sizeof(curlError)); +} + +HTTPRequest::~HTTPRequest() +{ + if(easy != nullptr) + { + curl_multi_remove_handle(multiHandle, easy); + curl_easy_cleanup(easy); + easy = nullptr; + } +} + +void HTTPRequest::processHeaders(const std::string &headers) +{ + if(headers.length() > sizeof("HTTP/1.1 000") + && headers.find("HTTP/") == 0 + && headers.at(8) == ' ' + && headers.at(12) == ' ') + { + result->statusText = headers.substr(13); + while(!result->statusText.empty() + && (result->statusText.back() == '\n' + || result->statusText.back() == '\r')) + { + result->statusText.pop_back(); + } + return; + } + + result->responseHeaders += headers; +} + +bool HTTPRequest::processData(const std::string &data) +{ + result->responseBody += data; + + if(result->responseBody.length() > maxSize) + { + result->responseBody = {}; + return false; + } + + return true; +} + +void HTTPRequest::done(CURLcode res) +{ + char *clientIP = nullptr; + int clientPort = 0; + CURLcode getRes; + + getRes = curl_easy_getinfo(easy, CURLINFO_PRIMARY_IP, &clientIP); + if(getRes != CURLE_OK) + clientIP = nullptr; + if(clientIP != nullptr) + result->peerAddress = clientIP; + + getRes = curl_easy_getinfo(easy, CURLINFO_PRIMARY_PORT, &clientPort); + if(getRes != CURLE_OK) + clientPort = 0; + if(clientPort > 0) + result->peerPort = clientPort; + + switch(res) + { + case CURLE_URL_MALFORMAT: + result->status = JOBSTATUS_FAILED_URL; + result->statusText = "Malformed URL"; + break; + + case CURLE_UNSUPPORTED_PROTOCOL: + result->status = JOBSTATUS_FAILED_URL; + result->statusText = "Unsupported protocol"; + break; + + case CURLE_COULDNT_CONNECT: + result->status = JOBSTATUS_FAILED_CONNECT; + result->statusText = std::string("Could not connect: ") + curlError; + break; + + case CURLE_COULDNT_RESOLVE_HOST: + result->status = JOBSTATUS_FAILED_DNS; + result->statusText = std::string("Could not resolve host: ") + curlError; + break; + + case CURLE_OPERATION_TIMEDOUT: + result->status = JOBSTATUS_FAILED_TIMEOUT; + break; + + case CURLE_FILESIZE_EXCEEDED: + case CURLE_WRITE_ERROR: + result->status = JOBSTATUS_FAILED_SIZE; + break; + + case CURLE_LOGIN_DENIED: + case CURLE_REMOTE_ACCESS_DENIED: + case CURLE_OK: + { + long httpCode = 0; + curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &httpCode); + + result->httpStatus = httpCode; + + if(httpCode == 200) + { + result->status = JOBSTATUS_OK; + } + else + { + result->status = JOBSTATUS_FAILED_HTTPERROR; + } + } + break; + + default: + result->status = JOBSTATUS_FAILED_OTHERS; + result->statusText = std::string("Other error: ") + curlError + std::string(" (") + std::to_string(res) + std::string(")"); + break; + } + + result->dateDone = Utils::getTimestampMS(); + result->duration = result->dateDone - result->dateStarted; + + if(onDone) + onDone(); +} + +void HTTPRequest::submit() +{ + result->dateStarted = Utils::getTimestampMS(); + result->jitter = result->dateStarted - result->datePlanned; + + if(!isValid) + { + strcpy(curlError, "Job not valid"); + done(CURLE_OBSOLETE); + return; + } + + easy = curl_easy_init(); + if(easy == nullptr) + { + std::cout << "Handle creation failed!" << std::endl; + strcpy(curlError, "Failed to create handle!"); + done(CURLE_OBSOLETE); + return; + } + + curl_easy_setopt(easy, CURLOPT_DNS_CACHE_TIMEOUT, 0); + curl_easy_setopt(easy, CURLOPT_FORBID_REUSE, 1); + curl_easy_setopt(easy, CURLOPT_FRESH_CONNECT, 1); + curl_easy_setopt(easy, CURLOPT_PRIVATE, this); + curl_easy_setopt(easy, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS); + curl_easy_setopt(easy, CURLOPT_FOLLOWLOCATION, 0); + curl_easy_setopt(easy, CURLOPT_URL, url.c_str()); + curl_easy_setopt(easy, CURLOPT_NOPROGRESS, 1); + curl_easy_setopt(easy, CURLOPT_ERRORBUFFER, curlError); + curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, curlWriteFunction); + curl_easy_setopt(easy, CURLOPT_WRITEDATA, this); + curl_easy_setopt(easy, CURLOPT_HEADERFUNCTION, curlHeaderFunction); + curl_easy_setopt(easy, CURLOPT_HEADERDATA, this); + curl_easy_setopt(easy, CURLOPT_TIMEOUT, App::getInstance()->config->getInt("request_timeout")); + curl_easy_setopt(easy, CURLOPT_MAXFILESIZE, maxSize); + curl_easy_setopt(easy, CURLOPT_USERAGENT, App::getInstance()->config->get("user_agent").c_str()); + curl_easy_setopt(easy, CURLOPT_SSL_VERIFYPEER, 0); + curl_easy_setopt(easy, CURLOPT_SSL_VERIFYHOST, 0); + curl_easy_setopt(easy, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); + + if(useAuth) + { + curl_easy_setopt(easy, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); + curl_easy_setopt(easy, CURLOPT_USERNAME, authUsername.c_str()); + curl_easy_setopt(easy, CURLOPT_PASSWORD, authPassword.c_str()); + } + + CURLMcode res = curl_multi_add_handle(multiHandle, easy); + if(res != CURLM_OK) + { + sprintf(curlError, "Failed to add handle! (%d)", res); + done(CURLE_OBSOLETE); + return; + } +} + +HTTPRequest *HTTPRequest::fromURL(const std::string &url, int userID, const std::shared_ptr &wt) +{ + HTTPRequest *req = new HTTPRequest(wt->curlHandle); + req->result->userID = userID; + req->result->url = url; + req->url = url; + req->isValid = true; + return req; +} + diff --git a/chronos/HTTPRequest.h b/chronos/HTTPRequest.h new file mode 100644 index 00000000..941a8901 --- /dev/null +++ b/chronos/HTTPRequest.h @@ -0,0 +1,66 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _HTTPREQUEST_H_ +#define _HTTPREQUEST_H_ + +#include +#include +#include + +#include + +namespace Chronos +{ + class WorkerThread; + class JobResult; + + class HTTPRequest + { + private: + HTTPRequest(CURLM *curlMultiHandle); + + HTTPRequest(const HTTPRequest &other) = delete; + HTTPRequest(HTTPRequest &&other) = delete; + HTTPRequest &operator=(const HTTPRequest &other) = delete; + HTTPRequest &operator=(HTTPRequest &&other) = delete; + + public: + ~HTTPRequest(); + + public: + static HTTPRequest *fromURL(const std::string &url, int userID, const std::shared_ptr &wt); + + void submit(); + void done(CURLcode res); + + bool processData(const std::string &headers); + void processHeaders(const std::string &headers); + + public: + std::string url; + bool useAuth = false; + std::string authUsername; + std::string authPassword; + std::unique_ptr result; + + std::function onDone; + + private: + CURL *easy = nullptr; + CURLM *multiHandle = nullptr; + bool isValid = false; + char curlError[CURL_ERROR_SIZE]; + size_t maxSize; + }; +}; + +#endif diff --git a/chronos/JobResult.h b/chronos/JobResult.h new file mode 100644 index 00000000..290d5183 --- /dev/null +++ b/chronos/JobResult.h @@ -0,0 +1,60 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _JOBRESULT_H_ +#define _JOBRESULT_H_ + +#include + +#include + +namespace Chronos +{ + enum JobStatus_t + { + JOBSTATUS_UNKNOWN = 0, + JOBSTATUS_OK = 1, + JOBSTATUS_FAILED_DNS = 2, + JOBSTATUS_FAILED_CONNECT = 3, + JOBSTATUS_FAILED_HTTPERROR = 4, + JOBSTATUS_FAILED_TIMEOUT = 5, + JOBSTATUS_FAILED_SIZE = 6, + JOBSTATUS_FAILED_URL = 7, + JOBSTATUS_FAILED_INTERNAL = 8, + JOBSTATUS_FAILED_OTHERS = 9 + }; + + struct JobResult + { + int userID = 0; + int jobID = 0; + uint64_t dateStarted = 0; // in ms + uint64_t datePlanned = 0; // in ms + uint64_t dateDone = 0 ; // in ms + int jitter = 0; // in ms + std::string url; + int duration = 0; // in ms + JobStatus_t status = JOBSTATUS_UNKNOWN; + int httpStatus = 0; + std::string responseHeaders; + std::string responseBody; + std::string statusText; + bool notifyFailure = false; + bool notifySuccess = false; + bool notifyDisable = false; + bool saveResponses = false; + int oldFailCounter = 0; + std::string peerAddress; + int peerPort = 0; + }; +}; + +#endif diff --git a/chronos/MySQL.h b/chronos/MySQL.h new file mode 100644 index 00000000..50f2032e --- /dev/null +++ b/chronos/MySQL.h @@ -0,0 +1,85 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _MYSQL_H_ +#define _MYSQL_H_ + +#include +#include +#include +#include +#include + +#include + +namespace Chronos +{ + class MySQL_DB; + + class MySQL_Result + { + private: + MySQL_Result(MYSQL_RES *res); + + MySQL_Result(const MySQL_Result &other) = delete; + MySQL_Result(MySQL_Result &&other) = delete; + MySQL_Result &operator=(const MySQL_Result &other) = delete; + MySQL_Result &operator=(MySQL_Result &&other) = delete; + + public: + ~MySQL_Result(); + + public: + MYSQL_ROW fetchRow(); + MYSQL_FIELD *fetchFields(); + my_ulonglong numRows(); + my_ulonglong numFields(); + + private: + MYSQL_RES *result = nullptr; + + friend class MySQL_DB; + }; + + class MySQL_DB + { + public: + MySQL_DB(const std::string &strHost, + const std::string &strUser, + const std::string &strPass, + const std::string &strDB, + const std::string &strSocket = {}); + ~MySQL_DB(); + + private: + MySQL_DB(const MySQL_DB &other) = delete; + MySQL_DB(MySQL_DB &&other) = delete; + MySQL_DB &operator=(const MySQL_DB &other) = delete; + MySQL_DB &operator=(MySQL_DB &&other) = delete; + + public: + std::unique_ptr query(const char *strQuery, ...); + my_ulonglong insertId(); + my_ulonglong affectedRows(); + static void libInit(); + static void libCleanup(); + + private: + void connect(); + + private: + MYSQL *handle = nullptr; + std::string strHost, strUser, strPass, strDB, strSocket; + time_t lastQuery = 0; + }; +}; + +#endif diff --git a/chronos/MySQL_DB.cpp b/chronos/MySQL_DB.cpp new file mode 100644 index 00000000..20fe1910 --- /dev/null +++ b/chronos/MySQL_DB.cpp @@ -0,0 +1,201 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "MySQL.h" + +#include +#include + +#include +#include + +using namespace Chronos; + +#define MYSQL_MAX_CONNECTION_ATTEMPTS 10 +#define MYSQL_CONNECTION_ATTEMPT_DELAY 500 // ms + +MySQL_DB::MySQL_DB(const std::string &strHost, const std::string &strUser, const std::string &strPass, + const std::string &strDB, const std::string &strSocket) + : strHost{strHost}, strUser{strUser}, strPass{strPass}, strDB{strDB}, strSocket{strSocket} +{ + if(this->strHost.empty()) + this->strHost = "localhost"; + connect(); +} + +MySQL_DB::~MySQL_DB() +{ + if(handle != nullptr) + mysql_close(handle); +} + +void MySQL_DB::connect() +{ + if(handle != nullptr) + mysql_close(handle); + + if((handle = mysql_init(nullptr)) == nullptr) + throw std::runtime_error("MySQL driver initialization failed"); + + int iAttempts = 0; + bool bEstablished = false; + + while(!bEstablished && iAttempts < MYSQL_MAX_CONNECTION_ATTEMPTS) + { + iAttempts++; + + if(mysql_real_connect(handle, + strHost.c_str(), + strUser.c_str(), + strPass.c_str(), + strDB.c_str(), + 0, + strSocket.empty() ? nullptr : strSocket.c_str(), + 0) != handle) + { + // too many connections? + if(mysql_errno(handle) == 1203) + { + // delay for MYSQL_CONNECTION_ATTEMPT_DELAY ms to allow connection slots to free up + usleep(MYSQL_CONNECTION_ATTEMPT_DELAY * 1000); + continue; + } + + // other error => fail + else + { + break; + } + } + else + { + bEstablished = true; + } + } + + if(!bEstablished) + throw std::runtime_error(std::string("MySQL :") + std::string(mysql_error(handle))); + + lastQuery = time(nullptr); +} + +void MySQL_DB::libInit() +{ + if(mysql_library_init(0, nullptr, nullptr) != 0) + throw std::runtime_error("MySQL_DB::LibInit(): mysql_library_init failed"); +} + +void MySQL_DB::libCleanup() +{ + mysql_library_end(); +} + +std::unique_ptr MySQL_DB::query(const char *szQuery, ...) +{ + if(handle == nullptr) + connect(); + else if(lastQuery < time(nullptr)-10) + mysql_ping(handle); + + char szBuff[255], *szBuff2, *szArg; + std::unique_ptr res = nullptr; + std::string strQuery; + va_list arglist; + + // prepare query + va_start(arglist, szQuery); + for(int i=0; i<(int)strlen(szQuery); i++) + { + char c = szQuery[i], + c2 = szQuery[i+1]; + if(c == '%') + { + switch(c2) + { + case '%': + strQuery += '%'; + break; + case 's': + strQuery.append(va_arg(arglist, char *)); + break; + case 'd': + strQuery.append(std::to_string(va_arg(arglist, int))); + break; + case 'f': + strQuery.append(std::to_string(va_arg(arglist, double))); + break; + case 'l': + strQuery.append(std::to_string(va_arg(arglist, long int))); + break; + case 'u': + strQuery.append(std::to_string(va_arg(arglist, unsigned long))); + break; + case 'q': + szArg = va_arg(arglist, char *); + szBuff2 = new char[strlen(szArg)*2+1]; + mysql_real_escape_string(handle, szBuff2, szArg, (unsigned long)strlen(szArg)); + strQuery.append(szBuff2); + delete[] szBuff2; + break; + }; + ++i; + } + else + { + strQuery += c; + } + } + va_end(arglist); + + // execute query + lastQuery = time(nullptr); + int iAttempts = 0; + +tryQuery: + if(mysql_real_query(handle, strQuery.c_str(), (unsigned long)strQuery.length()) == 0) + { + MYSQL_RES *result = mysql_store_result(handle); + if(result != nullptr) + res = std::unique_ptr(new MySQL_Result(result)); + } + else + { + // handling for timed out connections (mysql server gone away): attempt reconnect + if(iAttempts == 0 && (mysql_errno(handle) == 2006)) + { + connect(); + iAttempts++; + goto tryQuery; + } + else if(iAttempts < 10 && mysql_errno(handle) == 1205) + { + std::cout << "Retrying query after lock timeout: " << strQuery << std::endl; + iAttempts++; + goto tryQuery; + } + else + { + throw std::runtime_error(std::string(mysql_error(handle))); + } + } + + return(res); +} + +my_ulonglong MySQL_DB::insertId() +{ + return(mysql_insert_id(handle)); +} + +my_ulonglong MySQL_DB::affectedRows() +{ + return(mysql_affected_rows(handle)); +} diff --git a/chronos/MySQL_Result.cpp b/chronos/MySQL_Result.cpp new file mode 100644 index 00000000..af0613b5 --- /dev/null +++ b/chronos/MySQL_Result.cpp @@ -0,0 +1,45 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "MySQL.h" + +using namespace Chronos; + +MySQL_Result::MySQL_Result(MYSQL_RES *res) +{ + result = res; +} + +MySQL_Result::~MySQL_Result() +{ + if(result != nullptr) + mysql_free_result(result); +} + +MYSQL_ROW MySQL_Result::fetchRow() +{ + return(mysql_fetch_row(result)); +} + +my_ulonglong MySQL_Result::numRows() +{ + return(mysql_num_rows(result)); +} + +my_ulonglong MySQL_Result::numFields() +{ + return(mysql_num_fields(result)); +} + +MYSQL_FIELD *MySQL_Result::fetchFields() +{ + return(mysql_fetch_fields(result)); +} diff --git a/chronos/Notification.h b/chronos/Notification.h new file mode 100644 index 00000000..faff6b5f --- /dev/null +++ b/chronos/Notification.h @@ -0,0 +1,25 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _NOTIFICATION_H_ +#define _NOTIFICATION_H_ + +namespace Chronos +{ + enum NotificationType_t + { + NOTIFICATION_TYPE_FAILURE = 0, + NOTIFICATION_TYPE_SUCCESS = 1, + NOTIFICATION_TYPE_DISABLE = 2 + }; +}; + +#endif diff --git a/chronos/SQLite.cpp b/chronos/SQLite.cpp new file mode 100644 index 00000000..2317695b --- /dev/null +++ b/chronos/SQLite.cpp @@ -0,0 +1,168 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "SQLite.h" + +#include +#include + +#include + +using namespace Chronos; + +SQLite_DB::SQLite_DB(const std::string &fileName, const int BusyTimeoutMs) : strFileName(fileName) +{ + int res = sqlite3_open(strFileName.c_str(), &handle); + if(res != SQLITE_OK) + { + std::stringstream err; + err << "Failed to open database " << strFileName + << ": " << sqlite3_errstr(res); + throw std::runtime_error(err.str()); + } + + res = sqlite3_busy_timeout(handle, BusyTimeoutMs); + if(res != SQLITE_OK) + { + std::stringstream err; + err << "Failed to set busy timeout:" + << sqlite3_errstr(res); + throw std::runtime_error(err.str()); + } +} + +SQLite_DB::~SQLite_DB() +{ + if(handle != nullptr) + { + sqlite3_close(handle); + handle = nullptr; + } +} + +std::unique_ptr SQLite_DB::prepare(const std::string &strQuery) +{ + sqlite3_stmt *stmt = nullptr; + int res = sqlite3_prepare_v2(handle, strQuery.c_str(), strQuery.size(), &stmt, nullptr); + if(res != SQLITE_OK) + { + std::stringstream err; + err << "Failed to prepare query " << strQuery + << ": " << sqlite3_errstr(res) << ", " + << sqlite3_errmsg(handle); + throw std::runtime_error(err.str()); + } + return std::unique_ptr(new SQLite_Statement(stmt)); +} + +int64_t SQLite_DB::insertId() +{ + return sqlite3_last_insert_rowid(handle); +} + +int SQLite_DB::affectedRows() +{ + return sqlite3_changes(handle); +} + +SQLite_Statement::SQLite_Statement(sqlite3_stmt *handle) : stmt(handle) +{ +} + +SQLite_Statement::~SQLite_Statement() +{ + if(stmt != nullptr) + { + sqlite3_finalize(stmt); + stmt = nullptr; + } +} + +void SQLite_Statement::bind(const std::string &field, int val) +{ + int res = sqlite3_bind_int(stmt, fieldIndex(field), val); + if(res != SQLITE_OK) + { + std::stringstream err; + err << "Failed to bind int value " << field + << ": " << sqlite3_errstr(res); + throw std::runtime_error(err.str()); + } +} + +void SQLite_Statement::bind(const std::string &field, const std::string &val) +{ + int res = sqlite3_bind_text(stmt, fieldIndex(field), val.c_str(), val.size(), SQLITE_TRANSIENT); + if(res != SQLITE_OK) + { + std::stringstream err; + err << "Failed to bind string value " << field + << ": " << sqlite3_errstr(res); + throw std::runtime_error(err.str()); + } +} + +int SQLite_Statement::fieldIndex(const std::string &field) +{ + int index = sqlite3_bind_parameter_index(stmt, field.c_str()); + if(index == 0) + { + std::stringstream err; + err << "Field not found: " << field; + throw std::runtime_error(err.str()); + } + return index; +} + +bool SQLite_Statement::execute() +{ + int res = sqlite3_step(stmt); + if(res != SQLITE_OK && res != SQLITE_DONE) + { + std::stringstream err; + err << "Failed to execute query: " + << sqlite3_errstr(res); + throw std::runtime_error(err.str()); + } + if(res == SQLITE_OK && !columnsFetched) + { + columns.clear(); + for(int i = 0; i < sqlite3_column_count(stmt); ++i) + { + columns.emplace(std::string(sqlite3_column_name(stmt, i)), i); + } + columnsFetched = true; + } + return(res == SQLITE_OK); +} + +void SQLite_Statement::reset() +{ + columnsFetched = false; + columns.clear(); + sqlite3_reset(stmt); +} + +int SQLite_Statement::intValue(const std::string &field) +{ + auto it = columns.find(field); + if(it == columns.end()) + throw std::runtime_error("Field not found: " + field); + return sqlite3_column_int(stmt, it->second); +} + +std::string SQLite_Statement::stringValue(const std::string &field) +{ + auto it = columns.find(field); + if(it == columns.end()) + throw std::runtime_error("Field not found: " + field); + return std::string(reinterpret_cast(sqlite3_column_text(stmt, it->second))); +} diff --git a/chronos/SQLite.h b/chronos/SQLite.h new file mode 100644 index 00000000..7f9b156d --- /dev/null +++ b/chronos/SQLite.h @@ -0,0 +1,82 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _SQLITE_H_ +#define _SQLITE_H_ + +#include +#include +#include + +#include + +struct sqlite3; +struct sqlite3_stmt; + +namespace Chronos +{ + class SQLite_DB; + + class SQLite_Statement + { + public: + ~SQLite_Statement(); + + private: + SQLite_Statement(sqlite3_stmt *handle); + + SQLite_Statement(const SQLite_Statement &other) = delete; + SQLite_Statement(SQLite_Statement &&other) = delete; + SQLite_Statement &operator=(const SQLite_Statement &other) = delete; + SQLite_Statement &operator=(SQLite_Statement &&other) = delete; + + public: + void bind(const std::string &field, int val); + void bind(const std::string &field, const std::string &val); + bool execute(); + void reset(); + int intValue(const std::string &field); + std::string stringValue(const std::string &field); + + private: + int fieldIndex(const std::string &field); + + private: + sqlite3_stmt *stmt = nullptr; + bool columnsFetched = false; + std::unordered_map columns; + + friend class SQLite_DB; + }; + + class SQLite_DB + { + public: + SQLite_DB(const std::string &fileName, const int BusyTimeoutMs = 2500); + ~SQLite_DB(); + + SQLite_DB(const SQLite_DB &other) = delete; + SQLite_DB(SQLite_DB &&other) = delete; + SQLite_DB &operator=(const SQLite_DB &other) = delete; + SQLite_DB &operator=(SQLite_DB &&other) = delete; + + public: + std::unique_ptr prepare(const std::string &strQuery); + int64_t insertId(); + int affectedRows(); + + private: + std::string strFileName; + sqlite3 *handle = nullptr; + }; +}; + +#endif diff --git a/chronos/UpdateThread.cpp b/chronos/UpdateThread.cpp new file mode 100644 index 00000000..99c8ebd0 --- /dev/null +++ b/chronos/UpdateThread.cpp @@ -0,0 +1,269 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "UpdateThread.h" + +#include + +#include +#include + +#include "App.h" +#include "Notification.h" +#include "SQLite.h" +#include "Utils.h" + +using namespace Chronos; + +UpdateThread *UpdateThread::instance = nullptr; + +UpdateThread::UpdateThread() +{ + if(UpdateThread::instance != nullptr) + throw std::runtime_error("Update thread instance already exists"); + + UpdateThread::instance = this; + + maxFailures = App::getInstance()->config->getInt("max_failures"); + userDbFilePathScheme = App::getInstance()->config->get("user_db_file_path_scheme"); + userDbFileNameScheme = App::getInstance()->config->get("user_db_file_name_scheme"); +} + +UpdateThread::~UpdateThread() +{ + UpdateThread::instance = nullptr; +} + +UpdateThread *UpdateThread::getInstance() +{ + if(UpdateThread::instance == nullptr) + throw std::runtime_error("No update thread instance available"); + return(UpdateThread::instance); +} + +void UpdateThread::addResult(std::unique_ptr result) +{ + std::lock_guard lg(queueMutex); + queue.push(std::move(result)); + queueSignal.notify_one(); +} + +void UpdateThread::storeResult(const std::unique_ptr &result) +{ + std::string userPathPart = Utils::userPathPart(result->userID); + + struct tm tmStruct = { 0 }; + time_t tmTime = result->datePlanned / 1000; + if(gmtime_r(&tmTime, &tmStruct) == nullptr) + throw std::runtime_error("gmtime_r returned nullptr"); + + // e.g. /var/lib/cron-job.org/%u + std::string dbDirPath = userDbFilePathScheme; + Utils::replace(dbDirPath, "%u", userPathPart); + if(!Utils::directoryExists(dbDirPath)) + Utils::mkPath(dbDirPath); + + // e.g. joblog-%m-%d.db + std::string dbFileName = userDbFileNameScheme; + Utils::replace(dbFileName, "%d", Utils::toString(tmStruct.tm_mday, 2)); + Utils::replace(dbFileName, "%m", Utils::toString(tmStruct.tm_mon, 2)); + + std::string dbFilePath = dbDirPath + "/" + dbFileName; + int jobLogID = 0; + int jobLogIDDay = tmStruct.tm_mday; + int jobLodIDMonth = tmStruct.tm_mon; + + try + { + std::unique_ptr userDB = std::make_unique(dbFilePath.c_str()); + + userDB->prepare("PRAGMA synchronous = OFF")->execute(); + + userDB->prepare("CREATE TABLE IF NOT EXISTS \"joblog\"(" + " \"joblogid\" INTEGER PRIMARY KEY ASC," + " \"jobid\" INTEGER NOT NULL," + " \"date\" INTEGER NOT NULL," + " \"date_planned\" INTEGER NOT NULL," + " \"jitter\" INTEGER NOT NULL," + " \"url\" TEXT NOT NULL," + " \"duration\" INTEGER NOT NULL," + " \"status\" INTEGER NOT NULL," + " \"status_text\" TEXT NOT NULL," + " \"http_status\" INTEGER NOT NULL," + " \"created\" INTEGER NOT NULL" + ")")->execute(); + userDB->prepare("CREATE INDEX IF NOT EXISTS \"idx_joblog_jobid\" ON \"joblog\" (\"jobid\")")->execute(); + + userDB->prepare("CREATE TABLE IF NOT EXISTS \"joblog_response\"(" + " \"joblogid\" INTEGER PRIMARY KEY," + " \"jobid\" INTEGER NOT NULL," + " \"date\" INTEGER NOT NULL," + " \"headers\" TEXT NOT NULL," + " \"body\" TEXT NOT NULL," + " \"created\" INTEGER NOT NULL" + ")")->execute(); + + auto stmt = userDB->prepare("INSERT INTO \"joblog\"(\"jobid\",\"date\",\"date_planned\",\"jitter\",\"url\",\"duration\",\"status\",\"status_text\",\"http_status\",\"created\") " + "VALUES(:jobid,:date,:date_planned,:jitter,:url,:duration,:status,:status_text,:http_status,strftime('%s', 'now'))"); + stmt->bind(":jobid", result->jobID); + stmt->bind(":date", static_cast(result->dateStarted / 1000)); + stmt->bind(":date_planned", static_cast(result->datePlanned / 1000)); + stmt->bind(":jitter", result->jitter); + stmt->bind(":url", result->url); + stmt->bind(":duration", result->duration); + stmt->bind(":status", static_cast(result->status)); + stmt->bind(":status_text", result->statusText); + stmt->bind(":http_status", result->httpStatus); + stmt->execute(); + + jobLogID = static_cast(userDB->insertId()); + + if(result->saveResponses && (!result->responseHeaders.empty() || !result->responseBody.empty())) + { + stmt = userDB->prepare("INSERT INTO \"joblog_response\"(\"joblogid\",\"jobid\",\"date\",\"headers\",\"body\",\"created\") " + "VALUES(:joblogid,:jobid,:date,:headers,:body,strftime('%s', 'now'))"); + stmt->bind(":joblogid", jobLogID); + stmt->bind(":jobid", result->jobID); + stmt->bind(":date", static_cast(result->dateStarted / 1000)); + stmt->bind(":headers", result->responseHeaders); + stmt->bind(":body", result->responseBody); + stmt->execute(); + } + } + catch(const std::exception &ex) + { + std::cout << "Error SQLite query: " << ex.what() << std::endl; + return; + } + + if(result->status == JOBSTATUS_OK || result->status == JOBSTATUS_FAILED_TIMEOUT) + { + db->query("UPDATE `job` SET `last_status`=%d,`last_fetch`=%d,`last_duration`=%d,`fail_counter`=0 WHERE `jobid`=%d", + static_cast(result->status), + static_cast(result->dateStarted / 1000), + static_cast(result->duration), + result->jobID); + } + else + { + db->query("UPDATE `job` SET `last_status`=%d,`last_fetch`=%d,`last_duration`=%d,`fail_counter`=`fail_counter`+1 WHERE `jobid`=%d", + static_cast(result->status), + static_cast(result->dateStarted / 1000), + static_cast(result->duration), + result->jobID); + } + + // get (new) fail counter + int failCounter = 0; + MYSQL_ROW row; + auto res = db->query("SELECT `fail_counter` FROM `job` WHERE `jobid`=%d", + result->jobID); + while((row = res->fetchRow()) != NULL) + { + failCounter = atoi(row[0]); + } + res.reset(); + + bool createNotificationRow = false; + NotificationType_t notificationType; + + // disable job? + if(failCounter > maxFailures) + { + // disable + db->query("UPDATE `job` SET `enabled`=0,`fail_counter`=0 WHERE `jobid`=%d", + result->jobID); + + // notify? + if(result->notifyDisable) + { + createNotificationRow = true; + notificationType = NOTIFICATION_TYPE_DISABLE; + } + } + + // send failure notification? + if(result->notifyFailure + && result->status != JOBSTATUS_OK + && failCounter == 1) + { + createNotificationRow = true; + notificationType = NOTIFICATION_TYPE_FAILURE; + } + + // send success notification? + if(result->notifySuccess + && result->status == JOBSTATUS_OK + && result->oldFailCounter > 0 + && failCounter == 0) + { + createNotificationRow = true; + notificationType = NOTIFICATION_TYPE_SUCCESS; + } + + if(createNotificationRow) + { + db->query("INSERT INTO `notification`(`jobid`,`joblogid`,`date`,`type`) " + "VALUES(%d,%d,%d,%d)", + result->jobID, + jobLogID, + static_cast(time(NULL)), + static_cast(notificationType)); + } +} + +void UpdateThread::stopThread() +{ + stop = true; + + std::lock_guard lg(queueMutex); + queueSignal.notify_one(); +} + +void UpdateThread::run() +{ + std::cout << "UpdateThread::run(): Entered" << std::endl; + + decltype(queue) tempQueue; + db = App::getInstance()->createMySQLConnection(); + + stop = false; + while(!stop) + { + { + std::unique_lock lock(queueMutex); + if(queue.empty()) + queueSignal.wait(lock); + queue.swap(tempQueue); + } + + auto numJobs = tempQueue.size(); + if(numJobs > 100) + std::cout << "UpdateThread::run(): " << numJobs << " update jobs fetched" << std::endl; + + time_t tStart = time(nullptr); + if(!tempQueue.empty()) + { + while(!tempQueue.empty()) + { + std::unique_ptr res = std::move(tempQueue.front()); + tempQueue.pop(); + storeResult(res); + } + } + time_t tEnd = time(nullptr); + + if(numJobs > 100) + std::cout << "UpdateThread::run(): Processing " << numJobs << " took " << (tEnd-tStart) << " seconds" << std::endl; + } + + std::cout << "UpdateThread::run(): Finished" << std::endl; +} diff --git a/chronos/UpdateThread.h b/chronos/UpdateThread.h new file mode 100644 index 00000000..5ca14642 --- /dev/null +++ b/chronos/UpdateThread.h @@ -0,0 +1,61 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _UPDATETHREAD_H_ +#define _UPDATETHREAD_H_ + +#include +#include +#include +#include +#include +#include + +#include "MySQL.h" +#include "JobResult.h" + +namespace Chronos +{ + class UpdateThread + { + public: + UpdateThread(); + ~UpdateThread(); + + private: + UpdateThread(const UpdateThread &other) = delete; + UpdateThread(UpdateThread &&other) = delete; + UpdateThread &operator=(const UpdateThread &other) = delete; + UpdateThread &operator=(UpdateThread &&other) = delete; + + public: + static UpdateThread *getInstance(); + void run(); + void stopThread(); + void addResult(std::unique_ptr result); + + private: + void storeResult(const std::unique_ptr &result); + + private: + bool stop = false; + std::unique_ptr db; + static UpdateThread *instance; + std::mutex queueMutex; + std::condition_variable queueSignal; + std::queue> queue; + int maxFailures = 0; + std::string userDbFilePathScheme; + std::string userDbFileNameScheme; + }; +}; + +#endif diff --git a/chronos/Utils.cpp b/chronos/Utils.cpp new file mode 100644 index 00000000..128b74a0 --- /dev/null +++ b/chronos/Utils.cpp @@ -0,0 +1,124 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "Utils.h" + +#include +#include + +#include +#include +#include + +using namespace Chronos; + +uint64_t Utils::getTimestampMS() +{ + uint64_t result = 0; + struct timeval tv; + + if(gettimeofday(&tv, nullptr) == 0) + result = (uint64_t)tv.tv_sec * 1000 + (uint64_t)tv.tv_usec / 1000; + + return(result); +} + +std::string Utils::trim(const std::string &in) +{ + std::string str = in; + std::string whiteSpaces = " \t\f\v\n\r"; + size_t pos; + + pos = str.find_first_not_of(whiteSpaces); + if(pos != std::string::npos) + str.erase(0, pos); + else + str.clear(); + + pos = str.find_last_not_of(whiteSpaces); + if(pos != std::string::npos) + str.erase(pos+1); + else + str.clear(); + + return(str); +} + +void Utils::replace(std::string &str, const std::string &search, const std::string &repl) +{ + size_t pos = 0; + while((pos = str.find(search, pos)) != std::string::npos) + { + str.replace(pos, search.length(), repl); + pos += search.length(); + } +} + +std::string Utils::userPathPart(const int userID) +{ + std::stringstream ss; + ss << std::hex << userID; + std::string userIdHex = ss.str(); + + std::string result; + for(size_t i = 0; i < userIdHex.length(); ++i) + { + result += userIdHex[i]; + if(i % 2 != 0) + result += '/'; + } + + if(result[result.size()-1] == '/') + result.pop_back(); + + return result; +} + +std::string Utils::toString(int num, int places) +{ + std::string result = std::to_string(num); + while(result.size() < places) + result.insert(result.begin(), '0'); + return result; +} + +bool Utils::directoryExists(const std::string &path) +{ + struct stat st; + + if(stat(path.c_str(), &st) != 0) + return false; + + return((st.st_mode & S_IFDIR) == S_IFDIR); +} + +bool Utils::mkPath(const std::string &path, const mode_t mode) +{ + std::string currentDir; + for(const char c : path) + { + currentDir += c; + if(c == '/' && currentDir.size() > 1) + { + if(!Utils::directoryExists(currentDir)) + { + if(mkdir(currentDir.c_str(), mode) != 0) + return false; + } + } + } + if(!Utils::directoryExists(currentDir)) + { + if(mkdir(currentDir.c_str(), mode) != 0) + return false; + } + return true; +} diff --git a/chronos/Utils.h b/chronos/Utils.h new file mode 100644 index 00000000..20199922 --- /dev/null +++ b/chronos/Utils.h @@ -0,0 +1,34 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _UTILS_H_ +#define _UTILS_H_ + +#include + +#include +#include + +namespace Chronos +{ + namespace Utils + { + uint64_t getTimestampMS(); + std::string trim(const std::string &in); + void replace(std::string &str, const std::string &search, const std::string &repl); + std::string userPathPart(const int userID); + std::string toString(int num, int places); + bool directoryExists(const std::string &path); + bool mkPath(const std::string &path, const mode_t mode = 0755); + }; +}; + +#endif diff --git a/chronos/WorkerThread.cpp b/chronos/WorkerThread.cpp new file mode 100644 index 00000000..47452f36 --- /dev/null +++ b/chronos/WorkerThread.cpp @@ -0,0 +1,269 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include "WorkerThread.h" + +#include +#include +#include + +#include "UpdateThread.h" +#include "App.h" + +using namespace Chronos; + +namespace { + +int curlSocketFunction(CURL *e, curl_socket_t s, int what, void *userdata, void *sockp) +{ + return static_cast(userdata)->socketFunction(e, s, what, static_cast(sockp)); +} + +int curlTimerFunction(CURLM *multi, long timeout_ms, void *userdata) +{ + return static_cast(userdata)->timerFunction(multi, timeout_ms); +} + +void evTimerFunction(EV_P_ struct ev_timer *w, int revents) +{ + static_cast(w->data)->evTimerFunction(w, revents); +} + +void evEventFunction(EV_P_ struct ev_io *w, int revents) +{ + static_cast(w->data)->evEventFunction(w, revents); +} + +} + +WorkerThread::WorkerThread(int mday, int month, int year, int hour, int minute) + : mday(mday), month(month), year(year), hour(hour), minute(minute) +{ + parallelJobs = App::getInstance()->config->getInt("parallel_requests"); +} + +WorkerThread::~WorkerThread() +{ +} + +void WorkerThread::checkResults() +{ + CURLMsg *msg; + int msgsLeft; + + while((msg = curl_multi_info_read(curlHandle, &msgsLeft))) + { + if(msg->msg == CURLMSG_DONE) + { + CURL *easy = msg->easy_handle; + CURLcode res = msg->data.result; + + HTTPRequest *req = nullptr; + if(curl_easy_getinfo(easy, CURLINFO_PRIVATE, &req) != CURLE_OK || req == nullptr) + throw std::runtime_error("Failed to retrieve associated HTTPRequest!"); + + req->done(res); + } + } +} + +int WorkerThread::timerFunction(CURLM *multi, long timeout_ms) +{ + ev_timer_stop(evLoop, &timerEvent); + if(timeout_ms > 0) + { + double t = static_cast(timeout_ms) / 1000; + ev_timer_init(&timerEvent, ::evTimerFunction, t, 0); + ev_timer_start(evLoop, &timerEvent); + } + else + { + ::evTimerFunction(evLoop, &timerEvent, 0); + } + return 0; +} + +int WorkerThread::socketFunction(CURL *e, curl_socket_t s, int what, SockInfo *sockInfo) +{ + if(what == CURL_POLL_REMOVE) + { + removeSocket(sockInfo); + } + else + { + if(sockInfo == nullptr) + addSocket(e, s, what); + else + setSocket(sockInfo, e, s, what); + } + return 0; +} + +void WorkerThread::removeSocket(SockInfo *sockInfo) +{ + if(sockInfo->evset) + ev_io_stop(evLoop, &sockInfo->ev); + delete sockInfo; +} + +void WorkerThread::addSocket(CURL *e, curl_socket_t s, int what) +{ + std::unique_ptr sockInfo = std::make_unique(); + sockInfo->evset = 0; + setSocket(sockInfo.get(), e, s, what); + curl_multi_assign(curlHandle, s, sockInfo.release()); +} + +void WorkerThread::setSocket(SockInfo *sockInfo, CURL *e, curl_socket_t s, int what) +{ + int kind = ((what & CURL_POLL_IN) ? EV_READ : 0) | ((what & CURL_POLL_OUT) ? EV_WRITE : 0); + sockInfo->sockfd = s; + sockInfo->action = what; + sockInfo->easy = e; + if(sockInfo->evset) + ev_io_stop(evLoop, &sockInfo->ev); + ev_io_init(&sockInfo->ev, ::evEventFunction, sockInfo->sockfd, kind); + sockInfo->ev.data = this; + sockInfo->evset = 1; + ev_io_start(evLoop, &sockInfo->ev); +} + +void WorkerThread::evTimerFunction(struct ev_timer *w, int revents) +{ + CURLMcode rc = curl_multi_socket_action(curlHandle, CURL_SOCKET_TIMEOUT, 0, &curlStillRunning); + if(rc != CURLM_OK) + throw std::runtime_error("curl_multi_socket_action failed (1)!"); + checkResults(); +} + +void WorkerThread::evEventFunction(struct ev_io *w, int revents) +{ + int action = (revents & EV_READ) ? CURL_POLL_IN : 0; + action |= (revents & EV_WRITE) ? CURL_POLL_OUT : 0; + CURLMcode rc = curl_multi_socket_action(curlHandle, w->fd, action, &curlStillRunning); + if(rc != CURLM_OK) + throw std::runtime_error("curl_multi_socket_action failed (2)!"); + checkResults(); + if(curlStillRunning <= 0) + ev_timer_stop(evLoop, &timerEvent); +} + +void WorkerThread::addJob(HTTPRequest *req) +{ + requestQueue.push(req); +} + +void WorkerThread::run() +{ + keepAlive = shared_from_this(); + + if(requestQueue.empty()) + return; + + workerThread = std::thread(std::bind(&WorkerThread::threadMain, this)); + workerThread.detach(); +} + +void WorkerThread::runJobs() +{ + while(runningJobs < parallelJobs && !requestQueue.empty()) + { + HTTPRequest *job = requestQueue.front(); + requestQueue.pop(); + + ++runningJobs; + + job->onDone = std::bind(&WorkerThread::jobDone, this, job); + job->submit(); + } +} + +void WorkerThread::jobDone(HTTPRequest *req) +{ + jitterSum += req->result->jitter; + + // push result to result queue + UpdateThread::getInstance()->addResult(std::move(req->result)); + + // clean up + delete req; + --runningJobs; + + // start more jobs + runJobs(); + + // exit event loop when all requests have finished + if(runningJobs == 0) + ev_break(evLoop, EVBREAK_ALL); +} + +void WorkerThread::addStat() +{ + try + { + std::unique_ptr db(App::getInstance()->createMySQLConnection()); + + db->query("REPLACE INTO `stats`(`d`,`m`,`y`,`h`,`i`,`jobs`,`jitter`) VALUES(%d,%d,%d,%d,%d,%d,%f)", + mday, month, year, hour, minute, + jobCount, jitterSum / static_cast(jobCount)); + } + catch(const std::exception &ex) + { + std::cout << "WorkerThread::addStat(): Exception: " << ex.what() << std::endl; + } +} + +void WorkerThread::threadMain() +{ + try + { + std::cout << "WorkerThread::threadMain(): Entered" << std::endl; + + jobCount = requestQueue.size(); + + // init event loop + evLoop = ev_loop_new(EVFLAG_AUTO); + if(evLoop == nullptr) + throw std::runtime_error("ev_loop_new() failed"); + + curlHandle = curl_multi_init(); + if(curlHandle == nullptr) + throw std::runtime_error("curl_multi_init() failed"); + + ev_timer_init(&timerEvent, ::evTimerFunction, 0, 0); + timerEvent.data = this; + + curl_multi_setopt(curlHandle, CURLMOPT_SOCKETFUNCTION, ::curlSocketFunction); + curl_multi_setopt(curlHandle, CURLMOPT_SOCKETDATA, this); + curl_multi_setopt(curlHandle, CURLMOPT_TIMERFUNCTION, ::curlTimerFunction); + curl_multi_setopt(curlHandle, CURLMOPT_TIMERDATA, this); + + // add jobs + runJobs(); + + // main loop + ev_loop(evLoop, 0); + + // clean up + curl_multi_cleanup(curlHandle); + ev_loop_destroy(evLoop); + + addStat(); + + std::cout << "WorkerThread::threadMain(): Finished" << std::endl; + } + catch(const std::runtime_error &ex) + { + std::cout << "WorkerThread::threadEntry(): Exception: " << ex.what() << std::endl; + } + + keepAlive.reset(); +} diff --git a/chronos/WorkerThread.h b/chronos/WorkerThread.h new file mode 100644 index 00000000..a324389b --- /dev/null +++ b/chronos/WorkerThread.h @@ -0,0 +1,92 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#ifndef _WORKERTHREAD_H_ +#define _WORKERTHREAD_H_ + +#include "HTTPRequest.h" + +#include +#include +#include +#include + +#include +#include + +namespace Chronos +{ + struct SockInfo + { + curl_socket_t sockfd; + CURL *easy; + int action; + long timeout; + struct ev_io ev; + int evset; + }; + + class WorkerThread : public std::enable_shared_from_this + { + public: + WorkerThread(int mday, int month, int year, int hour, int minute); + ~WorkerThread(); + + private: + WorkerThread(const WorkerThread &other) = delete; + WorkerThread(WorkerThread &&other) = delete; + WorkerThread &operator=(const WorkerThread &other) = delete; + WorkerThread &operator=(WorkerThread &&other) = delete; + + public: + void addJob(HTTPRequest *req); + void run(); + void threadMain(); + void jobDone(HTTPRequest *req); + + int timerFunction(CURLM *multi, long timeout_ms); + int socketFunction(CURL *e, curl_socket_t s, int what, SockInfo *sockInfo); + void removeSocket(SockInfo *sockInfo); + void addSocket(CURL *e, curl_socket_t s, int what); + void setSocket(SockInfo *sockInfo, CURL *e, curl_socket_t s, int what); + + void evTimerFunction(struct ev_timer *w, int revents); + void evEventFunction(struct ev_io *w, int revents); + + void checkResults(); + + private: + void runJobs(); + void addStat(); + + public: + CURLM *curlHandle = nullptr; + + private: + struct ev_loop *evLoop = nullptr; + struct ev_timer timerEvent; + std::shared_ptr keepAlive; + std::queue requestQueue; + int runningJobs = 0; + std::thread workerThread; + int parallelJobs; + int curlStillRunning = 0; + double jitterSum = 0; + int jobCount = 0; + int mday; + int month; + int year; + int hour; + int minute; + }; +}; + +#endif diff --git a/chronos/chronos.cfg b/chronos/chronos.cfg new file mode 100644 index 00000000..be44ee34 --- /dev/null +++ b/chronos/chronos.cfg @@ -0,0 +1,29 @@ +# MySQL login +mysql_host = localhost +mysql_user = enter_mysql_user_here +mysql_pass = enter_mysql_password_here +mysql_db = enter_mysql_db_name_here + +# Use job log entries up to this age for jitter average calculation +jitter_avg_time = 300 + +# Number of parallel requests per thread +parallel_requests = 8192 + +# Timeout (in seconds) for requests +request_timeout = 30 + +# Maximum size (in bytes) of request body +request_max_size = 8192 + +# Maximum count of subsequent failures before job gets disabled automatically +max_failures = 15 + +# Path scheme for job log databases, %u will be replaced by a path computed from the user ID +user_db_file_path_scheme = /var/lib/cron-job.org-data/%u + +# Scheme for job log databases, %m will be replaced by the month, %d by the month day +user_db_file_name_scheme = joblog-%m-%d.db + +# User-Agent header for requests +user_agent = Mozilla/4.0 (compatible) diff --git a/chronos/main.cpp b/chronos/main.cpp new file mode 100644 index 00000000..cdd4da26 --- /dev/null +++ b/chronos/main.cpp @@ -0,0 +1,34 @@ +/* + * chronos, the cron-job.org execution daemon + * Copyright (C) 2017 Patrick Schlangen + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + */ + +#include +#include +#include + +#include "App.h" + +int main(int argc, char *argv[]) +{ + int result = 1; + std::unique_ptr app; + + try + { + app = std::make_unique(argc, argv); + result = app->run(); + } + catch(const std::runtime_error &ex) + { + std::cout << "Chronos runtime error: " << ex.what() << std::endl; + } + + return(result); +} diff --git a/database/struct.sql b/database/struct.sql new file mode 100644 index 00000000..a0d4af94 --- /dev/null +++ b/database/struct.sql @@ -0,0 +1,109 @@ +CREATE TABLE `job` ( + `jobid` int(11) NOT NULL AUTO_INCREMENT, + `userid` int(11) NOT NULL DEFAULT '0', + `enabled` tinyint(4) NOT NULL DEFAULT '1', + `title` varchar(128) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `url` varchar(255) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `auth_enable` tinyint(1) NOT NULL DEFAULT '0', + `auth_user` varchar(128) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `auth_pass` varchar(128) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `notify_failure` tinyint(1) NOT NULL DEFAULT '1', + `notify_success` tinyint(1) NOT NULL DEFAULT '1', + `notify_disable` tinyint(1) NOT NULL DEFAULT '1', + `last_status` tinyint(4) NOT NULL DEFAULT '0', + `last_fetch` int(11) NOT NULL DEFAULT '0', + `last_duration` int(11) NOT NULL DEFAULT '0', + `fail_counter` int(11) NOT NULL DEFAULT '0', + `save_responses` tinyint(4) NOT NULL DEFAULT '0', + PRIMARY KEY (`jobid`), + KEY `userid` (`userid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `job_hours` ( + `jobid` int(11) NOT NULL DEFAULT '0', + `hour` tinyint(2) NOT NULL DEFAULT '0', + PRIMARY KEY (`jobid`,`hour`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `job_mdays` ( + `jobid` int(11) NOT NULL DEFAULT '0', + `mday` tinyint(2) NOT NULL DEFAULT '0', + PRIMARY KEY (`jobid`,`mday`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `job_minutes` ( + `jobid` int(11) NOT NULL DEFAULT '0', + `minute` tinyint(2) NOT NULL DEFAULT '0', + PRIMARY KEY (`jobid`,`minute`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `job_months` ( + `jobid` int(11) NOT NULL DEFAULT '0', + `month` tinyint(2) NOT NULL DEFAULT '0', + PRIMARY KEY (`jobid`,`month`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `job_wdays` ( + `jobid` int(11) NOT NULL DEFAULT '0', + `wday` tinyint(1) NOT NULL DEFAULT '0', + PRIMARY KEY (`jobid`,`wday`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `jobdeletequeue` ( + `jobid` int(11) NOT NULL DEFAULT '0', + `date` int(11) NOT NULL DEFAULT '0', + PRIMARY KEY (`jobid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `notification` ( + `notificationid` int(11) NOT NULL AUTO_INCREMENT, + `jobid` int(11) NOT NULL DEFAULT '0', + `joblogid` int(11) unsigned NOT NULL DEFAULT '0', + `date` int(14) NOT NULL DEFAULT '0', + `date_processed` int(14) NOT NULL DEFAULT '0', + `type` tinyint(4) NOT NULL DEFAULT '0', + `status` tinyint(4) NOT NULL DEFAULT '0', + PRIMARY KEY (`notificationid`), + KEY `jobid` (`jobid`), + KEY `joblogid` (`joblogid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `stats` ( + `d` tinyint(4) NOT NULL DEFAULT '0', + `m` tinyint(4) NOT NULL DEFAULT '0', + `y` int(11) NOT NULL DEFAULT '0', + `h` tinyint(4) NOT NULL DEFAULT '0', + `i` tinyint(4) NOT NULL DEFAULT '0', + `jobs` int(11) NOT NULL DEFAULT '0', + `jitter` double NOT NULL DEFAULT '0', + PRIMARY KEY (`d`,`m`,`y`,`h`,`i`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `user` ( + `userid` int(11) NOT NULL AUTO_INCREMENT, + `status` tinyint(4) NOT NULL DEFAULT '0', + `email` varchar(255) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password` varchar(40) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password_salt` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `salutation` enum('mr','mrs') COLLATE utf8_unicode_ci NOT NULL DEFAULT 'mr', + `firstname` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `lastname` varchar(64) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `signup_ip` varchar(48) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `signup_date` int(11) NOT NULL DEFAULT '0', + `verification_token` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `verification_date` int(11) NOT NULL DEFAULT '0', + `lastlogin_ip` varchar(48) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `lastlogin_date` int(11) NOT NULL DEFAULT '0', + PRIMARY KEY (`userid`), + KEY `email` (`email`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + +CREATE TABLE `user_pwreset` ( + `userid` int(11) NOT NULL DEFAULT '0', + `expires` int(11) NOT NULL DEFAULT '0', + `token` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password` varchar(40) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + `password_salt` varchar(16) COLLATE utf8_unicode_ci NOT NULL DEFAULT '', + PRIMARY KEY (`userid`), + KEY `expires` (`expires`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;