|  | #!/usr/bin/env python | 
|  | # | 
|  | # Copyright (C) 2015 The Android Open Source Project | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | """Unit testing payload_info.py.""" | 
|  |  | 
|  | # Disable check for function names to avoid errors based on old code | 
|  | # pylint: disable-msg=invalid-name | 
|  |  | 
|  | from __future__ import absolute_import | 
|  | from __future__ import print_function | 
|  |  | 
|  | import sys | 
|  | import unittest | 
|  |  | 
|  | from contextlib import contextmanager | 
|  |  | 
|  | from six.moves import StringIO | 
|  |  | 
|  | import mock  # pylint: disable=import-error | 
|  |  | 
|  | import payload_info | 
|  | import update_payload | 
|  |  | 
|  | from update_payload import update_metadata_pb2 | 
|  |  | 
|  |  | 
|  | class FakePayloadError(Exception): | 
|  | """A generic error when using the FakePayload.""" | 
|  |  | 
|  |  | 
|  | class FakeOption(object): | 
|  | """Fake options object for testing.""" | 
|  |  | 
|  | def __init__(self, **kwargs): | 
|  | self.list_ops = False | 
|  | self.stats = False | 
|  | self.signatures = False | 
|  | for key, val in kwargs.items(): | 
|  | setattr(self, key, val) | 
|  | if not hasattr(self, 'payload_file'): | 
|  | self.payload_file = None | 
|  |  | 
|  |  | 
|  | class FakeOp(object): | 
|  | """Fake manifest operation for testing.""" | 
|  |  | 
|  | def __init__(self, src_extents, dst_extents, op_type, **kwargs): | 
|  | self.src_extents = src_extents | 
|  | self.dst_extents = dst_extents | 
|  | self.type = op_type | 
|  | for key, val in kwargs.items(): | 
|  | setattr(self, key, val) | 
|  |  | 
|  | def HasField(self, field): | 
|  | return hasattr(self, field) | 
|  |  | 
|  |  | 
|  | class FakeExtent(object): | 
|  | """Fake Extent for testing.""" | 
|  | def __init__(self, start_block, num_blocks): | 
|  | self.start_block = start_block | 
|  | self.num_blocks = num_blocks | 
|  |  | 
|  |  | 
|  | class FakePartitionInfo(object): | 
|  | """Fake PartitionInfo for testing.""" | 
|  | def __init__(self, size): | 
|  | self.size = size | 
|  |  | 
|  |  | 
|  | class FakePartition(object): | 
|  | """Fake PartitionUpdate field for testing.""" | 
|  |  | 
|  | def __init__(self, partition_name, operations, old_size, new_size): | 
|  | self.partition_name = partition_name | 
|  | self.operations = operations | 
|  | self.old_partition_info = FakePartitionInfo(old_size) | 
|  | self.new_partition_info = FakePartitionInfo(new_size) | 
|  |  | 
|  |  | 
|  | class FakeManifest(object): | 
|  | """Fake manifest for testing.""" | 
|  |  | 
|  | def __init__(self): | 
|  | self.partitions = [ | 
|  | FakePartition(update_payload.common.ROOTFS, | 
|  | [FakeOp([], [FakeExtent(1, 1), FakeExtent(2, 2)], | 
|  | update_payload.common.OpType.REPLACE_BZ, | 
|  | dst_length=3*4096, | 
|  | data_offset=1, | 
|  | data_length=1) | 
|  | ], 1 * 4096, 3 * 4096), | 
|  | FakePartition(update_payload.common.KERNEL, | 
|  | [FakeOp([FakeExtent(1, 1)], | 
|  | [FakeExtent(x, x) for x in range(20)], | 
|  | update_payload.common.OpType.SOURCE_COPY, | 
|  | src_length=4096) | 
|  | ], 2 * 4096, 4 * 4096), | 
|  | ] | 
|  | self.block_size = 4096 | 
|  | self.minor_version = 4 | 
|  | self.signatures_offset = None | 
|  | self.signatures_size = None | 
|  |  | 
|  | def HasField(self, field_name): | 
|  | """Fake HasField method based on the python members.""" | 
|  | return hasattr(self, field_name) and getattr(self, field_name) is not None | 
|  |  | 
|  |  | 
|  | class FakeHeader(object): | 
|  | """Fake payload header for testing.""" | 
|  |  | 
|  | def __init__(self, manifest_len, metadata_signature_len): | 
|  | self.version = payload_info.MAJOR_PAYLOAD_VERSION_BRILLO | 
|  | self.manifest_len = manifest_len | 
|  | self.metadata_signature_len = metadata_signature_len | 
|  |  | 
|  | @property | 
|  | def size(self): | 
|  | return 24 | 
|  |  | 
|  |  | 
|  | class FakePayload(object): | 
|  | """Fake payload for testing.""" | 
|  |  | 
|  | def __init__(self): | 
|  | self._header = FakeHeader(222, 0) | 
|  | self.header = None | 
|  | self._manifest = FakeManifest() | 
|  | self.manifest = None | 
|  |  | 
|  | self._blobs = {} | 
|  | self._payload_signatures = update_metadata_pb2.Signatures() | 
|  | self._metadata_signatures = update_metadata_pb2.Signatures() | 
|  |  | 
|  | def Init(self): | 
|  | """Fake Init that sets header and manifest. | 
|  |  | 
|  | Failing to call Init() will not make header and manifest available to the | 
|  | test. | 
|  | """ | 
|  | self.header = self._header | 
|  | self.manifest = self._manifest | 
|  |  | 
|  | def ReadDataBlob(self, offset, length): | 
|  | """Return the blob that should be present at the offset location""" | 
|  | if not offset in self._blobs: | 
|  | raise FakePayloadError('Requested blob at unknown offset %d' % offset) | 
|  | blob = self._blobs[offset] | 
|  | if len(blob) != length: | 
|  | raise FakePayloadError('Read blob with the wrong length (expect: %d, ' | 
|  | 'actual: %d)' % (len(blob), length)) | 
|  | return blob | 
|  |  | 
|  | @staticmethod | 
|  | def _AddSignatureToProto(proto, **kwargs): | 
|  | """Add a new Signature element to the passed proto.""" | 
|  | new_signature = proto.signatures.add() | 
|  | for key, val in kwargs.items(): | 
|  | setattr(new_signature, key, val) | 
|  |  | 
|  | def AddPayloadSignature(self, **kwargs): | 
|  | self._AddSignatureToProto(self._payload_signatures, **kwargs) | 
|  | blob = self._payload_signatures.SerializeToString() | 
|  | self._manifest.signatures_offset = 1234 | 
|  | self._manifest.signatures_size = len(blob) | 
|  | self._blobs[self._manifest.signatures_offset] = blob | 
|  |  | 
|  | def AddMetadataSignature(self, **kwargs): | 
|  | self._AddSignatureToProto(self._metadata_signatures, **kwargs) | 
|  | if self._header.metadata_signature_len: | 
|  | del self._blobs[-self._header.metadata_signature_len] | 
|  | blob = self._metadata_signatures.SerializeToString() | 
|  | self._header.metadata_signature_len = len(blob) | 
|  | self._blobs[-len(blob)] = blob | 
|  |  | 
|  |  | 
|  | class PayloadCommandTest(unittest.TestCase): | 
|  | """Test class for our PayloadCommand class.""" | 
|  |  | 
|  | @contextmanager | 
|  | def OutputCapturer(self): | 
|  | """A tool for capturing the sys.stdout""" | 
|  | stdout = sys.stdout | 
|  | try: | 
|  | sys.stdout = StringIO() | 
|  | yield sys.stdout | 
|  | finally: | 
|  | sys.stdout = stdout | 
|  |  | 
|  | def TestCommand(self, payload_cmd, payload, expected_out): | 
|  | """A tool for testing a payload command. | 
|  |  | 
|  | It tests that a payload command which runs with a given payload produces a | 
|  | correct output. | 
|  | """ | 
|  | with mock.patch.object(update_payload, 'Payload', return_value=payload), \ | 
|  | self.OutputCapturer() as output: | 
|  | payload_cmd.Run() | 
|  | self.assertEqual(output.getvalue(), expected_out) | 
|  |  | 
|  | def testDisplayValue(self): | 
|  | """Verify that DisplayValue prints what we expect.""" | 
|  | with self.OutputCapturer() as output: | 
|  | payload_info.DisplayValue('key', 'value') | 
|  | self.assertEqual(output.getvalue(), 'key:                         value\n') | 
|  |  | 
|  | def testRun(self): | 
|  | """Verify that Run parses and displays the payload like we expect.""" | 
|  | payload_cmd = payload_info.PayloadCommand(FakeOption(action='show')) | 
|  | payload = FakePayload() | 
|  | expected_out = """Payload version:             2 | 
|  | Manifest length:             222 | 
|  | Number of partitions:        2 | 
|  | Number of "root" ops:      1 | 
|  | Number of "kernel" ops:    1 | 
|  | Block size:                  4096 | 
|  | Minor version:               4 | 
|  | """ | 
|  | self.TestCommand(payload_cmd, payload, expected_out) | 
|  |  | 
|  | def testListOpsOnVersion2(self): | 
|  | """Verify that the --list_ops option gives the correct output.""" | 
|  | payload_cmd = payload_info.PayloadCommand( | 
|  | FakeOption(list_ops=True, action='show')) | 
|  | payload = FakePayload() | 
|  | expected_out = """Payload version:             2 | 
|  | Manifest length:             222 | 
|  | Number of partitions:        2 | 
|  | Number of "root" ops:      1 | 
|  | Number of "kernel" ops:    1 | 
|  | Block size:                  4096 | 
|  | Minor version:               4 | 
|  |  | 
|  | root install operations: | 
|  | 0: REPLACE_BZ | 
|  | Data offset: 1 | 
|  | Data length: 1 | 
|  | Destination: 2 extents (3 blocks) | 
|  | (1,1) (2,2) | 
|  | kernel install operations: | 
|  | 0: SOURCE_COPY | 
|  | Source: 1 extent (1 block) | 
|  | (1,1) | 
|  | Destination: 20 extents (190 blocks) | 
|  | (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) | 
|  | (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) | 
|  | """ | 
|  | self.TestCommand(payload_cmd, payload, expected_out) | 
|  |  | 
|  | def testStatsOnVersion2(self): | 
|  | """Verify that the --stats option works correctly on version 2.""" | 
|  | payload_cmd = payload_info.PayloadCommand( | 
|  | FakeOption(stats=True, action='show')) | 
|  | payload = FakePayload() | 
|  | expected_out = """Payload version:             2 | 
|  | Manifest length:             222 | 
|  | Number of partitions:        2 | 
|  | Number of "root" ops:      1 | 
|  | Number of "kernel" ops:    1 | 
|  | Block size:                  4096 | 
|  | Minor version:               4 | 
|  | Blocks read:                 11 | 
|  | Blocks written:              193 | 
|  | Seeks when writing:          18 | 
|  | """ | 
|  | self.TestCommand(payload_cmd, payload, expected_out) | 
|  |  | 
|  | def testEmptySignatures(self): | 
|  | """Verify that the --signatures option works with unsigned payloads.""" | 
|  | payload_cmd = payload_info.PayloadCommand( | 
|  | FakeOption(action='show', signatures=True)) | 
|  | payload = FakePayload() | 
|  | expected_out = """Payload version:             2 | 
|  | Manifest length:             222 | 
|  | Number of partitions:        2 | 
|  | Number of "root" ops:      1 | 
|  | Number of "kernel" ops:    1 | 
|  | Block size:                  4096 | 
|  | Minor version:               4 | 
|  | No metadata signatures stored in the payload | 
|  | No payload signatures stored in the payload | 
|  | """ | 
|  | self.TestCommand(payload_cmd, payload, expected_out) | 
|  |  | 
|  | def testSignatures(self): | 
|  | """Verify that the --signatures option shows the present signatures.""" | 
|  | payload_cmd = payload_info.PayloadCommand( | 
|  | FakeOption(action='show', signatures=True)) | 
|  | payload = FakePayload() | 
|  | payload.AddPayloadSignature(version=1, | 
|  | data=b'12345678abcdefgh\x00\x01\x02\x03') | 
|  | payload.AddPayloadSignature(data=b'I am a signature so access is yes.') | 
|  | payload.AddMetadataSignature(data=b'\x00\x0a\x0c') | 
|  | expected_out = """Payload version:             2 | 
|  | Manifest length:             222 | 
|  | Number of partitions:        2 | 
|  | Number of "root" ops:      1 | 
|  | Number of "kernel" ops:    1 | 
|  | Block size:                  4096 | 
|  | Minor version:               4 | 
|  | Metadata signatures blob:    file_offset=246 (7 bytes) | 
|  | Metadata signatures: (1 entries) | 
|  | version=None, hex_data: (3 bytes) | 
|  | 00 0a 0c                                        | ... | 
|  | Payload signatures blob:     blob_offset=1234 (64 bytes) | 
|  | Payload signatures: (2 entries) | 
|  | version=1, hex_data: (20 bytes) | 
|  | 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh | 
|  | 00 01 02 03                                     | .... | 
|  | version=None, hex_data: (34 bytes) | 
|  | 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature | 
|  | 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 |  so access is ye | 
|  | 73 2e                                           | s. | 
|  | """ | 
|  | self.TestCommand(payload_cmd, payload, expected_out) | 
|  |  | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | unittest.main() |