#!/usr/bin/python
2#
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Unit testing checker.py."""
8
9import array
10import collections
11import cStringIO
12import hashlib
13import itertools
14import os
15import unittest
16
17# Pylint cannot find mox.
18# pylint: disable=F0401
19import mox
20
21import checker
22import common
23import payload as update_payload # avoid name conflicts later.
24import test_utils
25import update_metadata_pb2
26
27
# File names of the private ('.pem') and public ('.pub') test keys. They are
# not referenced in the visible part of this file -- presumably they are used
# by signature-related tests further down. TODO confirm.
_PRIVKEY_FILE_NAME = 'payload-test-key.pem'
_PUBKEY_FILE_NAME = 'payload-test-key.pub'
30
31
def _OpTypeByName(op_name):
  """Returns the common.OpType value that goes by a given operation name.

  Args:
    op_name: one of 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
  Returns:
    The common.OpType attribute of the same name.
  Raises:
    KeyError: if op_name is not one of the names listed above.

  """
  known_names = ('REPLACE', 'REPLACE_BZ', 'MOVE', 'BSDIFF')
  # dict() over a generator rather than a dict comprehension, for the sake of
  # pre-2.7 interpreters.
  type_by_name = dict((known_name, getattr(common.OpType, known_name))
                      for known_name in known_names)
  return type_by_name[op_name]
40
41
42def _KiB(count):
43 """Return the byte size of a given number of (binary) kilobytes."""
44 return count << 10
45
46
47def _MiB(count):
48 """Return the byte size of a given number of (binary) megabytes."""
49 return count << 20
50
51
52def _GiB(count):
53 """Return the byte size of a given number of (binary) gigabytes."""
54 return count << 30
55
56
def _GetPayloadChecker(payload_gen_write_to_file_func, *largs, **dargs):
  """Returns a payload checker for a payload produced by a writer function.

  The writer is invoked on an in-memory file, which is then rewound and loaded
  as a payload.

  Args:
    payload_gen_write_to_file_func: callable that writes a payload to the file
      object it receives as its first argument
    *largs: extra positional arguments forwarded to the writer
    **dargs: extra keyword arguments forwarded to the writer
  Returns:
    A checker.PayloadChecker wrapping the initialized payload.

  """
  in_memory_file = cStringIO.StringIO()
  payload_gen_write_to_file_func(in_memory_file, *largs, **dargs)
  # Rewind, so that the payload is read from its first byte.
  in_memory_file.seek(0)
  loaded_payload = update_payload.Payload(in_memory_file)
  loaded_payload.Init()
  return checker.PayloadChecker(loaded_payload)
65
66
def _GetPayloadCheckerWithData(payload_gen):
  """Returns a payload checker for the payload written by a generator object.

  This is a thin convenience wrapper around _GetPayloadChecker() that uses the
  generator's own WriteToFile() method, called with no extra arguments, as the
  writer function.

  Args:
    payload_gen: an object exposing a WriteToFile(file_obj) method
  Returns:
    A checker.PayloadChecker wrapping the initialized payload.

  """
  # Delegate rather than duplicate the write/rewind/load sequence.
  return _GetPayloadChecker(payload_gen.WriteToFile)
75
76
77# (i) this class doesn't need an __init__(); (ii) unit testing is all about
78# running protected methods; (iii) don't bark about missing members of classes
79# you cannot import.
80# pylint: disable=W0232
81# pylint: disable=W0212
82# pylint: disable=E1101
83class PayloadCheckerTest(mox.MoxTestBase):
84 """Tests the PayloadChecker class.
85
86 In addition to ordinary testFoo() methods, which are automatically invoked by
87 the unittest framework, in this class we make use of DoBarTest() calls that
88 implement parametric tests of certain features. In order to invoke each test,
89 which embodies a unique combination of parameter values, as a complete unit
90 test, we perform explicit enumeration of the parameter space and create
91 individual invocation contexts for each, which are then bound as
92 testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for
93 all such tests is done in AddAllParametricTests().
94
95 """
96
97 def MockPayload(self):
98 """Create a mock payload object, complete with a mock menifest."""
99 payload = self.mox.CreateMock(update_payload.Payload)
100 payload.is_init = True
101 payload.manifest = self.mox.CreateMock(
102 update_metadata_pb2.DeltaArchiveManifest)
103 return payload
104
105 @staticmethod
106 def NewExtent(start_block, num_blocks):
107 """Returns an Extent message.
108
109 Each of the provided fields is set iff it is >= 0; otherwise, it's left at
110 its default state.
111
112 Args:
113 start_block: the starting block of the extent
114 num_blocks: the number of blocks in the extent
115 Returns:
116 An Extent message.
117
118 """
119 ex = update_metadata_pb2.Extent()
120 if start_block >= 0:
121 ex.start_block = start_block
122 if num_blocks >= 0:
123 ex.num_blocks = num_blocks
124 return ex
125
126 @staticmethod
127 def NewExtentList(*args):
128 """Returns an list of extents.
129
130 Args:
131 *args: (start_block, num_blocks) pairs defining the extents
132 Returns:
133 A list of Extent objects.
134
135 """
136 ex_list = []
137 for start_block, num_blocks in args:
138 ex_list.append(PayloadCheckerTest.NewExtent(start_block, num_blocks))
139 return ex_list
140
141 @staticmethod
142 def AddToMessage(repeated_field, field_vals):
143 for field_val in field_vals:
144 new_field = repeated_field.add()
145 new_field.CopyFrom(field_val)
146
147 def assertIsNone(self, val):
148 """Asserts that val is None (TODO remove once we upgrade to Python 2.7).
149
150 Note that we're using assertEqual so as for it to show us the actual
151 non-None value.
152
153 Args:
154 val: value/object to be equated to None
155
156 """
157 self.assertEqual(val, None)
158
159 def SetupAddElemTest(self, is_present, is_submsg, convert=str,
160 linebreak=False, indent=0):
161 """Setup for testing of _CheckElem() and its derivatives.
162
163 Args:
164 is_present: whether or not the element is found in the message
165 is_submsg: whether the element is a sub-message itself
166 convert: a representation conversion function
167 linebreak: whether or not a linebreak is to be used in the report
168 indent: indentation used for the report
169 Returns:
170 msg: a mock message object
171 report: a mock report object
172 subreport: a mock sub-report object
173 name: an element name to check
174 val: expected element value
175
176 """
177 name = 'foo'
178 val = 'fake submsg' if is_submsg else 'fake field'
179 subreport = 'fake subreport'
180
181 # Create a mock message.
182 msg = self.mox.CreateMock(update_metadata_pb2.message.Message)
183 msg.HasField(name).AndReturn(is_present)
184 setattr(msg, name, val)
185
186 # Create a mock report.
187 report = self.mox.CreateMock(checker._PayloadReport)
188 if is_present:
189 if is_submsg:
190 report.AddSubReport(name).AndReturn(subreport)
191 else:
192 report.AddField(name, convert(val), linebreak=linebreak, indent=indent)
193
194 self.mox.ReplayAll()
195 return (msg, report, subreport, name, val)
196
197 def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert,
198 linebreak, indent):
199 """Parametric testing of _CheckElem().
200
201 Args:
202 is_present: whether or not the element is found in the message
203 is_mandatory: whether or not it's a mandatory element
204 is_submsg: whether the element is a sub-message itself
205 convert: a representation conversion function
206 linebreak: whether or not a linebreak is to be used in the report
207 indent: indentation used for the report
208
209 """
210 msg, report, subreport, name, val = self.SetupAddElemTest(
211 is_present, is_submsg, convert, linebreak, indent)
212
213 largs = [msg, name, report, is_mandatory, is_submsg]
214 dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
215 if is_mandatory and not is_present:
216 self.assertRaises(update_payload.PayloadError,
217 checker.PayloadChecker._CheckElem, *largs, **dargs)
218 else:
219 ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*largs,
220 **dargs)
221 self.assertEquals(ret_val, val if is_present else None)
222 self.assertEquals(ret_subreport,
223 subreport if is_present and is_submsg else None)
224
225 def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
226 indent):
227 """Parametric testing of _Check{Mandatory,Optional}Field().
228
229 Args:
230 is_mandatory: whether we're testing a mandatory call
231 is_present: whether or not the element is found in the message
232 convert: a representation conversion function
233 linebreak: whether or not a linebreak is to be used in the report
234 indent: indentation used for the report
235
236 """
237 msg, report, _, name, val = self.SetupAddElemTest(
238 is_present, False, convert, linebreak, indent)
239
240 # Prepare for invocation of the tested method.
241 largs = [msg, name, report]
242 dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
243 if is_mandatory:
244 largs.append('bar')
245 tested_func = checker.PayloadChecker._CheckMandatoryField
246 else:
247 tested_func = checker.PayloadChecker._CheckOptionalField
248
249 # Test the method call.
250 if is_mandatory and not is_present:
251 self.assertRaises(update_payload.PayloadError, tested_func, *largs,
252 **dargs)
253 else:
254 ret_val = tested_func(*largs, **dargs)
255 self.assertEquals(ret_val, val if is_present else None)
256
257 def DoAddSubMsgTest(self, is_mandatory, is_present):
258 """Parametrized testing of _Check{Mandatory,Optional}SubMsg().
259
260 Args:
261 is_mandatory: whether we're testing a mandatory call
262 is_present: whether or not the element is found in the message
263
264 """
265 msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True)
266
267 # Prepare for invocation of the tested method.
268 largs = [msg, name, report]
269 if is_mandatory:
270 largs.append('bar')
271 tested_func = checker.PayloadChecker._CheckMandatorySubMsg
272 else:
273 tested_func = checker.PayloadChecker._CheckOptionalSubMsg
274
275 # Test the method call.
276 if is_mandatory and not is_present:
277 self.assertRaises(update_payload.PayloadError, tested_func, *largs)
278 else:
279 ret_val, ret_subreport = tested_func(*largs)
280 self.assertEquals(ret_val, val if is_present else None)
281 self.assertEquals(ret_subreport, subreport if is_present else None)
282
283 def testCheckPresentIff(self):
284 """Tests _CheckPresentIff()."""
285 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
286 None, None, 'foo', 'bar', 'baz'))
287 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
288 'a', 'b', 'foo', 'bar', 'baz'))
289 self.assertRaises(update_payload.PayloadError,
290 checker.PayloadChecker._CheckPresentIff,
291 'a', None, 'foo', 'bar', 'baz')
292 self.assertRaises(update_payload.PayloadError,
293 checker.PayloadChecker._CheckPresentIff,
294 None, 'b', 'foo', 'bar', 'baz')
295
296 def DoCheckSha256SignatureTest(self, expect_pass, expect_subprocess_call,
297 sig_data, sig_asn1_header,
298 returned_signed_hash, expected_signed_hash):
299 """Parametric testing of _CheckSha256SignatureTest().
300
301 Args:
302 expect_pass: whether or not it should pass
303 expect_subprocess_call: whether to expect the openssl call to happen
304 sig_data: the signature raw data
305 sig_asn1_header: the ASN1 header
306 returned_signed_hash: the signed hash data retuned by openssl
307 expected_signed_hash: the signed hash data to compare against
308
309 """
310 # Stub out the subprocess invocation.
311 self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
312 if expect_subprocess_call:
313 checker.PayloadChecker._Run(mox.IsA(list), send_data=sig_data).AndReturn(
314 (sig_asn1_header + returned_signed_hash, None))
315
316 self.mox.ReplayAll()
317 if expect_pass:
318 self.assertIsNone(checker.PayloadChecker._CheckSha256Signature(
319 sig_data, 'foo', expected_signed_hash, 'bar'))
320 else:
321 self.assertRaises(update_payload.PayloadError,
322 checker.PayloadChecker._CheckSha256Signature,
323 sig_data, 'foo', expected_signed_hash, 'bar')
324
325 self.mox.UnsetStubs()
326
327 def testCheckSha256Signature_Pass(self):
328 """Tests _CheckSha256Signature(); pass case."""
329 sig_data = 'fake-signature'.ljust(256)
330 signed_hash = hashlib.sha256('fake-data').digest()
331 self.DoCheckSha256SignatureTest(True, True, sig_data,
332 common.SIG_ASN1_HEADER, signed_hash,
333 signed_hash)
334
335 def testCheckSha256Signature_FailBadSignature(self):
336 """Tests _CheckSha256Signature(); fails due to malformed signature."""
337 sig_data = 'fake-signature' # malformed (not 256 bytes in length)
338 signed_hash = hashlib.sha256('fake-data').digest()
339 self.DoCheckSha256SignatureTest(False, False, sig_data,
340 common.SIG_ASN1_HEADER, signed_hash,
341 signed_hash)
342
343 def testCheckSha256Signature_FailBadOutputLength(self):
344 """Tests _CheckSha256Signature(); fails due to unexpected output length."""
345 sig_data = 'fake-signature'.ljust(256)
346 signed_hash = 'fake-hash' # malformed (not 32 bytes in length)
347 self.DoCheckSha256SignatureTest(False, True, sig_data,
348 common.SIG_ASN1_HEADER, signed_hash,
349 signed_hash)
350
351 def testCheckSha256Signature_FailBadAsnHeader(self):
352 """Tests _CheckSha256Signature(); fails due to bad ASN1 header."""
353 sig_data = 'fake-signature'.ljust(256)
354 signed_hash = hashlib.sha256('fake-data').digest()
355 bad_asn1_header = 'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER))
356 self.DoCheckSha256SignatureTest(False, True, sig_data, bad_asn1_header,
357 signed_hash, signed_hash)
358
359 def testCheckSha256Signature_FailBadHash(self):
360 """Tests _CheckSha256Signature(); fails due to bad hash returned."""
361 sig_data = 'fake-signature'.ljust(256)
362 expected_signed_hash = hashlib.sha256('fake-data').digest()
363 returned_signed_hash = hashlib.sha256('bad-fake-data').digest()
364 self.DoCheckSha256SignatureTest(False, True, sig_data,
365 common.SIG_ASN1_HEADER,
366 expected_signed_hash, returned_signed_hash)
367
368 def testCheckBlocksFitLength_Pass(self):
369 """Tests _CheckBlocksFitLength(); pass case."""
370 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
371 64, 4, 16, 'foo'))
372 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
373 60, 4, 16, 'foo'))
374 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
375 49, 4, 16, 'foo'))
376 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
377 48, 3, 16, 'foo'))
378
379 def testCheckBlocksFitLength_TooManyBlocks(self):
380 """Tests _CheckBlocksFitLength(); fails due to excess blocks."""
381 self.assertRaises(update_payload.PayloadError,
382 checker.PayloadChecker._CheckBlocksFitLength,
383 64, 5, 16, 'foo')
384 self.assertRaises(update_payload.PayloadError,
385 checker.PayloadChecker._CheckBlocksFitLength,
386 60, 5, 16, 'foo')
387 self.assertRaises(update_payload.PayloadError,
388 checker.PayloadChecker._CheckBlocksFitLength,
389 49, 5, 16, 'foo')
390 self.assertRaises(update_payload.PayloadError,
391 checker.PayloadChecker._CheckBlocksFitLength,
392 48, 4, 16, 'foo')
393
394 def testCheckBlocksFitLength_TooFewBlocks(self):
395 """Tests _CheckBlocksFitLength(); fails due to insufficient blocks."""
396 self.assertRaises(update_payload.PayloadError,
397 checker.PayloadChecker._CheckBlocksFitLength,
398 64, 3, 16, 'foo')
399 self.assertRaises(update_payload.PayloadError,
400 checker.PayloadChecker._CheckBlocksFitLength,
401 60, 3, 16, 'foo')
402 self.assertRaises(update_payload.PayloadError,
403 checker.PayloadChecker._CheckBlocksFitLength,
404 49, 3, 16, 'foo')
405 self.assertRaises(update_payload.PayloadError,
406 checker.PayloadChecker._CheckBlocksFitLength,
407 48, 2, 16, 'foo')
408
409 def DoCheckManifestTest(self, fail_mismatched_block_size, fail_bad_sigs,
410 fail_mismatched_oki_ori, fail_bad_oki, fail_bad_ori,
411 fail_bad_nki, fail_bad_nri, fail_missing_ops):
412 """Parametric testing of _CheckManifest().
413
414 Args:
415 fail_mismatched_block_size: simulate a missing block_size field
416 fail_bad_sigs: make signatures descriptor inconsistent
417 fail_mismatched_oki_ori: make old rootfs/kernel info partially present
418 fail_bad_oki: tamper with old kernel info
419 fail_bad_ori: tamper with old rootfs info
420 fail_bad_nki: tamper with new kernel info
421 fail_bad_nri: tamper with new rootfs info
422 fail_missing_ops: simulate a manifest without any operations
423
424 """
425 # Generate a test payload. For this test, we only care about the manifest
426 # and don't need any data blobs, hence we can use a plain paylaod generator
427 # (which also gives us more control on things that can be screwed up).
428 payload_gen = test_utils.PayloadGenerator()
429
430 # Tamper with block size, if required.
431 if fail_mismatched_block_size:
432 payload_gen.SetBlockSize(_KiB(1))
433 else:
434 payload_gen.SetBlockSize(_KiB(4))
435
436 # Add some operations.
437 if not fail_missing_ops:
438 payload_gen.AddOperation(False, common.OpType.MOVE,
439 src_extents=[(0, 16), (16, 497)],
440 dst_extents=[(16, 496), (0, 16)])
441 payload_gen.AddOperation(True, common.OpType.MOVE,
442 src_extents=[(0, 8), (8, 8)],
443 dst_extents=[(8, 8), (0, 8)])
444
445 # Set an invalid signatures block (offset but no size), if required.
446 if fail_bad_sigs:
447 payload_gen.SetSignatures(32, None)
448
449 # Add old kernel/rootfs partition info, as required.
450 if fail_mismatched_oki_ori or fail_bad_oki:
451 oki_hash = (None if fail_bad_oki
452 else hashlib.sha256('fake-oki-content').digest())
453 payload_gen.SetPartInfo(True, False, _KiB(512), oki_hash)
454 if not fail_mismatched_oki_ori and fail_bad_ori:
455 payload_gen.SetPartInfo(False, False, _MiB(8), None)
456
457 # Add new kernel/rootfs partition info.
458 payload_gen.SetPartInfo(
459 True, True, _KiB(512),
460 None if fail_bad_nki else hashlib.sha256('fake-nki-content').digest())
461 payload_gen.SetPartInfo(
462 False, True, _MiB(8),
463 None if fail_bad_nri else hashlib.sha256('fake-nri-content').digest())
464
465 # Create the test object.
466 payload_checker = _GetPayloadChecker(payload_gen.WriteToFile)
467 report = checker._PayloadReport()
468
469 should_fail = (fail_mismatched_block_size or fail_bad_sigs or
470 fail_mismatched_oki_ori or fail_bad_oki or fail_bad_ori or
471 fail_bad_nki or fail_bad_nri or fail_missing_ops)
472 if should_fail:
473 self.assertRaises(update_payload.PayloadError,
474 payload_checker._CheckManifest, report)
475 else:
476 self.assertIsNone(payload_checker._CheckManifest(report))
477
478 def testCheckLength(self):
479 """Tests _CheckLength()."""
480 payload_checker = checker.PayloadChecker(self.MockPayload())
481 block_size = payload_checker.block_size
482
483 # Passes.
484 self.assertIsNone(payload_checker._CheckLength(
485 int(3.5 * block_size), 4, 'foo', 'bar'))
486 # Fails, too few blocks.
487 self.assertRaises(update_payload.PayloadError,
488 payload_checker._CheckLength,
489 int(3.5 * block_size), 3, 'foo', 'bar')
490 # Fails, too many blocks.
491 self.assertRaises(update_payload.PayloadError,
492 payload_checker._CheckLength,
493 int(3.5 * block_size), 5, 'foo', 'bar')
494
495 def testCheckExtents(self):
496 """Tests _CheckExtents()."""
497 payload_checker = checker.PayloadChecker(self.MockPayload())
498 block_size = payload_checker.block_size
499
500 # Passes w/ all real extents.
501 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
502 self.assertEquals(
503 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
504 collections.defaultdict(int), 'foo'),
505 23)
506
507 # Passes w/ pseudo-extents (aka sparse holes).
508 extents = self.NewExtentList((0, 4), (common.PSEUDO_EXTENT_MARKER, 5),
509 (8, 3))
510 self.assertEquals(
511 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
512 collections.defaultdict(int), 'foo',
513 allow_pseudo=True),
514 12)
515
516 # Passes w/ pseudo-extent due to a signature.
517 extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2))
518 self.assertEquals(
519 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
520 collections.defaultdict(int), 'foo',
521 allow_signature=True),
522 2)
523
524 # Fails, extent missing a start block.
525 extents = self.NewExtentList((-1, 4), (8, 3), (1024, 16))
526 self.assertRaises(
527 update_payload.PayloadError, payload_checker._CheckExtents,
528 extents, (1024 + 16) * block_size, collections.defaultdict(int),
529 'foo')
530
531 # Fails, extent missing block count.
532 extents = self.NewExtentList((0, -1), (8, 3), (1024, 16))
533 self.assertRaises(
534 update_payload.PayloadError, payload_checker._CheckExtents,
535 extents, (1024 + 16) * block_size, collections.defaultdict(int),
536 'foo')
537
538 # Fails, extent has zero blocks.
539 extents = self.NewExtentList((0, 4), (8, 3), (1024, 0))
540 self.assertRaises(
541 update_payload.PayloadError, payload_checker._CheckExtents,
542 extents, (1024 + 16) * block_size, collections.defaultdict(int),
543 'foo')
544
545 # Fails, extent exceeds partition boundaries.
546 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
547 self.assertRaises(
548 update_payload.PayloadError, payload_checker._CheckExtents,
549 extents, (1024 + 15) * block_size, collections.defaultdict(int),
550 'foo')
551
552 def testCheckReplaceOperation(self):
553 """Tests _CheckReplaceOperation() where op.type == REPLACE."""
554 payload_checker = checker.PayloadChecker(self.MockPayload())
555 block_size = payload_checker.block_size
556 data_length = 10000
557
558 op = self.mox.CreateMock(
559 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
560 op.type = common.OpType.REPLACE
561
562 # Pass.
563 op.src_extents = []
564 self.assertIsNone(
565 payload_checker._CheckReplaceOperation(
566 op, data_length, (data_length + block_size - 1) / block_size,
567 'foo'))
568
569 # Fail, src extents founds.
570 op.src_extents = ['bar']
571 self.assertRaises(
572 update_payload.PayloadError,
573 payload_checker._CheckReplaceOperation,
574 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
575
576 # Fail, missing data.
577 op.src_extents = []
578 self.assertRaises(
579 update_payload.PayloadError,
580 payload_checker._CheckReplaceOperation,
581 op, None, (data_length + block_size - 1) / block_size, 'foo')
582
583 # Fail, length / block number mismatch.
584 op.src_extents = ['bar']
585 self.assertRaises(
586 update_payload.PayloadError,
587 payload_checker._CheckReplaceOperation,
588 op, data_length, (data_length + block_size - 1) / block_size + 1, 'foo')
589
590 def testCheckReplaceBzOperation(self):
591 """Tests _CheckReplaceOperation() where op.type == REPLACE_BZ."""
592 payload_checker = checker.PayloadChecker(self.MockPayload())
593 block_size = payload_checker.block_size
594 data_length = block_size * 3
595
596 op = self.mox.CreateMock(
597 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
598 op.type = common.OpType.REPLACE_BZ
599
600 # Pass.
601 op.src_extents = []
602 self.assertIsNone(
603 payload_checker._CheckReplaceOperation(
604 op, data_length, (data_length + block_size - 1) / block_size + 5,
605 'foo'))
606
607 # Fail, src extents founds.
608 op.src_extents = ['bar']
609 self.assertRaises(
610 update_payload.PayloadError,
611 payload_checker._CheckReplaceOperation,
612 op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo')
613
614 # Fail, missing data.
615 op.src_extents = []
616 self.assertRaises(
617 update_payload.PayloadError,
618 payload_checker._CheckReplaceOperation,
619 op, None, (data_length + block_size - 1) / block_size, 'foo')
620
621 # Fail, too few blocks to justify BZ.
622 op.src_extents = []
623 self.assertRaises(
624 update_payload.PayloadError,
625 payload_checker._CheckReplaceOperation,
626 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
627
628 def testCheckMoveOperation_Pass(self):
629 """Tests _CheckMoveOperation(); pass case."""
630 payload_checker = checker.PayloadChecker(self.MockPayload())
631 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
632 op.type = common.OpType.MOVE
633
634 self.AddToMessage(op.src_extents,
635 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
636 self.AddToMessage(op.dst_extents,
637 self.NewExtentList((16, 128), (512, 6)))
638 self.assertIsNone(
639 payload_checker._CheckMoveOperation(op, None, 134, 134, 'foo'))
640
641 def testCheckMoveOperation_FailContainsData(self):
642 """Tests _CheckMoveOperation(); fails, message contains data."""
643 payload_checker = checker.PayloadChecker(self.MockPayload())
644 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
645 op.type = common.OpType.MOVE
646
647 self.AddToMessage(op.src_extents,
648 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
649 self.AddToMessage(op.dst_extents,
650 self.NewExtentList((16, 128), (512, 6)))
651 self.assertRaises(
652 update_payload.PayloadError,
653 payload_checker._CheckMoveOperation,
654 op, 1024, 134, 134, 'foo')
655
656 def testCheckMoveOperation_FailInsufficientSrcBlocks(self):
657 """Tests _CheckMoveOperation(); fails, not enough actual src blocks."""
658 payload_checker = checker.PayloadChecker(self.MockPayload())
659 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
660 op.type = common.OpType.MOVE
661
662 self.AddToMessage(op.src_extents,
663 self.NewExtentList((0, 4), (12, 2), (1024, 127)))
664 self.AddToMessage(op.dst_extents,
665 self.NewExtentList((16, 128), (512, 6)))
666 self.assertRaises(
667 update_payload.PayloadError,
668 payload_checker._CheckMoveOperation,
669 op, None, 134, 134, 'foo')
670
671 def testCheckMoveOperation_FailInsufficientDstBlocks(self):
672 """Tests _CheckMoveOperation(); fails, not enough actual dst blocks."""
673 payload_checker = checker.PayloadChecker(self.MockPayload())
674 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
675 op.type = common.OpType.MOVE
676
677 self.AddToMessage(op.src_extents,
678 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
679 self.AddToMessage(op.dst_extents,
680 self.NewExtentList((16, 128), (512, 5)))
681 self.assertRaises(
682 update_payload.PayloadError,
683 payload_checker._CheckMoveOperation,
684 op, None, 134, 134, 'foo')
685
686 def testCheckMoveOperation_FailExcessSrcBlocks(self):
687 """Tests _CheckMoveOperation(); fails, too many actual src blocks."""
688 payload_checker = checker.PayloadChecker(self.MockPayload())
689 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
690 op.type = common.OpType.MOVE
691
692 self.AddToMessage(op.src_extents,
693 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
694 self.AddToMessage(op.dst_extents,
695 self.NewExtentList((16, 128), (512, 5)))
696 self.assertRaises(
697 update_payload.PayloadError,
698 payload_checker._CheckMoveOperation,
699 op, None, 134, 134, 'foo')
700 self.AddToMessage(op.src_extents,
701 self.NewExtentList((0, 4), (12, 2), (1024, 129)))
702 self.AddToMessage(op.dst_extents,
703 self.NewExtentList((16, 128), (512, 6)))
704 self.assertRaises(
705 update_payload.PayloadError,
706 payload_checker._CheckMoveOperation,
707 op, None, 134, 134, 'foo')
708
709 def testCheckMoveOperation_FailExcessDstBlocks(self):
710 """Tests _CheckMoveOperation(); fails, too many actual dst blocks."""
711 payload_checker = checker.PayloadChecker(self.MockPayload())
712 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
713 op.type = common.OpType.MOVE
714
715 self.AddToMessage(op.src_extents,
716 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
717 self.AddToMessage(op.dst_extents,
718 self.NewExtentList((16, 128), (512, 7)))
719 self.assertRaises(
720 update_payload.PayloadError,
721 payload_checker._CheckMoveOperation,
722 op, None, 134, 134, 'foo')
723
724 def testCheckMoveOperation_FailStagnantBlocks(self):
725 """Tests _CheckMoveOperation(); fails, there are blocks that do not move."""
726 payload_checker = checker.PayloadChecker(self.MockPayload())
727 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
728 op.type = common.OpType.MOVE
729
730 self.AddToMessage(op.src_extents,
731 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
732 self.AddToMessage(op.dst_extents,
733 self.NewExtentList((8, 128), (512, 6)))
734 self.assertRaises(
735 update_payload.PayloadError,
736 payload_checker._CheckMoveOperation,
737 op, None, 134, 134, 'foo')
738
739 def testCheckBsdiff(self):
740 """Tests _CheckMoveOperation()."""
741 payload_checker = checker.PayloadChecker(self.MockPayload())
742
743 # Pass.
744 self.assertIsNone(
745 payload_checker._CheckBsdiffOperation(10000, 3, 'foo'))
746
747 # Fail, missing data blob.
748 self.assertRaises(
749 update_payload.PayloadError,
750 payload_checker._CheckBsdiffOperation,
751 None, 3, 'foo')
752
753 # Fail, too big of a diff blob (unjustified).
754 self.assertRaises(
755 update_payload.PayloadError,
756 payload_checker._CheckBsdiffOperation,
757 10000, 2, 'foo')
758
759 def DoCheckOperationTest(self, op_type_name, is_last, allow_signature,
760 allow_unhashed, fail_src_extents, fail_dst_extents,
761 fail_mismatched_data_offset_length,
762 fail_missing_dst_extents, fail_src_length,
763 fail_dst_length, fail_data_hash,
764 fail_prev_data_offset):
765 """Parametric testing of _CheckOperation().
766
767 Args:
768 op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
769 is_last: whether we're testing the last operation in a sequence
770 allow_signature: whether we're testing a signature-capable operation
771 allow_unhashed: whether we're allowing to not hash the data
772 fail_src_extents: tamper with src extents
773 fail_dst_extents: tamper with dst extents
774 fail_mismatched_data_offset_length: make data_{offset,length} inconsistent
775 fail_missing_dst_extents: do not include dst extents
776 fail_src_length: make src length inconsistent
777 fail_dst_length: make dst length inconsistent
778 fail_data_hash: tamper with the data blob hash
779 fail_prev_data_offset: make data space uses incontiguous
780
781 """
782 op_type = _OpTypeByName(op_type_name)
783
784 # Create the test object.
785 payload = self.MockPayload()
786 payload_checker = checker.PayloadChecker(payload)
787 block_size = payload_checker.block_size
788
789 # Create auxiliary arguments.
790 old_part_size = _MiB(4)
791 new_part_size = _MiB(8)
792 old_block_counters = array.array(
793 'B', [0] * ((old_part_size + block_size - 1) / block_size))
794 new_block_counters = array.array(
795 'B', [0] * ((new_part_size + block_size - 1) / block_size))
796 prev_data_offset = 1876
797 blob_hash_counts = collections.defaultdict(int)
798
799 # Create the operation object for the test.
800 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
801 op.type = op_type
802
803 total_src_blocks = 0
804 if op_type in (common.OpType.MOVE, common.OpType.BSDIFF):
805 if fail_src_extents:
806 self.AddToMessage(op.src_extents,
807 self.NewExtentList((0, 0)))
808 else:
809 self.AddToMessage(op.src_extents,
810 self.NewExtentList((0, 16)))
811 total_src_blocks = 16
812
813 if op_type != common.OpType.MOVE:
814 if not fail_mismatched_data_offset_length:
815 op.data_length = 16 * block_size - 8
816 if fail_prev_data_offset:
817 op.data_offset = prev_data_offset + 16
818 else:
819 op.data_offset = prev_data_offset
820
821 fake_data = 'fake-data'.ljust(op.data_length)
822 if not (allow_unhashed or (is_last and allow_signature and
823 op_type == common.OpType.REPLACE)):
824 if not fail_data_hash:
825 # Create a valid data blob hash.
826 op.data_sha256_hash = hashlib.sha256(fake_data).digest()
827 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
828 fake_data)
829 elif fail_data_hash:
830 # Create an invalid data blob hash.
831 op.data_sha256_hash = hashlib.sha256(
832 fake_data.replace(' ', '-')).digest()
833 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
834 fake_data)
835
836 total_dst_blocks = 0
837 if not fail_missing_dst_extents:
838 total_dst_blocks = 16
839 if fail_dst_extents:
840 self.AddToMessage(op.dst_extents,
841 self.NewExtentList((4, 16), (32, 0)))
842 else:
843 self.AddToMessage(op.dst_extents,
844 self.NewExtentList((4, 8), (64, 8)))
845
846 if total_src_blocks:
847 if fail_src_length:
848 op.src_length = total_src_blocks * block_size + 8
849 else:
850 op.src_length = total_src_blocks * block_size
851 elif fail_src_length:
852 # Add an orphaned src_length.
853 op.src_length = 16
854
855 if total_dst_blocks:
856 if fail_dst_length:
857 op.dst_length = total_dst_blocks * block_size + 8
858 else:
859 op.dst_length = total_dst_blocks * block_size
860
861 self.mox.ReplayAll()
862 should_fail = (fail_src_extents or fail_dst_extents or
863 fail_mismatched_data_offset_length or
864 fail_missing_dst_extents or fail_src_length or
865 fail_dst_length or fail_data_hash or fail_prev_data_offset)
866 largs = [op, 'foo', is_last, old_block_counters, new_block_counters,
867 old_part_size, new_part_size, prev_data_offset, allow_signature,
868 allow_unhashed, blob_hash_counts]
869 if should_fail:
870 self.assertRaises(update_payload.PayloadError,
871 payload_checker._CheckOperation, *largs)
872 else:
873 self.assertEqual(payload_checker._CheckOperation(*largs),
874 op.data_length if op.HasField('data_length') else 0)
875
def testAllocBlockCounters(self):
  """Tests _AllocBlockCounters().

  Checks that the returned sequence holds one integer per block of the given
  partition size, and that a trailing partial block is counted as a whole
  block.
  """
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size

  # Check allocation for block-aligned partition size, ensure it's integers.
  result = payload_checker._AllocBlockCounters(16 * block_size)
  self.assertEqual(len(result), 16)
  self.assertIs(type(result[0]), int)

  # Check allocation of unaligned partition sizes: one byte short of 16
  # blocks still needs 16 counters, one byte over needs 17.
  result = payload_checker._AllocBlockCounters(16 * block_size - 1)
  self.assertEqual(len(result), 16)
  result = payload_checker._AllocBlockCounters(16 * block_size + 1)
  self.assertEqual(len(result), 17)
891
def DoCheckOperationsTest(self, fail_bad_type,
                          fail_nonexhaustive_full_update):
  """Parametric testing of _CheckOperations().

  Builds a payload holding a single operation, marks the checker as handling
  a full update, and runs _CheckOperations() on the manifest's install
  operations. If any flag is set a PayloadError is expected; otherwise the
  call is expected to return the operation's data length.

  Args:
    fail_bad_type: if True, use an operation type value larger than every
      value in common.OpType.ALL
    fail_nonexhaustive_full_update: if True, make the operation cover one
      block less than the whole partition

  """
  # Generate a test payload. For this test, we only care about one
  # (arbitrary) set of operations, so we only generate a single operation
  # (treated as a rootfs operation throughout this method) and test with it.
  payload_gen = test_utils.PayloadGenerator()

  block_size = _KiB(4)
  payload_gen.SetBlockSize(block_size)

  rootfs_part_size = _MiB(8)

  # Fake rootfs operations in a full update, tampered with as required.
  rootfs_op_type = common.OpType.REPLACE
  if fail_bad_type:
    # Choose a type value that's bigger than the highest valid value.
    rootfs_op_type = max([rootfs_op_type] + list(common.OpType.ALL)) + 1

  rootfs_data_length = rootfs_part_size
  if fail_nonexhaustive_full_update:
    rootfs_data_length -= block_size

  # Floor division keeps the extent block count an integer regardless of
  # which division semantics are in effect.
  payload_gen.AddOperation(False, rootfs_op_type,
                           dst_extents=[(0, rootfs_data_length // block_size)],
                           data_offset=0,
                           data_length=rootfs_data_length)

  # Create the test object.
  payload_checker = _GetPayloadChecker(payload_gen.WriteToFile)
  payload_checker.payload_type = checker._TYPE_FULL
  report = checker._PayloadReport()

  should_fail = (fail_bad_type or fail_nonexhaustive_full_update)
  largs = (payload_checker.payload.manifest.install_operations, report,
           'foo', 0, rootfs_part_size, 0, True, False)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckOperations, *largs)
  else:
    self.assertEqual(payload_checker._CheckOperations(*largs),
                     rootfs_data_length)
935
def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op,
                          fail_mismatched_pseudo_op, fail_sig_missing_fields,
                          fail_unknown_sig_version, fail_incorrect_sig):
  """Parametric testing of _CheckSignatures().

  If any flag is set, the test forges its own signatures blob (any signature
  in it is computed over fixed fake content, not over the payload) and
  expects _CheckSignatures() to raise PayloadError. With all flags unset the
  payload generator is left to produce the signature itself, and the call is
  expected to return None.

  Args:
    fail_empty_sigs_blob: if True, the forged blob contains no signature
    fail_missing_pseudo_op: if True (or fail_mismatched_pseudo_op is), the
      generator is told not to add the signature pseudo-operation and a
      REPLACE operation with halved data offset/length is added instead
    fail_mismatched_pseudo_op: same effect as fail_missing_pseudo_op
    fail_sig_missing_fields: if True, the signature is added with no data
    fail_unknown_sig_version: if True, the signature is added with 5 rather
      than 1 as the first AddSig() argument
    fail_incorrect_sig: if True, only forces use of the forged blob

  """
  # Generate a test payload. For this test, we only care about the signature
  # block and how it relates to the payload hash. Therefore, we're generating
  # a random (otherwise useless) payload for this purpose.
  payload_gen = test_utils.EnhancedPayloadGenerator()
  block_size = _KiB(4)
  payload_gen.SetBlockSize(block_size)
  rootfs_part_size = _MiB(2)
  payload_gen.SetPartInfo(False, True, rootfs_part_size,
                          hashlib.sha256('fake-new-rootfs-content').digest())
  payload_gen.SetPartInfo(True, True, _KiB(16),
                          hashlib.sha256('fake-new-kernel-content').digest())
  # Floor division is used throughout so that block counts, offsets and
  # lengths remain integers regardless of the division semantics in effect.
  payload_gen.AddOperationWithData(
      False, common.OpType.REPLACE,
      dst_extents=[(0, rootfs_part_size // block_size)],
      data_blob=os.urandom(rootfs_part_size))

  do_forge_pseudo_op = (fail_missing_pseudo_op or fail_mismatched_pseudo_op)
  do_forge_sigs_data = (do_forge_pseudo_op or fail_empty_sigs_blob or
                        fail_sig_missing_fields or fail_unknown_sig_version
                        or fail_incorrect_sig)

  sigs_data = None
  if do_forge_sigs_data:
    sigs_gen = test_utils.SignaturesGenerator()
    if not fail_empty_sigs_blob:
      if fail_sig_missing_fields:
        sig_data = None
      else:
        sig_data = test_utils.SignSha256('fake-payload-content',
                                         _PRIVKEY_FILE_NAME)
      sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data)

    sigs_data = sigs_gen.ToBinary()
    payload_gen.SetSignatures(payload_gen.curr_offset, len(sigs_data))

  if do_forge_pseudo_op:
    assert sigs_data is not None, 'should have forged signatures blob by now'
    sigs_len = len(sigs_data)
    # Deliberately use half the real offset and length, so that this
    # operation does not line up with the signatures blob set above.
    half_sigs_len = sigs_len // 2
    payload_gen.AddOperation(
        False, common.OpType.REPLACE,
        data_offset=payload_gen.curr_offset // 2,
        data_length=half_sigs_len,
        dst_extents=[(0, (half_sigs_len + block_size - 1) // block_size)])

  # Generate payload (complete w/ signature) and create the test object.
  payload_checker = _GetPayloadChecker(
      payload_gen.WriteToFileWithData, sigs_data=sigs_data,
      privkey_file_name=_PRIVKEY_FILE_NAME,
      do_add_pseudo_operation=(not do_forge_pseudo_op))
  payload_checker.payload_type = checker._TYPE_FULL
  report = checker._PayloadReport()

  # We have to check the manifest first in order to set signature attributes.
  payload_checker._CheckManifest(report)

  should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or
                 fail_mismatched_pseudo_op or fail_sig_missing_fields or
                 fail_unknown_sig_version or fail_incorrect_sig)
  largs = (report, _PUBKEY_FILE_NAME)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckSignatures, *largs)
  else:
    self.assertIsNone(payload_checker._CheckSignatures(*largs))
1003
def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size,
              fail_mismatched_block_size, fail_excess_data):
  """Parametric testing of Run().

  Builds a full-update payload with one rootfs and one kernel REPLACE
  operation, then calls Run() with arguments tampered as requested. If any
  flag is set a PayloadError is expected; otherwise Run() is expected to
  return None.

  Args:
    fail_wrong_payload_type: if True, pass assert_type='delta' instead of
      'full'
    fail_invalid_block_size: if True, pass a block size that is not a power
      of two
    fail_mismatched_block_size: if True (and fail_invalid_block_size is not),
      pass twice the block size that the payload was generated with
    fail_excess_data: if True, have the generator write 1024 random bytes of
      padding

  """
  # Generate a test payload. For this test, we generate a full update that
  # has sample kernel and rootfs operations. Since most testing is done with
  # internal PayloadChecker methods that are tested elsewhere, here we only
  # tamper with what's actually being manipulated and/or tested in the Run()
  # method itself. Note that the checker doesn't verify partition hashes, so
  # they're safe to fake.
  payload_gen = test_utils.EnhancedPayloadGenerator()
  block_size = _KiB(4)
  payload_gen.SetBlockSize(block_size)
  kernel_part_size = _KiB(16)
  rootfs_part_size = _MiB(2)
  payload_gen.SetPartInfo(False, True, rootfs_part_size,
                          hashlib.sha256('fake-new-rootfs-content').digest())
  payload_gen.SetPartInfo(True, True, kernel_part_size,
                          hashlib.sha256('fake-new-kernel-content').digest())
  # Floor division keeps the extent block counts integers regardless of the
  # division semantics in effect.
  payload_gen.AddOperationWithData(
      False, common.OpType.REPLACE,
      dst_extents=[(0, rootfs_part_size // block_size)],
      data_blob=os.urandom(rootfs_part_size))
  payload_gen.AddOperationWithData(
      True, common.OpType.REPLACE,
      dst_extents=[(0, kernel_part_size // block_size)],
      data_blob=os.urandom(kernel_part_size))

  # Generate payload (complete w/ signature) and create the test object.
  payload_checker = _GetPayloadChecker(
      payload_gen.WriteToFileWithData,
      privkey_file_name=_PRIVKEY_FILE_NAME,
      do_add_pseudo_operation=True, is_pseudo_in_kernel=True,
      padding=os.urandom(1024) if fail_excess_data else None)

  if fail_invalid_block_size:
    use_block_size = block_size + 5  # not a power of two
  elif fail_mismatched_block_size:
    use_block_size = block_size * 2  # different than payload stated
  else:
    use_block_size = block_size
  dargs = {
      'pubkey_file_name': _PUBKEY_FILE_NAME,
      'assert_type': 'delta' if fail_wrong_payload_type else 'full',
      'block_size': use_block_size}

  should_fail = (fail_wrong_payload_type or fail_invalid_block_size or
                 fail_mismatched_block_size or fail_excess_data)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker.Run, **dargs)
  else:
    self.assertIsNone(payload_checker.Run(**dargs))
1055
1056
# This implements a generic API, hence the occasional unused args.
# pylint: disable=W0613
def ValidateCheckOperationTest(op_type_name, is_last, allow_signature,
                               allow_unhashed, fail_src_extents,
                               fail_dst_extents,
                               fail_mismatched_data_offset_length,
                               fail_missing_dst_extents, fail_src_length,
                               fail_dst_length, fail_data_hash,
                               fail_prev_data_offset):
  """Tells whether an argument combination is a meaningful test case.

  A combination is rejected when it asks for a failure in something that the
  given operation type does not have: source-related failures for
  REPLACE/REPLACE_BZ, or data-related failures for MOVE.

  Returns:
    True iff the combination of arguments represents a valid test.

  """
  op_type = _OpTypeByName(op_type_name)

  # REPLACE/REPLACE_BZ operations don't read data from src partition.
  has_no_src = op_type in (common.OpType.REPLACE, common.OpType.REPLACE_BZ)
  wants_src_failure = fail_src_extents or fail_src_length

  # MOVE operations don't carry data.
  has_no_data = op_type == common.OpType.MOVE
  wants_data_failure = (fail_mismatched_data_offset_length or
                        fail_data_hash or fail_prev_data_offset)

  return not ((has_no_src and wants_src_failure) or
              (has_no_data and wants_data_failure))
1081
1082
def TestMethodBody(run_method_name, run_dargs):
  """Builds a test method that forwards to a named method of its instance.

  Args:
    run_method_name: name of the method to look up on the instance
    run_dargs: dictionary of keyword arguments to call that method with

  Returns:
    A one-argument function (taking self) that performs the call and returns
    its result.

  """
  def _RunNamedMethod(self):
    # The lookup happens at call time, on whatever instance is passed in.
    target_method = getattr(self, run_method_name)
    return target_method(**run_dargs)

  return _RunNamedMethod
1086
1087
def AddParametricTests(tested_method_name, arg_space, validate_func=None):
  """Enumerates and adds specific parametric tests to PayloadCheckerTest.

  This function enumerates a space of test parameters (defined by arg_space),
  then binds a new, unique method name in PayloadCheckerTest to a test function
  that gets handed the said parameters. This is a preferable approach to doing
  the enumeration and invocation during the tests because this way each test is
  treated as a complete run by the unittest framework, and so benefits from the
  usual setUp/tearDown mechanics.

  The generated method is named 'test<tested_method_name>' followed by a
  '__<key>=<value>' suffix for every argument whose value is truthy or is an
  int; it invokes the 'Do<tested_method_name>Test' method of the test object.

  Args:
    tested_method_name: name of the tested PayloadChecker method
    arg_space: a dictionary containing variables (keys) and lists of values
               (values) associated with them
    validate_func: a function used for validating test argument combinations;
                   combinations for which it returns a false value are skipped

  """
  # Both names depend only on the tested method, so compute them once.
  run_method_name = 'Do%sTest' % tested_method_name
  test_method_prefix = 'test%s' % tested_method_name

  # Fix the argument order once so that names and value tuples line up. Only
  # dictionary operations available on both Python 2 and 3 are used here.
  arg_names = list(arg_space)
  arg_values = [arg_space[arg_name] for arg_name in arg_names]

  for value_tuple in itertools.product(*arg_values):
    run_dargs = dict(zip(arg_names, value_tuple))
    if validate_func and not validate_func(**run_dargs):
      continue

    # The exact-type check keeps integer zero in the name while leaving out
    # False (bool is a distinct type) and other falsy values.
    name_parts = ['__%s=%s' % (arg_key, arg_val)
                  for arg_key, arg_val in run_dargs.items()
                  if arg_val or type(arg_val) is int]
    test_method_name = test_method_prefix + ''.join(name_parts)
    setattr(PayloadCheckerTest, test_method_name,
            TestMethodBody(run_method_name, run_dargs))
1116
1117
def AddAllParametricTests():
  """Enumerates and adds all parametric tests to PayloadCheckerTest.

  Each AddParametricTests() call below names the tested method (which selects
  the Do<Name>Test method to invoke) and the space of keyword arguments to
  enumerate for it.
  """
  bool_vals = (True, False)
  indent_vals = (0, 1, 2)

  # Add all _AddElem() test cases.
  AddParametricTests('AddElem',
                     {'linebreak': bool_vals,
                      'indent': indent_vals,
                      'convert': (str, lambda s: s[::-1]),
                      'is_present': bool_vals,
                      'is_mandatory': bool_vals,
                      'is_submsg': bool_vals})

  # Add all _Add{Mandatory,Optional}Field tests.
  AddParametricTests('AddField',
                     {'is_mandatory': bool_vals,
                      'linebreak': bool_vals,
                      'indent': indent_vals,
                      'convert': (str, lambda s: s[::-1]),
                      'is_present': bool_vals})

  # Add all _Add{Mandatory,Optional}SubMsg tests.
  AddParametricTests('AddSubMsg',
                     {'is_mandatory': bool_vals,
                      'is_present': bool_vals})

  # Add all _CheckManifest() test cases.
  AddParametricTests('CheckManifest',
                     {'fail_mismatched_block_size': bool_vals,
                      'fail_bad_sigs': bool_vals,
                      'fail_mismatched_oki_ori': bool_vals,
                      'fail_bad_oki': bool_vals,
                      'fail_bad_ori': bool_vals,
                      'fail_bad_nki': bool_vals,
                      'fail_bad_nri': bool_vals,
                      'fail_missing_ops': bool_vals})

  # Add all _CheckOperation() test cases.
  AddParametricTests('CheckOperation',
                     {'op_type_name': ('REPLACE', 'REPLACE_BZ', 'MOVE',
                                       'BSDIFF'),
                      'is_last': bool_vals,
                      'allow_signature': bool_vals,
                      'allow_unhashed': bool_vals,
                      'fail_src_extents': bool_vals,
                      'fail_dst_extents': bool_vals,
                      'fail_mismatched_data_offset_length': bool_vals,
                      'fail_missing_dst_extents': bool_vals,
                      'fail_src_length': bool_vals,
                      'fail_dst_length': bool_vals,
                      'fail_data_hash': bool_vals,
                      'fail_prev_data_offset': bool_vals},
                     validate_func=ValidateCheckOperationTest)

  # Add all _CheckOperations() test cases.
  AddParametricTests('CheckOperations',
                     {'fail_bad_type': bool_vals,
                      'fail_nonexhaustive_full_update': bool_vals})

  # Add all _CheckSignatures() test cases.
  AddParametricTests('CheckSignatures',
                     {'fail_empty_sigs_blob': bool_vals,
                      'fail_missing_pseudo_op': bool_vals,
                      'fail_mismatched_pseudo_op': bool_vals,
                      'fail_sig_missing_fields': bool_vals,
                      'fail_unknown_sig_version': bool_vals,
                      'fail_incorrect_sig': bool_vals})

  # Add all Run() test cases.
  AddParametricTests('Run',
                     {'fail_wrong_payload_type': bool_vals,
                      'fail_invalid_block_size': bool_vals,
                      'fail_mismatched_block_size': bool_vals,
                      'fail_excess_data': bool_vals})
1190
1191
# When run as a script, attach the generated parametric test methods to
# PayloadCheckerTest before handing control to the unittest runner.
if __name__ == '__main__':
  AddAllParametricTests()
  unittest.main()