update_payload: Port scripts to python3
Update the update_payload scripts to be compatible with
python2 and python3. Python2 compatibility is needed since
the repo is shared with Android.
BUG=chromium:1011631
TEST=Executed aosp/system/update_engine/scripts/run_unittests and
cros_generate_update_payload
Cq-Depend: chromium:1904837, chromium:1911499
Change-Id: Ie450b80b5f7550051b38d320173ccc0c915f65e7
Reviewed-on: https://chromium-review.googlesource.com/c/aosp/platform/system/update_engine/+/1904310
Commit-Queue: Andrew Lassalle <andrewlassalle@chromium.org>
Tested-by: Andrew Lassalle <andrewlassalle@chromium.org>
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Reviewed-by: Amin Hassani <ahassani@chromium.org>
Auto-Submit: Andrew Lassalle <andrewlassalle@chromium.org>
diff --git a/scripts/update_payload/checker_unittest.py b/scripts/update_payload/checker_unittest.py
index 4881653..993b785 100755
--- a/scripts/update_payload/checker_unittest.py
+++ b/scripts/update_payload/checker_unittest.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python2
+#!/usr/bin/env python
#
# Copyright (C) 2013 The Android Open Source Project
#
@@ -17,30 +17,33 @@
"""Unit testing checker.py."""
-from __future__ import print_function
+# Disable check for function names to avoid errors based on old code
+# pylint: disable-msg=invalid-name
+
+from __future__ import absolute_import
import array
import collections
-import cStringIO
import hashlib
+import io
import itertools
import os
import unittest
-# pylint cannot find mox.
-# pylint: disable=F0401
-import mox
+from six.moves import zip
+
+import mock # pylint: disable=import-error
from update_payload import checker
from update_payload import common
from update_payload import test_utils
from update_payload import update_metadata_pb2
from update_payload.error import PayloadError
-from update_payload.payload import Payload # Avoid name conflicts later.
+from update_payload.payload import Payload # Avoid name conflicts later.
def _OpTypeByName(op_name):
- """Returns the type of an operation from itsname."""
+ """Returns the type of an operation from its name."""
op_name_to_type = {
'REPLACE': common.OpType.REPLACE,
'REPLACE_BZ': common.OpType.REPLACE_BZ,
@@ -63,7 +66,7 @@
if checker_init_dargs is None:
checker_init_dargs = {}
- payload_file = cStringIO.StringIO()
+ payload_file = io.BytesIO()
payload_gen_write_to_file_func(payload_file, **payload_gen_dargs)
payload_file.seek(0)
payload = Payload(payload_file)
@@ -73,7 +76,7 @@
def _GetPayloadCheckerWithData(payload_gen):
"""Returns a payload checker from a given payload generator."""
- payload_file = cStringIO.StringIO()
+ payload_file = io.BytesIO()
payload_gen.WriteToFile(payload_file)
payload_file.seek(0)
payload = Payload(payload_file)
@@ -87,7 +90,7 @@
# pylint: disable=W0212
# Don't bark about missing members of classes you cannot import.
# pylint: disable=E1101
-class PayloadCheckerTest(mox.MoxTestBase):
+class PayloadCheckerTest(unittest.TestCase):
"""Tests the PayloadChecker class.
In addition to ordinary testFoo() methods, which are automatically invoked by
@@ -100,11 +103,42 @@
all such tests is done in AddAllParametricTests().
"""
+ def setUp(self):
+ """setUp function for unittest testcase"""
+ self.mock_checks = []
+
+ def tearDown(self):
+ """tearDown function for unittest testcase"""
+ # Verify that all mock functions were called.
+ for check in self.mock_checks:
+ check.mock_fn.assert_called_once_with(*check.exp_args, **check.exp_kwargs)
+
+ class MockChecksAtTearDown(object):
+ """Mock data storage.
+
+ This class stores the mock functions and its arguments to be checked at a
+ later point.
+ """
+ def __init__(self, mock_fn, *args, **kwargs):
+ self.mock_fn = mock_fn
+ self.exp_args = args
+ self.exp_kwargs = kwargs
+
+ def addPostCheckForMockFunction(self, mock_fn, *args, **kwargs):
+ """Store a mock function and its arguments to self.mock_checks
+
+ Args:
+ mock_fn: mock function object
+ args: expected positional arguments for the mock_fn
+ kwargs: expected named arguments for the mock_fn
+ """
+ self.mock_checks.append(self.MockChecksAtTearDown(mock_fn, *args, **kwargs))
+
def MockPayload(self):
"""Create a mock payload object, complete with a mock manifest."""
- payload = self.mox.CreateMock(Payload)
+ payload = mock.create_autospec(Payload)
payload.is_init = True
- payload.manifest = self.mox.CreateMock(
+ payload.manifest = mock.create_autospec(
update_metadata_pb2.DeltaArchiveManifest)
return payload
@@ -173,19 +207,20 @@
subreport = 'fake subreport'
# Create a mock message.
- msg = self.mox.CreateMock(update_metadata_pb2._message.Message)
- msg.HasField(name).AndReturn(is_present)
+ msg = mock.create_autospec(update_metadata_pb2._message.Message)
+ self.addPostCheckForMockFunction(msg.HasField, name)
+ msg.HasField.return_value = is_present
setattr(msg, name, val)
-
# Create a mock report.
- report = self.mox.CreateMock(checker._PayloadReport)
+ report = mock.create_autospec(checker._PayloadReport)
if is_present:
if is_submsg:
- report.AddSubReport(name).AndReturn(subreport)
+ self.addPostCheckForMockFunction(report.AddSubReport, name)
+ report.AddSubReport.return_value = subreport
else:
- report.AddField(name, convert(val), linebreak=linebreak, indent=indent)
+ self.addPostCheckForMockFunction(report.AddField, name, convert(val),
+ linebreak=linebreak, indent=indent)
- self.mox.ReplayAll()
return (msg, report, subreport, name, val)
def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert,
@@ -211,9 +246,9 @@
else:
ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*args,
**kwargs)
- self.assertEquals(val if is_present else None, ret_val)
- self.assertEquals(subreport if is_present and is_submsg else None,
- ret_subreport)
+ self.assertEqual(val if is_present else None, ret_val)
+ self.assertEqual(subreport if is_present and is_submsg else None,
+ ret_subreport)
def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
indent):
@@ -243,7 +278,7 @@
self.assertRaises(PayloadError, tested_func, *args, **kwargs)
else:
ret_val = tested_func(*args, **kwargs)
- self.assertEquals(val if is_present else None, ret_val)
+ self.assertEqual(val if is_present else None, ret_val)
def DoAddSubMsgTest(self, is_mandatory, is_present):
"""Parametrized testing of _Check{Mandatory,Optional}SubMsg().
@@ -267,8 +302,8 @@
self.assertRaises(PayloadError, tested_func, *args)
else:
ret_val, ret_subreport = tested_func(*args)
- self.assertEquals(val if is_present else None, ret_val)
- self.assertEquals(subreport if is_present else None, ret_subreport)
+ self.assertEqual(val if is_present else None, ret_val)
+ self.assertEqual(subreport if is_present else None, ret_subreport)
def testCheckPresentIff(self):
"""Tests _CheckPresentIff()."""
@@ -294,15 +329,14 @@
returned_signed_hash: The signed hash data retuned by openssl.
expected_signed_hash: The signed hash data to compare against.
"""
- try:
- # Stub out the subprocess invocation.
- self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
+ # Stub out the subprocess invocation.
+ with mock.patch.object(checker.PayloadChecker, '_Run') \
+ as mock_payload_checker:
if expect_subprocess_call:
- checker.PayloadChecker._Run(
- mox.IsA(list), send_data=sig_data).AndReturn(
- (sig_asn1_header + returned_signed_hash, None))
+ mock_payload_checker([], send_data=sig_data)
+ mock_payload_checker.return_value = (
+ sig_asn1_header + returned_signed_hash, None)
- self.mox.ReplayAll()
if expect_pass:
self.assertIsNone(checker.PayloadChecker._CheckSha256Signature(
sig_data, 'foo', expected_signed_hash, 'bar'))
@@ -310,13 +344,11 @@
self.assertRaises(PayloadError,
checker.PayloadChecker._CheckSha256Signature,
sig_data, 'foo', expected_signed_hash, 'bar')
- finally:
- self.mox.UnsetStubs()
def testCheckSha256Signature_Pass(self):
"""Tests _CheckSha256Signature(); pass case."""
sig_data = 'fake-signature'.ljust(256)
- signed_hash = hashlib.sha256('fake-data').digest()
+ signed_hash = hashlib.sha256(b'fake-data').digest()
self.DoCheckSha256SignatureTest(True, True, sig_data,
common.SIG_ASN1_HEADER, signed_hash,
signed_hash)
@@ -324,7 +356,7 @@
def testCheckSha256Signature_FailBadSignature(self):
"""Tests _CheckSha256Signature(); fails due to malformed signature."""
sig_data = 'fake-signature' # Malformed (not 256 bytes in length).
- signed_hash = hashlib.sha256('fake-data').digest()
+ signed_hash = hashlib.sha256(b'fake-data').digest()
self.DoCheckSha256SignatureTest(False, False, sig_data,
common.SIG_ASN1_HEADER, signed_hash,
signed_hash)
@@ -332,7 +364,7 @@
def testCheckSha256Signature_FailBadOutputLength(self):
"""Tests _CheckSha256Signature(); fails due to unexpected output length."""
sig_data = 'fake-signature'.ljust(256)
- signed_hash = 'fake-hash' # Malformed (not 32 bytes in length).
+ signed_hash = b'fake-hash' # Malformed (not 32 bytes in length).
self.DoCheckSha256SignatureTest(False, True, sig_data,
common.SIG_ASN1_HEADER, signed_hash,
signed_hash)
@@ -340,16 +372,16 @@
def testCheckSha256Signature_FailBadAsnHeader(self):
"""Tests _CheckSha256Signature(); fails due to bad ASN1 header."""
sig_data = 'fake-signature'.ljust(256)
- signed_hash = hashlib.sha256('fake-data').digest()
- bad_asn1_header = 'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER))
+ signed_hash = hashlib.sha256(b'fake-data').digest()
+ bad_asn1_header = b'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER))
self.DoCheckSha256SignatureTest(False, True, sig_data, bad_asn1_header,
signed_hash, signed_hash)
def testCheckSha256Signature_FailBadHash(self):
"""Tests _CheckSha256Signature(); fails due to bad hash returned."""
sig_data = 'fake-signature'.ljust(256)
- expected_signed_hash = hashlib.sha256('fake-data').digest()
- returned_signed_hash = hashlib.sha256('bad-fake-data').digest()
+ expected_signed_hash = hashlib.sha256(b'fake-data').digest()
+ returned_signed_hash = hashlib.sha256(b'bad-fake-data').digest()
self.DoCheckSha256SignatureTest(False, True, sig_data,
common.SIG_ASN1_HEADER,
expected_signed_hash, returned_signed_hash)
@@ -455,23 +487,23 @@
# Add old kernel/rootfs partition info, as required.
if fail_mismatched_oki_ori or fail_old_kernel_fs_size or fail_bad_oki:
oki_hash = (None if fail_bad_oki
- else hashlib.sha256('fake-oki-content').digest())
+ else hashlib.sha256(b'fake-oki-content').digest())
payload_gen.SetPartInfo(common.KERNEL, False, old_kernel_fs_size,
oki_hash)
if not fail_mismatched_oki_ori and (fail_old_rootfs_fs_size or
fail_bad_ori):
ori_hash = (None if fail_bad_ori
- else hashlib.sha256('fake-ori-content').digest())
+ else hashlib.sha256(b'fake-ori-content').digest())
payload_gen.SetPartInfo(common.ROOTFS, False, old_rootfs_fs_size,
ori_hash)
# Add new kernel/rootfs partition info.
payload_gen.SetPartInfo(
common.KERNEL, True, new_kernel_fs_size,
- None if fail_bad_nki else hashlib.sha256('fake-nki-content').digest())
+ None if fail_bad_nki else hashlib.sha256(b'fake-nki-content').digest())
payload_gen.SetPartInfo(
common.ROOTFS, True, new_rootfs_fs_size,
- None if fail_bad_nri else hashlib.sha256('fake-nri-content').digest())
+ None if fail_bad_nri else hashlib.sha256(b'fake-nri-content').digest())
# Set the minor version.
payload_gen.SetMinorVersion(0)
@@ -518,7 +550,7 @@
# Passes w/ all real extents.
extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
- self.assertEquals(
+ self.assertEqual(
23,
payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
collections.defaultdict(int), 'foo'))
@@ -553,34 +585,34 @@
block_size = payload_checker.block_size
data_length = 10000
- op = self.mox.CreateMock(
- update_metadata_pb2.InstallOperation)
+ op = mock.create_autospec(update_metadata_pb2.InstallOperation)
op.type = common.OpType.REPLACE
# Pass.
op.src_extents = []
self.assertIsNone(
payload_checker._CheckReplaceOperation(
- op, data_length, (data_length + block_size - 1) / block_size,
+ op, data_length, (data_length + block_size - 1) // block_size,
'foo'))
# Fail, src extents founds.
op.src_extents = ['bar']
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
- op, data_length, (data_length + block_size - 1) / block_size, 'foo')
+ op, data_length, (data_length + block_size - 1) // block_size, 'foo')
# Fail, missing data.
op.src_extents = []
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
- op, None, (data_length + block_size - 1) / block_size, 'foo')
+ op, None, (data_length + block_size - 1) // block_size, 'foo')
# Fail, length / block number mismatch.
op.src_extents = ['bar']
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
- op, data_length, (data_length + block_size - 1) / block_size + 1, 'foo')
+ op, data_length, (data_length + block_size - 1) // block_size + 1,
+ 'foo')
def testCheckReplaceBzOperation(self):
"""Tests _CheckReplaceOperation() where op.type == REPLACE_BZ."""
@@ -588,7 +620,7 @@
block_size = payload_checker.block_size
data_length = block_size * 3
- op = self.mox.CreateMock(
+ op = mock.create_autospec(
update_metadata_pb2.InstallOperation)
op.type = common.OpType.REPLACE_BZ
@@ -596,25 +628,32 @@
op.src_extents = []
self.assertIsNone(
payload_checker._CheckReplaceOperation(
- op, data_length, (data_length + block_size - 1) / block_size + 5,
+ op, data_length, (data_length + block_size - 1) // block_size + 5,
'foo'))
# Fail, src extents founds.
op.src_extents = ['bar']
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
- op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo')
+ op, data_length, (data_length + block_size - 1) // block_size + 5,
+ 'foo')
# Fail, missing data.
op.src_extents = []
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
- op, None, (data_length + block_size - 1) / block_size, 'foo')
+ op, None, (data_length + block_size - 1) // block_size, 'foo')
# Fail, too few blocks to justify BZ.
op.src_extents = []
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
+ op, data_length, (data_length + block_size - 1) // block_size, 'foo')
+
+ # Fail, total_dst_blocks is a floating point value.
+ op.src_extents = []
+ self.assertRaises(
+ PayloadError, payload_checker._CheckReplaceOperation,
op, data_length, (data_length + block_size - 1) / block_size, 'foo')
def testCheckReplaceXzOperation(self):
@@ -623,7 +662,7 @@
block_size = payload_checker.block_size
data_length = block_size * 3
- op = self.mox.CreateMock(
+ op = mock.create_autospec(
update_metadata_pb2.InstallOperation)
op.type = common.OpType.REPLACE_XZ
@@ -631,25 +670,32 @@
op.src_extents = []
self.assertIsNone(
payload_checker._CheckReplaceOperation(
- op, data_length, (data_length + block_size - 1) / block_size + 5,
+ op, data_length, (data_length + block_size - 1) // block_size + 5,
'foo'))
# Fail, src extents founds.
op.src_extents = ['bar']
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
- op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo')
+ op, data_length, (data_length + block_size - 1) // block_size + 5,
+ 'foo')
# Fail, missing data.
op.src_extents = []
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
- op, None, (data_length + block_size - 1) / block_size, 'foo')
+ op, None, (data_length + block_size - 1) // block_size, 'foo')
# Fail, too few blocks to justify XZ.
op.src_extents = []
self.assertRaises(
PayloadError, payload_checker._CheckReplaceOperation,
+ op, data_length, (data_length + block_size - 1) // block_size, 'foo')
+
+ # Fail, total_dst_blocks is a floating point value.
+ op.src_extents = []
+ self.assertRaises(
+ PayloadError, payload_checker._CheckReplaceOperation,
op, data_length, (data_length + block_size - 1) / block_size, 'foo')
def testCheckAnyDiff(self):
@@ -724,9 +770,9 @@
old_part_size = test_utils.MiB(4)
new_part_size = test_utils.MiB(8)
old_block_counters = array.array(
- 'B', [0] * ((old_part_size + block_size - 1) / block_size))
+ 'B', [0] * ((old_part_size + block_size - 1) // block_size))
new_block_counters = array.array(
- 'B', [0] * ((new_part_size + block_size - 1) / block_size))
+ 'B', [0] * ((new_part_size + block_size - 1) // block_size))
prev_data_offset = 1876
blob_hash_counts = collections.defaultdict(int)
@@ -769,16 +815,14 @@
fake_data = 'fake-data'.ljust(op.data_length)
if not allow_unhashed and not fail_data_hash:
# Create a valid data blob hash.
- op.data_sha256_hash = hashlib.sha256(fake_data).digest()
- payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
- fake_data)
+ op.data_sha256_hash = hashlib.sha256(fake_data.encode('utf-8')).digest()
+ payload.ReadDataBlob.return_value = fake_data.encode('utf-8')
elif fail_data_hash:
# Create an invalid data blob hash.
op.data_sha256_hash = hashlib.sha256(
- fake_data.replace(' ', '-')).digest()
- payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
- fake_data)
+ fake_data.replace(' ', '-').encode('utf-8')).digest()
+ payload.ReadDataBlob.return_value = fake_data.encode('utf-8')
total_dst_blocks = 0
if not fail_missing_dst_extents:
@@ -807,7 +851,6 @@
payload_checker.minor_version <= 3):
op.dst_length = total_dst_blocks * block_size
- self.mox.ReplayAll()
should_fail = (fail_src_extents or fail_dst_extents or
fail_mismatched_data_offset_length or
fail_missing_dst_extents or fail_src_length or
@@ -857,7 +900,8 @@
rootfs_data_length -= block_size
payload_gen.AddOperation(common.ROOTFS, rootfs_op_type,
- dst_extents=[(0, rootfs_data_length / block_size)],
+ dst_extents=
+ [(0, rootfs_data_length // block_size)],
data_offset=0,
data_length=rootfs_data_length)
@@ -889,13 +933,13 @@
rootfs_part_size = test_utils.MiB(2)
kernel_part_size = test_utils.KiB(16)
payload_gen.SetPartInfo(common.ROOTFS, True, rootfs_part_size,
- hashlib.sha256('fake-new-rootfs-content').digest())
+ hashlib.sha256(b'fake-new-rootfs-content').digest())
payload_gen.SetPartInfo(common.KERNEL, True, kernel_part_size,
- hashlib.sha256('fake-new-kernel-content').digest())
+ hashlib.sha256(b'fake-new-kernel-content').digest())
payload_gen.SetMinorVersion(0)
payload_gen.AddOperationWithData(
common.ROOTFS, common.OpType.REPLACE,
- dst_extents=[(0, rootfs_part_size / block_size)],
+ dst_extents=[(0, rootfs_part_size // block_size)],
data_blob=os.urandom(rootfs_part_size))
do_forge_sigs_data = (fail_empty_sigs_blob or fail_sig_missing_fields or
@@ -908,7 +952,7 @@
if fail_sig_missing_fields:
sig_data = None
else:
- sig_data = test_utils.SignSha256('fake-payload-content',
+ sig_data = test_utils.SignSha256(b'fake-payload-content',
test_utils._PRIVKEY_FILE_NAME)
sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data)
@@ -984,9 +1028,9 @@
kernel_filesystem_size = test_utils.KiB(16)
rootfs_filesystem_size = test_utils.MiB(2)
payload_gen.SetPartInfo(common.ROOTFS, True, rootfs_filesystem_size,
- hashlib.sha256('fake-new-rootfs-content').digest())
+ hashlib.sha256(b'fake-new-rootfs-content').digest())
payload_gen.SetPartInfo(common.KERNEL, True, kernel_filesystem_size,
- hashlib.sha256('fake-new-kernel-content').digest())
+ hashlib.sha256(b'fake-new-kernel-content').digest())
payload_gen.SetMinorVersion(0)
rootfs_part_size = 0
@@ -997,7 +1041,7 @@
rootfs_op_size += block_size
payload_gen.AddOperationWithData(
common.ROOTFS, common.OpType.REPLACE,
- dst_extents=[(0, rootfs_op_size / block_size)],
+ dst_extents=[(0, rootfs_op_size // block_size)],
data_blob=os.urandom(rootfs_op_size))
kernel_part_size = 0
@@ -1008,7 +1052,7 @@
kernel_op_size += block_size
payload_gen.AddOperationWithData(
common.KERNEL, common.OpType.REPLACE,
- dst_extents=[(0, kernel_op_size / block_size)],
+ dst_extents=[(0, kernel_op_size // block_size)],
data_blob=os.urandom(kernel_op_size))
# Generate payload (complete w/ signature) and create the test object.
@@ -1054,6 +1098,7 @@
else:
self.assertIsNone(payload_checker.Run(**kwargs2))
+
# This implements a generic API, hence the occasional unused args.
# pylint: disable=W0613
def ValidateCheckOperationTest(op_type_name, allow_unhashed,
@@ -1104,13 +1149,13 @@
(values) associated with them.
validate_func: A function used for validating test argument combinations.
"""
- for value_tuple in itertools.product(*arg_space.itervalues()):
- run_dargs = dict(zip(arg_space.iterkeys(), value_tuple))
+ for value_tuple in itertools.product(*iter(arg_space.values())):
+ run_dargs = dict(zip(iter(arg_space.keys()), value_tuple))
if validate_func and not validate_func(**run_dargs):
continue
run_method_name = 'Do%sTest' % tested_method_name
test_method_name = 'test%s' % tested_method_name
- for arg_key, arg_val in run_dargs.iteritems():
+ for arg_key, arg_val in run_dargs.items():
if arg_val or isinstance(arg_val, int):
test_method_name += '__%s=%s' % (arg_key, arg_val)
setattr(PayloadCheckerTest, test_method_name,