test_logsource.py
#!/usr/bin/env python3
"""
Checks all rules for logsource or field-name errors.

Run with:
    python test_logsource.py
"""
import json
import os
import unittest
from typing import Iterator

import yaml
from colorama import Fore, init


class TestRules(unittest.TestCase):
path_to_rules_ = [
"rules",
"rules-emerging-threats",
"rules-placeholder",
"rules-threat-hunting",
"rules-compliance",
]
path_to_rules = []
for path_ in path_to_rules_:
path_to_rules.append(
            os.path.join(os.path.dirname(os.path.realpath(__file__)), path_)
)
# Helper functions
    def yield_next_rule_file_path(self, path_to_rules: list) -> Iterator[str]:
for path_ in path_to_rules:
for root, _, files in os.walk(path_):
for file in files:
if file.endswith(".yml"):
yield os.path.join(root, file)
    def get_rule_yaml(self, file_path: str) -> list:
data = []
with open(file_path, encoding="utf-8") as f:
yaml_parts = yaml.safe_load_all(f)
for part in yaml_parts:
data.append(part)
return data
def get_rule_part(self, file_path: str, part_name: str):
yaml_dicts = self.get_rule_yaml(file_path)
for yaml_part in yaml_dicts:
if part_name in yaml_part.keys():
return yaml_part[part_name]
return None
def get_detection_field(self, detection: dict):
data = []
def get_field_name(selection: dict):
name = []
for field in selection:
if field == "|all":
continue
elif "|" in field:
name.append(field.split("|")[0])
else:
name.append(field)
return name
for search_identifier in detection:
if isinstance(detection[search_identifier], dict):
data += get_field_name(detection[search_identifier])
if isinstance(detection[search_identifier], list):
for list_value in detection[search_identifier]:
if isinstance(list_value, dict):
data += get_field_name(list_value)
return data
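    # Illustrative example for get_detection_field (above): a detection such as
    #   selection:
    #       Image|endswith: '\cmd.exe'
    #       CommandLine|contains: 'whoami'
    #   condition: selection
    # yields ["Image", "CommandLine"]; the '|' modifiers are stripped and the
    # "condition" entry is skipped because it is neither a dict nor a list.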
def full_logsource(self, logsource: dict) -> dict:
data = {}
data["product"] = (
logsource["product"] if "product" in logsource.keys() else None
)
data["category"] = (
logsource["category"] if "category" in logsource.keys() else None
)
data["service"] = (
logsource["service"] if "service" in logsource.keys() else None
)
return data
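    # Illustrative example for full_logsource (above):
    #   {"product": "windows", "category": "process_creation"}
    # is normalized to
    #   {"product": "windows", "category": "process_creation", "service": None}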
    def exist_logsource(self, logsource: dict) -> bool:
        # Check that the product is known
        if logsource["product"]:
            if logsource["product"] in fieldname_dict:
                product = logsource["product"]
            else:
                return False
        else:
            product = "empty"
        if (
            logsource["category"]
            and logsource["category"] in fieldname_dict[product]["category"]
        ):
            return True
        elif (
            logsource["service"]
            and logsource["service"] in fieldname_dict[product]["service"]
        ):
            return True
        elif logsource["category"] is None and logsource["service"] is None:
            return True  # We know the product, but there is no category or service
        return False
    def get_logsource(self, logsource: dict) -> list:
        data = None
        product = (
            logsource["product"]
            if logsource["product"] in fieldname_dict
            else "empty"
        )
        if (
            logsource["category"]
            and logsource["category"] in fieldname_dict[product]["category"]
        ):
            data = fieldname_dict[product]["category"][logsource["category"]]
        elif (
            logsource["service"]
            and logsource["service"] in fieldname_dict[product]["service"]
        ):
            data = fieldname_dict[product]["service"][logsource["service"]]
        elif logsource["category"] is None and logsource["service"] is None:
            data = fieldname_dict[product]["empty"]
        return data
    def not_commun(self, logsource: dict, data: list) -> bool:
        # True when the resolved field list is more specific than the
        # product-wide "common" list (use .get so products without a
        # "common" entry do not raise a KeyError)
        product = (
            logsource["product"]
            if logsource["product"] in fieldname_dict
            else "empty"
        )
        return fieldname_dict[product].get("common") != data
#
# test functions
#
def test_invalid_logsource_attributes(self):
faulty_rules = []
valid_logsource = [
"category",
"product",
"service",
"definition",
]
for file in self.yield_next_rule_file_path(self.path_to_rules):
logsource = self.get_rule_part(file_path=file, part_name="logsource")
if not logsource:
print(Fore.RED + "Rule {} has no 'logsource'.".format(file))
faulty_rules.append(file)
continue
valid = True
for key in logsource:
if key not in valid_logsource:
print(
Fore.RED
+ "Rule {} has a logsource with an invalid field ({})".format(
file, key
)
)
valid = False
elif not isinstance(logsource[key], str):
print(
Fore.RED
+ "Rule {} has a logsource with an invalid field type ({})".format(
file, key
)
)
valid = False
if not valid:
faulty_rules.append(file)
        self.assertEqual(
            faulty_rules,
            [],
            Fore.RED
            + "There are rules with non-conformant 'logsource' fields. Please check: https://github.com/SigmaHQ/sigma/wiki/Rule-Creation-Guide#log-source",
        )
def test_logsource_value(self):
faulty_rules = []
for file in self.yield_next_rule_file_path(self.path_to_rules):
logsource = self.get_rule_part(file_path=file, part_name="logsource")
if logsource:
full_logsource = self.full_logsource(logsource)
if not self.exist_logsource(full_logsource):
faulty_rules.append(file)
                    print(
                        Fore.RED
                        + "Rule {} has an unknown logsource product/category/service ({}/{}/{})".format(
                            file,
                            full_logsource["product"],
                            full_logsource["category"],
                            full_logsource["service"],
                        )
                    )
        self.assertEqual(
            faulty_rules,
            [],
            Fore.RED + "There are rules with non-conformant 'logsource' values.",
        )
def test_fieldname_case(self):
files_with_fieldname_issues = []
for file in self.yield_next_rule_file_path(self.path_to_rules):
logsource = self.get_rule_part(file_path=file, part_name="logsource")
detection = self.get_rule_part(file_path=file, part_name="detection")
if logsource and detection:
full_logsource = self.full_logsource(logsource)
list_valid = self.get_logsource(full_logsource)
                first_time = True
                if list_valid and self.not_commun(full_logsource, list_valid):
                    for field in self.get_detection_field(detection):
                        if field not in list_valid:
                            print(
                                Fore.RED
                                + "Rule {} has an invalid field <{}>".format(
                                    file, field
                                )
                            )
                            if first_time:
                                files_with_fieldname_issues.append(file)
                                first_time = False  # a rule can contain many errors
        self.assertEqual(
            files_with_fieldname_issues,
            [],
            Fore.RED
            + "There are rule files which contain unknown fields or type errors",
        )
def load_fields_json(name: str) -> dict:
    data = {}
    file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), name)
    with open(file_path, "r", encoding="utf-8") as file:
        json_dict = json.load(file)
for product in json_dict["legit"]:
data[product] = json_dict["legit"][product]
for product in json_dict["addon"]:
for category in json_dict["addon"][product]["category"]:
data[product]["category"][category] += json_dict["addon"][product][
"category"
][category]
for service in json_dict["addon"][product]["service"]:
data[product]["service"][service] += json_dict["addon"][product]["service"][
service
]
    # Hash-type fields ("Hashes", "Hash") are expanded into per-algorithm
    # fields, and the product-wide "common" fields are appended to every list
for product in data:
for category in data[product]["category"]:
if "Hashes" in data[product]["category"][category]:
data[product]["category"][category] += [
"md5",
"sha1",
"sha256",
"Imphash",
]
if (
"Hash" in data[product]["category"][category]
): # Sysmon 15 create_stream_hash
data[product]["category"][category] += [
"md5",
"sha1",
"sha256",
"Imphash",
]
if "common" in data[product].keys():
data[product]["category"][category] += data[product]["common"]
for service in data[product]["service"]:
if "common" in data[product].keys():
data[product]["service"][service] += data[product]["common"]
return data
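
# Illustrative sketch (assumed shape) of the logsource.json consumed by
# load_fields_json above; the real products and field lists live in that file:
# {
#     "legit": {
#         "windows": {
#             "category": {"process_creation": ["Image", "CommandLine"]},
#             "service": {"security": ["EventID"]},
#             "common": ["EventID"],
#             "empty": []
#         }
#     },
#     "addon": {
#         "windows": {
#             "category": {"process_creation": ["ParentUser"]},
#             "service": {}
#         }
#     }
# }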
if __name__ == "__main__":
init(autoreset=True)
    # Load the field-name information
fieldname_dict = load_fields_json("logsource.json")
# Run the tests
unittest.main()