diff --git a/CHANGELOG b/CHANGELOG index 2354a0ca64..612b06d407 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -20,6 +20,7 @@ - Mouse over view names shows expression (issue #220) - Display SPI Data even if node is unavailabe (issue #219) - Netflow plugin timestamp fixes (issue #241) + - Comma seperate list of elasticsearch hosts (issue #176) 0.11.0 2014/05/08 - BREAKING: elasticsearch 0.90.7 or newer required, recommend 0.90.12+, diff --git a/capture/http.c b/capture/http.c index 366382b6cc..8fb36ef916 100644 --- a/capture/http.c +++ b/capture/http.c @@ -69,6 +69,7 @@ typedef struct molochconn_t { http_parser parser; MolochRequest_t *request; struct molochhttp_t *server; + char *name; } MolochConn_t; typedef struct { @@ -78,7 +79,9 @@ typedef struct { typedef struct molochhttp_t { MolochConn_t *syncConn; - char *name; + char **names; + int namesCnt; + int namesPos; char compress; int port; uint16_t maxConns; @@ -175,7 +178,7 @@ gboolean moloch_http_read_cb(gint UNUSED(fd), GIOCondition cond, gpointer data) LOG("ERROR: %p: Receive Error: %s", (void*)conn, gerror->message); g_error_free(gerror); } else if (cond & (G_IO_HUP | G_IO_ERR)) - LOG("ERROR: %p: Lost connection to %s", (void*)conn, conn->server->name); + LOG("ERROR: %p: Lost connection to %s", (void*)conn, conn->name); else if (len <= 0) LOG("ERROR: %p: len: %d cond: %x", (void*)conn, len, cond); @@ -244,6 +247,8 @@ int moloch_http_connect(MolochConn_t *conn, char *name, int defaultport, int blo exit(0); } + conn->name = name; + enumerator = g_socket_connectable_enumerate (connectable); g_object_unref(connectable); @@ -282,7 +287,6 @@ int moloch_http_connect(MolochConn_t *conn, char *name, int defaultport, int blo return 1; } - //g_object_ref (conn->conn); g_socket_set_keepalive(conn->conn, TRUE); int fd = g_socket_get_fd(conn->conn); @@ -337,6 +341,18 @@ void moloch_http_finish( MolochConn_t *conn, gboolean sync) } /******************************************************************************/ +char *moloch_http_get_name(MolochHttp_t *server) +{ + char *name; + if (server->names[server->namesPos]) { + name = server->names[server->namesPos]; + server->namesPos++; + return name; + } + server->namesPos = 0; + return server->names[0]; +} +/******************************************************************************/ gboolean moloch_http_process_send(MolochConn_t *conn, gboolean sync) { char buffer[3000]; @@ -345,7 +361,7 @@ gboolean moloch_http_process_send(MolochConn_t *conn, gboolean sync) MolochRequest_t *request = conn->request; if (conn->conn == 0) { - if (moloch_http_connect(conn, conn->server->name, conn->server->port, TRUE)) { + if (moloch_http_connect(conn, moloch_http_get_name(conn->server), conn->server->port, TRUE)) { LOG("%p: Couldn't connect from process", (void*)conn); return FALSE; } @@ -544,40 +560,49 @@ unsigned char *moloch_http_get(void *server, char *key, int key_len, size_t *mle MolochConn_t * moloch_http_create(MolochHttp_t *server, int blocking) { MolochConn_t *conn; + int tries = server->namesCnt; conn = MOLOCH_TYPE_ALLOC0(MolochConn_t); conn->parser.data = conn; conn->server = server; - if (moloch_http_connect(conn, server->name, server->port, blocking)) { - LOG("Couldn't connect to '%s'", server->name); - exit (1); + while (tries > 0) { + if (!moloch_http_connect(conn, moloch_http_get_name(server), server->port, blocking)) { + return conn; + } + tries--; + LOG("Couldn't connect to '%s'", conn->name); } - return conn; + exit (1); } /******************************************************************************/ -void *moloch_http_create_server(char *hostname, int defaultPort, int maxConns, int maxOutstandingRequests, int compress) +void *moloch_http_create_server(char *hostnames, int defaultPort, int maxConns, int maxOutstandingRequests, int compress) { MolochHttp_t *server = MOLOCH_TYPE_ALLOC0(MolochHttp_t); DLL_INIT(r_, &server->requestQ[0]); DLL_INIT(r_, &server->requestQ[1]); DLL_INIT(e_, &server->connQ); - if (strncmp(hostname, "http://", 7) == 0) { - server->name = strdup(hostname+7); - } else if (strncmp(hostname, "https://", 8) == 0) { - LOG("https not supported yet %s", hostname); - exit(0); - } else { - server->name = strdup(hostname); + + server->names = g_strsplit(hostnames, ",", 0); + uint32_t i; + for (i = 0; server->names[i]; i++) { + if (strncmp(server->names[i], "http://", 7) == 0) { + char *tmp = g_strdup(server->names[i] + 7); + g_free(server->names[i]); + server->names[i] = tmp; + } else if (strncmp(server->names[i], "https://", 8) == 0) { + LOG("https not supported yet %s", server->names[i]); + exit(0); + } } + server->namesCnt = i; server->port = defaultPort; server->maxConns = maxConns; server->maxOutstandingRequests = maxOutstandingRequests; server->compress = compress; server->syncConn = moloch_http_create(server, TRUE); - uint32_t i; for (i = 0; i < server->maxConns; i++) { MolochConn_t *conn = moloch_http_create(server, FALSE); DLL_PUSH_TAIL(e_, &server->connQ, conn); @@ -620,7 +645,7 @@ void moloch_http_free_server(void *serverV) MOLOCH_TYPE_FREE(MolochConn_t, server->syncConn); server->syncConn = 0; - free(server->name); + g_strfreev(server->names); MOLOCH_TYPE_FREE(MolochHttp_t, server); } diff --git a/config.ini b/config.ini index 73fdfb2755..fb9999eea3 100644 --- a/config.ini +++ b/config.ini @@ -12,9 +12,9 @@ # 3rd) The section titled "default" is used last. [default] -# The elasticsearch host:port. If not using a elasticsearch VIP, a different -# elasticsearch node in the cluster can be specified for each Moloch node to -# help spread load on high volume clusters +# Comma seperated list of elasticsearch host:port combinations. If not using a +# elasticsearch VIP, a different elasticsearch node in the cluster can be specified +# for each Moloch node to help spread load on high volume clusters elasticsearch=CHANGEME_ESHOSTNAME:9200 # How often to create a new elasticsearch index. hourly,daily,weekly,monthly diff --git a/single-host/etc/config.ini.template b/single-host/etc/config.ini.template index bd04764579..e111258205 100644 --- a/single-host/etc/config.ini.template +++ b/single-host/etc/config.ini.template @@ -12,9 +12,9 @@ # 3rd) The section titled "default" is used last. [default] -# The elasticsearch host:port. If not using a elasticsearch VIP, a different -# elasticsearch node in the cluster can be specified for each Moloch node to -# help spread load on high volume clusters +# Comma seperated list of elasticsearch host:port combinations. If not using a +# elasticsearch VIP, a different elasticsearch node in the cluster can be specified +# for each Moloch node to help spread load on high volume clusters elasticsearch=localhost:9200 # How often to create a new elasticsearch index. hourly,daily,weekly,monthly diff --git a/viewer/viewer.js b/viewer/viewer.js index 735066b52b..e713881e2c 100644 --- a/viewer/viewer.js +++ b/viewer/viewer.js @@ -66,7 +66,7 @@ var app = express(); //// Config ////////////////////////////////////////////////////////////////////////////////// var internals = { - elasticBase: Config.get("elasticsearch", "http://localhost:9200"), + elasticBase: Config.get("elasticsearch", "http://localhost:9200").split(","), httpAgent: new KAA({maxSockets: 40}), httpsAgent: new KAA.Secure({maxSockets: 40}), previousNodeStats: [], @@ -77,8 +77,8 @@ var internals = { PNG_LINE_WIDTH: 256, }; -if (internals.elasticBase.lastIndexOf('http', 0) !== 0) { - internals.elasticBase = "http://" + internals.elasticBase; +if (internals.elasticBase[0].lastIndexOf('http', 0) !== 0) { + internals.elasticBase[0] = "http://" + internals.elasticBase[0]; } function userCleanup(suser) { @@ -113,7 +113,7 @@ app.configure(function() { app.locals.molochversion = molochversion.version; app.locals.isIndex = false; app.locals.basePath = Config.basePath(); - app.locals.elasticBase = internals.elasticBase; + app.locals.elasticBase = internals.elasticBase[0]; app.locals.allowUploads = Config.get("uploadCommand") !== undefined; app.locals.sendSession = Config.getObj("sendSession");