blob: 8d134fc0ddcf917e6d9a115b7c76127c6b293a38 [file] [log] [blame]
Gilad Arnold5502b562013-03-08 13:22:31 -08001#!/usr/bin/python
2#
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Unit testing checker.py."""
8
9import array
10import collections
11import cStringIO
12import hashlib
13import itertools
14import os
15import unittest
16
17# Pylint cannot find mox.
18# pylint: disable=F0401
19import mox
20
21import checker
22import common
23import payload as update_payload # avoid name conflicts later.
24import test_utils
25import update_metadata_pb2
26
27
28_PRIVKEY_FILE_NAME = 'payload-test-key.pem'
29_PUBKEY_FILE_NAME = 'payload-test-key.pub'
30
31
32def _OpTypeByName(op_name):
33 op_name_to_type = {
34 'REPLACE': common.OpType.REPLACE,
35 'REPLACE_BZ': common.OpType.REPLACE_BZ,
36 'MOVE': common.OpType.MOVE,
37 'BSDIFF': common.OpType.BSDIFF,
38 }
39 return op_name_to_type[op_name]
40
41
42def _KiB(count):
43 """Return the byte size of a given number of (binary) kilobytes."""
44 return count << 10
45
46
47def _MiB(count):
48 """Return the byte size of a given number of (binary) megabytes."""
49 return count << 20
50
51
52def _GiB(count):
53 """Return the byte size of a given number of (binary) gigabytes."""
54 return count << 30
55
56
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070057def _GetPayloadChecker(payload_gen_write_to_file_func, payload_gen_dargs=None,
58 checker_init_dargs=None):
Gilad Arnold5502b562013-03-08 13:22:31 -080059 """Returns a payload checker from a given payload generator."""
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070060 if payload_gen_dargs is None:
61 payload_gen_dargs = {}
62 if checker_init_dargs is None:
63 checker_init_dargs = {}
64
Gilad Arnold5502b562013-03-08 13:22:31 -080065 payload_file = cStringIO.StringIO()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070066 payload_gen_write_to_file_func(payload_file, **payload_gen_dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080067 payload_file.seek(0)
68 payload = update_payload.Payload(payload_file)
69 payload.Init()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070070 return checker.PayloadChecker(payload, **checker_init_dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080071
72
73def _GetPayloadCheckerWithData(payload_gen):
74 """Returns a payload checker from a given payload generator."""
75 payload_file = cStringIO.StringIO()
76 payload_gen.WriteToFile(payload_file)
77 payload_file.seek(0)
78 payload = update_payload.Payload(payload_file)
79 payload.Init()
80 return checker.PayloadChecker(payload)
81
82
83# (i) this class doesn't need an __init__(); (ii) unit testing is all about
84# running protected methods; (iii) don't bark about missing members of classes
85# you cannot import.
86# pylint: disable=W0232
87# pylint: disable=W0212
88# pylint: disable=E1101
89class PayloadCheckerTest(mox.MoxTestBase):
90 """Tests the PayloadChecker class.
91
92 In addition to ordinary testFoo() methods, which are automatically invoked by
93 the unittest framework, in this class we make use of DoBarTest() calls that
94 implement parametric tests of certain features. In order to invoke each test,
95 which embodies a unique combination of parameter values, as a complete unit
96 test, we perform explicit enumeration of the parameter space and create
97 individual invocation contexts for each, which are then bound as
98 testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for
99 all such tests is done in AddAllParametricTests().
100
101 """
102
103 def MockPayload(self):
104 """Create a mock payload object, complete with a mock menifest."""
105 payload = self.mox.CreateMock(update_payload.Payload)
106 payload.is_init = True
107 payload.manifest = self.mox.CreateMock(
108 update_metadata_pb2.DeltaArchiveManifest)
109 return payload
110
111 @staticmethod
112 def NewExtent(start_block, num_blocks):
113 """Returns an Extent message.
114
115 Each of the provided fields is set iff it is >= 0; otherwise, it's left at
116 its default state.
117
118 Args:
119 start_block: the starting block of the extent
120 num_blocks: the number of blocks in the extent
121 Returns:
122 An Extent message.
123
124 """
125 ex = update_metadata_pb2.Extent()
126 if start_block >= 0:
127 ex.start_block = start_block
128 if num_blocks >= 0:
129 ex.num_blocks = num_blocks
130 return ex
131
132 @staticmethod
133 def NewExtentList(*args):
134 """Returns an list of extents.
135
136 Args:
137 *args: (start_block, num_blocks) pairs defining the extents
138 Returns:
139 A list of Extent objects.
140
141 """
142 ex_list = []
143 for start_block, num_blocks in args:
144 ex_list.append(PayloadCheckerTest.NewExtent(start_block, num_blocks))
145 return ex_list
146
147 @staticmethod
148 def AddToMessage(repeated_field, field_vals):
149 for field_val in field_vals:
150 new_field = repeated_field.add()
151 new_field.CopyFrom(field_val)
152
153 def assertIsNone(self, val):
154 """Asserts that val is None (TODO remove once we upgrade to Python 2.7).
155
156 Note that we're using assertEqual so as for it to show us the actual
157 non-None value.
158
159 Args:
160 val: value/object to be equated to None
161
162 """
163 self.assertEqual(val, None)
164
165 def SetupAddElemTest(self, is_present, is_submsg, convert=str,
166 linebreak=False, indent=0):
167 """Setup for testing of _CheckElem() and its derivatives.
168
169 Args:
170 is_present: whether or not the element is found in the message
171 is_submsg: whether the element is a sub-message itself
172 convert: a representation conversion function
173 linebreak: whether or not a linebreak is to be used in the report
174 indent: indentation used for the report
175 Returns:
176 msg: a mock message object
177 report: a mock report object
178 subreport: a mock sub-report object
179 name: an element name to check
180 val: expected element value
181
182 """
183 name = 'foo'
184 val = 'fake submsg' if is_submsg else 'fake field'
185 subreport = 'fake subreport'
186
187 # Create a mock message.
188 msg = self.mox.CreateMock(update_metadata_pb2.message.Message)
189 msg.HasField(name).AndReturn(is_present)
190 setattr(msg, name, val)
191
192 # Create a mock report.
193 report = self.mox.CreateMock(checker._PayloadReport)
194 if is_present:
195 if is_submsg:
196 report.AddSubReport(name).AndReturn(subreport)
197 else:
198 report.AddField(name, convert(val), linebreak=linebreak, indent=indent)
199
200 self.mox.ReplayAll()
201 return (msg, report, subreport, name, val)
202
203 def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert,
204 linebreak, indent):
205 """Parametric testing of _CheckElem().
206
207 Args:
208 is_present: whether or not the element is found in the message
209 is_mandatory: whether or not it's a mandatory element
210 is_submsg: whether the element is a sub-message itself
211 convert: a representation conversion function
212 linebreak: whether or not a linebreak is to be used in the report
213 indent: indentation used for the report
214
215 """
216 msg, report, subreport, name, val = self.SetupAddElemTest(
217 is_present, is_submsg, convert, linebreak, indent)
218
219 largs = [msg, name, report, is_mandatory, is_submsg]
220 dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
221 if is_mandatory and not is_present:
222 self.assertRaises(update_payload.PayloadError,
223 checker.PayloadChecker._CheckElem, *largs, **dargs)
224 else:
225 ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*largs,
226 **dargs)
227 self.assertEquals(ret_val, val if is_present else None)
228 self.assertEquals(ret_subreport,
229 subreport if is_present and is_submsg else None)
230
231 def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
232 indent):
233 """Parametric testing of _Check{Mandatory,Optional}Field().
234
235 Args:
236 is_mandatory: whether we're testing a mandatory call
237 is_present: whether or not the element is found in the message
238 convert: a representation conversion function
239 linebreak: whether or not a linebreak is to be used in the report
240 indent: indentation used for the report
241
242 """
243 msg, report, _, name, val = self.SetupAddElemTest(
244 is_present, False, convert, linebreak, indent)
245
246 # Prepare for invocation of the tested method.
247 largs = [msg, name, report]
248 dargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
249 if is_mandatory:
250 largs.append('bar')
251 tested_func = checker.PayloadChecker._CheckMandatoryField
252 else:
253 tested_func = checker.PayloadChecker._CheckOptionalField
254
255 # Test the method call.
256 if is_mandatory and not is_present:
257 self.assertRaises(update_payload.PayloadError, tested_func, *largs,
258 **dargs)
259 else:
260 ret_val = tested_func(*largs, **dargs)
261 self.assertEquals(ret_val, val if is_present else None)
262
263 def DoAddSubMsgTest(self, is_mandatory, is_present):
264 """Parametrized testing of _Check{Mandatory,Optional}SubMsg().
265
266 Args:
267 is_mandatory: whether we're testing a mandatory call
268 is_present: whether or not the element is found in the message
269
270 """
271 msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True)
272
273 # Prepare for invocation of the tested method.
274 largs = [msg, name, report]
275 if is_mandatory:
276 largs.append('bar')
277 tested_func = checker.PayloadChecker._CheckMandatorySubMsg
278 else:
279 tested_func = checker.PayloadChecker._CheckOptionalSubMsg
280
281 # Test the method call.
282 if is_mandatory and not is_present:
283 self.assertRaises(update_payload.PayloadError, tested_func, *largs)
284 else:
285 ret_val, ret_subreport = tested_func(*largs)
286 self.assertEquals(ret_val, val if is_present else None)
287 self.assertEquals(ret_subreport, subreport if is_present else None)
288
289 def testCheckPresentIff(self):
290 """Tests _CheckPresentIff()."""
291 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
292 None, None, 'foo', 'bar', 'baz'))
293 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
294 'a', 'b', 'foo', 'bar', 'baz'))
295 self.assertRaises(update_payload.PayloadError,
296 checker.PayloadChecker._CheckPresentIff,
297 'a', None, 'foo', 'bar', 'baz')
298 self.assertRaises(update_payload.PayloadError,
299 checker.PayloadChecker._CheckPresentIff,
300 None, 'b', 'foo', 'bar', 'baz')
301
302 def DoCheckSha256SignatureTest(self, expect_pass, expect_subprocess_call,
303 sig_data, sig_asn1_header,
304 returned_signed_hash, expected_signed_hash):
305 """Parametric testing of _CheckSha256SignatureTest().
306
307 Args:
308 expect_pass: whether or not it should pass
309 expect_subprocess_call: whether to expect the openssl call to happen
310 sig_data: the signature raw data
311 sig_asn1_header: the ASN1 header
312 returned_signed_hash: the signed hash data retuned by openssl
313 expected_signed_hash: the signed hash data to compare against
314
315 """
316 # Stub out the subprocess invocation.
317 self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
318 if expect_subprocess_call:
319 checker.PayloadChecker._Run(mox.IsA(list), send_data=sig_data).AndReturn(
320 (sig_asn1_header + returned_signed_hash, None))
321
322 self.mox.ReplayAll()
323 if expect_pass:
324 self.assertIsNone(checker.PayloadChecker._CheckSha256Signature(
325 sig_data, 'foo', expected_signed_hash, 'bar'))
326 else:
327 self.assertRaises(update_payload.PayloadError,
328 checker.PayloadChecker._CheckSha256Signature,
329 sig_data, 'foo', expected_signed_hash, 'bar')
330
331 self.mox.UnsetStubs()
332
333 def testCheckSha256Signature_Pass(self):
334 """Tests _CheckSha256Signature(); pass case."""
335 sig_data = 'fake-signature'.ljust(256)
336 signed_hash = hashlib.sha256('fake-data').digest()
337 self.DoCheckSha256SignatureTest(True, True, sig_data,
338 common.SIG_ASN1_HEADER, signed_hash,
339 signed_hash)
340
341 def testCheckSha256Signature_FailBadSignature(self):
342 """Tests _CheckSha256Signature(); fails due to malformed signature."""
343 sig_data = 'fake-signature' # malformed (not 256 bytes in length)
344 signed_hash = hashlib.sha256('fake-data').digest()
345 self.DoCheckSha256SignatureTest(False, False, sig_data,
346 common.SIG_ASN1_HEADER, signed_hash,
347 signed_hash)
348
349 def testCheckSha256Signature_FailBadOutputLength(self):
350 """Tests _CheckSha256Signature(); fails due to unexpected output length."""
351 sig_data = 'fake-signature'.ljust(256)
352 signed_hash = 'fake-hash' # malformed (not 32 bytes in length)
353 self.DoCheckSha256SignatureTest(False, True, sig_data,
354 common.SIG_ASN1_HEADER, signed_hash,
355 signed_hash)
356
357 def testCheckSha256Signature_FailBadAsnHeader(self):
358 """Tests _CheckSha256Signature(); fails due to bad ASN1 header."""
359 sig_data = 'fake-signature'.ljust(256)
360 signed_hash = hashlib.sha256('fake-data').digest()
361 bad_asn1_header = 'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER))
362 self.DoCheckSha256SignatureTest(False, True, sig_data, bad_asn1_header,
363 signed_hash, signed_hash)
364
365 def testCheckSha256Signature_FailBadHash(self):
366 """Tests _CheckSha256Signature(); fails due to bad hash returned."""
367 sig_data = 'fake-signature'.ljust(256)
368 expected_signed_hash = hashlib.sha256('fake-data').digest()
369 returned_signed_hash = hashlib.sha256('bad-fake-data').digest()
370 self.DoCheckSha256SignatureTest(False, True, sig_data,
371 common.SIG_ASN1_HEADER,
372 expected_signed_hash, returned_signed_hash)
373
374 def testCheckBlocksFitLength_Pass(self):
375 """Tests _CheckBlocksFitLength(); pass case."""
376 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
377 64, 4, 16, 'foo'))
378 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
379 60, 4, 16, 'foo'))
380 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
381 49, 4, 16, 'foo'))
382 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
383 48, 3, 16, 'foo'))
384
385 def testCheckBlocksFitLength_TooManyBlocks(self):
386 """Tests _CheckBlocksFitLength(); fails due to excess blocks."""
387 self.assertRaises(update_payload.PayloadError,
388 checker.PayloadChecker._CheckBlocksFitLength,
389 64, 5, 16, 'foo')
390 self.assertRaises(update_payload.PayloadError,
391 checker.PayloadChecker._CheckBlocksFitLength,
392 60, 5, 16, 'foo')
393 self.assertRaises(update_payload.PayloadError,
394 checker.PayloadChecker._CheckBlocksFitLength,
395 49, 5, 16, 'foo')
396 self.assertRaises(update_payload.PayloadError,
397 checker.PayloadChecker._CheckBlocksFitLength,
398 48, 4, 16, 'foo')
399
400 def testCheckBlocksFitLength_TooFewBlocks(self):
401 """Tests _CheckBlocksFitLength(); fails due to insufficient blocks."""
402 self.assertRaises(update_payload.PayloadError,
403 checker.PayloadChecker._CheckBlocksFitLength,
404 64, 3, 16, 'foo')
405 self.assertRaises(update_payload.PayloadError,
406 checker.PayloadChecker._CheckBlocksFitLength,
407 60, 3, 16, 'foo')
408 self.assertRaises(update_payload.PayloadError,
409 checker.PayloadChecker._CheckBlocksFitLength,
410 49, 3, 16, 'foo')
411 self.assertRaises(update_payload.PayloadError,
412 checker.PayloadChecker._CheckBlocksFitLength,
413 48, 2, 16, 'foo')
414
415 def DoCheckManifestTest(self, fail_mismatched_block_size, fail_bad_sigs,
416 fail_mismatched_oki_ori, fail_bad_oki, fail_bad_ori,
417 fail_bad_nki, fail_bad_nri, fail_missing_ops):
418 """Parametric testing of _CheckManifest().
419
420 Args:
421 fail_mismatched_block_size: simulate a missing block_size field
422 fail_bad_sigs: make signatures descriptor inconsistent
423 fail_mismatched_oki_ori: make old rootfs/kernel info partially present
424 fail_bad_oki: tamper with old kernel info
425 fail_bad_ori: tamper with old rootfs info
426 fail_bad_nki: tamper with new kernel info
427 fail_bad_nri: tamper with new rootfs info
428 fail_missing_ops: simulate a manifest without any operations
429
430 """
431 # Generate a test payload. For this test, we only care about the manifest
432 # and don't need any data blobs, hence we can use a plain paylaod generator
433 # (which also gives us more control on things that can be screwed up).
434 payload_gen = test_utils.PayloadGenerator()
435
436 # Tamper with block size, if required.
437 if fail_mismatched_block_size:
438 payload_gen.SetBlockSize(_KiB(1))
439 else:
440 payload_gen.SetBlockSize(_KiB(4))
441
442 # Add some operations.
443 if not fail_missing_ops:
444 payload_gen.AddOperation(False, common.OpType.MOVE,
445 src_extents=[(0, 16), (16, 497)],
446 dst_extents=[(16, 496), (0, 16)])
447 payload_gen.AddOperation(True, common.OpType.MOVE,
448 src_extents=[(0, 8), (8, 8)],
449 dst_extents=[(8, 8), (0, 8)])
450
451 # Set an invalid signatures block (offset but no size), if required.
452 if fail_bad_sigs:
453 payload_gen.SetSignatures(32, None)
454
455 # Add old kernel/rootfs partition info, as required.
456 if fail_mismatched_oki_ori or fail_bad_oki:
457 oki_hash = (None if fail_bad_oki
458 else hashlib.sha256('fake-oki-content').digest())
459 payload_gen.SetPartInfo(True, False, _KiB(512), oki_hash)
460 if not fail_mismatched_oki_ori and fail_bad_ori:
461 payload_gen.SetPartInfo(False, False, _MiB(8), None)
462
463 # Add new kernel/rootfs partition info.
464 payload_gen.SetPartInfo(
465 True, True, _KiB(512),
466 None if fail_bad_nki else hashlib.sha256('fake-nki-content').digest())
467 payload_gen.SetPartInfo(
468 False, True, _MiB(8),
469 None if fail_bad_nri else hashlib.sha256('fake-nri-content').digest())
470
471 # Create the test object.
472 payload_checker = _GetPayloadChecker(payload_gen.WriteToFile)
473 report = checker._PayloadReport()
474
475 should_fail = (fail_mismatched_block_size or fail_bad_sigs or
476 fail_mismatched_oki_ori or fail_bad_oki or fail_bad_ori or
477 fail_bad_nki or fail_bad_nri or fail_missing_ops)
478 if should_fail:
479 self.assertRaises(update_payload.PayloadError,
480 payload_checker._CheckManifest, report)
481 else:
482 self.assertIsNone(payload_checker._CheckManifest(report))
483
484 def testCheckLength(self):
485 """Tests _CheckLength()."""
486 payload_checker = checker.PayloadChecker(self.MockPayload())
487 block_size = payload_checker.block_size
488
489 # Passes.
490 self.assertIsNone(payload_checker._CheckLength(
491 int(3.5 * block_size), 4, 'foo', 'bar'))
492 # Fails, too few blocks.
493 self.assertRaises(update_payload.PayloadError,
494 payload_checker._CheckLength,
495 int(3.5 * block_size), 3, 'foo', 'bar')
496 # Fails, too many blocks.
497 self.assertRaises(update_payload.PayloadError,
498 payload_checker._CheckLength,
499 int(3.5 * block_size), 5, 'foo', 'bar')
500
501 def testCheckExtents(self):
502 """Tests _CheckExtents()."""
503 payload_checker = checker.PayloadChecker(self.MockPayload())
504 block_size = payload_checker.block_size
505
506 # Passes w/ all real extents.
507 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
508 self.assertEquals(
509 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
510 collections.defaultdict(int), 'foo'),
511 23)
512
513 # Passes w/ pseudo-extents (aka sparse holes).
514 extents = self.NewExtentList((0, 4), (common.PSEUDO_EXTENT_MARKER, 5),
515 (8, 3))
516 self.assertEquals(
517 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
518 collections.defaultdict(int), 'foo',
519 allow_pseudo=True),
520 12)
521
522 # Passes w/ pseudo-extent due to a signature.
523 extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2))
524 self.assertEquals(
525 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
526 collections.defaultdict(int), 'foo',
527 allow_signature=True),
528 2)
529
530 # Fails, extent missing a start block.
531 extents = self.NewExtentList((-1, 4), (8, 3), (1024, 16))
532 self.assertRaises(
533 update_payload.PayloadError, payload_checker._CheckExtents,
534 extents, (1024 + 16) * block_size, collections.defaultdict(int),
535 'foo')
536
537 # Fails, extent missing block count.
538 extents = self.NewExtentList((0, -1), (8, 3), (1024, 16))
539 self.assertRaises(
540 update_payload.PayloadError, payload_checker._CheckExtents,
541 extents, (1024 + 16) * block_size, collections.defaultdict(int),
542 'foo')
543
544 # Fails, extent has zero blocks.
545 extents = self.NewExtentList((0, 4), (8, 3), (1024, 0))
546 self.assertRaises(
547 update_payload.PayloadError, payload_checker._CheckExtents,
548 extents, (1024 + 16) * block_size, collections.defaultdict(int),
549 'foo')
550
551 # Fails, extent exceeds partition boundaries.
552 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
553 self.assertRaises(
554 update_payload.PayloadError, payload_checker._CheckExtents,
555 extents, (1024 + 15) * block_size, collections.defaultdict(int),
556 'foo')
557
558 def testCheckReplaceOperation(self):
559 """Tests _CheckReplaceOperation() where op.type == REPLACE."""
560 payload_checker = checker.PayloadChecker(self.MockPayload())
561 block_size = payload_checker.block_size
562 data_length = 10000
563
564 op = self.mox.CreateMock(
565 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
566 op.type = common.OpType.REPLACE
567
568 # Pass.
569 op.src_extents = []
570 self.assertIsNone(
571 payload_checker._CheckReplaceOperation(
572 op, data_length, (data_length + block_size - 1) / block_size,
573 'foo'))
574
575 # Fail, src extents founds.
576 op.src_extents = ['bar']
577 self.assertRaises(
578 update_payload.PayloadError,
579 payload_checker._CheckReplaceOperation,
580 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
581
582 # Fail, missing data.
583 op.src_extents = []
584 self.assertRaises(
585 update_payload.PayloadError,
586 payload_checker._CheckReplaceOperation,
587 op, None, (data_length + block_size - 1) / block_size, 'foo')
588
589 # Fail, length / block number mismatch.
590 op.src_extents = ['bar']
591 self.assertRaises(
592 update_payload.PayloadError,
593 payload_checker._CheckReplaceOperation,
594 op, data_length, (data_length + block_size - 1) / block_size + 1, 'foo')
595
596 def testCheckReplaceBzOperation(self):
597 """Tests _CheckReplaceOperation() where op.type == REPLACE_BZ."""
598 payload_checker = checker.PayloadChecker(self.MockPayload())
599 block_size = payload_checker.block_size
600 data_length = block_size * 3
601
602 op = self.mox.CreateMock(
603 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
604 op.type = common.OpType.REPLACE_BZ
605
606 # Pass.
607 op.src_extents = []
608 self.assertIsNone(
609 payload_checker._CheckReplaceOperation(
610 op, data_length, (data_length + block_size - 1) / block_size + 5,
611 'foo'))
612
613 # Fail, src extents founds.
614 op.src_extents = ['bar']
615 self.assertRaises(
616 update_payload.PayloadError,
617 payload_checker._CheckReplaceOperation,
618 op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo')
619
620 # Fail, missing data.
621 op.src_extents = []
622 self.assertRaises(
623 update_payload.PayloadError,
624 payload_checker._CheckReplaceOperation,
625 op, None, (data_length + block_size - 1) / block_size, 'foo')
626
627 # Fail, too few blocks to justify BZ.
628 op.src_extents = []
629 self.assertRaises(
630 update_payload.PayloadError,
631 payload_checker._CheckReplaceOperation,
632 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
633
634 def testCheckMoveOperation_Pass(self):
635 """Tests _CheckMoveOperation(); pass case."""
636 payload_checker = checker.PayloadChecker(self.MockPayload())
637 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
638 op.type = common.OpType.MOVE
639
640 self.AddToMessage(op.src_extents,
641 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
642 self.AddToMessage(op.dst_extents,
643 self.NewExtentList((16, 128), (512, 6)))
644 self.assertIsNone(
645 payload_checker._CheckMoveOperation(op, None, 134, 134, 'foo'))
646
647 def testCheckMoveOperation_FailContainsData(self):
648 """Tests _CheckMoveOperation(); fails, message contains data."""
649 payload_checker = checker.PayloadChecker(self.MockPayload())
650 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
651 op.type = common.OpType.MOVE
652
653 self.AddToMessage(op.src_extents,
654 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
655 self.AddToMessage(op.dst_extents,
656 self.NewExtentList((16, 128), (512, 6)))
657 self.assertRaises(
658 update_payload.PayloadError,
659 payload_checker._CheckMoveOperation,
660 op, 1024, 134, 134, 'foo')
661
662 def testCheckMoveOperation_FailInsufficientSrcBlocks(self):
663 """Tests _CheckMoveOperation(); fails, not enough actual src blocks."""
664 payload_checker = checker.PayloadChecker(self.MockPayload())
665 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
666 op.type = common.OpType.MOVE
667
668 self.AddToMessage(op.src_extents,
669 self.NewExtentList((0, 4), (12, 2), (1024, 127)))
670 self.AddToMessage(op.dst_extents,
671 self.NewExtentList((16, 128), (512, 6)))
672 self.assertRaises(
673 update_payload.PayloadError,
674 payload_checker._CheckMoveOperation,
675 op, None, 134, 134, 'foo')
676
677 def testCheckMoveOperation_FailInsufficientDstBlocks(self):
678 """Tests _CheckMoveOperation(); fails, not enough actual dst blocks."""
679 payload_checker = checker.PayloadChecker(self.MockPayload())
680 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
681 op.type = common.OpType.MOVE
682
683 self.AddToMessage(op.src_extents,
684 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
685 self.AddToMessage(op.dst_extents,
686 self.NewExtentList((16, 128), (512, 5)))
687 self.assertRaises(
688 update_payload.PayloadError,
689 payload_checker._CheckMoveOperation,
690 op, None, 134, 134, 'foo')
691
692 def testCheckMoveOperation_FailExcessSrcBlocks(self):
693 """Tests _CheckMoveOperation(); fails, too many actual src blocks."""
694 payload_checker = checker.PayloadChecker(self.MockPayload())
695 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
696 op.type = common.OpType.MOVE
697
698 self.AddToMessage(op.src_extents,
699 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
700 self.AddToMessage(op.dst_extents,
701 self.NewExtentList((16, 128), (512, 5)))
702 self.assertRaises(
703 update_payload.PayloadError,
704 payload_checker._CheckMoveOperation,
705 op, None, 134, 134, 'foo')
706 self.AddToMessage(op.src_extents,
707 self.NewExtentList((0, 4), (12, 2), (1024, 129)))
708 self.AddToMessage(op.dst_extents,
709 self.NewExtentList((16, 128), (512, 6)))
710 self.assertRaises(
711 update_payload.PayloadError,
712 payload_checker._CheckMoveOperation,
713 op, None, 134, 134, 'foo')
714
715 def testCheckMoveOperation_FailExcessDstBlocks(self):
716 """Tests _CheckMoveOperation(); fails, too many actual dst blocks."""
717 payload_checker = checker.PayloadChecker(self.MockPayload())
718 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
719 op.type = common.OpType.MOVE
720
721 self.AddToMessage(op.src_extents,
722 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
723 self.AddToMessage(op.dst_extents,
724 self.NewExtentList((16, 128), (512, 7)))
725 self.assertRaises(
726 update_payload.PayloadError,
727 payload_checker._CheckMoveOperation,
728 op, None, 134, 134, 'foo')
729
730 def testCheckMoveOperation_FailStagnantBlocks(self):
731 """Tests _CheckMoveOperation(); fails, there are blocks that do not move."""
732 payload_checker = checker.PayloadChecker(self.MockPayload())
733 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
734 op.type = common.OpType.MOVE
735
736 self.AddToMessage(op.src_extents,
737 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
738 self.AddToMessage(op.dst_extents,
739 self.NewExtentList((8, 128), (512, 6)))
740 self.assertRaises(
741 update_payload.PayloadError,
742 payload_checker._CheckMoveOperation,
743 op, None, 134, 134, 'foo')
744
745 def testCheckBsdiff(self):
746 """Tests _CheckMoveOperation()."""
747 payload_checker = checker.PayloadChecker(self.MockPayload())
748
749 # Pass.
750 self.assertIsNone(
751 payload_checker._CheckBsdiffOperation(10000, 3, 'foo'))
752
753 # Fail, missing data blob.
754 self.assertRaises(
755 update_payload.PayloadError,
756 payload_checker._CheckBsdiffOperation,
757 None, 3, 'foo')
758
759 # Fail, too big of a diff blob (unjustified).
760 self.assertRaises(
761 update_payload.PayloadError,
762 payload_checker._CheckBsdiffOperation,
763 10000, 2, 'foo')
764
765 def DoCheckOperationTest(self, op_type_name, is_last, allow_signature,
766 allow_unhashed, fail_src_extents, fail_dst_extents,
767 fail_mismatched_data_offset_length,
768 fail_missing_dst_extents, fail_src_length,
769 fail_dst_length, fail_data_hash,
770 fail_prev_data_offset):
771 """Parametric testing of _CheckOperation().
772
773 Args:
774 op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
775 is_last: whether we're testing the last operation in a sequence
776 allow_signature: whether we're testing a signature-capable operation
777 allow_unhashed: whether we're allowing to not hash the data
778 fail_src_extents: tamper with src extents
779 fail_dst_extents: tamper with dst extents
780 fail_mismatched_data_offset_length: make data_{offset,length} inconsistent
781 fail_missing_dst_extents: do not include dst extents
782 fail_src_length: make src length inconsistent
783 fail_dst_length: make dst length inconsistent
784 fail_data_hash: tamper with the data blob hash
785 fail_prev_data_offset: make data space uses incontiguous
786
787 """
788 op_type = _OpTypeByName(op_type_name)
789
790 # Create the test object.
791 payload = self.MockPayload()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700792 payload_checker = checker.PayloadChecker(payload,
793 allow_unhashed=allow_unhashed)
Gilad Arnold5502b562013-03-08 13:22:31 -0800794 block_size = payload_checker.block_size
795
796 # Create auxiliary arguments.
797 old_part_size = _MiB(4)
798 new_part_size = _MiB(8)
799 old_block_counters = array.array(
800 'B', [0] * ((old_part_size + block_size - 1) / block_size))
801 new_block_counters = array.array(
802 'B', [0] * ((new_part_size + block_size - 1) / block_size))
803 prev_data_offset = 1876
804 blob_hash_counts = collections.defaultdict(int)
805
806 # Create the operation object for the test.
807 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
808 op.type = op_type
809
810 total_src_blocks = 0
811 if op_type in (common.OpType.MOVE, common.OpType.BSDIFF):
812 if fail_src_extents:
813 self.AddToMessage(op.src_extents,
814 self.NewExtentList((0, 0)))
815 else:
816 self.AddToMessage(op.src_extents,
817 self.NewExtentList((0, 16)))
818 total_src_blocks = 16
819
820 if op_type != common.OpType.MOVE:
821 if not fail_mismatched_data_offset_length:
822 op.data_length = 16 * block_size - 8
823 if fail_prev_data_offset:
824 op.data_offset = prev_data_offset + 16
825 else:
826 op.data_offset = prev_data_offset
827
828 fake_data = 'fake-data'.ljust(op.data_length)
829 if not (allow_unhashed or (is_last and allow_signature and
830 op_type == common.OpType.REPLACE)):
831 if not fail_data_hash:
832 # Create a valid data blob hash.
833 op.data_sha256_hash = hashlib.sha256(fake_data).digest()
834 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
835 fake_data)
836 elif fail_data_hash:
837 # Create an invalid data blob hash.
838 op.data_sha256_hash = hashlib.sha256(
839 fake_data.replace(' ', '-')).digest()
840 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
841 fake_data)
842
843 total_dst_blocks = 0
844 if not fail_missing_dst_extents:
845 total_dst_blocks = 16
846 if fail_dst_extents:
847 self.AddToMessage(op.dst_extents,
848 self.NewExtentList((4, 16), (32, 0)))
849 else:
850 self.AddToMessage(op.dst_extents,
851 self.NewExtentList((4, 8), (64, 8)))
852
853 if total_src_blocks:
854 if fail_src_length:
855 op.src_length = total_src_blocks * block_size + 8
856 else:
857 op.src_length = total_src_blocks * block_size
858 elif fail_src_length:
859 # Add an orphaned src_length.
860 op.src_length = 16
861
862 if total_dst_blocks:
863 if fail_dst_length:
864 op.dst_length = total_dst_blocks * block_size + 8
865 else:
866 op.dst_length = total_dst_blocks * block_size
867
868 self.mox.ReplayAll()
869 should_fail = (fail_src_extents or fail_dst_extents or
870 fail_mismatched_data_offset_length or
871 fail_missing_dst_extents or fail_src_length or
872 fail_dst_length or fail_data_hash or fail_prev_data_offset)
873 largs = [op, 'foo', is_last, old_block_counters, new_block_counters,
874 old_part_size, new_part_size, prev_data_offset, allow_signature,
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700875 blob_hash_counts]
Gilad Arnold5502b562013-03-08 13:22:31 -0800876 if should_fail:
877 self.assertRaises(update_payload.PayloadError,
878 payload_checker._CheckOperation, *largs)
879 else:
880 self.assertEqual(payload_checker._CheckOperation(*largs),
881 op.data_length if op.HasField('data_length') else 0)
882
883 def testAllocBlockCounters(self):
884 """Tests _CheckMoveOperation()."""
885 payload_checker = checker.PayloadChecker(self.MockPayload())
886 block_size = payload_checker.block_size
887
888 # Check allocation for block-aligned partition size, ensure it's integers.
889 result = payload_checker._AllocBlockCounters(16 * block_size)
890 self.assertEqual(len(result), 16)
891 self.assertEqual(type(result[0]), int)
892
893 # Check allocation of unaligned partition sizes.
894 result = payload_checker._AllocBlockCounters(16 * block_size - 1)
895 self.assertEqual(len(result), 16)
896 result = payload_checker._AllocBlockCounters(16 * block_size + 1)
897 self.assertEqual(len(result), 17)
898
899 def DoCheckOperationsTest(self, fail_bad_type,
900 fail_nonexhaustive_full_update):
901 # Generate a test payload. For this test, we only care about one
902 # (arbitrary) set of operations, so we'll only be generating kernel and
903 # test with them.
904 payload_gen = test_utils.PayloadGenerator()
905
906 block_size = _KiB(4)
907 payload_gen.SetBlockSize(block_size)
908
909 rootfs_part_size = _MiB(8)
910
911 # Fake rootfs operations in a full update, tampered with as required.
912 rootfs_op_type = common.OpType.REPLACE
913 if fail_bad_type:
914 # Choose a type value that's bigger than the highest valid value.
915 for valid_op_type in common.OpType.ALL:
916 rootfs_op_type = max(rootfs_op_type, valid_op_type)
917 rootfs_op_type += 1
918
919 rootfs_data_length = rootfs_part_size
920 if fail_nonexhaustive_full_update:
921 rootfs_data_length -= block_size
922
923 payload_gen.AddOperation(False, rootfs_op_type,
924 dst_extents=[(0, rootfs_data_length / block_size)],
925 data_offset=0,
926 data_length=rootfs_data_length)
927
928 # Create the test object.
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700929 payload_checker = _GetPayloadChecker(payload_gen.WriteToFile,
930 checker_init_dargs={
931 'allow_unhashed': True})
Gilad Arnold5502b562013-03-08 13:22:31 -0800932 payload_checker.payload_type = checker._TYPE_FULL
933 report = checker._PayloadReport()
934
935 should_fail = (fail_bad_type or fail_nonexhaustive_full_update)
936 largs = (payload_checker.payload.manifest.install_operations, report,
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700937 'foo', 0, rootfs_part_size, 0, False)
Gilad Arnold5502b562013-03-08 13:22:31 -0800938 if should_fail:
939 self.assertRaises(update_payload.PayloadError,
940 payload_checker._CheckOperations, *largs)
941 else:
942 self.assertEqual(payload_checker._CheckOperations(*largs),
943 rootfs_data_length)
944
945 def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op,
946 fail_mismatched_pseudo_op, fail_sig_missing_fields,
947 fail_unknown_sig_version, fail_incorrect_sig):
948 # Generate a test payload. For this test, we only care about the signature
949 # block and how it relates to the payload hash. Therefore, we're generating
950 # a random (otherwise useless) payload for this purpose.
951 payload_gen = test_utils.EnhancedPayloadGenerator()
952 block_size = _KiB(4)
953 payload_gen.SetBlockSize(block_size)
954 rootfs_part_size = _MiB(2)
955 payload_gen.SetPartInfo(False, True, rootfs_part_size,
956 hashlib.sha256('fake-new-rootfs-content').digest())
957 payload_gen.SetPartInfo(True, True, _KiB(16),
958 hashlib.sha256('fake-new-kernel-content').digest())
959 payload_gen.AddOperationWithData(
960 False, common.OpType.REPLACE,
961 dst_extents=[(0, rootfs_part_size / block_size)],
962 data_blob=os.urandom(rootfs_part_size))
963
964 do_forge_pseudo_op = (fail_missing_pseudo_op or fail_mismatched_pseudo_op)
965 do_forge_sigs_data = (do_forge_pseudo_op or fail_empty_sigs_blob or
966 fail_sig_missing_fields or fail_unknown_sig_version
967 or fail_incorrect_sig)
968
969 sigs_data = None
970 if do_forge_sigs_data:
971 sigs_gen = test_utils.SignaturesGenerator()
972 if not fail_empty_sigs_blob:
973 if fail_sig_missing_fields:
974 sig_data = None
975 else:
976 sig_data = test_utils.SignSha256('fake-payload-content',
977 _PRIVKEY_FILE_NAME)
978 sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data)
979
980 sigs_data = sigs_gen.ToBinary()
981 payload_gen.SetSignatures(payload_gen.curr_offset, len(sigs_data))
982
983 if do_forge_pseudo_op:
984 assert sigs_data is not None, 'should have forged signatures blob by now'
985 sigs_len = len(sigs_data)
986 payload_gen.AddOperation(
987 False, common.OpType.REPLACE,
988 data_offset=payload_gen.curr_offset / 2,
989 data_length=sigs_len / 2,
990 dst_extents=[(0, (sigs_len / 2 + block_size - 1) / block_size)])
991
992 # Generate payload (complete w/ signature) and create the test object.
993 payload_checker = _GetPayloadChecker(
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700994 payload_gen.WriteToFileWithData,
995 payload_gen_dargs={
996 'sigs_data': sigs_data,
997 'privkey_file_name': _PRIVKEY_FILE_NAME,
998 'do_add_pseudo_operation': not do_forge_pseudo_op})
Gilad Arnold5502b562013-03-08 13:22:31 -0800999 payload_checker.payload_type = checker._TYPE_FULL
1000 report = checker._PayloadReport()
1001
1002 # We have to check the manifest first in order to set signature attributes.
1003 payload_checker._CheckManifest(report)
1004
1005 should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or
1006 fail_mismatched_pseudo_op or fail_sig_missing_fields or
1007 fail_unknown_sig_version or fail_incorrect_sig)
1008 largs = (report, _PUBKEY_FILE_NAME)
1009 if should_fail:
1010 self.assertRaises(update_payload.PayloadError,
1011 payload_checker._CheckSignatures, *largs)
1012 else:
1013 self.assertIsNone(payload_checker._CheckSignatures(*largs))
1014
1015 def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size,
1016 fail_mismatched_block_size, fail_excess_data):
1017 # Generate a test payload. For this test, we generate a full update that
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001018 # has sample kernel and rootfs operations. Since most testing is done with
Gilad Arnold5502b562013-03-08 13:22:31 -08001019 # internal PayloadChecker methods that are tested elsewhere, here we only
1020 # tamper with what's actually being manipulated and/or tested in the Run()
1021 # method itself. Note that the checker doesn't verify partition hashes, so
1022 # they're safe to fake.
1023 payload_gen = test_utils.EnhancedPayloadGenerator()
1024 block_size = _KiB(4)
1025 payload_gen.SetBlockSize(block_size)
1026 kernel_part_size = _KiB(16)
1027 rootfs_part_size = _MiB(2)
1028 payload_gen.SetPartInfo(False, True, rootfs_part_size,
1029 hashlib.sha256('fake-new-rootfs-content').digest())
1030 payload_gen.SetPartInfo(True, True, kernel_part_size,
1031 hashlib.sha256('fake-new-kernel-content').digest())
1032 payload_gen.AddOperationWithData(
1033 False, common.OpType.REPLACE,
1034 dst_extents=[(0, rootfs_part_size / block_size)],
1035 data_blob=os.urandom(rootfs_part_size))
1036 payload_gen.AddOperationWithData(
1037 True, common.OpType.REPLACE,
1038 dst_extents=[(0, kernel_part_size / block_size)],
1039 data_blob=os.urandom(kernel_part_size))
1040
1041 # Generate payload (complete w/ signature) and create the test object.
Gilad Arnold5502b562013-03-08 13:22:31 -08001042 if fail_invalid_block_size:
1043 use_block_size = block_size + 5 # not a power of two
1044 elif fail_mismatched_block_size:
1045 use_block_size = block_size * 2 # different that payload stated
1046 else:
1047 use_block_size = block_size
Gilad Arnold5502b562013-03-08 13:22:31 -08001048
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001049 dargs = {
1050 'payload_gen_dargs': {
1051 'privkey_file_name': _PRIVKEY_FILE_NAME,
1052 'do_add_pseudo_operation': True,
1053 'is_pseudo_in_kernel': True,
1054 'padding': os.urandom(1024) if fail_excess_data else None},
1055 'checker_init_dargs': {
1056 'assert_type': 'delta' if fail_wrong_payload_type else 'full',
1057 'block_size': use_block_size}}
1058 if fail_invalid_block_size:
1059 self.assertRaises(update_payload.PayloadError, _GetPayloadChecker,
1060 payload_gen.WriteToFileWithData, **dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -08001061 else:
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001062 payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData,
1063 **dargs)
1064 dargs = {'pubkey_file_name': _PUBKEY_FILE_NAME}
1065 should_fail = (fail_wrong_payload_type or fail_mismatched_block_size or
1066 fail_excess_data)
1067 if should_fail:
1068 self.assertRaises(update_payload.PayloadError,
1069 payload_checker.Run, **dargs)
1070 else:
1071 self.assertIsNone(payload_checker.Run(**dargs))
Gilad Arnold5502b562013-03-08 13:22:31 -08001072
1073
1074# This implements a generic API, hence the occasional unused args.
1075# pylint: disable=W0613
1076def ValidateCheckOperationTest(op_type_name, is_last, allow_signature,
1077 allow_unhashed, fail_src_extents,
1078 fail_dst_extents,
1079 fail_mismatched_data_offset_length,
1080 fail_missing_dst_extents, fail_src_length,
1081 fail_dst_length, fail_data_hash,
1082 fail_prev_data_offset):
1083 """Returns True iff the combination of arguments represents a valid test."""
1084 op_type = _OpTypeByName(op_type_name)
1085
1086 # REPLACE/REPLACE_BZ operations don't read data from src partition.
1087 if (op_type in (common.OpType.REPLACE, common.OpType.REPLACE_BZ) and (
1088 fail_src_extents or fail_src_length)):
1089 return False
1090
1091 # MOVE operations don't carry data.
1092 if (op_type == common.OpType.MOVE and (
1093 fail_mismatched_data_offset_length or fail_data_hash or
1094 fail_prev_data_offset)):
1095 return False
1096
1097 return True
1098
1099
1100def TestMethodBody(run_method_name, run_dargs):
1101 """Returns a function that invokes a named method with named arguments."""
1102 return lambda self: getattr(self, run_method_name)(**run_dargs)
1103
1104
1105def AddParametricTests(tested_method_name, arg_space, validate_func=None):
1106 """Enumerates and adds specific parametric tests to PayloadCheckerTest.
1107
1108 This function enumerates a space of test parameters (defined by arg_space),
1109 then binds a new, unique method name in PayloadCheckerTest to a test function
1110 that gets handed the said parameters. This is a preferable approach to doing
1111 the enumeration and invocation during the tests because this way each test is
1112 treated as a complete run by the unittest framework, and so benefits from the
1113 usual setUp/tearDown mechanics.
1114
1115 Args:
1116 tested_method_name: name of the tested PayloadChecker method
1117 arg_space: a dictionary containing variables (keys) and lists of values
1118 (values) associated with them
1119 validate_func: a function used for validating test argument combinations
1120
1121 """
1122 for value_tuple in itertools.product(*arg_space.itervalues()):
1123 run_dargs = dict(zip(arg_space.iterkeys(), value_tuple))
1124 if validate_func and not validate_func(**run_dargs):
1125 continue
1126 run_method_name = 'Do%sTest' % tested_method_name
1127 test_method_name = 'test%s' % tested_method_name
1128 for arg_key, arg_val in run_dargs.iteritems():
1129 if arg_val or type(arg_val) is int:
1130 test_method_name += '__%s=%s' % (arg_key, arg_val)
1131 setattr(PayloadCheckerTest, test_method_name,
1132 TestMethodBody(run_method_name, run_dargs))
1133
1134
1135def AddAllParametricTests():
1136 """Enumerates and adds all parametric tests to PayloadCheckerTest."""
1137 # Add all _CheckElem() test cases.
1138 AddParametricTests('AddElem',
1139 {'linebreak': (True, False),
1140 'indent': (0, 1, 2),
1141 'convert': (str, lambda s: s[::-1]),
1142 'is_present': (True, False),
1143 'is_mandatory': (True, False),
1144 'is_submsg': (True, False)})
1145
1146 # Add all _Add{Mandatory,Optional}Field tests.
1147 AddParametricTests('AddField',
1148 {'is_mandatory': (True, False),
1149 'linebreak': (True, False),
1150 'indent': (0, 1, 2),
1151 'convert': (str, lambda s: s[::-1]),
1152 'is_present': (True, False)})
1153
1154 # Add all _Add{Mandatory,Optional}SubMsg tests.
1155 AddParametricTests('AddSubMsg',
1156 {'is_mandatory': (True, False),
1157 'is_present': (True, False)})
1158
1159 # Add all _CheckManifest() test cases.
1160 AddParametricTests('CheckManifest',
1161 {'fail_mismatched_block_size': (True, False),
1162 'fail_bad_sigs': (True, False),
1163 'fail_mismatched_oki_ori': (True, False),
1164 'fail_bad_oki': (True, False),
1165 'fail_bad_ori': (True, False),
1166 'fail_bad_nki': (True, False),
1167 'fail_bad_nri': (True, False),
1168 'fail_missing_ops': (True, False)})
1169
1170 # Add all _CheckOperation() test cases.
1171 AddParametricTests('CheckOperation',
1172 {'op_type_name': ('REPLACE', 'REPLACE_BZ', 'MOVE',
1173 'BSDIFF'),
1174 'is_last': (True, False),
1175 'allow_signature': (True, False),
1176 'allow_unhashed': (True, False),
1177 'fail_src_extents': (True, False),
1178 'fail_dst_extents': (True, False),
1179 'fail_mismatched_data_offset_length': (True, False),
1180 'fail_missing_dst_extents': (True, False),
1181 'fail_src_length': (True, False),
1182 'fail_dst_length': (True, False),
1183 'fail_data_hash': (True, False),
1184 'fail_prev_data_offset': (True, False)},
1185 validate_func=ValidateCheckOperationTest)
1186
1187 # Add all _CheckOperations() test cases.
1188 AddParametricTests('CheckOperations',
1189 {'fail_bad_type': (True, False),
1190 'fail_nonexhaustive_full_update': (True, False)})
1191
1192 # Add all _CheckOperations() test cases.
1193 AddParametricTests('CheckSignatures',
1194 {'fail_empty_sigs_blob': (True, False),
1195 'fail_missing_pseudo_op': (True, False),
1196 'fail_mismatched_pseudo_op': (True, False),
1197 'fail_sig_missing_fields': (True, False),
1198 'fail_unknown_sig_version': (True, False),
1199 'fail_incorrect_sig': (True, False)})
1200
1201 # Add all Run() test cases.
1202 AddParametricTests('Run',
1203 {'fail_wrong_payload_type': (True, False),
1204 'fail_invalid_block_size': (True, False),
1205 'fail_mismatched_block_size': (True, False),
1206 'fail_excess_data': (True, False)})
1207
1208
1209if __name__ == '__main__':
1210 AddAllParametricTests()
1211 unittest.main()