#!/usr/bin/env python3
# CI Test script: generate all queries with netwitness backend.
# Copyright 2018 John Tuckner
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import functools
import sys
import pprint
pp = pprint.PrettyPrinter()  # kept for ad-hoc debugging; not used by this script

# --- Configuration -----------------------------------------------------------
# Target index name; not referenced by this script.
index = "test"
# sigmac entry point. It is launched via create_subprocess_exec (argv list, no
# shell), so this path is executed as-is.
sigmac_cmd = "tools/sigmac"
# Prefix of the verbose-mode marker line that names the rule being converted.
sigmac_processing_prefix = "* Processing Sigma input "
# (rule, query) pairs produced by run_sigmac; (None, None) marks end of output.
queries = asyncio.Queue()
# sigmac runner coroutine
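async def run_sigmac():
    """Run sigmac over ``rules/`` with the netwitness backend and stream its stdout.

    Each stdout line is either a ``* Processing Sigma input <rule>`` marker,
    which sets the rule the following lines belong to, or a generated query
    line, which is queued on the module-level ``queries`` as
    ``(current_rule, line)``. A final ``(None, None)`` is queued at EOF.

    Returns:
        The sigmac process exit code (``proc.returncode``).
    """
    print("* Launching sigmac")
    # argv is passed as a list and no shell is involved, so the fixed
    # arguments cannot be reinterpreted. Only stdout is captured; stderr is
    # inherited so sigmac diagnostics still appear in the CI log.
    proc = await asyncio.create_subprocess_exec(
        sigmac_cmd, "-t", "netwitness", "-v", "-I", "-r", "rules/",
        stdout=asyncio.subprocess.PIPE,
    )
    print("* sigmac launched with PID {}".format(proc.pid))

    cur_rule = None
    while True:
        line = await proc.stdout.readline()
        if not line:
            # EOF: signal consumers that no more queries will arrive.
            print("* sigmac finished")
            await queries.put((None, None))
            break
        # Decode leniently: a single non-UTF-8 byte in a rule title or query
        # would otherwise raise UnicodeDecodeError inside this task and be
        # reported by the caller as a generic sigmac failure.
        strline = line.decode("utf-8", errors="replace").rstrip()
        if strline.startswith(sigmac_processing_prefix):
            cur_rule = strline[len(sigmac_processing_prefix):]
        else:
            await queries.put((cur_rule, strline))

    await proc.wait()
    exitcode = proc.returncode
    print("* sigmac returned with exit code {}".format(exitcode))
    return exitcode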
# Schedule the sigmac runner and drive the event loop until it completes.
# NOTE(review): asyncio.get_event_loop() outside a running loop is deprecated
# on recent Python releases -- confirm the interpreter version CI uses.
task_sigmac = asyncio.ensure_future(run_sigmac())
tasks = [task_sigmac]
loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
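print()
# Check whether sigmac ran successfully. sys.exit() raises SystemExit, which
# is not an Exception subclass, so the handler below does not swallow it.
try:
    if task_sigmac.result() != 0:  # sigmac failed
        print("!!! sigmac failed while test!")
        sys.exit(1)
except Exception as e:
    # task.result() re-raises whatever run_sigmac raised (for example the
    # sigmac binary not being found); report the cause instead of dropping it.
    print("!!! sigmac failed while test!")
    print("!!! {}: {}".format(type(e).__name__, e))
    sys.exit(2)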