blob: 1a946a1f82d413c992ead2b9a69f9bbaf71f1940 [file] [log] [blame]
Zhuoyao Zhang0edf1d32024-05-21 23:57:11 +00001# Copyright 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import dataclasses
16import glob
17from importlib import resources
18import logging
19import os
20from pathlib import Path
21import re
22import shutil
23import signal
24import stat
25import subprocess
26import sys
27import tempfile
28import textwrap
29import time
30import unittest
31
32EXII_RETURN_CODE = 0
33INTERRUPTED_RETURN_CODE = 130
34
35
36class RunToolWithLoggingTest(unittest.TestCase):
37
38 @classmethod
39 def setUpClass(cls):
40 super().setUpClass()
41 # Configure to print logging to stdout.
42 logging.basicConfig(filename=None, level=logging.DEBUG)
43 console = logging.StreamHandler(sys.stdout)
44 logging.getLogger("").addHandler(console)
45
46 def setUp(self):
47 super().setUp()
48 self.working_dir = tempfile.TemporaryDirectory()
49 # Run all the tests from working_dir which is our temp Android build top.
50 os.chdir(self.working_dir.name)
51
52 self.logging_script_path = self._import_executable("run_tool_with_logging")
53
54 def tearDown(self):
55 self.working_dir.cleanup()
56 super().tearDown()
57
58 def test_does_not_log_when_logger_var_empty(self):
59 test_tool = TestScript.create(self.working_dir)
60
61 self._run_script_and_wait(f"""
62 export ANDROID_TOOL_LOGGER=""
63 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
64 """)
65
66 test_tool.assert_called_once_with_args("arg1 arg2")
67
68 def test_does_not_log_with_logger_unset(self):
69 test_tool = TestScript.create(self.working_dir)
70
71 self._run_script_and_wait(f"""
72 unset ANDROID_TOOL_LOGGER
73 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
74 """)
75
76 test_tool.assert_called_once_with_args("arg1 arg2")
77
78 def test_log_success_with_logger_enabled(self):
79 test_tool = TestScript.create(self.working_dir)
80 test_logger = TestScript.create(self.working_dir)
81
82 self._run_script_and_wait(f"""
83 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
84 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
85 """)
86
87 test_tool.assert_called_once_with_args("arg1 arg2")
88 expected_logger_args = (
89 "--tool_tag=FAKE_TOOL --start_timestamp=\d+\.\d+ --end_timestamp="
90 "\d+\.\d+ --tool_args=arg1 arg2 --exit_code=0"
91 )
92 test_logger.assert_called_once_with_args(expected_logger_args)
93
94 def test_run_tool_output_is_same_with_and_without_logging(self):
95 test_tool = TestScript.create(self.working_dir, "echo 'tool called'")
96 test_logger = TestScript.create(self.working_dir)
97
98 run_tool_with_logging_stdout, run_tool_with_logging_stderr = (
99 self._run_script_and_wait(f"""
100 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
101 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
102 """)
103 )
104
105 run_tool_without_logging_stdout, run_tool_without_logging_stderr = (
106 self._run_script_and_wait(f"""
107 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
108 {test_tool.executable} arg1 arg2
109 """)
110 )
111
112 self.assertEqual(
113 run_tool_with_logging_stdout, run_tool_without_logging_stdout
114 )
115 self.assertEqual(
116 run_tool_with_logging_stderr, run_tool_without_logging_stderr
117 )
118
119 def test_logger_output_is_suppressed(self):
120 test_tool = TestScript.create(self.working_dir)
121 test_logger = TestScript.create(self.working_dir, "echo 'logger called'")
122
123 run_tool_with_logging_output, _ = self._run_script_and_wait(f"""
124 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
125 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
126 """)
127
128 self.assertNotIn("logger called", run_tool_with_logging_output)
129
130 def test_logger_error_is_suppressed(self):
131 test_tool = TestScript.create(self.working_dir)
132 test_logger = TestScript.create(
133 self.working_dir, "echo 'logger failed' > /dev/stderr; exit 1"
134 )
135
136 _, err = self._run_script_and_wait(f"""
137 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
138 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
139 """)
140
141 self.assertNotIn("logger failed", err)
142
143 def test_log_success_when_tool_interrupted(self):
144 test_tool = TestScript.create(self.working_dir, script_body="sleep 100")
145 test_logger = TestScript.create(self.working_dir)
146
147 process = self._run_script(f"""
148 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
149 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
150 """)
151
152 pgid = os.getpgid(process.pid)
153 # Give sometime for the subprocess to start.
154 time.sleep(1)
155 # Kill the subprocess and any processes created in the same group.
156 os.killpg(pgid, signal.SIGINT)
157
158 returncode, _, _ = self._wait_for_process(process)
159 self.assertEqual(returncode, INTERRUPTED_RETURN_CODE)
160
161 expected_logger_args = (
162 "--tool_tag=FAKE_TOOL --start_timestamp=\d+\.\d+ --end_timestamp="
163 "\d+\.\d+ --tool_args=arg1 arg2 --exit_code=130"
164 )
165 test_logger.assert_called_once_with_args(expected_logger_args)
166
167 def test_logger_can_be_toggled_on(self):
168 test_tool = TestScript.create(self.working_dir)
169 test_logger = TestScript.create(self.working_dir)
170
171 self._run_script_and_wait(f"""
172 export ANDROID_TOOL_LOGGER=""
173 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
174 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
175 """)
176
177 test_logger.assert_called_with_times(1)
178
179 def test_logger_can_be_toggled_off(self):
180 test_tool = TestScript.create(self.working_dir)
181 test_logger = TestScript.create(self.working_dir)
182
183 self._run_script_and_wait(f"""
184 export ANDROID_TOOL_LOGGER="{test_logger.executable}"
185 export ANDROID_TOOL_LOGGER=""
186 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
187 """)
188
189 test_logger.assert_not_called()
190
191 def test_integration_tool_event_logger_dry_run(self):
192 test_tool = TestScript.create(self.working_dir)
193 logger_path = self._import_executable("tool_event_logger")
194
195 self._run_script_and_wait(f"""
Zhuoyao Zhanged4c8f32024-07-17 23:38:40 +0000196 export TMPDIR="{self.working_dir.name}"
Zhuoyao Zhang0edf1d32024-05-21 23:57:11 +0000197 export ANDROID_TOOL_LOGGER="{logger_path}"
198 export ANDROID_TOOL_LOGGER_EXTRA_ARGS="--dry_run"
199 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} arg1 arg2
200 """)
201
202 self._assert_logger_dry_run()
203
204 def test_tool_args_do_not_fail_logger(self):
205 test_tool = TestScript.create(self.working_dir)
206 logger_path = self._import_executable("tool_event_logger")
207
208 self._run_script_and_wait(f"""
Zhuoyao Zhanged4c8f32024-07-17 23:38:40 +0000209 export TMPDIR="{self.working_dir.name}"
Zhuoyao Zhang0edf1d32024-05-21 23:57:11 +0000210 export ANDROID_TOOL_LOGGER="{logger_path}"
211 export ANDROID_TOOL_LOGGER_EXTRA_ARGS="--dry_run"
212 {self.logging_script_path} "FAKE_TOOL" {test_tool.executable} --tool-arg1
213 """)
214
215 self._assert_logger_dry_run()
216
217 def _import_executable(self, executable_name: str) -> Path:
218 # logger = "tool_event_logger"
219 executable_path = Path(self.working_dir.name).joinpath(executable_name)
220 with resources.as_file(
221 resources.files("testdata").joinpath(executable_name)
222 ) as p:
223 shutil.copy(p, executable_path)
224 Path.chmod(executable_path, 0o755)
225 return executable_path
226
227 def _assert_logger_dry_run(self):
228 log_files = glob.glob(self.working_dir.name + "/tool_event_logger_*/*.log")
229 self.assertEqual(len(log_files), 1)
230
231 with open(log_files[0], "r") as f:
232 lines = f.readlines()
233 self.assertEqual(len(lines), 1)
234 self.assertIn("dry run", lines[0])
235
236 def _run_script_and_wait(self, test_script: str) -> tuple[str, str]:
237 process = self._run_script(test_script)
238 returncode, out, err = self._wait_for_process(process)
239 logging.debug("script stdout: %s", out)
240 logging.debug("script stderr: %s", err)
241 self.assertEqual(returncode, EXII_RETURN_CODE)
242 return out, err
243
244 def _run_script(self, test_script: str) -> subprocess.Popen:
245 return subprocess.Popen(
246 test_script,
247 shell=True,
248 stdout=subprocess.PIPE,
249 stderr=subprocess.PIPE,
250 text=True,
251 start_new_session=True,
252 executable="/bin/bash",
253 )
254
255 def _wait_for_process(
256 self, process: subprocess.Popen
257 ) -> tuple[int, str, str]:
258 pgid = os.getpgid(process.pid)
259 out, err = process.communicate()
260 # Wait for all process in the same group to complete since the logger runs
261 # as a separate detached process.
262 self._wait_for_process_group(pgid)
263 return (process.returncode, out, err)
264
265 def _wait_for_process_group(self, pgid: int, timeout: int = 5):
266 """Waits for all subprocesses within the process group to complete."""
267 start_time = time.time()
268 while True:
269 if time.time() - start_time > timeout:
270 raise TimeoutError(
271 f"Process group did not complete after {timeout} seconds"
272 )
273 for pid in os.listdir("/proc"):
274 if pid.isdigit():
275 try:
276 if os.getpgid(int(pid)) == pgid:
277 time.sleep(0.1)
278 break
279 except (FileNotFoundError, PermissionError, ProcessLookupError):
280 pass
281 else:
282 # All processes have completed.
283 break
284
285
286@dataclasses.dataclass
287class TestScript:
288 executable: Path
289 output_file: Path
290
291 def create(temp_dir: Path, script_body: str = ""):
292 with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False) as f:
293 output_file = f.name
294
295 with tempfile.NamedTemporaryFile(dir=temp_dir.name, delete=False) as f:
296 executable = f.name
297 executable_contents = textwrap.dedent(f"""
298 #!/bin/bash
299
300 echo "${{@}}" >> {output_file}
301 {script_body}
302 """)
303 f.write(executable_contents.encode("utf-8"))
304
305 Path.chmod(f.name, os.stat(f.name).st_mode | stat.S_IEXEC)
306
307 return TestScript(executable, output_file)
308
309 def assert_called_with_times(self, expected_call_times: int):
310 lines = self._read_contents_from_output_file()
311 assert len(lines) == expected_call_times, (
312 f"Expect to call {expected_call_times} times, but actually called"
313 f" {len(lines)} times."
314 )
315
316 def assert_called_with_args(self, expected_args: str):
317 lines = self._read_contents_from_output_file()
318 assert len(lines) > 0
319 assert re.search(expected_args, lines[0]), (
320 f"Expect to call with args {expected_args}, but actually called with"
321 f" args {lines[0]}."
322 )
323
324 def assert_not_called(self):
325 self.assert_called_with_times(0)
326
327 def assert_called_once_with_args(self, expected_args: str):
328 self.assert_called_with_times(1)
329 self.assert_called_with_args(expected_args)
330
331 def _read_contents_from_output_file(self) -> list[str]:
332 with open(self.output_file, "r") as f:
333 return f.readlines()
334
335
336if __name__ == "__main__":
337 unittest.main()