blob: 1568e2345df5a4e66395f7ae60101c063a9e7be1 [file] [log] [blame]
Josh Gao93dee962020-02-06 17:52:38 -08001#!/usr/bin/env python3
Josh Gao191c1542015-12-09 11:26:11 -08002# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gaoc970aef2018-03-19 15:35:11 -070034import threading
Josh Gaofe50bb72016-06-22 18:27:22 -070035import time
Josh Gao191c1542015-12-09 11:26:11 -080036import unittest
37
Josh Gao74b7ec72019-01-11 14:42:08 -080038from datetime import datetime
39
Josh Gao191c1542015-12-09 11:26:11 -080040import adb
41
Josh Gao191c1542015-12-09 11:26:11 -080042def requires_root(func):
43 def wrapper(self, *args):
44 if self.device.get_prop('ro.debuggable') != '1':
45 raise unittest.SkipTest('requires rootable build')
46
47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
48 if not was_root:
49 self.device.root()
50 self.device.wait()
51
52 try:
53 func(self, *args)
54 finally:
55 if not was_root:
56 self.device.unroot()
57 self.device.wait()
58
59 return wrapper
60
61
62def requires_non_root(func):
63 def wrapper(self, *args):
64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
65 if was_root:
66 self.device.unroot()
67 self.device.wait()
68
69 try:
70 func(self, *args)
71 finally:
72 if was_root:
73 self.device.root()
74 self.device.wait()
75
76 return wrapper
77
78
Josh Gao191c1542015-12-09 11:26:11 -080079class DeviceTest(unittest.TestCase):
80 def setUp(self):
81 self.device = adb.get_device()
82
83
84class ForwardReverseTest(DeviceTest):
85 def _test_no_rebind(self, description, direction_list, direction,
86 direction_no_rebind, direction_remove_all):
87 msg = direction_list()
88 self.assertEqual('', msg.strip(),
89 description + ' list must be empty to run this test.')
90
91 # Use --no-rebind with no existing binding
92 direction_no_rebind('tcp:5566', 'tcp:6655')
93 msg = direction_list()
94 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
95
96 # Use --no-rebind with existing binding
97 with self.assertRaises(subprocess.CalledProcessError):
98 direction_no_rebind('tcp:5566', 'tcp:6677')
99 msg = direction_list()
100 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
101 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
102
103 # Use the absence of --no-rebind with existing binding
104 direction('tcp:5566', 'tcp:6677')
105 msg = direction_list()
106 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
107 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
108
109 direction_remove_all()
110 msg = direction_list()
111 self.assertEqual('', msg.strip())
112
113 def test_forward_no_rebind(self):
114 self._test_no_rebind('forward', self.device.forward_list,
115 self.device.forward, self.device.forward_no_rebind,
116 self.device.forward_remove_all)
117
118 def test_reverse_no_rebind(self):
119 self._test_no_rebind('reverse', self.device.reverse_list,
120 self.device.reverse, self.device.reverse_no_rebind,
121 self.device.reverse_remove_all)
122
123 def test_forward(self):
124 msg = self.device.forward_list()
125 self.assertEqual('', msg.strip(),
126 'Forwarding list must be empty to run this test.')
127 self.device.forward('tcp:5566', 'tcp:6655')
128 msg = self.device.forward_list()
129 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
130 self.device.forward('tcp:7788', 'tcp:8877')
131 msg = self.device.forward_list()
132 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
133 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
134 self.device.forward_remove('tcp:5566')
135 msg = self.device.forward_list()
136 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
137 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
138 self.device.forward_remove_all()
139 msg = self.device.forward_list()
140 self.assertEqual('', msg.strip())
141
Josh Gao727b07b2019-09-13 00:12:26 +0800142 def test_forward_old_protocol(self):
143 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
144
145 msg = self.device.forward_list()
146 self.assertEqual('', msg.strip(),
147 'Forwarding list must be empty to run this test.')
148
149 s = socket.create_connection(("localhost", 5037))
150 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
151 cmd = b"%04x%s" % (len(service), service)
152 s.sendall(cmd)
153
154 msg = self.device.forward_list()
155 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
156
157 self.device.forward_remove_all()
158 msg = self.device.forward_list()
159 self.assertEqual('', msg.strip())
160
David Purselleaae97e2016-04-07 11:25:48 -0700161 def test_forward_tcp_port_0(self):
162 self.assertEqual('', self.device.forward_list().strip(),
163 'Forwarding list must be empty to run this test.')
164
165 try:
166 # If resolving TCP port 0 is supported, `adb forward` will print
167 # the actual port number.
168 port = self.device.forward('tcp:0', 'tcp:8888').strip()
169 if not port:
170 raise unittest.SkipTest('Forwarding tcp:0 is not available.')
171
172 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
173 self.device.forward_list()))
174 finally:
175 self.device.forward_remove_all()
176
Josh Gao191c1542015-12-09 11:26:11 -0800177 def test_reverse(self):
178 msg = self.device.reverse_list()
179 self.assertEqual('', msg.strip(),
180 'Reverse forwarding list must be empty to run this test.')
181 self.device.reverse('tcp:5566', 'tcp:6655')
182 msg = self.device.reverse_list()
183 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
184 self.device.reverse('tcp:7788', 'tcp:8877')
185 msg = self.device.reverse_list()
186 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
187 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
188 self.device.reverse_remove('tcp:5566')
189 msg = self.device.reverse_list()
190 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
191 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
192 self.device.reverse_remove_all()
193 msg = self.device.reverse_list()
194 self.assertEqual('', msg.strip())
195
David Purselleaae97e2016-04-07 11:25:48 -0700196 def test_reverse_tcp_port_0(self):
197 self.assertEqual('', self.device.reverse_list().strip(),
198 'Reverse list must be empty to run this test.')
199
200 try:
201 # If resolving TCP port 0 is supported, `adb reverse` will print
202 # the actual port number.
203 port = self.device.reverse('tcp:0', 'tcp:8888').strip()
204 if not port:
205 raise unittest.SkipTest('Reversing tcp:0 is not available.')
206
207 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
208 self.device.reverse_list()))
209 finally:
210 self.device.reverse_remove_all()
211
Josh Gao191c1542015-12-09 11:26:11 -0800212 def test_forward_reverse_echo(self):
213 """Send data through adb forward and read it back via adb reverse"""
214 forward_port = 12345
215 reverse_port = forward_port + 1
Josh Gao255c5c82016-03-03 14:49:02 -0800216 forward_spec = 'tcp:' + str(forward_port)
217 reverse_spec = 'tcp:' + str(reverse_port)
Josh Gao191c1542015-12-09 11:26:11 -0800218 forward_setup = False
219 reverse_setup = False
220
221 try:
222 # listen on localhost:forward_port, connect to remote:forward_port
223 self.device.forward(forward_spec, forward_spec)
224 forward_setup = True
225 # listen on remote:forward_port, connect to localhost:reverse_port
226 self.device.reverse(forward_spec, reverse_spec)
227 reverse_setup = True
228
229 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
230 with contextlib.closing(listener):
231 # Use SO_REUSEADDR so that subsequent runs of the test can grab
232 # the port even if it is in TIME_WAIT.
233 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
234
235 # Listen on localhost:reverse_port before connecting to
236 # localhost:forward_port because that will cause adb to connect
237 # back to localhost:reverse_port.
238 listener.bind(('127.0.0.1', reverse_port))
239 listener.listen(4)
240
241 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
242 with contextlib.closing(client):
243 # Connect to the listener.
244 client.connect(('127.0.0.1', forward_port))
245
246 # Accept the client connection.
247 accepted_connection, addr = listener.accept()
248 with contextlib.closing(accepted_connection) as server:
Josh Gao93dee962020-02-06 17:52:38 -0800249 data = b'hello'
Josh Gao191c1542015-12-09 11:26:11 -0800250
251 # Send data into the port setup by adb forward.
252 client.sendall(data)
253 # Explicitly close() so that server gets EOF.
254 client.close()
255
256 # Verify that the data came back via adb reverse.
Josh Gao93dee962020-02-06 17:52:38 -0800257 self.assertEqual(data, server.makefile().read().encode("utf8"))
Josh Gao191c1542015-12-09 11:26:11 -0800258 finally:
259 if reverse_setup:
260 self.device.reverse_remove(forward_spec)
261 if forward_setup:
262 self.device.forward_remove(forward_spec)
263
264
265class ShellTest(DeviceTest):
266 def _interactive_shell(self, shell_args, input):
267 """Runs an interactive adb shell.
268
269 Args:
270 shell_args: List of string arguments to `adb shell`.
Josh Gao93dee962020-02-06 17:52:38 -0800271 input: bytes input to send to the interactive shell.
Josh Gao191c1542015-12-09 11:26:11 -0800272
273 Returns:
274 The remote exit code.
275
276 Raises:
277 unittest.SkipTest: The device doesn't support exit codes.
278 """
David Pursellcf467412016-04-26 13:25:57 -0700279 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800280 raise unittest.SkipTest('exit codes are unavailable on this device')
281
282 proc = subprocess.Popen(
283 self.device.adb_cmd + ['shell'] + shell_args,
284 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
285 stderr=subprocess.PIPE)
286 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
287 # to explicitly add an exit command to close the session from the device
288 # side, plus the necessary newline to complete the interactive command.
Josh Gao93dee962020-02-06 17:52:38 -0800289 proc.communicate(input + b'; exit\n')
Josh Gao191c1542015-12-09 11:26:11 -0800290 return proc.returncode
291
292 def test_cat(self):
293 """Check that we can at least cat a file."""
294 out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
295 elements = out.split()
296 self.assertEqual(len(elements), 2)
297
298 uptime, idle = elements
299 self.assertGreater(float(uptime), 0.0)
300 self.assertGreater(float(idle), 0.0)
301
302 def test_throws_on_failure(self):
303 self.assertRaises(adb.ShellError, self.device.shell, ['false'])
304
305 def test_output_not_stripped(self):
306 out = self.device.shell(['echo', 'foo'])[0]
307 self.assertEqual(out, 'foo' + self.device.linesep)
308
Josh Gaoa019f782017-06-16 15:34:34 -0700309 def test_shell_command_length(self):
310 # Devices that have shell_v2 should be able to handle long commands.
311 if self.device.has_shell_protocol():
312 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
313 self.assertEqual(rc, 0)
314 self.assertTrue(out == ('x' * 16384 + '\n'))
315
Josh Gao191c1542015-12-09 11:26:11 -0800316 def test_shell_nocheck_failure(self):
317 rc, out, _ = self.device.shell_nocheck(['false'])
318 self.assertNotEqual(rc, 0)
319 self.assertEqual(out, '')
320
321 def test_shell_nocheck_output_not_stripped(self):
322 rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
323 self.assertEqual(rc, 0)
324 self.assertEqual(out, 'foo' + self.device.linesep)
325
326 def test_can_distinguish_tricky_results(self):
327 # If result checking on ADB shell is naively implemented as
328 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
329 # output from the result for a cmd of `echo -n 1`.
330 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
331 self.assertEqual(rc, 0)
332 self.assertEqual(out, '1')
333
334 def test_line_endings(self):
335 """Ensure that line ending translation is not happening in the pty.
336
337 Bug: http://b/19735063
338 """
339 output = self.device.shell(['uname'])[0]
340 self.assertEqual(output, 'Linux' + self.device.linesep)
341
342 def test_pty_logic(self):
343 """Tests that a PTY is allocated when it should be.
344
Elliott Hughescabfa112016-10-19 14:47:11 -0700345 PTY allocation behavior should match ssh.
Josh Gao191c1542015-12-09 11:26:11 -0800346 """
Josh Gao191c1542015-12-09 11:26:11 -0800347 def check_pty(args):
348 """Checks adb shell PTY allocation.
349
350 Tests |args| for terminal and non-terminal stdin.
351
352 Args:
353 args: -Tt args in a list (e.g. ['-t', '-t']).
354
355 Returns:
356 A tuple (<terminal>, <non-terminal>). True indicates
357 the corresponding shell allocated a remote PTY.
358 """
359 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
360
361 terminal = subprocess.Popen(
362 test_cmd, stdin=None,
363 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
364 terminal.communicate()
365
366 non_terminal = subprocess.Popen(
367 test_cmd, stdin=subprocess.PIPE,
368 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
369 non_terminal.communicate()
370
371 return (terminal.returncode == 0, non_terminal.returncode == 0)
372
373 # -T: never allocate PTY.
374 self.assertEqual((False, False), check_pty(['-T']))
375
Elliott Hughescabfa112016-10-19 14:47:11 -0700376 # These tests require a new device.
377 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
378 # No args: PTY only if stdin is a terminal and shell is interactive,
379 # which is difficult to reliably test from a script.
380 self.assertEqual((False, False), check_pty([]))
Josh Gao191c1542015-12-09 11:26:11 -0800381
Elliott Hughescabfa112016-10-19 14:47:11 -0700382 # -t: PTY if stdin is a terminal.
383 self.assertEqual((True, False), check_pty(['-t']))
Josh Gao191c1542015-12-09 11:26:11 -0800384
385 # -t -t: always allocate PTY.
386 self.assertEqual((True, True), check_pty(['-t', '-t']))
387
Elliott Hughescabfa112016-10-19 14:47:11 -0700388 # -tt: always allocate PTY, POSIX style (http://b/32216152).
389 self.assertEqual((True, True), check_pty(['-tt']))
390
391 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
392 # we follow the man page instead.
393 self.assertEqual((True, True), check_pty(['-ttt']))
394
395 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
396 self.assertEqual((True, True), check_pty(['-ttx']))
397
398 # -Ttt: -tt cancels out -T.
399 self.assertEqual((True, True), check_pty(['-Ttt']))
400
401 # -ttT: -T cancels out -tt.
402 self.assertEqual((False, False), check_pty(['-ttT']))
403
Josh Gao191c1542015-12-09 11:26:11 -0800404 def test_shell_protocol(self):
405 """Tests the shell protocol on the device.
406
407 If the device supports shell protocol, this gives us the ability
408 to separate stdout/stderr and return the exit code directly.
409
410 Bug: http://b/19734861
411 """
David Pursellcf467412016-04-26 13:25:57 -0700412 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800413 raise unittest.SkipTest('shell protocol unsupported on this device')
414
415 # Shell protocol should be used by default.
416 result = self.device.shell_nocheck(
417 shlex.split('echo foo; echo bar >&2; exit 17'))
418 self.assertEqual(17, result[0])
419 self.assertEqual('foo' + self.device.linesep, result[1])
420 self.assertEqual('bar' + self.device.linesep, result[2])
421
Josh Gao93dee962020-02-06 17:52:38 -0800422 self.assertEqual(17, self._interactive_shell([], b'exit 17'))
Josh Gao191c1542015-12-09 11:26:11 -0800423
424 # -x flag should disable shell protocol.
425 result = self.device.shell_nocheck(
426 shlex.split('-x echo foo; echo bar >&2; exit 17'))
427 self.assertEqual(0, result[0])
428 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
429 self.assertEqual('', result[2])
430
Josh Gao93dee962020-02-06 17:52:38 -0800431 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))
Josh Gao191c1542015-12-09 11:26:11 -0800432
433 def test_non_interactive_sigint(self):
434 """Tests that SIGINT in a non-interactive shell kills the process.
435
436 This requires the shell protocol in order to detect the broken
437 pipe; raw data transfer mode will only see the break once the
438 subprocess tries to read or write.
439
440 Bug: http://b/23825725
441 """
David Pursellcf467412016-04-26 13:25:57 -0700442 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800443 raise unittest.SkipTest('shell protocol unsupported on this device')
444
445 # Start a long-running process.
446 sleep_proc = subprocess.Popen(
447 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
448 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
449 stderr=subprocess.STDOUT)
Josh Gao93dee962020-02-06 17:52:38 -0800450 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
Josh Gao191c1542015-12-09 11:26:11 -0800451 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
452 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
453
454 # Verify that the process is running, send signal, verify it stopped.
455 self.device.shell(proc_query)
456 os.kill(sleep_proc.pid, signal.SIGINT)
457 sleep_proc.communicate()
Josh Gaoe76b9f32016-10-21 12:40:42 -0700458
459 # It can take some time for the process to receive the signal and die.
460 end_time = time.time() + 3
461 while self.device.shell_nocheck(proc_query)[0] != 1:
462 self.assertFalse(time.time() > end_time,
463 'subprocess failed to terminate in time')
Josh Gao191c1542015-12-09 11:26:11 -0800464
465 def test_non_interactive_stdin(self):
466 """Tests that non-interactive shells send stdin."""
David Pursellcf467412016-04-26 13:25:57 -0700467 if not self.device.has_shell_protocol():
Josh Gao191c1542015-12-09 11:26:11 -0800468 raise unittest.SkipTest('non-interactive stdin unsupported '
469 'on this device')
470
471 # Test both small and large inputs.
Josh Gao93dee962020-02-06 17:52:38 -0800472 small_input = b'foo'
473 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
474 large_input = b'\n'.join(characters)
475
Josh Gao191c1542015-12-09 11:26:11 -0800476
477 for input in (small_input, large_input):
478 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
479 stdin=subprocess.PIPE,
480 stdout=subprocess.PIPE,
481 stderr=subprocess.PIPE)
482 stdout, stderr = proc.communicate(input)
483 self.assertEqual(input.splitlines(), stdout.splitlines())
Josh Gao93dee962020-02-06 17:52:38 -0800484 self.assertEqual(b'', stderr)
Josh Gao191c1542015-12-09 11:26:11 -0800485
Josh Gaofe50bb72016-06-22 18:27:22 -0700486 def test_sighup(self):
487 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
488 log_path = "/data/local/tmp/adb_signal_test.log"
489
490 # Clear the output file.
491 self.device.shell_nocheck(["echo", ">", log_path])
492
493 script = """
494 trap "echo SIGINT > {path}; exit 0" SIGINT
495 trap "echo SIGHUP > {path}; exit 0" SIGHUP
496 echo Waiting
Josh Gao470622f2016-10-21 13:17:32 -0700497 read
Josh Gaofe50bb72016-06-22 18:27:22 -0700498 """.format(path=log_path)
499
500 script = ";".join([x.strip() for x in script.strip().splitlines()])
501
Josh Gao470622f2016-10-21 13:17:32 -0700502 process = self.device.shell_popen([script], kill_atexit=False,
503 stdin=subprocess.PIPE,
504 stdout=subprocess.PIPE)
Josh Gaofe50bb72016-06-22 18:27:22 -0700505
Josh Gao93dee962020-02-06 17:52:38 -0800506 self.assertEqual(b"Waiting\n", process.stdout.readline())
Josh Gaofe50bb72016-06-22 18:27:22 -0700507 process.send_signal(signal.SIGINT)
508 process.wait()
509
510 # Waiting for the local adb to finish is insufficient, since it hangs
511 # up immediately.
Josh Gao470622f2016-10-21 13:17:32 -0700512 time.sleep(1)
Josh Gaofe50bb72016-06-22 18:27:22 -0700513
514 stdout, _ = self.device.shell(["cat", log_path])
515 self.assertEqual(stdout.strip(), "SIGHUP")
516
Josh Gaoc970aef2018-03-19 15:35:11 -0700517 def test_exit_stress(self):
518 """Hammer `adb shell exit 42` with multiple threads."""
519 thread_count = 48
520 result = dict()
521 def hammer(thread_idx, thread_count, result):
522 success = True
523 for i in range(thread_idx, 240, thread_count):
524 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
525 if ret != i % 256:
526 success = False
527 break
528 result[thread_idx] = success
529
530 threads = []
531 for i in range(thread_count):
532 thread = threading.Thread(target=hammer, args=(i, thread_count, result))
533 thread.start()
534 threads.append(thread)
535 for thread in threads:
536 thread.join()
Josh Gao93dee962020-02-06 17:52:38 -0800537 for i, success in result.items():
Josh Gaoc970aef2018-03-19 15:35:11 -0700538 self.assertTrue(success)
539
Josh Gao63da8e62019-12-16 17:13:51 -0800540 def disabled_test_parallel(self):
541 """Spawn a bunch of `adb shell` instances in parallel.
542
543 This was broken historically due to the use of select, which only works
544 for fds that are numerically less than 1024.
545
546 Bug: http://b/141955761"""
547
548 n_procs = 2048
549 procs = dict()
Josh Gao93dee962020-02-06 17:52:38 -0800550 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800551 procs[i] = subprocess.Popen(
552 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
553 stdin=subprocess.PIPE,
554 stdout=subprocess.PIPE
555 )
556
Josh Gao93dee962020-02-06 17:52:38 -0800557 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800558 procs[i].stdin.write("%d\n" % i)
559
Josh Gao93dee962020-02-06 17:52:38 -0800560 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800561 response = procs[i].stdout.readline()
562 assert(response == "%d\n" % i)
563
Josh Gao93dee962020-02-06 17:52:38 -0800564 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800565 procs[i].stdin.write("%d\n" % (i % 256))
566
Josh Gao93dee962020-02-06 17:52:38 -0800567 for i in range(0, n_procs):
Josh Gao63da8e62019-12-16 17:13:51 -0800568 assert(procs[i].wait() == i % 256)
569
Josh Gao191c1542015-12-09 11:26:11 -0800570
571class ArgumentEscapingTest(DeviceTest):
572 def test_shell_escaping(self):
573 """Make sure that argument escaping is somewhat sane."""
574
575 # http://b/19734868
576 # Note that this actually matches ssh(1)'s behavior --- it's
577 # converted to `sh -c echo hello; echo world` which sh interprets
578 # as `sh -c echo` (with an argument to that shell of "hello"),
579 # and then `echo world` back in the first shell.
580 result = self.device.shell(
581 shlex.split("sh -c 'echo hello; echo world'"))[0]
582 result = result.splitlines()
583 self.assertEqual(['', 'world'], result)
584 # If you really wanted "hello" and "world", here's what you'd do:
585 result = self.device.shell(
586 shlex.split(r'echo hello\;echo world'))[0].splitlines()
587 self.assertEqual(['hello', 'world'], result)
588
589 # http://b/15479704
590 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
591 self.assertEqual('t', result)
592 result = self.device.shell(
593 shlex.split("sh -c 'true && echo t'"))[0].strip()
594 self.assertEqual('t', result)
595
596 # http://b/20564385
597 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
598 self.assertEqual('t', result)
599 result = self.device.shell(
600 shlex.split(r'echo -n 123\;uname'))[0].strip()
601 self.assertEqual('123Linux', result)
602
603 def test_install_argument_escaping(self):
604 """Make sure that install argument escaping works."""
605 # http://b/20323053, http://b/3090932.
Josh Gao93dee962020-02-06 17:52:38 -0800606 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
Josh Gao191c1542015-12-09 11:26:11 -0800607 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
608 delete=False)
609 tf.close()
610
611 # Installing bogus .apks fails if the device supports exit codes.
612 try:
Josh Gao93dee962020-02-06 17:52:38 -0800613 output = self.device.install(tf.name.decode("utf8"))
Josh Gao191c1542015-12-09 11:26:11 -0800614 except subprocess.CalledProcessError as e:
615 output = e.output
616
617 self.assertIn(file_suffix, output)
618 os.remove(tf.name)
619
620
621class RootUnrootTest(DeviceTest):
622 def _test_root(self):
623 message = self.device.root()
624 if 'adbd cannot run as root in production builds' in message:
625 return
626 self.device.wait()
627 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
628
629 def _test_unroot(self):
630 self.device.unroot()
631 self.device.wait()
632 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
633
634 def test_root_unroot(self):
635 """Make sure that adb root and adb unroot work, using id(1)."""
636 if self.device.get_prop('ro.debuggable') != '1':
637 raise unittest.SkipTest('requires rootable build')
638
639 original_user = self.device.shell(['id', '-un'])[0].strip()
640 try:
641 if original_user == 'root':
642 self._test_unroot()
643 self._test_root()
644 elif original_user == 'shell':
645 self._test_root()
646 self._test_unroot()
647 finally:
648 if original_user == 'root':
649 self.device.root()
650 else:
651 self.device.unroot()
652 self.device.wait()
653
654
655class TcpIpTest(DeviceTest):
656 def test_tcpip_failure_raises(self):
657 """adb tcpip requires a port.
658
659 Bug: http://b/22636927
660 """
661 self.assertRaises(
662 subprocess.CalledProcessError, self.device.tcpip, '')
663 self.assertRaises(
664 subprocess.CalledProcessError, self.device.tcpip, 'foo')
665
666
667class SystemPropertiesTest(DeviceTest):
668 def test_get_prop(self):
669 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
670
671 @requires_root
672 def test_set_prop(self):
673 prop_name = 'foo.bar'
674 self.device.shell(['setprop', prop_name, '""'])
675
676 self.device.set_prop(prop_name, 'qux')
677 self.assertEqual(
678 self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
679
680
681def compute_md5(string):
682 hsh = hashlib.md5()
683 hsh.update(string)
684 return hsh.hexdigest()
685
686
687def get_md5_prog(device):
688 """Older platforms (pre-L) had the name md5 rather than md5sum."""
689 try:
690 device.shell(['md5sum', '/proc/uptime'])
691 return 'md5sum'
692 except adb.ShellError:
693 return 'md5'
694
695
696class HostFile(object):
697 def __init__(self, handle, checksum):
698 self.handle = handle
699 self.checksum = checksum
700 self.full_path = handle.name
701 self.base_name = os.path.basename(self.full_path)
702
703
704class DeviceFile(object):
705 def __init__(self, checksum, full_path):
706 self.checksum = checksum
707 self.full_path = full_path
708 self.base_name = posixpath.basename(self.full_path)
709
710
711def make_random_host_files(in_dir, num_files):
712 min_size = 1 * (1 << 10)
713 max_size = 16 * (1 << 10)
714
715 files = []
Josh Gao93dee962020-02-06 17:52:38 -0800716 for _ in range(num_files):
Josh Gao191c1542015-12-09 11:26:11 -0800717 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
718
719 size = random.randrange(min_size, max_size, 1024)
720 rand_str = os.urandom(size)
721 file_handle.write(rand_str)
722 file_handle.flush()
723 file_handle.close()
724
725 md5 = compute_md5(rand_str)
726 files.append(HostFile(file_handle, md5))
727 return files
728
729
730def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
731 min_size = 1 * (1 << 10)
732 max_size = 16 * (1 << 10)
733
734 files = []
Josh Gao93dee962020-02-06 17:52:38 -0800735 for file_num in range(num_files):
Josh Gao191c1542015-12-09 11:26:11 -0800736 size = random.randrange(min_size, max_size, 1024)
737
738 base_name = prefix + str(file_num)
739 full_path = posixpath.join(in_dir, base_name)
740
741 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
742 'bs={}'.format(size), 'count=1'])
743 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
744
745 files.append(DeviceFile(dev_md5, full_path))
746 return files
747
748
749class FileOperationsTest(DeviceTest):
750 SCRATCH_DIR = '/data/local/tmp'
751 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
752 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
753
754 def _verify_remote(self, checksum, remote_path):
755 dev_md5, _ = self.device.shell([get_md5_prog(self.device),
756 remote_path])[0].split()
757 self.assertEqual(checksum, dev_md5)
758
759 def _verify_local(self, checksum, local_path):
760 with open(local_path, 'rb') as host_file:
761 host_md5 = compute_md5(host_file.read())
762 self.assertEqual(host_md5, checksum)
763
764 def test_push(self):
765 """Push a randomly generated file to specified device."""
766 kbytes = 512
767 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
768 rand_str = os.urandom(1024 * kbytes)
769 tmp.write(rand_str)
770 tmp.close()
771
772 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
773 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
774
775 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
776 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
777
778 os.remove(tmp.name)
779
780 def test_push_dir(self):
781 """Push a randomly generated directory of files to the device."""
782 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
783 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
784
785 try:
786 host_dir = tempfile.mkdtemp()
787
788 # Make sure the temp directory isn't setuid, or else adb will complain.
789 os.chmod(host_dir, 0o700)
790
791 # Create 32 random files.
792 temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
793 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
794
795 for temp_file in temp_files:
796 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
797 os.path.basename(host_dir),
798 temp_file.base_name)
799 self._verify_remote(temp_file.checksum, remote_path)
800 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
801 finally:
802 if host_dir is not None:
803 shutil.rmtree(host_dir)
804
Josh Gaofb085102018-10-22 13:00:05 -0700805 def disabled_test_push_empty(self):
Josh Gaoed176502018-09-21 13:40:16 -0700806 """Push an empty directory to the device."""
Josh Gao191c1542015-12-09 11:26:11 -0800807 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
808 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
809
810 try:
811 host_dir = tempfile.mkdtemp()
812
813 # Make sure the temp directory isn't setuid, or else adb will complain.
814 os.chmod(host_dir, 0o700)
815
816 # Create an empty directory.
Josh Gao6c1b42c2018-08-08 14:25:41 -0700817 empty_dir_path = os.path.join(host_dir, 'empty')
818 os.mkdir(empty_dir_path);
Josh Gao191c1542015-12-09 11:26:11 -0800819
Josh Gao6c1b42c2018-08-08 14:25:41 -0700820 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
Josh Gao191c1542015-12-09 11:26:11 -0800821
Josh Gaoed176502018-09-21 13:40:16 -0700822 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
823 test_empty_cmd = ["[", "-d", remote_path, "]"]
Josh Gao191c1542015-12-09 11:26:11 -0800824 rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
Josh Gaoed176502018-09-21 13:40:16 -0700825
Josh Gao191c1542015-12-09 11:26:11 -0800826 self.assertEqual(rc, 0)
827 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
828 finally:
829 if host_dir is not None:
830 shutil.rmtree(host_dir)
831
Josh Gao94dc19f2016-09-14 16:13:50 -0700832 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
833 def test_push_symlink(self):
834 """Push a symlink.
835
836 Bug: http://b/31491920
837 """
838 try:
839 host_dir = tempfile.mkdtemp()
840
841 # Make sure the temp directory isn't setuid, or else adb will
842 # complain.
843 os.chmod(host_dir, 0o700)
844
845 with open(os.path.join(host_dir, 'foo'), 'w') as f:
846 f.write('foo')
847
848 symlink_path = os.path.join(host_dir, 'symlink')
849 os.symlink('foo', symlink_path)
850
851 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
852 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
853 self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
854 rc, out, _ = self.device.shell_nocheck(
855 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
856 self.assertEqual(0, rc)
857 self.assertEqual(out.strip(), 'foo')
858 finally:
859 if host_dir is not None:
860 shutil.rmtree(host_dir)
861
Josh Gao191c1542015-12-09 11:26:11 -0800862 def test_multiple_push(self):
863 """Push multiple files to the device in one adb push command.
864
865 Bug: http://b/25324823
866 """
867
868 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
869 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
870
871 try:
872 host_dir = tempfile.mkdtemp()
873
874 # Create some random files and a subdirectory containing more files.
875 temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
876
Josh Gao255c5c82016-03-03 14:49:02 -0800877 subdir = os.path.join(host_dir, 'subdir')
Josh Gao191c1542015-12-09 11:26:11 -0800878 os.mkdir(subdir)
879 subdir_temp_files = make_random_host_files(in_dir=subdir,
880 num_files=4)
881
Josh Gao93dee962020-02-06 17:52:38 -0800882 paths = [x.full_path for x in temp_files]
Josh Gao191c1542015-12-09 11:26:11 -0800883 paths.append(subdir)
884 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
885
886 for temp_file in temp_files:
887 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
888 temp_file.base_name)
889 self._verify_remote(temp_file.checksum, remote_path)
890
891 for subdir_temp_file in subdir_temp_files:
892 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
893 # BROKEN: http://b/25394682
Josh Gao255c5c82016-03-03 14:49:02 -0800894 # 'subdir';
Josh Gao191c1542015-12-09 11:26:11 -0800895 temp_file.base_name)
896 self._verify_remote(temp_file.checksum, remote_path)
897
898
899 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
900 finally:
901 if host_dir is not None:
902 shutil.rmtree(host_dir)
903
Josh Gaoafcdcd72016-02-19 15:55:55 -0800904 @requires_non_root
905 def test_push_error_reporting(self):
906 """Make sure that errors that occur while pushing a file get reported
907
908 Bug: http://b/26816782
909 """
910 with tempfile.NamedTemporaryFile() as tmp_file:
Josh Gao93dee962020-02-06 17:52:38 -0800911 tmp_file.write(b'\0' * 1024 * 1024)
Josh Gaoafcdcd72016-02-19 15:55:55 -0800912 tmp_file.flush()
913 try:
914 self.device.push(local=tmp_file.name, remote='/system/')
Josh Gao255c5c82016-03-03 14:49:02 -0800915 self.fail('push should not have succeeded')
Josh Gaoafcdcd72016-02-19 15:55:55 -0800916 except subprocess.CalledProcessError as e:
917 output = e.output
918
Josh Gao93dee962020-02-06 17:52:38 -0800919 self.assertTrue(b'Permission denied' in output or
920 b'Read-only file system' in output)
Josh Gao191c1542015-12-09 11:26:11 -0800921
Josh Gao4c0078d2018-06-28 18:43:19 -0700922 @requires_non_root
923 def test_push_directory_creation(self):
924 """Regression test for directory creation.
925
926 Bug: http://b/110953234
927 """
928 with tempfile.NamedTemporaryFile() as tmp_file:
Josh Gao93dee962020-02-06 17:52:38 -0800929 tmp_file.write(b'\0' * 1024 * 1024)
Josh Gao4c0078d2018-06-28 18:43:19 -0700930 tmp_file.flush()
931 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
932 self.device.shell(['rm', '-rf', remote_path])
933
934 remote_path += '/filename'
935 self.device.push(local=tmp_file.name, remote=remote_path)
936
Josh Gaoea3f43c2019-10-01 14:14:07 -0700937 def disabled_test_push_multiple_slash_root(self):
Josh Gao76b64ba2019-09-26 01:49:56 +0800938 """Regression test for pushing to //data/local/tmp.
939
940 Bug: http://b/141311284
Josh Gaoea3f43c2019-10-01 14:14:07 -0700941
942 Disabled because this broken on the adbd side as well: b/141943968
Josh Gao76b64ba2019-09-26 01:49:56 +0800943 """
944 with tempfile.NamedTemporaryFile() as tmp_file:
945 tmp_file.write('\0' * 1024 * 1024)
946 tmp_file.flush()
947 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
948 self.device.shell(['rm', '-rf', remote_path])
949 self.device.push(local=tmp_file.name, remote=remote_path)
950
Josh Gao191c1542015-12-09 11:26:11 -0800951 def _test_pull(self, remote_file, checksum):
952 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
953 tmp_write.close()
954 self.device.pull(remote=remote_file, local=tmp_write.name)
955 with open(tmp_write.name, 'rb') as tmp_read:
956 host_contents = tmp_read.read()
957 host_md5 = compute_md5(host_contents)
958 self.assertEqual(checksum, host_md5)
959 os.remove(tmp_write.name)
960
961 @requires_non_root
962 def test_pull_error_reporting(self):
963 self.device.shell(['touch', self.DEVICE_TEMP_FILE])
964 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
965
966 try:
967 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
968 except subprocess.CalledProcessError as e:
969 output = e.output
970
Josh Gao93dee962020-02-06 17:52:38 -0800971 self.assertIn(b'Permission denied', output)
Josh Gao191c1542015-12-09 11:26:11 -0800972
973 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
974
975 def test_pull(self):
976 """Pull a randomly generated file from specified device."""
977 kbytes = 512
978 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
979 cmd = ['dd', 'if=/dev/urandom',
980 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
981 'count={}'.format(kbytes)]
982 self.device.shell(cmd)
983 dev_md5, _ = self.device.shell(
984 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
985 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
986 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
987
988 def test_pull_dir(self):
989 """Pull a randomly generated directory of files from the device."""
990 try:
991 host_dir = tempfile.mkdtemp()
992
993 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
994 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
995
996 # Populate device directory with random files.
997 temp_files = make_random_device_files(
998 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
999
1000 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1001
1002 for temp_file in temp_files:
Josh Gaoce8f2cd2015-12-09 14:20:23 -08001003 host_path = os.path.join(
1004 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1005 temp_file.base_name)
1006 self._verify_local(temp_file.checksum, host_path)
Josh Gao191c1542015-12-09 11:26:11 -08001007
1008 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1009 finally:
1010 if host_dir is not None:
1011 shutil.rmtree(host_dir)
1012
Josh Gao1e611a32016-02-26 13:26:55 -08001013 def test_pull_dir_symlink(self):
1014 """Pull a directory into a symlink to a directory.
1015
1016 Bug: http://b/27362811
1017 """
Josh Gao255c5c82016-03-03 14:49:02 -08001018 if os.name != 'posix':
Josh Gao1e611a32016-02-26 13:26:55 -08001019 raise unittest.SkipTest('requires POSIX')
1020
1021 try:
1022 host_dir = tempfile.mkdtemp()
Josh Gao255c5c82016-03-03 14:49:02 -08001023 real_dir = os.path.join(host_dir, 'dir')
1024 symlink = os.path.join(host_dir, 'symlink')
Josh Gao1e611a32016-02-26 13:26:55 -08001025 os.mkdir(real_dir)
1026 os.symlink(real_dir, symlink)
1027
1028 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1029 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1030
1031 # Populate device directory with random files.
1032 temp_files = make_random_device_files(
1033 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1034
1035 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1036
1037 for temp_file in temp_files:
1038 host_path = os.path.join(
1039 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1040 temp_file.base_name)
1041 self._verify_local(temp_file.checksum, host_path)
1042
1043 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1044 finally:
1045 if host_dir is not None:
1046 shutil.rmtree(host_dir)
1047
1048 def test_pull_dir_symlink_collision(self):
1049 """Pull a directory into a colliding symlink to directory."""
Josh Gao255c5c82016-03-03 14:49:02 -08001050 if os.name != 'posix':
Josh Gao1e611a32016-02-26 13:26:55 -08001051 raise unittest.SkipTest('requires POSIX')
1052
1053 try:
1054 host_dir = tempfile.mkdtemp()
Josh Gao255c5c82016-03-03 14:49:02 -08001055 real_dir = os.path.join(host_dir, 'real')
Josh Gao1e611a32016-02-26 13:26:55 -08001056 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1057 symlink = os.path.join(host_dir, tmp_dirname)
1058 os.mkdir(real_dir)
1059 os.symlink(real_dir, symlink)
1060
1061 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1062 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1063
1064 # Populate device directory with random files.
1065 temp_files = make_random_device_files(
1066 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1067
1068 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1069
1070 for temp_file in temp_files:
1071 host_path = os.path.join(real_dir, temp_file.base_name)
1072 self._verify_local(temp_file.checksum, host_path)
1073
1074 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1075 finally:
1076 if host_dir is not None:
1077 shutil.rmtree(host_dir)
1078
Josh Gao89ec3a82016-03-02 16:00:02 -08001079 def test_pull_dir_nonexistent(self):
1080 """Pull a directory of files from the device to a nonexistent path."""
1081 try:
1082 host_dir = tempfile.mkdtemp()
1083 dest_dir = os.path.join(host_dir, 'dest')
1084
1085 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1086 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1087
1088 # Populate device directory with random files.
1089 temp_files = make_random_device_files(
1090 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1091
1092 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1093
1094 for temp_file in temp_files:
1095 host_path = os.path.join(dest_dir, temp_file.base_name)
1096 self._verify_local(temp_file.checksum, host_path)
1097
1098 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1099 finally:
1100 if host_dir is not None:
1101 shutil.rmtree(host_dir)
1102
Josh Gaoa8db2742018-08-08 14:26:03 -07001103 # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1104 def disabled_test_pull_symlink_dir(self):
Josh Gaof2642242015-12-09 14:03:30 -08001105 """Pull a symlink to a directory of symlinks to files."""
1106 try:
1107 host_dir = tempfile.mkdtemp()
1108
1109 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1110 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1111 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1112
1113 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1114 self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1115 self.device.shell(['ln', '-s', remote_links, remote_symlink])
1116
1117 # Populate device directory with random files.
1118 temp_files = make_random_device_files(
1119 self.device, in_dir=remote_dir, num_files=32)
1120
1121 for temp_file in temp_files:
1122 self.device.shell(
1123 ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1124 posixpath.join(remote_links, temp_file.base_name)])
1125
1126 self.device.pull(remote=remote_symlink, local=host_dir)
1127
1128 for temp_file in temp_files:
1129 host_path = os.path.join(
1130 host_dir, 'symlink', temp_file.base_name)
1131 self._verify_local(temp_file.checksum, host_path)
1132
1133 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1134 finally:
1135 if host_dir is not None:
1136 shutil.rmtree(host_dir)
1137
Josh Gao191c1542015-12-09 11:26:11 -08001138 def test_pull_empty(self):
1139 """Pull a directory containing an empty directory from the device."""
1140 try:
1141 host_dir = tempfile.mkdtemp()
1142
1143 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1144 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1145 self.device.shell(['mkdir', '-p', remote_empty_path])
1146
1147 self.device.pull(remote=remote_empty_path, local=host_dir)
1148 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1149 finally:
1150 if host_dir is not None:
1151 shutil.rmtree(host_dir)
1152
1153 def test_multiple_pull(self):
1154 """Pull a randomly generated directory of files from the device."""
1155
1156 try:
1157 host_dir = tempfile.mkdtemp()
1158
Josh Gao255c5c82016-03-03 14:49:02 -08001159 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
Josh Gao191c1542015-12-09 11:26:11 -08001160 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1161 self.device.shell(['mkdir', '-p', subdir])
1162
1163 # Create some random files and a subdirectory containing more files.
1164 temp_files = make_random_device_files(
1165 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1166
1167 subdir_temp_files = make_random_device_files(
1168 self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1169
Josh Gao93dee962020-02-06 17:52:38 -08001170 paths = [x.full_path for x in temp_files]
Josh Gao191c1542015-12-09 11:26:11 -08001171 paths.append(subdir)
1172 self.device._simple_call(['pull'] + paths + [host_dir])
1173
1174 for temp_file in temp_files:
1175 local_path = os.path.join(host_dir, temp_file.base_name)
1176 self._verify_local(temp_file.checksum, local_path)
1177
1178 for subdir_temp_file in subdir_temp_files:
1179 local_path = os.path.join(host_dir,
Josh Gao255c5c82016-03-03 14:49:02 -08001180 'subdir',
Josh Gao191c1542015-12-09 11:26:11 -08001181 subdir_temp_file.base_name)
1182 self._verify_local(subdir_temp_file.checksum, local_path)
1183
1184 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1185 finally:
1186 if host_dir is not None:
1187 shutil.rmtree(host_dir)
1188
Dan Albert06b0d6b2017-05-18 22:56:48 -07001189 def verify_sync(self, device, temp_files, device_dir):
1190 """Verifies that a list of temp files was synced to the device."""
1191 # Confirm that every file on the device mirrors that on the host.
1192 for temp_file in temp_files:
1193 device_full_path = posixpath.join(
1194 device_dir, temp_file.base_name)
1195 dev_md5, _ = device.shell(
1196 [get_md5_prog(self.device), device_full_path])[0].split()
1197 self.assertEqual(temp_file.checksum, dev_md5)
1198
Josh Gao191c1542015-12-09 11:26:11 -08001199 def test_sync(self):
Dan Albert06b0d6b2017-05-18 22:56:48 -07001200 """Sync a host directory to the data partition."""
Josh Gao191c1542015-12-09 11:26:11 -08001201
1202 try:
1203 base_dir = tempfile.mkdtemp()
1204
1205 # Create mirror device directory hierarchy within base_dir.
1206 full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1207 os.makedirs(full_dir_path)
1208
1209 # Create 32 random files within the host mirror.
Dan Albert06b0d6b2017-05-18 22:56:48 -07001210 temp_files = make_random_host_files(
1211 in_dir=full_dir_path, num_files=32)
Josh Gao191c1542015-12-09 11:26:11 -08001212
Dan Albert06b0d6b2017-05-18 22:56:48 -07001213 # Clean up any stale files on the device.
Dan Albertdef4aae2017-05-18 13:52:45 -07001214 device = adb.get_device() # pylint: disable=no-member
Josh Gao191c1542015-12-09 11:26:11 -08001215 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1216
Dan Albertdef4aae2017-05-18 13:52:45 -07001217 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1218 os.environ['ANDROID_PRODUCT_OUT'] = base_dir
Josh Gao191c1542015-12-09 11:26:11 -08001219 device.sync('data')
Dan Albertdef4aae2017-05-18 13:52:45 -07001220 if old_product_out is None:
1221 del os.environ['ANDROID_PRODUCT_OUT']
1222 else:
1223 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
Josh Gao191c1542015-12-09 11:26:11 -08001224
Dan Albert06b0d6b2017-05-18 22:56:48 -07001225 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
Josh Gao191c1542015-12-09 11:26:11 -08001226
Dan Albert06b0d6b2017-05-18 22:56:48 -07001227 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
Josh Gao191c1542015-12-09 11:26:11 -08001228 finally:
1229 if base_dir is not None:
1230 shutil.rmtree(base_dir)
1231
Dan Albert06b0d6b2017-05-18 22:56:48 -07001232 def test_push_sync(self):
1233 """Sync a host directory to a specific path."""
1234
1235 try:
1236 temp_dir = tempfile.mkdtemp()
1237 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1238
1239 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1240
1241 # Clean up any stale files on the device.
1242 device = adb.get_device() # pylint: disable=no-member
1243 device.shell(['rm', '-rf', device_dir])
1244
1245 device.push(temp_dir, device_dir, sync=True)
1246
1247 self.verify_sync(device, temp_files, device_dir)
1248
1249 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1250 finally:
1251 if temp_dir is not None:
1252 shutil.rmtree(temp_dir)
1253
Josh Gao191c1542015-12-09 11:26:11 -08001254 def test_unicode_paths(self):
1255 """Ensure that we can support non-ASCII paths, even on Windows."""
1256 name = u'로보카 폴리'
1257
1258 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1259 remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1260
1261 ## push.
1262 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1263 tf.close()
1264 self.device.push(tf.name, remote_path)
1265 os.remove(tf.name)
1266 self.assertFalse(os.path.exists(tf.name))
1267
1268 # Verify that the device ended up with the expected UTF-8 path
1269 output = self.device.shell(
1270 ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
Josh Gao2c621ae2018-03-19 16:09:05 -07001271 self.assertEqual(remote_path, output)
Josh Gao191c1542015-12-09 11:26:11 -08001272
1273 # pull.
1274 self.device.pull(remote_path, tf.name)
1275 self.assertTrue(os.path.exists(tf.name))
1276 os.remove(tf.name)
1277 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1278
1279
Yabin Cuib5e11412017-03-10 16:01:01 -08001280class DeviceOfflineTest(DeviceTest):
1281 def _get_device_state(self, serialno):
1282 output = subprocess.check_output(self.device.adb_cmd + ['devices'])
1283 for line in output.split('\n'):
1284 m = re.match('(\S+)\s+(\S+)', line)
1285 if m and m.group(1) == serialno:
1286 return m.group(2)
1287 return None
1288
Josh Gao33d14b82017-09-13 14:51:23 -07001289 def disabled_test_killed_when_pushing_a_large_file(self):
Yabin Cuib5e11412017-03-10 16:01:01 -08001290 """
1291 While running adb push with a large file, kill adb server.
1292 Occasionally the device becomes offline. Because the device is still
1293 reading data without realizing that the adb server has been restarted.
1294 Test if we can bring the device online automatically now.
1295 http://b/32952319
1296 """
1297 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1298 # 1. Push a large file
1299 file_path = 'tmp_large_file'
1300 try:
1301 fh = open(file_path, 'w')
1302 fh.write('\0' * (100 * 1024 * 1024))
1303 fh.close()
1304 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
1305 time.sleep(0.1)
1306 # 2. Kill the adb server
1307 subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1308 subproc.terminate()
1309 finally:
1310 try:
1311 os.unlink(file_path)
1312 except:
1313 pass
1314 # 3. See if the device still exist.
1315 # Sleep to wait for the adb server exit.
1316 time.sleep(0.5)
1317 # 4. The device should be online
1318 self.assertEqual(self._get_device_state(serialno), 'device')
1319
Josh Gao33d14b82017-09-13 14:51:23 -07001320 def disabled_test_killed_when_pulling_a_large_file(self):
Yabin Cuib5e11412017-03-10 16:01:01 -08001321 """
1322 While running adb pull with a large file, kill adb server.
1323 Occasionally the device can't be connected. Because the device is trying to
1324 send a message larger than what is expected by the adb server.
1325 Test if we can bring the device online automatically now.
1326 """
1327 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()
1328 file_path = 'tmp_large_file'
1329 try:
1330 # 1. Create a large file on device.
1331 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
1332 'bs=1000000', 'count=100'])
1333 # 2. Pull the large file on host.
1334 subproc = subprocess.Popen(self.device.adb_cmd +
1335 ['pull','/data/local/tmp/tmp_large_file', file_path])
1336 time.sleep(0.1)
1337 # 3. Kill the adb server
1338 subprocess.check_call(self.device.adb_cmd + ['kill-server'])
1339 subproc.terminate()
1340 finally:
1341 try:
1342 os.unlink(file_path)
1343 except:
1344 pass
1345 # 4. See if the device still exist.
1346 # Sleep to wait for the adb server exit.
1347 time.sleep(0.5)
1348 self.assertEqual(self._get_device_state(serialno), 'device')
1349
1350
Josh Gaoef3d3432017-05-02 15:01:09 -07001351 def test_packet_size_regression(self):
1352 """Test for http://b/37783561
1353
1354 Receiving packets of a length divisible by 512 but not 1024 resulted in
1355 the adb client waiting indefinitely for more input.
1356 """
1357 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
1358 # Probe some surrounding values as well, for the hell of it.
Josh Gao93dee962020-02-06 17:52:38 -08001359 for base in [512] + list(range(1024, 1024 * 16, 1024)):
Josh Gao2ea46522018-04-10 14:35:06 -07001360 for offset in [-6, -5, -4]:
1361 length = base + offset
1362 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;'
1363 'echo', 'foo']
1364 rc, stdout, _ = self.device.shell_nocheck(cmd)
Josh Gaoef3d3432017-05-02 15:01:09 -07001365
Josh Gao2ea46522018-04-10 14:35:06 -07001366 self.assertEqual(0, rc)
Josh Gaoef3d3432017-05-02 15:01:09 -07001367
Josh Gao2ea46522018-04-10 14:35:06 -07001368 # Output should be '\0' * length, followed by "foo\n"
1369 self.assertEqual(length, len(stdout) - 4)
1370 self.assertEqual(stdout, "\0" * length + "foo\n")
Josh Gaoef3d3432017-05-02 15:01:09 -07001371
Josh Gao5d799cd2018-08-22 15:13:18 -07001372 def test_zero_packet(self):
1373 """Test for http://b/113070258
1374
1375 Make sure that we don't blow up when sending USB transfers that line up
1376 exactly with the USB packet size.
1377 """
1378
1379 local_port = int(self.device.forward("tcp:0", "tcp:12345"))
1380 try:
1381 for size in [512, 1024]:
1382 def listener():
1383 cmd = ["echo foo | nc -l -p 12345; echo done"]
1384 rc, stdout, stderr = self.device.shell_nocheck(cmd)
1385
1386 thread = threading.Thread(target=listener)
1387 thread.start()
1388
1389 # Wait a bit to let the shell command start.
1390 time.sleep(0.25)
1391
1392 sock = socket.create_connection(("localhost", local_port))
1393 with contextlib.closing(sock):
Josh Gao93dee962020-02-06 17:52:38 -08001394 bytesWritten = sock.send(b"a" * size)
Josh Gao5d799cd2018-08-22 15:13:18 -07001395 self.assertEqual(size, bytesWritten)
1396 readBytes = sock.recv(4096)
Josh Gao93dee962020-02-06 17:52:38 -08001397 self.assertEqual(b"foo\n", readBytes)
Josh Gao5d799cd2018-08-22 15:13:18 -07001398
1399 thread.join()
1400 finally:
1401 self.device.forward_remove("tcp:{}".format(local_port))
1402
Josh Gaoef3d3432017-05-02 15:01:09 -07001403
Josh Gao74b7ec72019-01-11 14:42:08 -08001404class SocketTest(DeviceTest):
1405 def test_socket_flush(self):
1406 """Test that we handle socket closure properly.
1407
1408 If we're done writing to a socket, closing before the other end has
1409 closed will send a TCP_RST if we have incoming data queued up, which
1410 may result in data that we've written being discarded.
1411
1412 Bug: http://b/74616284
1413 """
1414 s = socket.create_connection(("localhost", 5037))
1415
1416 def adb_length_prefixed(string):
1417 encoded = string.encode("utf8")
1418 result = b"%04x%s" % (len(encoded), encoded)
1419 return result
1420
1421 if "ANDROID_SERIAL" in os.environ:
1422 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
1423 else:
1424 transport_string = "host:transport-any"
1425
1426 s.sendall(adb_length_prefixed(transport_string))
1427 response = s.recv(4)
Josh Gao93dee962020-02-06 17:52:38 -08001428 self.assertEqual(b"OKAY", response)
Josh Gao74b7ec72019-01-11 14:42:08 -08001429
1430 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
1431 s.sendall(adb_length_prefixed(shell_string))
1432
1433 response = s.recv(4)
Josh Gao93dee962020-02-06 17:52:38 -08001434 self.assertEqual(b"OKAY", response)
Josh Gao74b7ec72019-01-11 14:42:08 -08001435
1436 # Spawn a thread that dumps garbage into the socket until failure.
1437 def spam():
1438 buf = b"\0" * 16384
1439 try:
1440 while True:
1441 s.sendall(buf)
1442 except Exception as ex:
1443 print(ex)
1444
1445 thread = threading.Thread(target=spam)
1446 thread.start()
1447
1448 time.sleep(1)
1449
1450 received = b""
1451 while True:
1452 read = s.recv(512)
1453 if len(read) == 0:
1454 break
1455 received += read
1456
Josh Gao93dee962020-02-06 17:52:38 -08001457 self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
Josh Gao74b7ec72019-01-11 14:42:08 -08001458 thread.join()
1459
1460
Spencer Low69d8c392018-08-11 00:16:16 -07001461if sys.platform == "win32":
1462 # From https://stackoverflow.com/a/38749458
1463 import os
1464 import contextlib
1465 import msvcrt
1466 import ctypes
1467 from ctypes import wintypes
1468
1469 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
1470
1471 GENERIC_READ = 0x80000000
1472 GENERIC_WRITE = 0x40000000
1473 FILE_SHARE_READ = 1
1474 FILE_SHARE_WRITE = 2
1475 CONSOLE_TEXTMODE_BUFFER = 1
1476 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
1477 STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
1478 STD_ERROR_HANDLE = wintypes.DWORD(-12)
1479
1480 def _check_zero(result, func, args):
1481 if not result:
1482 raise ctypes.WinError(ctypes.get_last_error())
1483 return args
1484
1485 def _check_invalid(result, func, args):
1486 if result == INVALID_HANDLE_VALUE:
1487 raise ctypes.WinError(ctypes.get_last_error())
1488 return args
1489
1490 if not hasattr(wintypes, 'LPDWORD'): # Python 2
1491 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
1492 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1493
1494 class COORD(ctypes.Structure):
1495 _fields_ = (('X', wintypes.SHORT),
1496 ('Y', wintypes.SHORT))
1497
1498 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1499 _fields_ = (('cbSize', wintypes.ULONG),
1500 ('dwSize', COORD),
1501 ('dwCursorPosition', COORD),
1502 ('wAttributes', wintypes.WORD),
1503 ('srWindow', wintypes.SMALL_RECT),
1504 ('dwMaximumWindowSize', COORD),
1505 ('wPopupAttributes', wintypes.WORD),
1506 ('bFullscreenSupported', wintypes.BOOL),
1507 ('ColorTable', wintypes.DWORD * 16))
1508 def __init__(self, *args, **kwds):
1509 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1510 *args, **kwds)
1511 self.cbSize = ctypes.sizeof(self)
1512
1513 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
1514 CONSOLE_SCREEN_BUFFER_INFOEX)
1515 LPSECURITY_ATTRIBUTES = wintypes.LPVOID
1516
1517 kernel32.GetStdHandle.errcheck = _check_invalid
1518 kernel32.GetStdHandle.restype = wintypes.HANDLE
1519 kernel32.GetStdHandle.argtypes = (
1520 wintypes.DWORD,) # _In_ nStdHandle
1521
1522 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
1523 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
1524 kernel32.CreateConsoleScreenBuffer.argtypes = (
1525 wintypes.DWORD, # _In_ dwDesiredAccess
1526 wintypes.DWORD, # _In_ dwShareMode
1527 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes
1528 wintypes.DWORD, # _In_ dwFlags
1529 wintypes.LPVOID) # _Reserved_ lpScreenBufferData
1530
1531 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
1532 kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
1533 wintypes.HANDLE, # _In_ hConsoleOutput
1534 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo
1535
1536 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
1537 kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
1538 wintypes.HANDLE, # _In_ hConsoleOutput
1539 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo
1540
1541 kernel32.SetConsoleWindowInfo.errcheck = _check_zero
1542 kernel32.SetConsoleWindowInfo.argtypes = (
1543 wintypes.HANDLE, # _In_ hConsoleOutput
1544 wintypes.BOOL, # _In_ bAbsolute
1545 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow
1546
1547 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
1548 kernel32.FillConsoleOutputCharacterW.argtypes = (
1549 wintypes.HANDLE, # _In_ hConsoleOutput
1550 wintypes.WCHAR, # _In_ cCharacter
1551 wintypes.DWORD, # _In_ nLength
1552 COORD, # _In_ dwWriteCoord
1553 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten
1554
1555 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
1556 kernel32.ReadConsoleOutputCharacterW.argtypes = (
1557 wintypes.HANDLE, # _In_ hConsoleOutput
1558 wintypes.LPWSTR, # _Out_ lpCharacter
1559 wintypes.DWORD, # _In_ nLength
1560 COORD, # _In_ dwReadCoord
1561 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1562
1563 @contextlib.contextmanager
1564 def allocate_console():
1565 allocated = kernel32.AllocConsole()
1566 try:
1567 yield allocated
1568 finally:
1569 if allocated:
1570 kernel32.FreeConsole()
1571
1572 @contextlib.contextmanager
1573 def console_screen(ncols=None, nrows=None):
1574 info = CONSOLE_SCREEN_BUFFER_INFOEX()
1575 new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
1576 nwritten = (wintypes.DWORD * 1)()
1577 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
1578 kernel32.GetConsoleScreenBufferInfoEx(
1579 hStdOut, ctypes.byref(info))
1580 if ncols is None:
1581 ncols = info.dwSize.X
1582 if nrows is None:
1583 nrows = info.dwSize.Y
1584 elif nrows > 9999:
1585 raise ValueError('nrows must be 9999 or less')
1586 fd_screen = None
1587 hScreen = kernel32.CreateConsoleScreenBuffer(
1588 GENERIC_READ | GENERIC_WRITE,
1589 FILE_SHARE_READ | FILE_SHARE_WRITE,
1590 None, CONSOLE_TEXTMODE_BUFFER, None)
1591 try:
1592 fd_screen = msvcrt.open_osfhandle(
1593 hScreen, os.O_RDWR | os.O_BINARY)
1594 kernel32.GetConsoleScreenBufferInfoEx(
1595 hScreen, ctypes.byref(new_info))
1596 new_info.dwSize = COORD(ncols, nrows)
1597 new_info.srWindow = wintypes.SMALL_RECT(
1598 Left=0, Top=0, Right=(ncols - 1),
1599 Bottom=(info.srWindow.Bottom - info.srWindow.Top))
1600 kernel32.SetConsoleScreenBufferInfoEx(
1601 hScreen, ctypes.byref(new_info))
1602 kernel32.SetConsoleWindowInfo(hScreen, True,
1603 ctypes.byref(new_info.srWindow))
1604 kernel32.FillConsoleOutputCharacterW(
1605 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
1606 kernel32.SetConsoleActiveScreenBuffer(hScreen)
1607 try:
1608 yield fd_screen
1609 finally:
1610 kernel32.SetConsoleScreenBufferInfoEx(
1611 hStdOut, ctypes.byref(info))
1612 kernel32.SetConsoleWindowInfo(hStdOut, True,
1613 ctypes.byref(info.srWindow))
1614 kernel32.SetConsoleActiveScreenBuffer(hStdOut)
1615 finally:
1616 if fd_screen is not None:
1617 os.close(fd_screen)
1618 else:
1619 kernel32.CloseHandle(hScreen)
1620
1621 def read_screen(fd):
1622 hScreen = msvcrt.get_osfhandle(fd)
1623 csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1624 kernel32.GetConsoleScreenBufferInfoEx(
1625 hScreen, ctypes.byref(csbi))
1626 ncols = csbi.dwSize.X
1627 pos = csbi.dwCursorPosition
1628 length = ncols * pos.Y + pos.X + 1
1629 buf = (ctypes.c_wchar * length)()
1630 n = (wintypes.DWORD * 1)()
1631 kernel32.ReadConsoleOutputCharacterW(
1632 hScreen, buf, length, COORD(0,0), n)
1633 lines = [buf[i:i+ncols].rstrip(u'\0')
1634 for i in range(0, n[0], ncols)]
1635 return u'\n'.join(lines)
1636
1637@unittest.skipUnless(sys.platform == "win32", "requires Windows")
1638class WindowsConsoleTest(DeviceTest):
1639 def test_unicode_output(self):
1640 """Test Unicode command line parameters and Unicode console window output.
1641
1642 Bug: https://issuetracker.google.com/issues/111972753
1643 """
1644 # If we don't have a console window, allocate one. This isn't necessary if we're already
1645 # being run from a console window, which is typical.
1646 with allocate_console() as allocated_console:
1647 # Create a temporary console buffer and switch to it. We could also pass a parameter of
1648 # ncols=len(unicode_string), but it causes the window to flash as it is resized and
1649 # likely unnecessary given the typical console window size.
1650 with console_screen(nrows=1000) as screen:
1651 unicode_string = u'로보카 폴리'
1652 # Run adb and allow it to detect that stdout is a console, not a pipe, by using
1653 # device.shell_popen() which does not use a pipe, unlike device.shell().
1654 process = self.device.shell_popen(['echo', '"' + unicode_string + '"'])
1655 process.wait()
1656 # Read what was written by adb to the temporary console buffer.
1657 console_output = read_screen(screen)
1658 self.assertEqual(unicode_string, console_output)
1659
1660
Josh Gao191c1542015-12-09 11:26:11 -08001661def main():
1662 random.seed(0)
1663 if len(adb.get_devices()) > 0:
1664 suite = unittest.TestLoader().loadTestsFromName(__name__)
1665 unittest.TextTestRunner(verbosity=3).run(suite)
1666 else:
1667 print('Test suite must be run with attached devices')
1668
1669
1670if __name__ == '__main__':
1671 main()