blob: 4b23bd82070d209f48ccb1f02cc305ba863afab7 [file] [log] [blame]
Gilad Arnold5502b562013-03-08 13:22:31 -08001#!/usr/bin/python
2#
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Unit testing checker.py."""
8
9import array
10import collections
11import cStringIO
12import hashlib
13import itertools
14import os
15import unittest
16
17# Pylint cannot find mox.
18# pylint: disable=F0401
19import mox
20
21import checker
22import common
23import payload as update_payload # avoid name conflicts later.
24import test_utils
25import update_metadata_pb2
26
27
def _OpTypeByName(op_name):
  """Translates an operation type name into its common.OpType constant.

  Args:
    op_name: one of 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
  Returns:
    The common.OpType value registered under op_name.
  Raises:
    KeyError: if op_name is not one of the names listed above.

  """
  op_types = common.OpType
  known_ops = (
      ('REPLACE', op_types.REPLACE),
      ('REPLACE_BZ', op_types.REPLACE_BZ),
      ('MOVE', op_types.MOVE),
      ('BSDIFF', op_types.BSDIFF),
  )
  return dict(known_ops)[op_name]
36
37
def _GetPayloadChecker(payload_gen_write_to_file_func, payload_gen_dargs=None,
                       checker_init_dargs=None):
  """Builds a payload checker over an in-memory generated payload.

  Args:
    payload_gen_write_to_file_func: callable that writes a payload into the
      file object passed as its first argument
    payload_gen_dargs: optional keyword arguments for the writer callable
    checker_init_dargs: optional keyword arguments for PayloadChecker()
  Returns:
    A checker.PayloadChecker wrapping the initialized payload.

  """
  gen_dargs = {} if payload_gen_dargs is None else payload_gen_dargs
  init_dargs = {} if checker_init_dargs is None else checker_init_dargs

  # Generate the payload into a memory buffer, then rewind it for reading.
  buf = cStringIO.StringIO()
  payload_gen_write_to_file_func(buf, **gen_dargs)
  buf.seek(0)

  generated_payload = update_payload.Payload(buf)
  generated_payload.Init()
  return checker.PayloadChecker(generated_payload, **init_dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080052
53
def _GetPayloadCheckerWithData(payload_gen):
  """Returns a payload checker for the payload written by a generator object.

  Args:
    payload_gen: an object whose WriteToFile(file_obj) method emits the
      payload
  Returns:
    A checker.PayloadChecker constructed with default arguments.

  """
  # Same steps as _GetPayloadChecker() with no extra arguments for either the
  # generator or the checker.
  return _GetPayloadChecker(payload_gen.WriteToFile)
62
63
64# (i) this class doesn't need an __init__(); (ii) unit testing is all about
65# running protected methods; (iii) don't bark about missing members of classes
66# you cannot import.
67# pylint: disable=W0232
68# pylint: disable=W0212
69# pylint: disable=E1101
70class PayloadCheckerTest(mox.MoxTestBase):
71 """Tests the PayloadChecker class.
72
73 In addition to ordinary testFoo() methods, which are automatically invoked by
74 the unittest framework, in this class we make use of DoBarTest() calls that
75 implement parametric tests of certain features. In order to invoke each test,
76 which embodies a unique combination of parameter values, as a complete unit
77 test, we perform explicit enumeration of the parameter space and create
78 individual invocation contexts for each, which are then bound as
79 testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for
80 all such tests is done in AddAllParametricTests().
81
82 """
83
def MockPayload(self):
  """Creates a mock payload object, complete with a mock manifest.

  Returns:
    A mock update_payload.Payload with is_init set to True and a manifest
    attribute holding a mock DeltaArchiveManifest.

  """
  mock_payload = self.mox.CreateMock(update_payload.Payload)
  mock_payload.is_init = True
  manifest_cls = update_metadata_pb2.DeltaArchiveManifest
  mock_payload.manifest = self.mox.CreateMock(manifest_cls)
  return mock_payload
91
@staticmethod
def NewExtent(start_block, num_blocks):
  """Builds an Extent message from the given field values.

  A field is assigned only when its value is >= 0; a negative value leaves
  the field at its default (unset) state.

  Args:
    start_block: the starting block of the extent
    num_blocks: the number of blocks in the extent
  Returns:
    An Extent message.

  """
  extent = update_metadata_pb2.Extent()
  field_vals = (('start_block', start_block), ('num_blocks', num_blocks))
  for field_name, field_val in field_vals:
    if field_val >= 0:
      setattr(extent, field_name, field_val)
  return extent
112
@staticmethod
def NewExtentList(*args):
  """Builds a list of Extent messages, one per given pair.

  Args:
    *args: (start_block, num_blocks) pairs defining the extents
  Returns:
    A list of Extent objects, in the order the pairs were given.

  """
  return [PayloadCheckerTest.NewExtent(start_block, num_blocks)
          for start_block, num_blocks in args]
127
@staticmethod
def AddToMessage(repeated_field, field_vals):
  """Appends a copy of each given value to a repeated message field.

  Args:
    repeated_field: the repeated field to extend; its add() method is called
      once per value and must return the newly added element
    field_vals: the messages to copy into the new elements, in order

  """
  for field_val in field_vals:
    repeated_field.add().CopyFrom(field_val)
133
def assertIsNone(self, val):
  """Asserts that val is None (TODO remove once we upgrade to Python 2.7).

  This goes through assertEqual so that, on failure, the actual non-None
  value is shown.

  Args:
    val: value/object to be equated to None

  """
  expected = None
  self.assertEqual(val, expected)
145
def SetupAddElemTest(self, is_present, is_submsg, convert=str,
                     linebreak=False, indent=0):
  """Prepares the mocks used for testing _CheckElem() and its derivatives.

  Records the expected HasField() query on a mock message and, when the
  element is present, the expected AddSubReport() or AddField() call on a
  mock report; then switches all mocks to replay mode.

  Args:
    is_present: whether or not the element is found in the message
    is_submsg: whether the element is a sub-message itself
    convert: a representation conversion function
    linebreak: whether or not a linebreak is to be used in the report
    indent: indentation used for the report
  Returns:
    A tuple (msg, report, subreport, name, val) where:
    msg: a mock message object
    report: a mock report object
    subreport: a fake sub-report value
    name: an element name to check
    val: expected element value

  """
  elem_name = 'foo'
  if is_submsg:
    elem_val = 'fake submsg'
  else:
    elem_val = 'fake field'
  fake_subreport = 'fake subreport'

  # Mock message: expects a single presence query for the element.
  mock_msg = self.mox.CreateMock(update_metadata_pb2.message.Message)
  mock_msg.HasField(elem_name).AndReturn(is_present)
  setattr(mock_msg, elem_name, elem_val)

  # Mock report: expects a reporting call only for a present element.
  mock_report = self.mox.CreateMock(checker._PayloadReport)
  if is_present and is_submsg:
    mock_report.AddSubReport(elem_name).AndReturn(fake_subreport)
  elif is_present:
    mock_report.AddField(elem_name, convert(elem_val), linebreak=linebreak,
                         indent=indent)

  self.mox.ReplayAll()
  return (mock_msg, mock_report, fake_subreport, elem_name, elem_val)
183
def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert,
                  linebreak, indent):
  """Parametric testing of _CheckElem().

  A mandatory element that is absent must raise PayloadError; in every other
  case the returned (value, sub-report) pair is compared against the values
  prepared by SetupAddElemTest().

  Args:
    is_present: whether or not the element is found in the message
    is_mandatory: whether or not it's a mandatory element
    is_submsg: whether the element is a sub-message itself
    convert: a representation conversion function
    linebreak: whether or not a linebreak is to be used in the report
    indent: indentation used for the report

  """
  msg, report, subreport, name, val = self.SetupAddElemTest(
      is_present, is_submsg, convert, linebreak, indent)

  call_args = (msg, name, report, is_mandatory, is_submsg)
  call_kwargs = dict(convert=convert, linebreak=linebreak, indent=indent)

  if is_mandatory and not is_present:
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckElem,
                      *call_args, **call_kwargs)
    return

  ret_val, ret_subreport = checker.PayloadChecker._CheckElem(
      *call_args, **call_kwargs)
  expected_val = val if is_present else None
  expected_subreport = subreport if is_present and is_submsg else None
  self.assertEqual(ret_val, expected_val)
  self.assertEqual(ret_subreport, expected_subreport)
211
def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
                   indent):
  """Parametric testing of _Check{Mandatory,Optional}Field().

  Args:
    is_mandatory: whether we're testing a mandatory call
    is_present: whether or not the element is found in the message
    convert: a representation conversion function
    linebreak: whether or not a linebreak is to be used in the report
    indent: indentation used for the report

  """
  msg, report, _, name, val = self.SetupAddElemTest(
      is_present, False, convert, linebreak, indent)

  # Pick the tested method; the mandatory variant is called with one extra
  # positional argument ('bar') after the report.
  if is_mandatory:
    tested_func = checker.PayloadChecker._CheckMandatoryField
    call_args = (msg, name, report, 'bar')
  else:
    tested_func = checker.PayloadChecker._CheckOptionalField
    call_args = (msg, name, report)
  call_kwargs = dict(convert=convert, linebreak=linebreak, indent=indent)

  # A missing mandatory field must be rejected; anything else returns the
  # field value, or None when the field is absent.
  if is_mandatory and not is_present:
    self.assertRaises(update_payload.PayloadError, tested_func, *call_args,
                      **call_kwargs)
    return

  expected_val = val if is_present else None
  self.assertEqual(tested_func(*call_args, **call_kwargs), expected_val)
243
def DoAddSubMsgTest(self, is_mandatory, is_present):
  """Parametric testing of _Check{Mandatory,Optional}SubMsg().

  Args:
    is_mandatory: whether we're testing a mandatory call
    is_present: whether or not the element is found in the message

  """
  msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True)

  # Pick the tested method; the mandatory variant is called with one extra
  # positional argument ('bar') after the report.
  if is_mandatory:
    tested_func = checker.PayloadChecker._CheckMandatorySubMsg
    call_args = (msg, name, report, 'bar')
  else:
    tested_func = checker.PayloadChecker._CheckOptionalSubMsg
    call_args = (msg, name, report)

  # A missing mandatory sub-message must be rejected.
  if is_mandatory and not is_present:
    self.assertRaises(update_payload.PayloadError, tested_func, *call_args)
    return

  ret_val, ret_subreport = tested_func(*call_args)
  self.assertEqual(ret_val, val if is_present else None)
  self.assertEqual(ret_subreport, subreport if is_present else None)
269
def testCheckPresentIff(self):
  """Tests _CheckPresentIff()."""
  # Both values absent or both present: passes.
  for val1, val2 in ((None, None), ('a', 'b')):
    self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
        val1, val2, 'foo', 'bar', 'baz'))

  # Exactly one of the two values present: fails.
  for val1, val2 in (('a', None), (None, 'b')):
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckPresentIff,
                      val1, val2, 'foo', 'bar', 'baz')
282
def DoCheckSha256SignatureTest(self, expect_pass, expect_subprocess_call,
                               sig_data, sig_asn1_header,
                               returned_signed_hash, expected_signed_hash):
  """Parametric testing of _CheckSha256Signature().

  PayloadChecker._Run is stubbed out, so no real subprocess is spawned; when
  a call is expected, the stub returns the ASN1 header followed by the
  returned signed hash.

  Args:
    expect_pass: whether or not it should pass
    expect_subprocess_call: whether to expect the openssl call to happen
    sig_data: the signature raw data
    sig_asn1_header: the ASN1 header
    returned_signed_hash: the signed hash data returned by openssl
    expected_signed_hash: the signed hash data to compare against

  """
  # Stub out the subprocess invocation.
  self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
  if expect_subprocess_call:
    run_output = (sig_asn1_header + returned_signed_hash, None)
    checker.PayloadChecker._Run(
        mox.IsA(list), send_data=sig_data).AndReturn(run_output)
  self.mox.ReplayAll()

  check_args = (sig_data, 'foo', expected_signed_hash, 'bar')
  if not expect_pass:
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckSha256Signature,
                      *check_args)
  else:
    self.assertIsNone(
        checker.PayloadChecker._CheckSha256Signature(*check_args))

  self.mox.UnsetStubs()
313
def testCheckSha256Signature_Pass(self):
  """Tests _CheckSha256Signature(); pass case."""
  digest = hashlib.sha256('fake-data').digest()
  self.DoCheckSha256SignatureTest(
      expect_pass=True,
      expect_subprocess_call=True,
      sig_data='fake-signature'.ljust(256),
      sig_asn1_header=common.SIG_ASN1_HEADER,
      returned_signed_hash=digest,
      expected_signed_hash=digest)
321
def testCheckSha256Signature_FailBadSignature(self):
  """Tests _CheckSha256Signature(); fails due to malformed signature."""
  digest = hashlib.sha256('fake-data').digest()
  self.DoCheckSha256SignatureTest(
      expect_pass=False,
      expect_subprocess_call=False,
      sig_data='fake-signature',  # malformed (not 256 bytes in length)
      sig_asn1_header=common.SIG_ASN1_HEADER,
      returned_signed_hash=digest,
      expected_signed_hash=digest)
329
def testCheckSha256Signature_FailBadOutputLength(self):
  """Tests _CheckSha256Signature(); fails due to unexpected output length."""
  bad_hash = 'fake-hash'  # malformed (not 32 bytes in length)
  self.DoCheckSha256SignatureTest(
      expect_pass=False,
      expect_subprocess_call=True,
      sig_data='fake-signature'.ljust(256),
      sig_asn1_header=common.SIG_ASN1_HEADER,
      returned_signed_hash=bad_hash,
      expected_signed_hash=bad_hash)
337
def testCheckSha256Signature_FailBadAsnHeader(self):
  """Tests _CheckSha256Signature(); fails due to bad ASN1 header."""
  digest = hashlib.sha256('fake-data').digest()
  # Same length as the real header, different content.
  header_len = len(common.SIG_ASN1_HEADER)
  self.DoCheckSha256SignatureTest(
      expect_pass=False,
      expect_subprocess_call=True,
      sig_data='fake-signature'.ljust(256),
      sig_asn1_header='bad-asn-header'.ljust(header_len),
      returned_signed_hash=digest,
      expected_signed_hash=digest)
345
def testCheckSha256Signature_FailBadHash(self):
  """Tests _CheckSha256Signature(); fails due to bad hash returned."""
  sig_data = 'fake-signature'.ljust(256)
  expected_signed_hash = hashlib.sha256('fake-data').digest()
  returned_signed_hash = hashlib.sha256('bad-fake-data').digest()
  # Pass the hashes by keyword: the helper takes the returned hash before the
  # expected one, and positional passing previously had the two swapped.
  self.DoCheckSha256SignatureTest(
      False, True, sig_data, common.SIG_ASN1_HEADER,
      returned_signed_hash=returned_signed_hash,
      expected_signed_hash=expected_signed_hash)
354
def testCheckBlocksFitLength_Pass(self):
  """Tests _CheckBlocksFitLength(); pass case."""
  # (length, num_blocks) pairs that must be accepted for a block size of 16.
  fitting_cases = ((64, 4), (60, 4), (49, 4), (48, 3))
  for length, num_blocks in fitting_cases:
    self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
        length, num_blocks, 16, 'foo'))
365
def testCheckBlocksFitLength_TooManyBlocks(self):
  """Tests _CheckBlocksFitLength(); fails due to excess blocks."""
  # (length, num_blocks) pairs with one block too many for a block size of
  # 16.
  excess_cases = ((64, 5), (60, 5), (49, 5), (48, 4))
  for length, num_blocks in excess_cases:
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckBlocksFitLength,
                      length, num_blocks, 16, 'foo')
380
def testCheckBlocksFitLength_TooFewBlocks(self):
  """Tests _CheckBlocksFitLength(); fails due to insufficient blocks."""
  # (length, num_blocks) pairs with one block too few for a block size of 16.
  insufficient_cases = ((64, 3), (60, 3), (49, 3), (48, 2))
  for length, num_blocks in insufficient_cases:
    self.assertRaises(update_payload.PayloadError,
                      checker.PayloadChecker._CheckBlocksFitLength,
                      length, num_blocks, 16, 'foo')
395
def DoCheckManifestTest(self, fail_mismatched_block_size, fail_bad_sigs,
                        fail_mismatched_oki_ori, fail_bad_oki, fail_bad_ori,
                        fail_bad_nki, fail_bad_nri, fail_missing_ops,
                        fail_old_kernel_fs_size, fail_old_rootfs_fs_size,
                        fail_new_kernel_fs_size, fail_new_rootfs_fs_size):
  """Parametric testing of _CheckManifest().

  Builds a payload whose manifest is tampered with according to the flags,
  then expects _CheckManifest() to raise PayloadError if any flag is set and
  to return None otherwise.

  Args:
    fail_mismatched_block_size: use a 1 KiB block size instead of 4 KiB
    fail_bad_sigs: make signatures descriptor inconsistent
    fail_mismatched_oki_ori: make old rootfs/kernel info partially present
    fail_bad_oki: tamper with old kernel info
    fail_bad_ori: tamper with old rootfs info
    fail_bad_nki: tamper with new kernel info
    fail_bad_nri: tamper with new rootfs info
    fail_missing_ops: simulate a manifest without any operations
    fail_old_kernel_fs_size: make old kernel fs size too big
    fail_old_rootfs_fs_size: make old rootfs fs size too big
    fail_new_kernel_fs_size: make new kernel fs size too big
    fail_new_rootfs_fs_size: make new rootfs fs size too big

  """
  # Generate a test payload. For this test, we only care about the manifest
  # and don't need any data blobs, hence we can use a plain payload generator
  # (which also gives us more control on things that can be screwed up).
  payload_gen = test_utils.PayloadGenerator()

  # Tamper with block size, if required.
  block_size_kib = 1 if fail_mismatched_block_size else 4
  payload_gen.SetBlockSize(test_utils.KiB(block_size_kib))

  # Add some operations.
  if not fail_missing_ops:
    payload_gen.AddOperation(False, common.OpType.MOVE,
                             src_extents=[(0, 16), (16, 497)],
                             dst_extents=[(16, 496), (0, 16)])
    payload_gen.AddOperation(True, common.OpType.MOVE,
                             src_extents=[(0, 8), (8, 8)],
                             dst_extents=[(8, 8), (0, 8)])

  # Set an invalid signatures block (offset but no size), if required.
  if fail_bad_sigs:
    payload_gen.SetSignatures(32, None)

  # Set partition / filesystem sizes; a "too big" filesystem exceeds its
  # partition size by a fixed number of bytes.
  rootfs_part_size = test_utils.MiB(8)
  kernel_part_size = test_utils.KiB(512)
  oversize = 100
  old_kernel_fs_size = kernel_part_size + (
      oversize if fail_old_kernel_fs_size else 0)
  old_rootfs_fs_size = rootfs_part_size + (
      oversize if fail_old_rootfs_fs_size else 0)
  new_kernel_fs_size = kernel_part_size + (
      oversize if fail_new_kernel_fs_size else 0)
  new_rootfs_fs_size = rootfs_part_size + (
      oversize if fail_new_rootfs_fs_size else 0)

  # Add old kernel/rootfs partition info, as required.
  if fail_mismatched_oki_ori or fail_old_kernel_fs_size or fail_bad_oki:
    if fail_bad_oki:
      oki_hash = None
    else:
      oki_hash = hashlib.sha256('fake-oki-content').digest()
    payload_gen.SetPartInfo(True, False, old_kernel_fs_size, oki_hash)
  if not fail_mismatched_oki_ori and (fail_old_rootfs_fs_size or
                                      fail_bad_ori):
    if fail_bad_ori:
      ori_hash = None
    else:
      ori_hash = hashlib.sha256('fake-ori-content').digest()
    payload_gen.SetPartInfo(False, False, old_rootfs_fs_size, ori_hash)

  # Add new kernel/rootfs partition info.
  if fail_bad_nki:
    nki_hash = None
  else:
    nki_hash = hashlib.sha256('fake-nki-content').digest()
  payload_gen.SetPartInfo(True, True, new_kernel_fs_size, nki_hash)
  if fail_bad_nri:
    nri_hash = None
  else:
    nri_hash = hashlib.sha256('fake-nri-content').digest()
  payload_gen.SetPartInfo(False, True, new_rootfs_fs_size, nri_hash)

  # Create the test object.
  payload_checker = _GetPayloadChecker(payload_gen.WriteToFile)
  report = checker._PayloadReport()

  # Any single tampering flag is expected to make the check fail.
  should_fail = any((
      fail_mismatched_block_size, fail_bad_sigs, fail_mismatched_oki_ori,
      fail_bad_oki, fail_bad_ori, fail_bad_nki, fail_bad_nri,
      fail_missing_ops, fail_old_kernel_fs_size, fail_old_rootfs_fs_size,
      fail_new_kernel_fs_size, fail_new_rootfs_fs_size))
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckManifest, report,
                      rootfs_part_size, kernel_part_size)
  else:
    self.assertIsNone(payload_checker._CheckManifest(
        report, rootfs_part_size, kernel_part_size))
Gilad Arnold5502b562013-03-08 13:22:31 -0800492
def testCheckLength(self):
  """Tests _CheckLength()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  # A length of three and a half blocks.
  length = int(3.5 * payload_checker.block_size)

  # Passes.
  self.assertIsNone(payload_checker._CheckLength(length, 4, 'foo', 'bar'))

  # Fails, first with too few blocks and then with too many.
  for bad_num_blocks in (3, 5):
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckLength,
                      length, bad_num_blocks, 'foo', 'bar')
509
def testCheckExtents(self):
  """Tests _CheckExtents()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size
  part_size = (1024 + 16) * block_size

  # Passing cases: (extent pairs, extra keyword arguments, expected return
  # value). Each call gets a fresh block counter.
  passing_cases = (
      # All real extents.
      (((0, 4), (8, 3), (1024, 16)), {}, 23),
      # Pseudo-extents (aka sparse holes).
      (((0, 4), (common.PSEUDO_EXTENT_MARKER, 5), (8, 3)),
       {'allow_pseudo': True}, 12),
      # Pseudo-extent due to a signature.
      (((common.PSEUDO_EXTENT_MARKER, 2),), {'allow_signature': True}, 2),
  )
  for extent_pairs, dargs, expected_ret in passing_cases:
    ret = payload_checker._CheckExtents(
        self.NewExtentList(*extent_pairs), part_size,
        collections.defaultdict(int), 'foo', **dargs)
    self.assertEqual(ret, expected_ret)

  # Failing cases: (extent pairs, usable partition size).
  failing_cases = (
      # Extent missing a start block.
      (((-1, 4), (8, 3), (1024, 16)), part_size),
      # Extent missing block count.
      (((0, -1), (8, 3), (1024, 16)), part_size),
      # Extent has zero blocks.
      (((0, 4), (8, 3), (1024, 0)), part_size),
      # Extent exceeds partition boundaries.
      (((0, 4), (8, 3), (1024, 16)), (1024 + 15) * block_size),
  )
  for extent_pairs, usable_size in failing_cases:
    self.assertRaises(
        update_payload.PayloadError, payload_checker._CheckExtents,
        self.NewExtentList(*extent_pairs), usable_size,
        collections.defaultdict(int), 'foo')
566
def testCheckReplaceOperation(self):
  """Tests _CheckReplaceOperation() where op.type == REPLACE."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size
  data_length = 10000
  # Smallest number of blocks that can hold data_length bytes; floor division
  # keeps this an integer regardless of the division semantics in effect.
  num_blocks = (data_length + block_size - 1) // block_size

  op = self.mox.CreateMock(
      update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
  op.type = common.OpType.REPLACE

  # Pass.
  op.src_extents = []
  self.assertIsNone(
      payload_checker._CheckReplaceOperation(
          op, data_length, num_blocks, 'foo'))

  # Fail, src extents found.
  op.src_extents = ['bar']
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, data_length, num_blocks, 'foo')

  # Fail, missing data.
  op.src_extents = []
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, None, num_blocks, 'foo')

  # Fail, length / block number mismatch. The operation carries no src
  # extents here, so the extra block is the only thing wrong with it.
  op.src_extents = []
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckReplaceOperation,
      op, data_length, num_blocks + 1, 'foo')
604
def testCheckReplaceBzOperation(self):
  """Tests _CheckReplaceOperation() where op.type == REPLACE_BZ."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size
  data_length = block_size * 3
  # Smallest number of blocks that can hold data_length bytes.
  exact_blocks = (data_length + block_size - 1) // block_size
  # A destination that is larger than the data blob.
  roomy_blocks = exact_blocks + 5

  op = self.mox.CreateMock(
      update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
  op.type = common.OpType.REPLACE_BZ

  # Pass.
  op.src_extents = []
  self.assertIsNone(
      payload_checker._CheckReplaceOperation(
          op, data_length, roomy_blocks, 'foo'))

  # Failing cases: (src extents, data length, number of dst blocks).
  failing_cases = (
      # Src extents found.
      (['bar'], data_length, roomy_blocks),
      # Missing data.
      ([], None, exact_blocks),
      # Too few blocks to justify BZ.
      ([], data_length, exact_blocks),
  )
  for src_extents, length, num_blocks in failing_cases:
    op.src_extents = src_extents
    self.assertRaises(
        update_payload.PayloadError,
        payload_checker._CheckReplaceOperation,
        op, length, num_blocks, 'foo')
642
def testCheckMoveOperation_Pass(self):
  """Tests _CheckMoveOperation(); pass case."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  # 134 blocks on either side.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 6))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  self.assertIsNone(
      payload_checker._CheckMoveOperation(move_op, None, 134, 134, 'foo'))
655
def testCheckMoveOperation_FailContainsData(self):
  """Tests _CheckMoveOperation(); fails, message contains data."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 6))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  # The second argument is 1024 rather than None.
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckMoveOperation,
      move_op, 1024, 134, 134, 'foo')
670
def testCheckMoveOperation_FailInsufficientSrcBlocks(self):
  """Tests _CheckMoveOperation(); fails, not enough actual src blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  # The src extents add up to 133 blocks, one short of the stated 134.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 127))
  dst_extents = self.NewExtentList((16, 128), (512, 6))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckMoveOperation,
      move_op, None, 134, 134, 'foo')
685
def testCheckMoveOperation_FailInsufficientDstBlocks(self):
  """Tests _CheckMoveOperation(); fails, not enough actual dst blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  # The dst extents add up to 133 blocks, one short of the stated 134.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 5))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckMoveOperation,
      move_op, None, 134, 134, 'foo')
700
def testCheckMoveOperation_FailExcessSrcBlocks(self):
  """Tests _CheckMoveOperation(); fails, too many actual src blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  op.type = common.OpType.MOVE

  # The src extents add up to 135 blocks, one more than the stated 134, while
  # the dst extents add up to exactly 134. The operation is populated only
  # once: AddToMessage() appends, so feeding it a second set of extents would
  # leave the earlier ones in place and test a different condition.
  self.AddToMessage(op.src_extents,
                    self.NewExtentList((0, 4), (12, 2), (1024, 129)))
  self.AddToMessage(op.dst_extents,
                    self.NewExtentList((16, 128), (512, 6)))
  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckMoveOperation,
      op, None, 134, 134, 'foo')
723
def testCheckMoveOperation_FailExcessDstBlocks(self):
  """Tests _CheckMoveOperation(); fails, too many actual dst blocks."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  # The dst extents add up to 135 blocks, one more than the stated 134.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((16, 128), (512, 7))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckMoveOperation,
      move_op, None, 134, 134, 'foo')
738
def testCheckMoveOperation_FailStagnantBlocks(self):
  """Tests _CheckMoveOperation(); fails, there are blocks that do not move."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  move_op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  move_op.type = common.OpType.MOVE

  # With dst starting at block 8, the fifth and sixth src blocks (12 and 13)
  # line up with dst blocks 12 and 13.
  src_extents = self.NewExtentList((0, 4), (12, 2), (1024, 128))
  dst_extents = self.NewExtentList((8, 128), (512, 6))
  self.AddToMessage(move_op.src_extents, src_extents)
  self.AddToMessage(move_op.dst_extents, dst_extents)

  self.assertRaises(
      update_payload.PayloadError,
      payload_checker._CheckMoveOperation,
      move_op, None, 134, 134, 'foo')
753
def testCheckBsdiff(self):
  """Tests _CheckBsdiffOperation()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())

  # Pass.
  self.assertIsNone(
      payload_checker._CheckBsdiffOperation(10000, 3, 'foo'))

  # Failing cases: (data length, total dst blocks).
  failing_cases = (
      # Missing data blob.
      (None, 3),
      # Too big of a diff blob (unjustified).
      (10000, 2),
  )
  for data_length, total_dst_blocks in failing_cases:
    self.assertRaises(
        update_payload.PayloadError,
        payload_checker._CheckBsdiffOperation,
        data_length, total_dst_blocks, 'foo')
773
def DoCheckOperationTest(self, op_type_name, is_last, allow_signature,
                         allow_unhashed, fail_src_extents, fail_dst_extents,
                         fail_mismatched_data_offset_length,
                         fail_missing_dst_extents, fail_src_length,
                         fail_dst_length, fail_data_hash,
                         fail_prev_data_offset):
  """Parametric testing of _CheckOperation().

  Builds a single install operation of the requested type, tampered with as
  dictated by the fail_* flags, and hands it to _CheckOperation(). If any
  fail_* flag is set the call is expected to raise PayloadError; otherwise it
  is expected to return the operation's data_length (0 if the field is unset).

  Args:
    op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
    is_last: whether we're testing the last operation in a sequence
    allow_signature: whether we're testing a signature-capable operation
    allow_unhashed: whether we're allowing to not hash the data
    fail_src_extents: tamper with src extents
    fail_dst_extents: tamper with dst extents
    fail_mismatched_data_offset_length: make data_{offset,length} inconsistent
    fail_missing_dst_extents: do not include dst extents
    fail_src_length: make src length inconsistent
    fail_dst_length: make dst length inconsistent
    fail_data_hash: tamper with the data blob hash
    fail_prev_data_offset: make data space uses non-contiguous

  """
  op_type = _OpTypeByName(op_type_name)

  # Create the test object.
  payload = self.MockPayload()
  payload_checker = checker.PayloadChecker(payload,
                                           allow_unhashed=allow_unhashed)
  block_size = payload_checker.block_size

  # Create auxiliary arguments. The counters hold one byte per block, with
  # the block count rounded up; '//' keeps the count an integer.
  old_part_size = test_utils.MiB(4)
  new_part_size = test_utils.MiB(8)
  old_block_counters = array.array(
      'B', [0] * ((old_part_size + block_size - 1) // block_size))
  new_block_counters = array.array(
      'B', [0] * ((new_part_size + block_size - 1) // block_size))
  prev_data_offset = 1876
  blob_hash_counts = collections.defaultdict(int)

  # Create the operation object for the test.
  op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
  op.type = op_type

  # Only MOVE and BSDIFF get source extents in this test.
  total_src_blocks = 0
  if op_type in (common.OpType.MOVE, common.OpType.BSDIFF):
    if fail_src_extents:
      # A zero-length extent.
      self.AddToMessage(op.src_extents,
                        self.NewExtentList((0, 0)))
    else:
      self.AddToMessage(op.src_extents,
                        self.NewExtentList((0, 16)))
      total_src_blocks = 16

  # Every type other than MOVE gets a data blob in this test.
  if op_type != common.OpType.MOVE:
    # Leaving data_length unset while data_offset is set is what makes the
    # two fields inconsistent.
    if not fail_mismatched_data_offset_length:
      op.data_length = 16 * block_size - 8
    if fail_prev_data_offset:
      # Start the blob 16 bytes past the offset handed to _CheckOperation().
      op.data_offset = prev_data_offset + 16
    else:
      op.data_offset = prev_data_offset

    fake_data = 'fake-data'.ljust(op.data_length)
    if not (allow_unhashed or (is_last and allow_signature and
                               op_type == common.OpType.REPLACE)):
      # Neither exemption applies. Supply a valid hash, or, when failing on
      # the hash, leave it out altogether (and expect no blob read).
      if not fail_data_hash:
        # Create a valid data blob hash.
        op.data_sha256_hash = hashlib.sha256(fake_data).digest()
        payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
            fake_data)
    elif fail_data_hash:
      # The test expects an absent hash to pass here, so a failure has to be
      # provoked with a hash that does not match the blob.
      # Create an invalid data blob hash.
      op.data_sha256_hash = hashlib.sha256(
          fake_data.replace(' ', '-')).digest()
      payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
          fake_data)

  total_dst_blocks = 0
  if not fail_missing_dst_extents:
    total_dst_blocks = 16
    if fail_dst_extents:
      # The second extent has zero length.
      self.AddToMessage(op.dst_extents,
                        self.NewExtentList((4, 16), (32, 0)))
    else:
      self.AddToMessage(op.dst_extents,
                        self.NewExtentList((4, 8), (64, 8)))

  if total_src_blocks:
    if fail_src_length:
      op.src_length = total_src_blocks * block_size + 8
    else:
      op.src_length = total_src_blocks * block_size
  elif fail_src_length:
    # Add an orphaned src_length.
    op.src_length = 16

  if total_dst_blocks:
    if fail_dst_length:
      op.dst_length = total_dst_blocks * block_size + 8
    else:
      op.dst_length = total_dst_blocks * block_size

  self.mox.ReplayAll()
  should_fail = (fail_src_extents or fail_dst_extents or
                 fail_mismatched_data_offset_length or
                 fail_missing_dst_extents or fail_src_length or
                 fail_dst_length or fail_data_hash or fail_prev_data_offset)
  largs = [op, 'foo', is_last, old_block_counters, new_block_counters,
           old_part_size, new_part_size, prev_data_offset, allow_signature,
           blob_hash_counts]
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckOperation, *largs)
  else:
    self.assertEqual(payload_checker._CheckOperation(*largs),
                     op.data_length if op.HasField('data_length') else 0)
891
def testAllocBlockCounters(self):
  """Tests _AllocBlockCounters()."""
  payload_checker = checker.PayloadChecker(self.MockPayload())
  block_size = payload_checker.block_size

  # Check allocation for block-aligned partition size, ensure it's integers.
  result = payload_checker._AllocBlockCounters(16 * block_size)
  self.assertEqual(len(result), 16)
  self.assertEqual(type(result[0]), int)

  # Check allocation of unaligned partition sizes: a partial trailing block
  # is expected to get a counter of its own.
  result = payload_checker._AllocBlockCounters(16 * block_size - 1)
  self.assertEqual(len(result), 16)
  result = payload_checker._AllocBlockCounters(16 * block_size + 1)
  self.assertEqual(len(result), 17)
907
def DoCheckOperationsTest(self, fail_bad_type,
                          fail_nonexhaustive_full_update):
  """Parametric testing of _CheckOperations().

  Generates a full-update payload with a single rootfs operation and runs
  _CheckOperations() on it. If either flag is set the call is expected to
  raise PayloadError; otherwise it is expected to return the operation's
  data length.

  Args:
    fail_bad_type: use an operation type larger than any valid type value
    fail_nonexhaustive_full_update: make the operation one block short of
      the partition size

  """
  # Generate a test payload. For this test, we only care about one
  # (arbitrary) set of operations, so we'll only be generating rootfs
  # operations and test with them.
  payload_gen = test_utils.PayloadGenerator()

  block_size = test_utils.KiB(4)
  payload_gen.SetBlockSize(block_size)

  rootfs_part_size = test_utils.MiB(8)

  # Fake rootfs operations in a full update, tampered with as required.
  rootfs_op_type = common.OpType.REPLACE
  if fail_bad_type:
    # Choose a type value that's bigger than the highest valid value.
    rootfs_op_type = max([rootfs_op_type] + list(common.OpType.ALL)) + 1

  rootfs_data_length = rootfs_part_size
  if fail_nonexhaustive_full_update:
    rootfs_data_length -= block_size

  payload_gen.AddOperation(False, rootfs_op_type,
                           dst_extents=[(0, rootfs_data_length // block_size)],
                           data_offset=0,
                           data_length=rootfs_data_length)

  # Create the test object.
  payload_checker = _GetPayloadChecker(payload_gen.WriteToFile,
                                       checker_init_dargs={
                                           'allow_unhashed': True})
  payload_checker.payload_type = checker._TYPE_FULL
  report = checker._PayloadReport()

  should_fail = (fail_bad_type or fail_nonexhaustive_full_update)
  largs = (payload_checker.payload.manifest.install_operations, report,
           'foo', 0, rootfs_part_size, rootfs_part_size, 0, False)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckOperations, *largs)
  else:
    self.assertEqual(payload_checker._CheckOperations(*largs),
                     rootfs_data_length)
953
def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op,
                          fail_mismatched_pseudo_op, fail_sig_missing_fields,
                          fail_unknown_sig_version, fail_incorrect_sig):
  """Parametric testing of _CheckSignatures().

  Generates a payload with a single rootfs operation. With no flag set, the
  payload generator is left to produce the signatures itself. With any flag
  set, a forged signatures blob (and possibly a forged trailing operation)
  is injected, and _CheckSignatures() is expected to raise PayloadError.

  Args:
    fail_empty_sigs_blob: forge a signatures blob holding no signatures
    fail_missing_pseudo_op: forge the trailing operation instead of letting
      the generator add the signature pseudo-operation
    fail_mismatched_pseudo_op: same forging as fail_missing_pseudo_op
    fail_sig_missing_fields: add a signature that carries no data
    fail_unknown_sig_version: add a signature with version 5 instead of 1
    fail_incorrect_sig: sign content other than the actual payload

  """
  # Generate a test payload. For this test, we only care about the signature
  # block and how it relates to the payload hash. Therefore, we're generating
  # a random (otherwise useless) payload for this purpose.
  payload_gen = test_utils.EnhancedPayloadGenerator()
  block_size = test_utils.KiB(4)
  payload_gen.SetBlockSize(block_size)
  rootfs_part_size = test_utils.MiB(2)
  kernel_part_size = test_utils.KiB(16)
  payload_gen.SetPartInfo(False, True, rootfs_part_size,
                          hashlib.sha256('fake-new-rootfs-content').digest())
  payload_gen.SetPartInfo(True, True, kernel_part_size,
                          hashlib.sha256('fake-new-kernel-content').digest())
  payload_gen.AddOperationWithData(
      False, common.OpType.REPLACE,
      dst_extents=[(0, rootfs_part_size // block_size)],
      data_blob=os.urandom(rootfs_part_size))

  do_forge_pseudo_op = (fail_missing_pseudo_op or fail_mismatched_pseudo_op)
  do_forge_sigs_data = (do_forge_pseudo_op or fail_empty_sigs_blob or
                        fail_sig_missing_fields or fail_unknown_sig_version
                        or fail_incorrect_sig)

  sigs_data = None
  if do_forge_sigs_data:
    sigs_gen = test_utils.SignaturesGenerator()
    if not fail_empty_sigs_blob:
      if fail_sig_missing_fields:
        sig_data = None
      else:
        # Signs a fixed string rather than the payload, so this signature
        # never matches the real payload content.
        sig_data = test_utils.SignSha256('fake-payload-content',
                                         test_utils._PRIVKEY_FILE_NAME)
      sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data)

    sigs_data = sigs_gen.ToBinary()
    payload_gen.SetSignatures(payload_gen.curr_offset, len(sigs_data))

  if do_forge_pseudo_op:
    assert sigs_data is not None, 'should have forged signatures blob by now'
    sigs_len = len(sigs_data)
    # Halving both values makes this operation disagree with the offset and
    # size registered via SetSignatures() above.
    payload_gen.AddOperation(
        False, common.OpType.REPLACE,
        data_offset=payload_gen.curr_offset // 2,
        data_length=sigs_len // 2,
        dst_extents=[(0, (sigs_len // 2 + block_size - 1) // block_size)])

  # Generate payload (complete w/ signature) and create the test object.
  payload_checker = _GetPayloadChecker(
      payload_gen.WriteToFileWithData,
      payload_gen_dargs={
          'sigs_data': sigs_data,
          'privkey_file_name': test_utils._PRIVKEY_FILE_NAME,
          'do_add_pseudo_operation': not do_forge_pseudo_op})
  payload_checker.payload_type = checker._TYPE_FULL
  report = checker._PayloadReport()

  # We have to check the manifest first in order to set signature attributes.
  payload_checker._CheckManifest(report, rootfs_part_size, kernel_part_size)

  should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or
                 fail_mismatched_pseudo_op or fail_sig_missing_fields or
                 fail_unknown_sig_version or fail_incorrect_sig)
  largs = (report, test_utils._PUBKEY_FILE_NAME)
  if should_fail:
    self.assertRaises(update_payload.PayloadError,
                      payload_checker._CheckSignatures, *largs)
  else:
    self.assertIsNone(payload_checker._CheckSignatures(*largs))
1024
def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size,
              fail_mismatched_block_size, fail_excess_data):
  """Parametric testing of Run().

  Generates a signed full-update payload with one rootfs and one kernel
  operation. An invalid block size is expected to raise PayloadError while
  the checker is being created; the remaining failure modes are expected to
  raise it from Run().

  Args:
    fail_wrong_payload_type: make the checker assert a 'delta' payload
    fail_invalid_block_size: hand the checker a block size that is not a
      power of two
    fail_mismatched_block_size: hand the checker twice the payload's block
      size
    fail_excess_data: have the generator write 1024 random bytes of padding

  """
  # Generate a test payload. For this test, we generate a full update that
  # has sample kernel and rootfs operations. Since most testing is done with
  # internal PayloadChecker methods that are tested elsewhere, here we only
  # tamper with what's actually being manipulated and/or tested in the Run()
  # method itself. Note that the checker doesn't verify partition hashes, so
  # they're safe to fake.
  payload_gen = test_utils.EnhancedPayloadGenerator()
  block_size = test_utils.KiB(4)
  payload_gen.SetBlockSize(block_size)
  kernel_part_size = test_utils.KiB(16)
  rootfs_part_size = test_utils.MiB(2)
  payload_gen.SetPartInfo(False, True, rootfs_part_size,
                          hashlib.sha256('fake-new-rootfs-content').digest())
  payload_gen.SetPartInfo(True, True, kernel_part_size,
                          hashlib.sha256('fake-new-kernel-content').digest())
  payload_gen.AddOperationWithData(
      False, common.OpType.REPLACE,
      dst_extents=[(0, rootfs_part_size // block_size)],
      data_blob=os.urandom(rootfs_part_size))
  payload_gen.AddOperationWithData(
      True, common.OpType.REPLACE,
      dst_extents=[(0, kernel_part_size // block_size)],
      data_blob=os.urandom(kernel_part_size))

  # Generate payload (complete w/ signature) and create the test object.
  if fail_invalid_block_size:
    use_block_size = block_size + 5  # not a power of two
  elif fail_mismatched_block_size:
    use_block_size = block_size * 2  # different than the payload stated
  else:
    use_block_size = block_size

  checker_dargs = {
      'payload_gen_dargs': {
          'privkey_file_name': test_utils._PRIVKEY_FILE_NAME,
          'do_add_pseudo_operation': True,
          'is_pseudo_in_kernel': True,
          'padding': os.urandom(1024) if fail_excess_data else None},
      'checker_init_dargs': {
          'assert_type': 'delta' if fail_wrong_payload_type else 'full',
          'block_size': use_block_size}}
  if fail_invalid_block_size:
    # Expected to fail while the checker is created, so Run() is never
    # reached.
    self.assertRaises(update_payload.PayloadError, _GetPayloadChecker,
                      payload_gen.WriteToFileWithData, **checker_dargs)
  else:
    payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData,
                                         **checker_dargs)
    run_dargs = {'pubkey_file_name': test_utils._PUBKEY_FILE_NAME}
    should_fail = (fail_wrong_payload_type or fail_mismatched_block_size or
                   fail_excess_data)
    if should_fail:
      self.assertRaises(update_payload.PayloadError,
                        payload_checker.Run, **run_dargs)
    else:
      self.assertIsNone(payload_checker.Run(**run_dargs))
Gilad Arnold5502b562013-03-08 13:22:31 -08001082
1083
# This implements a generic API, hence the occasional unused args.
# pylint: disable=W0613
def ValidateCheckOperationTest(op_type_name, is_last, allow_signature,
                               allow_unhashed, fail_src_extents,
                               fail_dst_extents,
                               fail_mismatched_data_offset_length,
                               fail_missing_dst_extents, fail_src_length,
                               fail_dst_length, fail_data_hash,
                               fail_prev_data_offset):
  """Returns True iff the combination of arguments represents a valid test.

  The signature mirrors that of DoCheckOperationTest(), as this is used for
  filtering the argument combinations handed to it. Only op_type_name,
  fail_src_extents, fail_src_length, fail_mismatched_data_offset_length,
  fail_data_hash and fail_prev_data_offset are consulted.

  Args:
    op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'
    is_last: unused
    allow_signature: unused
    allow_unhashed: unused
    fail_src_extents: whether src extents are tampered with
    fail_dst_extents: unused
    fail_mismatched_data_offset_length: whether data_{offset,length} are made
      inconsistent
    fail_missing_dst_extents: unused
    fail_src_length: whether src length is made inconsistent
    fail_dst_length: unused
    fail_data_hash: whether the data blob hash is tampered with
    fail_prev_data_offset: whether data space uses are made non-contiguous

  Returns:
    False if a requested failure targets something the operation type does
    not have; True otherwise.

  """
  op_type = _OpTypeByName(op_type_name)

  # REPLACE/REPLACE_BZ operations don't read data from src partition.
  if (op_type in (common.OpType.REPLACE, common.OpType.REPLACE_BZ) and (
      fail_src_extents or fail_src_length)):
    return False

  # MOVE operations don't carry data.
  if (op_type == common.OpType.MOVE and (
      fail_mismatched_data_offset_length or fail_data_hash or
      fail_prev_data_offset)):
    return False

  return True
1108
1109
def TestMethodBody(run_method_name, run_dargs):
  """Returns a function that invokes a named method with named arguments.

  The method is looked up on the instance only when the returned function is
  called. Building the closure in a function of its own gives every returned
  function its own run_method_name and run_dargs, even when this is called
  repeatedly from a loop.

  Args:
    run_method_name: name of the method to invoke on the instance
    run_dargs: a dictionary of keyword arguments to invoke the method with

  Returns:
    A one-argument function (taking the instance) that returns whatever the
    named method returns.

  """
  return lambda self: getattr(self, run_method_name)(**run_dargs)
1113
1114
def AddParametricTests(tested_method_name, arg_space, validate_func=None):
  """Enumerates and adds specific parametric tests to PayloadCheckerTest.

  This function enumerates a space of test parameters (defined by arg_space),
  then binds a new, unique method name in PayloadCheckerTest to a test function
  that gets handed the said parameters. This is a preferable approach to doing
  the enumeration and invocation during the tests because this way each test is
  treated as a complete run by the unittest framework, and so benefits from the
  usual setUp/tearDown mechanics.

  Args:
    tested_method_name: name of the tested PayloadChecker method
    arg_space: a dictionary containing variables (keys) and lists of values
               (values) associated with them
    validate_func: a function used for validating test argument combinations

  """
  # The same runner method serves every combination.
  run_method_name = 'Do%sTest' % tested_method_name

  # Fix the key order once, so that each value in a product tuple is matched
  # with the key whose value list it was drawn from.
  arg_names = list(arg_space)
  arg_values = [arg_space[arg_name] for arg_name in arg_names]

  for value_tuple in itertools.product(*arg_values):
    run_dargs = dict(zip(arg_names, value_tuple))
    if validate_func and not validate_func(**run_dargs):
      continue
    test_method_name = 'test%s' % tested_method_name
    for arg_key, arg_val in run_dargs.items():
      # Only truthy values and integers (zero included) make it into the
      # name. The exact type check keeps False out, as bool subclasses int.
      if arg_val or type(arg_val) is int:
        test_method_name += '__%s=%s' % (arg_key, arg_val)
    setattr(PayloadCheckerTest, test_method_name,
            TestMethodBody(run_method_name, run_dargs))
1143
1144
def AddAllParametricTests():
  """Enumerates and adds all parametric tests to PayloadCheckerTest.

  Each call below registers one test method per combination of the listed
  argument values, to be run through the corresponding Do<Name>Test() method.

  """
  # Add all AddElem test cases.
  AddParametricTests('AddElem',
                     {'linebreak': (True, False),
                      'indent': (0, 1, 2),
                      'convert': (str, lambda s: s[::-1]),
                      'is_present': (True, False),
                      'is_mandatory': (True, False),
                      'is_submsg': (True, False)})

  # Add all _Add{Mandatory,Optional}Field tests.
  AddParametricTests('AddField',
                     {'is_mandatory': (True, False),
                      'linebreak': (True, False),
                      'indent': (0, 1, 2),
                      'convert': (str, lambda s: s[::-1]),
                      'is_present': (True, False)})

  # Add all _Add{Mandatory,Optional}SubMsg tests.
  AddParametricTests('AddSubMsg',
                     {'is_mandatory': (True, False),
                      'is_present': (True, False)})

  # Add all _CheckManifest() test cases.
  AddParametricTests('CheckManifest',
                     {'fail_mismatched_block_size': (True, False),
                      'fail_bad_sigs': (True, False),
                      'fail_mismatched_oki_ori': (True, False),
                      'fail_bad_oki': (True, False),
                      'fail_bad_ori': (True, False),
                      'fail_bad_nki': (True, False),
                      'fail_bad_nri': (True, False),
                      'fail_missing_ops': (True, False),
                      'fail_old_kernel_fs_size': (True, False),
                      'fail_old_rootfs_fs_size': (True, False),
                      'fail_new_kernel_fs_size': (True, False),
                      'fail_new_rootfs_fs_size': (True, False)})

  # Add all _CheckOperation() test cases; combinations that make no sense
  # for a given operation type are filtered out by the validation function.
  AddParametricTests('CheckOperation',
                     {'op_type_name': ('REPLACE', 'REPLACE_BZ', 'MOVE',
                                       'BSDIFF'),
                      'is_last': (True, False),
                      'allow_signature': (True, False),
                      'allow_unhashed': (True, False),
                      'fail_src_extents': (True, False),
                      'fail_dst_extents': (True, False),
                      'fail_mismatched_data_offset_length': (True, False),
                      'fail_missing_dst_extents': (True, False),
                      'fail_src_length': (True, False),
                      'fail_dst_length': (True, False),
                      'fail_data_hash': (True, False),
                      'fail_prev_data_offset': (True, False)},
                     validate_func=ValidateCheckOperationTest)

  # Add all _CheckOperations() test cases.
  AddParametricTests('CheckOperations',
                     {'fail_bad_type': (True, False),
                      'fail_nonexhaustive_full_update': (True, False)})

  # Add all _CheckSignatures() test cases.
  AddParametricTests('CheckSignatures',
                     {'fail_empty_sigs_blob': (True, False),
                      'fail_missing_pseudo_op': (True, False),
                      'fail_mismatched_pseudo_op': (True, False),
                      'fail_sig_missing_fields': (True, False),
                      'fail_unknown_sig_version': (True, False),
                      'fail_incorrect_sig': (True, False)})

  # Add all Run() test cases.
  AddParametricTests('Run',
                     {'fail_wrong_payload_type': (True, False),
                      'fail_invalid_block_size': (True, False),
                      'fail_mismatched_block_size': (True, False),
                      'fail_excess_data': (True, False)})
1221
1222
if __name__ == '__main__':
  # The parametric test methods have to be attached to PayloadCheckerTest
  # before unittest.main() runs.
  AddAllParametricTests()
  unittest.main()