Amin Hassani | 50dd2f0 | 2018-07-19 19:21:29 -0700 | [diff] [blame^] | 1 | # -*- coding: utf-8 -*- |
| 2 | # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| 3 | # Use of this source code is governed by a BSD-style license that can be |
| 4 | # found in the LICENSE file. |
| 5 | |
| 6 | """This module tests the cros payload command.""" |
| 7 | |
| 8 | from __future__ import print_function |
| 9 | |
| 10 | import collections |
| 11 | import sys |
| 12 | |
| 13 | from chromite.lib import constants |
| 14 | from chromite.cli.cros import cros_payload |
| 15 | from chromite.lib import cros_test_lib |
| 16 | |
| 17 | # Needed for the update_payload import below. |
| 18 | sys.path.insert(0, constants.UPDATE_ENGINE_SCRIPTS_PATH) |
| 19 | |
| 20 | # TODO(alliewood)(chromium:454629) update once update_payload is moved |
| 21 | # into chromite |
| 22 | import update_payload |
| 23 | from update_payload import update_metadata_pb2 |
| 24 | |
| 25 | class FakePayloadError(Exception): |
| 26 | """A generic error when using the FakePayload.""" |
| 27 | |
| 28 | class FakeOption(object): |
| 29 | """Fake options object for testing.""" |
| 30 | |
| 31 | def __init__(self, **kwargs): |
| 32 | self.list_ops = False |
| 33 | self.stats = False |
| 34 | self.signatures = False |
| 35 | for key, val in kwargs.iteritems(): |
| 36 | setattr(self, key, val) |
| 37 | if not hasattr(self, 'payload_file'): |
| 38 | self.payload_file = None |
| 39 | |
| 40 | class FakeOp(object): |
| 41 | """Fake manifest operation for testing.""" |
| 42 | |
| 43 | def __init__(self, src_extents, dst_extents, op_type, **kwargs): |
| 44 | self.src_extents = src_extents |
| 45 | self.dst_extents = dst_extents |
| 46 | self.type = op_type |
| 47 | for key, val in kwargs.iteritems(): |
| 48 | setattr(self, key, val) |
| 49 | |
| 50 | def HasField(self, field): |
| 51 | return hasattr(self, field) |
| 52 | |
| 53 | class FakePartition(object): |
| 54 | """Fake PartitionUpdate field for testing.""" |
| 55 | |
| 56 | def __init__(self, partition_name, operations): |
| 57 | self.partition_name = partition_name |
| 58 | self.operations = operations |
| 59 | |
| 60 | class FakeManifest(object): |
| 61 | """Fake manifest for testing.""" |
| 62 | |
| 63 | def __init__(self, major_version): |
| 64 | FakeExtent = collections.namedtuple('FakeExtent', |
| 65 | ['start_block', 'num_blocks']) |
| 66 | self.install_operations = [FakeOp([], |
| 67 | [FakeExtent(1, 1), FakeExtent(2, 2)], |
| 68 | update_payload.common.OpType.REPLACE_BZ, |
| 69 | dst_length=3*4096, |
| 70 | data_offset=1, |
| 71 | data_length=1)] |
| 72 | self.kernel_install_operations = [FakeOp( |
| 73 | [FakeExtent(1, 1)], |
| 74 | [FakeExtent(x, x) for x in xrange(20)], |
| 75 | update_payload.common.OpType.SOURCE_COPY, |
| 76 | src_length=4096)] |
| 77 | if major_version == cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO: |
| 78 | self.partitions = [FakePartition('rootfs', self.install_operations), |
| 79 | FakePartition('kernel', |
| 80 | self.kernel_install_operations)] |
| 81 | self.install_operations = self.kernel_install_operations = [] |
| 82 | self.block_size = 4096 |
| 83 | self.minor_version = 4 |
| 84 | FakePartInfo = collections.namedtuple('FakePartInfo', ['size']) |
| 85 | self.old_rootfs_info = FakePartInfo(1 * 4096) |
| 86 | self.old_kernel_info = FakePartInfo(2 * 4096) |
| 87 | self.new_rootfs_info = FakePartInfo(3 * 4096) |
| 88 | self.new_kernel_info = FakePartInfo(4 * 4096) |
| 89 | self.signatures_offset = None |
| 90 | self.signatures_size = None |
| 91 | |
| 92 | def HasField(self, field_name): |
| 93 | """Fake HasField method based on the python members.""" |
| 94 | return hasattr(self, field_name) and getattr(self, field_name) is not None |
| 95 | |
| 96 | class FakeHeader(object): |
| 97 | """Fake payload header for testing.""" |
| 98 | |
| 99 | def __init__(self, version, manifest_len, metadata_signature_len): |
| 100 | self.version = version |
| 101 | self.manifest_len = manifest_len |
| 102 | self.metadata_signature_len = metadata_signature_len |
| 103 | |
| 104 | @property |
| 105 | def size(self): |
| 106 | return (20 if self.version == cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS |
| 107 | else 24) |
| 108 | |
| 109 | |
| 110 | class FakePayload(object): |
| 111 | """Fake payload for testing.""" |
| 112 | |
| 113 | def __init__(self, major_version): |
| 114 | self._header = FakeHeader(major_version, 222, 0) |
| 115 | self.header = None |
| 116 | self._manifest = FakeManifest(major_version) |
| 117 | self.manifest = None |
| 118 | |
| 119 | self._blobs = {} |
| 120 | self._payload_signatures = update_metadata_pb2.Signatures() |
| 121 | self._metadata_signatures = update_metadata_pb2.Signatures() |
| 122 | |
| 123 | def Init(self): |
| 124 | """Fake Init that sets header and manifest. |
| 125 | |
| 126 | Failing to call Init() will not make header and manifest available to the |
| 127 | test. |
| 128 | """ |
| 129 | self.header = self._header |
| 130 | self.manifest = self._manifest |
| 131 | |
| 132 | def ReadDataBlob(self, offset, length): |
| 133 | """Return the blob that should be present at the offset location""" |
| 134 | if not offset in self._blobs: |
| 135 | raise FakePayloadError('Requested blob at unknown offset %d' % offset) |
| 136 | blob = self._blobs[offset] |
| 137 | if len(blob) != length: |
| 138 | raise FakePayloadError('Read blob with the wrong length (expect: %d, ' |
| 139 | 'actual: %d)' % (len(blob), length)) |
| 140 | return blob |
| 141 | |
| 142 | @staticmethod |
| 143 | def _AddSignatureToProto(proto, **kwargs): |
| 144 | """Add a new Signature element to the passed proto.""" |
| 145 | new_signature = proto.signatures.add() |
| 146 | for key, val in kwargs.iteritems(): |
| 147 | setattr(new_signature, key, val) |
| 148 | |
| 149 | def AddPayloadSignature(self, **kwargs): |
| 150 | self._AddSignatureToProto(self._payload_signatures, **kwargs) |
| 151 | blob = self._payload_signatures.SerializeToString() |
| 152 | self._manifest.signatures_offset = 1234 |
| 153 | self._manifest.signatures_size = len(blob) |
| 154 | self._blobs[self._manifest.signatures_offset] = blob |
| 155 | |
| 156 | def AddMetadataSignature(self, **kwargs): |
| 157 | self._AddSignatureToProto(self._metadata_signatures, **kwargs) |
| 158 | if self._header.metadata_signature_len: |
| 159 | del self._blobs[-self._header.metadata_signature_len] |
| 160 | blob = self._metadata_signatures.SerializeToString() |
| 161 | self._header.metadata_signature_len = len(blob) |
| 162 | self._blobs[-len(blob)] = blob |
| 163 | |
| 164 | |
| 165 | class PayloadCommandTest(cros_test_lib.MockOutputTestCase): |
| 166 | """Test class for our PayloadCommand class.""" |
| 167 | |
| 168 | def testDisplayValue(self): |
| 169 | """Verify that DisplayValue prints what we expect.""" |
| 170 | with self.OutputCapturer() as output: |
| 171 | cros_payload.DisplayValue('key', 'value') |
| 172 | stdout = output.GetStdout() |
| 173 | self.assertEquals(stdout, 'key: value\n') |
| 174 | |
| 175 | def testRun(self): |
| 176 | """Verify that Run parses and displays the payload like we expect.""" |
| 177 | payload_cmd = cros_payload.PayloadCommand(FakeOption(action='show')) |
| 178 | self.PatchObject(update_payload, 'Payload', return_value=FakePayload( |
| 179 | cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS)) |
| 180 | |
| 181 | with self.OutputCapturer() as output: |
| 182 | payload_cmd.Run() |
| 183 | |
| 184 | stdout = output.GetStdout() |
| 185 | expected_out = """Payload version: 1 |
| 186 | Manifest length: 222 |
| 187 | Number of operations: 1 |
| 188 | Number of kernel ops: 1 |
| 189 | Block size: 4096 |
| 190 | Minor version: 4 |
| 191 | """ |
| 192 | self.assertEquals(stdout, expected_out) |
| 193 | |
| 194 | def testListOpsOnVersion1(self): |
| 195 | """Verify that the --list_ops option gives the correct output.""" |
| 196 | payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True, |
| 197 | action='show')) |
| 198 | self.PatchObject(update_payload, 'Payload', return_value=FakePayload( |
| 199 | cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS)) |
| 200 | |
| 201 | with self.OutputCapturer() as output: |
| 202 | payload_cmd.Run() |
| 203 | |
| 204 | stdout = output.GetStdout() |
| 205 | expected_out = """Payload version: 1 |
| 206 | Manifest length: 222 |
| 207 | Number of operations: 1 |
| 208 | Number of kernel ops: 1 |
| 209 | Block size: 4096 |
| 210 | Minor version: 4 |
| 211 | |
| 212 | Install operations: |
| 213 | 0: REPLACE_BZ |
| 214 | Data offset: 1 |
| 215 | Data length: 1 |
| 216 | Destination: 2 extents (3 blocks) |
| 217 | (1,1) (2,2) |
| 218 | Kernel install operations: |
| 219 | 0: SOURCE_COPY |
| 220 | Source: 1 extent (1 block) |
| 221 | (1,1) |
| 222 | Destination: 20 extents (190 blocks) |
| 223 | (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) |
| 224 | (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) |
| 225 | """ |
| 226 | self.assertEquals(stdout, expected_out) |
| 227 | |
| 228 | def testListOpsOnVersion2(self): |
| 229 | """Verify that the --list_ops option gives the correct output.""" |
| 230 | payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True, |
| 231 | action='show')) |
| 232 | self.PatchObject(update_payload, 'Payload', return_value=FakePayload( |
| 233 | cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO)) |
| 234 | |
| 235 | with self.OutputCapturer() as output: |
| 236 | payload_cmd.Run() |
| 237 | |
| 238 | stdout = output.GetStdout() |
| 239 | expected_out = """Payload version: 2 |
| 240 | Manifest length: 222 |
| 241 | Number of partitions: 2 |
| 242 | Number of "rootfs" ops: 1 |
| 243 | Number of "kernel" ops: 1 |
| 244 | Block size: 4096 |
| 245 | Minor version: 4 |
| 246 | |
| 247 | rootfs install operations: |
| 248 | 0: REPLACE_BZ |
| 249 | Data offset: 1 |
| 250 | Data length: 1 |
| 251 | Destination: 2 extents (3 blocks) |
| 252 | (1,1) (2,2) |
| 253 | kernel install operations: |
| 254 | 0: SOURCE_COPY |
| 255 | Source: 1 extent (1 block) |
| 256 | (1,1) |
| 257 | Destination: 20 extents (190 blocks) |
| 258 | (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) |
| 259 | (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) |
| 260 | """ |
| 261 | self.assertEquals(stdout, expected_out) |
| 262 | |
| 263 | def testStatsOnVersion1(self): |
| 264 | """Verify that the --stats option works correctly.""" |
| 265 | payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True, |
| 266 | action='show')) |
| 267 | self.PatchObject(update_payload, 'Payload', return_value=FakePayload( |
| 268 | cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS)) |
| 269 | |
| 270 | with self.OutputCapturer() as output: |
| 271 | payload_cmd.Run() |
| 272 | |
| 273 | stdout = output.GetStdout() |
| 274 | expected_out = """Payload version: 1 |
| 275 | Manifest length: 222 |
| 276 | Number of operations: 1 |
| 277 | Number of kernel ops: 1 |
| 278 | Block size: 4096 |
| 279 | Minor version: 4 |
| 280 | Blocks read: 11 |
| 281 | Blocks written: 193 |
| 282 | Seeks when writing: 18 |
| 283 | """ |
| 284 | self.assertEquals(stdout, expected_out) |
| 285 | |
| 286 | def testStatsOnVersion2(self): |
| 287 | """Verify that the --stats option works correctly on version 2.""" |
| 288 | payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True, |
| 289 | action='show')) |
| 290 | self.PatchObject(update_payload, 'Payload', return_value=FakePayload( |
| 291 | cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO)) |
| 292 | |
| 293 | with self.OutputCapturer() as output: |
| 294 | payload_cmd.Run() |
| 295 | |
| 296 | stdout = output.GetStdout() |
| 297 | expected_out = """Payload version: 2 |
| 298 | Manifest length: 222 |
| 299 | Number of partitions: 2 |
| 300 | Number of "rootfs" ops: 1 |
| 301 | Number of "kernel" ops: 1 |
| 302 | Block size: 4096 |
| 303 | Minor version: 4 |
| 304 | Blocks read: 11 |
| 305 | Blocks written: 193 |
| 306 | Seeks when writing: 18 |
| 307 | """ |
| 308 | self.assertEquals(stdout, expected_out) |
| 309 | |
| 310 | def testEmptySignatures(self): |
| 311 | """Verify that the --signatures option works with unsigned payloads.""" |
| 312 | payload_cmd = cros_payload.PayloadCommand( |
| 313 | FakeOption(action='show', signatures=True)) |
| 314 | self.PatchObject(update_payload, 'Payload', return_value=FakePayload( |
| 315 | cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS)) |
| 316 | |
| 317 | with self.OutputCapturer() as output: |
| 318 | payload_cmd.Run() |
| 319 | |
| 320 | stdout = output.GetStdout() |
| 321 | expected_out = """Payload version: 1 |
| 322 | Manifest length: 222 |
| 323 | Number of operations: 1 |
| 324 | Number of kernel ops: 1 |
| 325 | Block size: 4096 |
| 326 | Minor version: 4 |
| 327 | No metadata signatures stored in the payload |
| 328 | No payload signatures stored in the payload |
| 329 | """ |
| 330 | self.assertEquals(stdout, expected_out) |
| 331 | |
| 332 | |
| 333 | def testSignatures(self): |
| 334 | """Verify that the --signatures option shows the present signatures.""" |
| 335 | payload_cmd = cros_payload.PayloadCommand( |
| 336 | FakeOption(action='show', signatures=True)) |
| 337 | payload = FakePayload(cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO) |
| 338 | payload.AddPayloadSignature(version=1, |
| 339 | data='12345678abcdefgh\x00\x01\x02\x03') |
| 340 | payload.AddPayloadSignature(data='I am a signature so access is yes.') |
| 341 | payload.AddMetadataSignature(data='\x00\x0a\x0c') |
| 342 | self.PatchObject(update_payload, 'Payload', return_value=payload) |
| 343 | |
| 344 | with self.OutputCapturer() as output: |
| 345 | payload_cmd.Run() |
| 346 | |
| 347 | stdout = output.GetStdout() |
| 348 | expected_out = """Payload version: 2 |
| 349 | Manifest length: 222 |
| 350 | Number of partitions: 2 |
| 351 | Number of "rootfs" ops: 1 |
| 352 | Number of "kernel" ops: 1 |
| 353 | Block size: 4096 |
| 354 | Minor version: 4 |
| 355 | Metadata signatures blob: file_offset=246 (7 bytes) |
| 356 | Metadata signatures: (1 entries) |
| 357 | version=None, hex_data: (3 bytes) |
| 358 | 00 0a 0c | ... |
| 359 | Payload signatures blob: blob_offset=1234 (64 bytes) |
| 360 | Payload signatures: (2 entries) |
| 361 | version=1, hex_data: (20 bytes) |
| 362 | 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh |
| 363 | 00 01 02 03 | .... |
| 364 | version=None, hex_data: (34 bytes) |
| 365 | 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature |
| 366 | 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye |
| 367 | 73 2e | s. |
| 368 | """ |
| 369 | self.assertEquals(stdout, expected_out) |