forked from SigmaHQ/sigma
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest-backend-es-qs.py
executable file
·140 lines (118 loc) · 4.27 KB
/
test-backend-es-qs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
# CI Test script: generate all queries with es-qs backend and test them against local ES instance.
# Copyright 2018 Thomas Patzke
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import sys
import pprint
import elasticsearch
import elasticsearch_async
# Pretty printer used to dump the error details in the final report
pp = pprint.PrettyPrinter()

# Configuration
eshost = "localhost:9200"
index = "test"
sigmac_cmd = "tools/sigmac"
sigmac_processing_prefix = "* Processing Sigma input "

# Blocking client (creates the test index below) and asyncio client
# (used by the query checker coroutine)
es = elasticsearch.Elasticsearch(hosts=[eshost])
esa = elasticsearch_async.AsyncElasticsearch(hosts=[eshost])

# Create empty test index
try:
    es.indices.create(index)
except elasticsearch.exceptions.RequestError as exc:
    if exc.error == 'resource_already_exists_exception':
        # An index with the same name already exists: accept it as is
        pass
    else:
        raise exc

# Hands the generated queries from the sigmac runner over to the query checker
queries = asyncio.Queue()
# sigmac runner coroutine
async def run_sigmac():
    """Run sigmac over the rule set and queue every query it generates.

    Launches ``sigmac_cmd`` with the es-qs backend and reads its standard
    output line by line. A line starting with ``sigmac_processing_prefix``
    sets the name of the rule currently being processed; every other line is
    queued on ``queries`` as a ``(rule, query)`` tuple.

    A ``(None, None)`` tuple is queued as end marker once reading stops. This
    happens at end of output and also when launching or reading fails, so a
    consumer waiting on ``queries`` is never left blocked. Exceptions raised
    while launching or reading propagate after the marker has been queued.

    Returns:
        The exit code of the sigmac process.
    """
    try:
        print("* Launching sigmac")
        proc = await asyncio.create_subprocess_exec(
            sigmac_cmd, "-t", "es-qs", "-c", "winlogbeat", "-v", "-I", "-r", "rules/",
            stdout=asyncio.subprocess.PIPE,
        )
        print("* sigmac launched with PID {}".format(proc.pid))

        cur_rule = None
        while True:
            line = await proc.stdout.readline()
            if not line:  # empty read means end of sigmac output
                print("* sigmac finished")
                break

            strline = str(line, 'utf-8').rstrip()
            if strline.startswith(sigmac_processing_prefix):
                # Remember the rule name for the queries that follow
                cur_rule = strline[len(sigmac_processing_prefix):]
            else:
                await queries.put((cur_rule, strline))
    finally:
        # The consumer only stops on this marker. Queue it on every exit path,
        # otherwise a failure here would leave the consumer waiting forever.
        await queries.put((None, None))

    await proc.wait()
    exitcode = proc.returncode
    print("* sigmac returned with exit code {}".format(exitcode))
    return exitcode
# Generated query checker loop
async def check_queries():
    """Validate every queued query against the Elasticsearch test index.

    Takes ``(rule, query)`` tuples from ``queries`` until a tuple whose query
    is ``None`` arrives. Each query is passed to the validate-query API; a
    query reported as invalid is additionally run as a search to obtain
    error details.

    Returns:
        A list of ``(rule, query, error)`` tuples, one per invalid query.
        ``error`` is the ``info`` attribute of the exception raised by the
        search when it has one, otherwise the ``repr`` of that exception. If
        the search raised nothing, it is the validation response itself.
    """
    failed = []
    print("# Waiting for queries")
    while True:
        rule, query = await queries.get()
        if query is None:  # end marker: no more queries will arrive
            queries.task_done()
            break

        print("# Checking query (rule {}): {}".format(rule, query))
        result = await esa.indices.validate_query(index=index, q=query)
        valid = result['valid']
        print("# Received Result for rule {} query={}: {}".format(rule, query, valid))
        if not valid:
            # Default detail for the case that the search below does not
            # raise; without it the name would be unbound or carry the error
            # of a previously failed rule.
            error = result
            try:
                await esa.search(index=index, q=query)
            except Exception as e:
                # Not every exception carries an ``info`` attribute
                error = getattr(e, 'info', None)
                if error is None:
                    error = repr(e)
            failed.append((rule, query, error))
        queries.task_done()
    print("# Finished query checks")
    return failed
# Run the sigmac runner and the query checker concurrently until both are done
task_check_query = asyncio.ensure_future(check_queries())
task_sigmac = asyncio.ensure_future(run_sigmac())
tasks = [
    task_check_query,
    task_sigmac,
]
loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# NOTE(review): the async transport is closed after the event loop has been
# closed -- confirm the client does not need a usable loop to release its
# connections.
esa.transport.close()
print()

# Check if sigmac ran successfully
try:
    sigmac_exitcode = task_sigmac.result()
except Exception as exc:
    # result() re-raises whatever the coroutine raised; show it so the CI log
    # contains the cause of the failure
    print("!!! sigmac failed while test!")
    print("Error: {!r}".format(exc))
    sys.exit(2)
if sigmac_exitcode != 0:    # sigmac failed
    print("!!! sigmac failed while test!")
    sys.exit(1)

# Check if query checks failed
try:
    query_check_result = task_check_query.result()
except Exception as exc:
    print("!!! Query check failed!")
    print("Error: {!r}".format(exc))
    sys.exit(3)

# Report every query that did not validate, together with its error details
query_check_result_cnt = len(query_check_result)
if query_check_result_cnt > 0:
    print("!!! {} queries failed to check:".format(query_check_result_cnt))
    for rule, query, error in query_check_result:
        print("- {}: {}".format(rule, query))
        print("Error:")
        pp.pprint(error)
        print()
    sys.exit(4)
else:
    print("All query checks passed!")