blob: 6c20b6e0b1fb5c16074f09bcd861d633ab94352a [file] [log] [blame]
Dan Albert8e1fdd72015-07-24 17:08:33 -07001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import hashlib
21import os
22import posixpath
23import random
24import shlex
25import shutil
26import subprocess
27import tempfile
28import unittest
29
30import mock
31
32import adb
33
34
class GetDeviceTest(unittest.TestCase):
    """Tests for how adb.get_device() chooses a device serial.

    adb.device.get_devices is patched in every test, so the list of
    attached devices is whatever the individual test sets up.
    """

    def setUp(self):
        # Remember and clear ANDROID_SERIAL so the caller's environment
        # cannot influence device selection. pop() with a default tolerates
        # the variable being unset, which a plain `del` does not.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        # Put the environment back exactly as setUp found it. os.environ
        # only accepts strings, so a variable that was originally unset has
        # to be removed again (a test may have set it) rather than
        # assigned None.
        if self.android_serial is None:
            os.environ.pop('ANDROID_SERIAL', None)
        else:
            os.environ['ANDROID_SERIAL'] = self.android_serial

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """A serial passed as an argument selects that device."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """With no argument, ANDROID_SERIAL selects the device."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial takes precedence over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial raises DeviceNotFoundError (arg or env)."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string, like every other test here; a list
        # would only fail to match by accident of its type.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With exactly one device listed, it is chosen implicitly."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial, NoUniqueDeviceError is raised."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
81
82
class DeviceTest(unittest.TestCase):
    """Base fixture for the test classes that operate on a device.

    Before each test, the result of adb.get_device() (called with no
    arguments) is stored on the test case as self.device.
    """

    def setUp(self):
        selected = adb.get_device()
        self.device = selected
86
87
class ShellTest(DeviceTest):
    """Checks of shell() and shell_nocheck() on self.device."""

    def test_cat(self):
        """Check that we can at least cat a file."""
        fields = self.device.shell(['cat', '/proc/uptime']).strip().split()
        self.assertEqual(len(fields), 2)

        # The two fields are the uptime and the idle time; each has to
        # parse as a positive float.
        for field in fields:
            self.assertGreater(float(field), 0.0)

    def test_throws_on_failure(self):
        """shell() raises CalledProcessError for a failing command."""
        with self.assertRaises(subprocess.CalledProcessError):
            self.device.shell(['false'])

    def test_output_not_stripped(self):
        """shell() hands back the trailing line separator untouched."""
        actual = self.device.shell(['echo', 'foo'])
        expected = 'foo' + self.device.linesep
        self.assertEqual(actual, expected)

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero status instead of raising."""
        status, output = self.device.shell_nocheck(['false'])
        self.assertNotEqual(status, 0)
        self.assertEqual(output, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() also keeps the trailing line separator."""
        status, output = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(status, 0)
        expected = 'foo' + self.device.linesep
        self.assertEqual(output, expected)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit status must not be taken for one."""
        # A naive implementation that runs `adb shell CMD; echo $?` and
        # parses the tail could not separate the output from the result
        # when CMD is `echo -n 1`.
        status, output = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(status, 0)
        self.assertEqual(output, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        actual = self.device.shell(['uname'])
        expected = 'Linux' + self.device.linesep
        self.assertEqual(actual, expected)
132
133
class ArgumentEscapingTest(DeviceTest):
    """Regression tests for argument quoting in shell() and install()."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world')).splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'")).strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'")).strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t')).strip()
        self.assertEqual('t', result)
        result = self.device.shell(shlex.split(r'echo -n 123\;uname')).strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # The `with` blocks close (and thereby delete) each temporary file
        # deterministically instead of leaving it to garbage collection.

        # http://b/20323053
        with tempfile.NamedTemporaryFile(
                'wb', suffix='-text;ls;1.apk') as tf:
            self.assertIn("-text;ls;1.apk", self.device.install(tf.name))

        # http://b/3090932
        with tempfile.NamedTemporaryFile(
                'wb', suffix="-Live Hold'em.apk") as tf:
            self.assertIn("-Live Hold'em.apk", self.device.install(tf.name))
174
175
class RootUnrootTest(DeviceTest):
    """Exercises root() and unroot() on self.device, checked with id(1)."""

    def _assert_user(self, expected):
        # Compare the user name reported by `id -un` on the device.
        self.assertEqual(expected, self.device.shell(['id', '-un']).strip())

    def _test_root(self):
        message = self.device.root()
        # When the device reports that adbd cannot run as root there is
        # nothing further to check.
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            self._assert_user('root')

    def _test_unroot(self):
        self.device.unroot()
        self.device.wait()
        self._assert_user('shell')

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        original_user = self.device.shell(['id', '-un']).strip()
        started_as_root = original_user == 'root'
        try:
            # Toggle away from the starting state first, then back again.
            if started_as_root:
                self._test_unroot()
                self._test_root()
            elif original_user == 'shell':
                self._test_root()
                self._test_unroot()
        finally:
            # Leave the device in the state it started in; any user other
            # than root is handled with unroot().
            if started_as_root:
                self.device.root()
            else:
                self.device.unroot()
            self.device.wait()
205
206
class TcpIpTest(DeviceTest):
    """Checks that tcpip() rejects invalid port arguments."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty string and a non-numeric string must both be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
217
218
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the byte string `string`."""
    return hashlib.md5(string).hexdigest()
223
224
def get_md5_prog(device):
    """Return the name of the MD5 tool to use on `device`.

    Older platforms (pre-L) had the name md5 rather than md5sum, so md5sum
    is probed by running it on /proc/uptime; if that shell command raises
    CalledProcessError, 'md5' is returned instead.
    """
    prog = 'md5sum'
    try:
        device.shell([prog, '/proc/uptime'])
    except subprocess.CalledProcessError:
        prog = 'md5'
    return prog
232
233
class HostFile(object):
    """A file on the host together with its expected checksum."""

    def __init__(self, handle, checksum):
        """Record `handle`, `checksum` and the paths derived from the handle.

        Args:
            handle: object with a `name` attribute holding the file's path.
            checksum: checksum value to associate with the file.
        """
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        # Final path component, split using the host's path rules.
        self.base_name = os.path.basename(path)
240
241
class DeviceFile(object):
    """A file on the device together with its expected checksum."""

    def __init__(self, checksum, full_path):
        """Record `checksum`, `full_path` and the path's final component.

        Args:
            checksum: checksum value to associate with the file.
            full_path: '/'-separated path of the file.
        """
        self.full_path = full_path
        self.checksum = checksum
        # Always split with POSIX rules, whatever the host platform is.
        self.base_name = posixpath.basename(full_path)
247
248
def make_random_host_files(in_dir, num_files):
    """Create `num_files` files of random bytes inside `in_dir` on the host.

    Each file's size is drawn with random.randrange from 1 KiB up to (but
    not including) 16 KiB, in 1 KiB steps. The files are created with
    delete=False, so they remain on disk after this function returns;
    removing them is the caller's job.

    Args:
        in_dir: existing host directory to create the files in.
        num_files: number of files to create.

    Returns:
        A list of HostFile objects, one per file, each carrying the (already
        closed) file handle and the MD5 hex digest of the bytes written.
    """
    min_size = 1 * 1024
    max_size = 16 * 1024

    files = []
    # range() rather than xrange() so the helper also runs on Python 3.
    for _ in range(num_files):
        # The context manager closes the handle even if the write raises.
        with tempfile.NamedTemporaryFile(
                dir=in_dir, delete=False) as file_handle:
            size = random.randrange(min_size, max_size, 1024)
            rand_str = os.urandom(size)
            file_handle.write(rand_str)

        md5 = compute_md5(rand_str)
        files.append(HostFile(file_handle, md5))
    return files
266
267
def make_random_device_files(device, in_dir, num_files):
    """Create `num_files` files of random bytes inside `in_dir` on `device`.

    Each file is written on the device with a single-block `dd` from
    /dev/urandom; the block size is drawn with random.randrange from 1 KiB
    up to (but not including) 16 KiB, in 1 KiB steps. Files are named
    device_tmpfile0, device_tmpfile1, and so on.

    Args:
        device: device object providing shell().
        in_dir: existing directory on the device ('/'-separated path).
        num_files: number of files to create.

    Returns:
        A list of DeviceFile objects, one per file, each carrying the
        checksum reported by the device's md5 tool and the file's path.
    """
    min_size = 1 * 1024
    max_size = 16 * 1024

    # The tool name does not change between files, so probe for it once
    # instead of running an extra shell command per file.
    md5_prog = get_md5_prog(device)

    files = []
    # range() rather than xrange() so the helper also runs on Python 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = 'device_tmpfile' + str(file_num)
        # Device paths always use '/', so join with posixpath; os.path.join
        # would insert a backslash on a Windows host.
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # The tool's output splits into exactly two fields; the first is
        # the digest.
        dev_md5, _ = device.shell([md5_prog, full_path]).split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
285
286
class FileOperationsTest(DeviceTest):
    """Push, pull and sync round-trips, verified by comparing MD5 sums."""

    # Scratch locations on the device used by these tests.
    SCRATCH_DIR = '/data/local/tmp'
    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'

    def _test_push(self, local_file, checksum):
        """Push `local_file` and check the device-side MD5 is `checksum`."""
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
        try:
            self.device.push(
                local=local_file, remote=self.DEVICE_TEMP_FILE)
            dev_md5, _ = self.device.shell(
                [get_md5_prog(self.device), self.DEVICE_TEMP_FILE]).split()
            self.assertEqual(checksum, dev_md5)
        finally:
            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])

    def test_push(self):
        """Push a randomly generated file to specified device."""
        kbytes = 512
        rand_str = os.urandom(1024 * kbytes)
        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
        try:
            # Close the handle before pushing (and before os.remove), even
            # if the write itself fails.
            with tmp:
                tmp.write(rand_str)
            self._test_push(tmp.name, compute_md5(rand_str))
        finally:
            os.remove(tmp.name)

    # TODO: write push directory test.

    def _test_pull(self, remote_file, checksum):
        """Pull `remote_file` and check the host-side MD5 is `checksum`."""
        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
        try:
            # Only the name is needed; close the handle so pull can write
            # to the path.
            tmp_write.close()
            self.device.pull(remote=remote_file, local=tmp_write.name)
            with open(tmp_write.name, 'rb') as tmp_read:
                host_contents = tmp_read.read()
            host_md5 = compute_md5(host_contents)
            self.assertEqual(checksum, host_md5)
        finally:
            os.remove(tmp_write.name)

    def test_pull(self):
        """Pull a randomly generated file from specified device."""
        kbytes = 512
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
        try:
            cmd = ['dd', 'if=/dev/urandom',
                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
                   'count={}'.format(kbytes)]
            self.device.shell(cmd)
            dev_md5, _ = self.device.shell(
                [get_md5_prog(self.device), self.DEVICE_TEMP_FILE]).split()
            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
        finally:
            # Best-effort cleanup: the file may not exist if dd failed.
            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])

    def test_pull_dir(self):
        """Pull a randomly generated directory of files from the device."""
        host_dir = tempfile.mkdtemp()
        try:
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

            # Populate device directory with random files.
            temp_files = make_random_device_files(
                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

            for temp_file in temp_files:
                host_path = os.path.join(host_dir, temp_file.base_name)
                with open(host_path, 'rb') as host_file:
                    host_md5 = compute_md5(host_file.read())
                self.assertEqual(host_md5, temp_file.checksum)
        finally:
            # Remove the host directory even if the device cleanup fails.
            try:
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                shutil.rmtree(host_dir)

    def test_sync(self):
        """Sync a randomly generated directory of files to specified device."""
        base_dir = tempfile.mkdtemp()
        try:
            # Create mirror device directory hierarchy within base_dir.
            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
            os.makedirs(full_dir_path)

            # Create 32 random files within the host mirror.
            temp_files = make_random_host_files(in_dir=full_dir_path,
                                                num_files=32)

            # Clean up any trash on the device.
            device = adb.get_device(product=base_dir)
            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

            device.sync('data')

            # Confirm that every file on the device mirrors that on the host.
            # The md5 tool is probed once rather than once per file.
            md5_prog = get_md5_prog(self.device)
            for temp_file in temp_files:
                device_full_path = posixpath.join(
                    self.DEVICE_TEMP_DIR, temp_file.base_name)
                dev_md5, _ = device.shell(
                    [md5_prog, device_full_path]).split()
                self.assertEqual(temp_file.checksum, dev_md5)
        finally:
            try:
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                # Remove the whole mkdtemp() tree. Deleting only
                # base_dir + DEVICE_TEMP_DIR would leave base_dir and the
                # intermediate directories behind.
                shutil.rmtree(base_dir)

    def test_unicode_paths(self):
        """Ensure that we can support non-ASCII paths, even on Windows."""
        name = u'로보카 폴리'.encode('utf-8')
        remote_path = '/data/local/tmp/adb-test-{}'.format(name)
        remote_glob = '/data/local/tmp/adb-test-*'

        # push.
        with tempfile.NamedTemporaryFile('wb', suffix=name) as tf:
            self.device.push(tf.name, remote_path)
        self.device.shell(['rm', '-f', remote_glob])

        # pull.
        try:
            # The path is wrapped in double quotes before being handed to
            # shell(), as the name contains a space.
            cmd = ['touch', '"{}"'.format(remote_path)]
            self.device.shell(cmd)

            with tempfile.NamedTemporaryFile('wb', suffix=name) as tf:
                self.device.pull(remote_path, tf.name)
        finally:
            # Do not leave the touched file behind on the device.
            self.device.shell(['rm', '-f', remote_glob])
412
413
def main():
    """Run every test in this module, provided adb lists a device.

    The random module is seeded with 0 first. If adb.get_devices() returns
    nothing, a message is printed and no tests are run.
    """
    random.seed(0)
    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)
421
422
# Run the suite only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()