blob: d0b43f73f83c453d147474bdd3ce4e01fc769ad5 [file] [log] [blame]
Amin Hassani50dd2f02018-07-19 19:21:29 -07001# -*- coding: utf-8 -*-
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""This module tests the cros payload command."""
7
8from __future__ import print_function
9
10import collections
11import sys
12
13from chromite.lib import constants
14from chromite.cli.cros import cros_payload
15from chromite.lib import cros_test_lib
16
17# Needed for the update_payload import below.
18sys.path.insert(0, constants.UPDATE_ENGINE_SCRIPTS_PATH)
19
20# TODO(alliewood)(chromium:454629) update once update_payload is moved
21# into chromite
22import update_payload
23from update_payload import update_metadata_pb2
24
25class FakePayloadError(Exception):
26 """A generic error when using the FakePayload."""
27
28class FakeOption(object):
29 """Fake options object for testing."""
30
31 def __init__(self, **kwargs):
32 self.list_ops = False
33 self.stats = False
34 self.signatures = False
35 for key, val in kwargs.iteritems():
36 setattr(self, key, val)
37 if not hasattr(self, 'payload_file'):
38 self.payload_file = None
39
40class FakeOp(object):
41 """Fake manifest operation for testing."""
42
43 def __init__(self, src_extents, dst_extents, op_type, **kwargs):
44 self.src_extents = src_extents
45 self.dst_extents = dst_extents
46 self.type = op_type
47 for key, val in kwargs.iteritems():
48 setattr(self, key, val)
49
50 def HasField(self, field):
51 return hasattr(self, field)
52
53class FakePartition(object):
54 """Fake PartitionUpdate field for testing."""
55
56 def __init__(self, partition_name, operations):
57 self.partition_name = partition_name
58 self.operations = operations
59
60class FakeManifest(object):
61 """Fake manifest for testing."""
62
63 def __init__(self, major_version):
64 FakeExtent = collections.namedtuple('FakeExtent',
65 ['start_block', 'num_blocks'])
66 self.install_operations = [FakeOp([],
67 [FakeExtent(1, 1), FakeExtent(2, 2)],
68 update_payload.common.OpType.REPLACE_BZ,
69 dst_length=3*4096,
70 data_offset=1,
71 data_length=1)]
72 self.kernel_install_operations = [FakeOp(
73 [FakeExtent(1, 1)],
74 [FakeExtent(x, x) for x in xrange(20)],
75 update_payload.common.OpType.SOURCE_COPY,
76 src_length=4096)]
77 if major_version == cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO:
78 self.partitions = [FakePartition('rootfs', self.install_operations),
79 FakePartition('kernel',
80 self.kernel_install_operations)]
81 self.install_operations = self.kernel_install_operations = []
82 self.block_size = 4096
83 self.minor_version = 4
84 FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
85 self.old_rootfs_info = FakePartInfo(1 * 4096)
86 self.old_kernel_info = FakePartInfo(2 * 4096)
87 self.new_rootfs_info = FakePartInfo(3 * 4096)
88 self.new_kernel_info = FakePartInfo(4 * 4096)
89 self.signatures_offset = None
90 self.signatures_size = None
91
92 def HasField(self, field_name):
93 """Fake HasField method based on the python members."""
94 return hasattr(self, field_name) and getattr(self, field_name) is not None
95
96class FakeHeader(object):
97 """Fake payload header for testing."""
98
99 def __init__(self, version, manifest_len, metadata_signature_len):
100 self.version = version
101 self.manifest_len = manifest_len
102 self.metadata_signature_len = metadata_signature_len
103
104 @property
105 def size(self):
106 return (20 if self.version == cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS
107 else 24)
108
109
110class FakePayload(object):
111 """Fake payload for testing."""
112
113 def __init__(self, major_version):
114 self._header = FakeHeader(major_version, 222, 0)
115 self.header = None
116 self._manifest = FakeManifest(major_version)
117 self.manifest = None
118
119 self._blobs = {}
120 self._payload_signatures = update_metadata_pb2.Signatures()
121 self._metadata_signatures = update_metadata_pb2.Signatures()
122
123 def Init(self):
124 """Fake Init that sets header and manifest.
125
126 Failing to call Init() will not make header and manifest available to the
127 test.
128 """
129 self.header = self._header
130 self.manifest = self._manifest
131
132 def ReadDataBlob(self, offset, length):
133 """Return the blob that should be present at the offset location"""
134 if not offset in self._blobs:
135 raise FakePayloadError('Requested blob at unknown offset %d' % offset)
136 blob = self._blobs[offset]
137 if len(blob) != length:
138 raise FakePayloadError('Read blob with the wrong length (expect: %d, '
139 'actual: %d)' % (len(blob), length))
140 return blob
141
142 @staticmethod
143 def _AddSignatureToProto(proto, **kwargs):
144 """Add a new Signature element to the passed proto."""
145 new_signature = proto.signatures.add()
146 for key, val in kwargs.iteritems():
147 setattr(new_signature, key, val)
148
149 def AddPayloadSignature(self, **kwargs):
150 self._AddSignatureToProto(self._payload_signatures, **kwargs)
151 blob = self._payload_signatures.SerializeToString()
152 self._manifest.signatures_offset = 1234
153 self._manifest.signatures_size = len(blob)
154 self._blobs[self._manifest.signatures_offset] = blob
155
156 def AddMetadataSignature(self, **kwargs):
157 self._AddSignatureToProto(self._metadata_signatures, **kwargs)
158 if self._header.metadata_signature_len:
159 del self._blobs[-self._header.metadata_signature_len]
160 blob = self._metadata_signatures.SerializeToString()
161 self._header.metadata_signature_len = len(blob)
162 self._blobs[-len(blob)] = blob
163
164
165class PayloadCommandTest(cros_test_lib.MockOutputTestCase):
166 """Test class for our PayloadCommand class."""
167
168 def testDisplayValue(self):
169 """Verify that DisplayValue prints what we expect."""
170 with self.OutputCapturer() as output:
171 cros_payload.DisplayValue('key', 'value')
172 stdout = output.GetStdout()
173 self.assertEquals(stdout, 'key: value\n')
174
175 def testRun(self):
176 """Verify that Run parses and displays the payload like we expect."""
177 payload_cmd = cros_payload.PayloadCommand(FakeOption(action='show'))
178 self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
179 cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
180
181 with self.OutputCapturer() as output:
182 payload_cmd.Run()
183
184 stdout = output.GetStdout()
185 expected_out = """Payload version: 1
186Manifest length: 222
187Number of operations: 1
188Number of kernel ops: 1
189Block size: 4096
190Minor version: 4
191"""
192 self.assertEquals(stdout, expected_out)
193
194 def testListOpsOnVersion1(self):
195 """Verify that the --list_ops option gives the correct output."""
196 payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True,
197 action='show'))
198 self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
199 cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
200
201 with self.OutputCapturer() as output:
202 payload_cmd.Run()
203
204 stdout = output.GetStdout()
205 expected_out = """Payload version: 1
206Manifest length: 222
207Number of operations: 1
208Number of kernel ops: 1
209Block size: 4096
210Minor version: 4
211
212Install operations:
213 0: REPLACE_BZ
214 Data offset: 1
215 Data length: 1
216 Destination: 2 extents (3 blocks)
217 (1,1) (2,2)
218Kernel install operations:
219 0: SOURCE_COPY
220 Source: 1 extent (1 block)
221 (1,1)
222 Destination: 20 extents (190 blocks)
223 (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
224 (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
225"""
226 self.assertEquals(stdout, expected_out)
227
228 def testListOpsOnVersion2(self):
229 """Verify that the --list_ops option gives the correct output."""
230 payload_cmd = cros_payload.PayloadCommand(FakeOption(list_ops=True,
231 action='show'))
232 self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
233 cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO))
234
235 with self.OutputCapturer() as output:
236 payload_cmd.Run()
237
238 stdout = output.GetStdout()
239 expected_out = """Payload version: 2
240Manifest length: 222
241Number of partitions: 2
242 Number of "rootfs" ops: 1
243 Number of "kernel" ops: 1
244Block size: 4096
245Minor version: 4
246
247rootfs install operations:
248 0: REPLACE_BZ
249 Data offset: 1
250 Data length: 1
251 Destination: 2 extents (3 blocks)
252 (1,1) (2,2)
253kernel install operations:
254 0: SOURCE_COPY
255 Source: 1 extent (1 block)
256 (1,1)
257 Destination: 20 extents (190 blocks)
258 (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
259 (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
260"""
261 self.assertEquals(stdout, expected_out)
262
263 def testStatsOnVersion1(self):
264 """Verify that the --stats option works correctly."""
265 payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True,
266 action='show'))
267 self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
268 cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
269
270 with self.OutputCapturer() as output:
271 payload_cmd.Run()
272
273 stdout = output.GetStdout()
274 expected_out = """Payload version: 1
275Manifest length: 222
276Number of operations: 1
277Number of kernel ops: 1
278Block size: 4096
279Minor version: 4
280Blocks read: 11
281Blocks written: 193
282Seeks when writing: 18
283"""
284 self.assertEquals(stdout, expected_out)
285
286 def testStatsOnVersion2(self):
287 """Verify that the --stats option works correctly on version 2."""
288 payload_cmd = cros_payload.PayloadCommand(FakeOption(stats=True,
289 action='show'))
290 self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
291 cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO))
292
293 with self.OutputCapturer() as output:
294 payload_cmd.Run()
295
296 stdout = output.GetStdout()
297 expected_out = """Payload version: 2
298Manifest length: 222
299Number of partitions: 2
300 Number of "rootfs" ops: 1
301 Number of "kernel" ops: 1
302Block size: 4096
303Minor version: 4
304Blocks read: 11
305Blocks written: 193
306Seeks when writing: 18
307"""
308 self.assertEquals(stdout, expected_out)
309
310 def testEmptySignatures(self):
311 """Verify that the --signatures option works with unsigned payloads."""
312 payload_cmd = cros_payload.PayloadCommand(
313 FakeOption(action='show', signatures=True))
314 self.PatchObject(update_payload, 'Payload', return_value=FakePayload(
315 cros_payload.MAJOR_PAYLOAD_VERSION_CHROMEOS))
316
317 with self.OutputCapturer() as output:
318 payload_cmd.Run()
319
320 stdout = output.GetStdout()
321 expected_out = """Payload version: 1
322Manifest length: 222
323Number of operations: 1
324Number of kernel ops: 1
325Block size: 4096
326Minor version: 4
327No metadata signatures stored in the payload
328No payload signatures stored in the payload
329"""
330 self.assertEquals(stdout, expected_out)
331
332
333 def testSignatures(self):
334 """Verify that the --signatures option shows the present signatures."""
335 payload_cmd = cros_payload.PayloadCommand(
336 FakeOption(action='show', signatures=True))
337 payload = FakePayload(cros_payload.MAJOR_PAYLOAD_VERSION_BRILLO)
338 payload.AddPayloadSignature(version=1,
339 data='12345678abcdefgh\x00\x01\x02\x03')
340 payload.AddPayloadSignature(data='I am a signature so access is yes.')
341 payload.AddMetadataSignature(data='\x00\x0a\x0c')
342 self.PatchObject(update_payload, 'Payload', return_value=payload)
343
344 with self.OutputCapturer() as output:
345 payload_cmd.Run()
346
347 stdout = output.GetStdout()
348 expected_out = """Payload version: 2
349Manifest length: 222
350Number of partitions: 2
351 Number of "rootfs" ops: 1
352 Number of "kernel" ops: 1
353Block size: 4096
354Minor version: 4
355Metadata signatures blob: file_offset=246 (7 bytes)
356Metadata signatures: (1 entries)
357 version=None, hex_data: (3 bytes)
358 00 0a 0c | ...
359Payload signatures blob: blob_offset=1234 (64 bytes)
360Payload signatures: (2 entries)
361 version=1, hex_data: (20 bytes)
362 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
363 00 01 02 03 | ....
364 version=None, hex_data: (34 bytes)
365 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
366 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye
367 73 2e | s.
368"""
369 self.assertEquals(stdout, expected_out)