forked from google/grr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathartifacts.py
107 lines (85 loc) · 4.05 KB
/
artifacts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python
"""End to end tests that run ArtifactCollectorFlow."""
from grr.endtoend_tests import base
from grr.lib import aff4
from grr.lib.rdfvalues import client as rdf_client
class TestDarwinPersistenceMechanisms(base.AutomatedTest):
  """Runs the DarwinPersistenceMechanisms artifact and checks its results.

  The flow output is written below the client at test_output_path and is
  inspected by CheckFlow.
  """
  platforms = ["Darwin"]
  flow = "ArtifactCollectorFlow"
  test_output_path = "analysis/persistence/testing"
  args = {
      "artifact_list": ["DarwinPersistenceMechanisms"],
      "output": test_output_path,
  }

  def CheckFlow(self):
    """Checks that the collected results include launchservicesd.

    Fails when the output is not an RDFValueCollection, when it holds five
    or fewer entries, or when no entry's pathspec path equals the
    launchservicesd binary path.
    """
    results_urn = self.client_id.Add(self.test_output_path)
    results = aff4.FACTORY.Open(results_urn, mode="r", token=self.token)
    self.assertIsInstance(results, aff4.RDFValueCollection)

    entries = list(results)
    # More than five entries must have been collected.
    self.assertGreater(len(entries), 5)

    expected_path = "/System/Library/CoreServices/launchservicesd"
    # any() stops at the first matching entry, like an early return would.
    if not any(entry.pathspec.path == expected_path for entry in entries):
      self.fail("Service listing does not contain launchservices: %s." %
                expected_path)
class TestRootDiskVolumeUsage(base.AutomatedTest):
  """Test RootDiskVolumeUsage.

  Runs ArtifactCollectorFlow with the RootDiskVolumeUsage artifact and
  verifies the single result written below the client at test_output_path.
  """
  platforms = ["Linux", "Darwin"]
  flow = "ArtifactCollectorFlow"
  test_output_path = "analysis/diskusage/testing"
  args = {"artifact_list": ["RootDiskVolumeUsage"],
          "output": test_output_path}

  def CheckFlow(self):
    """Checks that exactly one volume, mounted at "/", was collected.

    Fails when the output is not an RDFValueCollection, when the number of
    results is not exactly one, when the result's unix mount point is not
    "/", or when FreeSpacePercent() does not return a float.
    """
    output_urn = self.client_id.Add(self.test_output_path)
    collection = aff4.FACTORY.Open(output_urn, mode="r", token=self.token)
    self.assertIsInstance(collection, aff4.RDFValueCollection)
    volume_list = list(collection)
    # Exactly one result is expected: the volume mounted at "/".
    self.assertEqual(len(volume_list), 1)
    self.assertEqual(volume_list[0].unix.mount_point, "/")
    # assertIsInstance reports the offending value and type on failure,
    # unlike assertTrue(isinstance(...)).
    self.assertIsInstance(volume_list[0].FreeSpacePercent(), float)
class TestParserDependency(base.AutomatedTest):
  """Test Artifacts complete when KB is empty.

  setUp swaps the client's knowledge base for an empty one before the flow
  is set up, and tearDown puts the saved knowledge base back.
  """
  platforms = ["Windows"]
  flow = "ArtifactCollectorFlow"
  test_output_path = "analysis/testing/TestParserDependency"
  args = {
      "artifact_list": ["WinPathEnvironmentVariable"],
      "dependencies": "FETCH_NOW",
      "output": test_output_path,
  }

  def setUp(self):
    """Saves the client's knowledge base and replaces it with an empty one."""
    client_obj = aff4.FACTORY.Open(self.client_id, mode="rw",
                                   token=self.token)
    kb_attribute = client_obj.Schema.KNOWLEDGE_BASE
    # Keep the current value so tearDown can restore it.
    self.old_kb = client_obj.Get(kb_attribute)
    client_obj.Set(kb_attribute, rdf_client.KnowledgeBase())
    client_obj.Flush()
    # The empty knowledge base is flushed before the base-class setUp runs.
    super(TestParserDependency, self).setUp()

  def CheckFlow(self):
    """Checks that the flow produced exactly one result.

    The opened collection is kept on self.collection so that subclasses can
    inspect the results further.
    """
    results_urn = self.client_id.Add(self.test_output_path)
    self.collection = aff4.FACTORY.Open(results_urn, mode="r",
                                        token=self.token)
    self.assertIsInstance(self.collection, aff4.RDFValueCollection)
    results = list(self.collection)
    # Exactly one result is expected.
    self.assertEqual(len(results), 1)

  def tearDown(self):
    """Restores the knowledge base that setUp saved in self.old_kb."""
    client_obj = aff4.FACTORY.Open(self.client_id, mode="rw",
                                   token=self.token)
    client_obj.Set(client_obj.Schema.KNOWLEDGE_BASE, self.old_kb)
    client_obj.Flush()
    super(TestParserDependency, self).tearDown()
class TestParserDependencyWinDir(TestParserDependency):
  """Runs the TestParserDependency checks for WinDirEnvironmentVariable."""
  test_output_path = "analysis/testing/TestParserDependencyWinDir"
  args = {
      "artifact_list": ["WinDirEnvironmentVariable"],
      "dependencies": "FETCH_NOW",
      "output": test_output_path,
  }
class TestParserDependencyTemp(TestParserDependency):
  """Runs the TestParserDependency checks for TempEnvironmentVariable."""
  test_output_path = "analysis/testing/TestParserDependencyTemp"
  args = {
      "artifact_list": ["TempEnvironmentVariable"],
      "dependencies": "FETCH_NOW",
      "output": test_output_path,
  }
class TestParserDependencyUserShellFolders(TestParserDependency):
  """Runs the TestParserDependency checks for the UserShellFolders artifact.

  In addition to the inherited checks, every collected result must have
  truthy appdata and temp values.
  """
  test_output_path = "analysis/testing/TestParserDependencyUserShellFolders"
  args = {
      "artifact_list": ["UserShellFolders"],
      "dependencies": "FETCH_NOW",
      "output": test_output_path,
  }

  def CheckFlow(self):
    """Runs the inherited checks, then verifies appdata and temp are set."""
    # The parent CheckFlow opens the output and stores it on self.collection.
    super(TestParserDependencyUserShellFolders, self).CheckFlow()
    for user in self.collection:
      self.assertTrue(user.appdata)
      self.assertTrue(user.temp)