| #!/usr/bin/env python | 
 | # | 
 | # Copyright (C) 2015 The Android Open Source Project | 
 | # | 
 | # Licensed under the Apache License, Version 2.0 (the "License"); | 
 | # you may not use this file except in compliance with the License. | 
 | # You may obtain a copy of the License at | 
 | # | 
 | #      http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, | 
 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | # See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 | # | 
 |  | 
 | """Unit testing payload_info.py.""" | 
 |  | 
 | # Disable check for function names to avoid errors based on old code | 
 | # pylint: disable-msg=invalid-name | 
 |  | 
 | from __future__ import absolute_import | 
 | from __future__ import print_function | 
 |  | 
 | import sys | 
 | import unittest | 
 |  | 
 | from contextlib import contextmanager | 
 |  | 
 | from six.moves import StringIO | 
 |  | 
 | import mock  # pylint: disable=import-error | 
 |  | 
 | import payload_info | 
 | import update_payload | 
 |  | 
 | from update_payload import update_metadata_pb2 | 
 |  | 
 |  | 
 | class FakePayloadError(Exception): | 
 |   """A generic error when using the FakePayload.""" | 
 |  | 
 |  | 
 | class FakeOption(object): | 
 |   """Fake options object for testing.""" | 
 |  | 
 |   def __init__(self, **kwargs): | 
 |     self.list_ops = False | 
 |     self.stats = False | 
 |     self.signatures = False | 
 |     for key, val in kwargs.items(): | 
 |       setattr(self, key, val) | 
 |     if not hasattr(self, 'payload_file'): | 
 |       self.payload_file = None | 
 |  | 
 |  | 
 | class FakeOp(object): | 
 |   """Fake manifest operation for testing.""" | 
 |  | 
 |   def __init__(self, src_extents, dst_extents, op_type, **kwargs): | 
 |     self.src_extents = src_extents | 
 |     self.dst_extents = dst_extents | 
 |     self.type = op_type | 
 |     for key, val in kwargs.items(): | 
 |       setattr(self, key, val) | 
 |  | 
 |   def HasField(self, field): | 
 |     return hasattr(self, field) | 
 |  | 
 |  | 
 | class FakeExtent(object): | 
 |   """Fake Extent for testing.""" | 
 |   def __init__(self, start_block, num_blocks): | 
 |     self.start_block = start_block | 
 |     self.num_blocks = num_blocks | 
 |  | 
 |  | 
 | class FakePartitionInfo(object): | 
 |   """Fake PartitionInfo for testing.""" | 
 |   def __init__(self, size): | 
 |     self.size = size | 
 |  | 
 |  | 
 | class FakePartition(object): | 
 |   """Fake PartitionUpdate field for testing.""" | 
 |  | 
 |   def __init__(self, partition_name, operations, old_size, new_size): | 
 |     self.partition_name = partition_name | 
 |     self.operations = operations | 
 |     self.old_partition_info = FakePartitionInfo(old_size) | 
 |     self.new_partition_info = FakePartitionInfo(new_size) | 
 |  | 
 |  | 
 | class FakeManifest(object): | 
 |   """Fake manifest for testing.""" | 
 |  | 
 |   def __init__(self): | 
 |     self.partitions = [ | 
 |         FakePartition(update_payload.common.ROOTFS, | 
 |                       [FakeOp([], [FakeExtent(1, 1), FakeExtent(2, 2)], | 
 |                               update_payload.common.OpType.REPLACE_BZ, | 
 |                               dst_length=3*4096, | 
 |                               data_offset=1, | 
 |                               data_length=1) | 
 |                       ], 1 * 4096, 3 * 4096), | 
 |         FakePartition(update_payload.common.KERNEL, | 
 |                       [FakeOp([FakeExtent(1, 1)], | 
 |                               [FakeExtent(x, x) for x in range(20)], | 
 |                               update_payload.common.OpType.SOURCE_COPY, | 
 |                               src_length=4096) | 
 |                       ], 2 * 4096, 4 * 4096), | 
 |     ] | 
 |     self.block_size = 4096 | 
 |     self.minor_version = 4 | 
 |     self.signatures_offset = None | 
 |     self.signatures_size = None | 
 |  | 
 |   def HasField(self, field_name): | 
 |     """Fake HasField method based on the python members.""" | 
 |     return hasattr(self, field_name) and getattr(self, field_name) is not None | 
 |  | 
 |  | 
 | class FakeHeader(object): | 
 |   """Fake payload header for testing.""" | 
 |  | 
 |   def __init__(self, manifest_len, metadata_signature_len): | 
 |     self.version = payload_info.MAJOR_PAYLOAD_VERSION_BRILLO | 
 |     self.manifest_len = manifest_len | 
 |     self.metadata_signature_len = metadata_signature_len | 
 |  | 
 |   @property | 
 |   def size(self): | 
 |     return 24 | 
 |  | 
 |  | 
 | class FakePayload(object): | 
 |   """Fake payload for testing.""" | 
 |  | 
 |   def __init__(self): | 
 |     self._header = FakeHeader(222, 0) | 
 |     self.header = None | 
 |     self._manifest = FakeManifest() | 
 |     self.manifest = None | 
 |  | 
 |     self._blobs = {} | 
 |     self._payload_signatures = update_metadata_pb2.Signatures() | 
 |     self._metadata_signatures = update_metadata_pb2.Signatures() | 
 |  | 
 |   def Init(self): | 
 |     """Fake Init that sets header and manifest. | 
 |  | 
 |     Failing to call Init() will not make header and manifest available to the | 
 |     test. | 
 |     """ | 
 |     self.header = self._header | 
 |     self.manifest = self._manifest | 
 |  | 
 |   def ReadDataBlob(self, offset, length): | 
 |     """Return the blob that should be present at the offset location""" | 
 |     if not offset in self._blobs: | 
 |       raise FakePayloadError('Requested blob at unknown offset %d' % offset) | 
 |     blob = self._blobs[offset] | 
 |     if len(blob) != length: | 
 |       raise FakePayloadError('Read blob with the wrong length (expect: %d, ' | 
 |                              'actual: %d)' % (len(blob), length)) | 
 |     return blob | 
 |  | 
 |   @staticmethod | 
 |   def _AddSignatureToProto(proto, **kwargs): | 
 |     """Add a new Signature element to the passed proto.""" | 
 |     new_signature = proto.signatures.add() | 
 |     for key, val in kwargs.items(): | 
 |       setattr(new_signature, key, val) | 
 |  | 
 |   def AddPayloadSignature(self, **kwargs): | 
 |     self._AddSignatureToProto(self._payload_signatures, **kwargs) | 
 |     blob = self._payload_signatures.SerializeToString() | 
 |     self._manifest.signatures_offset = 1234 | 
 |     self._manifest.signatures_size = len(blob) | 
 |     self._blobs[self._manifest.signatures_offset] = blob | 
 |  | 
 |   def AddMetadataSignature(self, **kwargs): | 
 |     self._AddSignatureToProto(self._metadata_signatures, **kwargs) | 
 |     if self._header.metadata_signature_len: | 
 |       del self._blobs[-self._header.metadata_signature_len] | 
 |     blob = self._metadata_signatures.SerializeToString() | 
 |     self._header.metadata_signature_len = len(blob) | 
 |     self._blobs[-len(blob)] = blob | 
 |  | 
 |  | 
 | class PayloadCommandTest(unittest.TestCase): | 
 |   """Test class for our PayloadCommand class.""" | 
 |  | 
 |   @contextmanager | 
 |   def OutputCapturer(self): | 
 |     """A tool for capturing the sys.stdout""" | 
 |     stdout = sys.stdout | 
 |     try: | 
 |       sys.stdout = StringIO() | 
 |       yield sys.stdout | 
 |     finally: | 
 |       sys.stdout = stdout | 
 |  | 
 |   def TestCommand(self, payload_cmd, payload, expected_out): | 
 |     """A tool for testing a payload command. | 
 |  | 
 |     It tests that a payload command which runs with a given payload produces a | 
 |     correct output. | 
 |     """ | 
 |     with mock.patch.object(update_payload, 'Payload', return_value=payload), \ | 
 |          self.OutputCapturer() as output: | 
 |       payload_cmd.Run() | 
 |     self.assertEqual(output.getvalue(), expected_out) | 
 |  | 
 |   def testDisplayValue(self): | 
 |     """Verify that DisplayValue prints what we expect.""" | 
 |     with self.OutputCapturer() as output: | 
 |       payload_info.DisplayValue('key', 'value') | 
 |     self.assertEqual(output.getvalue(), 'key:                         value\n') | 
 |  | 
 |   def testRun(self): | 
 |     """Verify that Run parses and displays the payload like we expect.""" | 
 |     payload_cmd = payload_info.PayloadCommand(FakeOption(action='show')) | 
 |     payload = FakePayload() | 
 |     expected_out = """Payload version:             2 | 
 | Manifest length:             222 | 
 | Number of partitions:        2 | 
 |   Number of "root" ops:      1 | 
 |   Number of "kernel" ops:    1 | 
 | Block size:                  4096 | 
 | Minor version:               4 | 
 | """ | 
 |     self.TestCommand(payload_cmd, payload, expected_out) | 
 |  | 
 |   def testListOpsOnVersion2(self): | 
 |     """Verify that the --list_ops option gives the correct output.""" | 
 |     payload_cmd = payload_info.PayloadCommand( | 
 |         FakeOption(list_ops=True, action='show')) | 
 |     payload = FakePayload() | 
 |     expected_out = """Payload version:             2 | 
 | Manifest length:             222 | 
 | Number of partitions:        2 | 
 |   Number of "root" ops:      1 | 
 |   Number of "kernel" ops:    1 | 
 | Block size:                  4096 | 
 | Minor version:               4 | 
 |  | 
 | root install operations: | 
 |   0: REPLACE_BZ | 
 |     Data offset: 1 | 
 |     Data length: 1 | 
 |     Destination: 2 extents (3 blocks) | 
 |       (1,1) (2,2) | 
 | kernel install operations: | 
 |   0: SOURCE_COPY | 
 |     Source: 1 extent (1 block) | 
 |       (1,1) | 
 |     Destination: 20 extents (190 blocks) | 
 |       (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) | 
 |       (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) | 
 | """ | 
 |     self.TestCommand(payload_cmd, payload, expected_out) | 
 |  | 
 |   def testStatsOnVersion2(self): | 
 |     """Verify that the --stats option works correctly on version 2.""" | 
 |     payload_cmd = payload_info.PayloadCommand( | 
 |         FakeOption(stats=True, action='show')) | 
 |     payload = FakePayload() | 
 |     expected_out = """Payload version:             2 | 
 | Manifest length:             222 | 
 | Number of partitions:        2 | 
 |   Number of "root" ops:      1 | 
 |   Number of "kernel" ops:    1 | 
 | Block size:                  4096 | 
 | Minor version:               4 | 
 | Blocks read:                 11 | 
 | Blocks written:              193 | 
 | Seeks when writing:          18 | 
 | """ | 
 |     self.TestCommand(payload_cmd, payload, expected_out) | 
 |  | 
 |   def testEmptySignatures(self): | 
 |     """Verify that the --signatures option works with unsigned payloads.""" | 
 |     payload_cmd = payload_info.PayloadCommand( | 
 |         FakeOption(action='show', signatures=True)) | 
 |     payload = FakePayload() | 
 |     expected_out = """Payload version:             2 | 
 | Manifest length:             222 | 
 | Number of partitions:        2 | 
 |   Number of "root" ops:      1 | 
 |   Number of "kernel" ops:    1 | 
 | Block size:                  4096 | 
 | Minor version:               4 | 
 | No metadata signatures stored in the payload | 
 | No payload signatures stored in the payload | 
 | """ | 
 |     self.TestCommand(payload_cmd, payload, expected_out) | 
 |  | 
 |   def testSignatures(self): | 
 |     """Verify that the --signatures option shows the present signatures.""" | 
 |     payload_cmd = payload_info.PayloadCommand( | 
 |         FakeOption(action='show', signatures=True)) | 
 |     payload = FakePayload() | 
 |     payload.AddPayloadSignature(version=1, | 
 |                                 data=b'12345678abcdefgh\x00\x01\x02\x03') | 
 |     payload.AddPayloadSignature(data=b'I am a signature so access is yes.') | 
 |     payload.AddMetadataSignature(data=b'\x00\x0a\x0c') | 
 |     expected_out = """Payload version:             2 | 
 | Manifest length:             222 | 
 | Number of partitions:        2 | 
 |   Number of "root" ops:      1 | 
 |   Number of "kernel" ops:    1 | 
 | Block size:                  4096 | 
 | Minor version:               4 | 
 | Metadata signatures blob:    file_offset=246 (7 bytes) | 
 | Metadata signatures: (1 entries) | 
 |   version=None, hex_data: (3 bytes) | 
 |     00 0a 0c                                        | ... | 
 | Payload signatures blob:     blob_offset=1234 (64 bytes) | 
 | Payload signatures: (2 entries) | 
 |   version=1, hex_data: (20 bytes) | 
 |     31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh | 
 |     00 01 02 03                                     | .... | 
 |   version=None, hex_data: (34 bytes) | 
 |     49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature | 
 |     20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 |  so access is ye | 
 |     73 2e                                           | s. | 
 | """ | 
 |     self.TestCommand(payload_cmd, payload, expected_out) | 
 |  | 
 |  | 
# Run the unit tests when this file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()