forked from SigmaHQ/sigma
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsigma.py
351 lines (297 loc) · 13.6 KB
/
sigma.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# Sigma parser
import yaml
import re
COND_NONE = 0
COND_AND = 1
COND_OR = 2
COND_NOT = 3
class SigmaParser:
def __init__(self, sigma):
self.definitions = dict()
self.parsedyaml = yaml.safe_load(sigma)
def parse_sigma(self):
try: # definition uniqueness check
for definitionName, definition in self.parsedyaml["detection"].items():
if definitionName in self.definitions:
raise SigmaParseError("Definition '%s' was already defined" % (definitionName))
self.definitions[definitionName] = definition
except KeyError:
raise SigmaParseError("No detection definitions found")
try: # tokenization
conditions = self.parsedyaml["detection"]["condition"]
self.condtoken = list() # list of tokenized conditions
if type(conditions) == str:
self.condtoken.append(SigmaConditionTokenizer(conditions))
elif type(conditions) == list:
for condition in conditions:
self.condtoken.append(SigmaConditionTokenizer(condition))
except KeyError:
raise SigmaParseError("No condition found")
self.condparsed = list() # list of parsed conditions
for tokens in self.condtoken:
self.condparsed.append(SigmaConditionParser(self, tokens))
def parse_definition_byname(self, definitionName, condOverride=None):
try:
definition = self.definitions[definitionName]
except KeyError as e:
raise SigmaParseError("Unknown definition '%s'" % (definitionName)) from e
return self.parse_definition(definition, condOverride)
def parse_definition(self, definition, condOverride=None):
if type(definition) not in (dict, list):
raise SigmaParseError("Expected map or list, got type %s: '%s'" % (type(definition), str(definition)))
if type(definition) == list: # list of values or maps
if condOverride: # condition given through rule detection condition, e.g. 1 of x
cond = condOverride()
else: # no condition given, use default from spec
cond = ConditionOR()
subcond = None
for value in definition:
if type(value) in (str, int):
cond.add(value)
elif type(value) in (dict, list):
cond.add(self.parse_definition(value))
else:
raise SigmaParseError("Definition list may only contain plain values or maps")
elif type(definition) == dict: # map
cond = ConditionAND()
for key, value in definition.items():
cond.add((key, value))
return cond
class SigmaConditionToken:
"""Token of a Sigma condition expression"""
TOKEN_AND = 1
TOKEN_OR = 2
TOKEN_NOT = 3
TOKEN_ID = 4
TOKEN_LPAR = 5
TOKEN_RPAR = 6
TOKEN_PIPE = 7
TOKEN_ONE = 8
TOKEN_ALL = 9
TOKEN_AGG = 10
TOKEN_EQ = 11
TOKEN_LT = 12
TOKEN_LTE = 13
TOKEN_GT = 14
TOKEN_GTE = 15
TOKEN_BY = 16
tokenstr = [
"INVALID",
"AND",
"OR",
"NOT",
"ID",
"LPAR",
"RPAR",
"PIPE",
"ONE",
"ALL",
"AGG",
"EQ",
"LT",
"LTE",
"GT",
"GTE",
"BY",
]
def __init__(self, tokendef, match, pos):
self.type = tokendef[0]
self.matched = match.group()
self.pos = pos
def __eq__(self, other):
if type(other) == int: # match against type
return self.type == other
else:
raise NotImplementedError("SigmaConditionToken can only be compared against token type constants")
def __str__(self):
return "[ Token: %s: '%s' ]" % (self.tokenstr[self.type], self.matched)
class SigmaConditionTokenizer:
"""Tokenize condition string into token sequence"""
tokendefs = [ # list of tokens, preferred recognition in given order, (token identifier, matching regular expression). Ignored if token id == None
(SigmaConditionToken.TOKEN_ONE, re.compile("1 of", re.IGNORECASE)),
(SigmaConditionToken.TOKEN_ALL, re.compile("all of", re.IGNORECASE)),
(None, re.compile("[\\s\\r\\n]+")),
(SigmaConditionToken.TOKEN_AGG, re.compile("count|distcount|min|max|avg|sum", re.IGNORECASE)),
(SigmaConditionToken.TOKEN_BY, re.compile("by", re.IGNORECASE)),
(SigmaConditionToken.TOKEN_EQ, re.compile("==")),
(SigmaConditionToken.TOKEN_LT, re.compile("<")),
(SigmaConditionToken.TOKEN_LTE, re.compile("<=")),
(SigmaConditionToken.TOKEN_GT, re.compile(">")),
(SigmaConditionToken.TOKEN_GTE, re.compile(">=")),
(SigmaConditionToken.TOKEN_PIPE, re.compile("\\|")),
(SigmaConditionToken.TOKEN_AND, re.compile("and", re.IGNORECASE)),
(SigmaConditionToken.TOKEN_OR, re.compile("or", re.IGNORECASE)),
(SigmaConditionToken.TOKEN_NOT, re.compile("not", re.IGNORECASE)),
(SigmaConditionToken.TOKEN_ID, re.compile("\\w+")),
(SigmaConditionToken.TOKEN_LPAR, re.compile("\\(")),
(SigmaConditionToken.TOKEN_RPAR, re.compile("\\)")),
]
def __init__(self, condition):
if type(condition) == str: # String that is parsed
self.tokens = list()
pos = 1
while len(condition) > 0:
for tokendef in self.tokendefs: # iterate over defined tokens and try to recognize the next one
match = tokendef[1].match(condition)
if match:
if tokendef[0] != None:
self.tokens.append(SigmaConditionToken(tokendef, match, pos + match.start()))
pos += match.end() # increase position and cut matched prefix from condition
condition = condition[match.end():]
break
else: # no valid token identified
raise SigmaParseError("Unexpected token in condition at position %d")
elif type(condition) == list: # List of tokens to be converted into SigmaConditionTokenizer class
self.tokens = condition
else:
raise TypeError("SigmaConditionTokenizer constructor expects string or list, got %s" % (type(condition)))
def __str__(self):
return " ".join([str(token) for token in self.tokens])
def __iter__(self):
return iter(self.tokens)
def __len__(self):
return len(self.tokens)
def __getitem__(self, i):
if type(i) == int:
return self.tokens[i]
elif type(i) == slice:
return SigmaConditionTokenizer(self.tokens[i])
else:
raise IndexError("Expected index or slice")
def __add__(self, other):
if isinstance(other, SigmaConditionTokenizer):
return SigmaConditionTokenizer(self.tokens + other.tokens)
elif isinstance(other, (SigmaConditionToken, ParseTreeNode)):
return SigmaConditionTokenizer(self.tokens + [ other ])
else:
raise TypeError("+ operator expects SigmaConditionTokenizer or token type, got %s: %s" % (type(other), str(other)))
def index(self, item):
return self.tokens.index(item)
class SigmaParseError(Exception):
pass
### Parse Tree Node Classes ###
class ParseTreeNode:
"""Parse Tree Node Base Class"""
def __init__(self):
raise NotImplementedError("ConditionBase is no usable class")
def __str__(self):
return "[ %s: %s ]" % (self.__doc__, str([str(item) for item in self.items]))
class ConditionBase(ParseTreeNode):
"""Base class for conditional operations"""
op = COND_NONE
items = None
def __init__(self):
raise NotImplementedError("ConditionBase is no usable class")
def add(self, item):
self.items.append(item)
def __iter__(self):
return iter(self.items)
class ConditionAND(ConditionBase):
"""AND Condition"""
op = COND_AND
def __init__(self, sigma=None, op=None, val1=None, val2=None):
if sigma == None and op == None and val1 == None and val2 == None: # no parameters given - initialize empty
self.items = list()
else: # called by parser, use given values
self.items = [ val1, val2 ]
class ConditionOR(ConditionAND):
"""OR Condition"""
op = COND_OR
class ConditionNOT(ConditionBase):
"""NOT Condition"""
op = COND_NOT
def __init__(self, sigma=None, op=None, val=None):
if sigma == None and op == None and val == None: # no parameters given - initialize empty
self.items = list()
else: # called by parser, use given values
self.items = [ val ]
def add(self, item):
if len(self.items) == 0:
super.add(item)
else:
raise ValueError("Only one element allowed in NOT condition")
@property
def item(self):
try:
return self.items[0]
except IndexError:
return None
class NodeSubexpression(ParseTreeNode):
"""Subexpression"""
def __init__(self, subexpr):
self.items = subexpr
# Parse tree converters: convert something into one of the parse tree node classes defined above
def convertAllOf(sigma, op, val):
"""Convert 'all of x' into ConditionAND"""
return NodeSubexpression(sigma.parse_definition_byname(val.matched, ConditionAND))
def convertOneOf(sigma, op, val):
"""Convert '1 of x' into ConditionOR"""
return NodeSubexpression(sigma.parse_definition_byname(val.matched, ConditionOR))
def convertId(sigma, op):
"""Convert search identifiers (lists or maps) into condition nodes according to spec defaults"""
return NodeSubexpression(sigma.parse_definition_byname(op.matched))
# Condition parser class
class SigmaConditionParser:
"""Parser for Sigma condition expression"""
searchOperators = [ # description of operators: (token id, number of operands, parse tree node class) - order == precedence
(SigmaConditionToken.TOKEN_ALL, 1, convertAllOf),
(SigmaConditionToken.TOKEN_ONE, 1, convertOneOf),
(SigmaConditionToken.TOKEN_ID, 0, convertId),
(SigmaConditionToken.TOKEN_NOT, 1, ConditionNOT),
(SigmaConditionToken.TOKEN_AND, 2, ConditionAND),
(SigmaConditionToken.TOKEN_OR, 2, ConditionOR),
]
def __init__(self, sigmaParser, tokens):
if SigmaConditionToken.TOKEN_PIPE in tokens: # aggregations are not yet supported
raise NotImplementedError("Aggregation expressions are not yet supported")
self.sigmaParser = sigmaParser
self.parsedSearch = self.parseSearch(tokens)
def parseSearch(self, tokens):
"""
Iterative parsing of search expression.
"""
# 1. Identify subexpressions with parentheses around them and parse them like a separate search expression
while SigmaConditionToken.TOKEN_LPAR in tokens:
lPos = tokens.index(SigmaConditionToken.TOKEN_LPAR)
lTok = tokens[lPos]
try:
rPos = tokens.index(SigmaConditionToken.TOKEN_RPAR)
rTok = tokens[rPos]
except ValueError as e:
raise SigmaParseError("Missing matching closing parentheses") from e
if lPos + 1 == rPos:
raise SigmaParseError("Empty subexpression at " + str(lTok.pos))
if lPos > rPos:
raise SigmaParseError("Closing parentheses at position " + str(rTok.pos) + " precedes opening at position " + str(lTok.pos))
subparsed = self.parseSearch(tokens[lPos + 1:rPos])[0]
tokens = tokens[:lPos] + NodeSubexpression(subparsed) + tokens[rPos + 1:] # replace parentheses + expression with group node that contains parsed subexpression
# 2. Iterate over all known operators in given precedence
for operator in self.searchOperators:
# 3. reduce all occurrences into corresponding parse tree nodes
while operator[0] in tokens:
pos_op = tokens.index(operator[0])
tok_op = tokens[pos_op]
if operator[1] == 0: # operator
treenode = operator[2](self.sigmaParser, tok_op)
tokens = tokens[:pos_op] + treenode + tokens[pos_op + 1:]
elif operator[1] == 1: # operator value
pos_val = pos_op + 1
tok_val = tokens[pos_val]
treenode = operator[2](self.sigmaParser, tok_op, tok_val)
tokens = tokens[:pos_op] + treenode + tokens[pos_val + 1:]
elif operator[1] == 2: # value1 operator value2
pos_val1 = pos_op - 1
pos_val2 = pos_op + 1
tok_val1 = tokens[pos_val1]
tok_val2 = tokens[pos_val2]
treenode = operator[2](self.sigmaParser, tok_op, tok_val1, tok_val2)
tokens = tokens[:pos_val1] + treenode + tokens[pos_val2 + 1:]
if len(tokens) != 1: # parse tree must begin with exactly one node
raise ValueError("Parse tree must have exactly one start node!")
return tokens
def __str__(self):
return str(self.parsedSearch)
def __len__(self):
return len(self.parsedSearch)
def getParseTree(self):
return(self.parsedSearch[0])