blob: 1eb78f14ba2dbfab1c0374075e3002694be3c782 [file] [log] [blame]
Zhuoyao Zhangcc44d2e2024-03-26 22:50:12 +00001# Copyright 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import dataclasses
16from importlib import resources
17import logging
18import os
19from pathlib import Path
20import re
21import signal
22import stat
23import subprocess
24import tempfile
25import textwrap
26import time
27import unittest
28import zipfile
29import sys
30
31EXII_RETURN_CODE = 0
32INTERRUPTED_RETURN_CODE = 130
33
34
35class RunToolWithLoggingTest(unittest.TestCase):
36
37 @classmethod
38 def setUpClass(cls):
39 super().setUpClass()
40 # Configure to print logging to stdout.
41 logging.basicConfig(filename=None, level=logging.DEBUG)
42 console = logging.StreamHandler(sys.stdout)
43 logging.getLogger('').addHandler(console)
44
45 def setUp(self):
46 super().setUp()
47 self.working_dir = tempfile.TemporaryDirectory()
48 # Run all the tests from working_dir which is our temp Android build top.
49 os.chdir(self.working_dir.name)
50 # Extract envsetup.zip which contains the envsetup.sh and other dependent
51 # scripts required to set up the build environments.
52 with resources.files("testdata").joinpath("envsetup.zip").open('rb') as p:
53 with zipfile.ZipFile(p, "r") as zip_f:
54 zip_f.extractall()
55
56 def tearDown(self):
57 self.working_dir.cleanup()
58 super().tearDown()
59
60 def test_does_not_log_when_logging_disabled(self):
61 test_tool = TestScript.create(self.working_dir)
62 test_logger = TestScript.create(self.working_dir)
63
64 self._run_script_and_wait(f"""
65 ANDROID_ENABLE_TOOL_LOGGING=false
66 ANDROID_TOOL_LOGGER="{test_logger.executable}"
67 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
68 """)
69
70 test_tool.assert_called_once_with_args("arg1 arg2")
71 test_logger.assert_not_called()
72
73 def test_does_not_log_when_logger_var_unset(self):
74 test_tool = TestScript.create(self.working_dir)
75 test_logger = TestScript.create(self.working_dir)
76
77 self._run_script_and_wait(f"""
78 unset ANDROID_ENABLE_TOOL_LOGGING
79 ANDROID_TOOL_LOGGER="{test_logger.executable}"
80 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
81 """)
82
83 test_tool.assert_called_once_with_args("arg1 arg2")
84 test_logger.assert_not_called()
85
86 def test_does_not_log_when_logger_var_empty(self):
87 test_tool = TestScript.create(self.working_dir)
88
89 self._run_script_and_wait(f"""
90 ANDROID_ENABLE_TOOL_LOGGING=true
91 ANDROID_TOOL_LOGGER=""
92 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
93 """)
94
95 test_tool.assert_called_once_with_args("arg1 arg2")
96
97 def test_does_not_log_with_logger_unset(self):
98 test_tool = TestScript.create(self.working_dir)
99
100 self._run_script_and_wait(f"""
101 ANDROID_ENABLE_TOOL_LOGGING=true
102 unset ANDROID_TOOL_LOGGER
103 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
104 """)
105
106 test_tool.assert_called_once_with_args("arg1 arg2")
107
108 def test_log_success_with_logger_enabled(self):
109 test_tool = TestScript.create(self.working_dir)
110 test_logger = TestScript.create(self.working_dir)
111
112 self._run_script_and_wait(f"""
113 ANDROID_ENABLE_TOOL_LOGGING=true
114 ANDROID_TOOL_LOGGER="{test_logger.executable}"
115 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
116 """)
117
118 test_tool.assert_called_once_with_args("arg1 arg2")
119 expected_logger_args = (
120 "--tool_tag FAKE_TOOL --start_timestamp \d+\.\d+ --end_timestamp"
121 ' \d+\.\d+ --tool_args "arg1 arg2" --exit_code 0'
122 )
123 test_logger.assert_called_once_with_args(expected_logger_args)
124
125 def test_run_tool_output_is_same_with_and_without_logging(self):
126 test_tool = TestScript.create(self.working_dir, "echo 'tool called'")
127 test_logger = TestScript.create(self.working_dir)
128
129 run_tool_with_logging_stdout, run_tool_with_logging_stderr = (
130 self._run_script_and_wait(f"""
131 ANDROID_ENABLE_TOOL_LOGGING=true
132 ANDROID_TOOL_LOGGER="{test_logger.executable}"
133 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
134 """)
135 )
136
137 run_tool_without_logging_stdout, run_tool_without_logging_stderr = (
138 self._run_script_and_wait(f"""
139 ANDROID_ENABLE_TOOL_LOGGING=true
140 ANDROID_TOOL_LOGGER="{test_logger.executable}"
141 {test_tool.executable} arg1 arg2
142 """)
143 )
144
145 self.assertEqual(
146 run_tool_with_logging_stdout, run_tool_without_logging_stdout
147 )
148 self.assertEqual(
149 run_tool_with_logging_stderr, run_tool_without_logging_stderr
150 )
151
152 def test_logger_output_is_suppressed(self):
153 test_tool = TestScript.create(self.working_dir)
154 test_logger = TestScript.create(self.working_dir, "echo 'logger called'")
155
156 run_tool_with_logging_output, _ = self._run_script_and_wait(f"""
157 ANDROID_ENABLE_TOOL_LOGGING=true
158 ANDROID_TOOL_LOGGER="{test_logger.executable}"
159 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
160 """)
161
162 self.assertNotIn("logger called", run_tool_with_logging_output)
163
164 def test_logger_error_is_suppressed(self):
165 test_tool = TestScript.create(self.working_dir)
166 test_logger = TestScript.create(
167 self.working_dir, "echo 'logger failed' > /dev/stderr; exit 1"
168 )
169
170 _, err = self._run_script_and_wait(f"""
171 ANDROID_ENABLE_TOOL_LOGGING=true
172 ANDROID_TOOL_LOGGER="{test_logger.executable}"
173 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
174 """)
175
176 self.assertNotIn("logger failed", err)
177
178 def test_log_success_when_tool_interrupted(self):
179 test_tool = TestScript.create(self.working_dir, script_body="sleep 100")
180 test_logger = TestScript.create(self.working_dir)
181
182 process = self._run_script_in_build_env(f"""
183 ANDROID_ENABLE_TOOL_LOGGING=true
184 ANDROID_TOOL_LOGGER="{test_logger.executable}"
185 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
186 """)
187
188 pgid = os.getpgid(process.pid)
189 # Give sometime for the subprocess to start.
190 time.sleep(1)
191 # Kill the subprocess and any processes created in the same group.
192 os.killpg(pgid, signal.SIGINT)
193
194 returncode, _, _ = self._wait_for_process(process)
195 self.assertEqual(returncode, INTERRUPTED_RETURN_CODE)
196
197 expected_logger_args = (
198 "--tool_tag FAKE_TOOL --start_timestamp \d+\.\d+ --end_timestamp"
199 ' \d+\.\d+ --tool_args "arg1 arg2" --exit_code 130'
200 )
201 test_logger.assert_called_once_with_args(expected_logger_args)
202
203 def test_logger_can_be_toggled_on(self):
204 test_tool = TestScript.create(self.working_dir)
205 test_logger = TestScript.create(self.working_dir)
206
207 self._run_script_and_wait(f"""
208 ANDROID_ENABLE_TOOL_LOGGING=false
209 ANDROID_TOOL_LOGGER="{test_logger.executable}"
210 ANDROID_ENABLE_TOOL_LOGGING=true
211 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
212 """)
213
214 test_logger.assert_called_with_times(1)
215
216 def test_logger_can_be_toggled_off(self):
217 test_tool = TestScript.create(self.working_dir)
218 test_logger = TestScript.create(self.working_dir)
219
220 self._run_script_and_wait(f"""
221 ANDROID_ENABLE_TOOL_LOGGING=true
222 ANDROID_TOOL_LOGGER="{test_logger.executable}"
223 ANDROID_ENABLE_TOOL_LOGGING=false
224 run_tool_with_logging "FAKE_TOOL" {test_tool.executable} arg1 arg2
225 """)
226
227 test_logger.assert_not_called()
228
229 def _create_build_env_script(self) -> str:
230 return f"""
231 source {Path(self.working_dir.name).joinpath("build/make/envsetup.sh")}
232 """
233
234 def _run_script_and_wait(self, test_script: str) -> tuple[str, str]:
235 process = self._run_script_in_build_env(test_script)
236 returncode, out, err = self._wait_for_process(process)
237 logging.debug("script stdout: %s", out)
238 logging.debug("script stderr: %s", err)
239 self.assertEqual(returncode, EXII_RETURN_CODE)
240 return out, err
241
242 def _run_script_in_build_env(self, test_script: str) -> subprocess.Popen:
243 setup_build_env_script = self._create_build_env_script()
244 return subprocess.Popen(
245 setup_build_env_script + test_script,
246 shell=True,
247 stdout=subprocess.PIPE,
248 stderr=subprocess.PIPE,
249 text=True,
250 start_new_session=True,
251 executable='/bin/bash'
252 )
253
254 def _wait_for_process(
255 self, process: subprocess.Popen
256 ) -> tuple[int, str, str]:
257 pgid = os.getpgid(process.pid)
258 out, err = process.communicate()
259 # Wait for all process in the same group to complete since the logger runs
260 # as a separate detached process.
261 self._wait_for_process_group(pgid)
262 return (process.returncode, out, err)
263
264 def _wait_for_process_group(self, pgid: int, timeout: int = 5):
265 """Waits for all subprocesses within the process group to complete."""
266 start_time = time.time()
267 while True:
268 if time.time() - start_time > timeout:
269 raise TimeoutError(
270 f"Process group did not complete after {timeout} seconds"
271 )
272 for pid in os.listdir("/proc"):
273 if pid.isdigit():
274 try:
275 if os.getpgid(int(pid)) == pgid:
276 time.sleep(0.1)
277 break
278 except (FileNotFoundError, PermissionError, ProcessLookupError):
279 pass
280 else:
281 # All processes have completed.
282 break
283
284
285@dataclasses.dataclass
286class TestScript:
287 executable: Path
288 output_file: Path
289
290 def create(temp_dir: Path, script_body: str = ""):
291 with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False) as f:
292 output_file = f.name
293
294 with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False) as f:
295 executable = f.name
296 executable_contents = textwrap.dedent(f"""
297 #!/bin/bash
298
299 echo "${{@}}" >> {output_file}
300 {script_body}
301 """)
302 f.write(executable_contents.encode("utf-8"))
303
304 os.chmod(f.name, os.stat(f.name).st_mode | stat.S_IEXEC)
305
306 return TestScript(executable, output_file)
307
308 def assert_called_with_times(self, expected_call_times: int):
309 lines = self._read_contents_from_output_file()
310 assert len(lines) == expected_call_times, (
311 f"Expect to call {expected_call_times} times, but actually called"
312 f" {len(lines)} times."
313 )
314
315 def assert_called_with_args(self, expected_args: str):
316 lines = self._read_contents_from_output_file()
317 assert len(lines) > 0
318 assert re.search(expected_args, lines[0]), (
319 f"Expect to call with args {expected_args}, but actually called with"
320 f" args {lines[0]}."
321 )
322
323 def assert_not_called(self):
324 self.assert_called_with_times(0)
325
326 def assert_called_once_with_args(self, expected_args: str):
327 self.assert_called_with_times(1)
328 self.assert_called_with_args(expected_args)
329
330 def _read_contents_from_output_file(self) -> list[str]:
331 with open(self.output_file, "r") as f:
332 return f.readlines()
333
334
335if __name__ == "__main__":
336 unittest.main()