blob: c7a291b39f8dcac0d87a392ddda8fedf8b44e523 [file] [log] [blame]
Gilad Arnold5502b562013-03-08 13:22:31 -08001#!/usr/bin/python
2#
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Unit testing checker.py."""
8
9import array
10import collections
11import cStringIO
12import hashlib
13import itertools
14import os
15import unittest
16
17# Pylint cannot find mox.
18# pylint: disable=F0401
19import mox
20
21import checker
22import common
23import payload as update_payload # avoid name conflicts later.
24import test_utils
25import update_metadata_pb2
26
27
28_PRIVKEY_FILE_NAME = 'payload-test-key.pem'
29_PUBKEY_FILE_NAME = 'payload-test-key.pub'
30
31
32def _OpTypeByName(op_name):
33 op_name_to_type = {
34 'REPLACE': common.OpType.REPLACE,
35 'REPLACE_BZ': common.OpType.REPLACE_BZ,
36 'MOVE': common.OpType.MOVE,
37 'BSDIFF': common.OpType.BSDIFF,
38 }
39 return op_name_to_type[op_name]
40
41
42def _KiB(count):
43 """Return the byte size of a given number of (binary) kilobytes."""
44 return count << 10
45
46
47def _MiB(count):
48 """Return the byte size of a given number of (binary) megabytes."""
49 return count << 20
50
51
52def _GiB(count):
53 """Return the byte size of a given number of (binary) gigabytes."""
54 return count << 30
55
56
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070057def _GetPayloadChecker(payload_gen_write_to_file_func, payload_gen_dargs=None,
58 checker_init_dargs=None):
Gilad Arnold5502b562013-03-08 13:22:31 -080059 """Returns a payload checker from a given payload generator."""
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070060 if payload_gen_dargs is None:
61 payload_gen_dargs = {}
62 if checker_init_dargs is None:
63 checker_init_dargs = {}
64
Gilad Arnold5502b562013-03-08 13:22:31 -080065 payload_file = cStringIO.StringIO()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070066 payload_gen_write_to_file_func(payload_file, **payload_gen_dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080067 payload_file.seek(0)
68 payload = update_payload.Payload(payload_file)
69 payload.Init()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070070 return checker.PayloadChecker(payload, **checker_init_dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080071
72
73def _GetPayloadCheckerWithData(payload_gen):
74 """Returns a payload checker from a given payload generator."""
75 payload_file = cStringIO.StringIO()
76 payload_gen.WriteToFile(payload_file)
77 payload_file.seek(0)
78 payload = update_payload.Payload(payload_file)
79 payload.Init()
80 return checker.PayloadChecker(payload)
81
82
83# (i) this class doesn't need an __init__(); (ii) unit testing is all about
84# running protected methods; (iii) don't bark about missing members of classes
85# you cannot import.
86# pylint: disable=W0232
87# pylint: disable=W0212
88# pylint: disable=E1101
89class PayloadCheckerTest(mox.MoxTestBase):
90 """Tests the PayloadChecker class.
91
92 In addition to ordinary testFoo() methods, which are automatically invoked by
93 the unittest framework, in this class we make use of DoBarTest() calls that
94 implement parametric tests of certain features. In order to invoke each test,
95 which embodies a unique combination of parameter values, as a complete unit
96 test, we perform explicit enumeration of the parameter space and create
97 individual invocation contexts for each, which are then bound as
98 testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for
99 all such tests is done in AddAllParametricTests().
100
101 """
102
103 def MockPayload(self):
104 """Create a mock payload object, complete with a mock menifest."""
105 payload = self.mox.CreateMock(update_payload.Payload)
106 payload.is_init = True
107 payload.manifest = self.mox.CreateMock(
108 update_metadata_pb2.DeltaArchiveManifest)
109 return payload
110
111 @staticmethod
112 def NewExtent(start_block, num_blocks):
113 """Returns an Extent message.
114
115 Each of the provided fields is set iff it is >= 0; otherwise, it's left at
116 its default state.
117
118 Args:
119 start_block: the starting block of the extent
120 num_blocks: the number of blocks in the extent
121 Returns:
122 An Extent message.
123
124 """
125 ex = update_metadata_pb2.Extent()
126 if start_block >= 0:
127 ex.start_block = start_block
128 if num_blocks >= 0:
129 ex.num_blocks = num_blocks
130 return ex
131
132 @staticmethod
133 def NewExtentList(*args):
134 """Returns an list of extents.
135
136 Args:
137 *args: (start_block, num_blocks) pairs defining the extents
138 Returns:
139 A list of Extent objects.
140
141 """
142 ex_list = []
143 for start_block, num_blocks in args:
144 ex_list.append(PayloadCheckerTest.NewExtent(start_block, num_blocks))
145 return ex_list
146
147 @staticmethod
148 def AddToMessage(repeated_field, field_vals):
149 for field_val in field_vals:
150 new_field = repeated_field.add()
151 new_field.CopyFrom(field_val)
152
153 def assertIsNone(self, val):
154 """Asserts that val is None (TODO remove once we upgrade to Python 2.7).
155
156 Note that we're using assertEqual so as for it to show us the actual
157 non-None value.
158
159 Args:
160 val: value/object to be equated to None
161
162 """
163 self.assertEqual(val, None)
164
165 def SetupAddElemTest(self, is_present, is_submsg, convert=str,
166 linebreak=False, indent=0):
167 """Setup for testing of _CheckElem() and its derivatives.
168
169 Args:
170 is_present: whether or not the element is found in the message
171 is_submsg: whether the element is a sub-message itself
172 convert: a representation conversion function
173 linebreak: whether or not a linebreak is to be used in the report
174 indent: indentation used for the report
175 Returns:
176 msg: a mock message object
177 report: a mock report object
178 subreport: a mock sub-report object
179 name: an element name to check
180 val: expected element value
181
182 """
183 name = 'foo'
184 val = 'fake submsg' if is_submsg else 'fake field'
185 subreport = 'fake subreport'
186
187 # Create a mock message.
188 msg = self.mox.CreateMock(update_metadata_pb2.message.Message)
189 msg.HasField(name).AndReturn(is_present)
190 setattr(msg, name, val)
191
192 # Create a mock report.
193 report = self.mox.CreateMock(checker._PayloadReport)
194 if is_present:
195 if is_submsg:
196 report.AddSubReport(name).AndReturn(subreport)
197 else:
198 report.AddField(name, convert(val), linebreak=linebreak, indent=indent)
199
200 self.mox.ReplayAll()
201 return (msg, report, subreport, name, val)
202
203 def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert,
204 linebreak, indent):
205 """Parametric testing of _CheckElem().
206
207 Args:
208 is_present: whether or not the element is found in the message
209 is_mandatory: whether or not it's a mandatory element
210 is_submsg: whether the element is a sub-message itself
211 convert: a representation conversion function
212 linebreak: whether or not a linebreak is to be used in the report
213 indent: indentation used for the report
214
215 """
216 msg, report, subreport, name, val = self.SetupAddElemTest(
217 is_present, is_submsg, convert, linebreak, indent)
218
219 largs = [msg, name, report, is_mandatory, is_submsg]
220 dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
221 if is_mandatory and not is_present:
222 self.assertRaises(update_payload.PayloadError,
223 checker.PayloadChecker._CheckElem, *largs, **dargs)
224 else:
225 ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*largs,
226 **dargs)
227 self.assertEquals(ret_val, val if is_present else None)
228 self.assertEquals(ret_subreport,
229 subreport if is_present and is_submsg else None)
230
231 def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
232 indent):
233 """Parametric testing of _Check{Mandatory,Optional}Field().
234
235 Args:
236 is_mandatory: whether we're testing a mandatory call
237 is_present: whether or not the element is found in the message
238 convert: a representation conversion function
239 linebreak: whether or not a linebreak is to be used in the report
240 indent: indentation used for the report
241
242 """
243 msg, report, _, name, val = self.SetupAddElemTest(
244 is_present, False, convert, linebreak, indent)
245
246 # Prepare for invocation of the tested method.
247 largs = [msg, name, report]
248 dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
249 if is_mandatory:
250 largs.append('bar')
251 tested_func = checker.PayloadChecker._CheckMandatoryField
252 else:
253 tested_func = checker.PayloadChecker._CheckOptionalField
254
255 # Test the method call.
256 if is_mandatory and not is_present:
257 self.assertRaises(update_payload.PayloadError, tested_func, *largs,
258 **dargs)
259 else:
260 ret_val = tested_func(*largs, **dargs)
261 self.assertEquals(ret_val, val if is_present else None)
262
263 def DoAddSubMsgTest(self, is_mandatory, is_present):
264 """Parametrized testing of _Check{Mandatory,Optional}SubMsg().
265
266 Args:
267 is_mandatory: whether we're testing a mandatory call
268 is_present: whether or not the element is found in the message
269
270 """
271 msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True)
272
273 # Prepare for invocation of the tested method.
274 largs = [msg, name, report]
275 if is_mandatory:
276 largs.append('bar')
277 tested_func = checker.PayloadChecker._CheckMandatorySubMsg
278 else:
279 tested_func = checker.PayloadChecker._CheckOptionalSubMsg
280
281 # Test the method call.
282 if is_mandatory and not is_present:
283 self.assertRaises(update_payload.PayloadError, tested_func, *largs)
284 else:
285 ret_val, ret_subreport = tested_func(*largs)
286 self.assertEquals(ret_val, val if is_present else None)
287 self.assertEquals(ret_subreport, subreport if is_present else None)
288
289 def testCheckPresentIff(self):
290 """Tests _CheckPresentIff()."""
291 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
292 None, None, 'foo', 'bar', 'baz'))
293 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
294 'a', 'b', 'foo', 'bar', 'baz'))
295 self.assertRaises(update_payload.PayloadError,
296 checker.PayloadChecker._CheckPresentIff,
297 'a', None, 'foo', 'bar', 'baz')
298 self.assertRaises(update_payload.PayloadError,
299 checker.PayloadChecker._CheckPresentIff,
300 None, 'b', 'foo', 'bar', 'baz')
301
302 def DoCheckSha256SignatureTest(self, expect_pass, expect_subprocess_call,
303 sig_data, sig_asn1_header,
304 returned_signed_hash, expected_signed_hash):
305 """Parametric testing of _CheckSha256SignatureTest().
306
307 Args:
308 expect_pass: whether or not it should pass
309 expect_subprocess_call: whether to expect the openssl call to happen
310 sig_data: the signature raw data
311 sig_asn1_header: the ASN1 header
312 returned_signed_hash: the signed hash data retuned by openssl
313 expected_signed_hash: the signed hash data to compare against
314
315 """
316 # Stub out the subprocess invocation.
317 self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
318 if expect_subprocess_call:
319 checker.PayloadChecker._Run(mox.IsA(list), send_data=sig_data).AndReturn(
320 (sig_asn1_header + returned_signed_hash, None))
321
322 self.mox.ReplayAll()
323 if expect_pass:
324 self.assertIsNone(checker.PayloadChecker._CheckSha256Signature(
325 sig_data, 'foo', expected_signed_hash, 'bar'))
326 else:
327 self.assertRaises(update_payload.PayloadError,
328 checker.PayloadChecker._CheckSha256Signature,
329 sig_data, 'foo', expected_signed_hash, 'bar')
330
331 self.mox.UnsetStubs()
332
333 def testCheckSha256Signature_Pass(self):
334 """Tests _CheckSha256Signature(); pass case."""
335 sig_data = 'fake-signature'.ljust(256)
336 signed_hash = hashlib.sha256('fake-data').digest()
337 self.DoCheckSha256SignatureTest(True, True, sig_data,
338 common.SIG_ASN1_HEADER, signed_hash,
339 signed_hash)
340
341 def testCheckSha256Signature_FailBadSignature(self):
342 """Tests _CheckSha256Signature(); fails due to malformed signature."""
343 sig_data = 'fake-signature' # malformed (not 256 bytes in length)
344 signed_hash = hashlib.sha256('fake-data').digest()
345 self.DoCheckSha256SignatureTest(False, False, sig_data,
346 common.SIG_ASN1_HEADER, signed_hash,
347 signed_hash)
348
349 def testCheckSha256Signature_FailBadOutputLength(self):
350 """Tests _CheckSha256Signature(); fails due to unexpected output length."""
351 sig_data = 'fake-signature'.ljust(256)
352 signed_hash = 'fake-hash' # malformed (not 32 bytes in length)
353 self.DoCheckSha256SignatureTest(False, True, sig_data,
354 common.SIG_ASN1_HEADER, signed_hash,
355 signed_hash)
356
357 def testCheckSha256Signature_FailBadAsnHeader(self):
358 """Tests _CheckSha256Signature(); fails due to bad ASN1 header."""
359 sig_data = 'fake-signature'.ljust(256)
360 signed_hash = hashlib.sha256('fake-data').digest()
361 bad_asn1_header = 'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER))
362 self.DoCheckSha256SignatureTest(False, True, sig_data, bad_asn1_header,
363 signed_hash, signed_hash)
364
365 def testCheckSha256Signature_FailBadHash(self):
366 """Tests _CheckSha256Signature(); fails due to bad hash returned."""
367 sig_data = 'fake-signature'.ljust(256)
368 expected_signed_hash = hashlib.sha256('fake-data').digest()
369 returned_signed_hash = hashlib.sha256('bad-fake-data').digest()
370 self.DoCheckSha256SignatureTest(False, True, sig_data,
371 common.SIG_ASN1_HEADER,
372 expected_signed_hash, returned_signed_hash)
373
374 def testCheckBlocksFitLength_Pass(self):
375 """Tests _CheckBlocksFitLength(); pass case."""
376 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
377 64, 4, 16, 'foo'))
378 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
379 60, 4, 16, 'foo'))
380 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
381 49, 4, 16, 'foo'))
382 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
383 48, 3, 16, 'foo'))
384
385 def testCheckBlocksFitLength_TooManyBlocks(self):
386 """Tests _CheckBlocksFitLength(); fails due to excess blocks."""
387 self.assertRaises(update_payload.PayloadError,
388 checker.PayloadChecker._CheckBlocksFitLength,
389 64, 5, 16, 'foo')
390 self.assertRaises(update_payload.PayloadError,
391 checker.PayloadChecker._CheckBlocksFitLength,
392 60, 5, 16, 'foo')
393 self.assertRaises(update_payload.PayloadError,
394 checker.PayloadChecker._CheckBlocksFitLength,
395 49, 5, 16, 'foo')
396 self.assertRaises(update_payload.PayloadError,
397 checker.PayloadChecker._CheckBlocksFitLength,
398 48, 4, 16, 'foo')
399
400 def testCheckBlocksFitLength_TooFewBlocks(self):
401 """Tests _CheckBlocksFitLength(); fails due to insufficient blocks."""
402 self.assertRaises(update_payload.PayloadError,
403 checker.PayloadChecker._CheckBlocksFitLength,
404 64, 3, 16, 'foo')
405 self.assertRaises(update_payload.PayloadError,
406 checker.PayloadChecker._CheckBlocksFitLength,
407 60, 3, 16, 'foo')
408 self.assertRaises(update_payload.PayloadError,
409 checker.PayloadChecker._CheckBlocksFitLength,
410 49, 3, 16, 'foo')
411 self.assertRaises(update_payload.PayloadError,
412 checker.PayloadChecker._CheckBlocksFitLength,
413 48, 2, 16, 'foo')
414
415 def DoCheckManifestTest(self, fail_mismatched_block_size, fail_bad_sigs,
416 fail_mismatched_oki_ori, fail_bad_oki, fail_bad_ori,
Gilad Arnold382df5c2013-05-03 12:49:28 -0700417 fail_bad_nki, fail_bad_nri, fail_missing_ops,
418 fail_old_kernel_fs_size, fail_old_rootfs_fs_size,
419 fail_new_kernel_fs_size, fail_new_rootfs_fs_size):
Gilad Arnold5502b562013-03-08 13:22:31 -0800420 """Parametric testing of _CheckManifest().
421
422 Args:
423 fail_mismatched_block_size: simulate a missing block_size field
424 fail_bad_sigs: make signatures descriptor inconsistent
425 fail_mismatched_oki_ori: make old rootfs/kernel info partially present
426 fail_bad_oki: tamper with old kernel info
427 fail_bad_ori: tamper with old rootfs info
428 fail_bad_nki: tamper with new kernel info
429 fail_bad_nri: tamper with new rootfs info
430 fail_missing_ops: simulate a manifest without any operations
Gilad Arnold382df5c2013-05-03 12:49:28 -0700431 fail_old_kernel_fs_size: make old kernel fs size too big
432 fail_old_rootfs_fs_size: make old rootfs fs size too big
433 fail_new_kernel_fs_size: make new kernel fs size too big
434 fail_new_rootfs_fs_size: make new rootfs fs size too big
Gilad Arnold5502b562013-03-08 13:22:31 -0800435
436 """
437 # Generate a test payload. For this test, we only care about the manifest
438 # and don't need any data blobs, hence we can use a plain paylaod generator
439 # (which also gives us more control on things that can be screwed up).
440 payload_gen = test_utils.PayloadGenerator()
441
442 # Tamper with block size, if required.
443 if fail_mismatched_block_size:
444 payload_gen.SetBlockSize(_KiB(1))
445 else:
446 payload_gen.SetBlockSize(_KiB(4))
447
448 # Add some operations.
449 if not fail_missing_ops:
450 payload_gen.AddOperation(False, common.OpType.MOVE,
451 src_extents=[(0, 16), (16, 497)],
452 dst_extents=[(16, 496), (0, 16)])
453 payload_gen.AddOperation(True, common.OpType.MOVE,
454 src_extents=[(0, 8), (8, 8)],
455 dst_extents=[(8, 8), (0, 8)])
456
457 # Set an invalid signatures block (offset but no size), if required.
458 if fail_bad_sigs:
459 payload_gen.SetSignatures(32, None)
460
Gilad Arnold382df5c2013-05-03 12:49:28 -0700461 # Set partition / filesystem sizes.
462 rootfs_part_size = _MiB(8)
463 kernel_part_size = _KiB(512)
464 old_rootfs_fs_size = new_rootfs_fs_size = rootfs_part_size
465 old_kernel_fs_size = new_kernel_fs_size = kernel_part_size
466 if fail_old_kernel_fs_size:
467 old_kernel_fs_size += 100
468 if fail_old_rootfs_fs_size:
469 old_rootfs_fs_size += 100
470 if fail_new_kernel_fs_size:
471 new_kernel_fs_size += 100
472 if fail_new_rootfs_fs_size:
473 new_rootfs_fs_size += 100
474
Gilad Arnold5502b562013-03-08 13:22:31 -0800475 # Add old kernel/rootfs partition info, as required.
Gilad Arnold382df5c2013-05-03 12:49:28 -0700476 if fail_mismatched_oki_ori or fail_old_kernel_fs_size or fail_bad_oki:
Gilad Arnold5502b562013-03-08 13:22:31 -0800477 oki_hash = (None if fail_bad_oki
478 else hashlib.sha256('fake-oki-content').digest())
Gilad Arnold382df5c2013-05-03 12:49:28 -0700479 payload_gen.SetPartInfo(True, False, old_kernel_fs_size, oki_hash)
480 if not fail_mismatched_oki_ori and (fail_old_rootfs_fs_size or
481 fail_bad_ori):
482 ori_hash = (None if fail_bad_ori
483 else hashlib.sha256('fake-ori-content').digest())
484 payload_gen.SetPartInfo(False, False, old_rootfs_fs_size, ori_hash)
Gilad Arnold5502b562013-03-08 13:22:31 -0800485
486 # Add new kernel/rootfs partition info.
487 payload_gen.SetPartInfo(
Gilad Arnold382df5c2013-05-03 12:49:28 -0700488 True, True, new_kernel_fs_size,
Gilad Arnold5502b562013-03-08 13:22:31 -0800489 None if fail_bad_nki else hashlib.sha256('fake-nki-content').digest())
490 payload_gen.SetPartInfo(
Gilad Arnold382df5c2013-05-03 12:49:28 -0700491 False, True, new_rootfs_fs_size,
Gilad Arnold5502b562013-03-08 13:22:31 -0800492 None if fail_bad_nri else hashlib.sha256('fake-nri-content').digest())
493
494 # Create the test object.
495 payload_checker = _GetPayloadChecker(payload_gen.WriteToFile)
496 report = checker._PayloadReport()
497
498 should_fail = (fail_mismatched_block_size or fail_bad_sigs or
499 fail_mismatched_oki_ori or fail_bad_oki or fail_bad_ori or
Gilad Arnold382df5c2013-05-03 12:49:28 -0700500 fail_bad_nki or fail_bad_nri or fail_missing_ops or
501 fail_old_kernel_fs_size or fail_old_rootfs_fs_size or
502 fail_new_kernel_fs_size or fail_new_rootfs_fs_size)
Gilad Arnold5502b562013-03-08 13:22:31 -0800503 if should_fail:
504 self.assertRaises(update_payload.PayloadError,
Gilad Arnold382df5c2013-05-03 12:49:28 -0700505 payload_checker._CheckManifest, report,
506 rootfs_part_size, kernel_part_size)
Gilad Arnold5502b562013-03-08 13:22:31 -0800507 else:
Gilad Arnold382df5c2013-05-03 12:49:28 -0700508 self.assertIsNone(payload_checker._CheckManifest(report,
509 rootfs_part_size,
510 kernel_part_size))
Gilad Arnold5502b562013-03-08 13:22:31 -0800511
512 def testCheckLength(self):
513 """Tests _CheckLength()."""
514 payload_checker = checker.PayloadChecker(self.MockPayload())
515 block_size = payload_checker.block_size
516
517 # Passes.
518 self.assertIsNone(payload_checker._CheckLength(
519 int(3.5 * block_size), 4, 'foo', 'bar'))
520 # Fails, too few blocks.
521 self.assertRaises(update_payload.PayloadError,
522 payload_checker._CheckLength,
523 int(3.5 * block_size), 3, 'foo', 'bar')
524 # Fails, too many blocks.
525 self.assertRaises(update_payload.PayloadError,
526 payload_checker._CheckLength,
527 int(3.5 * block_size), 5, 'foo', 'bar')
528
529 def testCheckExtents(self):
530 """Tests _CheckExtents()."""
531 payload_checker = checker.PayloadChecker(self.MockPayload())
532 block_size = payload_checker.block_size
533
534 # Passes w/ all real extents.
535 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
536 self.assertEquals(
537 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
538 collections.defaultdict(int), 'foo'),
539 23)
540
541 # Passes w/ pseudo-extents (aka sparse holes).
542 extents = self.NewExtentList((0, 4), (common.PSEUDO_EXTENT_MARKER, 5),
543 (8, 3))
544 self.assertEquals(
545 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
546 collections.defaultdict(int), 'foo',
547 allow_pseudo=True),
548 12)
549
550 # Passes w/ pseudo-extent due to a signature.
551 extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2))
552 self.assertEquals(
553 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
554 collections.defaultdict(int), 'foo',
555 allow_signature=True),
556 2)
557
558 # Fails, extent missing a start block.
559 extents = self.NewExtentList((-1, 4), (8, 3), (1024, 16))
560 self.assertRaises(
561 update_payload.PayloadError, payload_checker._CheckExtents,
562 extents, (1024 + 16) * block_size, collections.defaultdict(int),
563 'foo')
564
565 # Fails, extent missing block count.
566 extents = self.NewExtentList((0, -1), (8, 3), (1024, 16))
567 self.assertRaises(
568 update_payload.PayloadError, payload_checker._CheckExtents,
569 extents, (1024 + 16) * block_size, collections.defaultdict(int),
570 'foo')
571
572 # Fails, extent has zero blocks.
573 extents = self.NewExtentList((0, 4), (8, 3), (1024, 0))
574 self.assertRaises(
575 update_payload.PayloadError, payload_checker._CheckExtents,
576 extents, (1024 + 16) * block_size, collections.defaultdict(int),
577 'foo')
578
579 # Fails, extent exceeds partition boundaries.
580 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
581 self.assertRaises(
582 update_payload.PayloadError, payload_checker._CheckExtents,
583 extents, (1024 + 15) * block_size, collections.defaultdict(int),
584 'foo')
585
586 def testCheckReplaceOperation(self):
587 """Tests _CheckReplaceOperation() where op.type == REPLACE."""
588 payload_checker = checker.PayloadChecker(self.MockPayload())
589 block_size = payload_checker.block_size
590 data_length = 10000
591
592 op = self.mox.CreateMock(
593 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
594 op.type = common.OpType.REPLACE
595
596 # Pass.
597 op.src_extents = []
598 self.assertIsNone(
599 payload_checker._CheckReplaceOperation(
600 op, data_length, (data_length + block_size - 1) / block_size,
601 'foo'))
602
603 # Fail, src extents founds.
604 op.src_extents = ['bar']
605 self.assertRaises(
606 update_payload.PayloadError,
607 payload_checker._CheckReplaceOperation,
608 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
609
610 # Fail, missing data.
611 op.src_extents = []
612 self.assertRaises(
613 update_payload.PayloadError,
614 payload_checker._CheckReplaceOperation,
615 op, None, (data_length + block_size - 1) / block_size, 'foo')
616
617 # Fail, length / block number mismatch.
618 op.src_extents = ['bar']
619 self.assertRaises(
620 update_payload.PayloadError,
621 payload_checker._CheckReplaceOperation,
622 op, data_length, (data_length + block_size - 1) / block_size + 1, 'foo')
623
624 def testCheckReplaceBzOperation(self):
625 """Tests _CheckReplaceOperation() where op.type == REPLACE_BZ."""
626 payload_checker = checker.PayloadChecker(self.MockPayload())
627 block_size = payload_checker.block_size
628 data_length = block_size * 3
629
630 op = self.mox.CreateMock(
631 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
632 op.type = common.OpType.REPLACE_BZ
633
634 # Pass.
635 op.src_extents = []
636 self.assertIsNone(
637 payload_checker._CheckReplaceOperation(
638 op, data_length, (data_length + block_size - 1) / block_size + 5,
639 'foo'))
640
641 # Fail, src extents founds.
642 op.src_extents = ['bar']
643 self.assertRaises(
644 update_payload.PayloadError,
645 payload_checker._CheckReplaceOperation,
646 op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo')
647
648 # Fail, missing data.
649 op.src_extents = []
650 self.assertRaises(
651 update_payload.PayloadError,
652 payload_checker._CheckReplaceOperation,
653 op, None, (data_length + block_size - 1) / block_size, 'foo')
654
655 # Fail, too few blocks to justify BZ.
656 op.src_extents = []
657 self.assertRaises(
658 update_payload.PayloadError,
659 payload_checker._CheckReplaceOperation,
660 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
661
662 def testCheckMoveOperation_Pass(self):
663 """Tests _CheckMoveOperation(); pass case."""
664 payload_checker = checker.PayloadChecker(self.MockPayload())
665 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
666 op.type = common.OpType.MOVE
667
668 self.AddToMessage(op.src_extents,
669 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
670 self.AddToMessage(op.dst_extents,
671 self.NewExtentList((16, 128), (512, 6)))
672 self.assertIsNone(
673 payload_checker._CheckMoveOperation(op, None, 134, 134, 'foo'))
674
675 def testCheckMoveOperation_FailContainsData(self):
676 """Tests _CheckMoveOperation(); fails, message contains data."""
677 payload_checker = checker.PayloadChecker(self.MockPayload())
678 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
679 op.type = common.OpType.MOVE
680
681 self.AddToMessage(op.src_extents,
682 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
683 self.AddToMessage(op.dst_extents,
684 self.NewExtentList((16, 128), (512, 6)))
685 self.assertRaises(
686 update_payload.PayloadError,
687 payload_checker._CheckMoveOperation,
688 op, 1024, 134, 134, 'foo')
689
690 def testCheckMoveOperation_FailInsufficientSrcBlocks(self):
691 """Tests _CheckMoveOperation(); fails, not enough actual src blocks."""
692 payload_checker = checker.PayloadChecker(self.MockPayload())
693 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
694 op.type = common.OpType.MOVE
695
696 self.AddToMessage(op.src_extents,
697 self.NewExtentList((0, 4), (12, 2), (1024, 127)))
698 self.AddToMessage(op.dst_extents,
699 self.NewExtentList((16, 128), (512, 6)))
700 self.assertRaises(
701 update_payload.PayloadError,
702 payload_checker._CheckMoveOperation,
703 op, None, 134, 134, 'foo')
704
705 def testCheckMoveOperation_FailInsufficientDstBlocks(self):
706 """Tests _CheckMoveOperation(); fails, not enough actual dst blocks."""
707 payload_checker = checker.PayloadChecker(self.MockPayload())
708 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
709 op.type = common.OpType.MOVE
710
711 self.AddToMessage(op.src_extents,
712 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
713 self.AddToMessage(op.dst_extents,
714 self.NewExtentList((16, 128), (512, 5)))
715 self.assertRaises(
716 update_payload.PayloadError,
717 payload_checker._CheckMoveOperation,
718 op, None, 134, 134, 'foo')
719
720 def testCheckMoveOperation_FailExcessSrcBlocks(self):
721 """Tests _CheckMoveOperation(); fails, too many actual src blocks."""
722 payload_checker = checker.PayloadChecker(self.MockPayload())
723 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
724 op.type = common.OpType.MOVE
725
726 self.AddToMessage(op.src_extents,
727 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
728 self.AddToMessage(op.dst_extents,
729 self.NewExtentList((16, 128), (512, 5)))
730 self.assertRaises(
731 update_payload.PayloadError,
732 payload_checker._CheckMoveOperation,
733 op, None, 134, 134, 'foo')
734 self.AddToMessage(op.src_extents,
735 self.NewExtentList((0, 4), (12, 2), (1024, 129)))
736 self.AddToMessage(op.dst_extents,
737 self.NewExtentList((16, 128), (512, 6)))
738 self.assertRaises(
739 update_payload.PayloadError,
740 payload_checker._CheckMoveOperation,
741 op, None, 134, 134, 'foo')
742
743 def testCheckMoveOperation_FailExcessDstBlocks(self):
744 """Tests _CheckMoveOperation(); fails, too many actual dst blocks."""
745 payload_checker = checker.PayloadChecker(self.MockPayload())
746 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
747 op.type = common.OpType.MOVE
748
749 self.AddToMessage(op.src_extents,
750 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
751 self.AddToMessage(op.dst_extents,
752 self.NewExtentList((16, 128), (512, 7)))
753 self.assertRaises(
754 update_payload.PayloadError,
755 payload_checker._CheckMoveOperation,
756 op, None, 134, 134, 'foo')
757
758 def testCheckMoveOperation_FailStagnantBlocks(self):
759 """Tests _CheckMoveOperation(); fails, there are blocks that do not move."""
760 payload_checker = checker.PayloadChecker(self.MockPayload())
761 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
762 op.type = common.OpType.MOVE
763
764 self.AddToMessage(op.src_extents,
765 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
766 self.AddToMessage(op.dst_extents,
767 self.NewExtentList((8, 128), (512, 6)))
768 self.assertRaises(
769 update_payload.PayloadError,
770 payload_checker._CheckMoveOperation,
771 op, None, 134, 134, 'foo')
772
773 def testCheckBsdiff(self):
774 """Tests _CheckMoveOperation()."""
775 payload_checker = checker.PayloadChecker(self.MockPayload())
776
777 # Pass.
778 self.assertIsNone(
779 payload_checker._CheckBsdiffOperation(10000, 3, 'foo'))
780
781 # Fail, missing data blob.
782 self.assertRaises(
783 update_payload.PayloadError,
784 payload_checker._CheckBsdiffOperation,
785 None, 3, 'foo')
786
787 # Fail, too big of a diff blob (unjustified).
788 self.assertRaises(
789 update_payload.PayloadError,
790 payload_checker._CheckBsdiffOperation,
791 10000, 2, 'foo')
792
793 def DoCheckOperationTest(self, op_type_name, is_last, allow_signature,
794 allow_unhashed, fail_src_extents, fail_dst_extents,
795 fail_mismatched_data_offset_length,
796 fail_missing_dst_extents, fail_src_length,
797 fail_dst_length, fail_data_hash,
798 fail_prev_data_offset):
799 """Parametric testing of _CheckOperation().
800
801 Args:
802 op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
803 is_last: whether we're testing the last operation in a sequence
804 allow_signature: whether we're testing a signature-capable operation
805 allow_unhashed: whether we're allowing to not hash the data
806 fail_src_extents: tamper with src extents
807 fail_dst_extents: tamper with dst extents
808 fail_mismatched_data_offset_length: make data_{offset,length} inconsistent
809 fail_missing_dst_extents: do not include dst extents
810 fail_src_length: make src length inconsistent
811 fail_dst_length: make dst length inconsistent
812 fail_data_hash: tamper with the data blob hash
813 fail_prev_data_offset: make data space uses incontiguous
814
815 """
816 op_type = _OpTypeByName(op_type_name)
817
818 # Create the test object.
819 payload = self.MockPayload()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700820 payload_checker = checker.PayloadChecker(payload,
821 allow_unhashed=allow_unhashed)
Gilad Arnold5502b562013-03-08 13:22:31 -0800822 block_size = payload_checker.block_size
823
824 # Create auxiliary arguments.
825 old_part_size = _MiB(4)
826 new_part_size = _MiB(8)
827 old_block_counters = array.array(
828 'B', [0] * ((old_part_size + block_size - 1) / block_size))
829 new_block_counters = array.array(
830 'B', [0] * ((new_part_size + block_size - 1) / block_size))
831 prev_data_offset = 1876
832 blob_hash_counts = collections.defaultdict(int)
833
834 # Create the operation object for the test.
835 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
836 op.type = op_type
837
838 total_src_blocks = 0
839 if op_type in (common.OpType.MOVE, common.OpType.BSDIFF):
840 if fail_src_extents:
841 self.AddToMessage(op.src_extents,
842 self.NewExtentList((0, 0)))
843 else:
844 self.AddToMessage(op.src_extents,
845 self.NewExtentList((0, 16)))
846 total_src_blocks = 16
847
848 if op_type != common.OpType.MOVE:
849 if not fail_mismatched_data_offset_length:
850 op.data_length = 16 * block_size - 8
851 if fail_prev_data_offset:
852 op.data_offset = prev_data_offset + 16
853 else:
854 op.data_offset = prev_data_offset
855
856 fake_data = 'fake-data'.ljust(op.data_length)
857 if not (allow_unhashed or (is_last and allow_signature and
858 op_type == common.OpType.REPLACE)):
859 if not fail_data_hash:
860 # Create a valid data blob hash.
861 op.data_sha256_hash = hashlib.sha256(fake_data).digest()
862 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
863 fake_data)
864 elif fail_data_hash:
865 # Create an invalid data blob hash.
866 op.data_sha256_hash = hashlib.sha256(
867 fake_data.replace(' ', '-')).digest()
868 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
869 fake_data)
870
871 total_dst_blocks = 0
872 if not fail_missing_dst_extents:
873 total_dst_blocks = 16
874 if fail_dst_extents:
875 self.AddToMessage(op.dst_extents,
876 self.NewExtentList((4, 16), (32, 0)))
877 else:
878 self.AddToMessage(op.dst_extents,
879 self.NewExtentList((4, 8), (64, 8)))
880
881 if total_src_blocks:
882 if fail_src_length:
883 op.src_length = total_src_blocks * block_size + 8
884 else:
885 op.src_length = total_src_blocks * block_size
886 elif fail_src_length:
887 # Add an orphaned src_length.
888 op.src_length = 16
889
890 if total_dst_blocks:
891 if fail_dst_length:
892 op.dst_length = total_dst_blocks * block_size + 8
893 else:
894 op.dst_length = total_dst_blocks * block_size
895
896 self.mox.ReplayAll()
897 should_fail = (fail_src_extents or fail_dst_extents or
898 fail_mismatched_data_offset_length or
899 fail_missing_dst_extents or fail_src_length or
900 fail_dst_length or fail_data_hash or fail_prev_data_offset)
901 largs = [op, 'foo', is_last, old_block_counters, new_block_counters,
902 old_part_size, new_part_size, prev_data_offset, allow_signature,
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700903 blob_hash_counts]
Gilad Arnold5502b562013-03-08 13:22:31 -0800904 if should_fail:
905 self.assertRaises(update_payload.PayloadError,
906 payload_checker._CheckOperation, *largs)
907 else:
908 self.assertEqual(payload_checker._CheckOperation(*largs),
909 op.data_length if op.HasField('data_length') else 0)
910
911 def testAllocBlockCounters(self):
912 """Tests _CheckMoveOperation()."""
913 payload_checker = checker.PayloadChecker(self.MockPayload())
914 block_size = payload_checker.block_size
915
916 # Check allocation for block-aligned partition size, ensure it's integers.
917 result = payload_checker._AllocBlockCounters(16 * block_size)
918 self.assertEqual(len(result), 16)
919 self.assertEqual(type(result[0]), int)
920
921 # Check allocation of unaligned partition sizes.
922 result = payload_checker._AllocBlockCounters(16 * block_size - 1)
923 self.assertEqual(len(result), 16)
924 result = payload_checker._AllocBlockCounters(16 * block_size + 1)
925 self.assertEqual(len(result), 17)
926
927 def DoCheckOperationsTest(self, fail_bad_type,
928 fail_nonexhaustive_full_update):
929 # Generate a test payload. For this test, we only care about one
930 # (arbitrary) set of operations, so we'll only be generating kernel and
931 # test with them.
932 payload_gen = test_utils.PayloadGenerator()
933
934 block_size = _KiB(4)
935 payload_gen.SetBlockSize(block_size)
936
937 rootfs_part_size = _MiB(8)
938
939 # Fake rootfs operations in a full update, tampered with as required.
940 rootfs_op_type = common.OpType.REPLACE
941 if fail_bad_type:
942 # Choose a type value that's bigger than the highest valid value.
943 for valid_op_type in common.OpType.ALL:
944 rootfs_op_type = max(rootfs_op_type, valid_op_type)
945 rootfs_op_type += 1
946
947 rootfs_data_length = rootfs_part_size
948 if fail_nonexhaustive_full_update:
949 rootfs_data_length -= block_size
950
951 payload_gen.AddOperation(False, rootfs_op_type,
952 dst_extents=[(0, rootfs_data_length / block_size)],
953 data_offset=0,
954 data_length=rootfs_data_length)
955
956 # Create the test object.
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700957 payload_checker = _GetPayloadChecker(payload_gen.WriteToFile,
958 checker_init_dargs={
959 'allow_unhashed': True})
Gilad Arnold5502b562013-03-08 13:22:31 -0800960 payload_checker.payload_type = checker._TYPE_FULL
961 report = checker._PayloadReport()
962
963 should_fail = (fail_bad_type or fail_nonexhaustive_full_update)
964 largs = (payload_checker.payload.manifest.install_operations, report,
Gilad Arnold382df5c2013-05-03 12:49:28 -0700965 'foo', 0, rootfs_part_size, rootfs_part_size, 0, False)
Gilad Arnold5502b562013-03-08 13:22:31 -0800966 if should_fail:
967 self.assertRaises(update_payload.PayloadError,
968 payload_checker._CheckOperations, *largs)
969 else:
970 self.assertEqual(payload_checker._CheckOperations(*largs),
971 rootfs_data_length)
972
973 def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op,
974 fail_mismatched_pseudo_op, fail_sig_missing_fields,
975 fail_unknown_sig_version, fail_incorrect_sig):
976 # Generate a test payload. For this test, we only care about the signature
977 # block and how it relates to the payload hash. Therefore, we're generating
978 # a random (otherwise useless) payload for this purpose.
979 payload_gen = test_utils.EnhancedPayloadGenerator()
980 block_size = _KiB(4)
981 payload_gen.SetBlockSize(block_size)
982 rootfs_part_size = _MiB(2)
Gilad Arnold382df5c2013-05-03 12:49:28 -0700983 kernel_part_size = _KiB(16)
Gilad Arnold5502b562013-03-08 13:22:31 -0800984 payload_gen.SetPartInfo(False, True, rootfs_part_size,
985 hashlib.sha256('fake-new-rootfs-content').digest())
Gilad Arnold382df5c2013-05-03 12:49:28 -0700986 payload_gen.SetPartInfo(True, True, kernel_part_size,
Gilad Arnold5502b562013-03-08 13:22:31 -0800987 hashlib.sha256('fake-new-kernel-content').digest())
988 payload_gen.AddOperationWithData(
989 False, common.OpType.REPLACE,
990 dst_extents=[(0, rootfs_part_size / block_size)],
991 data_blob=os.urandom(rootfs_part_size))
992
993 do_forge_pseudo_op = (fail_missing_pseudo_op or fail_mismatched_pseudo_op)
994 do_forge_sigs_data = (do_forge_pseudo_op or fail_empty_sigs_blob or
995 fail_sig_missing_fields or fail_unknown_sig_version
996 or fail_incorrect_sig)
997
998 sigs_data = None
999 if do_forge_sigs_data:
1000 sigs_gen = test_utils.SignaturesGenerator()
1001 if not fail_empty_sigs_blob:
1002 if fail_sig_missing_fields:
1003 sig_data = None
1004 else:
1005 sig_data = test_utils.SignSha256('fake-payload-content',
1006 _PRIVKEY_FILE_NAME)
1007 sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data)
1008
1009 sigs_data = sigs_gen.ToBinary()
1010 payload_gen.SetSignatures(payload_gen.curr_offset, len(sigs_data))
1011
1012 if do_forge_pseudo_op:
1013 assert sigs_data is not None, 'should have forged signatures blob by now'
1014 sigs_len = len(sigs_data)
1015 payload_gen.AddOperation(
1016 False, common.OpType.REPLACE,
1017 data_offset=payload_gen.curr_offset / 2,
1018 data_length=sigs_len / 2,
1019 dst_extents=[(0, (sigs_len / 2 + block_size - 1) / block_size)])
1020
1021 # Generate payload (complete w/ signature) and create the test object.
1022 payload_checker = _GetPayloadChecker(
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001023 payload_gen.WriteToFileWithData,
1024 payload_gen_dargs={
1025 'sigs_data': sigs_data,
1026 'privkey_file_name': _PRIVKEY_FILE_NAME,
1027 'do_add_pseudo_operation': not do_forge_pseudo_op})
Gilad Arnold5502b562013-03-08 13:22:31 -08001028 payload_checker.payload_type = checker._TYPE_FULL
1029 report = checker._PayloadReport()
1030
1031 # We have to check the manifest first in order to set signature attributes.
Gilad Arnold382df5c2013-05-03 12:49:28 -07001032 payload_checker._CheckManifest(report, rootfs_part_size, kernel_part_size)
Gilad Arnold5502b562013-03-08 13:22:31 -08001033
1034 should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or
1035 fail_mismatched_pseudo_op or fail_sig_missing_fields or
1036 fail_unknown_sig_version or fail_incorrect_sig)
1037 largs = (report, _PUBKEY_FILE_NAME)
1038 if should_fail:
1039 self.assertRaises(update_payload.PayloadError,
1040 payload_checker._CheckSignatures, *largs)
1041 else:
1042 self.assertIsNone(payload_checker._CheckSignatures(*largs))
1043
1044 def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size,
1045 fail_mismatched_block_size, fail_excess_data):
1046 # Generate a test payload. For this test, we generate a full update that
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001047 # has sample kernel and rootfs operations. Since most testing is done with
Gilad Arnold5502b562013-03-08 13:22:31 -08001048 # internal PayloadChecker methods that are tested elsewhere, here we only
1049 # tamper with what's actually being manipulated and/or tested in the Run()
1050 # method itself. Note that the checker doesn't verify partition hashes, so
1051 # they're safe to fake.
1052 payload_gen = test_utils.EnhancedPayloadGenerator()
1053 block_size = _KiB(4)
1054 payload_gen.SetBlockSize(block_size)
1055 kernel_part_size = _KiB(16)
1056 rootfs_part_size = _MiB(2)
1057 payload_gen.SetPartInfo(False, True, rootfs_part_size,
1058 hashlib.sha256('fake-new-rootfs-content').digest())
1059 payload_gen.SetPartInfo(True, True, kernel_part_size,
1060 hashlib.sha256('fake-new-kernel-content').digest())
1061 payload_gen.AddOperationWithData(
1062 False, common.OpType.REPLACE,
1063 dst_extents=[(0, rootfs_part_size / block_size)],
1064 data_blob=os.urandom(rootfs_part_size))
1065 payload_gen.AddOperationWithData(
1066 True, common.OpType.REPLACE,
1067 dst_extents=[(0, kernel_part_size / block_size)],
1068 data_blob=os.urandom(kernel_part_size))
1069
1070 # Generate payload (complete w/ signature) and create the test object.
Gilad Arnold5502b562013-03-08 13:22:31 -08001071 if fail_invalid_block_size:
1072 use_block_size = block_size + 5 # not a power of two
1073 elif fail_mismatched_block_size:
1074 use_block_size = block_size * 2 # different that payload stated
1075 else:
1076 use_block_size = block_size
Gilad Arnold5502b562013-03-08 13:22:31 -08001077
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001078 dargs = {
1079 'payload_gen_dargs': {
1080 'privkey_file_name': _PRIVKEY_FILE_NAME,
1081 'do_add_pseudo_operation': True,
1082 'is_pseudo_in_kernel': True,
1083 'padding': os.urandom(1024) if fail_excess_data else None},
1084 'checker_init_dargs': {
1085 'assert_type': 'delta' if fail_wrong_payload_type else 'full',
1086 'block_size': use_block_size}}
1087 if fail_invalid_block_size:
1088 self.assertRaises(update_payload.PayloadError, _GetPayloadChecker,
1089 payload_gen.WriteToFileWithData, **dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -08001090 else:
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001091 payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData,
1092 **dargs)
1093 dargs = {'pubkey_file_name': _PUBKEY_FILE_NAME}
1094 should_fail = (fail_wrong_payload_type or fail_mismatched_block_size or
1095 fail_excess_data)
1096 if should_fail:
1097 self.assertRaises(update_payload.PayloadError,
1098 payload_checker.Run, **dargs)
1099 else:
1100 self.assertIsNone(payload_checker.Run(**dargs))
Gilad Arnold5502b562013-03-08 13:22:31 -08001101
1102
1103# This implements a generic API, hence the occasional unused args.
1104# pylint: disable=W0613
1105def ValidateCheckOperationTest(op_type_name, is_last, allow_signature,
1106 allow_unhashed, fail_src_extents,
1107 fail_dst_extents,
1108 fail_mismatched_data_offset_length,
1109 fail_missing_dst_extents, fail_src_length,
1110 fail_dst_length, fail_data_hash,
1111 fail_prev_data_offset):
1112 """Returns True iff the combination of arguments represents a valid test."""
1113 op_type = _OpTypeByName(op_type_name)
1114
1115 # REPLACE/REPLACE_BZ operations don't read data from src partition.
1116 if (op_type in (common.OpType.REPLACE, common.OpType.REPLACE_BZ) and (
1117 fail_src_extents or fail_src_length)):
1118 return False
1119
1120 # MOVE operations don't carry data.
1121 if (op_type == common.OpType.MOVE and (
1122 fail_mismatched_data_offset_length or fail_data_hash or
1123 fail_prev_data_offset)):
1124 return False
1125
1126 return True
1127
1128
1129def TestMethodBody(run_method_name, run_dargs):
1130 """Returns a function that invokes a named method with named arguments."""
1131 return lambda self: getattr(self, run_method_name)(**run_dargs)
1132
1133
1134def AddParametricTests(tested_method_name, arg_space, validate_func=None):
1135 """Enumerates and adds specific parametric tests to PayloadCheckerTest.
1136
1137 This function enumerates a space of test parameters (defined by arg_space),
1138 then binds a new, unique method name in PayloadCheckerTest to a test function
1139 that gets handed the said parameters. This is a preferable approach to doing
1140 the enumeration and invocation during the tests because this way each test is
1141 treated as a complete run by the unittest framework, and so benefits from the
1142 usual setUp/tearDown mechanics.
1143
1144 Args:
1145 tested_method_name: name of the tested PayloadChecker method
1146 arg_space: a dictionary containing variables (keys) and lists of values
1147 (values) associated with them
1148 validate_func: a function used for validating test argument combinations
1149
1150 """
1151 for value_tuple in itertools.product(*arg_space.itervalues()):
1152 run_dargs = dict(zip(arg_space.iterkeys(), value_tuple))
1153 if validate_func and not validate_func(**run_dargs):
1154 continue
1155 run_method_name = 'Do%sTest' % tested_method_name
1156 test_method_name = 'test%s' % tested_method_name
1157 for arg_key, arg_val in run_dargs.iteritems():
1158 if arg_val or type(arg_val) is int:
1159 test_method_name += '__%s=%s' % (arg_key, arg_val)
1160 setattr(PayloadCheckerTest, test_method_name,
1161 TestMethodBody(run_method_name, run_dargs))
1162
1163
1164def AddAllParametricTests():
1165 """Enumerates and adds all parametric tests to PayloadCheckerTest."""
1166 # Add all _CheckElem() test cases.
1167 AddParametricTests('AddElem',
1168 {'linebreak': (True, False),
1169 'indent': (0, 1, 2),
1170 'convert': (str, lambda s: s[::-1]),
1171 'is_present': (True, False),
1172 'is_mandatory': (True, False),
1173 'is_submsg': (True, False)})
1174
1175 # Add all _Add{Mandatory,Optional}Field tests.
1176 AddParametricTests('AddField',
1177 {'is_mandatory': (True, False),
1178 'linebreak': (True, False),
1179 'indent': (0, 1, 2),
1180 'convert': (str, lambda s: s[::-1]),
1181 'is_present': (True, False)})
1182
1183 # Add all _Add{Mandatory,Optional}SubMsg tests.
1184 AddParametricTests('AddSubMsg',
1185 {'is_mandatory': (True, False),
1186 'is_present': (True, False)})
1187
1188 # Add all _CheckManifest() test cases.
1189 AddParametricTests('CheckManifest',
1190 {'fail_mismatched_block_size': (True, False),
1191 'fail_bad_sigs': (True, False),
1192 'fail_mismatched_oki_ori': (True, False),
1193 'fail_bad_oki': (True, False),
1194 'fail_bad_ori': (True, False),
1195 'fail_bad_nki': (True, False),
1196 'fail_bad_nri': (True, False),
Gilad Arnold382df5c2013-05-03 12:49:28 -07001197 'fail_missing_ops': (True, False),
1198 'fail_old_kernel_fs_size': (True, False),
1199 'fail_old_rootfs_fs_size': (True, False),
1200 'fail_new_kernel_fs_size': (True, False),
1201 'fail_new_rootfs_fs_size': (True, False)})
Gilad Arnold5502b562013-03-08 13:22:31 -08001202
1203 # Add all _CheckOperation() test cases.
1204 AddParametricTests('CheckOperation',
1205 {'op_type_name': ('REPLACE', 'REPLACE_BZ', 'MOVE',
1206 'BSDIFF'),
1207 'is_last': (True, False),
1208 'allow_signature': (True, False),
1209 'allow_unhashed': (True, False),
1210 'fail_src_extents': (True, False),
1211 'fail_dst_extents': (True, False),
1212 'fail_mismatched_data_offset_length': (True, False),
1213 'fail_missing_dst_extents': (True, False),
1214 'fail_src_length': (True, False),
1215 'fail_dst_length': (True, False),
1216 'fail_data_hash': (True, False),
1217 'fail_prev_data_offset': (True, False)},
1218 validate_func=ValidateCheckOperationTest)
1219
1220 # Add all _CheckOperations() test cases.
1221 AddParametricTests('CheckOperations',
1222 {'fail_bad_type': (True, False),
1223 'fail_nonexhaustive_full_update': (True, False)})
1224
1225 # Add all _CheckOperations() test cases.
1226 AddParametricTests('CheckSignatures',
1227 {'fail_empty_sigs_blob': (True, False),
1228 'fail_missing_pseudo_op': (True, False),
1229 'fail_mismatched_pseudo_op': (True, False),
1230 'fail_sig_missing_fields': (True, False),
1231 'fail_unknown_sig_version': (True, False),
1232 'fail_incorrect_sig': (True, False)})
1233
1234 # Add all Run() test cases.
1235 AddParametricTests('Run',
1236 {'fail_wrong_payload_type': (True, False),
1237 'fail_invalid_block_size': (True, False),
1238 'fail_mismatched_block_size': (True, False),
1239 'fail_excess_data': (True, False)})
1240
1241
1242if __name__ == '__main__':
1243 AddAllParametricTests()
1244 unittest.main()