Skip to content

Commit

Permalink
support for daily ES indices
Browse files Browse the repository at this point in the history
  • Loading branch information
rabbitstack committed Dec 16, 2017
1 parent a67a610 commit 488c679
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
13 changes: 12 additions & 1 deletion fibratus/output/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from fibratus.errors import InvalidPayloadError
from fibratus.output.base import Output

from datetime import datetime

class ElasticsearchOutput(Output):

Expand All @@ -36,6 +36,8 @@ def __init__(self, **kwargs):
hosts = kwargs.pop('hosts', [])
self._hosts = [dict(host=host.split(':')[0], port=int(host.split(':')[1])) for host in hosts]
self._index_name = kwargs.pop('index', None)
self._index_type = kwargs.pop('index_type', 'fixed')
self._daily_index_format = kwargs.pop('daily_index_format', '%Y.%m.%d')
self._document_type = kwargs.pop('document', None)
self._bulk = kwargs.pop('bulk', False)
self._username = kwargs.pop('username', None)
Expand All @@ -61,6 +63,11 @@ def emit(self, body, **kwargs):
% type(body))

self._index_name = kwargs.pop('index', self._index_name)

# build index name for daily index types
if 'daily' in self._index_type:
self._index_name = '%s-%s' % (self._index_name, datetime.now().strftime(self._daily_index_format))

if self._bulk:
actions = [dict(_index=self._index_name, _type=self._document_type, _source=b) for b in body]
elasticsearch.helpers.bulk(self._elasticsearch, actions)
Expand All @@ -75,6 +82,10 @@ def hosts(self):
def index_name(self):
return self._index_name

@property
def index_type(self):
return self._index_type

@property
def document_type(self):
return self._document_type
Expand Down
36 changes: 35 additions & 1 deletion tests/unit/output/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest.mock import patch
from datetime import datetime

import time
import elasticsearch
import pytest
from unittest.mock import Mock

from fibratus.errors import InvalidPayloadError
from fibratus.output.elasticsearch import ElasticsearchOutput
Expand All @@ -29,7 +32,7 @@ def elasticsearch_adapter():
'rabbitstack:9200'
],
'index': 'kernelstream',
'document': 'threads'
'document': 'threads',
}
return ElasticsearchOutput(**config)

Expand All @@ -50,6 +53,24 @@ def elasticsearch_bulk_adapter():
return ElasticsearchOutput(**config)


@pytest.fixture(scope='module')
def elasticsearch_adapter_daily_index():
config = {
'hosts': [
'localhost:9200',
'rabbitstack:9200'
],
'index': 'kernelstream',
'document': 'threads',
'index_type': 'daily'
}
return ElasticsearchOutput(**config)


mock_time = Mock()
mock_time.return_value = time.mktime(datetime(2017, 12, 16).timetuple())


class TestElasticsearchOutput(object):

def test_init(self, elasticsearch_adapter):
Expand All @@ -69,6 +90,17 @@ def test_emit(self, elasticsearch_adapter):
{'host': 'rabbitstack', 'port': 9200}], use_ssl=False)
elasticsearch_adapter._elasticsearch.index.assert_called_with('kernelstream', 'threads', body=body)

@patch('time.time', mock_time)
def test_emit_daily_index(self, elasticsearch_adapter_daily_index):
body = {'kevent_type': 'CreateProcess', 'params': {'name': 'smss.exe'}}
assert elasticsearch_adapter_daily_index._elasticsearch is None
assert elasticsearch_adapter_daily_index.index_type == 'daily'
with patch('elasticsearch.Elasticsearch', spec_set=elasticsearch.Elasticsearch) as es_client_mock:
elasticsearch_adapter_daily_index.emit(body)
es_client_mock.assert_called_with([{'host': 'localhost', 'port': 9200},
{'host': 'rabbitstack', 'port': 9200}], use_ssl=False)
elasticsearch_adapter_daily_index._elasticsearch.index.assert_called_with('kernelstream-2017.12.16', 'threads', body=body)

@patch('elasticsearch.Elasticsearch', spec_set=elasticsearch.Elasticsearch)
def test_emit_invalid_payload(self, es_client_mock, elasticsearch_adapter):
body = ['CreateProcess', 'TerminateProcess']
Expand Down Expand Up @@ -101,3 +133,5 @@ def test_emit_bulk_invalid_payload(self, es_bulk_mock, es_client_mock, elasticse
elasticsearch_bulk_adapter.emit(body)
assert "invalid payload for bulk indexing. list expected but <class 'tuple'> found" == str(e.value)
assert es_bulk_mock.assert_not_called()


0 comments on commit 488c679

Please sign in to comment.