Skip to content

Commit

Permalink
WISE - With Intelligence See Everything
Browse files Browse the repository at this point in the history
  • Loading branch information
awick committed Nov 20, 2014
1 parent 8c6ddf1 commit cd3acd9
Show file tree
Hide file tree
Showing 29 changed files with 2,323 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
- Moved HTTP URI parsing from message complete to headers complete
- Better Socks4 support
- Updated easybutton versions of glib, es, node, geoip
- New data feed framework, WISE - https://github.com/aol/moloch/wiki/WISE
- http LOG message has total time now


0.11.2 2014/10/16
Expand Down
4 changes: 2 additions & 2 deletions capture/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ void moloch_db_save_session(MolochSession_t *session, int final)

/* jsonSize is an estimate of how much space it will take to encode the session */
jsonSize = 1100 + session->filePosArray->len*12 + 10*session->fileNumArray->len;
for (pos = 0; pos < config.maxField; pos++) {
for (pos = 0; pos < session->maxFields; pos++) {
if (session->fields[pos]) {
jsonSize += session->fields[pos]->jsonSize;
}
Expand Down Expand Up @@ -456,7 +456,7 @@ void moloch_db_save_session(MolochSession_t *session, int final)
BSB_EXPORT_cstr(jbsb, "],");

int inGroupNum = 0;
for (pos = 0; pos < config.maxField; pos++) {
for (pos = 0; pos < session->maxFields; pos++) {
const int flags = config.fields[pos]->flags;
if (!session->fields[pos] || flags & MOLOCH_FIELD_FLAG_DISABLED)
continue;
Expand Down
2 changes: 1 addition & 1 deletion capture/field.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ void moloch_field_free(MolochSession_t *session)
MolochCertsInfo_t *hci;
MolochCertsInfoHashStd_t *cihash;

for (pos = 0; pos < config.maxField; pos++) {
for (pos = 0; pos < session->maxFields; pos++) {
if (!(field = session->fields[pos]))
continue;

Expand Down
50 changes: 30 additions & 20 deletions capture/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef struct molochconn_t {
char line[1000];
struct timeval startTime;
struct timeval sendTime;
struct timeval sentTime;
struct timeval endTime;
char hp_data[10000000];
uint32_t sent;
Expand Down Expand Up @@ -136,20 +137,12 @@ gboolean moloch_http_write_cb(gint UNUSED(fd), GIOCondition UNUSED(cond), gpoint
MolochConn_t *conn = data;
GError *gerror = 0;

/*struct timeval startTime;
struct timeval endTime;
gettimeofday(&startTime, 0); */
if (!conn->request)
return FALSE;

int sent = g_socket_send(conn->conn, conn->request->data+conn->sent, conn->request->data_len-conn->sent, NULL, &gerror);
conn->sent += sent;

/*gettimeofday(&endTime, 0);
LOG("%s WRITE %d %d %ldms", conn->line, sent, conn->sent,
(endTime.tv_sec - startTime.tv_sec)*1000 + (endTime.tv_usec/1000 - startTime.tv_usec/1000));*/


if (gerror) {
/* Should free stuff here */
LOG("ERROR: %p: Receive Error: %s", (void*)conn, gerror->message);
Expand All @@ -175,12 +168,26 @@ gboolean moloch_http_read_cb(gint UNUSED(fd), GIOCondition cond, gpointer data)

if (gerror || cond & (G_IO_HUP | G_IO_ERR) || len <= 0) {
if (gerror) {
LOG("ERROR: %p: Receive Error: %s", (void*)conn, gerror->message);
LOG("ERROR: %p:%p Receive Error: %s", (void*)conn, conn->request, gerror->message);
g_error_free(gerror);
} else if (cond & (G_IO_HUP | G_IO_ERR))
LOG("ERROR: %p: Lost connection to %s", (void*)conn, conn->name);
LOG("ERROR: %p:%p Lost connection to %s", (void*)conn, conn->request, conn->name);
else if (len <= 0)
LOG("ERROR: %p: len: %d cond: %x", (void*)conn, len, cond);
LOG("ERROR: %p:%p len: %d cond: %x", (void*)conn, conn->request, len, cond);
else
LOG("ERROR HMM: %p:%p len: %d cond: %x", (void*)conn, conn->request, len, cond);

if (conn->request) {
// Must save, free, then call function because of recursive sync GETs
MolochResponse_cb func = conn->request->func;
gpointer uw = conn->request->uw;

MOLOCH_TYPE_FREE(MolochRequest_t, conn->request);

if (func) {
func(0, 0, uw);
}
}

g_object_unref (conn->conn);
conn->conn = 0;
Expand All @@ -194,13 +201,23 @@ gboolean moloch_http_read_cb(gint UNUSED(fd), GIOCondition cond, gpointer data)
http_parser_execute(&conn->parser, &parserSettings, buffer, len);

if (conn->hp_complete) {
gettimeofday(&conn->endTime, NULL);
if (config.logESRequests)
LOG("%s %ldms %ldms %ldms",
conn->line,
(conn->sendTime.tv_sec - conn->startTime.tv_sec)*1000 + (conn->sendTime.tv_usec - conn->startTime.tv_usec)/1000,
(conn->sentTime.tv_sec - conn->startTime.tv_sec)*1000 + (conn->sentTime.tv_usec - conn->startTime.tv_usec)/1000,
(conn->endTime.tv_sec - conn->startTime.tv_sec)*1000 + (conn->endTime.tv_usec - conn->startTime.tv_usec)/1000
);

conn->hp_data[conn->hp_len] = 0;

/* Must save, free, then call function because of recursive sync GETs */
MolochResponse_cb func = conn->request->func;
gpointer uw = conn->request->uw;

MOLOCH_TYPE_FREE(MolochRequest_t, conn->request);
conn->request = 0;

if (func) {
func((unsigned char*)conn->hp_data, conn->hp_len, uw);
Expand Down Expand Up @@ -331,14 +348,7 @@ void moloch_http_finish( MolochConn_t *conn, gboolean sync)
break;
}

gettimeofday(&conn->endTime, NULL);
if (config.logESRequests)
LOG("%s %ldms %ldms",
line,
(conn->sendTime.tv_sec - conn->startTime.tv_sec)*1000 + (conn->sendTime.tv_usec/1000 - conn->startTime.tv_usec/1000),
(conn->endTime.tv_sec - conn->startTime.tv_sec)*1000 + (conn->endTime.tv_usec/1000 - conn->startTime.tv_usec/1000)
);

gettimeofday(&conn->sentTime, NULL);
}
/******************************************************************************/
char *moloch_http_get_name(MolochHttp_t *server)
Expand All @@ -362,7 +372,7 @@ gboolean moloch_http_process_send(MolochConn_t *conn, gboolean sync)

if (conn->conn == 0) {
if (moloch_http_connect(conn, moloch_http_get_name(conn->server), conn->server->port, TRUE)) {
LOG("%p: Couldn't connect from process", (void*)conn);
LOG("%p: Couldn't connect %s", (void*)conn, conn->name);
return FALSE;
}
}
Expand Down
4 changes: 3 additions & 1 deletion capture/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ void moloch_drop_privileges()
*/
gboolean moloch_quit_gfunc (gpointer UNUSED(user_data))
{
if (moloch_db_tags_loading() == 0) {
if (moloch_db_tags_loading() == 0 && moloch_plugins_outstanding() == 0) {
g_main_loop_quit(mainLoop);
return FALSE;
}
Expand All @@ -423,6 +423,8 @@ void moloch_quit()
gboolean moloch_nids_init_gfunc (gpointer UNUSED(user_data))
{
if (moloch_db_tags_loading() == 0 && moloch_http_queue_length(esServer) == 0) {
if (config.debug)
LOG("maxField = %d", config.maxField);
moloch_nids_init();
return FALSE;
}
Expand Down
19 changes: 15 additions & 4 deletions capture/moloch.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#define UNUSED(x) x __attribute((unused))


#define MOLOCH_API_VERSION 11
#define MOLOCH_API_VERSION 12

/******************************************************************************/
/*
Expand Down Expand Up @@ -334,6 +334,8 @@ typedef struct moloch_session {
GArray *fileNumArray;
char *rootId;

struct tcp_stream * tcpStream;

struct timeval firstPacket;
struct timeval lastPacket;
char firstBytes[2][8];
Expand Down Expand Up @@ -363,7 +365,7 @@ typedef struct moloch_session {
uint8_t parserLen;
uint8_t parserNum;

uint16_t haveNidsTcp:1;
uint8_t maxFields;
uint16_t needSave:1;
} MolochSession_t;

Expand Down Expand Up @@ -519,7 +521,8 @@ gboolean moloch_http_send(void *serverV, char *method, char *key, uint32_t key_l

gboolean moloch_http_set(void *server, char *key, int key_len, char *data, uint32_t data_len, MolochResponse_cb func, gpointer uw);
unsigned char *moloch_http_get(void *server, char *key, int key_len, size_t *mlen);
#define moloch_http_get_buffer(s) MOLOCH_SIZE_ALLOC(buffer, s)
#define moloch_http_get_buffer(size) MOLOCH_SIZE_ALLOC(buffer, size)
#define moloch_http_free_buffer(b) MOLOCH_SIZE_FREE(buffer, b)
void moloch_http_exit();
int moloch_http_queue_length(void *server);

Expand All @@ -543,9 +546,11 @@ uint32_t moloch_nids_monitoring_sessions();
uint32_t moloch_nids_disk_queue();
void moloch_nids_exit();

void moloch_nids_incr_outstanding(MolochSession_t *session);
#define moloch_nids_incr_outstanding(session) (session)->outstandingQueries++
void moloch_nids_decr_outstanding(MolochSession_t *session);

char *moloch_friendly_session_id (int protocol, uint32_t addr1, int port1, uint32_t addr2, int port2);

/******************************************************************************/
/*
* plugins.c
Expand All @@ -564,6 +569,7 @@ typedef void (* MolochPluginHttpFunc) (MolochSession_t *session, http_parser *hp

typedef void (* MolochPluginSMTPHeaderFunc) (MolochSession_t *session, const char *field, size_t field_len, const char *value, size_t value_len);
typedef void (* MolochPluginSMTPFunc) (MolochSession_t *session);
typedef uint32_t (* MolochPluginOutstandingFunc) ();

#define MOLOCH_PLUGIN_SAVE 0x00000001
#define MOLOCH_PLUGIN_IP 0x00000002
Expand Down Expand Up @@ -614,6 +620,11 @@ void moloch_plugins_set_smtp_cb(const char * name,
MolochPluginSMTPHeaderFunc on_header,
MolochPluginSMTPFunc on_header_complete);

void moloch_plugins_set_outstanding_cb(const char * name,
MolochPluginOutstandingFunc outstandingFunc);

uint32_t moloch_plugins_outstanding();

void moloch_plugins_cb_pre_save(MolochSession_t *session, int final);
void moloch_plugins_cb_save(MolochSession_t *session, int final);
void moloch_plugins_cb_new(MolochSession_t *session);
Expand Down
8 changes: 1 addition & 7 deletions capture/nids.c
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ void moloch_nids_cb_ip(struct ip *packet, int len)
session->addr2 = packet->ip_dst.s_addr;
session->ip_tos = packet->ip_tos;
session->fields = MOLOCH_SIZE_ALLOC0(fields, sizeof(MolochField_t *)*config.maxField);
session->maxFields = config.maxField;
if (config.numPlugins > 0)
session->pluginData = MOLOCH_SIZE_ALLOC0(pluginData, sizeof(void *)*config.numPlugins);

Expand Down Expand Up @@ -830,13 +831,6 @@ void moloch_nids_cb_ip(struct ip *packet, int len)
moloch_nids_mid_save_session(headSession);
}
}


/******************************************************************************/
void moloch_nids_incr_outstanding(MolochSession_t *session)
{
session->outstandingQueries++;
}
/******************************************************************************/
void moloch_nids_decr_outstanding(MolochSession_t *session)
{
Expand Down
27 changes: 27 additions & 0 deletions capture/plugins.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct moloch_plugin {
MolochPluginNewFunc newFunc;
MolochPluginExitFunc exitFunc;
MolochPluginReloadFunc reloadFunc;
MolochPluginOutstandingFunc outstandingFunc;

MolochPluginHttpFunc on_message_begin;
MolochPluginHttpDataFunc on_url;
Expand Down Expand Up @@ -273,6 +274,20 @@ void moloch_plugins_set_smtp_cb(const char * name,
pluginsCbs |= MOLOCH_PLUGIN_SMTP_OHC;
}
/******************************************************************************/
void moloch_plugins_set_outstanding_cb(const char * name,
MolochPluginOutstandingFunc outstanding)
{
MolochPlugin_t *plugin;

HASH_FIND(p_, plugins, name, plugin);
if (!plugin) {
LOG("Can't find plugin with name %s", name);
return;
}

plugin->outstandingFunc = outstanding;
}
/******************************************************************************/
void moloch_plugins_cb_pre_save(MolochSession_t *session, int final)
{
MolochPlugin_t *plugin;
Expand Down Expand Up @@ -447,3 +462,15 @@ void moloch_plugins_reload()
plugin->reloadFunc();
);
}
/******************************************************************************/
uint32_t moloch_plugins_outstanding()
{
MolochPlugin_t *plugin;
uint32_t outstanding = 0;

HASH_FORALL(p_, plugins, plugin,
if (plugin->outstandingFunc)
outstanding += plugin->outstandingFunc();
);
return outstanding;
}
Loading

0 comments on commit cd3acd9

Please sign in to comment.