#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gaoc970aef2018-03-19 15:35:11 -070034import threading
Josh Gaofe50bb72016-06-22 18:27:22 -070035import time
Josh Gao191c1542015-12-09 11:26:11 -080036import unittest
37
Josh Gao74b7ec72019-01-11 14:42:08 -080038from datetime import datetime
39
Josh Gao191c1542015-12-09 11:26:11 -080040import adb
41
def requires_root(func):
    """Decorate a device test method so that it runs with adbd as root.

    The wrapped method must live on an object exposing a `device` attribute.

    The wrapper:
      * raises unittest.SkipTest when `ro.debuggable` is not '1';
      * calls `device.root()` + `device.wait()` first if `id -un` does not
        report 'root';
      * calls `device.unroot()` + `device.wait()` afterwards (even when the
        test raises) if it had to switch, restoring the original state.

    Args:
        func: The test method to wrap.

    Returns:
        The wrapping function; it forwards positional and keyword arguments
        and returns whatever `func` returns.
    """
    # Imported locally so this decorator does not depend on the file header.
    import functools

    @functools.wraps(func)  # keep the test's name/docstring for reporting
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what we changed ourselves.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a device test method so that it runs with adbd unrooted.

    The wrapped method must live on an object exposing a `device` attribute.
    If `id -un` reports 'root', the wrapper calls `device.unroot()` +
    `device.wait()` before the test and `device.root()` + `device.wait()`
    afterwards (even when the test raises), restoring the original state.

    Args:
        func: The test method to wrap.

    Returns:
        The wrapping function; it forwards positional and keyword arguments
        and returns whatever `func` returns.
    """
    # Imported locally so this decorator does not depend on the file header.
    import functools

    @functools.wraps(func)  # keep the test's name/docstring for reporting
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what we changed ourselves.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class DeviceTest(unittest.TestCase):
    """Base class for tests that talk to a device through the `adb` module."""

    def setUp(self):
        """Fetch the device handle and expose it as `self.device`."""
        device = adb.get_device()
        self.device = device
82
83
class AbbTest(DeviceTest):
    """Smoke test for `adb abb`."""

    def test_smoke(self):
        """`adb abb` without a service exits 1 and prints usage on stderr."""
        # Go through self.device.adb_cmd (like the other tests in this file)
        # so the command targets the device under test instead of whatever a
        # bare `adb` would pick.
        result = subprocess.run(self.device.adb_cmd + ['abb'],
                                capture_output=True)
        self.assertEqual(1, result.returncode)
        expected_output = b"cmd: No service specified; use -l to list all services\n"
        self.assertEqual(expected_output, result.stderr)
90
class ForwardReverseTest(DeviceTest):
    """Tests for `adb forward` / `adb reverse` port mappings.

    Most tests require the relevant mapping list to be empty beforehand.
    Mappings created by a test are removed in `finally` blocks so that a
    failing assertion does not break the precondition of later tests.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Check --no-rebind behaviour for one mapping direction.

        Args:
            description: Direction name used in the precondition message.
            direction_list: Callable returning the current mapping list text.
            direction: Callable(local, remote) that sets a mapping.
            direction_no_rebind: Callable(local, remote) that sets a mapping
                with --no-rebind.
            direction_remove_all: Callable that removes all mappings.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Run the --no-rebind checks against `adb forward`."""
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the --no-rebind checks against `adb reverse`."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        """Add two forwards, remove one, then remove all."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_old_protocol(self):
        """Set up a forward by sending a raw `host-serial:` request.

        The request is written directly to localhost:5037 as a 4-hex-digit
        length prefix followed by the service string.
        """
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')

        service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
        cmd = b"%04x%s" % (len(service), service)
        try:
            # closing() guarantees the socket is released; the original code
            # leaked it.
            with contextlib.closing(
                    socket.create_connection(("localhost", 5037))) as s:
                s.sendall(cmd)

                msg = self.device.forward_list()
                self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        finally:
            self.device.forward_remove_all()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Forwarding tcp:0 should report the port that was actually bound."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Add two reverse mappings, remove one, then remove all."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Reversing tcp:0 should report the port that was actually bound."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read().encode("utf8"))
        finally:
            # Only tear down the mappings that were actually created.
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
270
271
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTY allocation, signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: bytes input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + b'; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """A failing command must raise adb.ShellError."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """shell() must return the trailing line separator untouched."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_command_length(self):
        """Long command lines work when the shell protocol is available."""
        # Devices that have shell_v2 should be able to handle long commands.
        if self.device.has_shell_protocol():
            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
            self.assertEqual(rc, 0)
            self.assertTrue(out == ('x' * 16384 + '\n'))

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero status instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() must return the trailing line separator untouched."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit status must stay in the output."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], b'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = b'foo'
        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
        large_input = b'\n'.join(characters)

        # `payload` rather than `input`, to avoid shadowing the builtin.
        for payload in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(payload)
            self.assertEqual(payload.splitlines(), stdout.splitlines())
            self.assertEqual(b'', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script into a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen([script], kill_atexit=False,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE)

        self.assertEqual(b"Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")

    def test_exit_stress(self):
        """Hammer `adb shell exit 42` with multiple threads."""
        thread_count = 48
        result = dict()
        # Use the device's own command prefix (as the other tests here do)
        # so the stress run targets the device under test.
        adb_cmd = self.device.adb_cmd

        def hammer(thread_idx, thread_count, result):
            """Run `exit i` for this thread's share of 0..239; record success."""
            success = True
            for i in range(thread_idx, 240, thread_count):
                ret = subprocess.call(adb_cmd + ['shell', 'exit {}'.format(i)])
                if ret != i % 256:
                    success = False
                    break
            result[thread_idx] = success

        threads = []
        for i in range(thread_count):
            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

        # A worker that died with an exception never records a result; make
        # that a failure instead of silently passing.
        self.assertEqual(thread_count, len(result),
                         'some worker threads did not report a result')
        for i, success in result.items():
            self.assertTrue(success,
                            'thread {} saw an unexpected exit code'.format(i))

    def disabled_test_parallel(self):
        """Spawn a bunch of `adb shell` instances in parallel.

        This was broken historically due to the use of select, which only works
        for fds that are numerically less than 1024.

        Bug: http://b/141955761"""

        n_procs = 2048
        shell_cmd = self.device.adb_cmd + [
            'shell', 'read foo; echo $foo; read rc; exit $rc']
        procs = [
            subprocess.Popen(shell_cmd,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE)
            for _ in range(n_procs)
        ]

        # The pipes are binary and buffered under Python 3: write bytes and
        # flush, otherwise the remote `read` never sees the line.
        for i, proc in enumerate(procs):
            proc.stdin.write(b"%d\n" % i)
            proc.stdin.flush()

        for i, proc in enumerate(procs):
            self.assertEqual(b"%d\n" % i, proc.stdout.readline())

        for i, proc in enumerate(procs):
            proc.stdin.write(b"%d\n" % (i % 256))
            proc.stdin.flush()

        for i, proc in enumerate(procs):
            self.assertEqual(i % 256, proc.wait())
576
Josh Gao191c1542015-12-09 11:26:11 -0800577
class ArgumentEscapingTest(DeviceTest):
    """Tests how adb escapes arguments for `adb shell` and `adb install`."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            # try/finally so the bogus .apk is removed from the host even
            # when install() or the assertion below raises.
            try:
                # Installing bogus .apks fails if the device supports exit codes.
                try:
                    output = self.device.install(tf.name.decode("utf8"))
                except subprocess.CalledProcessError as e:
                    output = e.output

                # NOTE(review): file_suffix is bytes; this assumes `output`
                # is bytes as well -- confirm against the adb module.
                self.assertIn(file_suffix, output)
            finally:
                os.remove(tf.name)
626
627
class RootUnrootTest(DeviceTest):
    """Tests switching adbd between root and non-root."""

    def _test_root(self):
        """Run `adb root` and, unless the build refuses, expect user 'root'."""
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            current_user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Run `adb unroot` and expect user 'shell'."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        # Start with the transition away from the current state so that the
        # second step brings the device back.
        plans = {
            'root': (self._test_unroot, self._test_root),
            'shell': (self._test_root, self._test_unroot),
        }
        try:
            for step in plans.get(original_user, ()):
                step()
        finally:
            # Restore the state we found, whatever the outcome.
            restore = (self.device.root if original_user == 'root'
                       else self.device.unroot)
            restore()
            self.device.wait()
660
661
class TcpIpTest(DeviceTest):
    """Tests for `adb tcpip` argument validation."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty argument and a non-numeric one must both be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
672
673
class SystemPropertiesTest(DeviceTest):
    """Tests reading and writing system properties through the device API."""

    def test_get_prop(self):
        """`init.svc.adbd` must read back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop() is visible through `getprop`."""
        prop_name = 'foo.bar'
        # Reset the property via the shell before writing the real value.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        stdout = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(stdout.strip(), 'qux')
686
687
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the bytes in `string`."""
    return hashlib.md5(string).hexdigest()
692
693
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum.

    Probes the device by running `md5sum /proc/uptime`; returns 'md5sum' if
    that succeeds and 'md5' if it raises adb.ShellError.
    """
    try:
        device.shell(['md5sum', '/proc/uptime'])
    except adb.ShellError:
        return 'md5'
    return 'md5sum'
701
702
class HostFile(object):
    """A file on the host together with its expected checksum."""

    def __init__(self, handle, checksum):
        """Record `handle`, `checksum`, and the path/base name from handle.name."""
        path = handle.name
        self.handle = handle
        self.checksum = checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
709
710
class DeviceFile(object):
    """A file on the device together with its expected checksum."""

    def __init__(self, checksum, full_path):
        """Record `checksum`, `full_path`, and the POSIX base name of the path."""
        self.full_path = full_path
        self.checksum = checksum
        # Device paths always use '/' regardless of the host platform.
        self.base_name = posixpath.basename(full_path)
716
717
def make_random_host_files(in_dir, num_files):
    """Create `num_files` files of random bytes in `in_dir` on the host.

    Sizes are drawn with random.randrange from 1 KiB up to (not including)
    16 KiB, in 1 KiB steps. The files are left on disk.

    Returns:
        A list of HostFile objects carrying each file's MD5 checksum.
    """
    smallest = 1 * (1 << 10)
    largest = 16 * (1 << 10)

    created = []
    for _ in range(num_files):
        # delete=False: the file must outlive its handle so it can be pushed.
        with tempfile.NamedTemporaryFile(dir=in_dir, delete=False) as handle:
            payload = os.urandom(random.randrange(smallest, largest, 1024))
            handle.write(payload)
            handle.flush()

        created.append(HostFile(handle, compute_md5(payload)))
    return created
735
736
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create `num_files` files of random bytes in `in_dir` on the device.

    Each file is written with `dd` from /dev/urandom and then checksummed on
    the device. Sizes are drawn with random.randrange from 1 KiB up to (not
    including) 16 KiB, in 1 KiB steps.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which to create the files.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list of DeviceFile objects carrying each file's MD5 checksum.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # The md5 tool name does not change between files, so probe the device
    # once (lazily, so nothing is probed when num_files is 0) instead of on
    # every iteration.
    md5_prog = None

    files = []
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        if md5_prog is None:
            md5_prog = get_md5_prog(device)
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
754
755
756class FileOperationsTest(DeviceTest):
757 SCRATCH_DIR = '/data/local/tmp'
758 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
759 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
760
def _verify_remote(self, checksum, remote_path):
    """Assert that the device file at `remote_path` has MD5 `checksum`."""
    md5_cmd = [get_md5_prog(self.device), remote_path]
    stdout = self.device.shell(md5_cmd)[0]
    # The output is expected to be exactly two fields: digest and file name.
    dev_md5, _ = stdout.split()
    self.assertEqual(checksum, dev_md5)
765
def _verify_local(self, checksum, local_path):
    """Assert that the host file at `local_path` has MD5 `checksum`."""
    with open(local_path, 'rb') as host_file:
        contents = host_file.read()
        host_md5 = compute_md5(contents)
        self.assertEqual(host_md5, checksum)
770
def test_push(self):
    """Push a randomly generated file to specified device."""
    kbytes = 512
    rand_str = os.urandom(1024 * kbytes)
    # delete=False: the file has to survive close() so adb can read it.
    with tempfile.NamedTemporaryFile(mode='wb', delete=False) as tmp:
        tmp.write(rand_str)

    # try/finally so the host temp file is removed even if the push or the
    # checksum comparison fails.
    try:
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)

        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
    finally:
        os.remove(tmp.name)
786
def test_push_dir(self):
    """Push a randomly generated directory of files to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Bind before the try so the finally clause cannot hit an unbound name
    # (masking the real error) if mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create 32 random files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
        self.device.push(host_dir, self.DEVICE_TEMP_DIR)

        # Each file is looked up under <DEVICE_TEMP_DIR>/<host dir name>/.
        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         os.path.basename(host_dir),
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
811
def disabled_test_push_empty(self):
    """Push an empty directory to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Bind before the try so the finally clause cannot hit an unbound name
    # (masking the real error) if mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create an empty directory.
        empty_dir_path = os.path.join(host_dir, 'empty')
        os.mkdir(empty_dir_path)

        self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)

        # Device paths always use '/', so build them with posixpath rather
        # than the host's os.path.
        remote_path = posixpath.join(self.DEVICE_TEMP_DIR, "empty")
        test_empty_cmd = ["[", "-d", remote_path, "]"]
        rc, _, _ = self.device.shell_nocheck(test_empty_cmd)

        self.assertEqual(rc, 0)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
838
@unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
def test_push_symlink(self):
    """Push a symlink.

    A host file 'foo' and a symlink pointing at it are created; only the
    symlink is pushed, and reading it back on the device must yield the
    contents of 'foo'.

    Bug: http://b/31491920
    """
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Make sure the temp directory isn't setuid, or else adb will
        # complain.
        os.chmod(host_dir, 0o700)

        with open(os.path.join(host_dir, 'foo'), 'w') as f:
            f.write('foo')

        symlink_path = os.path.join(host_dir, 'symlink')
        os.symlink('foo', symlink_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
        self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
        rc, out, _ = self.device.shell_nocheck(
            ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
        self.assertEqual(0, rc)
        self.assertEqual(out.strip(), 'foo')
    finally:
        shutil.rmtree(host_dir)
868
def test_multiple_push(self):
    """Push multiple files to the device in one adb push command.

    Four top-level files plus a subdirectory holding four more files are
    passed as separate sources to a single ``adb push``.

    Bug: http://b/25324823
    """

    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=4)

        subdir = os.path.join(host_dir, 'subdir')
        os.mkdir(subdir)
        subdir_temp_files = make_random_host_files(in_dir=subdir,
                                                   num_files=4)

        paths = [x.full_path for x in temp_files]
        paths.append(subdir)
        self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])

        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        # NOTE(review): this loop used to re-verify the last top-level file
        # (stale `temp_file`) and skipped the 'subdir' component because of
        # http://b/25394682. It now checks the subdirectory's own files
        # under DEVICE_TEMP_DIR/subdir, assuming a pushed directory lands
        # beneath an existing destination as in a single-directory push --
        # confirm on a device.
        for subdir_temp_file in subdir_temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         'subdir',
                                         subdir_temp_file.base_name)
            self._verify_remote(subdir_temp_file.checksum, remote_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
910
@requires_non_root
def test_push_error_reporting(self):
    """Make sure that errors that occur while pushing a file get reported

    A 1 MiB file is pushed to '/system/' as a non-root user; the push must
    fail and the captured output must name the reason.

    Bug: http://b/26816782
    """
    payload = b'\0' * 1024 * 1024
    with tempfile.NamedTemporaryFile() as tmp_file:
        tmp_file.write(payload)
        tmp_file.flush()
        try:
            self.device.push(local=tmp_file.name, remote='/system/')
        except subprocess.CalledProcessError as push_error:
            output = push_error.output
        else:
            # Reaching this branch means adb reported success.
            self.fail('push should not have succeeded')

        denied = b'Permission denied' in output
        read_only = b'Read-only file system' in output
        self.assertTrue(denied or read_only)
Josh Gao191c1542015-12-09 11:26:11 -0800928
@requires_non_root
def test_push_directory_creation(self):
    """Regression test for directory creation.

    Pushes a file to a path whose parent directory has just been removed
    from the device.

    Bug: http://b/110953234
    """
    parent_dir = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
    destination = parent_dir + '/filename'
    with tempfile.NamedTemporaryFile() as tmp_file:
        tmp_file.write(b'\0' * 1024 * 1024)
        tmp_file.flush()
        # Remove the parent first so the push has to deal with it missing.
        self.device.shell(['rm', '-rf', parent_dir])
        self.device.push(local=tmp_file.name, remote=destination)
943
def disabled_test_push_multiple_slash_root(self):
    """Regression test for pushing to //data/local/tmp.

    Bug: http://b/141311284

    Disabled because this broken on the adbd side as well: b/141943968
    """
    with tempfile.NamedTemporaryFile() as tmp_file:
        # NamedTemporaryFile() opens in binary mode by default, so the
        # payload must be bytes (a str raises TypeError on Python 3).
        tmp_file.write(b'\0' * 1024 * 1024)
        tmp_file.flush()
        # The extra leading slash is the point of the test.
        remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
        self.device.shell(['rm', '-rf', remote_path])
        self.device.push(local=tmp_file.name, remote=remote_path)
957
def _test_pull(self, remote_file, checksum):
    """Pull remote_file to a temporary host file and verify its checksum.

    Args:
        remote_file: Device path to pull.
        checksum: Expected digest, compared against compute_md5() of the
            pulled bytes.
    """
    # Only reserve a unique path here; adb writes the file contents.
    tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    tmp_write.close()
    try:
        self.device.pull(remote=remote_file, local=tmp_write.name)
        with open(tmp_write.name, 'rb') as tmp_read:
            host_contents = tmp_read.read()
        host_md5 = compute_md5(host_contents)
        self.assertEqual(checksum, host_md5)
    finally:
        # Clean up even when the pull or the assertion fails. The existence
        # check keeps cleanup from masking the original error if the file
        # is already gone.
        if os.path.exists(tmp_write.name):
            os.remove(tmp_write.name)
967
@requires_non_root
def test_pull_error_reporting(self):
    """Check that pulling an unreadable device file reports the failure.

    The device temp file is created with all permissions removed, then
    pulled; the captured adb output must contain 'Permission denied'.
    """
    unreadable = self.DEVICE_TEMP_FILE
    for setup_cmd in (['touch', unreadable],
                      ['chmod', 'a-rwx', unreadable]):
        self.device.shell(setup_cmd)

    try:
        output = self.device.pull(remote=unreadable, local='x')
    except subprocess.CalledProcessError as pull_error:
        output = pull_error.output

    self.assertIn(b'Permission denied', output)

    self.device.shell(['rm', '-f', unreadable])
981
def test_pull(self):
    """Pull a randomly generated file from specified device."""
    kbytes = 512
    device_file = self.DEVICE_TEMP_FILE
    self.device.shell(['rm', '-rf', device_file])

    # Fill the device file with `kbytes` KiB of random data.
    dd_cmd = [
        'dd',
        'if=/dev/urandom',
        'of={}'.format(device_file),
        'bs=1024',
        'count={}'.format(kbytes),
    ]
    self.device.shell(dd_cmd)

    # The checksum tool prints two whitespace-separated fields; the first
    # is the digest.
    md5_output = self.device.shell(
        [get_md5_prog(self.device), device_file])[0]
    dev_md5, _ = md5_output.split()

    self._test_pull(device_file, dev_md5)
    self.device.shell_nocheck(['rm', device_file])
994
def test_pull_dir(self):
    """Pull a randomly generated directory of files from the device.

    The device directory is pulled into an existing host directory, so the
    files are expected under host_dir/<basename of DEVICE_TEMP_DIR>/.
    """
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1019
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    The pull target is a host symlink; the files are expected to show up
    inside the real directory the symlink points to.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        for temp_file in temp_files:
            host_path = os.path.join(
                real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1054
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory.

    The host directory already contains a symlink named like the device
    directory; the pulled files are expected inside the symlink's target.
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'real')
        # DEVICE_TEMP_DIR is a device (POSIX) path, so take its basename
        # with posixpath, matching the other pull tests.
        tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1085
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path.

    Because the destination does not exist yet, the files are expected
    directly inside it rather than in a subdirectory named after the
    device directory.
    """
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        dest_dir = os.path.join(host_dir, 'dest')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1109
# selinux prevents adbd from accessing symlinks on /data/local/tmp.
def disabled_test_pull_symlink_dir(self):
    """Pull a symlink to a directory of symlinks to files.

    Device layout: 'contents' holds the real files, 'links' holds one
    relative symlink per file, and 'symlink' points at 'links'. Pulling
    'symlink' must produce the real file contents on the host.
    """
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
        remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
        remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_dir, remote_links])
        self.device.shell(['ln', '-s', remote_links, remote_symlink])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=remote_dir, num_files=32)

        for temp_file in temp_files:
            self.device.shell(
                ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                 posixpath.join(remote_links, temp_file.base_name)])

        self.device.pull(remote=remote_symlink, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, 'symlink', temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1144
def test_pull_empty(self):
    """Pull a directory containing an empty directory from the device."""
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_empty_path])

        self.device.pull(remote=remote_empty_path, local=host_dir)
        # The empty directory itself must have been created on the host.
        self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
    finally:
        shutil.rmtree(host_dir)
1159
def test_multiple_pull(self):
    """Pull multiple files and a directory in one adb pull command.

    Four top-level device files plus a 'subdir' directory holding four
    more files are passed as separate sources to a single ``adb pull``.
    """
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', subdir])

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

        subdir_temp_files = make_random_device_files(
            self.device, in_dir=subdir, num_files=4, prefix='subdir_')

        paths = [x.full_path for x in temp_files]
        paths.append(subdir)
        self.device._simple_call(['pull'] + paths + [host_dir])

        for temp_file in temp_files:
            local_path = os.path.join(host_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, local_path)

        for subdir_temp_file in subdir_temp_files:
            local_path = os.path.join(host_dir,
                                      'subdir',
                                      subdir_temp_file.base_name)
            self._verify_local(subdir_temp_file.checksum, local_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1195
def verify_sync(self, device, temp_files, device_dir):
    """Verifies that a list of temp files was synced to the device.

    For each entry, the checksum tool is run on the file under device_dir
    and its digest is compared with the host-side checksum.
    """
    for host_file in temp_files:
        remote_path = posixpath.join(device_dir, host_file.base_name)
        md5_cmd = [get_md5_prog(self.device), remote_path]
        md5_stdout = device.shell(md5_cmd)[0]
        # Output has exactly two fields; the first is the digest.
        dev_md5, _ = md5_stdout.split()
        self.assertEqual(host_file.checksum, dev_md5)
1205
def test_sync(self):
    """Sync a host directory to the data partition.

    A host tree mirroring DEVICE_TEMP_DIR is built under a temporary
    directory, ANDROID_PRODUCT_OUT is pointed at that directory for the
    duration of ``adb sync data``, and the device copies are verified.
    """
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    base_dir = tempfile.mkdtemp()
    try:
        # Create mirror device directory hierarchy within base_dir.
        # Plain concatenation is deliberate: DEVICE_TEMP_DIR is presumably
        # an absolute device path, which os.path.join would treat as a
        # replacement for base_dir.
        full_dir_path = base_dir + self.DEVICE_TEMP_DIR
        os.makedirs(full_dir_path)

        # Create 32 random files within the host mirror.
        temp_files = make_random_host_files(
            in_dir=full_dir_path, num_files=32)

        # Clean up any stale files on the device.
        device = adb.get_device()  # pylint: disable=no-member
        device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

        old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
        os.environ['ANDROID_PRODUCT_OUT'] = base_dir
        try:
            device.sync('data')
        finally:
            # Restore the environment even if the sync fails, so later
            # tests do not inherit a path that is about to be deleted.
            if old_product_out is None:
                del os.environ['ANDROID_PRODUCT_OUT']
            else:
                os.environ['ANDROID_PRODUCT_OUT'] = old_product_out

        self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)

        # NOTE(review): device-side cleanup was already commented out;
        # kept disabled here.
        #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(base_dir)
1238
def test_push_sync(self):
    """Sync a host directory to a specific path.

    Uses ``device.push(..., sync=True)`` with an explicit destination
    below DEVICE_TEMP_DIR.
    """
    # Created outside the try block so the finally clause never sees an
    # unbound name when mkdtemp() itself fails.
    temp_dir = tempfile.mkdtemp()
    try:
        temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)

        device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')

        # Clean up any stale files on the device.
        device = adb.get_device()  # pylint: disable=no-member
        device.shell(['rm', '-rf', device_dir])

        device.push(temp_dir, device_dir, sync=True)

        self.verify_sync(device, temp_files, device_dir)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(temp_dir)
1260
def test_unicode_paths(self):
    """Ensure that we can support non-ASCII paths, even on Windows."""
    name = u'로보카 폴리'
    remote_glob = '/data/local/tmp/adb-test-*'

    self.device.shell(['rm', '-f', remote_glob])
    remote_path = u'/data/local/tmp/adb-test-{}'.format(name)

    # Push: reserve a host file whose name ends with the non-ASCII suffix,
    # send it to the device, then delete the host copy.
    host_file = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
    host_file.close()
    host_path = host_file.name
    self.device.push(host_path, remote_path)
    os.remove(host_path)
    self.assertFalse(os.path.exists(host_path))

    # Verify that the device ended up with the expected UTF-8 path
    listing = self.device.shell(['ls', remote_glob])[0]
    output = listing.strip()
    self.assertEqual(remote_path, output)

    # Pull: the host file must reappear under the same non-ASCII name.
    self.device.pull(remote_path, host_path)
    self.assertTrue(os.path.exists(host_path))
    os.remove(host_path)
    self.device.shell(['rm', '-f', remote_glob])
1285
1286
class DeviceOfflineTest(DeviceTest):
    """Regression tests for transfers that used to wedge the device or client."""

    def _get_device_state(self, serialno):
        """Return the state `adb devices` lists for serialno, or None.

        Args:
            serialno: Device serial as str or bytes (callers obtain it from
                subprocess.check_output, which returns bytes on Python 3).

        Returns:
            The second whitespace-separated field of the matching line
            (e.g. 'device'), or None when the serial is not listed.
        """
        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
        # Normalize both sides to text so the comparison below works on
        # Python 3, where check_output() yields bytes.
        if isinstance(output, bytes):
            output = output.decode('utf-8', 'replace')
        if isinstance(serialno, bytes):
            serialno = serialno.decode('utf-8', 'replace')
        for line in output.splitlines():
            m = re.match(r'(\S+)\s+(\S+)', line)
            if m and m.group(1) == serialno:
                return m.group(2)
        return None

    def disabled_test_killed_when_pushing_a_large_file(self):
        """
           While running adb push with a large file, kill adb server.
           Occasionally the device becomes offline. Because the device is still
           reading data without realizing that the adb server has been restarted.
           Test if we can bring the device online automatically now.
           http://b/32952319
        """
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
        # 1. Push a large file
        file_path = 'tmp_large_file'
        try:
            with open(file_path, 'w') as fh:
                fh.write('\0' * (100 * 1024 * 1024))
            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
            time.sleep(0.1)
            # 2. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
        finally:
            # Best-effort cleanup: the file may never have been created.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 3. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        # 4. The device should be online
        self.assertEqual(self._get_device_state(serialno), 'device')

    def disabled_test_killed_when_pulling_a_large_file(self):
        """
           While running adb pull with a large file, kill adb server.
           Occasionally the device can't be connected. Because the device is trying to
           send a message larger than what is expected by the adb server.
           Test if we can bring the device online automatically now.
        """
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
        file_path = 'tmp_large_file'
        try:
            # 1. Create a large file on device.
            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
                               'bs=1000000', 'count=100'])
            # 2. Pull the large file on host.
            subproc = subprocess.Popen(self.device.adb_cmd +
                                       ['pull', '/data/local/tmp/tmp_large_file', file_path])
            time.sleep(0.1)
            # 3. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
        finally:
            # Best-effort cleanup: the pull may not have created the file.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 4. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        self.assertEqual(self._get_device_state(serialno), 'device')

    def test_packet_size_regression(self):
        """Test for http://b/37783561

        Receiving packets of a length divisible by 512 but not 1024 resulted in
        the adb client waiting indefinitely for more input.
        """
        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
        # Probe some surrounding values as well, for the hell of it.
        for base in [512] + list(range(1024, 1024 * 16, 1024)):
            for offset in [-6, -5, -4]:
                length = base + offset
                # '2>/dev/null;echo' is intentionally a single argument; the
                # original spelled it as two adjacent literals, which
                # Python concatenates to this same value.
                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1',
                       '2>/dev/null;echo', 'foo']
                rc, stdout, _ = self.device.shell_nocheck(cmd)

                self.assertEqual(0, rc)

                # Output should be '\0' * length, followed by "foo\n"
                self.assertEqual(length, len(stdout) - 4)
                self.assertEqual(stdout, "\0" * length + "foo\n")

    def test_zero_packet(self):
        """Test for http://b/113070258

        Make sure that we don't blow up when sending USB transfers that line up
        exactly with the USB packet size.
        """

        def listener():
            # Runs on a helper thread: blocks until the device-side nc has
            # served one connection.
            cmd = ["echo foo | nc -l -p 12345; echo done"]
            self.device.shell_nocheck(cmd)

        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
        try:
            for size in [512, 1024]:
                thread = threading.Thread(target=listener)
                thread.start()

                # Wait a bit to let the shell command start.
                time.sleep(0.25)

                sock = socket.create_connection(("localhost", local_port))
                with contextlib.closing(sock):
                    bytesWritten = sock.send(b"a" * size)
                    self.assertEqual(size, bytesWritten)
                    readBytes = sock.recv(4096)
                    self.assertEqual(b"foo\n", readBytes)

                thread.join()
        finally:
            self.device.forward_remove("tcp:{}".format(local_port))
1409
Josh Gaoef3d3432017-05-02 15:01:09 -07001410
class SocketTest(DeviceTest):
    """Tests that talk to localhost:5037 over a raw TCP socket."""

    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        Bug: http://b/74616284
        """
        def adb_length_prefixed(message):
            # Wire format used below: four hex digits giving the payload
            # length, followed by the UTF-8 payload itself.
            encoded = message.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        s = socket.create_connection(("localhost", 5037))
        # Always close the socket: besides avoiding a leaked descriptor, on a
        # failed assertion it makes the spam thread's sendall() fail so the
        # thread can exit.
        with contextlib.closing(s):
            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
            s.sendall(adb_length_prefixed(shell_string))

            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # Read until the peer closes its end.
            received = b""
            while True:
                read = s.recv(512)
                if len(read) == 0:
                    break
                received += read

            # 1 MiB of zeros from dd, then the echoed "foo\n".
            self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
            thread.join()
1466
1467
if sys.platform == "win32":
    # Console helpers adapted from https://stackoverflow.com/a/38749458
    import os
    import contextlib
    import msvcrt
    import ctypes
    from ctypes import wintypes

    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    GENERIC_READ = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ = 1
    FILE_SHARE_WRITE = 2
    CONSOLE_TEXTMODE_BUFFER = 1
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)

    def _check_zero(result, func, args):
        """ctypes errcheck hook: a zero/false result raises WinError."""
        if result:
            return args
        raise ctypes.WinError(ctypes.get_last_error())

    def _check_invalid(result, func, args):
        """ctypes errcheck hook: INVALID_HANDLE_VALUE raises WinError."""
        if result != INVALID_HANDLE_VALUE:
            return args
        raise ctypes.WinError(ctypes.get_last_error())

    if not hasattr(wintypes, 'LPDWORD'):  # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)

    class COORD(ctypes.Structure):
        """Pair of SHORT fields, X then Y."""
        _fields_ = (('X', wintypes.SHORT),
                    ('Y', wintypes.SHORT))

    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
        """Screen buffer info structure; cbSize is filled in on construction."""
        _fields_ = (('cbSize', wintypes.ULONG),
                    ('dwSize', COORD),
                    ('dwCursorPosition', COORD),
                    ('wAttributes', wintypes.WORD),
                    ('srWindow', wintypes.SMALL_RECT),
                    ('dwMaximumWindowSize', COORD),
                    ('wPopupAttributes', wintypes.WORD),
                    ('bFullscreenSupported', wintypes.BOOL),
                    ('ColorTable', wintypes.DWORD * 16))

        def __init__(self, *args, **kwds):
            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
                *args, **kwds)
            self.cbSize = ctypes.sizeof(self)

    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
        CONSOLE_SCREEN_BUFFER_INFOEX)
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID

    # Prototypes for the kernel32 entry points used below.
    kernel32.GetStdHandle.argtypes = (
        wintypes.DWORD,)  # _In_ nStdHandle
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.GetStdHandle.errcheck = _check_invalid

    kernel32.CreateConsoleScreenBuffer.argtypes = (
        wintypes.DWORD,  # _In_ dwDesiredAccess
        wintypes.DWORD,  # _In_ dwShareMode
        LPSECURITY_ATTRIBUTES,  # _In_opt_ lpSecurityAttributes
        wintypes.DWORD,  # _In_ dwFlags
        wintypes.LPVOID)  # _Reserved_ lpScreenBufferData
    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid

    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,  # _In_ hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX)  # _Out_ lpConsoleScreenBufferInfo
    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero

    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,  # _In_ hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX)  # _In_ lpConsoleScreenBufferInfo
    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero

    kernel32.SetConsoleWindowInfo.argtypes = (
        wintypes.HANDLE,  # _In_ hConsoleOutput
        wintypes.BOOL,  # _In_ bAbsolute
        wintypes.PSMALL_RECT)  # _In_ lpConsoleWindow
    kernel32.SetConsoleWindowInfo.errcheck = _check_zero

    kernel32.FillConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_ hConsoleOutput
        wintypes.WCHAR,  # _In_ cCharacter
        wintypes.DWORD,  # _In_ nLength
        COORD,  # _In_ dwWriteCoord
        wintypes.LPDWORD)  # _Out_ lpNumberOfCharsWritten
    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero

    kernel32.ReadConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_ hConsoleOutput
        wintypes.LPWSTR,  # _Out_ lpCharacter
        wintypes.DWORD,  # _In_ nLength
        COORD,  # _In_ dwReadCoord
        wintypes.LPDWORD)  # _Out_ lpNumberOfCharsRead
    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero

    @contextlib.contextmanager
    def allocate_console():
        """Call AllocConsole and yield its result; free the console on exit
        only when that result was truthy."""
        allocated = kernel32.AllocConsole()
        try:
            yield allocated
        finally:
            if allocated:
                kernel32.FreeConsole()

    @contextlib.contextmanager
    def console_screen(ncols=None, nrows=None):
        """Activate a fresh console screen buffer and yield a CRT fd for it.

        ncols/nrows default to the size of the current stdout buffer; nrows
        above 9999 raises ValueError. The stdout buffer's saved settings are
        re-applied and it is made active again on exit.
        """
        saved_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        screen_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        chars_written = (wintypes.DWORD * 1)()
        stdout_handle = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        kernel32.GetConsoleScreenBufferInfoEx(
            stdout_handle, ctypes.byref(saved_info))
        if ncols is None:
            ncols = saved_info.dwSize.X
        if nrows is None:
            nrows = saved_info.dwSize.Y
        elif nrows > 9999:
            raise ValueError('nrows must be 9999 or less')
        fd_screen = None
        screen_handle = kernel32.CreateConsoleScreenBuffer(
            GENERIC_READ | GENERIC_WRITE,
            FILE_SHARE_READ | FILE_SHARE_WRITE,
            None, CONSOLE_TEXTMODE_BUFFER, None)
        try:
            fd_screen = msvcrt.open_osfhandle(
                screen_handle, os.O_RDWR | os.O_BINARY)
            kernel32.GetConsoleScreenBufferInfoEx(
                screen_handle, ctypes.byref(screen_info))
            screen_info.dwSize = COORD(ncols, nrows)
            # Keep the window height of the original console.
            window_bottom = (saved_info.srWindow.Bottom -
                             saved_info.srWindow.Top)
            screen_info.srWindow = wintypes.SMALL_RECT(
                Left=0, Top=0, Right=(ncols - 1),
                Bottom=window_bottom)
            kernel32.SetConsoleScreenBufferInfoEx(
                screen_handle, ctypes.byref(screen_info))
            kernel32.SetConsoleWindowInfo(
                screen_handle, True, ctypes.byref(screen_info.srWindow))
            # Fill the buffer with NULs; read_screen() strips them per row.
            kernel32.FillConsoleOutputCharacterW(
                screen_handle, u'\0', ncols * nrows, COORD(0, 0),
                chars_written)
            kernel32.SetConsoleActiveScreenBuffer(screen_handle)
            try:
                yield fd_screen
            finally:
                kernel32.SetConsoleScreenBufferInfoEx(
                    stdout_handle, ctypes.byref(saved_info))
                kernel32.SetConsoleWindowInfo(
                    stdout_handle, True, ctypes.byref(saved_info.srWindow))
                kernel32.SetConsoleActiveScreenBuffer(stdout_handle)
        finally:
            # Once wrapped in a CRT fd the handle is released via os.close();
            # otherwise close the raw handle directly.
            if fd_screen is not None:
                os.close(fd_screen)
            else:
                kernel32.CloseHandle(screen_handle)

    def read_screen(fd):
        """Return the text from the buffer's origin through the cursor cell,
        one line per buffer row, trailing NULs stripped from each row."""
        screen_handle = msvcrt.get_osfhandle(fd)
        screen_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        kernel32.GetConsoleScreenBufferInfoEx(
            screen_handle, ctypes.byref(screen_info))
        ncols = screen_info.dwSize.X
        cursor = screen_info.dwCursorPosition
        length = ncols * cursor.Y + cursor.X + 1
        buf = (ctypes.c_wchar * length)()
        chars_read = (wintypes.DWORD * 1)()
        kernel32.ReadConsoleOutputCharacterW(
            screen_handle, buf, length, COORD(0, 0), chars_read)
        lines = []
        for row_start in range(0, chars_read[0], ncols):
            lines.append(buf[row_start:row_start + ncols].rstrip(u'\0'))
        return u'\n'.join(lines)
1643
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        unicode_string = u'로보카 폴리'
        echo_argument = '"' + unicode_string + '"'

        # Allocate a console if we don't already have one. This is a no-op
        # in the typical case of being run from a console window.
        with allocate_console():
            # Switch to a temporary console buffer. Passing
            # ncols=len(unicode_string) would also work, but resizing makes
            # the window flash and is likely unnecessary at typical console
            # sizes.
            with console_screen(nrows=1000) as screen:
                # device.shell_popen() does not use a pipe (unlike
                # device.shell()), so adb can detect that stdout is a
                # console.
                process = self.device.shell_popen(['echo', echo_argument])
                process.wait()
                # Whatever adb wrote ended up in the temporary buffer.
                console_output = read_screen(screen)
                self.assertEqual(unicode_string, console_output)
1666
1667
def main():
    """Run every test in this module against the attached device(s).

    The random module is seeded with 0 first. When adb reports no devices,
    a message is printed and nothing is run.
    """
    random.seed(0)
    if len(adb.get_devices()) == 0:
        print('Test suite must be run with attached devices')
        return

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)


if __name__ == '__main__':
    main()