Skip to content

Commit

Permalink
Add fallback for AllUsersProfile env var + Require mutation pool for …
Browse files Browse the repository at this point in the history
…more APIs.
  • Loading branch information
ogarod committed Aug 18, 2017
1 parent 672adef commit 5dc6a90
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 33 deletions.
29 changes: 9 additions & 20 deletions grr/server/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,30 +928,19 @@ def Name(self):
return self.__class__.__name__

@classmethod
def MarkForTermination(cls,
flow_urn,
mutation_pool=None,
reason=None,
sync=False,
token=None):
def MarkForTermination(cls, flow_urn, reason=None, mutation_pool=None):
"""Mark the flow for termination as soon as any of its states are called."""
# Doing a blind write here using low-level data store API. Accessing
# the flow via AFF4 is not really possible here, because it forces a state
# to be written in Close() method.
if mutation_pool:
mutation_pool.Set(
flow_urn,
cls.SchemaCls.PENDING_TERMINATION.predicate,
PendingFlowTermination(reason=reason),
replace=False)
else:
data_store.DB.Set(
flow_urn,
cls.SchemaCls.PENDING_TERMINATION.predicate,
PendingFlowTermination(reason=reason),
replace=False,
sync=sync,
token=token)
if mutation_pool is None:
raise ValueError("Mutation pool can't be none.")

mutation_pool.Set(
flow_urn,
cls.SchemaCls.PENDING_TERMINATION.predicate,
PendingFlowTermination(reason=reason),
replace=False)

@classmethod
def TerminateFlow(cls,
Expand Down
5 changes: 3 additions & 2 deletions grr/server/flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,9 @@ class FlowTerminationTest(BasicFlowTest):

def testFlowMarkedForTerminationTerminatesInStateHandler(self):
flow_obj = self.FlowSetup(flow_test_lib.FlowOrderTest.__name__)
flow.GRRFlow.MarkForTermination(
flow_obj.urn, reason="because i can", token=self.token)
with data_store.DB.GetMutationPool(token=self.token) as pool:
flow.GRRFlow.MarkForTermination(
flow_obj.urn, reason="because i can", mutation_pool=pool)

def ProcessFlow():
for _ in flow_test_lib.TestFlowHelper(
Expand Down
17 changes: 17 additions & 0 deletions grr/server/flows/general/artifact_fallbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,20 @@ def End(self, responses):
raise flow.FlowError("Couldn't guess the system root and drive location")

super(SystemRootSystemDriveFallbackFlow, self).End()


class WindowsAllUsersProfileFallbackFlow(artifact.ArtifactFallbackCollector):
r"""Flow that provides a default value for the AllUsersProfile registry key.
Newer versions of Windows will typically not have the
HKLM\Software\Microsoft\Windows NT\CurrentVersion\ProfileList\AllUsersProfile
key.
"""

artifacts = ["WindowsEnvironmentVariableAllUsersProfile"]

@flow.StateHandler()
def Start(self):
data = rdf_protodict.DataBlob().SetValue("All Users")
self.SendReply(rdf_client.StatEntry(registry_data=data))
self.state.success = True
3 changes: 1 addition & 2 deletions grr/server/hunts/standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,7 @@ def Stop(self, reason=None):
flow.GRRFlow.MarkForTermination(
started_flow,
reason="Parent hunt stopped.",
mutation_pool=mutation_pool,
token=self.token)
mutation_pool=mutation_pool)
num_terminated_flows += 1

self.Log("%d flows terminated.", num_terminated_flows)
Expand Down
15 changes: 8 additions & 7 deletions grr/server/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,14 @@ def Delete(self, queue, tasks, mutation_pool=None):
queue: A queue to clear.
tasks: A list of tasks to remove. Tasks may be Task() instances
or integers representing the task_id.
mutation_pool: An optional MutationPool object to schedule deletions on.
If not given, self.data_store is used directly.
mutation_pool: A MutationPool object to schedule deletions on.
Raises:
ValueError: Mutation pool was not passed in.
"""
if mutation_pool is None:
raise ValueError("Mutation pool can't be none.")

if queue:
predicates = []
for task in tasks:
Expand All @@ -413,11 +418,7 @@ def Delete(self, queue, tasks, mutation_pool=None):
task_id = int(task)
predicates.append(self._TaskIdToColumn(task_id))

if mutation_pool:
mutation_pool.DeleteAttributes(queue, predicates)
else:
self.data_store.DeleteAttributes(
queue, predicates, token=self.token, sync=False)
mutation_pool.DeleteAttributes(queue, predicates)

def Schedule(self, tasks, mutation_pool, timestamp=None):
"""Schedule a set of Task() instances."""
Expand Down
5 changes: 3 additions & 2 deletions grr/server/queue_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def setUp(self):
time.time = lambda: self._current_mock_time

def tearDown(self):
super(QueueManagerTest, self).tearDown()
time.time = self.old_time
super(QueueManagerTest, self).tearDown()

def testCountsActualNumberOfCompletedResponsesWhenApplyingTheLimit(self):
session_id = rdfvalue.SessionID(flow_name="test")
Expand Down Expand Up @@ -255,7 +255,8 @@ def testDelete(self):
self.assertEqual(tasks[0].session_id, "aff4:/Test")

# Now delete the task
manager.Delete(test_queue, tasks)
with data_store.DB.GetMutationPool(token=self.token) as pool:
manager.Delete(test_queue, tasks, mutation_pool=pool)

# Should not exist in the table
value, ts = data_store.DB.Resolve(
Expand Down

0 comments on commit 5dc6a90

Please sign in to comment.