forked from chromium/chromium
-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathtest_analysis.py
executable file
·217 lines (198 loc) · 9.48 KB
/
test_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/bin/env python3
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test analysis functions for the testing framework.
"""
from collections import defaultdict
import datetime
import logging
import re
import os
from typing import List, Set
from models import Action
from models import CoverageTest
from models import CoverageTestsByPlatform
from models import ActionType
from models import CoverageTestsByPlatformSet
from models import TestId
from models import TestIdsTestNamesByPlatformSet
from models import TestIdTestNameTuple
from models import TestPartitionDescription
from models import TestPlatform
def filter_tests_for_partition(tests: List[CoverageTest],
                               partition: TestPartitionDescription
                               ) -> List[CoverageTest]:
    """
    Returns the tests, in their original order, that have at least one action
    whose name starts with one of the partition's action name prefixes.
    """
    return [
        candidate for candidate in tests
        if any(
            action.name.startswith(prefix) for action in candidate.actions
            for prefix in partition.action_name_prefixes)
    ]
# Matches one browser test: from the test macro up to the first closing brace
# that is directly followed by a newline.
_BROWSER_TEST_REGEX = re.compile(r"IN_PROC_BROWSER_TEST_F.*?}\n", re.DOTALL)


def _find_test_insertion_index(test_file: str) -> int:
    """Returns the index in `test_file` at which new tests should be inserted."""
    last_match = None
    for last_match in _BROWSER_TEST_REGEX.finditer(test_file):
        pass
    if last_match is not None:
        # Insert directly after the last test in the test file.
        return last_match.end()
    first_brace_index = test_file.find("}")
    if first_brace_index != -1:
        # There is no test in the file: insert just before the first closing
        # brace (treated as the end of the namespace). The index is clamped so
        # a brace at the very start of the file cannot produce a negative
        # index, which would splice the tests in front of the last character.
        return max(first_brace_index - 1, 0)
    return len(test_file)


def compare_and_print_tests_to_remove_and_add(
        existing_tests: TestIdsTestNamesByPlatformSet,
        required_tests: CoverageTestsByPlatformSet,
        test_partitions: List[TestPartitionDescription],
        default_partition: TestPartitionDescription, add_to_file: bool):
    """
    Given the existing tests on disk and the required tests, print out the
    changes that need to happen to make them match. This also takes into account
    test partitioning, so tests are asked to be written to the appropriate test
    partition file.

    Note: This does NOT support moving tests between partition files. If a test
    was found in any partition file, then it is ignored.

    Args:
        existing_tests: Maps a platform set to the (test id, test name) pairs
            that already exist for it.
        required_tests: Maps a platform set to the tests that must exist for
            it.
        test_partitions: Partitions that are tried, in order, for every test
            that needs to be added.
        default_partition: Partition that receives every test to add that did
            not match any entry of `test_partitions`.
        add_to_file: If True, new tests are inserted into the target file when
            it already exists; otherwise (or if the file does not exist) the
            tests are printed.

    Raises:
        ValueError: If a test to add matches more than one partition.
    """

    def print_tests(filename: str, tests: List[CoverageTest],
                    partition: TestPartitionDescription, add_to_file: bool):
        """Writes the generated tests into `filename`, or prints them."""
        new_test_str: str = "".join(
            "\n" + test.generate_browsertest(partition) + "\n"
            for test in tests)
        if not add_to_file:
            print(f"\n\nAdd the following tests to {filename}:\n"
                  f"{new_test_str}")
            return
        if not os.path.exists(filename):
            print(f"\n\nCreate a new test file: {filename}\n"
                  "Remember to add the new test file to the BUILD file.\n"
                  "Add the following tests to the new test file:\n"
                  f"{new_test_str}")
            return
        # The encoding is explicit so the result does not depend on the
        # locale of the machine running the script.
        with open(filename, "r", encoding="utf-8") as f:
            test_file = f.read()
        insertion_index = _find_test_insertion_index(test_file)
        new_content = (test_file[:insertion_index] + new_test_str +
                       test_file[insertion_index:])
        with open(filename, "w", encoding="utf-8") as f:
            f.write(new_content)

    # Maps a platform set to the ids of the tests that already exist and are
    # still required. A platform set only gets an entry once a test is kept.
    test_ids_to_keep = defaultdict(set)
    for platforms, tests in required_tests.items():
        tests_to_add: List[CoverageTest] = []
        if platforms in existing_tests:
            # Built once per platform set instead of once per required test.
            existing_test_ids = {
                test_id
                for (test_id, _) in existing_tests[platforms]
            }
            for test in tests:
                if test.id in existing_test_ids:
                    test_ids_to_keep[platforms].add(test.id)
                else:
                    tests_to_add.append(test)
        else:
            tests_to_add.extend(tests)

        tests_added_to_partition: Set[TestId] = set()
        for partition in test_partitions:
            tests_to_add_partition = filter_tests_for_partition(
                tests_to_add, partition)
            if not tests_to_add_partition:
                continue
            # Record all tests to ensure we don't have duplicates in different
            # files, and to output remaining tests to the default partition.
            for test in tests_to_add_partition:
                if test.id in tests_added_to_partition:
                    raise ValueError(
                        "Cannot have a test written to multiple test files.")
                tests_added_to_partition.add(test.id)
            filename = partition.generate_browsertest_filepath(platforms)
            print_tests(filename, tests_to_add_partition, partition,
                        add_to_file)

        # All remaining tests go into the default partition
        default_tests: List[CoverageTest] = [
            test for test in tests_to_add
            if test.id not in tests_added_to_partition
        ]
        if not default_tests:
            continue
        filename = default_partition.generate_browsertest_filepath(platforms)
        print_tests(filename, default_tests, default_partition, add_to_file)

    # Print out all tests to remove. To keep the algorithm simple the partition
    # is not kept track of.
    for platforms, test_ids_names in existing_tests.items():
        nice_platform_str = ", ".join(
            [f"{platform}" for platform in platforms])
        if platforms not in test_ids_to_keep:
            # No existing test for these platforms is still required.
            prompt_str = (f"\n\nRemove ALL tests from the file for the "
                          f"platforms [{nice_platform_str}]:\n")
            tests_to_remove = [test_name for (_, test_name) in test_ids_names]
        else:
            prompt_str = (f"\n\nRemove these tests from the file for the "
                          f"platforms [{nice_platform_str}]:\n")
            tests_to_remove = [
                test_name for (test_id, test_name) in test_ids_names
                if test_id not in test_ids_to_keep[platforms]
            ]
        if not tests_to_remove:
            continue
        print(f"{prompt_str}{', '.join(tests_to_remove)}")
def expand_parameterized_tests(coverage_tests: List[CoverageTest]
                               ) -> List[CoverageTest]:
    """
    Takes a list of coverage tests that contain parameterized actions, and
    expands all of the tests with those actions to result in a list of tests
    without parameterized actions.

    Args:
        coverage_tests: Tests whose actions may include parameterized actions.

    Returns:
        A new list with one CoverageTest per expanded action sequence. Each
        one is created with the platforms of the test it was expanded from.
    """

    def get_all_parameterized_tests(test_actions: List[Action]
                                    ) -> List[List[Action]]:
        """
        Takes a list of actions with possible parameterized actions, and outputs
        a list of resulting tests with all parameterized actions expanded.
        """
        if not test_actions:
            return [[]]
        for i, action in enumerate(test_actions):
            if action.type is not ActionType.PARAMETERIZED:
                continue
            actions_before_parameterized = test_actions[:i]
            # The expansion of the trailing actions does not depend on which
            # output action is chosen, so it is computed once and reused.
            remaining_expanded_tests = get_all_parameterized_tests(
                test_actions[i + 1:])
            # Only the first parameterized action is handled here; any later
            # ones are expanded by the recursive call above.
            return [
                actions_before_parameterized + [output_action] +
                remaining_test for output_action in action.output_actions
                for remaining_test in remaining_expanded_tests
            ]
        # No parameterized actions were found, so just return the test actions.
        return [test_actions]

    result_tests = []
    for test in coverage_tests:
        # Expand once and reuse the result for both logging and construction.
        expanded_tests = get_all_parameterized_tests(test.actions)
        logging.info("Generated %d test/s from %s", len(expanded_tests),
                     test.id)
        result_tests.extend(
            CoverageTest(resulting_test, test.platforms)
            for resulting_test in expanded_tests)
    return result_tests
def filter_coverage_tests_for_platform(tests: List[CoverageTest],
                                       platform: TestPlatform
                                       ) -> List[CoverageTest]:
    """
    Returns the tests, in their original order, whose `platforms` contain the
    given platform.
    """
    return [candidate for candidate in tests if platform in candidate.platforms]
def partition_framework_tests_per_platform_combination(
        generated_tests_per_platform: CoverageTestsByPlatform
) -> CoverageTestsByPlatformSet:
    """
    Groups tests by the exact set of platforms they were generated for.

    Tests are identified by their id. For every id, a new CoverageTest is
    created from the actions of the first test seen with that id, and every
    platform the id appears under is added to its `platforms`. The returned
    mapping is keyed by the frozenset of those platforms.
    """
    platforms_by_test_id = defaultdict(set)
    merged_test_by_id = {}
    for platform, platform_tests in generated_tests_per_platform.items():
        for generated in platform_tests:
            platforms_by_test_id[generated.id].add(platform)
            merged = merged_test_by_id.get(generated.id)
            if merged is None:
                # First time this id is seen: start with no platforms.
                merged = CoverageTest(generated.actions, set())
                merged_test_by_id[generated.id] = merged
            merged.platforms.add(platform)

    tests_by_platform_set = defaultdict(list)
    for test_id, platform_names in platforms_by_test_id.items():
        # A frozenset is hashable, so it can be used as the mapping key.
        tests_by_platform_set[frozenset(platform_names)].append(
            merged_test_by_id[test_id])
    return tests_by_platform_set