forked from oppia/oppia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscripts_test_utils.py
249 lines (204 loc) · 8.55 KB
/
scripts_test_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# coding: utf-8
#
# Copyright 2014 The Oppia Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common utilities for test classes."""
from __future__ import annotations
import io
import signal
import psutil
class PopenStub:
    """Stubs the API of psutil.Popen() to make unit tests less expensive.

    Starting a new process for every unit test is intrinsically more expensive
    than checking an object's attributes, and for some developers it isn't even
    possible for them to kill a spawned process due to a lack of permission on
    their operating system.

    We used to spawn real processes for tests, and observed the following:
        With actual processes: Runs 78 tests in 50.7 seconds
        With PopenStub:        Runs 97 tests in 32.3 seconds

    Thus, using this stub gives us a ~1.95x speed boost per-test (roughly
    0.65 seconds vs. 0.33 seconds per test).

    Attributes:
        pid: int. The ID of the process.
        stdin: io.BytesIO. Buffer that collects the bytes passed to
            communicate().
        stdout: io.BytesIO. Buffer holding the text written to standard output
            by the process.
        stderr: io.BytesIO. Buffer holding the text written to error output by
            the process.
        poll_count: int. The number of times poll() has been called.
        signals_received: list(int). List of received signals (as ints) in order
            of receipt.
        terminate_count: int. Number of times terminate() has been called.
        kill_count: int. Number of times kill() has been called.
        alive: bool. Whether the process should be considered to be alive.
        reject_signal: bool. Whether to raise OSError in send_signal().
        reject_terminate: bool. Whether to raise OSError in terminate().
        reject_kill: bool. Whether to raise OSError in kill().
        unresponsive: bool. Whether the process ignores requests to end (i.e.
            terminate(), SIGINT, wait() and communicate()). kill() always ends
            the process regardless of this flag.
        returncode: int. The return code of the process.
    """

    def __init__(
            self, pid=1, name='process', stdout=b'', stderr=b'',
            reject_signal=False, reject_terminate=False, reject_kill=False,
            alive=True, unresponsive=False, return_code=0, child_procs=None):
        """Initializes a new PopenStub instance.

        Args:
            pid: int. The ID of the process.
            name: str. The name of the process.
            stdout: bytes. The text written to standard output by the process.
            stderr: bytes. The text written to error output by the process.
            reject_signal: bool. Whether to raise OSError in send_signal().
            reject_terminate: bool. Whether to raise OSError in terminate().
            reject_kill: bool. Whether to raise OSError in kill().
            alive: bool. Whether the process should be considered to be alive.
            unresponsive: bool. Whether the process ignores requests to end.
            return_code: int. The return code of the process.
            child_procs: list(PopenStub)|None. Processes "owned" by the stub, or
                None if there aren't any.
        """
        self.pid = pid
        self.stdin = io.BytesIO()
        self.stdout = io.BytesIO(stdout)
        self.stderr = io.BytesIO(stderr)
        self.poll_count = 0
        self.signals_received = []
        self.terminate_count = 0
        self.kill_count = 0
        self.alive = alive
        self.reject_signal = reject_signal
        self.reject_terminate = reject_terminate
        self.reject_kill = reject_kill
        self.unresponsive = unresponsive
        self._name = name
        # Stored as a tuple so that later changes to the caller's list do not
        # alter the stub's children.
        self._child_procs = tuple(child_procs) if child_procs else ()
        self._return_code = return_code

    @property
    def returncode(self):
        """Returns the return code of the process.

        Returns:
            int. The return code of the process.
        """
        return self._return_code

    @returncode.setter
    def returncode(self, return_code):
        """Assigns a return code to the process.

        Args:
            return_code: int. The return code to assign to the process.
        """
        self._return_code = return_code

    def is_running(self):
        """Returns whether the process is running.

        Returns:
            bool. The value of self.alive, which mocks whether the process is
            still alive.
        """
        return self.alive

    def name(self):
        """Returns the name of the process.

        Returns:
            str. The name of the process.
        """
        return self._name

    def children(self, recursive=False):
        """Returns the children spawned by this process.

        Args:
            recursive: bool. Whether to also return non-direct descendants from
                self (i.e. children of children).

        Returns:
            list(PopenStub). A list of the child processes. When recursive is
            True, each child is immediately followed by its own descendants.
        """
        children = []
        for child in self._child_procs:
            children.append(child)
            if recursive:
                children.extend(child.children(recursive=True))
        return children

    def terminate(self):
        """Increment terminate_count.

        Mocks the process being terminated. The call is counted even when it is
        rejected or ignored.

        Raises:
            OSError. The stub was configured with reject_terminate=True.
        """
        self.terminate_count += 1
        if self.reject_terminate:
            raise OSError('rejected')
        if self.unresponsive:
            return
        self._exit(return_code=1)

    def kill(self):
        """Increment kill_count.

        NOTE: kill() does not respect self.unresponsive.

        Mocks the process being killed. The call is counted even when it is
        rejected.

        Raises:
            OSError. The stub was configured with reject_kill=True.
        """
        self.kill_count += 1
        if self.reject_kill:
            raise OSError('rejected')
        self._exit(return_code=1)

    def send_signal(self, signal_number):
        """Append signal to self.signals_received.

        Mocks receiving a process signal. If a SIGINT signal is received (e.g.
        from ctrl-C) and self.unresponsive is False, then we call self._exit().
        The signal is recorded even when it is rejected.

        Args:
            signal_number: int. The number of the received signal.

        Raises:
            OSError. The stub was configured with reject_signal=True.
        """
        self.signals_received.append(signal_number)
        if self.reject_signal:
            raise OSError('rejected')
        if signal_number == signal.SIGINT and not self.unresponsive:
            self._exit(return_code=1)

    def poll(self):
        """Increment poll_count.

        Mocks checking whether the process is still alive.

        Returns:
            int|None. The return code of the process if it has ended, otherwise
            None.
        """
        self.poll_count += 1
        return None if self.alive else self._return_code

    def wait(self, timeout=None): # pylint: disable=unused-argument
        """Wait for the process completion.

        Mocks the process waiting for completion before it continues execution.
        No time is actually spent waiting, however, since the lifetime of the
        program is completely defined by the initialization params.

        This method does not return a value; use poll() or returncode to read
        the return code afterwards.

        Args:
            timeout: int|None. Time to wait before raising an exception, or None
                to wait indefinitely.

        Raises:
            psutil.TimeoutExpired. The process is alive and unresponsive, and a
                timeout was given.
            RuntimeError. The process is alive and unresponsive, and no timeout
                was given (a real process would block forever).
        """
        if not self.alive:
            return

        if not self.unresponsive:
            self._exit()
        elif timeout is not None:
            raise psutil.TimeoutExpired(timeout)
        else:
            raise RuntimeError('PopenStub has entered an infinite loop')

    def communicate(self, input=b''): # pylint: disable=unused-argument, redefined-builtin
        """Mocks an interaction with the process.

        Args:
            input: bytes|None. Input to write to the process's stdin, or None
                to send nothing.

        Returns:
            tuple(bytes, bytes). The stdout and stderr of the process,
            respectively.

        Raises:
            RuntimeError. The process is alive and unresponsive (a real process
                would block forever).
        """
        if not self.alive:
            # A finished process accepts no further input; just report what
            # it has already produced.
            return self.stdout.getvalue(), self.stderr.getvalue()

        if self.unresponsive:
            raise RuntimeError('PopenStub has entered an infinite loop')

        # The real Popen.communicate() accepts input=None to mean "send
        # nothing", whereas BytesIO.write(None) raises TypeError.
        if input is not None:
            self.stdin.write(input)
        self._exit()
        return self.stdout.getvalue(), self.stderr.getvalue()

    def _exit(self, return_code=None):
        """Simulates the end of the process.

        Args:
            return_code: int|None. The return code of the program. If None, the
                return code assigned at initialization is used instead.
        """
        self.alive = False
        if return_code is not None:
            self._return_code = return_code