adb: add client side shell protocol and enable.
Adds the shell protocol functionality to the client side and enables it
if the transport supports the feature.
Bug:http://b/23031026
Change-Id: I9abe1c8b1d39f8dd09666321b1c761ad708a8854
diff --git a/adb/test_device.py b/adb/test_device.py
index 2006937..fedd2d7 100644
--- a/adb/test_device.py
+++ b/adb/test_device.py
@@ -37,7 +37,7 @@
if self.device.get_prop('ro.debuggable') != '1':
raise unittest.SkipTest('requires rootable build')
- was_root = self.device.shell(['id', '-un']).strip() == 'root'
+ was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
if not was_root:
self.device.root()
self.device.wait()
@@ -113,7 +113,7 @@
class ShellTest(DeviceTest):
def test_cat(self):
"""Check that we can at least cat a file."""
- out = self.device.shell(['cat', '/proc/uptime']).strip()
+ out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
elements = out.split()
self.assertEqual(len(elements), 2)
@@ -122,20 +122,19 @@
self.assertGreater(float(idle), 0.0)
def test_throws_on_failure(self):
- self.assertRaises(subprocess.CalledProcessError,
- self.device.shell, ['false'])
+ self.assertRaises(adb.ShellError, self.device.shell, ['false'])
def test_output_not_stripped(self):
- out = self.device.shell(['echo', 'foo'])
+ out = self.device.shell(['echo', 'foo'])[0]
self.assertEqual(out, 'foo' + self.device.linesep)
def test_shell_nocheck_failure(self):
- rc, out = self.device.shell_nocheck(['false'])
+ rc, out, _ = self.device.shell_nocheck(['false'])
self.assertNotEqual(rc, 0)
self.assertEqual(out, '')
def test_shell_nocheck_output_not_stripped(self):
- rc, out = self.device.shell_nocheck(['echo', 'foo'])
+ rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
self.assertEqual(rc, 0)
self.assertEqual(out, 'foo' + self.device.linesep)
@@ -143,7 +142,7 @@
# If result checking on ADB shell is naively implemented as
# `adb shell <cmd>; echo $?`, we would be unable to distinguish the
# output from the result for a cmd of `echo -n 1`.
- rc, out = self.device.shell_nocheck(['echo', '-n', '1'])
+ rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
self.assertEqual(rc, 0)
self.assertEqual(out, '1')
@@ -152,7 +151,7 @@
Bug: http://b/19735063
"""
- output = self.device.shell(['uname'])
+ output = self.device.shell(['uname'])[0]
self.assertEqual(output, 'Linux' + self.device.linesep)
def test_pty_logic(self):
@@ -180,6 +179,23 @@
exit_code = self.device.shell_nocheck(['[ -t 0 ]'])[0]
self.assertEqual(exit_code, 1)
+ def test_shell_protocol(self):
+ """Tests the shell protocol on the device.
+
+ If the device supports shell protocol, this gives us the ability
+ to separate stdout/stderr and return the exit code directly.
+
+ Bug: http://b/19734861
+ """
+ if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
+ raise unittest.SkipTest('shell protocol unsupported on this device')
+ result = self.device.shell_nocheck(
+ shlex.split('echo foo; echo bar >&2; exit 17'))
+
+ self.assertEqual(17, result[0])
+ self.assertEqual('foo' + self.device.linesep, result[1])
+ self.assertEqual('bar' + self.device.linesep, result[2])
+
class ArgumentEscapingTest(DeviceTest):
def test_shell_escaping(self):
@@ -191,25 +207,26 @@
# as `sh -c echo` (with an argument to that shell of "hello"),
# and then `echo world` back in the first shell.
result = self.device.shell(
- shlex.split("sh -c 'echo hello; echo world'"))
+ shlex.split("sh -c 'echo hello; echo world'"))[0]
result = result.splitlines()
self.assertEqual(['', 'world'], result)
# If you really wanted "hello" and "world", here's what you'd do:
result = self.device.shell(
- shlex.split(r'echo hello\;echo world')).splitlines()
+ shlex.split(r'echo hello\;echo world'))[0].splitlines()
self.assertEqual(['hello', 'world'], result)
# http://b/15479704
- result = self.device.shell(shlex.split("'true && echo t'")).strip()
+ result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
self.assertEqual('t', result)
result = self.device.shell(
- shlex.split("sh -c 'true && echo t'")).strip()
+ shlex.split("sh -c 'true && echo t'"))[0].strip()
self.assertEqual('t', result)
# http://b/20564385
- result = self.device.shell(shlex.split('FOO=a BAR=b echo t')).strip()
+ result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
self.assertEqual('t', result)
- result = self.device.shell(shlex.split(r'echo -n 123\;uname')).strip()
+ result = self.device.shell(
+ shlex.split(r'echo -n 123\;uname'))[0].strip()
self.assertEqual('123Linux', result)
def test_install_argument_escaping(self):
@@ -235,19 +252,19 @@
if 'adbd cannot run as root in production builds' in message:
return
self.device.wait()
- self.assertEqual('root', self.device.shell(['id', '-un']).strip())
+ self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
def _test_unroot(self):
self.device.unroot()
self.device.wait()
- self.assertEqual('shell', self.device.shell(['id', '-un']).strip())
+ self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
def test_root_unroot(self):
"""Make sure that adb root and adb unroot work, using id(1)."""
if self.device.get_prop('ro.debuggable') != '1':
raise unittest.SkipTest('requires rootable build')
- original_user = self.device.shell(['id', '-un']).strip()
+ original_user = self.device.shell(['id', '-un'])[0].strip()
try:
if original_user == 'root':
self._test_unroot()
@@ -286,7 +303,7 @@
self.device.set_prop(prop_name, 'qux')
self.assertEqual(
- self.device.shell(['getprop', prop_name]).strip(), 'qux')
+ self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
def compute_md5(string):
@@ -351,7 +368,7 @@
device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
'bs={}'.format(size), 'count=1'])
- dev_md5, _ = device.shell([get_md5_prog(device), full_path]).split()
+ dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
files.append(DeviceFile(dev_md5, full_path))
return files
@@ -366,7 +383,7 @@
self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
self.device.push(local=local_file, remote=self.DEVICE_TEMP_FILE)
dev_md5, _ = self.device.shell([get_md5_prog(self.device),
- self.DEVICE_TEMP_FILE]).split()
+ self.DEVICE_TEMP_FILE])[0].split()
self.assertEqual(checksum, dev_md5)
self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
@@ -401,7 +418,7 @@
'count={}'.format(kbytes)]
self.device.shell(cmd)
dev_md5, _ = self.device.shell(
- [get_md5_prog(self.device), self.DEVICE_TEMP_FILE]).split()
+ [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
@@ -449,7 +466,7 @@
device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
temp_file.base_name)
dev_md5, _ = device.shell(
- [get_md5_prog(self.device), device_full_path]).split()
+ [get_md5_prog(self.device), device_full_path])[0].split()
self.assertEqual(temp_file.checksum, dev_md5)
self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])