Skip to content

Commit

Permalink
Comma seperate list of elasticsearch hosts (issue arkime#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
awick committed Jun 10, 2014
1 parent cbe7fb3 commit f71f769
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- Mouse over view names shows expression (issue #220)
- Display SPI Data even if node is unavailabe (issue #219)
- Netflow plugin timestamp fixes (issue #241)
- Comma seperate list of elasticsearch hosts (issue #176)

0.11.0 2014/05/08
- BREAKING: elasticsearch 0.90.7 or newer required, recommend 0.90.12+,
Expand Down
61 changes: 43 additions & 18 deletions capture/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ typedef struct molochconn_t {
http_parser parser;
MolochRequest_t *request;
struct molochhttp_t *server;
char *name;
} MolochConn_t;

typedef struct {
Expand All @@ -78,7 +79,9 @@ typedef struct {

typedef struct molochhttp_t {
MolochConn_t *syncConn;
char *name;
char **names;
int namesCnt;
int namesPos;
char compress;
int port;
uint16_t maxConns;
Expand Down Expand Up @@ -175,7 +178,7 @@ gboolean moloch_http_read_cb(gint UNUSED(fd), GIOCondition cond, gpointer data)
LOG("ERROR: %p: Receive Error: %s", (void*)conn, gerror->message);
g_error_free(gerror);
} else if (cond & (G_IO_HUP | G_IO_ERR))
LOG("ERROR: %p: Lost connection to %s", (void*)conn, conn->server->name);
LOG("ERROR: %p: Lost connection to %s", (void*)conn, conn->name);
else if (len <= 0)
LOG("ERROR: %p: len: %d cond: %x", (void*)conn, len, cond);

Expand Down Expand Up @@ -244,6 +247,8 @@ int moloch_http_connect(MolochConn_t *conn, char *name, int defaultport, int blo
exit(0);
}

conn->name = name;

enumerator = g_socket_connectable_enumerate (connectable);
g_object_unref(connectable);

Expand Down Expand Up @@ -282,7 +287,6 @@ int moloch_http_connect(MolochConn_t *conn, char *name, int defaultport, int blo
return 1;
}


//g_object_ref (conn->conn);
g_socket_set_keepalive(conn->conn, TRUE);
int fd = g_socket_get_fd(conn->conn);
Expand Down Expand Up @@ -337,6 +341,18 @@ void moloch_http_finish( MolochConn_t *conn, gboolean sync)

}
/******************************************************************************/
char *moloch_http_get_name(MolochHttp_t *server)
{
char *name;
if (server->names[server->namesPos]) {
name = server->names[server->namesPos];
server->namesPos++;
return name;
}
server->namesPos = 0;
return server->names[0];
}
/******************************************************************************/
gboolean moloch_http_process_send(MolochConn_t *conn, gboolean sync)
{
char buffer[3000];
Expand All @@ -345,7 +361,7 @@ gboolean moloch_http_process_send(MolochConn_t *conn, gboolean sync)
MolochRequest_t *request = conn->request;

if (conn->conn == 0) {
if (moloch_http_connect(conn, conn->server->name, conn->server->port, TRUE)) {
if (moloch_http_connect(conn, moloch_http_get_name(conn->server), conn->server->port, TRUE)) {
LOG("%p: Couldn't connect from process", (void*)conn);
return FALSE;
}
Expand Down Expand Up @@ -544,40 +560,49 @@ unsigned char *moloch_http_get(void *server, char *key, int key_len, size_t *mle
MolochConn_t *
moloch_http_create(MolochHttp_t *server, int blocking) {
MolochConn_t *conn;
int tries = server->namesCnt;

conn = MOLOCH_TYPE_ALLOC0(MolochConn_t);
conn->parser.data = conn;
conn->server = server;

if (moloch_http_connect(conn, server->name, server->port, blocking)) {
LOG("Couldn't connect to '%s'", server->name);
exit (1);
while (tries > 0) {
if (!moloch_http_connect(conn, moloch_http_get_name(server), server->port, blocking)) {
return conn;
}
tries--;
LOG("Couldn't connect to '%s'", conn->name);
}
return conn;
exit (1);
}
/******************************************************************************/
void *moloch_http_create_server(char *hostname, int defaultPort, int maxConns, int maxOutstandingRequests, int compress)
void *moloch_http_create_server(char *hostnames, int defaultPort, int maxConns, int maxOutstandingRequests, int compress)
{
MolochHttp_t *server = MOLOCH_TYPE_ALLOC0(MolochHttp_t);

DLL_INIT(r_, &server->requestQ[0]);
DLL_INIT(r_, &server->requestQ[1]);
DLL_INIT(e_, &server->connQ);
if (strncmp(hostname, "http://", 7) == 0) {
server->name = strdup(hostname+7);
} else if (strncmp(hostname, "https://", 8) == 0) {
LOG("https not supported yet %s", hostname);
exit(0);
} else {
server->name = strdup(hostname);

server->names = g_strsplit(hostnames, ",", 0);
uint32_t i;
for (i = 0; server->names[i]; i++) {
if (strncmp(server->names[i], "http://", 7) == 0) {
char *tmp = g_strdup(server->names[i] + 7);
g_free(server->names[i]);
server->names[i] = tmp;
} else if (strncmp(server->names[i], "https://", 8) == 0) {
LOG("https not supported yet %s", server->names[i]);
exit(0);
}
}
server->namesCnt = i;
server->port = defaultPort;
server->maxConns = maxConns;
server->maxOutstandingRequests = maxOutstandingRequests;
server->compress = compress;

server->syncConn = moloch_http_create(server, TRUE);
uint32_t i;
for (i = 0; i < server->maxConns; i++) {
MolochConn_t *conn = moloch_http_create(server, FALSE);
DLL_PUSH_TAIL(e_, &server->connQ, conn);
Expand Down Expand Up @@ -620,7 +645,7 @@ void moloch_http_free_server(void *serverV)

MOLOCH_TYPE_FREE(MolochConn_t, server->syncConn);
server->syncConn = 0;
free(server->name);
g_strfreev(server->names);

MOLOCH_TYPE_FREE(MolochHttp_t, server);
}
Expand Down
6 changes: 3 additions & 3 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# 3rd) The section titled "default" is used last.

[default]
# The elasticsearch host:port. If not using a elasticsearch VIP, a different
# elasticsearch node in the cluster can be specified for each Moloch node to
# help spread load on high volume clusters
# Comma seperated list of elasticsearch host:port combinations. If not using a
# elasticsearch VIP, a different elasticsearch node in the cluster can be specified
# for each Moloch node to help spread load on high volume clusters
elasticsearch=CHANGEME_ESHOSTNAME:9200

# How often to create a new elasticsearch index. hourly,daily,weekly,monthly
Expand Down
6 changes: 3 additions & 3 deletions single-host/etc/config.ini.template
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# 3rd) The section titled "default" is used last.

[default]
# The elasticsearch host:port. If not using a elasticsearch VIP, a different
# elasticsearch node in the cluster can be specified for each Moloch node to
# help spread load on high volume clusters
# Comma seperated list of elasticsearch host:port combinations. If not using a
# elasticsearch VIP, a different elasticsearch node in the cluster can be specified
# for each Moloch node to help spread load on high volume clusters
elasticsearch=localhost:9200

# How often to create a new elasticsearch index. hourly,daily,weekly,monthly
Expand Down
8 changes: 4 additions & 4 deletions viewer/viewer.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var app = express();
//// Config
//////////////////////////////////////////////////////////////////////////////////
var internals = {
elasticBase: Config.get("elasticsearch", "http://localhost:9200"),
elasticBase: Config.get("elasticsearch", "http://localhost:9200").split(","),
httpAgent: new KAA({maxSockets: 40}),
httpsAgent: new KAA.Secure({maxSockets: 40}),
previousNodeStats: [],
Expand All @@ -77,8 +77,8 @@ var internals = {
PNG_LINE_WIDTH: 256,
};

if (internals.elasticBase.lastIndexOf('http', 0) !== 0) {
internals.elasticBase = "http://" + internals.elasticBase;
if (internals.elasticBase[0].lastIndexOf('http', 0) !== 0) {
internals.elasticBase[0] = "http://" + internals.elasticBase[0];
}

function userCleanup(suser) {
Expand Down Expand Up @@ -113,7 +113,7 @@ app.configure(function() {
app.locals.molochversion = molochversion.version;
app.locals.isIndex = false;
app.locals.basePath = Config.basePath();
app.locals.elasticBase = internals.elasticBase;
app.locals.elasticBase = internals.elasticBase[0];
app.locals.allowUploads = Config.get("uploadCommand") !== undefined;
app.locals.sendSession = Config.getObj("sendSession");

Expand Down

0 comments on commit f71f769

Please sign in to comment.