blob: 2c686371b50eb8e50cd41f01ee8bde10171b030f [file] [log] [blame]
Gilad Arnold5502b562013-03-08 13:22:31 -08001#!/usr/bin/python
2#
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Unit testing checker.py."""
8
9import array
10import collections
11import cStringIO
12import hashlib
13import itertools
14import os
15import unittest
16
Gilad Arnoldcb638912013-06-24 04:57:11 -070017# pylint cannot find mox.
Gilad Arnold5502b562013-03-08 13:22:31 -080018# pylint: disable=F0401
19import mox
20
21import checker
22import common
Gilad Arnoldcb638912013-06-24 04:57:11 -070023import payload as update_payload # Avoid name conflicts later.
Gilad Arnold5502b562013-03-08 13:22:31 -080024import test_utils
25import update_metadata_pb2
26
27
def _OpTypeByName(op_name):
  """Returns the common.OpType value for an operation type name.

  Args:
    op_name: One of 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'.
  Returns:
    The common.OpType attribute of the same name.
  Raises:
    KeyError: If op_name is not one of the names listed above.
  """
  known_types = (
      ('REPLACE', common.OpType.REPLACE),
      ('REPLACE_BZ', common.OpType.REPLACE_BZ),
      ('MOVE', common.OpType.MOVE),
      ('BSDIFF', common.OpType.BSDIFF),
  )
  return dict(known_types)[op_name]
36
37
def _GetPayloadChecker(payload_gen_write_to_file_func, payload_gen_dargs=None,
                       checker_init_dargs=None):
  """Builds a payload checker over the output of a payload writer function.

  Args:
    payload_gen_write_to_file_func: Callable that writes a payload into the
      file object passed as its first argument.
    payload_gen_dargs: Optional dict of keyword arguments for the writer.
    checker_init_dargs: Optional dict of keyword arguments forwarded to
      checker.PayloadChecker().
  Returns:
    A checker.PayloadChecker wrapping the initialized payload.
  """
  gen_kwargs = {} if payload_gen_dargs is None else payload_gen_dargs
  init_kwargs = {} if checker_init_dargs is None else checker_init_dargs

  # Write the payload into an in-memory buffer, then rewind it for reading.
  buf = cStringIO.StringIO()
  payload_gen_write_to_file_func(buf, **gen_kwargs)
  buf.seek(0)

  generated_payload = update_payload.Payload(buf)
  generated_payload.Init()
  return checker.PayloadChecker(generated_payload, **init_kwargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080052
53
def _GetPayloadCheckerWithData(payload_gen):
  """Returns a payload checker for the payload written by a generator object.

  Args:
    payload_gen: A payload generator object exposing WriteToFile(file_obj).
  Returns:
    A checker.PayloadChecker wrapping the initialized payload.
  """
  # This is the same write/rewind/Init/wrap sequence that _GetPayloadChecker()
  # performs when given no extra arguments, so delegate instead of repeating it.
  return _GetPayloadChecker(payload_gen.WriteToFile)
62
63
Gilad Arnoldcb638912013-06-24 04:57:11 -070064# This class doesn't need an __init__().
Gilad Arnold5502b562013-03-08 13:22:31 -080065# pylint: disable=W0232
Gilad Arnoldcb638912013-06-24 04:57:11 -070066# Unit testing is all about running protected methods.
Gilad Arnold5502b562013-03-08 13:22:31 -080067# pylint: disable=W0212
Gilad Arnoldcb638912013-06-24 04:57:11 -070068# Don't bark about missing members of classes you cannot import.
Gilad Arnold5502b562013-03-08 13:22:31 -080069# pylint: disable=E1101
70class PayloadCheckerTest(mox.MoxTestBase):
71 """Tests the PayloadChecker class.
72
73 In addition to ordinary testFoo() methods, which are automatically invoked by
74 the unittest framework, in this class we make use of DoBarTest() calls that
75 implement parametric tests of certain features. In order to invoke each test,
76 which embodies a unique combination of parameter values, as a complete unit
77 test, we perform explicit enumeration of the parameter space and create
78 individual invocation contexts for each, which are then bound as
79 testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for
80 all such tests is done in AddAllParametricTests().
Gilad Arnold5502b562013-03-08 13:22:31 -080081 """
82
def MockPayload(self):
  """Creates a mock payload object, complete with a mock manifest.

  Returns:
    A mock of update_payload.Payload whose is_init attribute is True and
    whose manifest attribute is a mock DeltaArchiveManifest.
  """
  mock_payload = self.mox.CreateMock(update_payload.Payload)
  mock_payload.is_init = True
  manifest_cls = update_metadata_pb2.DeltaArchiveManifest
  mock_payload.manifest = self.mox.CreateMock(manifest_cls)
  return mock_payload
90
@staticmethod
def NewExtent(start_block, num_blocks):
  """Builds an Extent message from the given values.

  A field is assigned only when its value is >= 0; otherwise it is left in
  its default (unset) state.

  Args:
    start_block: The starting block of the extent.
    num_blocks: The number of blocks in the extent.
  Returns:
    An Extent message.
  """
  extent = update_metadata_pb2.Extent()
  for field_name, field_val in (('start_block', start_block),
                                ('num_blocks', num_blocks)):
    if field_val >= 0:
      setattr(extent, field_name, field_val)
  return extent
110
@staticmethod
def NewExtentList(*args):
  """Builds a list of Extent messages.

  Args:
    *args: (start_block, num_blocks) pairs defining the extents.
  Returns:
    A list of Extent objects, one per pair, in the order given.
  """
  return [PayloadCheckerTest.NewExtent(start_block, num_blocks)
          for start_block, num_blocks in args]
124
@staticmethod
def AddToMessage(repeated_field, field_vals):
  """Appends a copy of each given value to a repeated message field.

  Args:
    repeated_field: A repeated field container; add() is called on it once
      per value to obtain a new element.
    field_vals: Iterable of values, each copied into its new element via
      CopyFrom().
  """
  for src_val in field_vals:
    repeated_field.add().CopyFrom(src_val)
130
Gilad Arnoldcb638912013-06-24 04:57:11 -0700131 # The production environment uses an older Python, this isn't an override.
132 # pylint: disable=W0221
def assertIsNone(self, val, msg=None):
  """Asserts that val is None (TODO remove once we upgrade to Python 2.7).

  The check is made by identity via assertIs(), so a value that merely
  compares equal to None does not pass.

  Args:
    val: Value/object expected to be None.
    msg: Optional failure message, passed through to assertIs(). Accepting it
      keeps this signature compatible with unittest's own assertIsNone().
  """
  self.assertIs(None, val, msg)
Gilad Arnold5502b562013-03-08 13:22:31 -0800143
def SetupAddElemTest(self, is_present, is_submsg, convert=str,
                     linebreak=False, indent=0):
  """Prepares mocks for testing _CheckElem() and its derivatives.

  Records the expected calls on a mock message and a mock report, then
  switches all mocks to replay mode.

  Args:
    is_present: Whether or not the element is found in the message.
    is_submsg: Whether the element is a sub-message itself.
    convert: A representation conversion function.
    linebreak: Whether or not a linebreak is to be used in the report.
    indent: Indentation used for the report.
  Returns:
    A (msg, report, subreport, name, val) tuple, where:
    msg: A mock message object.
    report: A mock report object.
    subreport: A fake sub-report value.
    name: An element name to check.
    val: Expected element value.
  """
  name = 'foo'
  subreport = 'fake subreport'
  if is_submsg:
    val = 'fake submsg'
  else:
    val = 'fake field'

  # Mock message: expects a single HasField() query and carries the value.
  msg = self.mox.CreateMock(update_metadata_pb2.message.Message)
  msg.HasField(name).AndReturn(is_present)
  setattr(msg, name, val)

  # Mock report: no report calls are recorded when the element is absent.
  report = self.mox.CreateMock(checker._PayloadReport)
  if is_present and is_submsg:
    report.AddSubReport(name).AndReturn(subreport)
  elif is_present:
    report.AddField(name, convert(val), linebreak=linebreak, indent=indent)

  self.mox.ReplayAll()
  return msg, report, subreport, name, val
180
def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert,
                  linebreak, indent):
  """Parametric testing of _CheckElem().

  Expects a PayloadError when a mandatory element is absent; otherwise checks
  the returned (value, subreport) pair.

  Args:
    is_present: Whether or not the element is found in the message.
    is_mandatory: Whether or not it's a mandatory element.
    is_submsg: Whether the element is a sub-message itself.
    convert: A representation conversion function.
    linebreak: Whether or not a linebreak is to be used in the report.
    indent: Indentation used for the report.
  """
  msg, report, subreport, name, val = self.SetupAddElemTest(
      is_present, is_submsg, convert, linebreak, indent)

  call_args = (msg, name, report, is_mandatory, is_submsg)
  call_kwargs = dict(convert=convert, linebreak=linebreak, indent=indent)

  if is_mandatory and not is_present:
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckElem,
                      *call_args, **call_kwargs)
    return

  expected_val = val if is_present else None
  expected_subreport = subreport if is_present and is_submsg else None
  ret_val, ret_subreport = checker.PayloadChecker._CheckElem(
      *call_args, **call_kwargs)
  self.assertEquals(expected_val, ret_val)
  self.assertEquals(expected_subreport, ret_subreport)
Gilad Arnold5502b562013-03-08 13:22:31 -0800207
def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
                   indent):
  """Parametric testing of _Check{Mandatory,Optional}Field().

  Args:
    is_mandatory: Whether we're testing a mandatory call.
    is_present: Whether or not the element is found in the message.
    convert: A representation conversion function.
    linebreak: Whether or not a linebreak is to be used in the report.
    indent: Indentation used for the report.
  """
  msg, report, _, name, val = self.SetupAddElemTest(
      is_present, False, convert, linebreak, indent)

  # Pick the method under test; the mandatory variant takes one extra
  # positional argument.
  call_kwargs = dict(convert=convert, linebreak=linebreak, indent=indent)
  if is_mandatory:
    tested_func = checker.PayloadChecker._CheckMandatoryField
    call_args = [msg, name, report, 'bar']
  else:
    tested_func = checker.PayloadChecker._CheckOptionalField
    call_args = [msg, name, report]

  # A missing mandatory field must raise; anything else returns the value.
  if is_mandatory and not is_present:
    self.assertRaises(update_payload.PayloadError, tested_func, *call_args,
                      **call_kwargs)
    return

  expected_val = val if is_present else None
  self.assertEquals(expected_val, tested_func(*call_args, **call_kwargs))
Gilad Arnold5502b562013-03-08 13:22:31 -0800238
def DoAddSubMsgTest(self, is_mandatory, is_present):
  """Parametric testing of _Check{Mandatory,Optional}SubMsg().

  Args:
    is_mandatory: Whether we're testing a mandatory call.
    is_present: Whether or not the element is found in the message.
  """
  msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True)

  # Pick the method under test; the mandatory variant takes one extra
  # positional argument.
  if is_mandatory:
    tested_func = checker.PayloadChecker._CheckMandatorySubMsg
    call_args = [msg, name, report, 'bar']
  else:
    tested_func = checker.PayloadChecker._CheckOptionalSubMsg
    call_args = [msg, name, report]

  # A missing mandatory sub-message must raise; anything else returns a
  # (value, subreport) pair.
  if is_mandatory and not is_present:
    self.assertRaises(update_payload.PayloadError, tested_func, *call_args)
    return

  ret_val, ret_subreport = tested_func(*call_args)
  self.assertEquals(val if is_present else None, ret_val)
  self.assertEquals(subreport if is_present else None, ret_subreport)
Gilad Arnold5502b562013-03-08 13:22:31 -0800263
def testCheckPresentIff(self):
  """Tests _CheckPresentIff()."""
  check = checker.PayloadChecker._CheckPresentIff
  names = ('foo', 'bar', 'baz')

  # Both values absent, or both present: accepted.
  for val1, val2 in ((None, None), ('a', 'b')):
    self.assertIsNone(check(val1, val2, *names))

  # Exactly one of the two values present: rejected.
  for val1, val2 in (('a', None), (None, 'b')):
    self.assertRaises(update_payload.PayloadError, check, val1, val2,
                      *names)
276
def DoCheckSha256SignatureTest(self, expect_pass, expect_subprocess_call,
                               sig_data, sig_asn1_header,
                               returned_signed_hash, expected_signed_hash):
  """Parametric testing of _CheckSha256Signature().

  PayloadChecker._Run is stubbed out for the duration of the check and the
  stubs are always removed afterwards.

  Args:
    expect_pass: Whether or not it should pass.
    expect_subprocess_call: Whether to expect the openssl call to happen.
    sig_data: The signature raw data.
    sig_asn1_header: The ASN1 header.
    returned_signed_hash: The signed hash data returned by openssl.
    expected_signed_hash: The signed hash data to compare against.
  """
  check_args = (sig_data, 'foo', expected_signed_hash, 'bar')
  try:
    # Stub out the subprocess invocation; when a call is expected, it yields
    # the ASN1 header followed by the returned hash.
    self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
    if expect_subprocess_call:
      expected_run = checker.PayloadChecker._Run(
          mox.IsA(list), send_data=sig_data)
      expected_run.AndReturn(
          (sig_asn1_header + returned_signed_hash, None))
    self.mox.ReplayAll()

    if not expect_pass:
      self.assertRaises(update_payload.PayloadError,
                        checker.PayloadChecker._CheckSha256Signature,
                        *check_args)
    else:
      self.assertIsNone(
          checker.PayloadChecker._CheckSha256Signature(*check_args))
  finally:
    self.mox.UnsetStubs()
Gilad Arnold5502b562013-03-08 13:22:31 -0800308
def testCheckSha256Signature_Pass(self):
  """Tests _CheckSha256Signature(); pass case."""
  padded_sig = 'fake-signature'.ljust(256)
  digest = hashlib.sha256('fake-data').digest()
  # The returned and the expected hash are the same digest.
  self.DoCheckSha256SignatureTest(
      expect_pass=True,
      expect_subprocess_call=True,
      sig_data=padded_sig,
      sig_asn1_header=common.SIG_ASN1_HEADER,
      returned_signed_hash=digest,
      expected_signed_hash=digest)
316
def testCheckSha256Signature_FailBadSignature(self):
  """Tests _CheckSha256Signature(); fails due to malformed signature."""
  # Left unpadded on purpose: malformed (not 256 bytes in length). No
  # subprocess call is expected for it.
  malformed_sig = 'fake-signature'
  digest = hashlib.sha256('fake-data').digest()
  self.DoCheckSha256SignatureTest(
      expect_pass=False,
      expect_subprocess_call=False,
      sig_data=malformed_sig,
      sig_asn1_header=common.SIG_ASN1_HEADER,
      returned_signed_hash=digest,
      expected_signed_hash=digest)
324
def testCheckSha256Signature_FailBadOutputLength(self):
  """Tests _CheckSha256Signature(); fails due to unexpected output length."""
  padded_sig = 'fake-signature'.ljust(256)
  # Malformed (not 32 bytes in length).
  short_hash = 'fake-hash'
  self.DoCheckSha256SignatureTest(
      expect_pass=False,
      expect_subprocess_call=True,
      sig_data=padded_sig,
      sig_asn1_header=common.SIG_ASN1_HEADER,
      returned_signed_hash=short_hash,
      expected_signed_hash=short_hash)
332
def testCheckSha256Signature_FailBadAsnHeader(self):
  """Tests _CheckSha256Signature(); fails due to bad ASN1 header."""
  padded_sig = 'fake-signature'.ljust(256)
  digest = hashlib.sha256('fake-data').digest()
  # Wrong header content, padded to at least the length of the real header.
  header_len = len(common.SIG_ASN1_HEADER)
  bad_asn1_header = 'bad-asn-header'.ljust(header_len)
  self.DoCheckSha256SignatureTest(
      expect_pass=False,
      expect_subprocess_call=True,
      sig_data=padded_sig,
      sig_asn1_header=bad_asn1_header,
      returned_signed_hash=digest,
      expected_signed_hash=digest)
340
def testCheckSha256Signature_FailBadHash(self):
  """Tests _CheckSha256Signature(); fails due to bad hash returned."""
  sig_data = 'fake-signature'.ljust(256)
  expected_signed_hash = hashlib.sha256('fake-data').digest()
  returned_signed_hash = hashlib.sha256('bad-fake-data').digest()
  # Pass the hashes by keyword so that each one lands in the helper parameter
  # of the same name (the helper takes the returned hash before the expected
  # one).
  self.DoCheckSha256SignatureTest(
      expect_pass=False,
      expect_subprocess_call=True,
      sig_data=sig_data,
      sig_asn1_header=common.SIG_ASN1_HEADER,
      returned_signed_hash=returned_signed_hash,
      expected_signed_hash=expected_signed_hash)
349
def testCheckBlocksFitLength_Pass(self):
  """Tests _CheckBlocksFitLength(); pass case."""
  # (length, num_blocks) pairs, all checked with a block size of 16.
  fitting_cases = ((64, 4), (60, 4), (49, 4), (48, 3))
  for length, num_blocks in fitting_cases:
    self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
        length, num_blocks, 16, 'foo'))
360
def testCheckBlocksFitLength_TooManyBlocks(self):
  """Tests _CheckBlocksFitLength(); fails due to excess blocks."""
  # (length, num_blocks) pairs, all checked with a block size of 16; each
  # has one block more than the matching pass case.
  excess_cases = ((64, 5), (60, 5), (49, 5), (48, 4))
  for length, num_blocks in excess_cases:
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckBlocksFitLength,
                      length, num_blocks, 16, 'foo')
375
def testCheckBlocksFitLength_TooFewBlocks(self):
  """Tests _CheckBlocksFitLength(); fails due to insufficient blocks."""
  # (length, num_blocks) pairs, all checked with a block size of 16; each
  # has one block less than the matching pass case.
  insufficient_cases = ((64, 3), (60, 3), (49, 3), (48, 2))
  for length, num_blocks in insufficient_cases:
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckBlocksFitLength,
                      length, num_blocks, 16, 'foo')
390
def DoCheckManifestTest(self, fail_mismatched_block_size, fail_bad_sigs,
                        fail_mismatched_oki_ori, fail_bad_oki, fail_bad_ori,
                        fail_bad_nki, fail_bad_nri, fail_missing_ops,
                        fail_old_kernel_fs_size, fail_old_rootfs_fs_size,
                        fail_new_kernel_fs_size, fail_new_rootfs_fs_size):
  """Parametric testing of _CheckManifest().

  _CheckManifest() is expected to raise PayloadError when any of the fail_*
  flags is set, and to return None otherwise.

  Args:
    fail_mismatched_block_size: Set the block size to 1 KiB instead of the
      4 KiB used otherwise.
    fail_bad_sigs: Make signatures descriptor inconsistent.
    fail_mismatched_oki_ori: Make old rootfs/kernel info partially present.
    fail_bad_oki: Tamper with old kernel info (no hash).
    fail_bad_ori: Tamper with old rootfs info (no hash).
    fail_bad_nki: Tamper with new kernel info (no hash).
    fail_bad_nri: Tamper with new rootfs info (no hash).
    fail_missing_ops: Simulate a manifest without any operations.
    fail_old_kernel_fs_size: Make old kernel fs size too big.
    fail_old_rootfs_fs_size: Make old rootfs fs size too big.
    fail_new_kernel_fs_size: Make new kernel fs size too big.
    fail_new_rootfs_fs_size: Make new rootfs fs size too big.
  """
  # Generate a test payload. For this test, we only care about the manifest
  # and don't need any data blobs, hence we can use a plain payload generator
  # (which also gives us more control on things that can be screwed up).
  payload_gen = test_utils.PayloadGenerator()

  # Tamper with block size, if required.
  if fail_mismatched_block_size:
    payload_gen.SetBlockSize(test_utils.KiB(1))
  else:
    payload_gen.SetBlockSize(test_utils.KiB(4))

  # Add some operations.
  if not fail_missing_ops:
    payload_gen.AddOperation(False, common.OpType.MOVE,
                             src_extents=[(0, 16), (16, 497)],
                             dst_extents=[(16, 496), (0, 16)])
    payload_gen.AddOperation(True, common.OpType.MOVE,
                             src_extents=[(0, 8), (8, 8)],
                             dst_extents=[(8, 8), (0, 8)])

  # Set an invalid signatures block (offset but no size), if required.
  if fail_bad_sigs:
    payload_gen.SetSignatures(32, None)

  # Set partition / filesystem sizes. A "too big" filesystem is one that
  # exceeds its partition size by 100 bytes.
  rootfs_part_size = test_utils.MiB(8)
  kernel_part_size = test_utils.KiB(512)
  old_rootfs_fs_size = new_rootfs_fs_size = rootfs_part_size
  old_kernel_fs_size = new_kernel_fs_size = kernel_part_size
  if fail_old_kernel_fs_size:
    old_kernel_fs_size += 100
  if fail_old_rootfs_fs_size:
    old_rootfs_fs_size += 100
  if fail_new_kernel_fs_size:
    new_kernel_fs_size += 100
  if fail_new_rootfs_fs_size:
    new_rootfs_fs_size += 100

  # Add old kernel/rootfs partition info, as required. With
  # fail_mismatched_oki_ori set, only the old kernel info is added.
  if fail_mismatched_oki_ori or fail_old_kernel_fs_size or fail_bad_oki:
    oki_hash = (None if fail_bad_oki
                else hashlib.sha256('fake-oki-content').digest())
    payload_gen.SetPartInfo(True, False, old_kernel_fs_size, oki_hash)
  if not fail_mismatched_oki_ori and (fail_old_rootfs_fs_size or
                                      fail_bad_ori):
    ori_hash = (None if fail_bad_ori
                else hashlib.sha256('fake-ori-content').digest())
    payload_gen.SetPartInfo(False, False, old_rootfs_fs_size, ori_hash)

  # Add new kernel/rootfs partition info.
  payload_gen.SetPartInfo(
      True, True, new_kernel_fs_size,
      None if fail_bad_nki else hashlib.sha256('fake-nki-content').digest())
  payload_gen.SetPartInfo(
      False, True, new_rootfs_fs_size,
      None if fail_bad_nri else hashlib.sha256('fake-nri-content').digest())

  # Create the test object.
  payload_checker = _GetPayloadChecker(payload_gen.WriteToFile)
  report = checker._PayloadReport()

  should_fail = (fail_mismatched_block_size or fail_bad_sigs or
                 fail_mismatched_oki_ori or fail_bad_oki or fail_bad_ori or
                 fail_bad_nki or fail_bad_nri or fail_missing_ops or
                 fail_old_kernel_fs_size or fail_old_rootfs_fs_size or
                 fail_new_kernel_fs_size or fail_new_rootfs_fs_size)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckManifest, report,
                      rootfs_part_size, kernel_part_size)
  else:
    self.assertIsNone(payload_checker._CheckManifest(report,
                                                     rootfs_part_size,
                                                     kernel_part_size))
Gilad Arnold5502b562013-03-08 13:22:31 -0800486
def testCheckLength(self):
  """Tests _CheckLength()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  # A length of three and a half blocks.
  length = int(3.5 * payload_checker.block_size)

  # Passes with four blocks.
  self.assertIsNone(payload_checker._CheckLength(length, 4, 'foo', 'bar'))

  # Fails with too few (3) and then with too many (5) blocks.
  for bad_num_blocks in (3, 5):
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckLength,
                      length, bad_num_blocks, 'foo', 'bar')
503
def testCheckExtents(self):
  """Tests _CheckExtents()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size
  # Usable size that exactly accommodates an extent ending at block 1040.
  full_size = (1024 + 16) * block_size

  # Passes w/ all real extents (4 + 3 + 16 blocks).
  real_extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
  self.assertEquals(
      23,
      payload_checker._CheckExtents(real_extents, full_size,
                                    collections.defaultdict(int), 'foo'))

  # Passes w/ pseudo-extents (aka sparse holes).
  sparse_extents = self.NewExtentList(
      (0, 4), (common.PSEUDO_EXTENT_MARKER, 5), (8, 3))
  self.assertEquals(
      12,
      payload_checker._CheckExtents(sparse_extents, full_size,
                                    collections.defaultdict(int), 'foo',
                                    allow_pseudo=True))

  # Passes w/ pseudo-extent due to a signature.
  sig_extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2))
  self.assertEquals(
      2,
      payload_checker._CheckExtents(sig_extents, full_size,
                                    collections.defaultdict(int), 'foo',
                                    allow_signature=True))

  failing_cases = (
      # Extent missing a start block.
      (((-1, 4), (8, 3), (1024, 16)), full_size),
      # Extent missing block count.
      (((0, -1), (8, 3), (1024, 16)), full_size),
      # Extent has zero blocks.
      (((0, 4), (8, 3), (1024, 0)), full_size),
      # Extent exceeds partition boundaries (usable size one block short).
      (((0, 4), (8, 3), (1024, 16)), (1024 + 15) * block_size),
  )
  for extent_pairs, usable_size in failing_cases:
    extents = self.NewExtentList(*extent_pairs)
    self.assertRaises(
        update_payload.PayloadError, payload_checker._CheckExtents,
        extents, usable_size, collections.defaultdict(int), 'foo')
560
def testCheckReplaceOperation(self):
  """Tests _CheckReplaceOperation() where op.type == REPLACE."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size
  data_length = 10000
  # Number of blocks needed to hold data_length bytes (rounded up).
  num_blocks = (data_length + block_size - 1) / block_size

  op = self.mox.CreateMock(
      update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
  op.type = common.OpType.REPLACE

  # Pass.
  op.src_extents = []
  self.assertIsNone(
      payload_checker._CheckReplaceOperation(
          op, data_length, num_blocks, 'foo'))

  # Fail, src extents found.
  op.src_extents = ['bar']
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, data_length, num_blocks, 'foo')

  # Fail, missing data.
  op.src_extents = []
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, None, num_blocks, 'foo')

  # Fail, length / block number mismatch. The src extents are kept empty so
  # that the block count is the only thing wrong with this call; with a
  # non-empty list the call is already invalid, as in the src extents case
  # above.
  op.src_extents = []
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, data_length, num_blocks + 1, 'foo')
598
def testCheckReplaceBzOperation(self):
  """Tests _CheckReplaceOperation() where op.type == REPLACE_BZ."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size
  data_length = block_size * 3
  # Blocks needed to hold data_length bytes, and a count five blocks larger.
  exact_blocks = (data_length + block_size - 1) / block_size
  roomy_blocks = exact_blocks + 5

  op = self.mox.CreateMock(
      update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
  op.type = common.OpType.REPLACE_BZ

  # Pass.
  op.src_extents = []
  outcome = payload_checker._CheckReplaceOperation(
      op, data_length, roomy_blocks, 'foo')
  self.assertIsNone(outcome)

  # Fail, src extents found.
  op.src_extents = ['bar']
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, data_length, roomy_blocks, 'foo')

  # Fail, missing data.
  op.src_extents = []
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, None, exact_blocks, 'foo')

  # Fail, too few blocks to justify BZ.
  op.src_extents = []
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, data_length, exact_blocks, 'foo')
636
def testCheckMoveOperation_Pass(self):
  """Tests _CheckMoveOperation(); pass case."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  op.type = common.OpType.MOVE

  # 134 blocks on each side, matching the totals passed below.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 6))
  self.AddToMessage(op.src_extents, src_extents)
  self.AddToMessage(op.dst_extents, dst_extents)

  outcome = payload_checker._CheckMoveOperation(op, None, 134, 134, 'foo')
  self.assertIsNone(outcome)
649
def testCheckMoveOperation_FailContainsData(self):
  """Tests _CheckMoveOperation(); fails, message contains data."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  op.type = common.OpType.MOVE

  # Same extents as the pass case; only the second argument differs (1024
  # here, None there).
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 6))
  self.AddToMessage(op.src_extents, src_extents)
  self.AddToMessage(op.dst_extents, dst_extents)

  self.assertRaises(update_payload.PayloadError,
                    payload_checker._CheckMoveOperation,
                    op, 1024, 134, 134, 'foo')
664
def testCheckMoveOperation_FailInsufficientSrcBlocks(self):
  """Tests _CheckMoveOperation(); fails, not enough actual src blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  op.type = common.OpType.MOVE

  # The src extents add up to 133 blocks, one short of the 134 passed below.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 127))
  dst_extents = self.NewExtentList((16, 128), (512, 6))
  self.AddToMessage(op.src_extents, src_extents)
  self.AddToMessage(op.dst_extents, dst_extents)

  self.assertRaises(update_payload.PayloadError,
                    payload_checker._CheckMoveOperation,
                    op, None, 134, 134, 'foo')
679
def testCheckMoveOperation_FailInsufficientDstBlocks(self):
  """Tests _CheckMoveOperation(); fails, not enough actual dst blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  op.type = common.OpType.MOVE

  # The dst extents add up to 133 blocks, one short of the 134 passed below.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 5))
  self.AddToMessage(op.src_extents, src_extents)
  self.AddToMessage(op.dst_extents, dst_extents)

  self.assertRaises(update_payload.PayloadError,
                    payload_checker._CheckMoveOperation,
                    op, None, 134, 134, 'foo')
694
def testCheckMoveOperation_FailExcessSrcBlocks(self):
  """Tests _CheckMoveOperation(); fails, too many actual src blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())

  # (src extents, dst extents) per scenario; in both, the src extents hold
  # one block more than the dst extents.
  scenarios = (
      # 134 src blocks vs. 133 dst blocks.
      (((0, 4), (12, 2), (1024, 128)), ((16, 128), (512, 5))),
      # 135 src blocks vs. 134 dst blocks.
      (((0, 4), (12, 2), (1024, 129)), ((16, 128), (512, 6))),
  )
  for src_pairs, dst_pairs in scenarios:
    # Use a fresh operation for every scenario: AddToMessage() appends to the
    # repeated fields, so reusing one message would pile the second
    # scenario's extents on top of the first's.
    op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
    op.type = common.OpType.MOVE
    self.AddToMessage(op.src_extents, self.NewExtentList(*src_pairs))
    self.AddToMessage(op.dst_extents, self.NewExtentList(*dst_pairs))
    self.assertRaises(
        update_payload.PayloadError,
        payload_checker._CheckMoveOperation,
        op, None, 134, 134, 'foo')
717
def testCheckMoveOperation_FailExcessDstBlocks(self):
  """Tests _CheckMoveOperation(); fails, too many actual dst blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  # The second number of each pair sums to 4 + 2 + 128 = 134 for src, which
  # matches the block count handed to the checker, but to 128 + 7 = 135 for
  # dst.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 7))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  with self.assertRaises(update_payload.PayloadError):
    payload_checker._CheckMoveOperation(move_op, None, 134, 134, 'foo')
732
def testCheckMoveOperation_FailStagnantBlocks(self):
  """Tests _CheckMoveOperation(); fails, there are blocks that do not move."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  # Both lists cover 134 blocks. Reading each pair as (start, count), the
  # fifth src block is block 12 and the fifth dst block is 8 + 4 = 12, i.e.
  # that block would be "moved" onto itself.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((8, 128), (512, 6))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  with self.assertRaises(update_payload.PayloadError):
    payload_checker._CheckMoveOperation(move_op, None, 134, 134, 'foo')
747
def testCheckBsdiff(self):
  """Tests _CheckBsdiffOperation()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  check_bsdiff = payload_checker._CheckBsdiffOperation

  # Pass.
  self.assertIsNone(check_bsdiff(10000, 3, 'foo'))

  # Every argument tuple below must be rejected with a PayloadError.
  rejected_args = (
      (None, 3, 'foo'),   # Fail, missing data blob.
      (10000, 2, 'foo'),  # Fail, too big of a diff blob (unjustified).
  )
  for bad_args in rejected_args:
    self.assertRaises(update_payload.PayloadError, check_bsdiff, *bad_args)
767
def DoCheckOperationTest(self, op_type_name, is_last, allow_signature,
                         allow_unhashed, fail_src_extents, fail_dst_extents,
                         fail_mismatched_data_offset_length,
                         fail_missing_dst_extents, fail_src_length,
                         fail_dst_length, fail_data_hash,
                         fail_prev_data_offset):
  """Parametric testing of _CheckOperation().

  Builds a single install operation of the requested type, tampers with it
  according to the fail_* flags, and hands it to _CheckOperation(). If any
  fail_* flag is set a PayloadError is expected; otherwise the call is
  expected to return the operation's data_length (or zero when the operation
  has no data_length field).

  Args:
    op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'.
    is_last: Whether we're testing the last operation in a sequence.
    allow_signature: Whether we're testing a signature-capable operation.
    allow_unhashed: Whether we're allowing to not hash the data.
    fail_src_extents: Tamper with src extents.
    fail_dst_extents: Tamper with dst extents.
    fail_mismatched_data_offset_length: Make data_{offset,length}
      inconsistent.
    fail_missing_dst_extents: Do not include dst extents.
    fail_src_length: Make src length inconsistent.
    fail_dst_length: Make dst length inconsistent.
    fail_data_hash: Tamper with the data blob hash.
    fail_prev_data_offset: Make data space uses non-contiguous.
  """
  op_type = _OpTypeByName(op_type_name)

  # Create the test object.
  payload = self.MockPayload()
  payload_checker = checker.PayloadChecker(payload,
                                           allow_unhashed=allow_unhashed)
  block_size = payload_checker.block_size

  # Create auxiliary arguments.
  old_part_size = test_utils.MiB(4)
  new_part_size = test_utils.MiB(8)
  # One zero-initialized byte counter per block; the partition size is rounded
  # up to a whole number of blocks.
  old_block_counters = array.array(
      'B', [0] * ((old_part_size + block_size - 1) / block_size))
  new_block_counters = array.array(
      'B', [0] * ((new_part_size + block_size - 1) / block_size))
  prev_data_offset = 1876
  blob_hash_counts = collections.defaultdict(int)

  # Create the operation object for the test.
  op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  op.type = op_type

  # Only MOVE and BSDIFF operations are given src extents; in the tampered
  # case the extent list is (0, 0) and total_src_blocks stays zero.
  total_src_blocks = 0
  if op_type in (common.OpType.MOVE, common.OpType.BSDIFF):
    if fail_src_extents:
      self.AddToMessage(op.src_extents,
                        self.NewExtentList((0, 0)))
    else:
      self.AddToMessage(op.src_extents,
                        self.NewExtentList((0, 16)))
      total_src_blocks = 16

  # Every operation type other than MOVE gets a data blob description.
  if op_type != common.OpType.MOVE:
    # The mismatch is produced by setting data_offset without data_length.
    if not fail_mismatched_data_offset_length:
      op.data_length = 16 * block_size - 8
    if fail_prev_data_offset:
      op.data_offset = prev_data_offset + 16
    else:
      op.data_offset = prev_data_offset

    fake_data = 'fake-data'.ljust(op.data_length)
    # First branch: neither unhashed data is allowed nor is this the last,
    # signature-capable REPLACE operation. A valid hash is attached unless
    # fail_data_hash is set, in which case no hash is attached at all and no
    # blob read is recorded on the mock payload.
    if not (allow_unhashed or (is_last and allow_signature and
                               op_type == common.OpType.REPLACE)):
      if not fail_data_hash:
        # Create a valid data blob hash.
        op.data_sha256_hash = hashlib.sha256(fake_data).digest()
        payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
            fake_data)
    # Second branch: the first condition did not hold, yet a (wrong) hash is
    # attached anyway; it is computed over a modified copy of the data.
    elif fail_data_hash:
      # Create an invalid data blob hash.
      op.data_sha256_hash = hashlib.sha256(
          fake_data.replace(' ', '-')).digest()
      payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
          fake_data)

  total_dst_blocks = 0
  if not fail_missing_dst_extents:
    total_dst_blocks = 16
    if fail_dst_extents:
      self.AddToMessage(op.dst_extents,
                        self.NewExtentList((4, 16), (32, 0)))
    else:
      self.AddToMessage(op.dst_extents,
                        self.NewExtentList((4, 8), (64, 8)))

  # A consistent src_length is total_src_blocks * block_size; the tampered
  # value is 8 bytes larger.
  if total_src_blocks:
    if fail_src_length:
      op.src_length = total_src_blocks * block_size + 8
    else:
      op.src_length = total_src_blocks * block_size
  elif fail_src_length:
    # Add an orphaned src_length.
    op.src_length = 16

  # Same scheme for dst_length; it is left unset when there are no dst
  # blocks.
  if total_dst_blocks:
    if fail_dst_length:
      op.dst_length = total_dst_blocks * block_size + 8
    else:
      op.dst_length = total_dst_blocks * block_size

  # Switch the mock payload from recording to replaying the expected calls.
  self.mox.ReplayAll()
  should_fail = (fail_src_extents or fail_dst_extents or
                 fail_mismatched_data_offset_length or
                 fail_missing_dst_extents or fail_src_length or
                 fail_dst_length or fail_data_hash or fail_prev_data_offset)
  args = (op, 'foo', is_last, old_block_counters, new_block_counters,
          old_part_size, new_part_size, prev_data_offset, allow_signature,
          blob_hash_counts)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckOperation, *args)
  else:
    self.assertEqual(op.data_length if op.HasField('data_length') else 0,
                     payload_checker._CheckOperation(*args))
Gilad Arnold5502b562013-03-08 13:22:31 -0800885
def testAllocBlockCounters(self):
  """Tests _AllocBlockCounters()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size

  # Check allocation for block-aligned partition size, ensure it's integers.
  aligned_counters = payload_checker._AllocBlockCounters(16 * block_size)
  self.assertEqual(16, len(aligned_counters))
  self.assertEqual(int, type(aligned_counters[0]))

  # Check allocation of unaligned partition sizes: one byte short of 16
  # blocks still needs 16 counters, one byte over needs 17.
  unaligned_cases = (
      (16 * block_size - 1, 16),
      (16 * block_size + 1, 17),
  )
  for part_size, expected_num_counters in unaligned_cases:
    counters = payload_checker._AllocBlockCounters(part_size)
    self.assertEqual(expected_num_counters, len(counters))
Gilad Arnold5502b562013-03-08 13:22:31 -0800901
def DoCheckOperationsTest(self, fail_bad_type,
                          fail_nonexhaustive_full_update):
  """Parametric testing of _CheckOperations().

  Args:
    fail_bad_type: Give the operation a type value above every valid one.
    fail_nonexhaustive_full_update: Make the operation's data (and dst
      extent) one block shorter than the partition size.
  """
  block_size = test_utils.KiB(4)
  rootfs_part_size = test_utils.MiB(8)

  # Generate a test payload. For this test, we only care about one
  # (arbitrary) set of operations, so we'll only be generating a rootfs
  # operation and test with it.
  payload_gen = test_utils.PayloadGenerator()
  payload_gen.SetBlockSize(block_size)

  # Fake rootfs operations in a full update, tampered with as required.
  if fail_bad_type:
    # Choose a type value that's bigger than the highest valid value.
    candidate_types = [common.OpType.REPLACE] + list(common.OpType.ALL)
    rootfs_op_type = max(candidate_types) + 1
  else:
    rootfs_op_type = common.OpType.REPLACE

  if fail_nonexhaustive_full_update:
    rootfs_data_length = rootfs_part_size - block_size
  else:
    rootfs_data_length = rootfs_part_size

  num_dst_blocks = rootfs_data_length // block_size
  payload_gen.AddOperation(False, rootfs_op_type,
                           dst_extents=[(0, num_dst_blocks)],
                           data_offset=0,
                           data_length=rootfs_data_length)

  # Create the test object.
  payload_checker = _GetPayloadChecker(
      payload_gen.WriteToFile,
      checker_init_dargs={'allow_unhashed': True})
  payload_checker.payload_type = checker._TYPE_FULL
  report = checker._PayloadReport()

  check_args = (payload_checker.payload.manifest.install_operations, report,
                'foo', 0, rootfs_part_size, rootfs_part_size, 0, False)
  if fail_bad_type or fail_nonexhaustive_full_update:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckOperations, *check_args)
  else:
    self.assertEqual(rootfs_data_length,
                     payload_checker._CheckOperations(*check_args))
Gilad Arnold5502b562013-03-08 13:22:31 -0800947
def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op,
                          fail_mismatched_pseudo_op, fail_sig_missing_fields,
                          fail_unknown_sig_version, fail_incorrect_sig):
  """Parametric testing of _CheckSignatures().

  If any fail_* flag is set, a signatures blob is forged by this test and a
  PayloadError is expected; otherwise _CheckSignatures() is expected to
  return None.

  Args:
    fail_empty_sigs_blob: Forge a signatures blob with no signature in it.
    fail_missing_pseudo_op: Forge the trailing operation instead of letting
      the generator add a signature pseudo-operation.
    fail_mismatched_pseudo_op: Handled here exactly like
      fail_missing_pseudo_op.
    fail_sig_missing_fields: Add a signature whose data is None.
    fail_unknown_sig_version: Add the signature with version 5 instead of 1.
    fail_incorrect_sig: Use the forged signature, which is computed over
      'fake-payload-content'.
  """
  # Generate a test payload. For this test, we only care about the signature
  # block and how it relates to the payload hash. Therefore, we're generating
  # a random (otherwise useless) payload for this purpose.
  payload_gen = test_utils.EnhancedPayloadGenerator()
  block_size = test_utils.KiB(4)
  payload_gen.SetBlockSize(block_size)
  rootfs_part_size = test_utils.MiB(2)
  kernel_part_size = test_utils.KiB(16)
  payload_gen.SetPartInfo(False, True, rootfs_part_size,
                          hashlib.sha256('fake-new-rootfs-content').digest())
  payload_gen.SetPartInfo(True, True, kernel_part_size,
                          hashlib.sha256('fake-new-kernel-content').digest())
  payload_gen.AddOperationWithData(
      False, common.OpType.REPLACE,
      dst_extents=[(0, rootfs_part_size / block_size)],
      data_blob=os.urandom(rootfs_part_size))

  # Forging the pseudo-operation implies forging the signatures blob, too;
  # the assertion further below relies on this.
  do_forge_pseudo_op = (fail_missing_pseudo_op or fail_mismatched_pseudo_op)
  do_forge_sigs_data = (do_forge_pseudo_op or fail_empty_sigs_blob or
                        fail_sig_missing_fields or fail_unknown_sig_version
                        or fail_incorrect_sig)

  # sigs_data stays None unless a blob is forged here.
  sigs_data = None
  if do_forge_sigs_data:
    sigs_gen = test_utils.SignaturesGenerator()
    if not fail_empty_sigs_blob:
      if fail_sig_missing_fields:
        sig_data = None
      else:
        sig_data = test_utils.SignSha256('fake-payload-content',
                                         test_utils._PRIVKEY_FILE_NAME)
      sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data)

    sigs_data = sigs_gen.ToBinary()
    payload_gen.SetSignatures(payload_gen.curr_offset, len(sigs_data))

  if do_forge_pseudo_op:
    assert sigs_data is not None, 'should have forged signatures blob by now'
    sigs_len = len(sigs_data)
    # The forged operation uses half the generator's current offset and half
    # the blob length, i.e. not the values passed to SetSignatures() above.
    payload_gen.AddOperation(
        False, common.OpType.REPLACE,
        data_offset=payload_gen.curr_offset / 2,
        data_length=sigs_len / 2,
        dst_extents=[(0, (sigs_len / 2 + block_size - 1) / block_size)])

  # Generate payload (complete w/ signature) and create the test object.
  payload_checker = _GetPayloadChecker(
      payload_gen.WriteToFileWithData,
      payload_gen_dargs={
          'sigs_data': sigs_data,
          'privkey_file_name': test_utils._PRIVKEY_FILE_NAME,
          'do_add_pseudo_operation': not do_forge_pseudo_op})
  payload_checker.payload_type = checker._TYPE_FULL
  report = checker._PayloadReport()

  # We have to check the manifest first in order to set signature attributes.
  payload_checker._CheckManifest(report, rootfs_part_size, kernel_part_size)

  should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or
                 fail_mismatched_pseudo_op or fail_sig_missing_fields or
                 fail_unknown_sig_version or fail_incorrect_sig)
  args = (report, test_utils._PUBKEY_FILE_NAME)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckSignatures, *args)
  else:
    self.assertIsNone(payload_checker._CheckSignatures(*args))
Gilad Arnold5502b562013-03-08 13:22:31 -08001018
def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size,
              fail_mismatched_block_size, fail_excess_data):
  """Parametric testing of Run().

  Args:
    fail_wrong_payload_type: Make the checker assert a 'delta' payload
      although a full one is generated.
    fail_invalid_block_size: Give the checker a block size that is not a
      power of two; creating the checker is then expected to fail.
    fail_mismatched_block_size: Give the checker twice the block size that
      the payload states.
    fail_excess_data: Have the generator write 1024 bytes of random padding.
  """
  block_size = test_utils.KiB(4)
  kernel_part_size = test_utils.KiB(16)
  rootfs_part_size = test_utils.MiB(2)

  # Generate a test payload. For this test, we generate a full update that
  # has sample kernel and rootfs operations. Since most testing is done with
  # internal PayloadChecker methods that are tested elsewhere, here we only
  # tamper with what's actually being manipulated and/or tested in the Run()
  # method itself. Note that the checker doesn't verify partition hashes, so
  # they're safe to fake.
  payload_gen = test_utils.EnhancedPayloadGenerator()
  payload_gen.SetBlockSize(block_size)
  payload_gen.SetPartInfo(False, True, rootfs_part_size,
                          hashlib.sha256('fake-new-rootfs-content').digest())
  payload_gen.SetPartInfo(True, True, kernel_part_size,
                          hashlib.sha256('fake-new-kernel-content').digest())
  # One REPLACE operation with random data per partition: rootfs first, then
  # kernel.
  for first_arg, part_size in ((False, rootfs_part_size),
                               (True, kernel_part_size)):
    payload_gen.AddOperationWithData(
        first_arg, common.OpType.REPLACE,
        dst_extents=[(0, part_size // block_size)],
        data_blob=os.urandom(part_size))

  # Pick the block size that the checker gets initialized with.
  if fail_invalid_block_size:
    checker_block_size = block_size + 5  # Not a power of two.
  elif fail_mismatched_block_size:
    checker_block_size = block_size * 2  # Different than payload stated.
  else:
    checker_block_size = block_size

  if fail_excess_data:
    padding = os.urandom(1024)
  else:
    padding = None

  # Generate payload (complete w/ signature) and create the test object.
  get_checker_dargs = {
      'payload_gen_dargs': {
          'privkey_file_name': test_utils._PRIVKEY_FILE_NAME,
          'do_add_pseudo_operation': True,
          'is_pseudo_in_kernel': True,
          'padding': padding},
      'checker_init_dargs': {
          'assert_type': 'delta' if fail_wrong_payload_type else 'full',
          'block_size': checker_block_size}}
  if fail_invalid_block_size:
    # An invalid block size is rejected before there is anything to run.
    self.assertRaises(update_payload.PayloadError, _GetPayloadChecker,
                      payload_gen.WriteToFileWithData, **get_checker_dargs)
    return

  payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData,
                                       **get_checker_dargs)
  run_dargs = {'pubkey_file_name': test_utils._PUBKEY_FILE_NAME}
  if (fail_wrong_payload_type or fail_mismatched_block_size or
      fail_excess_data):
    self.assertRaises(update_payload.PayloadError, payload_checker.Run,
                      **run_dargs)
  else:
    self.assertIsNone(payload_checker.Run(**run_dargs))
Gilad Arnold5502b562013-03-08 13:22:31 -08001076
1077
1078# This implements a generic API, hence the occasional unused args.
1079# pylint: disable=W0613
def ValidateCheckOperationTest(op_type_name, is_last, allow_signature,
                               allow_unhashed, fail_src_extents,
                               fail_dst_extents,
                               fail_mismatched_data_offset_length,
                               fail_missing_dst_extents, fail_src_length,
                               fail_dst_length, fail_data_hash,
                               fail_prev_data_offset):
  """Tells whether a combination of arguments makes for a valid test.

  The signature mirrors that of DoCheckOperationTest() so that both can be
  called with the same keyword arguments; several arguments are not looked
  at here.

  Returns:
    False if a src-related failure is requested for a REPLACE/REPLACE_BZ
    operation, or a data-related failure for a MOVE operation; True
    otherwise.
  """
  op_type = _OpTypeByName(op_type_name)

  # REPLACE/REPLACE_BZ operations don't read data from src partition.
  is_replace = op_type in (common.OpType.REPLACE, common.OpType.REPLACE_BZ)
  wants_src_failure = fail_src_extents or fail_src_length

  # MOVE operations don't carry data.
  is_move = op_type == common.OpType.MOVE
  wants_data_failure = (fail_mismatched_data_offset_length or
                        fail_data_hash or fail_prev_data_offset)

  return not ((is_replace and wants_src_failure) or
              (is_move and wants_data_failure))
1101 return True
1102
1103
def TestMethodBody(run_method_name, run_dargs):
  """Builds a one-argument function that calls a method by name.

  Args:
    run_method_name: Name of the method to look up on the object the returned
      function is called with.
    run_dargs: Dictionary of keyword arguments to call that method with.

  Returns:
    A function taking a single object; it returns whatever the named method
    returns.
  """
  def _InvokeNamedMethod(self):
    bound_method = getattr(self, run_method_name)
    return bound_method(**run_dargs)

  return _InvokeNamedMethod
1107
1108
def AddParametricTests(tested_method_name, arg_space, validate_func=None):
  """Generates one PayloadCheckerTest method per point in a parameter space.

  Every combination of values in arg_space (optionally filtered through
  validate_func) is bound to a new attribute of PayloadCheckerTest, whose
  name starts with 'test<tested_method_name>' and which calls
  'Do<tested_method_name>Test' with that combination. Doing the enumeration
  up front, rather than inside a single test, lets the unittest framework
  treat each combination as a complete run with the usual setUp/tearDown
  mechanics.

  Args:
    tested_method_name: Name of the tested PayloadChecker method.
    arg_space: A dictionary containing variables (keys) and lists of values
      (values) associated with them.
    validate_func: A function used for validating test argument combinations.
  """
  run_method_name = 'Do%sTest' % tested_method_name
  arg_names = arg_space.keys()
  for value_tuple in itertools.product(*arg_space.values()):
    run_dargs = dict(zip(arg_names, value_tuple))
    if validate_func and not validate_func(**run_dargs):
      continue

    # Only truthy values and plain integers (so that a zero still shows up)
    # make it into the name. The exact type comparison is deliberate:
    # booleans are not of type int, so a False value is left out.
    name_parts = ['test%s' % tested_method_name]
    for arg_key, arg_val in run_dargs.items():
      if arg_val or type(arg_val) is int:
        name_parts.append('%s=%s' % (arg_key, arg_val))

    setattr(PayloadCheckerTest, '__'.join(name_parts),
            TestMethodBody(run_method_name, run_dargs))
1135 TestMethodBody(run_method_name, run_dargs))
1136
1137
def AddAllParametricTests():
  """Enumerates and adds all parametric tests to PayloadCheckerTest."""
  # Value ranges shared by the parameter spaces below.
  either = (True, False)
  indents = (0, 1, 2)

  # Add all _CheckElem() test cases.
  AddParametricTests('AddElem',
                     {'linebreak': either,
                      'indent': indents,
                      'convert': (str, lambda s: s[::-1]),
                      'is_present': either,
                      'is_mandatory': either,
                      'is_submsg': either})

  # Add all _Add{Mandatory,Optional}Field tests.
  AddParametricTests('AddField',
                     {'is_mandatory': either,
                      'linebreak': either,
                      'indent': indents,
                      'convert': (str, lambda s: s[::-1]),
                      'is_present': either})

  # Add all _Add{Mandatory,Optional}SubMsg tests.
  AddParametricTests('AddSubMsg',
                     {'is_mandatory': either,
                      'is_present': either})

  # Add all _CheckManifest() test cases.
  AddParametricTests('CheckManifest',
                     {'fail_mismatched_block_size': either,
                      'fail_bad_sigs': either,
                      'fail_mismatched_oki_ori': either,
                      'fail_bad_oki': either,
                      'fail_bad_ori': either,
                      'fail_bad_nki': either,
                      'fail_bad_nri': either,
                      'fail_missing_ops': either,
                      'fail_old_kernel_fs_size': either,
                      'fail_old_rootfs_fs_size': either,
                      'fail_new_kernel_fs_size': either,
                      'fail_new_rootfs_fs_size': either})

  # Add all _CheckOperation() test cases; combinations that make no sense
  # for the operation type are weeded out by the validation function.
  AddParametricTests('CheckOperation',
                     {'op_type_name': ('REPLACE', 'REPLACE_BZ', 'MOVE',
                                       'BSDIFF'),
                      'is_last': either,
                      'allow_signature': either,
                      'allow_unhashed': either,
                      'fail_src_extents': either,
                      'fail_dst_extents': either,
                      'fail_mismatched_data_offset_length': either,
                      'fail_missing_dst_extents': either,
                      'fail_src_length': either,
                      'fail_dst_length': either,
                      'fail_data_hash': either,
                      'fail_prev_data_offset': either},
                     validate_func=ValidateCheckOperationTest)

  # Add all _CheckOperations() test cases.
  AddParametricTests('CheckOperations',
                     {'fail_bad_type': either,
                      'fail_nonexhaustive_full_update': either})

  # Add all _CheckSignatures() test cases.
  AddParametricTests('CheckSignatures',
                     {'fail_empty_sigs_blob': either,
                      'fail_missing_pseudo_op': either,
                      'fail_mismatched_pseudo_op': either,
                      'fail_sig_missing_fields': either,
                      'fail_unknown_sig_version': either,
                      'fail_incorrect_sig': either})

  # Add all Run() test cases.
  AddParametricTests('Run',
                     {'fail_wrong_payload_type': either,
                      'fail_invalid_block_size': either,
                      'fail_mismatched_block_size': either,
                      'fail_excess_data': either})
1213 'fail_excess_data': (True, False)})
1214
1215
# The parametric test methods are attached to PayloadCheckerTest first, and
# only then is control handed to unittest.
if __name__ == '__main__':
  AddAllParametricTests()
  unittest.main()