Skip to content

Commit

Permalink
pcap writers, S3 writer, http rework
Browse files Browse the repository at this point in the history
  - NOTICE: db.pl upgrade is required
  - http connection handling rework, hopefully better
  - moloch-capture will no longer monitor itself even if it uses the same interface to talk to ES
  - now save packet lengths to ES
  - writing pcap to disk is now pluggable
  - S3 is the first pcap writer plugin, currently experimental
    https://github.com/aol/moloch/wiki/S3
  - multies supports file query
  - maxFileSizeG can now be a float
  • Loading branch information
awick committed Feb 27, 2015
1 parent a9bd231 commit 3a4cd13
Show file tree
Hide file tree
Showing 90 changed files with 3,651 additions and 1,018 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
0.11.4 2015/02/26
- NOTICE: db.pl upgrade is required
- http connection handling rework, hopefully better
- moloch-capture will no longer monitor itself even if it uses the same interface to talk to ES
- now save packet lengths to ES
- writing pcap to disk is now pluggable
- S3 is the first pcap writer plugin, currently experimental
https://github.com/aol/moloch/wiki/S3
- multies supports file query
- maxFileSizeG can now be a float


0.11.3 2015/02/26
- NOTICE: Only 1.[234].x are supported by this version.
Restart viewer AFTER upgrading ES versions
Expand Down
2 changes: 1 addition & 1 deletion capture/Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ LIB_OTHER = @NIDS_LIBS@ \
thirdparty/patricia.o \
@DL_LIB@ -lpthread

C_FILES = main.c db.c nids.c yara.c http.c config.c parsers.c plugins.c field.c trie.c
C_FILES = main.c db.c nids.c yara.c http.c config.c parsers.c plugins.c field.c trie.c writers.c writer-inplace.c writer-disk.c writer-null.c
O_FILES = $(C_FILES:.c=.o)

INSTALL = @INSTALL@
Expand Down
88 changes: 31 additions & 57 deletions capture/config.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/******************************************************************************/
/* config.c -- Functions dealing with the config file
*
* Copyright 2012-2014 AOL Inc. All rights reserved.
* Copyright 2012-2015 AOL Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this Software except in compliance with the License.
Expand Down Expand Up @@ -140,6 +140,30 @@ uint32_t moloch_config_int(GKeyFile *keyfile, char *key, uint32_t d, uint32_t mi
return value;
}

/******************************************************************************/
double moloch_config_double(GKeyFile *keyfile, char *key, double d, double min, double max)
{
double value = d;

if (!keyfile)
keyfile = molochKeyFile;

if (g_key_file_has_key(keyfile, config.nodeName, key, NULL)) {
value = g_key_file_get_double(keyfile, config.nodeName, key, NULL);
} else if (config.nodeClass && g_key_file_has_key(keyfile, config.nodeClass, key, NULL)) {
value = g_key_file_get_double(keyfile, config.nodeClass, key, NULL);
} else if (g_key_file_has_key(keyfile, "default", key, NULL)) {
value = g_key_file_get_double(keyfile, "default", key, NULL);
}

if (value < min)
value = min;
if (value > max)
value = max;

return value;
}

/******************************************************************************/
char moloch_config_boolean(GKeyFile *keyfile, char *key, char d)
{
Expand Down Expand Up @@ -245,7 +269,7 @@ void moloch_config_load()
if (num > 0xffff)
num = 0xffff;
}
moloch_string_add((MolochStringHash_t *)(char*)&config.dontSaveTags, tags[i], num, TRUE);
moloch_string_add((MolochStringHash_t *)(char*)&config.dontSaveTags, tags[i], (gpointer)(long)num, TRUE);
}
g_strfreev(tags);
}
Expand Down Expand Up @@ -273,32 +297,6 @@ void moloch_config_load()
g_regex_unref(regex);
}

char *writeMethod = moloch_config_str(keyfile, "pcapWriteMethod", "normal");

if (strcmp(writeMethod, "normal") == 0)
config.writeMethod = MOLOCH_WRITE_NORMAL;
else if (strcmp(writeMethod, "direct") == 0)
config.writeMethod = MOLOCH_WRITE_DIRECT;
else if (strcmp(writeMethod, "thread") == 0)
config.writeMethod = MOLOCH_WRITE_THREAD | MOLOCH_WRITE_NORMAL;
else if (strcmp(writeMethod, "thread-direct") == 0)
config.writeMethod = MOLOCH_WRITE_THREAD | MOLOCH_WRITE_DIRECT;
/*else if (strcmp(writeMethod, "mmap") == 0)
config.writeMethod = MOLOCH_WRITE_MMAP;*/
else {
printf("Unknown pcapWriteMethod '%s'\n", writeMethod);
exit(1);
}
g_free(writeMethod);

#ifndef O_DIRECT
if (config.writeMethod & MOLOCH_WRITE_DIRECT) {
printf("OS doesn't support direct write method\n");
exit(1);
}
#endif


config.plugins = moloch_config_str_list(keyfile, "plugins", NULL);
config.smtpIpHeaders = moloch_config_str_list(keyfile, "smtpIpHeaders", NULL);

Expand Down Expand Up @@ -345,7 +343,7 @@ void moloch_config_load()
exit(1);
}

config.maxFileSizeG = moloch_config_int(keyfile, "maxFileSizeG", 4, 1, 1024);
config.maxFileSizeG = moloch_config_double(keyfile, "maxFileSizeG", 4, 0.01, 1024);
config.maxFileSizeB = config.maxFileSizeG*1024LL*1024LL*1024LL;
config.maxFileTimeM = moloch_config_int(keyfile, "maxFileTimeM", 0, 0, 0xffff);
config.icmpTimeout = moloch_config_int(keyfile, "icmpTimeout", 10, 1, 0xffff);
Expand All @@ -362,7 +360,7 @@ void moloch_config_load()
config.logEveryXPackets = moloch_config_int(keyfile, "logEveryXPackets", 50000, 1000, 1000000);
config.packetsPerPoll = moloch_config_int(keyfile, "packetsPerPoll", 50000, 1000, 1000000);
config.pcapBufferSize = moloch_config_int(keyfile, "pcapBufferSize", 300000000, 100000, 0xffffffff);
config.pcapWriteSize = moloch_config_int(keyfile, "pcapWriteSize", 0x40000, 0x40000, 0x400000);
config.pcapWriteSize = moloch_config_int(keyfile, "pcapWriteSize", 0x40000, 0x40000, 0x800000);
config.maxFreeOutputBuffers = moloch_config_int(keyfile, "maxFreeOutputBuffers", 50, 0, 0xffff);


Expand Down Expand Up @@ -431,7 +429,7 @@ void moloch_config_add_header(MolochStringHashStd_t *hash, char *key, int pos)
hstring = MOLOCH_TYPE_ALLOC0(MolochString_t);
hstring->str = key;
hstring->len = strlen(key);
hstring->uw = pos;
hstring->uw = (gpointer)(long)pos;
HASH_ADD(s_, *hash, hstring->str, hstring);
}
/******************************************************************************/
Expand Down Expand Up @@ -602,7 +600,8 @@ void moloch_config_init()
g_free(str);
}

LOG("maxFileSizeG: %u", config.maxFileSizeG);
LOG("maxFileSizeG: %lf", config.maxFileSizeG);
LOG("maxFileSizeB: %ld", config.maxFileSizeB);
LOG("maxFileTimeM: %u", config.maxFileTimeM);
LOG("icmpTimeout: %u", config.icmpTimeout);
LOG("udpTimeout: %u", config.udpTimeout);
Expand Down Expand Up @@ -630,22 +629,6 @@ void moloch_config_init()
LOG("compressES: %s", (config.compressES?"true":"false"));

LOG("rotateIndex = %s", rotates[config.rotate]);
switch (config.writeMethod) {
case MOLOCH_WRITE_NORMAL:
LOG("pcapWriteMethod = normal");
break;
case MOLOCH_WRITE_DIRECT:
LOG("pcapWriteMethod = direct");
break;
case MOLOCH_WRITE_THREAD | MOLOCH_WRITE_NORMAL:
LOG("pcapWriteMethod = thread");
break;
case MOLOCH_WRITE_THREAD | MOLOCH_WRITE_DIRECT:
LOG("pcapWriteMethod = thread-direct");
break;
default:
LOG("pcapWriteMethod = config.c needs to be updated");
}
LOG("offlineFilenameRegex: %s", g_regex_get_pattern(config.offlineRegex));

MolochString_t *tstring;
Expand All @@ -658,15 +641,6 @@ void moloch_config_init()
}
}

if ((config.writeMethod & MOLOCH_WRITE_DIRECT) && sizeof(off_t) == 4 && config.maxFileSizeG > 2)
printf("WARNING - DIRECT mode on 32bit machines may not work with maxFileSizeG > 2");

config.pagesize = getpagesize();
if (config.writeMethod & MOLOCH_WRITE_DIRECT && (config.pcapWriteSize % config.pagesize != 0)) {
printf("When using pcapWriteMethod of direct pcapWriteSize must be a multiple of %d", config.pagesize);
exit (1);
}

if (!config.interface && !config.pcapReadOffline) {
printf("Need to set interface, pcap file (-r) or pcap directory (-R) \n");
exit (1);
Expand Down
54 changes: 38 additions & 16 deletions capture/db.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/******************************************************************************/
/* db.c -- Functions dealing with database queries and updates
*
* Copyright 2012-2014 AOL Inc. All rights reserved.
* Copyright 2012-2015 AOL Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this Software except in compliance with the License.
Expand All @@ -24,13 +24,13 @@
#include <errno.h>
#include <sys/resource.h>
#include <sys/statvfs.h>
#include "glib.h"
#include "moloch.h"
#include "bsb.h"
#include "glib.h"
#include "patricia.h"
#include "GeoIP.h"

#define MOLOCH_MIN_DB_VERSION 22
#define MOLOCH_MIN_DB_VERSION 23

extern uint64_t totalPackets;
extern uint64_t totalBytes;
Expand Down Expand Up @@ -226,7 +226,7 @@ void moloch_db_save_session(MolochSession_t *session, int final)
moloch_plugins_cb_save(session, final);

/* jsonSize is an estimate of how much space it will take to encode the session */
jsonSize = 1100 + session->filePosArray->len*12 + 10*session->fileNumArray->len;
jsonSize = 1100 + session->filePosArray->len*12 + 10*session->fileNumArray->len + 10*session->fileLenArray->len;
for (pos = 0; pos < session->maxFields; pos++) {
if (session->fields[pos]) {
jsonSize += session->fields[pos]->jsonSize;
Expand Down Expand Up @@ -447,6 +447,14 @@ void moloch_db_save_session(MolochSession_t *session, int final)
}
BSB_EXPORT_cstr(jbsb, "],");

BSB_EXPORT_cstr(jbsb, "\"psl\":[");
for(i = 0; i < session->fileLenArray->len; i++) {
if (i != 0)
BSB_EXPORT_u08(jbsb, ',');
BSB_EXPORT_sprintf(jbsb, "%u", (uint16_t)g_array_index(session->fileLenArray, uint16_t, i));
}
BSB_EXPORT_cstr(jbsb, "],");

BSB_EXPORT_cstr(jbsb, "\"fs\":[");
for(i = 0; i < session->fileNumArray->len; i++) {
if (i == 0)
Expand Down Expand Up @@ -972,7 +980,7 @@ void moloch_db_update_stats()
usage.ru_maxrss * 1024UL,
#endif
diffusage*10000/diffms,
moloch_nids_disk_queue(),
moloch_writer_queue_length?moloch_writer_queue_length():0,
dbTotalPackets,
dbTotalK,
dbTotalSessions,
Expand Down Expand Up @@ -1062,7 +1070,7 @@ void moloch_db_update_dstats(int n)
usage.ru_maxrss * 1024UL,
#endif
diffusage*10000/diffms,
moloch_nids_disk_queue(),
moloch_writer_queue_length?moloch_writer_queue_length():0,
(totalPackets - lastPackets[n]),
(totalBytes - lastBytes[n]),
(totalSessions - lastSessions[n]),
Expand Down Expand Up @@ -1163,7 +1171,7 @@ uint32_t moloch_db_get_sequence_number_sync(char *name)

key_len = snprintf(key, sizeof(key), "/%ssequence/sequence/%s", config.prefix, name);

data = moloch_http_send_sync(esServer, "POST", key, key_len, "{}", 2, &data_len);
data = moloch_http_send_sync(esServer, "POST", key, key_len, "{}", 2, NULL, &data_len);
version = moloch_js0n_get(data, data_len, "_version", &version_len);

if (!version_len || !version) {
Expand Down Expand Up @@ -1236,7 +1244,7 @@ void moloch_db_load_file_num()

/* Now create the new style */
key_len = snprintf(key, sizeof(key), "/%ssequence/sequence/fn-%s?version_type=external&version=%d", config.prefix, config.nodeName, fileNum + 100);
moloch_http_send_sync(esServer, "POST", key, key_len, "{}", 2, NULL);
moloch_http_send_sync(esServer, "POST", key, key_len, "{}", 2, NULL, NULL);

fetch_file_num:
if (!config.pcapReadOffline) {
Expand All @@ -1246,7 +1254,7 @@ void moloch_db_load_file_num()
}
}
/******************************************************************************/
char *moloch_db_create_file(time_t firstPacket, char *name, uint64_t size, uint32_t *id)
char *moloch_db_create_file(time_t firstPacket, char *name, uint64_t size, int locked, uint32_t *id)
{
char key[100];
int key_len;
Expand All @@ -1257,6 +1265,7 @@ char *moloch_db_create_file(time_t firstPacket, char *name, uint64_t size, uint3
int json_len;
const uint64_t fp = firstPacket;


snprintf(key, sizeof(key), "fn-%s", config.nodeName);
if (nextFileNum == 0) {
/* If doing an offline file OR the last async call hasn't returned, just get a sync filenum */
Expand All @@ -1270,7 +1279,20 @@ char *moloch_db_create_file(time_t firstPacket, char *name, uint64_t size, uint3


if (name) {
json_len = snprintf(json, MOLOCH_HTTP_BUFFER_SIZE, "{\"num\":%d, \"name\":\"%s\", \"first\":%" PRIu64 ", \"node\":\"%s\", \"filesize\":%" PRIu64 ", \"locked\":1}", num, name, fp, config.nodeName, size);
static GRegex *numRegex;
static GRegex *numHexRegex;
if (!numRegex) {
numRegex = g_regex_new("#NUM#", 0, 0, 0);
numHexRegex = g_regex_new("#NUMHEX#", 0, 0, 0);
}
char numstr[100];
snprintf(numstr, sizeof(numstr), "%d", num);

char *name1 = g_regex_replace_literal(numRegex, name, -1, 0, numstr, 0, NULL);
name = g_regex_replace_literal(numHexRegex, name1, -1, 0, (char *)moloch_char_to_hexstr[num%256], 0, NULL);
g_free(name1);

json_len = snprintf(json, MOLOCH_HTTP_BUFFER_SIZE, "{\"num\":%d, \"name\":\"%s\", \"first\":%" PRIu64 ", \"node\":\"%s\", \"filesize\":%" PRIu64 ", \"locked\":%d}", num, name, fp, config.nodeName, size, locked);
key_len = snprintf(key, sizeof(key), "/%sfiles/file/%s-%d?refresh=true", config.prefix, config.nodeName,num);
} else {
tmp = localtime(&firstPacket);
Expand All @@ -1283,7 +1305,7 @@ char *moloch_db_create_file(time_t firstPacket, char *name, uint64_t size, uint3
strcat(filename, "/");
snprintf(filename+strlen(filename), sizeof(filename) - strlen(filename), "%s-%02d%02d%02d-%08d.pcap", config.nodeName, tmp->tm_year%100, tmp->tm_mon+1, tmp->tm_mday, num);

json_len = snprintf(json, MOLOCH_HTTP_BUFFER_SIZE, "{\"num\":%d, \"name\":\"%s\", \"first\":%" PRIu64 ", \"node\":\"%s\", \"locked\":0}", num, filename, fp, config.nodeName);
json_len = snprintf(json, MOLOCH_HTTP_BUFFER_SIZE, "{\"num\":%d, \"name\":\"%s\", \"first\":%" PRIu64 ", \"node\":\"%s\", \"locked\":%d}", num, filename, fp, config.nodeName, locked);
key_len = snprintf(key, sizeof(key), "/%sfiles/file/%s-%d?refresh=true", config.prefix, config.nodeName,num);
}

Expand All @@ -1297,7 +1319,7 @@ char *moloch_db_create_file(time_t firstPacket, char *name, uint64_t size, uint3
if (name)
return name;

return filename;
return g_strdup(filename);
}
/******************************************************************************/
void moloch_db_check()
Expand Down Expand Up @@ -1444,7 +1466,7 @@ void moloch_db_free_tag_request(MolochTagRequest_t *r)
}

key_len = snprintf(key, sizeof(key), "/%stags/tag/%s?fields=n", config.prefix, r->escaped);
moloch_http_send(esServer, "GET", key, key_len, NULL, 0, FALSE, moloch_db_tag_cb, r);
moloch_http_send(esServer, "GET", key, key_len, NULL, 0, NULL, FALSE, moloch_db_tag_cb, r);
outstandingTagRequests++;
break;
}
Expand All @@ -1459,7 +1481,7 @@ void moloch_db_tag_create_cb(unsigned char *data, int UNUSED(data_len), gpointer

if (strstr((char *)data, "{\"error\":") != 0) {
key_len = snprintf(key, sizeof(key), "/%stags/tag/%s?fields=n", config.prefix, r->escaped);
moloch_http_send(esServer, "GET", key, key_len, NULL, 0, FALSE, moloch_db_tag_cb, r);
moloch_http_send(esServer, "GET", key, key_len, NULL, 0, NULL, FALSE, moloch_db_tag_cb, r);
return;
}

Expand Down Expand Up @@ -1572,7 +1594,7 @@ void moloch_db_get_tag(void *uw, int tagtype, const char *tagname, MolochTag_cb
int key_len;

key_len = snprintf(key, sizeof(key), "/%stags/tag/%s?fields=n", config.prefix, r->escaped);
moloch_http_send(esServer, "GET", key, key_len, NULL, 0, FALSE, moloch_db_tag_cb, r);
moloch_http_send(esServer, "GET", key, key_len, NULL, 0, NULL, FALSE, moloch_db_tag_cb, r);
outstandingTagRequests++;
} else {
DLL_PUSH_TAIL(t_, &tagRequests, r);
Expand Down Expand Up @@ -1716,7 +1738,7 @@ void moloch_db_add_field(char *group, char *kind, char *expression, char *friend
}

BSB_EXPORT_u08(bsb, '}');
moloch_http_send(esServer, "POST", key, key_len, json, BSB_LENGTH(bsb), FALSE, NULL, NULL);
moloch_http_send(esServer, "POST", key, key_len, json, BSB_LENGTH(bsb), NULL, FALSE, NULL, NULL);
}
/******************************************************************************/
gboolean moloch_db_file_exists(char *filename)
Expand Down
10 changes: 7 additions & 3 deletions capture/hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ typedef int (* HASH_CMP_FUNC)(const void *key, const void *element);
} \
} while (0)

#define HASH_HASH(varname, key) (varname).hash(key)

#define HASH_ADD(name, varname, key, element) \

#define HASH_ADD_HASH(name, varname, h, key, element) \
do { \
const uint32_t _hh = element->name##hash = (varname).hash(key); \
const uint32_t _hh = element->name##hash = h; \
const int _b = element->name##bucket = element->name##hash % (varname).size; \
const void *_end = (void*)&((varname).buckets[_b]); \
for (element->name##next = (varname).buckets[_b].name##next; element->name##next != _end; element->name##next = element->name##next->name##next) { \
Expand All @@ -64,6 +66,8 @@ typedef int (* HASH_CMP_FUNC)(const void *key, const void *element);
(varname).count++; \
} while(0)

#define HASH_ADD(name, varname, key, element) HASH_ADD_HASH(name, varname, HASH_HASH(varname, key), key, element)

#define HASH_REMOVE(name, varname, element) \
do { \
DLL_REMOVE(name, &((varname).buckets[element->name##bucket]), element); \
Expand All @@ -84,7 +88,7 @@ typedef int (* HASH_CMP_FUNC)(const void *key, const void *element);

#define HASH_FIND_INT(name, varname, key, element) HASH_FIND_HASH(name, varname, (uint32_t)key, (void*)(long)key, element)

#define HASH_FIND(name, varname, key, element) HASH_FIND_HASH(name, varname, (varname).hash(key), key, element)
#define HASH_FIND(name, varname, key, element) HASH_FIND_HASH(name, varname, HASH_HASH(varname, key), key, element)

#define HASH_COUNT(name, varname) ((varname).count)

Expand Down
Loading

0 comments on commit 3a4cd13

Please sign in to comment.