blob: 496a0ffff37250a1f01d150de3627c8c0f09c64b [file] [log] [blame]
Josh Gao93dee962020-02-06 17:52:38 -08001#!/usr/bin/env python3
Josh Gao191c1542015-12-09 11:26:11 -08002# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gaoc970aef2018-03-19 15:35:11 -070034import threading
Josh Gaofe50bb72016-06-22 18:27:22 -070035import time
Josh Gao191c1542015-12-09 11:26:11 -080036import unittest
37
Josh Gao74b7ec72019-01-11 14:42:08 -080038from datetime import datetime
39
Josh Gao191c1542015-12-09 11:26:11 -080040import adb
41
Josh Gao191c1542015-12-09 11:26:11 -080042def requires_root(func):
43 def wrapper(self, *args):
44 if self.device.get_prop('ro.debuggable') != '1':
45 raise unittest.SkipTest('requires rootable build')
46
47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
48 if not was_root:
49 self.device.root()
50 self.device.wait()
51
52 try:
53 func(self, *args)
54 finally:
55 if not was_root:
56 self.device.unroot()
57 self.device.wait()
58
59 return wrapper
60
61
62def requires_non_root(func):
63 def wrapper(self, *args):
64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
65 if was_root:
66 self.device.unroot()
67 self.device.wait()
68
69 try:
70 func(self, *args)
71 finally:
72 if was_root:
73 self.device.root()
74 self.device.wait()
75
76 return wrapper
77
78
Josh Gao191c1542015-12-09 11:26:11 -080079class DeviceTest(unittest.TestCase):
Josh Gao49ba5582020-03-26 19:33:25 -070080 device = adb.get_device()
Josh Gao191c1542015-12-09 11:26:11 -080081
82
Josh Gaoe4d66fc2020-02-04 12:32:43 -080083class AbbTest(DeviceTest):
84 def test_smoke(self):
85 result = subprocess.run(['adb', 'abb'], capture_output=True)
86 self.assertEqual(1, result.returncode)
87 expected_output = b"cmd: No service specified; use -l to list all services\n"
88 self.assertEqual(expected_output, result.stderr)
89
Josh Gao191c1542015-12-09 11:26:11 -080090class ForwardReverseTest(DeviceTest):
91 def _test_no_rebind(self, description, direction_list, direction,
92 direction_no_rebind, direction_remove_all):
93 msg = direction_list()
94 self.assertEqual('', msg.strip(),
95 description + ' list must be empty to run this test.')
96
97 # Use --no-rebind with no existing binding
98 direction_no_rebind('tcp:5566', 'tcp:6655')
99 msg = direction_list()
100 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
101
102 # Use --no-rebind with existing binding
103 with self.assertRaises(subprocess.CalledProcessError):
104 direction_no_rebind('tcp:5566', 'tcp:6677')
105 msg = direction_list()
106 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
107 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
108
109 # Use the absence of --no-rebind with existing binding
110 direction('tcp:5566', 'tcp:6677')
111 msg = direction_list()
112 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
113 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
114
115 direction_remove_all()
116 msg = direction_list()
117 self.assertEqual('', msg.strip())
118
119 def test_forward_no_rebind(self):
120 self._test_no_rebind('forward', self.device.forward_list,
121 self.device.forward, self.device.forward_no_rebind,
122 self.device.forward_remove_all)
123
124 def test_reverse_no_rebind(self):
125 self._test_no_rebind('reverse', self.device.reverse_list,
126 self.device.reverse, self.device.reverse_no_rebind,
127 self.device.reverse_remove_all)
128
129 def test_forward(self):
130 msg = self.device.forward_list()
131 self.assertEqual('', msg.strip(),
132 'Forwarding list must be empty to run this test.')
133 self.device.forward('tcp:5566', 'tcp:6655')
134 msg = self.device.forward_list()
135 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
136 self.device.forward('tcp:7788', 'tcp:8877')
137 msg = self.device.forward_list()
138 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
139 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
140 self.device.forward_remove('tcp:5566')
141 msg = self.device.forward_list()
142 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
143 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
144 self.device.forward_remove_all()
145 msg = self.device.forward_list()
146 self.assertEqual('', msg.strip())
147
Josh Gao727b07b2019-09-13 00:12:26 +0800148 def test_forward_old_protocol(self):
149 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
150
151 msg = self.device.forward_list()
152 self.assertEqual('', msg.strip(),
153 'Forwarding list must be empty to run this test.')
154
155 s = socket.create_connection(("localhost", 5037))
156 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
157 cmd = b"%04x%s" % (len(service), service)
158 s.sendall(cmd)
159
160 msg = self.device.forward_list()
161 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
162
163 self.device.forward_remove_all()
164 msg = self.device.forward_list()
165 self.assertEqual('', msg.strip())
166
David Purselleaae97e2016-04-07 11:25:48 -0700167 def test_forward_tcp_port_0(self):
168 self.assertEqual('', self.device.forward_list().strip(),
169 'Forwarding list must be empty to run this test.')
170
171 try:
172 # If resolving TCP port 0 is supported, `adb forward` will print
173 # the actual port number.
174 port = self.device.forward('tcp:0', 'tcp:8888').strip()
175 if not port:
176 raise unittest.SkipTest('Forwarding tcp:0 is not available.')
177
178 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
179 self.device.forward_list()))
180 finally:
181 self.device.forward_remove_all()
182
Josh Gao191c1542015-12-09 11:26:11 -0800183 def test_reverse(self):
184 msg = self.device.reverse_list()
185 self.assertEqual('', msg.strip(),
186 'Reverse forwarding list must be empty to run this test.')
187 self.device.reverse('tcp:5566', 'tcp:6655')
188 msg = self.device.reverse_list()
189 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
190 self.device.reverse('tcp:7788', 'tcp:8877')
191 msg = self.device.reverse_list()
192 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
193 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
194 self.device.reverse_remove('tcp:5566')
195 msg = self.device.reverse_list()
196 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
197 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
198 self.device.reverse_remove_all()
199 msg = self.device.reverse_list()
200 self.assertEqual('', msg.strip())
201
David Purselleaae97e2016-04-07 11:25:48 -0700202 def test_reverse_tcp_port_0(self):
203 self.assertEqual('', self.device.reverse_list().strip(),
204 'Reverse list must be empty to run this test.')
205
206 try:
207 # If resolving TCP port 0 is supported, `adb reverse` will print
208 # the actual port number.
209 port = self.device.reverse('tcp:0', 'tcp:8888').strip()
210 if not port:
211 raise unittest.SkipTest('Reversing tcp:0 is not available.')
212
213 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
214 self.device.reverse_list()))
215 finally:
216 self.device.reverse_remove_all()
217
Josh Gao191c1542015-12-09 11:26:11 -0800218 def test_forward_reverse_echo(self):
219 """Send data through adb forward and read it back via adb reverse"""
220 forward_port = 12345
221 reverse_port = forward_port + 1
Josh Gao255c5c82016-03-03 14:49:02 -0800222 forward_spec = 'tcp:' + str(forward_port)
223 reverse_spec = 'tcp:' + str(reverse_port)
Josh Gao191c1542015-12-09 11:26:11 -0800224 forward_setup = False
225 reverse_setup = False
226
227 try:
228 # listen on localhost:forward_port, connect to remote:forward_port
229 self.device.forward(forward_spec, forward_spec)
230 forward_setup = True
231 # listen on remote:forward_port, connect to localhost:reverse_port
232 self.device.reverse(forward_spec, reverse_spec)
233 reverse_setup = True
234
235 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
236 with contextlib.closing(listener):
237 # Use SO_REUSEADDR so that subsequent runs of the test can grab
238 # the port even if it is in TIME_WAIT.
239 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
240
241 # Listen on localhost:reverse_port before connecting to
242 # localhost:forward_port because that will cause adb to connect
243 # back to localhost:reverse_port.
244 listener.bind(('127.0.0.1', reverse_port))
245 listener.listen(4)
246
247 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
248 with contextlib.closing(client):
249 # Connect to the listener.
250 client.connect(('127.0.0.1', forward_port))
251
252 # Accept the client connection.
253 accepted_connection, addr = listener.accept()
254 with contextlib.closing(accepted_connection) as server:
Josh Gao93dee962020-02-06 17:52:38 -0800255 data = b'hello'
Josh Gao191c1542015-12-09 11:26:11 -0800256
257 # Send data into the port setup by adb forward.
258 client.sendall(data)
259 # Explicitly close() so that server gets EOF.
260 client.close()
261
262 # Verify that the data came back via adb reverse.
Josh Gao93dee962020-02-06 17:52:38 -0800263 self.assertEqual(data, server.makefile().read().encode("utf8"))
Josh Gao191c1542015-12-09 11:26:11 -0800264 finally:
265 if reverse_setup:
266 self.device.reverse_remove(forward_spec)
267 if forward_setup:
268 self.device.forward_remove(forward_spec)
269
270
271class ShellTest(DeviceTest):
272 def _interactive_shell(self, shell_args, input):
273 """Runs an interactive adb shell.
274
275 Args:
276 shell_args: List of string arguments to `adb shell`.
Josh Gao93dee962020-02-06 17:52:38 -0800277 input: bytes input to send to the interactive shell.
Josh Gao191c1542015-12-09 11:26:11 -0800278
279 Returns:
280 The remote exit code.
281
282 Raises:
283 unittest.SkipTest: The device doesn't support exit codes.
284 """
David Pursellcf467412016-04-26 13:25:57 -0700285 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800286 raise unittest.SkipTest('exit codes are unavailable on this device')
287
288 proc = subprocess.Popen(
289 self.device.adb_cmd + ['shell'] + shell_args,
290 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
291 stderr=subprocess.PIPE)
292 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
293 # to explicitly add an exit command to close the session from the device
294 # side, plus the necessary newline to complete the interactive command.
Josh Gao93dee962020-02-06 17:52:38 -0800295 proc.communicate(input + b'; exit\n')
Josh Gao191c1542015-12-09 11:26:11 -0800296 return proc.returncode
297
298 def test_cat(self):
299 """Check that we can at least cat a file."""
300 out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
301 elements = out.split()
302 self.assertEqual(len(elements), 2)
303
304 uptime, idle = elements
305 self.assertGreater(float(uptime), 0.0)
306 self.assertGreater(float(idle), 0.0)
307
308 def test_throws_on_failure(self):
309 self.assertRaises(adb.ShellError, self.device.shell, ['false'])
310
311 def test_output_not_stripped(self):
312 out = self.device.shell(['echo', 'foo'])[0]
313 self.assertEqual(out, 'foo' + self.device.linesep)
314
Josh Gaoa019f782017-06-16 15:34:34 -0700315 def test_shell_command_length(self):
316 # Devices that have shell_v2 should be able to handle long commands.
317 if self.device.has_shell_protocol():
318 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
319 self.assertEqual(rc, 0)
320 self.assertTrue(out == ('x' * 16384 + '\n'))
321
Josh Gao191c1542015-12-09 11:26:11 -0800322 def test_shell_nocheck_failure(self):
323 rc, out, _ = self.device.shell_nocheck(['false'])
324 self.assertNotEqual(rc, 0)
325 self.assertEqual(out, '')
326
327 def test_shell_nocheck_output_not_stripped(self):
328 rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
329 self.assertEqual(rc, 0)
330 self.assertEqual(out, 'foo' + self.device.linesep)
331
332 def test_can_distinguish_tricky_results(self):
333 # If result checking on ADB shell is naively implemented as
334 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
335 # output from the result for a cmd of `echo -n 1`.
336 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
337 self.assertEqual(rc, 0)
338 self.assertEqual(out, '1')
339
340 def test_line_endings(self):
341 """Ensure that line ending translation is not happening in the pty.
342
343 Bug: http://b/19735063
344 """
345 output = self.device.shell(['uname'])[0]
346 self.assertEqual(output, 'Linux' + self.device.linesep)
347
348 def test_pty_logic(self):
349 """Tests that a PTY is allocated when it should be.
350
Elliott Hughescabfa112016-10-19 14:47:11 -0700351 PTY allocation behavior should match ssh.
Josh Gao191c1542015-12-09 11:26:11 -0800352 """
Josh Gao191c1542015-12-09 11:26:11 -0800353 def check_pty(args):
354 """Checks adb shell PTY allocation.
355
356 Tests |args| for terminal and non-terminal stdin.
357
358 Args:
359 args: -Tt args in a list (e.g. ['-t', '-t']).
360
361 Returns:
362 A tuple (<terminal>, <non-terminal>). True indicates
363 the corresponding shell allocated a remote PTY.
364 """
365 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
366
367 terminal = subprocess.Popen(
368 test_cmd, stdin=None,
369 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
370 terminal.communicate()
371
372 non_terminal = subprocess.Popen(
373 test_cmd, stdin=subprocess.PIPE,
374 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
375 non_terminal.communicate()
376
377 return (terminal.returncode == 0, non_terminal.returncode == 0)
378
379 # -T: never allocate PTY.
380 self.assertEqual((False, False), check_pty(['-T']))
381
Elliott Hughescabfa112016-10-19 14:47:11 -0700382 # These tests require a new device.
383 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
384 # No args: PTY only if stdin is a terminal and shell is interactive,
385 # which is difficult to reliably test from a script.
386 self.assertEqual((False, False), check_pty([]))
Josh Gao191c1542015-12-09 11:26:11 -0800387
Elliott Hughescabfa112016-10-19 14:47:11 -0700388 # -t: PTY if stdin is a terminal.
389 self.assertEqual((True, False), check_pty(['-t']))
Josh Gao191c1542015-12-09 11:26:11 -0800390
391 # -t -t: always allocate PTY.
392 self.assertEqual((True, True), check_pty(['-t', '-t']))
393
Elliott Hughescabfa112016-10-19 14:47:11 -0700394 # -tt: always allocate PTY, POSIX style (http://b/32216152).
395 self.assertEqual((True, True), check_pty(['-tt']))
396
397 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
398 # we follow the man page instead.
399 self.assertEqual((True, True), check_pty(['-ttt']))
400
401 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
402 self.assertEqual((True, True), check_pty(['-ttx']))
403
404 # -Ttt: -tt cancels out -T.
405 self.assertEqual((True, True), check_pty(['-Ttt']))
406
407 # -ttT: -T cancels out -tt.
408 self.assertEqual((False, False), check_pty(['-ttT']))
409
Josh Gao191c1542015-12-09 11:26:11 -0800410 def test_shell_protocol(self):
411 """Tests the shell protocol on the device.
412
413 If the device supports shell protocol, this gives us the ability
414 to separate stdout/stderr and return the exit code directly.
415
416 Bug: http://b/19734861
417 """
David Pursellcf467412016-04-26 13:25:57 -0700418 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800419 raise unittest.SkipTest('shell protocol unsupported on this device')
420
421 # Shell protocol should be used by default.
422 result = self.device.shell_nocheck(
423 shlex.split('echo foo; echo bar >&2; exit 17'))
424 self.assertEqual(17, result[0])
425 self.assertEqual('foo' + self.device.linesep, result[1])
426 self.assertEqual('bar' + self.device.linesep, result[2])
427
Josh Gao93dee962020-02-06 17:52:38 -0800428 self.assertEqual(17, self._interactive_shell([], b'exit 17'))
Josh Gao191c1542015-12-09 11:26:11 -0800429
430 # -x flag should disable shell protocol.
431 result = self.device.shell_nocheck(
432 shlex.split('-x echo foo; echo bar >&2; exit 17'))
433 self.assertEqual(0, result[0])
434 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
435 self.assertEqual('', result[2])
436
Josh Gao93dee962020-02-06 17:52:38 -0800437 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))
Josh Gao191c1542015-12-09 11:26:11 -0800438
439 def test_non_interactive_sigint(self):
440 """Tests that SIGINT in a non-interactive shell kills the process.
441
442 This requires the shell protocol in order to detect the broken
443 pipe; raw data transfer mode will only see the break once the
444 subprocess tries to read or write.
445
446 Bug: http://b/23825725
447 """
David Pursellcf467412016-04-26 13:25:57 -0700448 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800449 raise unittest.SkipTest('shell protocol unsupported on this device')
450
451 # Start a long-running process.
452 sleep_proc = subprocess.Popen(
453 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
454 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
455 stderr=subprocess.STDOUT)
Josh Gao93dee962020-02-06 17:52:38 -0800456 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
Josh Gao191c1542015-12-09 11:26:11 -0800457 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
458 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
459
460 # Verify that the process is running, send signal, verify it stopped.
461 self.device.shell(proc_query)
462 os.kill(sleep_proc.pid, signal.SIGINT)
463 sleep_proc.communicate()
Josh Gaoe76b9f32016-10-21 12:40:42 -0700464
465 # It can take some time for the process to receive the signal and die.
466 end_time = time.time() + 3
467 while self.device.shell_nocheck(proc_query)[0] != 1:
468 self.assertFalse(time.time() > end_time,
469 'subprocess failed to terminate in time')
Josh Gao191c1542015-12-09 11:26:11 -0800470
471 def test_non_interactive_stdin(self):
472 """Tests that non-interactive shells send stdin."""
David Pursellcf467412016-04-26 13:25:57 -0700473 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800474 raise unittest.SkipTest('non-interactive stdin unsupported '
475 'on this device')
476
477 # Test both small and large inputs.
Josh Gao93dee962020-02-06 17:52:38 -0800478 small_input = b'foo'
479 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
480 large_input = b'\n'.join(characters)
481
Josh Gao191c1542015-12-09 11:26:11 -0800482
483 for input in (small_input, large_input):
484 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
485 stdin=subprocess.PIPE,
486 stdout=subprocess.PIPE,
487 stderr=subprocess.PIPE)
488 stdout, stderr = proc.communicate(input)
489 self.assertEqual(input.splitlines(), stdout.splitlines())
Josh Gao93dee962020-02-06 17:52:38 -0800490 self.assertEqual(b'', stderr)
Josh Gao191c1542015-12-09 11:26:11 -0800491
Josh Gaofe50bb72016-06-22 18:27:22 -0700492 def test_sighup(self):
493 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
494 log_path = "/data/local/tmp/adb_signal_test.log"
495
496 # Clear the output file.
497 self.device.shell_nocheck(["echo", ">", log_path])
498
499 script = """
500 trap "echo SIGINT > {path}; exit 0" SIGINT
501 trap "echo SIGHUP > {path}; exit 0" SIGHUP
502 echo Waiting
Josh Gao470622f2016-10-21 13:17:32 -0700503 read
Josh Gaofe50bb72016-06-22 18:27:22 -0700504 """.format(path=log_path)
505
506 script = ";".join([x.strip() for x in script.strip().splitlines()])
507
Josh Gao470622f2016-10-21 13:17:32 -0700508 process = self.device.shell_popen([script], kill_atexit=False,
509 stdin=subprocess.PIPE,
510 stdout=subprocess.PIPE)
Josh Gaofe50bb72016-06-22 18:27:22 -0700511
Josh Gao93dee962020-02-06 17:52:38 -0800512 self.assertEqual(b"Waiting\n", process.stdout.readline())
Josh Gaofe50bb72016-06-22 18:27:22 -0700513 process.send_signal(signal.SIGINT)
514 process.wait()
515
516 # Waiting for the local adb to finish is insufficient, since it hangs
517 # up immediately.
Josh Gao470622f2016-10-21 13:17:32 -0700518 time.sleep(1)
Josh Gaofe50bb72016-06-22 18:27:22 -0700519
520 stdout, _ = self.device.shell(["cat", log_path])
521 self.assertEqual(stdout.strip(), "SIGHUP")
522
Josh Gaoc970aef2018-03-19 15:35:11 -0700523 def test_exit_stress(self):
524 """Hammer `adb shell exit 42` with multiple threads."""
525 thread_count = 48
526 result = dict()
527 def hammer(thread_idx, thread_count, result):
528 success = True
529 for i in range(thread_idx, 240, thread_count):
530 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
531 if ret != i % 256:
532 success = False
533 break
534 result[thread_idx] = success
535
536 threads = []
537 for i in range(thread_count):
538 thread = threading.Thread(target=hammer, args=(i, thread_count, result))
539 thread.start()
540 threads.append(thread)
541 for thread in threads:
542 thread.join()
Josh Gao93dee962020-02-06 17:52:38 -0800543 for i, success in result.items():
Josh Gaoc970aef2018-03-19 15:35:11 -0700544 self.assertTrue(success)
545
Josh Gao63da8e62019-12-16 17:13:51 -0800546 def disabled_test_parallel(self):
547 """Spawn a bunch of `adb shell` instances in parallel.
548
549 This was broken historically due to the use of select, which only works
550 for fds that are numerically less than 1024.
551
552 Bug: http://b/141955761"""
553
554 n_procs = 2048
555 procs = dict()
Josh Gao93dee962020-02-06 17:52:38 -0800556 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800557 procs[i] = subprocess.Popen(
558 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
559 stdin=subprocess.PIPE,
560 stdout=subprocess.PIPE
561 )
562
Josh Gao93dee962020-02-06 17:52:38 -0800563 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800564 procs[i].stdin.write("%d\n" % i)
565
Josh Gao93dee962020-02-06 17:52:38 -0800566 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800567 response = procs[i].stdout.readline()
568 assert(response == "%d\n" % i)
569
Josh Gao93dee962020-02-06 17:52:38 -0800570 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800571 procs[i].stdin.write("%d\n" % (i % 256))
572
Josh Gao93dee962020-02-06 17:52:38 -0800573 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800574 assert(procs[i].wait() == i % 256)
575
Josh Gao191c1542015-12-09 11:26:11 -0800576
577class ArgumentEscapingTest(DeviceTest):
578 def test_shell_escaping(self):
579 """Make sure that argument escaping is somewhat sane."""
580
581 # http://b/19734868
582 # Note that this actually matches ssh(1)'s behavior --- it's
583 # converted to `sh -c echo hello; echo world` which sh interprets
584 # as `sh -c echo` (with an argument to that shell of "hello"),
585 # and then `echo world` back in the first shell.
586 result = self.device.shell(
587 shlex.split("sh -c 'echo hello; echo world'"))[0]
588 result = result.splitlines()
589 self.assertEqual(['', 'world'], result)
590 # If you really wanted "hello" and "world", here's what you'd do:
591 result = self.device.shell(
592 shlex.split(r'echo hello\;echo world'))[0].splitlines()
593 self.assertEqual(['hello', 'world'], result)
594
595 # http://b/15479704
596 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
597 self.assertEqual('t', result)
598 result = self.device.shell(
599 shlex.split("sh -c 'true && echo t'"))[0].strip()
600 self.assertEqual('t', result)
601
602 # http://b/20564385
603 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
604 self.assertEqual('t', result)
605 result = self.device.shell(
606 shlex.split(r'echo -n 123\;uname'))[0].strip()
607 self.assertEqual('123Linux', result)
608
609 def test_install_argument_escaping(self):
610 """Make sure that install argument escaping works."""
611 # http://b/20323053, http://b/3090932.
Josh Gao93dee962020-02-06 17:52:38 -0800612 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
Josh Gao191c1542015-12-09 11:26:11 -0800613 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
614 delete=False)
615 tf.close()
616
617 # Installing bogus .apks fails if the device supports exit codes.
618 try:
Josh Gao93dee962020-02-06 17:52:38 -0800619 output = self.device.install(tf.name.decode("utf8"))
Josh Gao191c1542015-12-09 11:26:11 -0800620 except subprocess.CalledProcessError as e:
621 output = e.output
622
623 self.assertIn(file_suffix, output)
624 os.remove(tf.name)
625
626
627class RootUnrootTest(DeviceTest):
628 def _test_root(self):
629 message = self.device.root()
630 if 'adbd cannot run as root in production builds' in message:
631 return
632 self.device.wait()
633 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
634
635 def _test_unroot(self):
636 self.device.unroot()
637 self.device.wait()
638 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
639
640 def test_root_unroot(self):
641 """Make sure that adb root and adb unroot work, using id(1)."""
642 if self.device.get_prop('ro.debuggable') != '1':
643 raise unittest.SkipTest('requires rootable build')
644
645 original_user = self.device.shell(['id', '-un'])[0].strip()
646 try:
647 if original_user == 'root':
648 self._test_unroot()
649 self._test_root()
650 elif original_user == 'shell':
651 self._test_root()
652 self._test_unroot()
653 finally:
654 if original_user == 'root':
655 self.device.root()
656 else:
657 self.device.unroot()
658 self.device.wait()
659
660
661class TcpIpTest(DeviceTest):
662 def test_tcpip_failure_raises(self):
663 """adb tcpip requires a port.
664
665 Bug: http://b/22636927
666 """
667 self.assertRaises(
668 subprocess.CalledProcessError, self.device.tcpip, '')
669 self.assertRaises(
670 subprocess.CalledProcessError, self.device.tcpip, 'foo')
671
672
673class SystemPropertiesTest(DeviceTest):
674 def test_get_prop(self):
675 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
676
677 @requires_root
678 def test_set_prop(self):
679 prop_name = 'foo.bar'
680 self.device.shell(['setprop', prop_name, '""'])
681
682 self.device.set_prop(prop_name, 'qux')
683 self.assertEqual(
684 self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
685
686
687def compute_md5(string):
688 hsh = hashlib.md5()
689 hsh.update(string)
690 return hsh.hexdigest()
691
692
693def get_md5_prog(device):
694 """Older platforms (pre-L) had the name md5 rather than md5sum."""
695 try:
696 device.shell(['md5sum', '/proc/uptime'])
697 return 'md5sum'
698 except adb.ShellError:
699 return 'md5'
700
701
702class HostFile(object):
703 def __init__(self, handle, checksum):
704 self.handle = handle
705 self.checksum = checksum
706 self.full_path = handle.name
707 self.base_name = os.path.basename(self.full_path)
708
709
710class DeviceFile(object):
711 def __init__(self, checksum, full_path):
712 self.checksum = checksum
713 self.full_path = full_path
714 self.base_name = posixpath.basename(self.full_path)
715
716
717def make_random_host_files(in_dir, num_files):
718 min_size = 1 * (1 << 10)
719 max_size = 16 * (1 << 10)
720
721 files = []
Josh Gao93dee962020-02-06 17:52:38 -0800722 for _ in range(num_files):
Josh Gao191c1542015-12-09 11:26:11 -0800723 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
724
725 size = random.randrange(min_size, max_size, 1024)
726 rand_str = os.urandom(size)
727 file_handle.write(rand_str)
728 file_handle.flush()
729 file_handle.close()
730
731 md5 = compute_md5(rand_str)
732 files.append(HostFile(file_handle, md5))
733 return files
734
735
736def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
737 min_size = 1 * (1 << 10)
738 max_size = 16 * (1 << 10)
739
740 files = []
Josh Gao93dee962020-02-06 17:52:38 -0800741 for file_num in range(num_files):
Josh Gao191c1542015-12-09 11:26:11 -0800742 size = random.randrange(min_size, max_size, 1024)
743
744 base_name = prefix + str(file_num)
745 full_path = posixpath.join(in_dir, base_name)
746
747 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
748 'bs={}'.format(size), 'count=1'])
749 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
750
751 files.append(DeviceFile(dev_md5, full_path))
752 return files
753
754
Josh Gao49ba5582020-03-26 19:33:25 -0700755class FileOperationsTest:
756 class Base(DeviceTest):
757 SCRATCH_DIR = '/data/local/tmp'
758 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
759 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
Josh Gao191c1542015-12-09 11:26:11 -0800760
Josh Gao49ba5582020-03-26 19:33:25 -0700761 def setUp(self):
762 self.previous_env = os.environ.get("ADB_COMPRESSION")
763 os.environ["ADB_COMPRESSION"] = self.compression
Josh Gao191c1542015-12-09 11:26:11 -0800764
Josh Gao49ba5582020-03-26 19:33:25 -0700765 def tearDown(self):
766 if self.previous_env is None:
767 del os.environ["ADB_COMPRESSION"]
768 else:
769 os.environ["ADB_COMPRESSION"] = self.previous_env
Josh Gao191c1542015-12-09 11:26:11 -0800770
Josh Gao49ba5582020-03-26 19:33:25 -0700771 def _verify_remote(self, checksum, remote_path):
772 dev_md5, _ = self.device.shell([get_md5_prog(self.device),
773 remote_path])[0].split()
774 self.assertEqual(checksum, dev_md5)
Josh Gao191c1542015-12-09 11:26:11 -0800775
Josh Gao49ba5582020-03-26 19:33:25 -0700776 def _verify_local(self, checksum, local_path):
777 with open(local_path, 'rb') as host_file:
778 host_md5 = compute_md5(host_file.read())
779 self.assertEqual(host_md5, checksum)
Josh Gao191c1542015-12-09 11:26:11 -0800780
Josh Gao49ba5582020-03-26 19:33:25 -0700781 def test_push(self):
782 """Push a randomly generated file to specified device."""
783 kbytes = 512
784 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
785 rand_str = os.urandom(1024 * kbytes)
786 tmp.write(rand_str)
787 tmp.close()
Josh Gao191c1542015-12-09 11:26:11 -0800788
Josh Gao49ba5582020-03-26 19:33:25 -0700789 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
790 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
Josh Gao191c1542015-12-09 11:26:11 -0800791
Josh Gao49ba5582020-03-26 19:33:25 -0700792 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
793 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
Josh Gao191c1542015-12-09 11:26:11 -0800794
Josh Gao49ba5582020-03-26 19:33:25 -0700795 os.remove(tmp.name)
Josh Gao191c1542015-12-09 11:26:11 -0800796
Josh Gao49ba5582020-03-26 19:33:25 -0700797 def test_push_dir(self):
798 """Push a randomly generated directory of files to the device."""
Josh Gao191c1542015-12-09 11:26:11 -0800799 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
Josh Gao49ba5582020-03-26 19:33:25 -0700800 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
Josh Gao191c1542015-12-09 11:26:11 -0800801
Josh Gao49ba5582020-03-26 19:33:25 -0700802 try:
803 host_dir = tempfile.mkdtemp()
Josh Gao191c1542015-12-09 11:26:11 -0800804
Josh Gao49ba5582020-03-26 19:33:25 -0700805 # Make sure the temp directory isn't setuid, or else adb will complain.
806 os.chmod(host_dir, 0o700)
Josh Gao191c1542015-12-09 11:26:11 -0800807
Josh Gao49ba5582020-03-26 19:33:25 -0700808 # Create 32 random files.
809 temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
810 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
Josh Gao191c1542015-12-09 11:26:11 -0800811
Josh Gao49ba5582020-03-26 19:33:25 -0700812 for temp_file in temp_files:
813 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
814 os.path.basename(host_dir),
815 temp_file.base_name)
816 self._verify_remote(temp_file.checksum, remote_path)
817 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
818 finally:
819 if host_dir is not None:
820 shutil.rmtree(host_dir)
Josh Gao191c1542015-12-09 11:26:11 -0800821
Josh Gao49ba5582020-03-26 19:33:25 -0700822 def disabled_test_push_empty(self):
823 """Push an empty directory to the device."""
Josh Gao191c1542015-12-09 11:26:11 -0800824 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
Josh Gao49ba5582020-03-26 19:33:25 -0700825 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
Josh Gao191c1542015-12-09 11:26:11 -0800826
Josh Gao49ba5582020-03-26 19:33:25 -0700827 try:
828 host_dir = tempfile.mkdtemp()
Josh Gao94dc19f2016-09-14 16:13:50 -0700829
Josh Gao49ba5582020-03-26 19:33:25 -0700830 # Make sure the temp directory isn't setuid, or else adb will complain.
831 os.chmod(host_dir, 0o700)
Josh Gao94dc19f2016-09-14 16:13:50 -0700832
Josh Gao49ba5582020-03-26 19:33:25 -0700833 # Create an empty directory.
834 empty_dir_path = os.path.join(host_dir, 'empty')
835 os.mkdir(empty_dir_path);
Josh Gao94dc19f2016-09-14 16:13:50 -0700836
Josh Gao49ba5582020-03-26 19:33:25 -0700837 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
Josh Gao94dc19f2016-09-14 16:13:50 -0700838
Josh Gao49ba5582020-03-26 19:33:25 -0700839 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
840 test_empty_cmd = ["[", "-d", remote_path, "]"]
841 rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
842
843 self.assertEqual(rc, 0)
844 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
845 finally:
846 if host_dir is not None:
847 shutil.rmtree(host_dir)
848
849 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
850 def test_push_symlink(self):
851 """Push a symlink.
852
853 Bug: http://b/31491920
854 """
855 try:
856 host_dir = tempfile.mkdtemp()
857
858 # Make sure the temp directory isn't setuid, or else adb will
859 # complain.
860 os.chmod(host_dir, 0o700)
861
862 with open(os.path.join(host_dir, 'foo'), 'w') as f:
863 f.write('foo')
864
865 symlink_path = os.path.join(host_dir, 'symlink')
866 os.symlink('foo', symlink_path)
867
868 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
869 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
870 self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
871 rc, out, _ = self.device.shell_nocheck(
872 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
873 self.assertEqual(0, rc)
874 self.assertEqual(out.strip(), 'foo')
875 finally:
876 if host_dir is not None:
877 shutil.rmtree(host_dir)
878
879 def test_multiple_push(self):
880 """Push multiple files to the device in one adb push command.
881
882 Bug: http://b/25324823
883 """
Josh Gao94dc19f2016-09-14 16:13:50 -0700884
885 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
886 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
Josh Gao94dc19f2016-09-14 16:13:50 -0700887
Josh Gaoafcdcd72016-02-19 15:55:55 -0800888 try:
Josh Gao49ba5582020-03-26 19:33:25 -0700889 host_dir = tempfile.mkdtemp()
890
891 # Create some random files and a subdirectory containing more files.
892 temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
893
894 subdir = os.path.join(host_dir, 'subdir')
895 os.mkdir(subdir)
896 subdir_temp_files = make_random_host_files(in_dir=subdir,
897 num_files=4)
898
899 paths = [x.full_path for x in temp_files]
900 paths.append(subdir)
901 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
902
903 for temp_file in temp_files:
904 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
905 temp_file.base_name)
906 self._verify_remote(temp_file.checksum, remote_path)
907
908 for subdir_temp_file in subdir_temp_files:
909 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
910 # BROKEN: http://b/25394682
911 # 'subdir';
912 temp_file.base_name)
913 self._verify_remote(temp_file.checksum, remote_path)
914
915
916 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
917 finally:
918 if host_dir is not None:
919 shutil.rmtree(host_dir)
920
921 @requires_non_root
922 def test_push_error_reporting(self):
923 """Make sure that errors that occur while pushing a file get reported
924
925 Bug: http://b/26816782
926 """
927 with tempfile.NamedTemporaryFile() as tmp_file:
928 tmp_file.write(b'\0' * 1024 * 1024)
929 tmp_file.flush()
930 try:
931 self.device.push(local=tmp_file.name, remote='/system/')
932 self.fail('push should not have succeeded')
933 except subprocess.CalledProcessError as e:
934 output = e.output
935
936 self.assertTrue(b'Permission denied' in output or
937 b'Read-only file system' in output)
938
939 @requires_non_root
940 def test_push_directory_creation(self):
941 """Regression test for directory creation.
942
943 Bug: http://b/110953234
944 """
945 with tempfile.NamedTemporaryFile() as tmp_file:
946 tmp_file.write(b'\0' * 1024 * 1024)
947 tmp_file.flush()
948 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
949 self.device.shell(['rm', '-rf', remote_path])
950
951 remote_path += '/filename'
952 self.device.push(local=tmp_file.name, remote=remote_path)
953
954 def disabled_test_push_multiple_slash_root(self):
955 """Regression test for pushing to //data/local/tmp.
956
957 Bug: http://b/141311284
958
959 Disabled because this broken on the adbd side as well: b/141943968
960 """
961 with tempfile.NamedTemporaryFile() as tmp_file:
962 tmp_file.write('\0' * 1024 * 1024)
963 tmp_file.flush()
964 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
965 self.device.shell(['rm', '-rf', remote_path])
966 self.device.push(local=tmp_file.name, remote=remote_path)
967
968 def _test_pull(self, remote_file, checksum):
969 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
970 tmp_write.close()
971 self.device.pull(remote=remote_file, local=tmp_write.name)
972 with open(tmp_write.name, 'rb') as tmp_read:
973 host_contents = tmp_read.read()
974 host_md5 = compute_md5(host_contents)
975 self.assertEqual(checksum, host_md5)
976 os.remove(tmp_write.name)
977
978 @requires_non_root
979 def test_pull_error_reporting(self):
980 self.device.shell(['touch', self.DEVICE_TEMP_FILE])
981 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
982
983 try:
984 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
Josh Gaoafcdcd72016-02-19 15:55:55 -0800985 except subprocess.CalledProcessError as e:
986 output = e.output
987
Josh Gao49ba5582020-03-26 19:33:25 -0700988 self.assertIn(b'Permission denied', output)
Josh Gao191c1542015-12-09 11:26:11 -0800989
Josh Gao49ba5582020-03-26 19:33:25 -0700990 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
Josh Gao4c0078d2018-06-28 18:43:19 -0700991
Josh Gao49ba5582020-03-26 19:33:25 -0700992 def test_pull(self):
993 """Pull a randomly generated file from specified device."""
994 kbytes = 512
995 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
996 cmd = ['dd', 'if=/dev/urandom',
997 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
998 'count={}'.format(kbytes)]
999 self.device.shell(cmd)
1000 dev_md5, _ = self.device.shell(
1001 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
1002 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
1003 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
Josh Gao4c0078d2018-06-28 18:43:19 -07001004
Josh Gao49ba5582020-03-26 19:33:25 -07001005 def test_pull_dir(self):
1006 """Pull a randomly generated directory of files from the device."""
1007 try:
1008 host_dir = tempfile.mkdtemp()
Josh Gao4c0078d2018-06-28 18:43:19 -07001009
Josh Gao49ba5582020-03-26 19:33:25 -07001010 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1011 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
Josh Gao76b64ba2019-09-26 01:49:56 +08001012
Josh Gao49ba5582020-03-26 19:33:25 -07001013 # Populate device directory with random files.
1014 temp_files = make_random_device_files(
1015 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
Josh Gaoea3f43c2019-10-01 14:14:07 -07001016
Josh Gao49ba5582020-03-26 19:33:25 -07001017 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
Josh Gao76b64ba2019-09-26 01:49:56 +08001018
Josh Gao49ba5582020-03-26 19:33:25 -07001019 for temp_file in temp_files:
1020 host_path = os.path.join(
1021 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1022 temp_file.base_name)
1023 self._verify_local(temp_file.checksum, host_path)
Josh Gao191c1542015-12-09 11:26:11 -08001024
Josh Gao49ba5582020-03-26 19:33:25 -07001025 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1026 finally:
1027 if host_dir is not None:
1028 shutil.rmtree(host_dir)
Josh Gao191c1542015-12-09 11:26:11 -08001029
Josh Gao49ba5582020-03-26 19:33:25 -07001030 def test_pull_dir_symlink(self):
1031 """Pull a directory into a symlink to a directory.
Josh Gao191c1542015-12-09 11:26:11 -08001032
Josh Gao49ba5582020-03-26 19:33:25 -07001033 Bug: http://b/27362811
1034 """
1035 if os.name != 'posix':
1036 raise unittest.SkipTest('requires POSIX')
Josh Gao191c1542015-12-09 11:26:11 -08001037
Josh Gao49ba5582020-03-26 19:33:25 -07001038 try:
1039 host_dir = tempfile.mkdtemp()
1040 real_dir = os.path.join(host_dir, 'dir')
1041 symlink = os.path.join(host_dir, 'symlink')
1042 os.mkdir(real_dir)
1043 os.symlink(real_dir, symlink)
Josh Gao191c1542015-12-09 11:26:11 -08001044
Josh Gao49ba5582020-03-26 19:33:25 -07001045 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1046 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
Josh Gao191c1542015-12-09 11:26:11 -08001047
Josh Gao49ba5582020-03-26 19:33:25 -07001048 # Populate device directory with random files.
1049 temp_files = make_random_device_files(
1050 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
Josh Gao191c1542015-12-09 11:26:11 -08001051
Josh Gao49ba5582020-03-26 19:33:25 -07001052 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
Josh Gao191c1542015-12-09 11:26:11 -08001053
Josh Gao49ba5582020-03-26 19:33:25 -07001054 for temp_file in temp_files:
1055 host_path = os.path.join(
1056 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1057 temp_file.base_name)
1058 self._verify_local(temp_file.checksum, host_path)
Josh Gao191c1542015-12-09 11:26:11 -08001059
Josh Gao49ba5582020-03-26 19:33:25 -07001060 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1061 finally:
1062 if host_dir is not None:
1063 shutil.rmtree(host_dir)
Josh Gao191c1542015-12-09 11:26:11 -08001064
Josh Gao49ba5582020-03-26 19:33:25 -07001065 def test_pull_dir_symlink_collision(self):
1066 """Pull a directory into a colliding symlink to directory."""
1067 if os.name != 'posix':
1068 raise unittest.SkipTest('requires POSIX')
1069
1070 try:
1071 host_dir = tempfile.mkdtemp()
1072 real_dir = os.path.join(host_dir, 'real')
1073 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1074 symlink = os.path.join(host_dir, tmp_dirname)
1075 os.mkdir(real_dir)
1076 os.symlink(real_dir, symlink)
1077
1078 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1079 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1080
1081 # Populate device directory with random files.
1082 temp_files = make_random_device_files(
1083 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1084
1085 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1086
1087 for temp_file in temp_files:
1088 host_path = os.path.join(real_dir, temp_file.base_name)
1089 self._verify_local(temp_file.checksum, host_path)
1090
1091 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1092 finally:
1093 if host_dir is not None:
1094 shutil.rmtree(host_dir)
1095
1096 def test_pull_dir_nonexistent(self):
1097 """Pull a directory of files from the device to a nonexistent path."""
1098 try:
1099 host_dir = tempfile.mkdtemp()
1100 dest_dir = os.path.join(host_dir, 'dest')
1101
1102 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1103 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1104
1105 # Populate device directory with random files.
1106 temp_files = make_random_device_files(
1107 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1108
1109 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1110
1111 for temp_file in temp_files:
1112 host_path = os.path.join(dest_dir, temp_file.base_name)
1113 self._verify_local(temp_file.checksum, host_path)
1114
1115 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1116 finally:
1117 if host_dir is not None:
1118 shutil.rmtree(host_dir)
1119
1120 # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1121 def disabled_test_pull_symlink_dir(self):
1122 """Pull a symlink to a directory of symlinks to files."""
1123 try:
1124 host_dir = tempfile.mkdtemp()
1125
1126 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1127 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1128 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1129
1130 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1131 self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1132 self.device.shell(['ln', '-s', remote_links, remote_symlink])
1133
1134 # Populate device directory with random files.
1135 temp_files = make_random_device_files(
1136 self.device, in_dir=remote_dir, num_files=32)
1137
1138 for temp_file in temp_files:
1139 self.device.shell(
1140 ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1141 posixpath.join(remote_links, temp_file.base_name)])
1142
1143 self.device.pull(remote=remote_symlink, local=host_dir)
1144
1145 for temp_file in temp_files:
1146 host_path = os.path.join(
1147 host_dir, 'symlink', temp_file.base_name)
1148 self._verify_local(temp_file.checksum, host_path)
1149
1150 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1151 finally:
1152 if host_dir is not None:
1153 shutil.rmtree(host_dir)
1154
1155 def test_pull_empty(self):
1156 """Pull a directory containing an empty directory from the device."""
1157 try:
1158 host_dir = tempfile.mkdtemp()
1159
1160 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1161 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1162 self.device.shell(['mkdir', '-p', remote_empty_path])
1163
1164 self.device.pull(remote=remote_empty_path, local=host_dir)
1165 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1166 finally:
1167 if host_dir is not None:
1168 shutil.rmtree(host_dir)
1169
1170 def test_multiple_pull(self):
1171 """Pull a randomly generated directory of files from the device."""
1172
1173 try:
1174 host_dir = tempfile.mkdtemp()
1175
1176 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1177 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1178 self.device.shell(['mkdir', '-p', subdir])
1179
1180 # Create some random files and a subdirectory containing more files.
1181 temp_files = make_random_device_files(
1182 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1183
1184 subdir_temp_files = make_random_device_files(
1185 self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1186
1187 paths = [x.full_path for x in temp_files]
1188 paths.append(subdir)
1189 self.device._simple_call(['pull'] + paths + [host_dir])
1190
1191 for temp_file in temp_files:
1192 local_path = os.path.join(host_dir, temp_file.base_name)
1193 self._verify_local(temp_file.checksum, local_path)
1194
1195 for subdir_temp_file in subdir_temp_files:
1196 local_path = os.path.join(host_dir,
1197 'subdir',
1198 subdir_temp_file.base_name)
1199 self._verify_local(subdir_temp_file.checksum, local_path)
1200
1201 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1202 finally:
1203 if host_dir is not None:
1204 shutil.rmtree(host_dir)
1205
1206 def verify_sync(self, device, temp_files, device_dir):
1207 """Verifies that a list of temp files was synced to the device."""
1208 # Confirm that every file on the device mirrors that on the host.
Josh Gao191c1542015-12-09 11:26:11 -08001209 for temp_file in temp_files:
Josh Gao49ba5582020-03-26 19:33:25 -07001210 device_full_path = posixpath.join(
1211 device_dir, temp_file.base_name)
1212 dev_md5, _ = device.shell(
1213 [get_md5_prog(self.device), device_full_path])[0].split()
1214 self.assertEqual(temp_file.checksum, dev_md5)
Josh Gao191c1542015-12-09 11:26:11 -08001215
Josh Gao49ba5582020-03-26 19:33:25 -07001216 def test_sync(self):
1217 """Sync a host directory to the data partition."""
Josh Gao191c1542015-12-09 11:26:11 -08001218
Josh Gao49ba5582020-03-26 19:33:25 -07001219 try:
1220 base_dir = tempfile.mkdtemp()
Josh Gao1e611a32016-02-26 13:26:55 -08001221
Josh Gao49ba5582020-03-26 19:33:25 -07001222 # Create mirror device directory hierarchy within base_dir.
1223 full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1224 os.makedirs(full_dir_path)
Josh Gao1e611a32016-02-26 13:26:55 -08001225
Josh Gao49ba5582020-03-26 19:33:25 -07001226 # Create 32 random files within the host mirror.
1227 temp_files = make_random_host_files(
1228 in_dir=full_dir_path, num_files=32)
Josh Gao1e611a32016-02-26 13:26:55 -08001229
Josh Gao49ba5582020-03-26 19:33:25 -07001230 # Clean up any stale files on the device.
1231 device = adb.get_device() # pylint: disable=no-member
1232 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
Josh Gao1e611a32016-02-26 13:26:55 -08001233
Josh Gao49ba5582020-03-26 19:33:25 -07001234 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1235 os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1236 device.sync('data')
1237 if old_product_out is None:
1238 del os.environ['ANDROID_PRODUCT_OUT']
1239 else:
1240 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
Josh Gao1e611a32016-02-26 13:26:55 -08001241
Josh Gao49ba5582020-03-26 19:33:25 -07001242 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
Josh Gao1e611a32016-02-26 13:26:55 -08001243
Josh Gao49ba5582020-03-26 19:33:25 -07001244 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1245 finally:
1246 if base_dir is not None:
1247 shutil.rmtree(base_dir)
Josh Gao1e611a32016-02-26 13:26:55 -08001248
Josh Gao49ba5582020-03-26 19:33:25 -07001249 def test_push_sync(self):
1250 """Sync a host directory to a specific path."""
Josh Gao1e611a32016-02-26 13:26:55 -08001251
Josh Gao49ba5582020-03-26 19:33:25 -07001252 try:
1253 temp_dir = tempfile.mkdtemp()
1254 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
Josh Gao1e611a32016-02-26 13:26:55 -08001255
Josh Gao49ba5582020-03-26 19:33:25 -07001256 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
Josh Gao1e611a32016-02-26 13:26:55 -08001257
Josh Gao49ba5582020-03-26 19:33:25 -07001258 # Clean up any stale files on the device.
1259 device = adb.get_device() # pylint: disable=no-member
1260 device.shell(['rm', '-rf', device_dir])
Josh Gao1e611a32016-02-26 13:26:55 -08001261
Josh Gao49ba5582020-03-26 19:33:25 -07001262 device.push(temp_dir, device_dir, sync=True)
Josh Gao1e611a32016-02-26 13:26:55 -08001263
Josh Gao49ba5582020-03-26 19:33:25 -07001264 self.verify_sync(device, temp_files, device_dir)
Josh Gao1e611a32016-02-26 13:26:55 -08001265
Josh Gao49ba5582020-03-26 19:33:25 -07001266 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1267 finally:
1268 if temp_dir is not None:
1269 shutil.rmtree(temp_dir)
Josh Gao1e611a32016-02-26 13:26:55 -08001270
Josh Gao49ba5582020-03-26 19:33:25 -07001271 def test_unicode_paths(self):
1272 """Ensure that we can support non-ASCII paths, even on Windows."""
1273 name = u'로보카 폴리'
Josh Gao1e611a32016-02-26 13:26:55 -08001274
Josh Gao49ba5582020-03-26 19:33:25 -07001275 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1276 remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
Josh Gao89ec3a82016-03-02 16:00:02 -08001277
Josh Gao49ba5582020-03-26 19:33:25 -07001278 ## push.
1279 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1280 tf.close()
1281 self.device.push(tf.name, remote_path)
1282 os.remove(tf.name)
1283 self.assertFalse(os.path.exists(tf.name))
Josh Gao89ec3a82016-03-02 16:00:02 -08001284
Josh Gao49ba5582020-03-26 19:33:25 -07001285 # Verify that the device ended up with the expected UTF-8 path
1286 output = self.device.shell(
1287 ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1288 self.assertEqual(remote_path, output)
Josh Gao89ec3a82016-03-02 16:00:02 -08001289
Josh Gao49ba5582020-03-26 19:33:25 -07001290 # pull.
1291 self.device.pull(remote_path, tf.name)
1292 self.assertTrue(os.path.exists(tf.name))
1293 os.remove(tf.name)
1294 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
Josh Gao89ec3a82016-03-02 16:00:02 -08001295
Josh Gao89ec3a82016-03-02 16:00:02 -08001296
Josh Gao49ba5582020-03-26 19:33:25 -07001297class FileOperationsTestUncompressed(FileOperationsTest.Base):
1298 compression = "none"
Josh Gao89ec3a82016-03-02 16:00:02 -08001299
Josh Gaof2642242015-12-09 14:03:30 -08001300
Josh Gao49ba5582020-03-26 19:33:25 -07001301class FileOperationsTestBrotli(FileOperationsTest.Base):
1302 compression = "brotli"
Josh Gao191c1542015-12-09 11:26:11 -08001303
1304
Yabin Cuib5e11412017-03-10 16:01:01 -08001305class DeviceOfflineTest(DeviceTest):
1306 def _get_device_state(self, serialno):
1307 output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1308 for line in output.split('\n'):
1309 m = re.match('(\S+)\s+(\S+)', line)
1310 if m and m.group(1) == serialno:
1311 return m.group(2)
1312 return None
1313
Josh Gao33d14b82017-09-13 14:51:23 -07001314 def disabled_test_killed_when_pushing_a_large_file(self):
Yabin Cuib5e11412017-03-10 16:01:01 -08001315 """
1316 While running adb push with a large file, kill adb server.
1317 Occasionally the device becomes offline. Because the device is still
1318 reading data without realizing that the adb server has been restarted.
1319 Test if we can bring the device online automatically now.
1320 http://b/32952319
1321 """
1322 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1323 # 1. Push a large file
1324 file_path = 'tmp_large_file'
1325 try:
1326 fh = open(file_path, 'w')
1327 fh.write('\0' * (100 * 1024 * 1024))
1328 fh.close()
1329 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1330 time.sleep(0.1)
1331 # 2. Kill the adb server
1332 subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1333 subproc.terminate()
1334 finally:
1335 try:
1336 os.unlink(file_path)
1337 except:
1338 pass
1339 # 3. See if the device still exist.
1340 # Sleep to wait for the adb server exit.
1341 time.sleep(0.5)
1342 # 4. The device should be online
1343 self.assertEqual(self._get_device_state(serialno), 'device')
1344
Josh Gao33d14b82017-09-13 14:51:23 -07001345 def disabled_test_killed_when_pulling_a_large_file(self):
Yabin Cuib5e11412017-03-10 16:01:01 -08001346 """
1347 While running adb pull with a large file, kill adb server.
1348 Occasionally the device can't be connected. Because the device is trying to
1349 send a message larger than what is expected by the adb server.
1350 Test if we can bring the device online automatically now.
1351 """
1352 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1353 file_path = 'tmp_large_file'
1354 try:
1355 # 1. Create a large file on device.
1356 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1357 'bs=1000000', 'count=100'])
1358 # 2. Pull the large file on host.
1359 subproc = subprocess.Popen(self.device.adb_cmd +
1360 ['pull','/data/local/tmp/tmp_large_file', file_path])
1361 time.sleep(0.1)
1362 # 3. Kill the adb server
1363 subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1364 subproc.terminate()
1365 finally:
1366 try:
1367 os.unlink(file_path)
1368 except:
1369 pass
1370 # 4. See if the device still exist.
1371 # Sleep to wait for the adb server exit.
1372 time.sleep(0.5)
1373 self.assertEqual(self._get_device_state(serialno), 'device')
1374
1375
Josh Gaoef3d3432017-05-02 15:01:09 -07001376 def test_packet_size_regression(self):
1377 """Test for http://b/37783561
1378
1379 Receiving packets of a length divisible by 512 but not 1024 resulted in
1380 the adb client waiting indefinitely for more input.
1381 """
1382 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1383 # Probe some surrounding values as well, for the hell of it.
Josh Gao93dee962020-02-06 17:52:38 -08001384 for base in [512] + list(range(1024, 1024 * 16, 1024)):
Josh Gao2ea46522018-04-10 14:35:06 -07001385 for offset in [-6, -5, -4]:
1386 length = base + offset
1387 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1388 'echo', 'foo']
1389 rc, stdout, _ = self.device.shell_nocheck(cmd)
Josh Gaoef3d3432017-05-02 15:01:09 -07001390
Josh Gao2ea46522018-04-10 14:35:06 -07001391 self.assertEqual(0, rc)
Josh Gaoef3d3432017-05-02 15:01:09 -07001392
Josh Gao2ea46522018-04-10 14:35:06 -07001393 # Output should be '\0' * length, followed by "foo\n"
1394 self.assertEqual(length, len(stdout) - 4)
1395 self.assertEqual(stdout, "\0" * length + "foo\n")
Josh Gaoef3d3432017-05-02 15:01:09 -07001396
Josh Gao5d799cd2018-08-22 15:13:18 -07001397 def test_zero_packet(self):
1398 """Test for http://b/113070258
1399
1400 Make sure that we don't blow up when sending USB transfers that line up
1401 exactly with the USB packet size.
1402 """
1403
1404 local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1405 try:
1406 for size in [512, 1024]:
1407 def listener():
1408 cmd = ["echo foo | nc -l -p 12345; echo done"]
1409 rc, stdout, stderr = self.device.shell_nocheck(cmd)
1410
1411 thread = threading.Thread(target=listener)
1412 thread.start()
1413
1414 # Wait a bit to let the shell command start.
1415 time.sleep(0.25)
1416
1417 sock = socket.create_connection(("localhost", local_port))
1418 with contextlib.closing(sock):
Josh Gao93dee962020-02-06 17:52:38 -08001419 bytesWritten = sock.send(b"a" * size)
Josh Gao5d799cd2018-08-22 15:13:18 -07001420 self.assertEqual(size, bytesWritten)
1421 readBytes = sock.recv(4096)
Josh Gao93dee962020-02-06 17:52:38 -08001422 self.assertEqual(b"foo\n", readBytes)
Josh Gao5d799cd2018-08-22 15:13:18 -07001423
1424 thread.join()
1425 finally:
1426 self.device.forward_remove("tcp:{}".format(local_port))
1427
Josh Gaoef3d3432017-05-02 15:01:09 -07001428
Josh Gao74b7ec72019-01-11 14:42:08 -08001429class SocketTest(DeviceTest):
1430 def test_socket_flush(self):
1431 """Test that we handle socket closure properly.
1432
1433 If we're done writing to a socket, closing before the other end has
1434 closed will send a TCP_RST if we have incoming data queued up, which
1435 may result in data that we've written being discarded.
1436
1437 Bug: http://b/74616284
1438 """
1439 s = socket.create_connection(("localhost", 5037))
1440
1441 def adb_length_prefixed(string):
1442 encoded = string.encode("utf8")
1443 result = b"%04x%s" % (len(encoded), encoded)
1444 return result
1445
1446 if "ANDROID_SERIAL" in os.environ:
1447 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
1448 else:
1449 transport_string = "host:transport-any"
1450
1451 s.sendall(adb_length_prefixed(transport_string))
1452 response = s.recv(4)
Josh Gao93dee962020-02-06 17:52:38 -08001453 self.assertEqual(b"OKAY", response)
Josh Gao74b7ec72019-01-11 14:42:08 -08001454
1455 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
1456 s.sendall(adb_length_prefixed(shell_string))
1457
1458 response = s.recv(4)
Josh Gao93dee962020-02-06 17:52:38 -08001459 self.assertEqual(b"OKAY", response)
Josh Gao74b7ec72019-01-11 14:42:08 -08001460
1461 # Spawn a thread that dumps garbage into the socket until failure.
1462 def spam():
1463 buf = b"\0" * 16384
1464 try:
1465 while True:
1466 s.sendall(buf)
1467 except Exception as ex:
1468 print(ex)
1469
1470 thread = threading.Thread(target=spam)
1471 thread.start()
1472
1473 time.sleep(1)
1474
1475 received = b""
1476 while True:
1477 read = s.recv(512)
1478 if len(read) == 0:
1479 break
1480 received += read
1481
Josh Gao93dee962020-02-06 17:52:38 -08001482 self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
Josh Gao74b7ec72019-01-11 14:42:08 -08001483 thread.join()
1484
1485
Spencer Low69d8c392018-08-11 00:16:16 -07001486if sys.platform == "win32":
1487 # From https://stackoverflow.com/a/38749458
1488 import os
1489 import contextlib
1490 import msvcrt
1491 import ctypes
1492 from ctypes import wintypes
1493
1494 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
1495
1496 GENERIC_READ = 0x80000000
1497 GENERIC_WRITE = 0x40000000
1498 FILE_SHARE_READ = 1
1499 FILE_SHARE_WRITE = 2
1500 CONSOLE_TEXTMODE_BUFFER = 1
1501 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
1502 STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
1503 STD_ERROR_HANDLE = wintypes.DWORD(-12)
1504
1505 def _check_zero(result, func, args):
1506 if not result:
1507 raise ctypes.WinError(ctypes.get_last_error())
1508 return args
1509
1510 def _check_invalid(result, func, args):
1511 if result == INVALID_HANDLE_VALUE:
1512 raise ctypes.WinError(ctypes.get_last_error())
1513 return args
1514
1515 if not hasattr(wintypes, 'LPDWORD'): # Python 2
1516 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
1517 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1518
1519 class COORD(ctypes.Structure):
1520 _fields_ = (('X', wintypes.SHORT),
1521 ('Y', wintypes.SHORT))
1522
1523 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1524 _fields_ = (('cbSize', wintypes.ULONG),
1525 ('dwSize', COORD),
1526 ('dwCursorPosition', COORD),
1527 ('wAttributes', wintypes.WORD),
1528 ('srWindow', wintypes.SMALL_RECT),
1529 ('dwMaximumWindowSize', COORD),
1530 ('wPopupAttributes', wintypes.WORD),
1531 ('bFullscreenSupported', wintypes.BOOL),
1532 ('ColorTable', wintypes.DWORD * 16))
1533 def __init__(self, *args, **kwds):
1534 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1535 *args, **kwds)
1536 self.cbSize = ctypes.sizeof(self)
1537
1538 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
1539 CONSOLE_SCREEN_BUFFER_INFOEX)
1540 LPSECURITY_ATTRIBUTES = wintypes.LPVOID
1541
1542 kernel32.GetStdHandle.errcheck = _check_invalid
1543 kernel32.GetStdHandle.restype = wintypes.HANDLE
1544 kernel32.GetStdHandle.argtypes = (
1545 wintypes.DWORD,) # _In_ nStdHandle
1546
1547 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
1548 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
1549 kernel32.CreateConsoleScreenBuffer.argtypes = (
1550 wintypes.DWORD, # _In_ dwDesiredAccess
1551 wintypes.DWORD, # _In_ dwShareMode
1552 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes
1553 wintypes.DWORD, # _In_ dwFlags
1554 wintypes.LPVOID) # _Reserved_ lpScreenBufferData
1555
1556 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
1557 kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
1558 wintypes.HANDLE, # _In_ hConsoleOutput
1559 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo
1560
1561 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
1562 kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
1563 wintypes.HANDLE, # _In_ hConsoleOutput
1564 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo
1565
1566 kernel32.SetConsoleWindowInfo.errcheck = _check_zero
1567 kernel32.SetConsoleWindowInfo.argtypes = (
1568 wintypes.HANDLE, # _In_ hConsoleOutput
1569 wintypes.BOOL, # _In_ bAbsolute
1570 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow
1571
1572 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
1573 kernel32.FillConsoleOutputCharacterW.argtypes = (
1574 wintypes.HANDLE, # _In_ hConsoleOutput
1575 wintypes.WCHAR, # _In_ cCharacter
1576 wintypes.DWORD, # _In_ nLength
1577 COORD, # _In_ dwWriteCoord
1578 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten
1579
1580 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
1581 kernel32.ReadConsoleOutputCharacterW.argtypes = (
1582 wintypes.HANDLE, # _In_ hConsoleOutput
1583 wintypes.LPWSTR, # _Out_ lpCharacter
1584 wintypes.DWORD, # _In_ nLength
1585 COORD, # _In_ dwReadCoord
1586 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1587
1588 @contextlib.contextmanager
1589 def allocate_console():
1590 allocated = kernel32.AllocConsole()
1591 try:
1592 yield allocated
1593 finally:
1594 if allocated:
1595 kernel32.FreeConsole()
1596
1597 @contextlib.contextmanager
1598 def console_screen(ncols=None, nrows=None):
1599 info = CONSOLE_SCREEN_BUFFER_INFOEX()
1600 new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
1601 nwritten = (wintypes.DWORD * 1)()
1602 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
1603 kernel32.GetConsoleScreenBufferInfoEx(
1604 hStdOut, ctypes.byref(info))
1605 if ncols is None:
1606 ncols = info.dwSize.X
1607 if nrows is None:
1608 nrows = info.dwSize.Y
1609 elif nrows > 9999:
1610 raise ValueError('nrows must be 9999 or less')
1611 fd_screen = None
1612 hScreen = kernel32.CreateConsoleScreenBuffer(
1613 GENERIC_READ | GENERIC_WRITE,
1614 FILE_SHARE_READ | FILE_SHARE_WRITE,
1615 None, CONSOLE_TEXTMODE_BUFFER, None)
1616 try:
1617 fd_screen = msvcrt.open_osfhandle(
1618 hScreen, os.O_RDWR | os.O_BINARY)
1619 kernel32.GetConsoleScreenBufferInfoEx(
1620 hScreen, ctypes.byref(new_info))
1621 new_info.dwSize = COORD(ncols, nrows)
1622 new_info.srWindow = wintypes.SMALL_RECT(
1623 Left=0, Top=0, Right=(ncols - 1),
1624 Bottom=(info.srWindow.Bottom - info.srWindow.Top))
1625 kernel32.SetConsoleScreenBufferInfoEx(
1626 hScreen, ctypes.byref(new_info))
1627 kernel32.SetConsoleWindowInfo(hScreen, True,
1628 ctypes.byref(new_info.srWindow))
1629 kernel32.FillConsoleOutputCharacterW(
1630 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
1631 kernel32.SetConsoleActiveScreenBuffer(hScreen)
1632 try:
1633 yield fd_screen
1634 finally:
1635 kernel32.SetConsoleScreenBufferInfoEx(
1636 hStdOut, ctypes.byref(info))
1637 kernel32.SetConsoleWindowInfo(hStdOut, True,
1638 ctypes.byref(info.srWindow))
1639 kernel32.SetConsoleActiveScreenBuffer(hStdOut)
1640 finally:
1641 if fd_screen is not None:
1642 os.close(fd_screen)
1643 else:
1644 kernel32.CloseHandle(hScreen)
1645
1646 def read_screen(fd):
1647 hScreen = msvcrt.get_osfhandle(fd)
1648 csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1649 kernel32.GetConsoleScreenBufferInfoEx(
1650 hScreen, ctypes.byref(csbi))
1651 ncols = csbi.dwSize.X
1652 pos = csbi.dwCursorPosition
1653 length = ncols * pos.Y + pos.X + 1
1654 buf = (ctypes.c_wchar * length)()
1655 n = (wintypes.DWORD * 1)()
1656 kernel32.ReadConsoleOutputCharacterW(
1657 hScreen, buf, length, COORD(0,0), n)
1658 lines = [buf[i:i+ncols].rstrip(u'\0')
1659 for i in range(0, n[0], ncols)]
1660 return u'\n'.join(lines)
1661
1662@unittest.skipUnless(sys.platform == "win32", "requires Windows")
1663class WindowsConsoleTest(DeviceTest):
1664 def test_unicode_output(self):
1665 """Test Unicode command line parameters and Unicode console window output.
1666
1667 Bug: https://issuetracker.google.com/issues/111972753
1668 """
1669 # If we don't have a console window, allocate one. This isn't necessary if we're already
1670 # being run from a console window, which is typical.
1671 with allocate_console() as allocated_console:
1672 # Create a temporary console buffer and switch to it. We could also pass a parameter of
1673 # ncols=len(unicode_string), but it causes the window to flash as it is resized and
1674 # likely unnecessary given the typical console window size.
1675 with console_screen(nrows=1000) as screen:
1676 unicode_string = u'로보카 폴리'
1677 # Run adb and allow it to detect that stdout is a console, not a pipe, by using
1678 # device.shell_popen() which does not use a pipe, unlike device.shell().
1679 process = self.device.shell_popen(['echo', '"' + unicode_string + '"'])
1680 process.wait()
1681 # Read what was written by adb to the temporary console buffer.
1682 console_output = read_screen(screen)
1683 self.assertEqual(unicode_string, console_output)
1684
1685
Josh Gao191c1542015-12-09 11:26:11 -08001686def main():
1687 random.seed(0)
1688 if len(adb.get_devices()) > 0:
1689 suite = unittest.TestLoader().loadTestsFromName(__name__)
1690 unittest.TextTestRunner(verbosity=3).run(suite)
1691 else:
1692 print('Test suite must be run with attached devices')
1693
1694
1695if __name__ == '__main__':
1696 main()