blob: 5be4beee519498d75346ed0998ff50354383a592 [file] [log] [blame]
Zhuoyao Zhang53359552024-09-16 23:58:11 +00001# Copyright 2024, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unittests for DaemonManager."""
16
17import logging
18import multiprocessing
19import os
20import pathlib
21import signal
22import subprocess
23import sys
24import tempfile
25import time
26import unittest
27from unittest import mock
28from edit_monitor import daemon_manager
29
30TEST_BINARY_FILE = '/path/to/test_binary'
31TEST_PID_FILE_PATH = (
32 '587239c2d1050afdf54512e2d799f3b929f86b43575eb3c7b4bab105dd9bd25e.lock'
33)
34
35
36def example_daemon(output_file):
37 with open(output_file, 'w') as f:
38 f.write('running daemon target')
39
40
41def long_running_daemon():
42 while True:
43 time.sleep(1)
44
45
46class DaemonManagerTest(unittest.TestCase):
47
48 @classmethod
49 def setUpClass(cls):
50 super().setUpClass()
51 # Configure to print logging to stdout.
52 logging.basicConfig(filename=None, level=logging.DEBUG)
53 console = logging.StreamHandler(sys.stdout)
54 logging.getLogger('').addHandler(console)
55
56 def setUp(self):
57 super().setUp()
58 self.original_tempdir = tempfile.tempdir
59 self.working_dir = tempfile.TemporaryDirectory()
60 # Sets the tempdir under the working dir so any temp files created during
61 # tests will be cleaned.
62 tempfile.tempdir = self.working_dir.name
63
64 def tearDown(self):
65 # Cleans up any child processes left by the tests.
66 self._cleanup_child_processes()
67 self.working_dir.cleanup()
68 # Restores tempdir.
69 tempfile.tempdir = self.original_tempdir
70 super().tearDown()
71
72 def test_start_success(self):
73 damone_output_file = tempfile.NamedTemporaryFile(
74 dir=self.working_dir.name, delete=False
75 )
76 dm = daemon_manager.DaemonManager(
77 TEST_BINARY_FILE,
78 daemon_target=example_daemon,
79 daemon_args=(damone_output_file.name,),
80 )
81 dm.start()
82 dm.daemon_process.join()
83
84 # Verifies the expected pid file is created.
85 expected_pid_file_path = pathlib.Path(self.working_dir.name).joinpath(
86 'edit_monitor', TEST_PID_FILE_PATH
87 )
88 self.assertEqual(dm.pid_file_path, expected_pid_file_path)
89 self.assertTrue(expected_pid_file_path.exists())
90
91 # Verifies the daemon process is executed successfully.
92 with open(damone_output_file.name, 'r') as f:
93 contents = f.read()
94 self.assertEqual(contents, 'running daemon target')
95
96 def test_start_failed_to_write_pidfile(self):
97 pid_file_path_dir = pathlib.Path(self.working_dir.name).joinpath(
98 'edit_monitor'
99 )
100 pid_file_path_dir.mkdir(parents=True, exist_ok=True)
101 # Makes the directory read-only so write pidfile will fail.
102 os.chmod(pid_file_path_dir, 0o555)
103
104 dm = daemon_manager.DaemonManager(TEST_BINARY_FILE)
105 dm.start()
106
107 # Verifies no daemon process is started.
108 self.assertIsNone(dm.daemon_process)
109
110 def test_start_failed_to_start_daemon_process(self):
111 dm = daemon_manager.DaemonManager(
112 TEST_BINARY_FILE, daemon_target='wrong_target', daemon_args=(1)
113 )
114 dm.start()
115
116 # Verifies no daemon process is started.
117 self.assertIsNone(dm.daemon_process)
118
119 def test_stop_success(self):
120 dm = daemon_manager.DaemonManager(
121 TEST_BINARY_FILE, daemon_target=long_running_daemon
122 )
123 dm.start()
124 dm.stop()
125
126 self.assert_no_subprocess_running()
127 self.assertFalse(dm.pid_file_path.exists())
128
129 @mock.patch('os.kill')
130 def test_stop_failed_to_kill_daemon_process(self, mock_kill):
131 mock_kill.side_effect = OSError('Unknown OSError')
132 dm = daemon_manager.DaemonManager(
133 TEST_BINARY_FILE, daemon_target=long_running_daemon
134 )
135 dm.start()
136 dm.stop()
137
138 self.assertTrue(dm.daemon_process.is_alive())
139 self.assertTrue(dm.pid_file_path.exists())
140
141 @mock.patch('os.remove')
142 def test_stop_failed_to_remove_pidfile(self, mock_remove):
143 mock_remove.side_effect = OSError('Unknown OSError')
144
145 dm = daemon_manager.DaemonManager(
146 TEST_BINARY_FILE, daemon_target=long_running_daemon
147 )
148 dm.start()
149 dm.stop()
150
151 self.assert_no_subprocess_running()
152 self.assertTrue(dm.pid_file_path.exists())
153
154 def assert_no_subprocess_running(self):
155 child_pids = self._get_child_processes(os.getpid())
156 for child_pid in child_pids:
157 self.assertFalse(
158 self._is_process_alive(child_pid), f'process {child_pid} still alive'
159 )
160
161 def _get_child_processes(self, parent_pid):
162 try:
163 output = subprocess.check_output(
164 ['ps', '-o', 'pid,ppid', '--no-headers'], text=True
165 )
166
167 child_processes = []
168 for line in output.splitlines():
169 pid, ppid = line.split()
170 if int(ppid) == parent_pid:
171 child_processes.append(int(pid))
172 return child_processes
173 except subprocess.CalledProcessError as e:
174 self.fail(f'failed to get child process, error: {e}')
175
176 def _is_process_alive(self, pid):
177 try:
178 output = subprocess.check_output(
179 ['ps', '-p', str(pid), '-o', 'state='], text=True
180 ).strip()
181 state = output.split()[0]
182 return state != 'Z' # Check if the state is not 'Z' (zombie)
183 except subprocess.CalledProcessError:
184 return False
185
186 def _cleanup_child_processes(self):
187 child_pids = self._get_child_processes(os.getpid())
188 for child_pid in child_pids:
189 try:
190 os.kill(child_pid, signal.SIGKILL)
191 except ProcessLookupError:
192 # process already terminated
193 pass
194
195
196if __name__ == '__main__':
197 unittest.main()