blob: 6d455d526b8e25a0a56cd70bac4a03f0ed03920f [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5"""Utilities for unit testing."""
6
7import cStringIO
8import hashlib
9import struct
10import subprocess
11
12import common
13import payload
14import update_metadata_pb2
15
16
class TestError(Exception):
  """Signals a failure encountered while testing update payload code."""
19
20
# File names of the RSA key pair (private, then public) used for testing.
_PRIVKEY_FILE_NAME = 'payload-test-key.pem'
_PUBKEY_FILE_NAME = 'payload-test-key.pub'
24
25
def KiB(count):
  """Returns |count| shifted left by 10 bits, i.e. |count| KiB in bytes."""
  kib_shift = 10
  return count << kib_shift
28
29
def MiB(count):
  """Returns |count| shifted left by 20 bits, i.e. |count| MiB in bytes."""
  mib_shift = 20
  return count << mib_shift
32
33
def GiB(count):
  """Returns |count| shifted left by 30 bits, i.e. |count| GiB in bytes."""
  gib_shift = 30
  return count << gib_shift
36
37
def _WriteInt(file_obj, size, is_unsigned, val):
  """Writes a binary-encoded integer to a file.

  The value is packed with the struct format string that
  common.IntPackingFmtStr() returns for the given size and signedness.
  Assumes a network (big-endian) byte ordering.

  Args:
    file_obj: a file object
    size: the integer size in bytes (2, 4 or 8)
    is_unsigned: whether the value is unsigned (True) or signed (False)
    val: integer value to encode
  Raises:
    PayloadError if a write error occurred.

  """
  try:
    file_obj.write(struct.pack(common.IntPackingFmtStr(size, is_unsigned), val))
  except IOError as e:
    # Not every file-like object (e.g. a StringIO buffer) has a name; fall back
    # to the object itself so the write error is not masked by an
    # AttributeError raised while formatting the message.
    raise payload.PayloadError('error writing to file (%s): %s' %
                               (getattr(file_obj, 'name', file_obj), e))
59
60
def _SetMsgField(msg, field_name, val):
  """Assigns a protobuf message field, clearing it when the value is None."""
  if val is not None:
    setattr(msg, field_name, val)
    return
  msg.ClearField(field_name)
67
68
def SignSha256(data, privkey_file_name):
  """Signs the data's SHA256 hash with an RSA private key.

  The SHA256 digest of the data is prefixed with common.SIG_ASN1_HEADER and
  piped through 'openssl rsautl -sign' using the given private key.

  Args:
    data: the data whose SHA256 hash we want to sign
    privkey_file_name: private key used for signing data
  Returns:
    The signature string, as emitted by openssl on its standard output.
  Raises:
    TestError if the signing subprocess cannot be run or exits with a
    non-zero status.

  """
  # pylint: disable=E1101
  data_sha256_hash = common.SIG_ASN1_HEADER + hashlib.sha256(data).digest()
  sign_cmd = ['openssl', 'rsautl', '-sign', '-inkey', privkey_file_name]
  try:
    sign_process = subprocess.Popen(sign_cmd, stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE)
    sig, _ = sign_process.communicate(input=data_sha256_hash)
  except Exception as e:
    raise TestError('signing subprocess failed: %s' % e)

  # communicate() waits for the process to terminate, so returncode is set by
  # now. A non-zero status (e.g. unreadable key) means the captured output is
  # not a valid signature and must not be returned as one.
  if sign_process.returncode != 0:
    raise TestError('signing subprocess failed: openssl exited with code %d' %
                    sign_process.returncode)

  return sig
92
93
class SignaturesGenerator(object):
  """Builds the signatures data block of a payload."""

  def __init__(self):
    self.sigs = update_metadata_pb2.Signatures()

  def AddSig(self, version, data):
    """Appends a new entry to the signature sequence.

    Args:
      version: signature version; the field is left unassigned when None
      data: signature binary data; the field is left unassigned when None

    """
    # Pylint fails to identify a member of the Signatures message.
    # pylint: disable=E1101
    entry = self.sigs.signatures.add()
    for attr_name, attr_val in (('version', version), ('data', data)):
      if attr_val is not None:
        setattr(entry, attr_name, attr_val)

  def ToBinary(self):
    """Returns the serialized (binary) form of the signatures message."""
    serialized = self.sigs.SerializeToString()
    return serialized
119
120
class PayloadGenerator(object):
  """Low-level builder of update payloads.

  Attributes:
    manifest: the protobuf containing the payload manifest
    version: the payload version identifier
    block_size: the block size pertaining to update operations

  """

  def __init__(self, version=1):
    self.manifest = update_metadata_pb2.DeltaArchiveManifest()
    self.version = version
    self.block_size = 0

  @staticmethod
  def _WriteExtent(ex, val):
    """Fills an Extent message from a (start_block, num_blocks) pair."""
    first_block, block_count = val
    _SetMsgField(ex, 'start_block', first_block)
    _SetMsgField(ex, 'num_blocks', block_count)

  @staticmethod
  def _AddValuesToRepeatedField(repeated_field, values, write_func):
    """Appends one new item per value, populating each via write_func."""
    # A None/empty |values| adds nothing.
    for value in values or ():
      write_func(repeated_field.add(), value)

  @staticmethod
  def _AddExtents(extents_field, values):
    """Appends an extent to the extents field for every given value."""
    PayloadGenerator._AddValuesToRepeatedField(
        extents_field, values, PayloadGenerator._WriteExtent)

  def SetBlockSize(self, block_size):
    """Records the block size on this object and in the manifest."""
    self.block_size = block_size
    _SetMsgField(self.manifest, 'block_size', block_size)

  def SetPartInfo(self, is_kernel, is_new, part_size, part_hash):
    """Sets the size and hash of one partition info entry.

    Args:
      is_kernel: whether this is kernel (True) or rootfs (False) info
      is_new: whether to set old (False) or new (True) info
      part_size: the partition size (in fact, filesystem size)
      part_hash: the partition hash

    """
    # pylint: disable=E1101
    if is_kernel and is_new:
      part_info = self.manifest.new_kernel_info
    elif is_kernel:
      part_info = self.manifest.old_kernel_info
    elif is_new:
      part_info = self.manifest.new_rootfs_info
    else:
      part_info = self.manifest.old_rootfs_info

    for field_name, field_val in (('size', part_size), ('hash', part_hash)):
      _SetMsgField(part_info, field_name, field_val)

  def AddOperation(self, is_kernel, op_type, data_offset=None,
                   data_length=None, src_extents=None, src_length=None,
                   dst_extents=None, dst_length=None, data_sha256_hash=None):
    """Appends an InstallOperation to the kernel or rootfs operation list.

    Scalar arguments given as None have their fields cleared; extents given
    as None (or empty) add no extents.

    """
    # pylint: disable=E1101
    if is_kernel:
      operations = self.manifest.kernel_install_operations
    else:
      operations = self.manifest.install_operations

    new_op = operations.add()
    new_op.type = op_type

    _SetMsgField(new_op, 'data_offset', data_offset)
    _SetMsgField(new_op, 'data_length', data_length)

    for prefix, extents, length in (('src', src_extents, src_length),
                                    ('dst', dst_extents, dst_length)):
      self._AddExtents(getattr(new_op, prefix + '_extents'), extents)
      _SetMsgField(new_op, prefix + '_length', length)

    _SetMsgField(new_op, 'data_sha256_hash', data_sha256_hash)

  def SetSignatures(self, sigs_offset, sigs_size):
    """Sets the manifest's signature block offset and size."""
    for field_name, field_val in (('signatures_offset', sigs_offset),
                                  ('signatures_size', sigs_size)):
      _SetMsgField(self.manifest, field_name, field_val)

  def _WriteHeaderToFile(self, file_obj, manifest_len):
    """Writes the payload header (magic, version, manifest length) to a file."""
    # We need to access protected members in Payload for writing the header.
    # pylint: disable=W0212
    payload_cls = payload.Payload
    file_obj.write(payload_cls._MAGIC)
    _WriteInt(file_obj, payload_cls._VERSION_SIZE, True, self.version)
    _WriteInt(file_obj, payload_cls._MANIFEST_LEN_SIZE, True, manifest_len)

  def WriteToFile(self, file_obj, manifest_len=-1, data_blobs=None,
                  sigs_data=None, padding=None):
    """Writes the payload content to a file.

    The output consists of the header, the serialized manifest, the data
    blobs, the signatures blob and the padding, in that order.

    Args:
      file_obj: a file object open for writing
      manifest_len: manifest length to write in the header; a negative value
                    means use the actual serialized manifest length
      data_blobs: a list of data blobs to be concatenated to the payload
      sigs_data: a binary Signatures message to be concatenated to the payload
      padding: stuff to dump past the normal data blobs provided (optional)

    """
    serialized_manifest = self.manifest.SerializeToString()
    if manifest_len < 0:
      manifest_len = len(serialized_manifest)
    self._WriteHeaderToFile(file_obj, manifest_len)
    file_obj.write(serialized_manifest)
    for blob in data_blobs or ():
      file_obj.write(blob)
    # Optional trailers; each is skipped when None or empty.
    for trailer in (sigs_data, padding):
      if trailer:
        file_obj.write(trailer)
242
243
class EnhancedPayloadGenerator(PayloadGenerator):
  """Payload generator with automatic handling of data blobs.

  Attributes:
    data_blobs: a list of blobs, in the order they were added
    curr_offset: the currently consumed offset of blobs added to the payload

  """

  def __init__(self):
    super(EnhancedPayloadGenerator, self).__init__()
    self.data_blobs = []
    self.curr_offset = 0

  def AddData(self, data_blob):
    """Adds a (possibly orphan) data blob.

    Args:
      data_blob: the blob to append to the list of payload data blobs
    Returns:
      A (data_length, data_offset) pair: the blob's length and the offset at
      which it was placed.

    """
    data_length = len(data_blob)
    data_offset = self.curr_offset
    self.curr_offset += data_length
    self.data_blobs.append(data_blob)
    return data_length, data_offset

  def AddOperationWithData(self, is_kernel, op_type, src_extents=None,
                           src_length=None, dst_extents=None, dst_length=None,
                           data_blob=None, do_hash_data_blob=True):
    """Adds an install operation and associated data blob.

    This takes care of obtaining a hash of the data blob (if so instructed)
    and appending it to the internally maintained list of blobs, including the
    necessary offset/length accounting.

    Args:
      is_kernel: whether this is a kernel (True) or rootfs (False) operation
      op_type: one of REPLACE, REPLACE_BZ, MOVE or BSDIFF
      src_extents: list of (start, length) pairs indicating src block ranges
      src_length: size of the src data in bytes (needed for BSDIFF)
      dst_extents: list of (start, length) pairs indicating dst block ranges
      dst_length: size of the dst data in bytes (needed for BSDIFF)
      data_blob: a data blob associated with this operation
      do_hash_data_blob: whether or not to compute and add a data blob hash

    """
    data_offset = data_length = data_sha256_hash = None
    if data_blob is not None:
      if do_hash_data_blob:
        # pylint: disable=E1101
        data_sha256_hash = hashlib.sha256(data_blob).digest()
      data_length, data_offset = self.AddData(data_blob)

    self.AddOperation(is_kernel, op_type, data_offset=data_offset,
                      data_length=data_length, src_extents=src_extents,
                      src_length=src_length, dst_extents=dst_extents,
                      dst_length=dst_length, data_sha256_hash=data_sha256_hash)

  def WriteToFileWithData(self, file_obj, sigs_data=None,
                          privkey_file_name=None,
                          do_add_pseudo_operation=False,
                          is_pseudo_in_kernel=False, padding=None):
    """Writes the payload content to a file, optionally signing the content.

    Args:
      file_obj: a file object open for writing
      sigs_data: signatures blob to be appended to the payload (optional;
                 payload signature fields assumed to be preset by the caller)
      privkey_file_name: key used for signing the payload (optional; used only
                         if explicit signatures blob not provided)
      do_add_pseudo_operation: whether a pseudo-operation should be added to
                               account for the signature blob
      is_pseudo_in_kernel: whether the pseudo-operation should be added to
                           kernel (True) or rootfs (False) operations
      padding: stuff to dump past the normal data blobs provided (optional)
    Raises:
      TestError: if arguments are inconsistent or something goes wrong.

    """
    sigs_len = len(sigs_data) if sigs_data else 0

    # Do we need to generate a genuine signatures blob?
    do_generate_sigs_data = sigs_data is None and privkey_file_name

    if do_generate_sigs_data:
      # First, sign some arbitrary data to obtain the size of a signature blob.
      fake_sig = SignSha256('fake-payload-data', privkey_file_name)
      fake_sigs_gen = SignaturesGenerator()
      fake_sigs_gen.AddSig(1, fake_sig)
      sigs_len = len(fake_sigs_gen.ToBinary())

      # Update the payload with proper signature attributes.
      self.SetSignatures(self.curr_offset, sigs_len)

    # Add a pseudo-operation to account for the signature blob, if requested.
    if do_add_pseudo_operation:
      if not self.block_size:
        raise TestError('cannot add pseudo-operation without knowing the '
                        'payload block size')
      # Number of blocks needed to hold the signature blob, rounded up. Floor
      # division keeps the result an integer even under true division
      # semantics, where '/' would produce a float.
      sigs_num_blocks = (sigs_len + self.block_size - 1) // self.block_size
      self.AddOperation(
          is_pseudo_in_kernel, common.OpType.REPLACE,
          data_offset=self.curr_offset, data_length=sigs_len,
          dst_extents=[(common.PSEUDO_EXTENT_MARKER, sigs_num_blocks)])

    if do_generate_sigs_data:
      # Once all payload fields are updated, dump and sign it.
      temp_payload_file = cStringIO.StringIO()
      self.WriteToFile(temp_payload_file, data_blobs=self.data_blobs)
      sig = SignSha256(temp_payload_file.getvalue(), privkey_file_name)
      sigs_gen = SignaturesGenerator()
      sigs_gen.AddSig(1, sig)
      sigs_data = sigs_gen.ToBinary()
      # The manifest already advertises sigs_len (measured on the fake
      # signature above), so the real blob must have exactly that size.
      assert len(sigs_data) == sigs_len, 'signature blob lengths mismatch'

    # Dump the whole thing, complete with data and signature blob, to a file.
    self.WriteToFile(file_obj, data_blobs=self.data_blobs, sigs_data=sigs_data,
                     padding=padding)
357 padding=padding)