blob: 60ae1a2248202ea6a1dac875c44153b4c48297d4 [file] [log] [blame]
Gilad Arnoldf583a7d2015-02-05 13:23:55 -08001#!/usr/bin/python2
Gilad Arnold5502b562013-03-08 13:22:31 -08002#
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Unit testing checker.py."""
8
Gilad Arnoldf583a7d2015-02-05 13:23:55 -08009from __future__ import print_function
10
Gilad Arnold5502b562013-03-08 13:22:31 -080011import array
12import collections
13import cStringIO
14import hashlib
15import itertools
16import os
17import unittest
18
Gilad Arnoldcb638912013-06-24 04:57:11 -070019# pylint cannot find mox.
Gilad Arnold5502b562013-03-08 13:22:31 -080020# pylint: disable=F0401
21import mox
22
23import checker
24import common
Gilad Arnoldcb638912013-06-24 04:57:11 -070025import payload as update_payload # Avoid name conflicts later.
Gilad Arnold5502b562013-03-08 13:22:31 -080026import test_utils
27import update_metadata_pb2
28
29
Gilad Arnold5502b562013-03-08 13:22:31 -080030def _OpTypeByName(op_name):
31 op_name_to_type = {
32 'REPLACE': common.OpType.REPLACE,
33 'REPLACE_BZ': common.OpType.REPLACE_BZ,
34 'MOVE': common.OpType.MOVE,
35 'BSDIFF': common.OpType.BSDIFF,
36 }
37 return op_name_to_type[op_name]
38
39
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070040def _GetPayloadChecker(payload_gen_write_to_file_func, payload_gen_dargs=None,
41 checker_init_dargs=None):
Gilad Arnold5502b562013-03-08 13:22:31 -080042 """Returns a payload checker from a given payload generator."""
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070043 if payload_gen_dargs is None:
44 payload_gen_dargs = {}
45 if checker_init_dargs is None:
46 checker_init_dargs = {}
47
Gilad Arnold5502b562013-03-08 13:22:31 -080048 payload_file = cStringIO.StringIO()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070049 payload_gen_write_to_file_func(payload_file, **payload_gen_dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080050 payload_file.seek(0)
51 payload = update_payload.Payload(payload_file)
52 payload.Init()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -070053 return checker.PayloadChecker(payload, **checker_init_dargs)
Gilad Arnold5502b562013-03-08 13:22:31 -080054
55
56def _GetPayloadCheckerWithData(payload_gen):
57 """Returns a payload checker from a given payload generator."""
58 payload_file = cStringIO.StringIO()
59 payload_gen.WriteToFile(payload_file)
60 payload_file.seek(0)
61 payload = update_payload.Payload(payload_file)
62 payload.Init()
63 return checker.PayloadChecker(payload)
64
65
Gilad Arnoldcb638912013-06-24 04:57:11 -070066# This class doesn't need an __init__().
Gilad Arnold5502b562013-03-08 13:22:31 -080067# pylint: disable=W0232
Gilad Arnoldcb638912013-06-24 04:57:11 -070068# Unit testing is all about running protected methods.
Gilad Arnold5502b562013-03-08 13:22:31 -080069# pylint: disable=W0212
Gilad Arnoldcb638912013-06-24 04:57:11 -070070# Don't bark about missing members of classes you cannot import.
Gilad Arnold5502b562013-03-08 13:22:31 -080071# pylint: disable=E1101
72class PayloadCheckerTest(mox.MoxTestBase):
73 """Tests the PayloadChecker class.
74
75 In addition to ordinary testFoo() methods, which are automatically invoked by
76 the unittest framework, in this class we make use of DoBarTest() calls that
77 implement parametric tests of certain features. In order to invoke each test,
78 which embodies a unique combination of parameter values, as a complete unit
79 test, we perform explicit enumeration of the parameter space and create
80 individual invocation contexts for each, which are then bound as
81 testBar__param1=val1__param2=val2(). The enumeration of parameter spaces for
82 all such tests is done in AddAllParametricTests().
Gilad Arnold5502b562013-03-08 13:22:31 -080083 """
84
85 def MockPayload(self):
86 """Create a mock payload object, complete with a mock menifest."""
87 payload = self.mox.CreateMock(update_payload.Payload)
88 payload.is_init = True
89 payload.manifest = self.mox.CreateMock(
90 update_metadata_pb2.DeltaArchiveManifest)
91 return payload
92
93 @staticmethod
94 def NewExtent(start_block, num_blocks):
95 """Returns an Extent message.
96
97 Each of the provided fields is set iff it is >= 0; otherwise, it's left at
98 its default state.
99
100 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700101 start_block: The starting block of the extent.
102 num_blocks: The number of blocks in the extent.
Gilad Arnoldf583a7d2015-02-05 13:23:55 -0800103
Gilad Arnold5502b562013-03-08 13:22:31 -0800104 Returns:
105 An Extent message.
Gilad Arnold5502b562013-03-08 13:22:31 -0800106 """
107 ex = update_metadata_pb2.Extent()
108 if start_block >= 0:
109 ex.start_block = start_block
110 if num_blocks >= 0:
111 ex.num_blocks = num_blocks
112 return ex
113
114 @staticmethod
115 def NewExtentList(*args):
116 """Returns an list of extents.
117
118 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700119 *args: (start_block, num_blocks) pairs defining the extents.
Gilad Arnoldf583a7d2015-02-05 13:23:55 -0800120
Gilad Arnold5502b562013-03-08 13:22:31 -0800121 Returns:
122 A list of Extent objects.
Gilad Arnold5502b562013-03-08 13:22:31 -0800123 """
124 ex_list = []
125 for start_block, num_blocks in args:
126 ex_list.append(PayloadCheckerTest.NewExtent(start_block, num_blocks))
127 return ex_list
128
129 @staticmethod
130 def AddToMessage(repeated_field, field_vals):
131 for field_val in field_vals:
132 new_field = repeated_field.add()
133 new_field.CopyFrom(field_val)
134
Gilad Arnold5502b562013-03-08 13:22:31 -0800135 def SetupAddElemTest(self, is_present, is_submsg, convert=str,
136 linebreak=False, indent=0):
137 """Setup for testing of _CheckElem() and its derivatives.
138
139 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700140 is_present: Whether or not the element is found in the message.
141 is_submsg: Whether the element is a sub-message itself.
142 convert: A representation conversion function.
143 linebreak: Whether or not a linebreak is to be used in the report.
144 indent: Indentation used for the report.
Gilad Arnoldf583a7d2015-02-05 13:23:55 -0800145
Gilad Arnold5502b562013-03-08 13:22:31 -0800146 Returns:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700147 msg: A mock message object.
148 report: A mock report object.
149 subreport: A mock sub-report object.
150 name: An element name to check.
151 val: Expected element value.
Gilad Arnold5502b562013-03-08 13:22:31 -0800152 """
153 name = 'foo'
154 val = 'fake submsg' if is_submsg else 'fake field'
155 subreport = 'fake subreport'
156
157 # Create a mock message.
158 msg = self.mox.CreateMock(update_metadata_pb2.message.Message)
159 msg.HasField(name).AndReturn(is_present)
160 setattr(msg, name, val)
161
162 # Create a mock report.
163 report = self.mox.CreateMock(checker._PayloadReport)
164 if is_present:
165 if is_submsg:
166 report.AddSubReport(name).AndReturn(subreport)
167 else:
168 report.AddField(name, convert(val), linebreak=linebreak, indent=indent)
169
170 self.mox.ReplayAll()
171 return (msg, report, subreport, name, val)
172
173 def DoAddElemTest(self, is_present, is_mandatory, is_submsg, convert,
174 linebreak, indent):
175 """Parametric testing of _CheckElem().
176
177 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700178 is_present: Whether or not the element is found in the message.
179 is_mandatory: Whether or not it's a mandatory element.
180 is_submsg: Whether the element is a sub-message itself.
181 convert: A representation conversion function.
182 linebreak: Whether or not a linebreak is to be used in the report.
183 indent: Indentation used for the report.
Gilad Arnold5502b562013-03-08 13:22:31 -0800184 """
185 msg, report, subreport, name, val = self.SetupAddElemTest(
186 is_present, is_submsg, convert, linebreak, indent)
187
Gilad Arnoldcb638912013-06-24 04:57:11 -0700188 args = (msg, name, report, is_mandatory, is_submsg)
189 kwargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
Gilad Arnold5502b562013-03-08 13:22:31 -0800190 if is_mandatory and not is_present:
191 self.assertRaises(update_payload.PayloadError,
Gilad Arnoldcb638912013-06-24 04:57:11 -0700192 checker.PayloadChecker._CheckElem, *args, **kwargs)
Gilad Arnold5502b562013-03-08 13:22:31 -0800193 else:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700194 ret_val, ret_subreport = checker.PayloadChecker._CheckElem(*args,
195 **kwargs)
196 self.assertEquals(val if is_present else None, ret_val)
197 self.assertEquals(subreport if is_present and is_submsg else None,
198 ret_subreport)
Gilad Arnold5502b562013-03-08 13:22:31 -0800199
200 def DoAddFieldTest(self, is_mandatory, is_present, convert, linebreak,
201 indent):
202 """Parametric testing of _Check{Mandatory,Optional}Field().
203
204 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700205 is_mandatory: Whether we're testing a mandatory call.
206 is_present: Whether or not the element is found in the message.
207 convert: A representation conversion function.
208 linebreak: Whether or not a linebreak is to be used in the report.
209 indent: Indentation used for the report.
Gilad Arnold5502b562013-03-08 13:22:31 -0800210 """
211 msg, report, _, name, val = self.SetupAddElemTest(
212 is_present, False, convert, linebreak, indent)
213
214 # Prepare for invocation of the tested method.
Gilad Arnoldcb638912013-06-24 04:57:11 -0700215 args = [msg, name, report]
216 kwargs = {'convert': convert, 'linebreak': linebreak, 'indent': indent}
Gilad Arnold5502b562013-03-08 13:22:31 -0800217 if is_mandatory:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700218 args.append('bar')
Gilad Arnold5502b562013-03-08 13:22:31 -0800219 tested_func = checker.PayloadChecker._CheckMandatoryField
220 else:
221 tested_func = checker.PayloadChecker._CheckOptionalField
222
223 # Test the method call.
224 if is_mandatory and not is_present:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700225 self.assertRaises(update_payload.PayloadError, tested_func, *args,
226 **kwargs)
Gilad Arnold5502b562013-03-08 13:22:31 -0800227 else:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700228 ret_val = tested_func(*args, **kwargs)
229 self.assertEquals(val if is_present else None, ret_val)
Gilad Arnold5502b562013-03-08 13:22:31 -0800230
231 def DoAddSubMsgTest(self, is_mandatory, is_present):
232 """Parametrized testing of _Check{Mandatory,Optional}SubMsg().
233
234 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700235 is_mandatory: Whether we're testing a mandatory call.
236 is_present: Whether or not the element is found in the message.
Gilad Arnold5502b562013-03-08 13:22:31 -0800237 """
238 msg, report, subreport, name, val = self.SetupAddElemTest(is_present, True)
239
240 # Prepare for invocation of the tested method.
Gilad Arnoldcb638912013-06-24 04:57:11 -0700241 args = [msg, name, report]
Gilad Arnold5502b562013-03-08 13:22:31 -0800242 if is_mandatory:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700243 args.append('bar')
Gilad Arnold5502b562013-03-08 13:22:31 -0800244 tested_func = checker.PayloadChecker._CheckMandatorySubMsg
245 else:
246 tested_func = checker.PayloadChecker._CheckOptionalSubMsg
247
248 # Test the method call.
249 if is_mandatory and not is_present:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700250 self.assertRaises(update_payload.PayloadError, tested_func, *args)
Gilad Arnold5502b562013-03-08 13:22:31 -0800251 else:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700252 ret_val, ret_subreport = tested_func(*args)
253 self.assertEquals(val if is_present else None, ret_val)
254 self.assertEquals(subreport if is_present else None, ret_subreport)
Gilad Arnold5502b562013-03-08 13:22:31 -0800255
256 def testCheckPresentIff(self):
257 """Tests _CheckPresentIff()."""
258 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
259 None, None, 'foo', 'bar', 'baz'))
260 self.assertIsNone(checker.PayloadChecker._CheckPresentIff(
261 'a', 'b', 'foo', 'bar', 'baz'))
262 self.assertRaises(update_payload.PayloadError,
263 checker.PayloadChecker._CheckPresentIff,
264 'a', None, 'foo', 'bar', 'baz')
265 self.assertRaises(update_payload.PayloadError,
266 checker.PayloadChecker._CheckPresentIff,
267 None, 'b', 'foo', 'bar', 'baz')
268
269 def DoCheckSha256SignatureTest(self, expect_pass, expect_subprocess_call,
270 sig_data, sig_asn1_header,
271 returned_signed_hash, expected_signed_hash):
272 """Parametric testing of _CheckSha256SignatureTest().
273
274 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700275 expect_pass: Whether or not it should pass.
276 expect_subprocess_call: Whether to expect the openssl call to happen.
277 sig_data: The signature raw data.
278 sig_asn1_header: The ASN1 header.
279 returned_signed_hash: The signed hash data retuned by openssl.
280 expected_signed_hash: The signed hash data to compare against.
Gilad Arnold5502b562013-03-08 13:22:31 -0800281 """
Gilad Arnoldcb638912013-06-24 04:57:11 -0700282 try:
283 # Stub out the subprocess invocation.
284 self.mox.StubOutWithMock(checker.PayloadChecker, '_Run')
285 if expect_subprocess_call:
286 checker.PayloadChecker._Run(
287 mox.IsA(list), send_data=sig_data).AndReturn(
288 (sig_asn1_header + returned_signed_hash, None))
Gilad Arnold5502b562013-03-08 13:22:31 -0800289
Gilad Arnoldcb638912013-06-24 04:57:11 -0700290 self.mox.ReplayAll()
291 if expect_pass:
292 self.assertIsNone(checker.PayloadChecker._CheckSha256Signature(
293 sig_data, 'foo', expected_signed_hash, 'bar'))
294 else:
295 self.assertRaises(update_payload.PayloadError,
296 checker.PayloadChecker._CheckSha256Signature,
297 sig_data, 'foo', expected_signed_hash, 'bar')
298 finally:
299 self.mox.UnsetStubs()
Gilad Arnold5502b562013-03-08 13:22:31 -0800300
301 def testCheckSha256Signature_Pass(self):
302 """Tests _CheckSha256Signature(); pass case."""
303 sig_data = 'fake-signature'.ljust(256)
304 signed_hash = hashlib.sha256('fake-data').digest()
305 self.DoCheckSha256SignatureTest(True, True, sig_data,
306 common.SIG_ASN1_HEADER, signed_hash,
307 signed_hash)
308
309 def testCheckSha256Signature_FailBadSignature(self):
310 """Tests _CheckSha256Signature(); fails due to malformed signature."""
Gilad Arnoldcb638912013-06-24 04:57:11 -0700311 sig_data = 'fake-signature' # Malformed (not 256 bytes in length).
Gilad Arnold5502b562013-03-08 13:22:31 -0800312 signed_hash = hashlib.sha256('fake-data').digest()
313 self.DoCheckSha256SignatureTest(False, False, sig_data,
314 common.SIG_ASN1_HEADER, signed_hash,
315 signed_hash)
316
317 def testCheckSha256Signature_FailBadOutputLength(self):
318 """Tests _CheckSha256Signature(); fails due to unexpected output length."""
319 sig_data = 'fake-signature'.ljust(256)
Gilad Arnoldcb638912013-06-24 04:57:11 -0700320 signed_hash = 'fake-hash' # Malformed (not 32 bytes in length).
Gilad Arnold5502b562013-03-08 13:22:31 -0800321 self.DoCheckSha256SignatureTest(False, True, sig_data,
322 common.SIG_ASN1_HEADER, signed_hash,
323 signed_hash)
324
325 def testCheckSha256Signature_FailBadAsnHeader(self):
326 """Tests _CheckSha256Signature(); fails due to bad ASN1 header."""
327 sig_data = 'fake-signature'.ljust(256)
328 signed_hash = hashlib.sha256('fake-data').digest()
329 bad_asn1_header = 'bad-asn-header'.ljust(len(common.SIG_ASN1_HEADER))
330 self.DoCheckSha256SignatureTest(False, True, sig_data, bad_asn1_header,
331 signed_hash, signed_hash)
332
333 def testCheckSha256Signature_FailBadHash(self):
334 """Tests _CheckSha256Signature(); fails due to bad hash returned."""
335 sig_data = 'fake-signature'.ljust(256)
336 expected_signed_hash = hashlib.sha256('fake-data').digest()
337 returned_signed_hash = hashlib.sha256('bad-fake-data').digest()
338 self.DoCheckSha256SignatureTest(False, True, sig_data,
339 common.SIG_ASN1_HEADER,
340 expected_signed_hash, returned_signed_hash)
341
342 def testCheckBlocksFitLength_Pass(self):
343 """Tests _CheckBlocksFitLength(); pass case."""
344 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
345 64, 4, 16, 'foo'))
346 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
347 60, 4, 16, 'foo'))
348 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
349 49, 4, 16, 'foo'))
350 self.assertIsNone(checker.PayloadChecker._CheckBlocksFitLength(
351 48, 3, 16, 'foo'))
352
353 def testCheckBlocksFitLength_TooManyBlocks(self):
354 """Tests _CheckBlocksFitLength(); fails due to excess blocks."""
355 self.assertRaises(update_payload.PayloadError,
356 checker.PayloadChecker._CheckBlocksFitLength,
357 64, 5, 16, 'foo')
358 self.assertRaises(update_payload.PayloadError,
359 checker.PayloadChecker._CheckBlocksFitLength,
360 60, 5, 16, 'foo')
361 self.assertRaises(update_payload.PayloadError,
362 checker.PayloadChecker._CheckBlocksFitLength,
363 49, 5, 16, 'foo')
364 self.assertRaises(update_payload.PayloadError,
365 checker.PayloadChecker._CheckBlocksFitLength,
366 48, 4, 16, 'foo')
367
368 def testCheckBlocksFitLength_TooFewBlocks(self):
369 """Tests _CheckBlocksFitLength(); fails due to insufficient blocks."""
370 self.assertRaises(update_payload.PayloadError,
371 checker.PayloadChecker._CheckBlocksFitLength,
372 64, 3, 16, 'foo')
373 self.assertRaises(update_payload.PayloadError,
374 checker.PayloadChecker._CheckBlocksFitLength,
375 60, 3, 16, 'foo')
376 self.assertRaises(update_payload.PayloadError,
377 checker.PayloadChecker._CheckBlocksFitLength,
378 49, 3, 16, 'foo')
379 self.assertRaises(update_payload.PayloadError,
380 checker.PayloadChecker._CheckBlocksFitLength,
381 48, 2, 16, 'foo')
382
383 def DoCheckManifestTest(self, fail_mismatched_block_size, fail_bad_sigs,
384 fail_mismatched_oki_ori, fail_bad_oki, fail_bad_ori,
Gilad Arnold5bc7fbe2015-02-05 13:01:09 -0800385 fail_bad_nki, fail_bad_nri, fail_old_kernel_fs_size,
386 fail_old_rootfs_fs_size, fail_new_kernel_fs_size,
387 fail_new_rootfs_fs_size):
Gilad Arnold5502b562013-03-08 13:22:31 -0800388 """Parametric testing of _CheckManifest().
389
390 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700391 fail_mismatched_block_size: Simulate a missing block_size field.
392 fail_bad_sigs: Make signatures descriptor inconsistent.
393 fail_mismatched_oki_ori: Make old rootfs/kernel info partially present.
394 fail_bad_oki: Tamper with old kernel info.
395 fail_bad_ori: Tamper with old rootfs info.
396 fail_bad_nki: Tamper with new kernel info.
397 fail_bad_nri: Tamper with new rootfs info.
Gilad Arnoldcb638912013-06-24 04:57:11 -0700398 fail_old_kernel_fs_size: Make old kernel fs size too big.
399 fail_old_rootfs_fs_size: Make old rootfs fs size too big.
400 fail_new_kernel_fs_size: Make new kernel fs size too big.
401 fail_new_rootfs_fs_size: Make new rootfs fs size too big.
Gilad Arnold5502b562013-03-08 13:22:31 -0800402 """
403 # Generate a test payload. For this test, we only care about the manifest
404 # and don't need any data blobs, hence we can use a plain paylaod generator
405 # (which also gives us more control on things that can be screwed up).
406 payload_gen = test_utils.PayloadGenerator()
407
408 # Tamper with block size, if required.
409 if fail_mismatched_block_size:
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700410 payload_gen.SetBlockSize(test_utils.KiB(1))
Gilad Arnold5502b562013-03-08 13:22:31 -0800411 else:
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700412 payload_gen.SetBlockSize(test_utils.KiB(4))
Gilad Arnold5502b562013-03-08 13:22:31 -0800413
414 # Add some operations.
Gilad Arnold5bc7fbe2015-02-05 13:01:09 -0800415 payload_gen.AddOperation(False, common.OpType.MOVE,
416 src_extents=[(0, 16), (16, 497)],
417 dst_extents=[(16, 496), (0, 16)])
418 payload_gen.AddOperation(True, common.OpType.MOVE,
419 src_extents=[(0, 8), (8, 8)],
420 dst_extents=[(8, 8), (0, 8)])
Gilad Arnold5502b562013-03-08 13:22:31 -0800421
422 # Set an invalid signatures block (offset but no size), if required.
423 if fail_bad_sigs:
424 payload_gen.SetSignatures(32, None)
425
Gilad Arnold382df5c2013-05-03 12:49:28 -0700426 # Set partition / filesystem sizes.
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700427 rootfs_part_size = test_utils.MiB(8)
428 kernel_part_size = test_utils.KiB(512)
Gilad Arnold382df5c2013-05-03 12:49:28 -0700429 old_rootfs_fs_size = new_rootfs_fs_size = rootfs_part_size
430 old_kernel_fs_size = new_kernel_fs_size = kernel_part_size
431 if fail_old_kernel_fs_size:
432 old_kernel_fs_size += 100
433 if fail_old_rootfs_fs_size:
434 old_rootfs_fs_size += 100
435 if fail_new_kernel_fs_size:
436 new_kernel_fs_size += 100
437 if fail_new_rootfs_fs_size:
438 new_rootfs_fs_size += 100
439
Gilad Arnold5502b562013-03-08 13:22:31 -0800440 # Add old kernel/rootfs partition info, as required.
Gilad Arnold382df5c2013-05-03 12:49:28 -0700441 if fail_mismatched_oki_ori or fail_old_kernel_fs_size or fail_bad_oki:
Gilad Arnold5502b562013-03-08 13:22:31 -0800442 oki_hash = (None if fail_bad_oki
443 else hashlib.sha256('fake-oki-content').digest())
Gilad Arnold382df5c2013-05-03 12:49:28 -0700444 payload_gen.SetPartInfo(True, False, old_kernel_fs_size, oki_hash)
445 if not fail_mismatched_oki_ori and (fail_old_rootfs_fs_size or
446 fail_bad_ori):
447 ori_hash = (None if fail_bad_ori
448 else hashlib.sha256('fake-ori-content').digest())
449 payload_gen.SetPartInfo(False, False, old_rootfs_fs_size, ori_hash)
Gilad Arnold5502b562013-03-08 13:22:31 -0800450
451 # Add new kernel/rootfs partition info.
452 payload_gen.SetPartInfo(
Gilad Arnold382df5c2013-05-03 12:49:28 -0700453 True, True, new_kernel_fs_size,
Gilad Arnold5502b562013-03-08 13:22:31 -0800454 None if fail_bad_nki else hashlib.sha256('fake-nki-content').digest())
455 payload_gen.SetPartInfo(
Gilad Arnold382df5c2013-05-03 12:49:28 -0700456 False, True, new_rootfs_fs_size,
Gilad Arnold5502b562013-03-08 13:22:31 -0800457 None if fail_bad_nri else hashlib.sha256('fake-nri-content').digest())
458
459 # Create the test object.
460 payload_checker = _GetPayloadChecker(payload_gen.WriteToFile)
461 report = checker._PayloadReport()
462
463 should_fail = (fail_mismatched_block_size or fail_bad_sigs or
464 fail_mismatched_oki_ori or fail_bad_oki or fail_bad_ori or
Gilad Arnold5bc7fbe2015-02-05 13:01:09 -0800465 fail_bad_nki or fail_bad_nri or fail_old_kernel_fs_size or
466 fail_old_rootfs_fs_size or fail_new_kernel_fs_size or
467 fail_new_rootfs_fs_size)
Gilad Arnold5502b562013-03-08 13:22:31 -0800468 if should_fail:
469 self.assertRaises(update_payload.PayloadError,
Gilad Arnold382df5c2013-05-03 12:49:28 -0700470 payload_checker._CheckManifest, report,
471 rootfs_part_size, kernel_part_size)
Gilad Arnold5502b562013-03-08 13:22:31 -0800472 else:
Gilad Arnold382df5c2013-05-03 12:49:28 -0700473 self.assertIsNone(payload_checker._CheckManifest(report,
474 rootfs_part_size,
475 kernel_part_size))
Gilad Arnold5502b562013-03-08 13:22:31 -0800476
477 def testCheckLength(self):
478 """Tests _CheckLength()."""
479 payload_checker = checker.PayloadChecker(self.MockPayload())
480 block_size = payload_checker.block_size
481
482 # Passes.
483 self.assertIsNone(payload_checker._CheckLength(
484 int(3.5 * block_size), 4, 'foo', 'bar'))
485 # Fails, too few blocks.
486 self.assertRaises(update_payload.PayloadError,
487 payload_checker._CheckLength,
488 int(3.5 * block_size), 3, 'foo', 'bar')
489 # Fails, too many blocks.
490 self.assertRaises(update_payload.PayloadError,
491 payload_checker._CheckLength,
492 int(3.5 * block_size), 5, 'foo', 'bar')
493
494 def testCheckExtents(self):
495 """Tests _CheckExtents()."""
496 payload_checker = checker.PayloadChecker(self.MockPayload())
497 block_size = payload_checker.block_size
498
499 # Passes w/ all real extents.
500 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
501 self.assertEquals(
Gilad Arnoldcb638912013-06-24 04:57:11 -0700502 23,
Gilad Arnold5502b562013-03-08 13:22:31 -0800503 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
Gilad Arnoldcb638912013-06-24 04:57:11 -0700504 collections.defaultdict(int), 'foo'))
Gilad Arnold5502b562013-03-08 13:22:31 -0800505
506 # Passes w/ pseudo-extents (aka sparse holes).
507 extents = self.NewExtentList((0, 4), (common.PSEUDO_EXTENT_MARKER, 5),
508 (8, 3))
509 self.assertEquals(
Gilad Arnoldcb638912013-06-24 04:57:11 -0700510 12,
Gilad Arnold5502b562013-03-08 13:22:31 -0800511 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
512 collections.defaultdict(int), 'foo',
Gilad Arnoldcb638912013-06-24 04:57:11 -0700513 allow_pseudo=True))
Gilad Arnold5502b562013-03-08 13:22:31 -0800514
515 # Passes w/ pseudo-extent due to a signature.
516 extents = self.NewExtentList((common.PSEUDO_EXTENT_MARKER, 2))
517 self.assertEquals(
Gilad Arnoldcb638912013-06-24 04:57:11 -0700518 2,
Gilad Arnold5502b562013-03-08 13:22:31 -0800519 payload_checker._CheckExtents(extents, (1024 + 16) * block_size,
520 collections.defaultdict(int), 'foo',
Gilad Arnoldcb638912013-06-24 04:57:11 -0700521 allow_signature=True))
Gilad Arnold5502b562013-03-08 13:22:31 -0800522
523 # Fails, extent missing a start block.
524 extents = self.NewExtentList((-1, 4), (8, 3), (1024, 16))
525 self.assertRaises(
526 update_payload.PayloadError, payload_checker._CheckExtents,
527 extents, (1024 + 16) * block_size, collections.defaultdict(int),
528 'foo')
529
530 # Fails, extent missing block count.
531 extents = self.NewExtentList((0, -1), (8, 3), (1024, 16))
532 self.assertRaises(
533 update_payload.PayloadError, payload_checker._CheckExtents,
534 extents, (1024 + 16) * block_size, collections.defaultdict(int),
535 'foo')
536
537 # Fails, extent has zero blocks.
538 extents = self.NewExtentList((0, 4), (8, 3), (1024, 0))
539 self.assertRaises(
540 update_payload.PayloadError, payload_checker._CheckExtents,
541 extents, (1024 + 16) * block_size, collections.defaultdict(int),
542 'foo')
543
544 # Fails, extent exceeds partition boundaries.
545 extents = self.NewExtentList((0, 4), (8, 3), (1024, 16))
546 self.assertRaises(
547 update_payload.PayloadError, payload_checker._CheckExtents,
548 extents, (1024 + 15) * block_size, collections.defaultdict(int),
549 'foo')
550
551 def testCheckReplaceOperation(self):
552 """Tests _CheckReplaceOperation() where op.type == REPLACE."""
553 payload_checker = checker.PayloadChecker(self.MockPayload())
554 block_size = payload_checker.block_size
555 data_length = 10000
556
557 op = self.mox.CreateMock(
558 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
559 op.type = common.OpType.REPLACE
560
561 # Pass.
562 op.src_extents = []
563 self.assertIsNone(
564 payload_checker._CheckReplaceOperation(
565 op, data_length, (data_length + block_size - 1) / block_size,
566 'foo'))
567
568 # Fail, src extents founds.
569 op.src_extents = ['bar']
570 self.assertRaises(
571 update_payload.PayloadError,
572 payload_checker._CheckReplaceOperation,
573 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
574
575 # Fail, missing data.
576 op.src_extents = []
577 self.assertRaises(
578 update_payload.PayloadError,
579 payload_checker._CheckReplaceOperation,
580 op, None, (data_length + block_size - 1) / block_size, 'foo')
581
582 # Fail, length / block number mismatch.
583 op.src_extents = ['bar']
584 self.assertRaises(
585 update_payload.PayloadError,
586 payload_checker._CheckReplaceOperation,
587 op, data_length, (data_length + block_size - 1) / block_size + 1, 'foo')
588
589 def testCheckReplaceBzOperation(self):
590 """Tests _CheckReplaceOperation() where op.type == REPLACE_BZ."""
591 payload_checker = checker.PayloadChecker(self.MockPayload())
592 block_size = payload_checker.block_size
593 data_length = block_size * 3
594
595 op = self.mox.CreateMock(
596 update_metadata_pb2.DeltaArchiveManifest.InstallOperation)
597 op.type = common.OpType.REPLACE_BZ
598
599 # Pass.
600 op.src_extents = []
601 self.assertIsNone(
602 payload_checker._CheckReplaceOperation(
603 op, data_length, (data_length + block_size - 1) / block_size + 5,
604 'foo'))
605
606 # Fail, src extents founds.
607 op.src_extents = ['bar']
608 self.assertRaises(
609 update_payload.PayloadError,
610 payload_checker._CheckReplaceOperation,
611 op, data_length, (data_length + block_size - 1) / block_size + 5, 'foo')
612
613 # Fail, missing data.
614 op.src_extents = []
615 self.assertRaises(
616 update_payload.PayloadError,
617 payload_checker._CheckReplaceOperation,
618 op, None, (data_length + block_size - 1) / block_size, 'foo')
619
620 # Fail, too few blocks to justify BZ.
621 op.src_extents = []
622 self.assertRaises(
623 update_payload.PayloadError,
624 payload_checker._CheckReplaceOperation,
625 op, data_length, (data_length + block_size - 1) / block_size, 'foo')
626
627 def testCheckMoveOperation_Pass(self):
628 """Tests _CheckMoveOperation(); pass case."""
629 payload_checker = checker.PayloadChecker(self.MockPayload())
630 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
631 op.type = common.OpType.MOVE
632
633 self.AddToMessage(op.src_extents,
634 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
635 self.AddToMessage(op.dst_extents,
636 self.NewExtentList((16, 128), (512, 6)))
637 self.assertIsNone(
638 payload_checker._CheckMoveOperation(op, None, 134, 134, 'foo'))
639
640 def testCheckMoveOperation_FailContainsData(self):
641 """Tests _CheckMoveOperation(); fails, message contains data."""
642 payload_checker = checker.PayloadChecker(self.MockPayload())
643 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
644 op.type = common.OpType.MOVE
645
646 self.AddToMessage(op.src_extents,
647 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
648 self.AddToMessage(op.dst_extents,
649 self.NewExtentList((16, 128), (512, 6)))
650 self.assertRaises(
651 update_payload.PayloadError,
652 payload_checker._CheckMoveOperation,
653 op, 1024, 134, 134, 'foo')
654
655 def testCheckMoveOperation_FailInsufficientSrcBlocks(self):
656 """Tests _CheckMoveOperation(); fails, not enough actual src blocks."""
657 payload_checker = checker.PayloadChecker(self.MockPayload())
658 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
659 op.type = common.OpType.MOVE
660
661 self.AddToMessage(op.src_extents,
662 self.NewExtentList((0, 4), (12, 2), (1024, 127)))
663 self.AddToMessage(op.dst_extents,
664 self.NewExtentList((16, 128), (512, 6)))
665 self.assertRaises(
666 update_payload.PayloadError,
667 payload_checker._CheckMoveOperation,
668 op, None, 134, 134, 'foo')
669
670 def testCheckMoveOperation_FailInsufficientDstBlocks(self):
671 """Tests _CheckMoveOperation(); fails, not enough actual dst blocks."""
672 payload_checker = checker.PayloadChecker(self.MockPayload())
673 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
674 op.type = common.OpType.MOVE
675
676 self.AddToMessage(op.src_extents,
677 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
678 self.AddToMessage(op.dst_extents,
679 self.NewExtentList((16, 128), (512, 5)))
680 self.assertRaises(
681 update_payload.PayloadError,
682 payload_checker._CheckMoveOperation,
683 op, None, 134, 134, 'foo')
684
685 def testCheckMoveOperation_FailExcessSrcBlocks(self):
686 """Tests _CheckMoveOperation(); fails, too many actual src blocks."""
687 payload_checker = checker.PayloadChecker(self.MockPayload())
688 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
689 op.type = common.OpType.MOVE
690
691 self.AddToMessage(op.src_extents,
692 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
693 self.AddToMessage(op.dst_extents,
694 self.NewExtentList((16, 128), (512, 5)))
695 self.assertRaises(
696 update_payload.PayloadError,
697 payload_checker._CheckMoveOperation,
698 op, None, 134, 134, 'foo')
699 self.AddToMessage(op.src_extents,
700 self.NewExtentList((0, 4), (12, 2), (1024, 129)))
701 self.AddToMessage(op.dst_extents,
702 self.NewExtentList((16, 128), (512, 6)))
703 self.assertRaises(
704 update_payload.PayloadError,
705 payload_checker._CheckMoveOperation,
706 op, None, 134, 134, 'foo')
707
708 def testCheckMoveOperation_FailExcessDstBlocks(self):
709 """Tests _CheckMoveOperation(); fails, too many actual dst blocks."""
710 payload_checker = checker.PayloadChecker(self.MockPayload())
711 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
712 op.type = common.OpType.MOVE
713
714 self.AddToMessage(op.src_extents,
715 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
716 self.AddToMessage(op.dst_extents,
717 self.NewExtentList((16, 128), (512, 7)))
718 self.assertRaises(
719 update_payload.PayloadError,
720 payload_checker._CheckMoveOperation,
721 op, None, 134, 134, 'foo')
722
723 def testCheckMoveOperation_FailStagnantBlocks(self):
724 """Tests _CheckMoveOperation(); fails, there are blocks that do not move."""
725 payload_checker = checker.PayloadChecker(self.MockPayload())
726 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
727 op.type = common.OpType.MOVE
728
729 self.AddToMessage(op.src_extents,
730 self.NewExtentList((0, 4), (12, 2), (1024, 128)))
731 self.AddToMessage(op.dst_extents,
732 self.NewExtentList((8, 128), (512, 6)))
733 self.assertRaises(
734 update_payload.PayloadError,
735 payload_checker._CheckMoveOperation,
736 op, None, 134, 134, 'foo')
737
738 def testCheckBsdiff(self):
739 """Tests _CheckMoveOperation()."""
740 payload_checker = checker.PayloadChecker(self.MockPayload())
741
742 # Pass.
743 self.assertIsNone(
744 payload_checker._CheckBsdiffOperation(10000, 3, 'foo'))
745
746 # Fail, missing data blob.
747 self.assertRaises(
748 update_payload.PayloadError,
749 payload_checker._CheckBsdiffOperation,
750 None, 3, 'foo')
751
752 # Fail, too big of a diff blob (unjustified).
753 self.assertRaises(
754 update_payload.PayloadError,
755 payload_checker._CheckBsdiffOperation,
756 10000, 2, 'foo')
757
758 def DoCheckOperationTest(self, op_type_name, is_last, allow_signature,
759 allow_unhashed, fail_src_extents, fail_dst_extents,
760 fail_mismatched_data_offset_length,
761 fail_missing_dst_extents, fail_src_length,
762 fail_dst_length, fail_data_hash,
763 fail_prev_data_offset):
764 """Parametric testing of _CheckOperation().
765
766 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700767 op_type_name: 'REPLACE', 'REPLACE_BZ', 'MOVE' or 'BSDIFF'.
768 is_last: Whether we're testing the last operation in a sequence.
769 allow_signature: Whether we're testing a signature-capable operation.
770 allow_unhashed: Whether we're allowing to not hash the data.
771 fail_src_extents: Tamper with src extents.
772 fail_dst_extents: Tamper with dst extents.
773 fail_mismatched_data_offset_length: Make data_{offset,length}
774 inconsistent.
775 fail_missing_dst_extents: Do not include dst extents.
776 fail_src_length: Make src length inconsistent.
777 fail_dst_length: Make dst length inconsistent.
778 fail_data_hash: Tamper with the data blob hash.
779 fail_prev_data_offset: Make data space uses incontiguous.
Gilad Arnold5502b562013-03-08 13:22:31 -0800780 """
781 op_type = _OpTypeByName(op_type_name)
782
783 # Create the test object.
784 payload = self.MockPayload()
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700785 payload_checker = checker.PayloadChecker(payload,
786 allow_unhashed=allow_unhashed)
Gilad Arnold5502b562013-03-08 13:22:31 -0800787 block_size = payload_checker.block_size
788
789 # Create auxiliary arguments.
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700790 old_part_size = test_utils.MiB(4)
791 new_part_size = test_utils.MiB(8)
Gilad Arnold5502b562013-03-08 13:22:31 -0800792 old_block_counters = array.array(
793 'B', [0] * ((old_part_size + block_size - 1) / block_size))
794 new_block_counters = array.array(
795 'B', [0] * ((new_part_size + block_size - 1) / block_size))
796 prev_data_offset = 1876
797 blob_hash_counts = collections.defaultdict(int)
798
799 # Create the operation object for the test.
800 op = update_metadata_pb2.DeltaArchiveManifest.InstallOperation()
801 op.type = op_type
802
803 total_src_blocks = 0
804 if op_type in (common.OpType.MOVE, common.OpType.BSDIFF):
805 if fail_src_extents:
806 self.AddToMessage(op.src_extents,
807 self.NewExtentList((0, 0)))
808 else:
809 self.AddToMessage(op.src_extents,
810 self.NewExtentList((0, 16)))
811 total_src_blocks = 16
812
813 if op_type != common.OpType.MOVE:
814 if not fail_mismatched_data_offset_length:
815 op.data_length = 16 * block_size - 8
816 if fail_prev_data_offset:
817 op.data_offset = prev_data_offset + 16
818 else:
819 op.data_offset = prev_data_offset
820
821 fake_data = 'fake-data'.ljust(op.data_length)
822 if not (allow_unhashed or (is_last and allow_signature and
823 op_type == common.OpType.REPLACE)):
824 if not fail_data_hash:
825 # Create a valid data blob hash.
826 op.data_sha256_hash = hashlib.sha256(fake_data).digest()
827 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
828 fake_data)
829 elif fail_data_hash:
830 # Create an invalid data blob hash.
831 op.data_sha256_hash = hashlib.sha256(
832 fake_data.replace(' ', '-')).digest()
833 payload.ReadDataBlob(op.data_offset, op.data_length).AndReturn(
834 fake_data)
835
836 total_dst_blocks = 0
837 if not fail_missing_dst_extents:
838 total_dst_blocks = 16
839 if fail_dst_extents:
840 self.AddToMessage(op.dst_extents,
841 self.NewExtentList((4, 16), (32, 0)))
842 else:
843 self.AddToMessage(op.dst_extents,
844 self.NewExtentList((4, 8), (64, 8)))
845
846 if total_src_blocks:
847 if fail_src_length:
848 op.src_length = total_src_blocks * block_size + 8
849 else:
850 op.src_length = total_src_blocks * block_size
851 elif fail_src_length:
852 # Add an orphaned src_length.
853 op.src_length = 16
854
855 if total_dst_blocks:
856 if fail_dst_length:
857 op.dst_length = total_dst_blocks * block_size + 8
858 else:
859 op.dst_length = total_dst_blocks * block_size
860
861 self.mox.ReplayAll()
862 should_fail = (fail_src_extents or fail_dst_extents or
863 fail_mismatched_data_offset_length or
864 fail_missing_dst_extents or fail_src_length or
865 fail_dst_length or fail_data_hash or fail_prev_data_offset)
Gilad Arnoldcb638912013-06-24 04:57:11 -0700866 args = (op, 'foo', is_last, old_block_counters, new_block_counters,
867 old_part_size, new_part_size, prev_data_offset, allow_signature,
868 blob_hash_counts)
Gilad Arnold5502b562013-03-08 13:22:31 -0800869 if should_fail:
870 self.assertRaises(update_payload.PayloadError,
Gilad Arnoldcb638912013-06-24 04:57:11 -0700871 payload_checker._CheckOperation, *args)
Gilad Arnold5502b562013-03-08 13:22:31 -0800872 else:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700873 self.assertEqual(op.data_length if op.HasField('data_length') else 0,
874 payload_checker._CheckOperation(*args))
Gilad Arnold5502b562013-03-08 13:22:31 -0800875
876 def testAllocBlockCounters(self):
877 """Tests _CheckMoveOperation()."""
878 payload_checker = checker.PayloadChecker(self.MockPayload())
879 block_size = payload_checker.block_size
880
881 # Check allocation for block-aligned partition size, ensure it's integers.
882 result = payload_checker._AllocBlockCounters(16 * block_size)
Gilad Arnoldcb638912013-06-24 04:57:11 -0700883 self.assertEqual(16, len(result))
884 self.assertEqual(int, type(result[0]))
Gilad Arnold5502b562013-03-08 13:22:31 -0800885
886 # Check allocation of unaligned partition sizes.
887 result = payload_checker._AllocBlockCounters(16 * block_size - 1)
Gilad Arnoldcb638912013-06-24 04:57:11 -0700888 self.assertEqual(16, len(result))
Gilad Arnold5502b562013-03-08 13:22:31 -0800889 result = payload_checker._AllocBlockCounters(16 * block_size + 1)
Gilad Arnoldcb638912013-06-24 04:57:11 -0700890 self.assertEqual(17, len(result))
Gilad Arnold5502b562013-03-08 13:22:31 -0800891
892 def DoCheckOperationsTest(self, fail_bad_type,
893 fail_nonexhaustive_full_update):
894 # Generate a test payload. For this test, we only care about one
895 # (arbitrary) set of operations, so we'll only be generating kernel and
896 # test with them.
897 payload_gen = test_utils.PayloadGenerator()
898
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700899 block_size = test_utils.KiB(4)
Gilad Arnold5502b562013-03-08 13:22:31 -0800900 payload_gen.SetBlockSize(block_size)
901
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700902 rootfs_part_size = test_utils.MiB(8)
Gilad Arnold5502b562013-03-08 13:22:31 -0800903
904 # Fake rootfs operations in a full update, tampered with as required.
905 rootfs_op_type = common.OpType.REPLACE
906 if fail_bad_type:
907 # Choose a type value that's bigger than the highest valid value.
908 for valid_op_type in common.OpType.ALL:
909 rootfs_op_type = max(rootfs_op_type, valid_op_type)
910 rootfs_op_type += 1
911
912 rootfs_data_length = rootfs_part_size
913 if fail_nonexhaustive_full_update:
914 rootfs_data_length -= block_size
915
916 payload_gen.AddOperation(False, rootfs_op_type,
917 dst_extents=[(0, rootfs_data_length / block_size)],
918 data_offset=0,
919 data_length=rootfs_data_length)
920
921 # Create the test object.
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700922 payload_checker = _GetPayloadChecker(payload_gen.WriteToFile,
923 checker_init_dargs={
924 'allow_unhashed': True})
Gilad Arnold5502b562013-03-08 13:22:31 -0800925 payload_checker.payload_type = checker._TYPE_FULL
926 report = checker._PayloadReport()
927
928 should_fail = (fail_bad_type or fail_nonexhaustive_full_update)
Gilad Arnoldcb638912013-06-24 04:57:11 -0700929 args = (payload_checker.payload.manifest.install_operations, report,
930 'foo', 0, rootfs_part_size, rootfs_part_size, 0, False)
Gilad Arnold5502b562013-03-08 13:22:31 -0800931 if should_fail:
932 self.assertRaises(update_payload.PayloadError,
Gilad Arnoldcb638912013-06-24 04:57:11 -0700933 payload_checker._CheckOperations, *args)
Gilad Arnold5502b562013-03-08 13:22:31 -0800934 else:
Gilad Arnoldcb638912013-06-24 04:57:11 -0700935 self.assertEqual(rootfs_data_length,
936 payload_checker._CheckOperations(*args))
Gilad Arnold5502b562013-03-08 13:22:31 -0800937
938 def DoCheckSignaturesTest(self, fail_empty_sigs_blob, fail_missing_pseudo_op,
939 fail_mismatched_pseudo_op, fail_sig_missing_fields,
940 fail_unknown_sig_version, fail_incorrect_sig):
941 # Generate a test payload. For this test, we only care about the signature
942 # block and how it relates to the payload hash. Therefore, we're generating
943 # a random (otherwise useless) payload for this purpose.
944 payload_gen = test_utils.EnhancedPayloadGenerator()
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700945 block_size = test_utils.KiB(4)
Gilad Arnold5502b562013-03-08 13:22:31 -0800946 payload_gen.SetBlockSize(block_size)
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700947 rootfs_part_size = test_utils.MiB(2)
948 kernel_part_size = test_utils.KiB(16)
Gilad Arnold5502b562013-03-08 13:22:31 -0800949 payload_gen.SetPartInfo(False, True, rootfs_part_size,
950 hashlib.sha256('fake-new-rootfs-content').digest())
Gilad Arnold382df5c2013-05-03 12:49:28 -0700951 payload_gen.SetPartInfo(True, True, kernel_part_size,
Gilad Arnold5502b562013-03-08 13:22:31 -0800952 hashlib.sha256('fake-new-kernel-content').digest())
953 payload_gen.AddOperationWithData(
954 False, common.OpType.REPLACE,
955 dst_extents=[(0, rootfs_part_size / block_size)],
956 data_blob=os.urandom(rootfs_part_size))
957
958 do_forge_pseudo_op = (fail_missing_pseudo_op or fail_mismatched_pseudo_op)
959 do_forge_sigs_data = (do_forge_pseudo_op or fail_empty_sigs_blob or
960 fail_sig_missing_fields or fail_unknown_sig_version
961 or fail_incorrect_sig)
962
963 sigs_data = None
964 if do_forge_sigs_data:
965 sigs_gen = test_utils.SignaturesGenerator()
966 if not fail_empty_sigs_blob:
967 if fail_sig_missing_fields:
968 sig_data = None
969 else:
970 sig_data = test_utils.SignSha256('fake-payload-content',
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700971 test_utils._PRIVKEY_FILE_NAME)
Gilad Arnold5502b562013-03-08 13:22:31 -0800972 sigs_gen.AddSig(5 if fail_unknown_sig_version else 1, sig_data)
973
974 sigs_data = sigs_gen.ToBinary()
975 payload_gen.SetSignatures(payload_gen.curr_offset, len(sigs_data))
976
977 if do_forge_pseudo_op:
978 assert sigs_data is not None, 'should have forged signatures blob by now'
979 sigs_len = len(sigs_data)
980 payload_gen.AddOperation(
981 False, common.OpType.REPLACE,
982 data_offset=payload_gen.curr_offset / 2,
983 data_length=sigs_len / 2,
984 dst_extents=[(0, (sigs_len / 2 + block_size - 1) / block_size)])
985
986 # Generate payload (complete w/ signature) and create the test object.
987 payload_checker = _GetPayloadChecker(
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700988 payload_gen.WriteToFileWithData,
989 payload_gen_dargs={
990 'sigs_data': sigs_data,
Gilad Arnold18f4f9f2013-04-02 16:24:41 -0700991 'privkey_file_name': test_utils._PRIVKEY_FILE_NAME,
Gilad Arnoldeaed0d12013-04-30 15:38:22 -0700992 'do_add_pseudo_operation': not do_forge_pseudo_op})
Gilad Arnold5502b562013-03-08 13:22:31 -0800993 payload_checker.payload_type = checker._TYPE_FULL
994 report = checker._PayloadReport()
995
996 # We have to check the manifest first in order to set signature attributes.
Gilad Arnold382df5c2013-05-03 12:49:28 -0700997 payload_checker._CheckManifest(report, rootfs_part_size, kernel_part_size)
Gilad Arnold5502b562013-03-08 13:22:31 -0800998
999 should_fail = (fail_empty_sigs_blob or fail_missing_pseudo_op or
1000 fail_mismatched_pseudo_op or fail_sig_missing_fields or
1001 fail_unknown_sig_version or fail_incorrect_sig)
Gilad Arnoldcb638912013-06-24 04:57:11 -07001002 args = (report, test_utils._PUBKEY_FILE_NAME)
Gilad Arnold5502b562013-03-08 13:22:31 -08001003 if should_fail:
1004 self.assertRaises(update_payload.PayloadError,
Gilad Arnoldcb638912013-06-24 04:57:11 -07001005 payload_checker._CheckSignatures, *args)
Gilad Arnold5502b562013-03-08 13:22:31 -08001006 else:
Gilad Arnoldcb638912013-06-24 04:57:11 -07001007 self.assertIsNone(payload_checker._CheckSignatures(*args))
Gilad Arnold5502b562013-03-08 13:22:31 -08001008
1009 def DoRunTest(self, fail_wrong_payload_type, fail_invalid_block_size,
1010 fail_mismatched_block_size, fail_excess_data):
1011 # Generate a test payload. For this test, we generate a full update that
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001012 # has sample kernel and rootfs operations. Since most testing is done with
Gilad Arnold5502b562013-03-08 13:22:31 -08001013 # internal PayloadChecker methods that are tested elsewhere, here we only
1014 # tamper with what's actually being manipulated and/or tested in the Run()
1015 # method itself. Note that the checker doesn't verify partition hashes, so
1016 # they're safe to fake.
1017 payload_gen = test_utils.EnhancedPayloadGenerator()
Gilad Arnold18f4f9f2013-04-02 16:24:41 -07001018 block_size = test_utils.KiB(4)
Gilad Arnold5502b562013-03-08 13:22:31 -08001019 payload_gen.SetBlockSize(block_size)
Gilad Arnold18f4f9f2013-04-02 16:24:41 -07001020 kernel_part_size = test_utils.KiB(16)
1021 rootfs_part_size = test_utils.MiB(2)
Gilad Arnold5502b562013-03-08 13:22:31 -08001022 payload_gen.SetPartInfo(False, True, rootfs_part_size,
1023 hashlib.sha256('fake-new-rootfs-content').digest())
1024 payload_gen.SetPartInfo(True, True, kernel_part_size,
1025 hashlib.sha256('fake-new-kernel-content').digest())
1026 payload_gen.AddOperationWithData(
1027 False, common.OpType.REPLACE,
1028 dst_extents=[(0, rootfs_part_size / block_size)],
1029 data_blob=os.urandom(rootfs_part_size))
1030 payload_gen.AddOperationWithData(
1031 True, common.OpType.REPLACE,
1032 dst_extents=[(0, kernel_part_size / block_size)],
1033 data_blob=os.urandom(kernel_part_size))
1034
1035 # Generate payload (complete w/ signature) and create the test object.
Gilad Arnold5502b562013-03-08 13:22:31 -08001036 if fail_invalid_block_size:
Gilad Arnoldcb638912013-06-24 04:57:11 -07001037 use_block_size = block_size + 5 # Not a power of two.
Gilad Arnold5502b562013-03-08 13:22:31 -08001038 elif fail_mismatched_block_size:
Gilad Arnoldcb638912013-06-24 04:57:11 -07001039 use_block_size = block_size * 2 # Different that payload stated.
Gilad Arnold5502b562013-03-08 13:22:31 -08001040 else:
1041 use_block_size = block_size
Gilad Arnold5502b562013-03-08 13:22:31 -08001042
Gilad Arnoldcb638912013-06-24 04:57:11 -07001043 kwargs = {
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001044 'payload_gen_dargs': {
Gilad Arnold18f4f9f2013-04-02 16:24:41 -07001045 'privkey_file_name': test_utils._PRIVKEY_FILE_NAME,
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001046 'do_add_pseudo_operation': True,
1047 'is_pseudo_in_kernel': True,
1048 'padding': os.urandom(1024) if fail_excess_data else None},
1049 'checker_init_dargs': {
1050 'assert_type': 'delta' if fail_wrong_payload_type else 'full',
1051 'block_size': use_block_size}}
1052 if fail_invalid_block_size:
1053 self.assertRaises(update_payload.PayloadError, _GetPayloadChecker,
Gilad Arnoldcb638912013-06-24 04:57:11 -07001054 payload_gen.WriteToFileWithData, **kwargs)
Gilad Arnold5502b562013-03-08 13:22:31 -08001055 else:
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001056 payload_checker = _GetPayloadChecker(payload_gen.WriteToFileWithData,
Gilad Arnoldcb638912013-06-24 04:57:11 -07001057 **kwargs)
1058 kwargs = {'pubkey_file_name': test_utils._PUBKEY_FILE_NAME}
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001059 should_fail = (fail_wrong_payload_type or fail_mismatched_block_size or
1060 fail_excess_data)
1061 if should_fail:
Gilad Arnoldcb638912013-06-24 04:57:11 -07001062 self.assertRaises(update_payload.PayloadError, payload_checker.Run,
1063 **kwargs)
Gilad Arnoldeaed0d12013-04-30 15:38:22 -07001064 else:
Gilad Arnoldcb638912013-06-24 04:57:11 -07001065 self.assertIsNone(payload_checker.Run(**kwargs))
Gilad Arnold5502b562013-03-08 13:22:31 -08001066
1067
1068# This implements a generic API, hence the occasional unused args.
1069# pylint: disable=W0613
1070def ValidateCheckOperationTest(op_type_name, is_last, allow_signature,
1071 allow_unhashed, fail_src_extents,
1072 fail_dst_extents,
1073 fail_mismatched_data_offset_length,
1074 fail_missing_dst_extents, fail_src_length,
1075 fail_dst_length, fail_data_hash,
1076 fail_prev_data_offset):
1077 """Returns True iff the combination of arguments represents a valid test."""
1078 op_type = _OpTypeByName(op_type_name)
1079
1080 # REPLACE/REPLACE_BZ operations don't read data from src partition.
1081 if (op_type in (common.OpType.REPLACE, common.OpType.REPLACE_BZ) and (
1082 fail_src_extents or fail_src_length)):
1083 return False
1084
1085 # MOVE operations don't carry data.
1086 if (op_type == common.OpType.MOVE and (
1087 fail_mismatched_data_offset_length or fail_data_hash or
1088 fail_prev_data_offset)):
1089 return False
1090
1091 return True
1092
1093
1094def TestMethodBody(run_method_name, run_dargs):
1095 """Returns a function that invokes a named method with named arguments."""
1096 return lambda self: getattr(self, run_method_name)(**run_dargs)
1097
1098
1099def AddParametricTests(tested_method_name, arg_space, validate_func=None):
1100 """Enumerates and adds specific parametric tests to PayloadCheckerTest.
1101
1102 This function enumerates a space of test parameters (defined by arg_space),
1103 then binds a new, unique method name in PayloadCheckerTest to a test function
1104 that gets handed the said parameters. This is a preferable approach to doing
1105 the enumeration and invocation during the tests because this way each test is
1106 treated as a complete run by the unittest framework, and so benefits from the
1107 usual setUp/tearDown mechanics.
1108
1109 Args:
Gilad Arnoldcb638912013-06-24 04:57:11 -07001110 tested_method_name: Name of the tested PayloadChecker method.
1111 arg_space: A dictionary containing variables (keys) and lists of values
1112 (values) associated with them.
1113 validate_func: A function used for validating test argument combinations.
Gilad Arnold5502b562013-03-08 13:22:31 -08001114 """
1115 for value_tuple in itertools.product(*arg_space.itervalues()):
1116 run_dargs = dict(zip(arg_space.iterkeys(), value_tuple))
1117 if validate_func and not validate_func(**run_dargs):
1118 continue
1119 run_method_name = 'Do%sTest' % tested_method_name
1120 test_method_name = 'test%s' % tested_method_name
1121 for arg_key, arg_val in run_dargs.iteritems():
1122 if arg_val or type(arg_val) is int:
1123 test_method_name += '__%s=%s' % (arg_key, arg_val)
1124 setattr(PayloadCheckerTest, test_method_name,
1125 TestMethodBody(run_method_name, run_dargs))
1126
1127
1128def AddAllParametricTests():
1129 """Enumerates and adds all parametric tests to PayloadCheckerTest."""
1130 # Add all _CheckElem() test cases.
1131 AddParametricTests('AddElem',
1132 {'linebreak': (True, False),
1133 'indent': (0, 1, 2),
1134 'convert': (str, lambda s: s[::-1]),
1135 'is_present': (True, False),
1136 'is_mandatory': (True, False),
1137 'is_submsg': (True, False)})
1138
1139 # Add all _Add{Mandatory,Optional}Field tests.
1140 AddParametricTests('AddField',
1141 {'is_mandatory': (True, False),
1142 'linebreak': (True, False),
1143 'indent': (0, 1, 2),
1144 'convert': (str, lambda s: s[::-1]),
1145 'is_present': (True, False)})
1146
1147 # Add all _Add{Mandatory,Optional}SubMsg tests.
1148 AddParametricTests('AddSubMsg',
1149 {'is_mandatory': (True, False),
1150 'is_present': (True, False)})
1151
1152 # Add all _CheckManifest() test cases.
1153 AddParametricTests('CheckManifest',
1154 {'fail_mismatched_block_size': (True, False),
1155 'fail_bad_sigs': (True, False),
1156 'fail_mismatched_oki_ori': (True, False),
1157 'fail_bad_oki': (True, False),
1158 'fail_bad_ori': (True, False),
1159 'fail_bad_nki': (True, False),
1160 'fail_bad_nri': (True, False),
Gilad Arnold382df5c2013-05-03 12:49:28 -07001161 'fail_old_kernel_fs_size': (True, False),
1162 'fail_old_rootfs_fs_size': (True, False),
1163 'fail_new_kernel_fs_size': (True, False),
1164 'fail_new_rootfs_fs_size': (True, False)})
Gilad Arnold5502b562013-03-08 13:22:31 -08001165
1166 # Add all _CheckOperation() test cases.
1167 AddParametricTests('CheckOperation',
1168 {'op_type_name': ('REPLACE', 'REPLACE_BZ', 'MOVE',
1169 'BSDIFF'),
1170 'is_last': (True, False),
1171 'allow_signature': (True, False),
1172 'allow_unhashed': (True, False),
1173 'fail_src_extents': (True, False),
1174 'fail_dst_extents': (True, False),
1175 'fail_mismatched_data_offset_length': (True, False),
1176 'fail_missing_dst_extents': (True, False),
1177 'fail_src_length': (True, False),
1178 'fail_dst_length': (True, False),
1179 'fail_data_hash': (True, False),
1180 'fail_prev_data_offset': (True, False)},
1181 validate_func=ValidateCheckOperationTest)
1182
1183 # Add all _CheckOperations() test cases.
1184 AddParametricTests('CheckOperations',
1185 {'fail_bad_type': (True, False),
1186 'fail_nonexhaustive_full_update': (True, False)})
1187
1188 # Add all _CheckOperations() test cases.
1189 AddParametricTests('CheckSignatures',
1190 {'fail_empty_sigs_blob': (True, False),
1191 'fail_missing_pseudo_op': (True, False),
1192 'fail_mismatched_pseudo_op': (True, False),
1193 'fail_sig_missing_fields': (True, False),
1194 'fail_unknown_sig_version': (True, False),
1195 'fail_incorrect_sig': (True, False)})
1196
1197 # Add all Run() test cases.
1198 AddParametricTests('Run',
1199 {'fail_wrong_payload_type': (True, False),
1200 'fail_invalid_block_size': (True, False),
1201 'fail_mismatched_block_size': (True, False),
1202 'fail_excess_data': (True, False)})
1203
1204
1205if __name__ == '__main__':
1206 AddAllParametricTests()
1207 unittest.main()