blob: 083adce4df5de9e391dbe2b70fa9b27e2f804875 [file] [log] [blame]
Josh Gao191c1542015-12-09 11:26:11 -08001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gaoc970aef2018-03-19 15:35:11 -070034import threading
Josh Gaofe50bb72016-06-22 18:27:22 -070035import time
Josh Gao191c1542015-12-09 11:26:11 -080036import unittest
37
Josh Gao74b7ec72019-01-11 14:42:08 -080038from datetime import datetime
39
Josh Gao191c1542015-12-09 11:26:11 -080040import adb
41
def requires_root(func):
    """Decorate a test method so that it runs while adbd is running as root.

    The wrapper raises unittest.SkipTest when the device's `ro.debuggable`
    property is not '1'.  If `id -un` on the device does not report 'root',
    the device is switched to root before the test and switched back with
    unroot() afterwards, even when the test raises.

    Args:
        func: The test method to wrap; it is called as func(self, ...).

    Returns:
        The wrapping function, carrying func's name and docstring.
    """
    # Imported locally so this decorator does not depend on the module-level
    # import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch we made ourselves.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a test method so that it runs while adbd is not root.

    If `id -un` on the device reports 'root', the device is switched with
    unroot() before the test and switched back to root afterwards, even
    when the test raises.

    Args:
        func: The test method to wrap; it is called as func(self, ...).

    Returns:
        The wrapping function, carrying func's name and docstring.
    """
    # Imported locally so this decorator does not depend on the module-level
    # import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch we made ourselves.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class DeviceTest(unittest.TestCase):
    """Base test case that exposes the device under test as `self.device`."""

    def setUp(self):
        """Store the object returned by adb.get_device() on the test case."""
        device = adb.get_device()
        self.device = device
82
83
class ForwardReverseTest(DeviceTest):
    """Tests for the forward/reverse port-mapping commands of the device.

    Every test first requires the relevant mapping list to be empty, and
    removes the mappings it created even when an assertion fails, so one
    failing test does not trip the emptiness precondition of the others.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Shared body of the forward/reverse --no-rebind tests.

        Args:
            description: Name used in the precondition failure message.
            direction_list: Callable returning the current mapping list.
            direction: Callable(local, remote) that creates a mapping.
            direction_no_rebind: Like `direction`, in --no-rebind mode.
            direction_remove_all: Callable that removes every mapping.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Runs on failure too, so no mapping outlives the test.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Run the --no-rebind scenario against the forward commands."""
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the --no-rebind scenario against the reverse commands."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        """Create, list and remove forward mappings."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_old_protocol(self):
        """Request a forward by writing a host-serial service string directly
        to localhost:5037, then check that it appears in the forward list."""
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            s = socket.create_connection(("localhost", 5037))
            # The socket stays open until the forward has been observed and
            # is always closed afterwards (it used to be leaked).
            with contextlib.closing(s):
                service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
                # Request framing: 4 hex digits of length, then the payload.
                cmd = b"%04x%s" % (len(service), service)
                s.sendall(cmd)

                msg = self.device.forward_list()
                self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        finally:
            self.device.forward_remove_all()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Forward tcp:0 and check that the reported port is listed."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Create, list and remove reverse mappings."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Reverse tcp:0 and check that the reported port is listed."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, _ = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        # Bytes payload and binary read: identical on
                        # Python 2, and required by sockets on Python 3.
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile('rb').read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
263
264
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output handling, exit codes, PTY allocation,
    signal delivery and behaviour under parallel load."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """A failing command must raise adb.ShellError."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """The trailing line separator of the output must be preserved."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_command_length(self):
        """A 16384-character argument must round-trip through the shell."""
        # Devices that have shell_v2 should be able to handle long commands.
        if self.device.has_shell_protocol():
            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
            self.assertEqual(rc, 0)
            self.assertTrue(out == ('x' * 16384 + '\n'))

    def test_shell_nocheck_failure(self):
        """shell_nocheck reports a non-zero code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck must also preserve the trailing line separator."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Command output must not be confused with the exit status."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

            # -t -t: always allocate PTY.
            self.assertEqual((True, True), check_pty(['-t', '-t']))

            # -tt: always allocate PTY, POSIX style (http://b/32216152).
            self.assertEqual((True, True), check_pty(['-tt']))

            # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
            # we follow the man page instead.
            self.assertEqual((True, True), check_pty(['-ttt']))

            # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
            self.assertEqual((True, True), check_pty(['-ttx']))

            # -Ttt: -tt cancels out -T.
            self.assertEqual((True, True), check_pty(['-Ttt']))

            # -ttT: -T cancels out -tt.
            self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip()
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for input in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(input)
            self.assertEqual(input.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script to a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen([script], kill_atexit=False,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE)

        self.assertEqual("Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")

    def test_exit_stress(self):
        """Hammer `adb shell exit 42` with multiple threads."""
        thread_count = 48
        result = dict()
        # Use the same adb invocation as every other test in this class, so
        # the stress test addresses the device under test.
        adb_cmd = self.device.adb_cmd

        def hammer(thread_idx, thread_count, result):
            success = True
            for i in range(thread_idx, 240, thread_count):
                ret = subprocess.call(adb_cmd + ['shell', 'exit {}'.format(i)])
                if ret != i % 256:
                    success = False
                    break
            result[thread_idx] = success

        threads = []
        for i in range(thread_count):
            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
        # A worker that died with an exception never stores a result; check
        # every index so a missing entry counts as a failure, not a pass.
        for i in range(thread_count):
            self.assertTrue(result.get(i, False),
                            'worker thread {} failed'.format(i))

    def disabled_test_parallel(self):
        """Spawn a bunch of `adb shell` instances in parallel.

        This was broken historically due to the use of select, which only works
        for fds that are numerically less than 1024.

        Bug: http://b/141955761"""

        n_procs = 2048
        procs = dict()
        for i in range(0, n_procs):
            procs[i] = subprocess.Popen(
                self.device.adb_cmd +
                ['shell', 'read foo; echo $foo; read rc; exit $rc'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE
            )

        for i in range(0, n_procs):
            procs[i].stdin.write("%d\n" % i)

        # unittest assertions rather than bare asserts: they are not stripped
        # under `python -O` and they report both values on failure.
        for i in range(0, n_procs):
            response = procs[i].stdout.readline()
            self.assertEqual(response, "%d\n" % i)

        for i in range(0, n_procs):
            procs[i].stdin.write("%d\n" % (i % 256))

        for i in range(0, n_procs):
            self.assertEqual(procs[i].wait(), i % 256)
568
Josh Gao191c1542015-12-09 11:26:11 -0800569
class ArgumentEscapingTest(DeviceTest):
    """Tests for how shell and install arguments are quoted on their way to
    the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the placeholder file even when the assertion fails.
                os.remove(tf.name)
618
619
class RootUnrootTest(DeviceTest):
    """Checks switching the device between root and non-root."""

    def _test_root(self):
        """Request root and, unless the build refuses it, expect `id -un` to
        report 'root'."""
        message = self.device.root()
        refused = 'adbd cannot run as root in production builds' in message
        if not refused:
            self.device.wait()
            user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', user)

    def _test_unroot(self):
        """Drop root and expect `id -un` to report 'shell'."""
        self.device.unroot()
        self.device.wait()
        user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        debuggable = self.device.get_prop('ro.debuggable')
        if debuggable != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        started_as_root = original_user == 'root'
        try:
            # Exercise the transition away from the starting state first.
            if started_as_root:
                steps = (self._test_unroot, self._test_root)
            elif original_user == 'shell':
                steps = (self._test_root, self._test_unroot)
            else:
                steps = ()
            for step in steps:
                step()
        finally:
            # Put the device back into the state it started in.
            if started_as_root:
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
652
653
class TcpIpTest(DeviceTest):
    """Checks argument validation of the tcpip command."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Both an empty and a non-numeric port must be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
664
665
class SystemPropertiesTest(DeviceTest):
    """Checks reading and writing of device properties."""

    def test_get_prop(self):
        """The init.svc.adbd property must read back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop must be visible through getprop."""
        prop_name = 'foo.bar'
        # Start from a known value before exercising set_prop.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        getprop_stdout = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(getprop_stdout.strip(), 'qux')
678
679
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the given byte string."""
    return hashlib.md5(string).hexdigest()
684
685
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum."""
    # Probe by running md5sum on a file; a ShellError selects the old name.
    try:
        device.shell(['md5sum', '/proc/uptime'])
    except adb.ShellError:
        return 'md5'
    else:
        return 'md5sum'
693
694
class HostFile(object):
    """Record for a file on the host: its handle, checksum and path parts."""

    def __init__(self, handle, checksum):
        """Store handle and checksum; derive the paths from handle.name."""
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
701
702
class DeviceFile(object):
    """Record for a file on the device: its checksum and POSIX path parts."""

    def __init__(self, checksum, full_path):
        """Store checksum and full_path; derive base_name with posixpath."""
        self.checksum, self.full_path = checksum, full_path
        self.base_name = posixpath.basename(full_path)
708
709
def make_random_host_files(in_dir, num_files):
    """Create files filled with random bytes inside a host directory.

    Each file holds a whole number of KiB, from 1 KiB up to 15 KiB, and is
    left on disk (delete=False) for the caller to remove.

    Args:
        in_dir: Host directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per file, carrying the closed file
        handle and the MD5 hex digest of the file's contents.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() instead of xrange() so the helper runs on Python 2 and 3.
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The context manager closes (and thereby flushes) the handle even
        # if the write fails; handle.name stays readable afterwards.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        md5 = compute_md5(rand_str)
        files.append(HostFile(file_handle, md5))
    return files
727
728
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files of random bytes on the device using dd from /dev/urandom.

    Each file is written as one dd block whose size is a whole number of
    KiB, from 1 KiB up to 15 KiB.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list of DeviceFile objects with the device-computed MD5 and the
        full device path of each file.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Looked up on first use and then reused: the answer does not change
    # between files, and each probe costs a shell command on the device.
    md5_prog = None

    files = []
    # range() instead of xrange() so the helper runs on Python 2 and 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        if md5_prog is None:
            md5_prog = get_md5_prog(device)
        # The first field of the checksum tool's output is the digest.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
746
747
748class FileOperationsTest(DeviceTest):
749 SCRATCH_DIR = '/data/local/tmp'
750 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
751 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
752
753 def _verify_remote(self, checksum, remote_path):
754 dev_md5, _ = self.device.shell([get_md5_prog(self.device),
755 remote_path])[0].split()
756 self.assertEqual(checksum, dev_md5)
757
758 def _verify_local(self, checksum, local_path):
759 with open(local_path, 'rb') as host_file:
760 host_md5 = compute_md5(host_file.read())
761 self.assertEqual(host_md5, checksum)
762
763 def test_push(self):
764 """Push a randomly generated file to specified device."""
765 kbytes = 512
766 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
767 rand_str = os.urandom(1024 * kbytes)
768 tmp.write(rand_str)
769 tmp.close()
770
771 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
772 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
773
774 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
775 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
776
777 os.remove(tmp.name)
778
779 def test_push_dir(self):
780 """Push a randomly generated directory of files to the device."""
781 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
782 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
783
784 try:
785 host_dir = tempfile.mkdtemp()
786
787 # Make sure the temp directory isn't setuid, or else adb will complain.
788 os.chmod(host_dir, 0o700)
789
790 # Create 32 random files.
791 temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
792 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
793
794 for temp_file in temp_files:
795 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
796 os.path.basename(host_dir),
797 temp_file.base_name)
798 self._verify_remote(temp_file.checksum, remote_path)
799 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
800 finally:
801 if host_dir is not None:
802 shutil.rmtree(host_dir)
803
Josh Gaofb085102018-10-22 13:00:05 -0700804 def disabled_test_push_empty(self):
Josh Gaoed176502018-09-21 13:40:16 -0700805 """Push an empty directory to the device."""
Josh Gao191c1542015-12-09 11:26:11 -0800806 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
807 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
808
809 try:
810 host_dir = tempfile.mkdtemp()
811
812 # Make sure the temp directory isn't setuid, or else adb will complain.
813 os.chmod(host_dir, 0o700)
814
815 # Create an empty directory.
Josh Gao6c1b42c2018-08-08 14:25:41 -0700816 empty_dir_path = os.path.join(host_dir, 'empty')
817 os.mkdir(empty_dir_path);
Josh Gao191c1542015-12-09 11:26:11 -0800818
Josh Gao6c1b42c2018-08-08 14:25:41 -0700819 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
Josh Gao191c1542015-12-09 11:26:11 -0800820
Josh Gaoed176502018-09-21 13:40:16 -0700821 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
822 test_empty_cmd = ["[", "-d", remote_path, "]"]
Josh Gao191c1542015-12-09 11:26:11 -0800823 rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
Josh Gaoed176502018-09-21 13:40:16 -0700824
Josh Gao191c1542015-12-09 11:26:11 -0800825 self.assertEqual(rc, 0)
826 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
827 finally:
828 if host_dir is not None:
829 shutil.rmtree(host_dir)
830
Josh Gao94dc19f2016-09-14 16:13:50 -0700831 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
832 def test_push_symlink(self):
833 """Push a symlink.
834
835 Bug: http://b/31491920
836 """
837 try:
838 host_dir = tempfile.mkdtemp()
839
840 # Make sure the temp directory isn't setuid, or else adb will
841 # complain.
842 os.chmod(host_dir, 0o700)
843
844 with open(os.path.join(host_dir, 'foo'), 'w') as f:
845 f.write('foo')
846
847 symlink_path = os.path.join(host_dir, 'symlink')
848 os.symlink('foo', symlink_path)
849
850 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
851 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
852 self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
853 rc, out, _ = self.device.shell_nocheck(
854 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
855 self.assertEqual(0, rc)
856 self.assertEqual(out.strip(), 'foo')
857 finally:
858 if host_dir is not None:
859 shutil.rmtree(host_dir)
860
def test_multiple_push(self):
    """Push multiple files to the device in one adb push command.

    Bug: http://b/25324823
    """

    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Bind before the try block so the finally clause never sees an
    # unbound name if mkdtemp() fails.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=4)

        subdir = os.path.join(host_dir, 'subdir')
        os.mkdir(subdir)
        subdir_temp_files = make_random_host_files(in_dir=subdir,
                                                   num_files=4)

        # Build a real list: map() returns a lazy iterator without
        # append() on Python 3.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])

        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        # NOTE(review): this loop deliberately keeps the original
        # behavior of re-checking the last top-level temp_file instead of
        # subdir_temp_file; it appears tied to the workaround below.
        # Revisit (use 'subdir' and subdir_temp_file) once the referenced
        # bug is confirmed fixed.
        for subdir_temp_file in subdir_temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         # BROKEN: http://b/25394682
                                         # 'subdir';
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
902
@requires_non_root
def test_push_error_reporting(self):
    """Make sure that errors that occur while pushing a file get reported

    Pushes a 1 MiB file to /system/ as a non-root user and expects the
    push to fail with a permission or read-only error in its output.

    Bug: http://b/26816782
    """
    with tempfile.NamedTemporaryFile() as tmp_file:
        # NamedTemporaryFile defaults to binary mode, so write bytes; a
        # text str raises TypeError on Python 3.
        tmp_file.write(b'\0' * 1024 * 1024)
        tmp_file.flush()
        try:
            self.device.push(local=tmp_file.name, remote='/system/')
            self.fail('push should not have succeeded')
        except subprocess.CalledProcessError as e:
            output = e.output

        self.assertTrue('Permission denied' in output or
                        'Read-only file system' in output)
Josh Gao191c1542015-12-09 11:26:11 -0800920
@requires_non_root
def test_push_directory_creation(self):
    """Regression test for directory creation.

    Removes a directory under DEVICE_TEMP_DIR and then pushes a file to
    a path inside it, so the push has to create the missing parent.

    Bug: http://b/110953234
    """
    with tempfile.NamedTemporaryFile() as tmp_file:
        # NamedTemporaryFile defaults to binary mode, so write bytes; a
        # text str raises TypeError on Python 3.
        tmp_file.write(b'\0' * 1024 * 1024)
        tmp_file.flush()
        remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
        self.device.shell(['rm', '-rf', remote_path])

        remote_path += '/filename'
        self.device.push(local=tmp_file.name, remote=remote_path)
935
def disabled_test_push_multiple_slash_root(self):
    """Regression test for pushing to //data/local/tmp.

    Bug: http://b/141311284

    Disabled because this is broken on the adbd side as well: b/141943968
    """
    with tempfile.NamedTemporaryFile() as tmp_file:
        # NamedTemporaryFile defaults to binary mode, so write bytes; a
        # text str raises TypeError on Python 3.
        tmp_file.write(b'\0' * 1024 * 1024)
        tmp_file.flush()
        # The extra leading '/' produces the double-slash root under test.
        remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
        self.device.shell(['rm', '-rf', remote_path])
        self.device.push(local=tmp_file.name, remote=remote_path)
949
def _test_pull(self, remote_file, checksum):
    """Pull remote_file to a host temp file and compare its MD5.

    Args:
        remote_file: device path to pull.
        checksum: MD5 value the pulled contents are expected to have.
    """
    # Only reserve a name; adb writes the file itself.
    tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    tmp_write.close()
    try:
        self.device.pull(remote=remote_file, local=tmp_write.name)
        with open(tmp_write.name, 'rb') as tmp_read:
            host_contents = tmp_read.read()
            host_md5 = compute_md5(host_contents)
        self.assertEqual(checksum, host_md5)
    finally:
        # delete=False means nothing else removes the file; clean it up
        # even when the pull or the assertion fails.
        os.remove(tmp_write.name)
959
@requires_non_root
def test_pull_error_reporting(self):
    """Make sure a pull of an unreadable file reports 'Permission denied'.

    Creates DEVICE_TEMP_FILE, strips all of its permission bits, and
    checks the output of the pull attempt.
    """
    self.device.shell(['touch', self.DEVICE_TEMP_FILE])
    self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])

    try:
        try:
            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
        except subprocess.CalledProcessError as e:
            output = e.output

        self.assertIn('Permission denied', output)
    finally:
        # Remove the permission-less file even if the assertion fails, so
        # it cannot interfere with later tests using the same path.
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
973
def test_pull(self):
    """Pull a file of random bytes from the device and verify its checksum."""
    remote_file = self.DEVICE_TEMP_FILE
    size_kib = 512

    # Start from a clean slate, then fill the file from /dev/urandom.
    self.device.shell(['rm', '-rf', remote_file])
    self.device.shell([
        'dd',
        'if=/dev/urandom',
        'of={}'.format(remote_file),
        'bs=1024',
        'count={}'.format(size_kib),
    ])

    # The checksum tool's stdout is split into exactly two fields; the
    # first one is the digest.
    md5_stdout = self.device.shell(
        [get_md5_prog(self.device), remote_file])[0]
    dev_md5, _ = md5_stdout.split()

    self._test_pull(remote_file, dev_md5)
    self.device.shell_nocheck(['rm', remote_file])
986
def test_pull_dir(self):
    """Pull a randomly generated directory of files from the device."""
    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # Pulling into an existing directory nests the remote directory's
        # basename underneath it.
        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1011
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        # The files are expected in the symlink's target directory.
        for temp_file in temp_files:
            host_path = os.path.join(
                real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1046
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory."""
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()
        real_dir = os.path.join(host_dir, 'real')
        # Name the symlink after the remote directory so the pulled
        # directory collides with it.
        tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # The files are expected directly inside the symlink's target.
        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1077
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path."""
    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()
        # Deliberately not created: the pull has to create it.
        dest_dir = os.path.join(host_dir, 'dest')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        # With a nonexistent destination the files land directly in it.
        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1101
# selinux prevents adbd from accessing symlinks on /data/local/tmp.
def disabled_test_pull_symlink_dir(self):
    """Pull a symlink to a directory of symlinks to files."""
    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
        remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
        remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_dir, remote_links])
        self.device.shell(['ln', '-s', remote_links, remote_symlink])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=remote_dir, num_files=32)

        # Mirror every real file with a relative symlink under 'links'.
        for temp_file in temp_files:
            self.device.shell(
                ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                 posixpath.join(remote_links, temp_file.base_name)])

        self.device.pull(remote=remote_symlink, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, 'symlink', temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1136
def test_pull_empty(self):
    """Pull a directory containing an empty directory from the device."""
    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_empty_path])

        self.device.pull(remote=remote_empty_path, local=host_dir)
        self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1151
def test_multiple_pull(self):
    """Pull several files and a directory from the device in one adb pull."""

    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', subdir])

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

        subdir_temp_files = make_random_device_files(
            self.device, in_dir=subdir, num_files=4, prefix='subdir_')

        # Build a real list: map() returns a lazy iterator without
        # append() on Python 3.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['pull'] + paths + [host_dir])

        for temp_file in temp_files:
            local_path = os.path.join(host_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, local_path)

        for subdir_temp_file in subdir_temp_files:
            local_path = os.path.join(host_dir,
                                      'subdir',
                                      subdir_temp_file.base_name)
            self._verify_local(subdir_temp_file.checksum, local_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1187
def verify_sync(self, device, temp_files, device_dir):
    """Check that every host temp file has a matching copy in device_dir.

    For each entry, the file's checksum on the device is computed via
    `device` and compared against the entry's recorded checksum.
    """
    for host_file in temp_files:
        remote_path = posixpath.join(device_dir, host_file.base_name)
        md5_command = [get_md5_prog(self.device), remote_path]
        md5_stdout = device.shell(md5_command)[0]
        # Output is split into exactly two fields; the first is the digest.
        remote_md5, _ = md5_stdout.split()
        self.assertEqual(host_file.checksum, remote_md5)
1197
def test_sync(self):
    """Sync a host directory to the data partition."""

    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    base_dir = None
    try:
        base_dir = tempfile.mkdtemp()

        # Create mirror device directory hierarchy within base_dir.
        full_dir_path = base_dir + self.DEVICE_TEMP_DIR
        os.makedirs(full_dir_path)

        # Create 32 random files within the host mirror.
        temp_files = make_random_host_files(
            in_dir=full_dir_path, num_files=32)

        # Clean up any stale files on the device.
        device = adb.get_device()  # pylint: disable=no-member
        device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

        # Point ANDROID_PRODUCT_OUT at the mirror for the duration of the
        # sync only, restoring the previous value even if the sync raises
        # so the override cannot leak into later tests.
        old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
        os.environ['ANDROID_PRODUCT_OUT'] = base_dir
        try:
            device.sync('data')
        finally:
            if old_product_out is None:
                del os.environ['ANDROID_PRODUCT_OUT']
            else:
                os.environ['ANDROID_PRODUCT_OUT'] = old_product_out

        self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)

        # NOTE(review): device-side cleanup was already disabled in the
        # original; the reason is not visible here.
        #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if base_dir is not None:
            shutil.rmtree(base_dir)
1230
def test_push_sync(self):
    """Sync a host directory to a specific path."""

    # Bind before the try block so the finally clause can always test it,
    # even when mkdtemp() itself raises.
    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)

        device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')

        # Clean up any stale files on the device.
        device = adb.get_device()  # pylint: disable=no-member
        device.shell(['rm', '-rf', device_dir])

        device.push(temp_dir, device_dir, sync=True)

        self.verify_sync(device, temp_files, device_dir)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if temp_dir is not None:
            shutil.rmtree(temp_dir)
1252
def test_unicode_paths(self):
    """Round-trip a file whose name is non-ASCII, on every host platform."""
    unicode_name = u'로보카 폴리'
    remote_glob = '/data/local/tmp/adb-test-*'
    remote_path = u'/data/local/tmp/adb-test-{}'.format(unicode_name)

    # Remove leftovers from earlier runs.
    self.device.shell(['rm', '-f', remote_glob])

    # Push: create an empty host file with the Unicode suffix, send it,
    # then delete the host copy.
    with tempfile.NamedTemporaryFile(
            'wb', suffix=unicode_name, delete=False) as host_file:
        pass
    host_path = host_file.name
    self.device.push(host_path, remote_path)
    os.remove(host_path)
    self.assertFalse(os.path.exists(host_path))

    # The device must list exactly the UTF-8 path that was pushed.
    listing = self.device.shell(['ls', remote_glob])[0]
    self.assertEqual(remote_path, listing.strip())

    # Pull: fetch the file back to the same host path and clean up.
    self.device.pull(remote_path, host_path)
    self.assertTrue(os.path.exists(host_path))
    os.remove(host_path)
    self.device.shell(['rm', '-f', remote_glob])
1277
1278
class DeviceOfflineTest(DeviceTest):
    """Regression tests for device connectivity and packet-size handling."""

    def _get_device_state(self, serialno):
        """Return the state `adb devices` lists for serialno, or None.

        Args:
            serialno: serial number to look up in the first column of the
                `adb devices` output.
        """
        # universal_newlines=True yields text on both Python 2 and 3, so
        # the str split and regex below work on either.
        output = subprocess.check_output(self.device.adb_cmd + ['devices'],
                                         universal_newlines=True)
        for line in output.split('\n'):
            # Raw string: '\S' and '\s' are invalid escapes in a normal
            # string literal.
            m = re.match(r'(\S+)\s+(\S+)', line)
            if m and m.group(1) == serialno:
                return m.group(2)
        return None

    def disabled_test_killed_when_pushing_a_large_file(self):
        """
           While running adb push with a large file, kill adb server.
           Occasionally the device becomes offline. Because the device is still
           reading data without realizing that the adb server has been restarted.
           Test if we can bring the device online automatically now.
           http://b/32952319
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno'],
            universal_newlines=True).strip()
        # 1. Push a large file
        file_path = 'tmp_large_file'
        try:
            with open(file_path, 'w') as fh:
                fh.write('\0' * (100 * 1024 * 1024))
            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
            time.sleep(0.1)
            # 2. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
            # Reap the child so it neither lingers as a zombie nor keeps
            # the file open while it is being unlinked.
            subproc.wait()
        finally:
            # Best-effort cleanup: the file may not exist if creation
            # failed. Only filesystem errors are ignored.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 3. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        # 4. The device should be online
        self.assertEqual(self._get_device_state(serialno), 'device')

    def disabled_test_killed_when_pulling_a_large_file(self):
        """
           While running adb pull with a large file, kill adb server.
           Occasionally the device can't be connected. Because the device is trying to
           send a message larger than what is expected by the adb server.
           Test if we can bring the device online automatically now.
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno'],
            universal_newlines=True).strip()
        file_path = 'tmp_large_file'
        try:
            # 1. Create a large file on device.
            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
                               'bs=1000000', 'count=100'])
            # 2. Pull the large file on host.
            subproc = subprocess.Popen(self.device.adb_cmd +
                                       ['pull','/data/local/tmp/tmp_large_file', file_path])
            time.sleep(0.1)
            # 3. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
            # Reap the child so it neither lingers as a zombie nor keeps
            # the file open while it is being unlinked.
            subproc.wait()
        finally:
            # Best-effort cleanup: the pull may never have created the
            # file. Only filesystem errors are ignored.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 4. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        self.assertEqual(self._get_device_state(serialno), 'device')

    def test_packet_size_regression(self):
        """Test for http://b/37783561

        Receiving packets of a length divisible by 512 but not 1024 resulted in
        the adb client waiting indefinitely for more input.
        """
        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
        # Probe some surrounding values as well, for the hell of it.
        # list(...) is required on Python 3, where range() is not a list.
        for base in [512] + list(range(1024, 1024 * 16, 1024)):
            for offset in [-6, -5, -4]:
                length = base + offset
                # '2>/dev/null;echo' was two adjacent literals that Python
                # concatenated implicitly; spelled out here so the element
                # boundaries are visible. The resulting list is unchanged.
                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1',
                       '2>/dev/null;echo', 'foo']
                rc, stdout, _ = self.device.shell_nocheck(cmd)

                self.assertEqual(0, rc)

                # Output should be '\0' * length, followed by "foo\n"
                self.assertEqual(length, len(stdout) - 4)
                self.assertEqual(stdout, "\0" * length + "foo\n")

    def test_zero_packet(self):
        """Test for http://b/113070258

        Make sure that we don't blow up when sending USB transfers that line up
        exactly with the USB packet size.
        """

        def listener():
            # Device side: answer one connection on port 12345 with "foo".
            cmd = ["echo foo | nc -l -p 12345; echo done"]
            self.device.shell_nocheck(cmd)

        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
        try:
            for size in [512, 1024]:
                thread = threading.Thread(target=listener)
                thread.start()

                # Wait a bit to let the shell command start.
                time.sleep(0.25)

                sock = socket.create_connection(("localhost", local_port))
                with contextlib.closing(sock):
                    # Sockets carry bytes; a text str fails on Python 3.
                    bytesWritten = sock.send(b"a" * size)
                    self.assertEqual(size, bytesWritten)
                    readBytes = sock.recv(4096)
                    self.assertEqual(b"foo\n", readBytes)

                thread.join()
        finally:
            self.device.forward_remove("tcp:{}".format(local_port))
1401
Josh Gaoef3d3432017-05-02 15:01:09 -07001402
class SocketTest(DeviceTest):
    """Tests that talk to the adb server directly over its TCP socket."""

    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        Bug: http://b/74616284
        """
        def adb_length_prefixed(payload):
            # Prefix the UTF-8 payload with its length as four hex digits.
            encoded = payload.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        s = socket.create_connection(("localhost", 5037))
        # Close the socket on every exit path, including assertion
        # failures, instead of leaking it until garbage collection.
        with contextlib.closing(s):
            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
            s.sendall(adb_length_prefixed(shell_string))

            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # Drain everything the shell command wrote until the peer
            # closes its end (recv returns an empty result).
            received = b""
            while True:
                read = s.recv(512)
                if len(read) == 0:
                    break
                received += read

            self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
            thread.join()
1458
1459
if sys.platform == "win32":
    # Windows-only helpers for creating a scratch console screen buffer and
    # reading back what was written to it, built on kernel32 via ctypes.
    # From https://stackoverflow.com/a/38749458
    import os
    import contextlib
    import msvcrt
    import ctypes
    from ctypes import wintypes

    # use_last_error=True lets ctypes.get_last_error() report the error of
    # the most recent call made through this handle.
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    GENERIC_READ = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ = 1
    FILE_SHARE_WRITE = 2
    CONSOLE_TEXTMODE_BUFFER = 1
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)

    def _check_zero(result, func, args):
        """ctypes errcheck hook: raise WinError when result is falsy."""
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    def _check_invalid(result, func, args):
        """ctypes errcheck hook: raise WinError on INVALID_HANDLE_VALUE."""
        if result == INVALID_HANDLE_VALUE:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)

    class COORD(ctypes.Structure):
        _fields_ = (('X', wintypes.SHORT),
                    ('Y', wintypes.SHORT))

    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
        _fields_ = (('cbSize',               wintypes.ULONG),
                    ('dwSize',               COORD),
                    ('dwCursorPosition',     COORD),
                    ('wAttributes',          wintypes.WORD),
                    ('srWindow',             wintypes.SMALL_RECT),
                    ('dwMaximumWindowSize',  COORD),
                    ('wPopupAttributes',     wintypes.WORD),
                    ('bFullscreenSupported', wintypes.BOOL),
                    ('ColorTable',           wintypes.DWORD * 16))
        def __init__(self, *args, **kwds):
            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
                    *args, **kwds)
            # Record the structure's own size in cbSize on construction.
            self.cbSize = ctypes.sizeof(self)

    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
                                        CONSOLE_SCREEN_BUFFER_INFOEX)
    LPSECURITY_ATTRIBUTES = wintypes.LPVOID

    # Prototypes: declare argument/return types and attach the errcheck
    # hooks above so failing calls raise instead of returning error codes.
    kernel32.GetStdHandle.errcheck = _check_invalid
    kernel32.GetStdHandle.restype = wintypes.HANDLE
    kernel32.GetStdHandle.argtypes = (
        wintypes.DWORD,) # _In_ nStdHandle

    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
    kernel32.CreateConsoleScreenBuffer.argtypes = (
        wintypes.DWORD,        # _In_       dwDesiredAccess
        wintypes.DWORD,        # _In_       dwShareMode
        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
        wintypes.DWORD,        # _In_       dwFlags
        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData

    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo

    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
        wintypes.HANDLE,               # _In_  hConsoleOutput
        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo

    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
    kernel32.SetConsoleWindowInfo.argtypes = (
        wintypes.HANDLE,      # _In_ hConsoleOutput
        wintypes.BOOL,        # _In_ bAbsolute
        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow

    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.FillConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.WCHAR,   # _In_  cCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwWriteCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten

    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
    kernel32.ReadConsoleOutputCharacterW.argtypes = (
        wintypes.HANDLE,  # _In_  hConsoleOutput
        wintypes.LPWSTR,  # _Out_ lpCharacter
        wintypes.DWORD,   # _In_  nLength
        COORD,            # _In_  dwReadCoord
        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead

    @contextlib.contextmanager
    def allocate_console():
        """Call AllocConsole, yield its result, and free the console on exit.

        FreeConsole is only called when AllocConsole returned a truthy
        value.
        """
        allocated = kernel32.AllocConsole()
        try:
            yield allocated
        finally:
            if allocated:
                kernel32.FreeConsole()

    @contextlib.contextmanager
    def console_screen(ncols=None, nrows=None):
        """Activate a fresh console screen buffer and yield a file descriptor.

        Args:
            ncols: buffer width; defaults to the width of the current
                stdout buffer.
            nrows: buffer height; defaults to the height of the current
                stdout buffer. Values above 9999 raise ValueError.

        On exit the original stdout buffer's info, window, and active
        status are restored, and the new buffer is closed.
        """
        info = CONSOLE_SCREEN_BUFFER_INFOEX()
        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        nwritten = (wintypes.DWORD * 1)()
        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        kernel32.GetConsoleScreenBufferInfoEx(
               hStdOut, ctypes.byref(info))
        if ncols is None:
            ncols = info.dwSize.X
        if nrows is None:
            nrows = info.dwSize.Y
        elif nrows > 9999:
            raise ValueError('nrows must be 9999 or less')
        fd_screen = None
        hScreen = kernel32.CreateConsoleScreenBuffer(
                    GENERIC_READ | GENERIC_WRITE,
                    FILE_SHARE_READ | FILE_SHARE_WRITE,
                    None, CONSOLE_TEXTMODE_BUFFER, None)
        try:
            fd_screen = msvcrt.open_osfhandle(
                            hScreen, os.O_RDWR | os.O_BINARY)
            kernel32.GetConsoleScreenBufferInfoEx(
                   hScreen, ctypes.byref(new_info))
            new_info.dwSize = COORD(ncols, nrows)
            # Keep the current window's height while spanning the full
            # requested width.
            new_info.srWindow = wintypes.SMALL_RECT(
                    Left=0, Top=0, Right=(ncols - 1),
                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
            kernel32.SetConsoleScreenBufferInfoEx(
                    hScreen, ctypes.byref(new_info))
            kernel32.SetConsoleWindowInfo(hScreen, True,
                    ctypes.byref(new_info.srWindow))
            # Fill every cell with NUL so read_screen() can strip cells
            # that were never written.
            kernel32.FillConsoleOutputCharacterW(
                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
            kernel32.SetConsoleActiveScreenBuffer(hScreen)
            try:
                yield fd_screen
            finally:
                kernel32.SetConsoleScreenBufferInfoEx(
                    hStdOut, ctypes.byref(info))
                kernel32.SetConsoleWindowInfo(hStdOut, True,
                        ctypes.byref(info.srWindow))
                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
        finally:
            # Close through the file descriptor when one was opened;
            # otherwise close the raw handle directly.
            if fd_screen is not None:
                os.close(fd_screen)
            else:
                kernel32.CloseHandle(hScreen)

    def read_screen(fd):
        """Return the text of the screen buffer behind fd, up to the cursor.

        Reads from the top-left cell through the cursor position, splits
        the characters into rows of the buffer's width, strips trailing
        NULs from each row, and joins the rows with newlines.
        """
        hScreen = msvcrt.get_osfhandle(fd)
        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
        kernel32.GetConsoleScreenBufferInfoEx(
            hScreen, ctypes.byref(csbi))
        ncols = csbi.dwSize.X
        pos = csbi.dwCursorPosition
        # Number of cells from (0, 0) through the cursor cell, inclusive.
        length = ncols * pos.Y + pos.X + 1
        buf = (ctypes.c_wchar * length)()
        n = (wintypes.DWORD * 1)()
        kernel32.ReadConsoleOutputCharacterW(
            hScreen, buf, length, COORD(0,0), n)
        lines = [buf[i:i+ncols].rstrip(u'\0')
                    for i in range(0, n[0], ncols)]
        return u'\n'.join(lines)
1635
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Check Unicode arguments and Unicode output in a console window.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        # allocate_console() gives us a console window if we were started
        # without one; normally we already run inside one. console_screen()
        # then switches to a temporary buffer. Passing ncols=len(expected)
        # as well would resize the window (visible flashing) and is likely
        # unnecessary given typical console widths.
        with allocate_console(), console_screen(nrows=1000) as screen_fd:
            expected = u'로보카 폴리'
            # shell_popen() does not route stdout through a pipe (unlike
            # device.shell()), so adb can detect that it is writing to a
            # console.
            echo_cmd = ['echo', '"' + expected + '"']
            adb_proc = self.device.shell_popen(echo_cmd)
            adb_proc.wait()
            # Compare against what adb wrote into the temporary buffer.
            observed = read_screen(screen_fd)
            self.assertEqual(expected, observed)
1658
1659
def main():
    """Run every test in this module against the attached device(s).

    Returns:
        0 when the suite ran and every test passed; 1 when a test failed
        or errored, or when no device is attached.
    """
    # Fixed seed; presumably so the randomly generated test data is
    # reproducible from run to run.
    random.seed(0)
    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    # Propagate the outcome so callers (scripts, CI) can detect failures.
    sys.exit(main())