paycheck: fixes to checker module (readability review)
BUG=None
TEST=Passes unit tests
TEST=Passes integration tests
Change-Id: Ifd502af9d2755b2c23805cd03857ebbf0e633732
Reviewed-on: https://chromium-review.googlesource.com/59752
Reviewed-by: Gilad Arnold <garnold@chromium.org>
Tested-by: Gilad Arnold <garnold@chromium.org>
Reviewed-by: Chris Sosa <sosa@chromium.org>
Commit-Queue: Gilad Arnold <garnold@chromium.org>
diff --git a/scripts/update_payload/checker_unittest.py b/scripts/update_payload/checker_unittest.py
index 4b23bd8..2c68637 100755
--- a/scripts/update_payload/checker_unittest.py
+++ b/scripts/update_payload/checker_unittest.py
@@ -14,13 +14,13 @@
import os
import unittest
-# Pylint cannot find mox.
+# pylint cannot find mox.
# pylint: disable=F0401
import mox
import checker
import common
-import payload as update_payload # avoid name conflicts later.
+import payload as update_payload # Avoid name conflicts later.
import test_utils
import update_metadata_pb2
@@ -61,11 +61,11 @@
return checker.PayloadChecker(payload)
-# (i) this class doesn't need an __init__(); (ii) unit testing is all about
-# running protected methods; (iii) don't bark about missing members of classes
-# you cannot import.
+# This class doesn't need an __init__().
# pylint: disable=W0232
+# Unit testing is all about running protected methods.
# pylint: disable=W0212
+# Don't bark about missing members of classes you cannot import.
# pylint: disable=E1101
class PayloadCheckerTest(mox.MoxTestBase):
"""Tests the PayloadChecker class.
@@ -78,7 +78,6 @@
individual invocation contexts for each, which are then bound as
testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for
all such tests is done in AddAllParametricTests().
-
"""
def MockPayload(self):
@@ -97,11 +96,10 @@
its default state.
Args:
- start_block: the starting block of the extent
- num_blocks: the number of blocks in the extent
+ start_block: The starting block of the extent.
+ num_blocks: The number of blocks in the extent.
Returns:
An Extent message.
-
"""
ex = update_metadata_pb2.Extent()
if start_block >= 0:
@@ -115,10 +113,9 @@
"""Returns an list of extents.
Args:
- *args: (start_block, num_blocks) pairs defining the extents
+ *args: (start_block, num_blocks) pairs defining the extents.
Returns:
A list of Extent objects.
-
"""
ex_list = []
for start_block, num_blocks in args:
@@ -131,6 +128,8 @@
new_field = repeated_field.add()
new_field.CopyFrom(field_val)
+ # The production environment uses an older Python; this isn't an override.
+ # pylint: disable=W0221
def assertIsNone(self, val):
"""Asserts that val is None (TODO remove once we upgrade to Python 2.7).
@@ -138,28 +137,26 @@
non-None value.
Args:
- val: value/object to be equated to None
-
+ val: Value/object to be equated to None.
"""
- self.assertEqual(val, None)
+ self.assertIs(None, val)
def SetupAddElemTest(self, is_present, is_submsg, convert=str,
linebreak=False, indent=0):
"""Setup for testing of _CheckElem() and its derivatives.
Args:
- is_present: whether or not the element is found in the message
- is_submsg: whether the element is a sub-message itself
- convert: a representation conversion function
- linebreak: whether or not a linebreak is to be used in the report
- indent: indentation used for the report
+ is_present: Whether or not the element is found in the message.
+ is_submsg: Whether the element is a sub-message itself.
+ convert: A representation conversion function.
+ linebreak: Whether or not a linebreak is to be used in the report.
+ indent: Indentation used for the report.
Returns:
- msg: a mock message object
- report: a mock report object
- subreport: a mock sub-report object
- name: an element name to check
- val: expected element value
-
+ msg: A mock message object.
+ report: A mock report object.
+ subreport: A mock sub-report object.
+ name: An element name to check.
+ val: Expected element value.
"""
name = 'foo'
val = 'fake submsg' if is_submsg else 'fake field'
@@ -186,86 +183,83 @@
"""Parametric testing of _CheckElem().
Args:
- is_present: whether or not the element is found in the message
- is_mandatory: whether or not it's a mandatory element
- is_submsg: whether the element is a sub-message itself
- convert: a representation conversion function
- linebreak: whether or not a linebreak is to be used in the report
- indent: indentation used for the report
-
+ is_present: Whether or not the element is found in the message.
+ is_mandatory: Whether or not it's a mandatory element.
+ is_submsg: Whether the element is a sub-message itself.
+ convert: A representation conversion function.
+ linebreak: Whether or not a linebreak is to be used in the report.
+ indent: Indentation used for the report.
"""
msg, report, subreport, name, val = self.SetupAddElemTest(
is_present, is_submsg, convert, linebreak, indent)
- largs = [msg, name, report, is_mandatory, is_submsg]
- dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
+ args = (msg, name, report, is_mandatory, is_submsg)
+ kwargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
if is_mandatory and not is_present:
self.assertRaises(update_payload.PayloadError,
- checker.PayloadChecker._CheckElem, *largs, **dargs)
+ checker.PayloadChecker._CheckElem, *args, **kwargs)
else:
- ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*largs,
- **dargs)
- self.assertEquals(ret_val, val if is_present else None)
- self.assertEquals(ret_subreport,
- subreport if is_present and is_submsg else None)
+ ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*args,
+ **kwargs)
+ self.assertEquals(val if is_present else None, ret_val)
+ self.assertEquals(subreport if is_present and is_submsg else None,
+ ret_subreport)
def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
indent):
"""Parametric testing of _Check{Mandatory,Optional}Field().
Args:
- is_mandatory: whether we're testing a mandatory call
- is_present: whether or not the element is found in the message
- convert: a representation conversion function
- linebreak: whether or not a linebreak is to be used in the report
- indent: indentation used for the report
-
+ is_mandatory: Whether we're testing a mandatory call.
+ is_present: Whether or not the element is found in the message.
+ convert: A representation conversion function.
+ linebreak: Whether or not a linebreak is to be used in the report.
+ indent: Indentation used for the report.
"""
msg, report, _, name, val = self.SetupAddElemTest(
is_present, False, convert, linebreak, indent)
# Prepare for invocation of the tested method.
- largs = [msg, name, report]
- dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
+ args = [msg, name, report]
+ kwargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
if is_mandatory:
- largs.append('bar')
+ args.append('bar')
tested_func = checker.PayloadChecker._CheckMandatoryField
else:
tested_func = checker.PayloadChecker._CheckOptionalField
# Test the method call.
if is_mandatory and not is_present:
- self.assertRaises(update_payload.PayloadError, tested_func, *largs,
- **dargs)
+ self.assertRaises(update_payload.PayloadError, tested_func, *args,
+ **kwargs)
else:
- ret_val = tested_func(*largs, **dargs)
- self.assertEquals(ret_val, val if is_present else None)
+ ret_val = tested_func(*args, **kwargs)
+ self.assertEquals(val if is_present else None, ret_val)
def DoAddSubMsgTest(self, is_mandatory, is_present):
"""Parametrized testing of _Check{Mandatory,Optional}SubMsg().
Args:
- is_mandatory: whether we're testing a mandatory call
- is_present: whether or not the element is found in the message
-
+ is_mandatory: Whether we're testing a mandatory call.
+ is_present: Whether or not the element is found in the message.
"""
msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True)
# Prepare for invocation of the tested method.
- largs = [msg, name, report]
+ args = [msg, name, report]
if is_mandatory:
- largs.append('bar')
+ args.append('bar')
tested_func = checker.PayloadChecker._CheckMandatorySubMsg
else:
tested_func = checker.PayloadChecker._CheckOptionalSubMsg
# Test the method call.
if is_mandatory and not is_present:
- self.assertRaises(update_payload.PayloadError, tested_func, *largs)
+ self.assertRaises(update_payload.PayloadError, tested_func, *args)
else:
- ret_val, ret_subreport = tested_func(*largs)
- self.assertEquals(ret_val, val if is_present else None)
- self.assertEquals(ret_subreport, subreport if is_present else None)
+ ret_val, ret_subreport = tested_func(*args)
+ self.assertEquals(val if is_present else None, ret_val)
+ self.assertEquals(subreport if is_present else None, ret_subreport)
def testCheckPresentIff(self):
"""Tests _CheckPresentIff()."""
@@ -286,30 +280,31 @@
"""Parametric testing of _CheckSha256SignatureTest().
Args:
- expect_pass: whether or not it should pass
- expect_subprocess_call: whether to expect the openssl call to happen
- sig_data: the signature raw data
- sig_asn1_header: the ASN1 header
- returned_signed_hash: the signed hash data retuned by openssl
- expected_signed_hash: the signed hash data to compare against
-
+ expect_pass: Whether or not it should pass.
+ expect_subprocess_call: Whether to expect the openssl call to happen.
+ sig_data: The signature raw data.
+ sig_asn1_header: The ASN1 header.
+ returned_signed_hash: The signed hash data returned by openssl.
+ expected_signed_hash: The signed hash data to compare against.
"""
- # Stub out the subprocess invocation.
- self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
- if expect_subprocess_call:
- checker.PayloadChecker._Run(mox.IsA(list), send_data=sig_data).AndReturn(
- (sig_asn1_header + returned_signed_hash, None))
+ try:
+ # Stub out the subprocess invocation.
+ self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
+ if expect_subprocess_call:
+ checker.PayloadChecker._Run(
+ mox.IsA(list), send_data=sig_data).AndReturn(
+ (sig_asn1_header + returned_signed_hash, None))
- self.mox.ReplayAll()
- if expect_pass:
- self.assertIsNone(checker.PayloadChecker._CheckSha256Signature(
- sig_data, 'foo', expected_signed_hash, 'bar'))
- else:
- self.assertRaises(update_payload.PayloadError,
- checker.PayloadChecker._CheckSha256Signature,
- sig_data, 'foo', expected_signed_hash, 'bar')
-
- self.mox.UnsetStubs()
+ self.mox.ReplayAll()
+ if expect_pass:
+ self.assertIsNone(checker.PayloadChecker._CheckSha256Signature(
+ sig_data, 'foo', expected_signed_hash, 'bar'))
+ else:
+ self.assertRaises(update_payload.PayloadError,
+ checker.PayloadChecker._CheckSha256Signature,
+ sig_data, 'foo', expected_signed_hash, 'bar')
+ finally:
+ self.mox.UnsetStubs()
def testCheckSha256Signature_Pass(self):
"""Tests _CheckSha256Signature(); pass case."""
@@ -321,7 +316,7 @@
def testCheckSha256Signature_FailBadSignature(self):
"""Tests _CheckSha256Signature(); fails due to malformed signature."""
- sig_data = 'fake-signature' # malformed (not 256 bytes in length)
+ sig_data = 'fake-signature' # Malformed (not 256 bytes in length).
signed_hash = hashlib.sha256('fake-data').digest()
self.DoCheckSha256SignatureTest(False, False, sig_data,
common.SIG_ASN1_HEADER, signed_hash,
@@ -330,7 +325,7 @@
def testCheckSha256Signature_FailBadOutputLength(self):
"""Tests _CheckSha256Signature(); fails due to unexpected output length."""
sig_data = 'fake-signature'.ljust(256)
- signed_hash = 'fake-hash' # malformed (not 32 bytes in length)
+ signed_hash = 'fake-hash' # Malformed (not 32 bytes in length).
self.DoCheckSha256SignatureTest(False, True, sig_data,
common.SIG_ASN1_HEADER, signed_hash,
signed_hash)
@@ -401,19 +396,18 @@
"""Parametric testing of _CheckManifest().
Args:
- fail_mismatched_block_size: simulate a missing block_size field
- fail_bad_sigs: make signatures descriptor inconsistent
- fail_mismatched_oki_ori: make old rootfs/kernel info partially present
- fail_bad_oki: tamper with old kernel info
- fail_bad_ori: tamper with old rootfs info
- fail_bad_nki: tamper with new kernel info
- fail_bad_nri: tamper with new rootfs info
- fail_missing_ops: simulate a manifest without any operations
- fail_old_kernel_fs_size: make old kernel fs size too big
- fail_old_rootfs_fs_size: make old rootfs fs size too big
- fail_new_kernel_fs_size: make new kernel fs size too big
- fail_new_rootfs_fs_size: make new rootfs fs size too big
-
+ fail_mismatched_block_size: Simulate a missing block_size field.
+ fail_bad_sigs: Make signatures descriptor inconsistent.
+ fail_mismatched_oki_ori: Make old rootfs/kernel info partially present.
+ fail_bad_oki: Tamper with old kernel info.
+ fail_bad_ori: Tamper with old rootfs info.
+ fail_bad_nki: Tamper with new kernel info.
+ fail_bad_nri: Tamper with new rootfs info.
+ fail_missing_ops: Simulate a manifest without any operations.
+ fail_old_kernel_fs_size: Make old kernel fs size too big.
+ fail_old_rootfs_fs_size: Make old rootfs fs size too big.
+ fail_new_kernel_fs_size: Make new kernel fs size too big.
+ fail_new_rootfs_fs_size: Make new rootfs fs size too big.
"""
# Generate a test payload. For this test, we only care about the manifest
# and don't need any data blobs, hence we can use a plain paylaod generator
@@ -515,26 +509,26 @@
# Passes w/ all real extents.
extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
self.assertEquals(
+ 23,
payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
- collections.defaultdict(int), 'foo'),
- 23)
+ collections.defaultdict(int), 'foo'))
# Passes w/ pseudo-extents (aka sparse holes).
extents = self.NewExtentList((0, 4), (common.PSEUDO_EXTENT_MARKER, 5),
(8, 3))
self.assertEquals(
+ 12,
payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
collections.defaultdict(int), 'foo',
- allow_pseudo=True),
- 12)
+ allow_pseudo=True))
# Passes w/ pseudo-extent due to a signature.
extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2))
self.assertEquals(
+ 2,
payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
collections.defaultdict(int), 'foo',
- allow_signature=True),
- 2)
+ allow_signature=True))
# Fails, extent missing a start block.
extents = self.NewExtentList((-1, 4), (8, 3), (1024, 16))
@@ -780,19 +774,19 @@
"""Parametric testing of _CheckOperation().
Args:
- op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
- is_last: whether we're testing the last operation in a sequence
- allow_signature: whether we're testing a signature-capable operation
- allow_unhashed: whether we're allowing to not hash the data
- fail_src_extents: tamper with src extents
- fail_dst_extents: tamper with dst extents
- fail_mismatched_data_offset_length: make data_{offset,length} inconsistent
- fail_missing_dst_extents: do not include dst extents
- fail_src_length: make src length inconsistent
- fail_dst_length: make dst length inconsistent
- fail_data_hash: tamper with the data blob hash
- fail_prev_data_offset: make data space uses incontiguous
-
+ op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'.
+ is_last: Whether we're testing the last operation in a sequence.
+ allow_signature: Whether we're testing a signature-capable operation.
+ allow_unhashed: Whether we're allowing to not hash the data.
+ fail_src_extents: Tamper with src extents.
+ fail_dst_extents: Tamper with dst extents.
+ fail_mismatched_data_offset_length: Make data_{offset,length}
+ inconsistent.
+ fail_missing_dst_extents: Do not include dst extents.
+ fail_src_length: Make src length inconsistent.
+ fail_dst_length: Make dst length inconsistent.
+ fail_data_hash: Tamper with the data blob hash.
+ fail_prev_data_offset: Make data space usage non-contiguous.
"""
op_type = _OpTypeByName(op_type_name)
@@ -879,15 +873,15 @@
fail_mismatched_data_offset_length or
fail_missing_dst_extents or fail_src_length or
fail_dst_length or fail_data_hash or fail_prev_data_offset)
- largs = [op, 'foo', is_last, old_block_counters, new_block_counters,
- old_part_size, new_part_size, prev_data_offset, allow_signature,
- blob_hash_counts]
+ args = (op, 'foo', is_last, old_block_counters, new_block_counters,
+ old_part_size, new_part_size, prev_data_offset, allow_signature,
+ blob_hash_counts)
if should_fail:
self.assertRaises(update_payload.PayloadError,
- payload_checker._CheckOperation, *largs)
+ payload_checker._CheckOperation, *args)
else:
- self.assertEqual(payload_checker._CheckOperation(*largs),
- op.data_length if op.HasField('data_length') else 0)
+ self.assertEqual(op.data_length if op.HasField('data_length') else 0,
+ payload_checker._CheckOperation(*args))
def testAllocBlockCounters(self):
"""Tests _CheckMoveOperation()."""
@@ -896,14 +890,14 @@
# Check allocation for block-aligned partition size, ensure it's integers.
result = payload_checker._AllocBlockCounters(16 * block_size)
- self.assertEqual(len(result), 16)
- self.assertEqual(type(result[0]), int)
+ self.assertEqual(16, len(result))
+ self.assertEqual(int, type(result[0]))
# Check allocation of unaligned partition sizes.
result = payload_checker._AllocBlockCounters(16 * block_size - 1)
- self.assertEqual(len(result), 16)
+ self.assertEqual(16, len(result))
result = payload_checker._AllocBlockCounters(16 * block_size + 1)
- self.assertEqual(len(result), 17)
+ self.assertEqual(17, len(result))
def DoCheckOperationsTest(self, fail_bad_type,
fail_nonexhaustive_full_update):
@@ -942,14 +936,14 @@
report = checker._PayloadReport()
should_fail = (fail_bad_type or fail_nonexhaustive_full_update)
- largs = (payload_checker.payload.manifest.install_operations, report,
- 'foo', 0, rootfs_part_size, rootfs_part_size, 0, False)
+ args = (payload_checker.payload.manifest.install_operations, report,
+ 'foo', 0, rootfs_part_size, rootfs_part_size, 0, False)
if should_fail:
self.assertRaises(update_payload.PayloadError,
- payload_checker._CheckOperations, *largs)
+ payload_checker._CheckOperations, *args)
else:
- self.assertEqual(payload_checker._CheckOperations(*largs),
- rootfs_data_length)
+ self.assertEqual(rootfs_data_length,
+ payload_checker._CheckOperations(*args))
def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op,
fail_mismatched_pseudo_op, fail_sig_missing_fields,
@@ -1015,12 +1009,12 @@
should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or
fail_mismatched_pseudo_op or fail_sig_missing_fields or
fail_unknown_sig_version or fail_incorrect_sig)
- largs = (report, test_utils._PUBKEY_FILE_NAME)
+ args = (report, test_utils._PUBKEY_FILE_NAME)
if should_fail:
self.assertRaises(update_payload.PayloadError,
- payload_checker._CheckSignatures, *largs)
+ payload_checker._CheckSignatures, *args)
else:
- self.assertIsNone(payload_checker._CheckSignatures(*largs))
+ self.assertIsNone(payload_checker._CheckSignatures(*args))
def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size,
fail_mismatched_block_size, fail_excess_data):
@@ -1050,13 +1044,13 @@
# Generate payload (complete w/ signature) and create the test object.
if fail_invalid_block_size:
- use_block_size = block_size + 5 # not a power of two
+ use_block_size = block_size + 5 # Not a power of two.
elif fail_mismatched_block_size:
- use_block_size = block_size * 2 # different that payload stated
+ use_block_size = block_size * 2 # Different than payload stated.
else:
use_block_size = block_size
- dargs = {
+ kwargs = {
'payload_gen_dargs': {
'privkey_file_name': test_utils._PRIVKEY_FILE_NAME,
'do_add_pseudo_operation': True,
@@ -1067,18 +1061,18 @@
'block_size': use_block_size}}
if fail_invalid_block_size:
self.assertRaises(update_payload.PayloadError, _GetPayloadChecker,
- payload_gen.WriteToFileWithData, **dargs)
+ payload_gen.WriteToFileWithData, **kwargs)
else:
payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData,
- **dargs)
- dargs = {'pubkey_file_name': test_utils._PUBKEY_FILE_NAME}
+ **kwargs)
+ kwargs = {'pubkey_file_name': test_utils._PUBKEY_FILE_NAME}
should_fail = (fail_wrong_payload_type or fail_mismatched_block_size or
fail_excess_data)
if should_fail:
- self.assertRaises(update_payload.PayloadError,
- payload_checker.Run, **dargs)
+ self.assertRaises(update_payload.PayloadError, payload_checker.Run,
+ **kwargs)
else:
- self.assertIsNone(payload_checker.Run(**dargs))
+ self.assertIsNone(payload_checker.Run(**kwargs))
# This implements a generic API, hence the occasional unused args.
@@ -1123,11 +1117,10 @@
usual setUp/tearDown mechanics.
Args:
- tested_method_name: name of the tested PayloadChecker method
- arg_space: a dictionary containing variables (keys) and lists of values
- (values) associated with them
- validate_func: a function used for validating test argument combinations
-
+ tested_method_name: Name of the tested PayloadChecker method.
+ arg_space: A dictionary containing variables (keys) and lists of values
+ (values) associated with them.
+ validate_func: A function used for validating test argument combinations.
"""
for value_tuple in itertools.product(*arg_space.itervalues()):
run_dargs = dict(zip(arg_space.iterkeys(), value_tuple))