blob: 619eda54f863ee5a4e29ad6b82f643d0009eebad [file] [log] [blame]
Dan Albert8e0178d2015-01-27 15:53:15 -08001#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Tao Baofc7e0e02018-02-13 13:54:02 -080016
Dan Albert8e0178d2015-01-27 15:53:15 -080017import os
18import tempfile
19import time
20import unittest
21import zipfile
Tao Bao31b08072017-11-08 15:50:59 -080022from hashlib import sha1
23
Dan Albert8e0178d2015-01-27 15:53:15 -080024import common
Tao Bao04e1f012018-02-04 12:13:35 -080025import test_utils
Tianjie Xu9c384d22017-06-20 17:00:55 -070026import validate_target_files
Tao Baofc7e0e02018-02-13 13:54:02 -080027from rangelib import RangeSet
Dan Albert8e0178d2015-01-27 15:53:15 -080028
Tao Bao04e1f012018-02-04 12:13:35 -080029
Tao Bao31b08072017-11-08 15:50:59 -080030KiB = 1024
31MiB = 1024 * KiB
32GiB = 1024 * MiB
Dan Albert8e0178d2015-01-27 15:53:15 -080033
Tao Bao1c830bf2017-12-25 10:43:47 -080034
Tao Baof3282b42015-04-01 11:21:55 -070035def get_2gb_string():
Tao Bao31b08072017-11-08 15:50:59 -080036 size = int(2 * GiB + 1)
37 block_size = 4 * KiB
38 step_size = 4 * MiB
39 # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
40 for _ in range(0, size, step_size):
41 yield os.urandom(block_size)
42 yield '\0' * (step_size - block_size)
Tao Baof3282b42015-04-01 11:21:55 -070043
Dan Albert8e0178d2015-01-27 15:53:15 -080044
45class CommonZipTest(unittest.TestCase):
Tao Bao31b08072017-11-08 15:50:59 -080046 def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
Tao Baof3282b42015-04-01 11:21:55 -070047 test_file_name=None, expected_stat=None, expected_mode=0o644,
48 expected_compress_type=zipfile.ZIP_STORED):
49 # Verify the stat if present.
50 if test_file_name is not None:
51 new_stat = os.stat(test_file_name)
52 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
53 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
54
55 # Reopen the zip file to verify.
56 zip_file = zipfile.ZipFile(zip_file_name, "r")
57
58 # Verify the timestamp.
59 info = zip_file.getinfo(arcname)
60 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
61
62 # Verify the file mode.
63 mode = (info.external_attr >> 16) & 0o777
64 self.assertEqual(mode, expected_mode)
65
66 # Verify the compress type.
67 self.assertEqual(info.compress_type, expected_compress_type)
68
69 # Verify the zip contents.
Tao Bao31b08072017-11-08 15:50:59 -080070 entry = zip_file.open(arcname)
71 sha1_hash = sha1()
72 for chunk in iter(lambda: entry.read(4 * MiB), ''):
73 sha1_hash.update(chunk)
74 self.assertEqual(expected_hash, sha1_hash.hexdigest())
Tao Baof3282b42015-04-01 11:21:55 -070075 self.assertIsNone(zip_file.testzip())
76
Dan Albert8e0178d2015-01-27 15:53:15 -080077 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
78 extra_zipwrite_args = dict(extra_zipwrite_args or {})
79
80 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080081 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070082
83 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080084 zip_file_name = zip_file.name
85
86 # File names within an archive strip the leading slash.
87 arcname = extra_zipwrite_args.get("arcname", test_file_name)
88 if arcname[0] == "/":
89 arcname = arcname[1:]
90
91 zip_file.close()
92 zip_file = zipfile.ZipFile(zip_file_name, "w")
93
94 try:
Tao Bao31b08072017-11-08 15:50:59 -080095 sha1_hash = sha1()
96 for data in contents:
97 sha1_hash.update(data)
98 test_file.write(data)
Dan Albert8e0178d2015-01-27 15:53:15 -080099 test_file.close()
100
Tao Baof3282b42015-04-01 11:21:55 -0700101 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -0800102 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700103 expected_compress_type = extra_zipwrite_args.get("compress_type",
104 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800105 time.sleep(5) # Make sure the atime/mtime will change measurably.
106
107 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700108 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800109
Tao Bao31b08072017-11-08 15:50:59 -0800110 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
111 test_file_name, expected_stat, expected_mode,
112 expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800113 finally:
114 os.remove(test_file_name)
115 os.remove(zip_file_name)
116
Tao Baof3282b42015-04-01 11:21:55 -0700117 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
118 extra_args = dict(extra_args or {})
119
120 zip_file = tempfile.NamedTemporaryFile(delete=False)
121 zip_file_name = zip_file.name
122 zip_file.close()
123
124 zip_file = zipfile.ZipFile(zip_file_name, "w")
125
126 try:
127 expected_compress_type = extra_args.get("compress_type",
128 zipfile.ZIP_STORED)
129 time.sleep(5) # Make sure the atime/mtime will change measurably.
130
131 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700132 arcname = zinfo_or_arcname
133 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700134 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700135 arcname = zinfo_or_arcname.filename
136 expected_mode = extra_args.get("perms",
137 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700138
Tao Bao58c1b962015-05-20 09:32:18 -0700139 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700140 common.ZipClose(zip_file)
141
Tao Bao31b08072017-11-08 15:50:59 -0800142 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700143 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700144 expected_compress_type=expected_compress_type)
145 finally:
146 os.remove(zip_file_name)
147
148 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
149 extra_args = dict(extra_args or {})
150
151 zip_file = tempfile.NamedTemporaryFile(delete=False)
152 zip_file_name = zip_file.name
153
154 test_file = tempfile.NamedTemporaryFile(delete=False)
155 test_file_name = test_file.name
156
157 arcname_large = test_file_name
158 arcname_small = "bar"
159
160 # File names within an archive strip the leading slash.
161 if arcname_large[0] == "/":
162 arcname_large = arcname_large[1:]
163
164 zip_file.close()
165 zip_file = zipfile.ZipFile(zip_file_name, "w")
166
167 try:
Tao Bao31b08072017-11-08 15:50:59 -0800168 sha1_hash = sha1()
169 for data in large:
170 sha1_hash.update(data)
171 test_file.write(data)
Tao Baof3282b42015-04-01 11:21:55 -0700172 test_file.close()
173
174 expected_stat = os.stat(test_file_name)
175 expected_mode = 0o644
176 expected_compress_type = extra_args.get("compress_type",
177 zipfile.ZIP_STORED)
178 time.sleep(5) # Make sure the atime/mtime will change measurably.
179
180 common.ZipWrite(zip_file, test_file_name, **extra_args)
181 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
182 common.ZipClose(zip_file)
183
184 # Verify the contents written by ZipWrite().
Tao Bao31b08072017-11-08 15:50:59 -0800185 self._verify(zip_file, zip_file_name, arcname_large,
186 sha1_hash.hexdigest(), test_file_name, expected_stat,
187 expected_mode, expected_compress_type)
Tao Baof3282b42015-04-01 11:21:55 -0700188
189 # Verify the contents written by ZipWriteStr().
Tao Bao31b08072017-11-08 15:50:59 -0800190 self._verify(zip_file, zip_file_name, arcname_small,
191 sha1(small).hexdigest(),
Tao Baof3282b42015-04-01 11:21:55 -0700192 expected_compress_type=expected_compress_type)
193 finally:
194 os.remove(zip_file_name)
195 os.remove(test_file_name)
196
197 def _test_reset_ZIP64_LIMIT(self, func, *args):
198 default_limit = (1 << 31) - 1
199 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
200 func(*args)
201 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
202
Dan Albert8e0178d2015-01-27 15:53:15 -0800203 def test_ZipWrite(self):
204 file_contents = os.urandom(1024)
205 self._test_ZipWrite(file_contents)
206
207 def test_ZipWrite_with_opts(self):
208 file_contents = os.urandom(1024)
209 self._test_ZipWrite(file_contents, {
210 "arcname": "foobar",
211 "perms": 0o777,
212 "compress_type": zipfile.ZIP_DEFLATED,
213 })
Tao Baof3282b42015-04-01 11:21:55 -0700214 self._test_ZipWrite(file_contents, {
215 "arcname": "foobar",
216 "perms": 0o700,
217 "compress_type": zipfile.ZIP_STORED,
218 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800219
220 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700221 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800222 self._test_ZipWrite(file_contents, {
223 "compress_type": zipfile.ZIP_DEFLATED,
224 })
225
226 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700227 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
228
229 def test_ZipWriteStr(self):
230 random_string = os.urandom(1024)
231 # Passing arcname
232 self._test_ZipWriteStr("foo", random_string)
233
234 # Passing zinfo
235 zinfo = zipfile.ZipInfo(filename="foo")
236 self._test_ZipWriteStr(zinfo, random_string)
237
238 # Timestamp in the zinfo should be overwritten.
239 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
240 self._test_ZipWriteStr(zinfo, random_string)
241
242 def test_ZipWriteStr_with_opts(self):
243 random_string = os.urandom(1024)
244 # Passing arcname
245 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700246 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700247 "compress_type": zipfile.ZIP_DEFLATED,
248 })
Tao Bao58c1b962015-05-20 09:32:18 -0700249 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700250 "compress_type": zipfile.ZIP_STORED,
251 })
252
253 # Passing zinfo
254 zinfo = zipfile.ZipInfo(filename="foo")
255 self._test_ZipWriteStr(zinfo, random_string, {
256 "compress_type": zipfile.ZIP_DEFLATED,
257 })
258 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700259 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700260 "compress_type": zipfile.ZIP_STORED,
261 })
262
263 def test_ZipWriteStr_large_file(self):
264 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
265 # the workaround. We will only test the case of writing a string into a
266 # large archive.
267 long_string = get_2gb_string()
268 short_string = os.urandom(1024)
269 self._test_ZipWriteStr_large_file(long_string, short_string, {
270 "compress_type": zipfile.ZIP_DEFLATED,
271 })
272
273 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
274 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
275 zinfo = zipfile.ZipInfo(filename="foo")
276 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700277
278 def test_bug21309935(self):
279 zip_file = tempfile.NamedTemporaryFile(delete=False)
280 zip_file_name = zip_file.name
281 zip_file.close()
282
283 try:
284 random_string = os.urandom(1024)
285 zip_file = zipfile.ZipFile(zip_file_name, "w")
286 # Default perms should be 0o644 when passing the filename.
287 common.ZipWriteStr(zip_file, "foo", random_string)
288 # Honor the specified perms.
289 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
290 # The perms in zinfo should be untouched.
291 zinfo = zipfile.ZipInfo(filename="baz")
292 zinfo.external_attr = 0o740 << 16
293 common.ZipWriteStr(zip_file, zinfo, random_string)
294 # Explicitly specified perms has the priority.
295 zinfo = zipfile.ZipInfo(filename="qux")
296 zinfo.external_attr = 0o700 << 16
297 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
298 common.ZipClose(zip_file)
299
Tao Bao31b08072017-11-08 15:50:59 -0800300 self._verify(zip_file, zip_file_name, "foo",
301 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700302 expected_mode=0o644)
Tao Bao31b08072017-11-08 15:50:59 -0800303 self._verify(zip_file, zip_file_name, "bar",
304 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700305 expected_mode=0o755)
Tao Bao31b08072017-11-08 15:50:59 -0800306 self._verify(zip_file, zip_file_name, "baz",
307 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700308 expected_mode=0o740)
Tao Bao31b08072017-11-08 15:50:59 -0800309 self._verify(zip_file, zip_file_name, "qux",
310 sha1(random_string).hexdigest(),
Tao Bao58c1b962015-05-20 09:32:18 -0700311 expected_mode=0o400)
312 finally:
313 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700314
Tao Bao89d7ab22017-12-14 17:05:33 -0800315 def test_ZipDelete(self):
316 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
317 output_zip = zipfile.ZipFile(zip_file.name, 'w',
318 compression=zipfile.ZIP_DEFLATED)
319 with tempfile.NamedTemporaryFile() as entry_file:
320 entry_file.write(os.urandom(1024))
321 common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
322 common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
323 common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
324 common.ZipClose(output_zip)
325 zip_file.close()
326
327 try:
328 common.ZipDelete(zip_file.name, 'Test2')
329 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
330 entries = check_zip.namelist()
331 self.assertTrue('Test1' in entries)
332 self.assertFalse('Test2' in entries)
333 self.assertTrue('Test3' in entries)
334
335 self.assertRaises(AssertionError, common.ZipDelete, zip_file.name,
336 'Test2')
337 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
338 entries = check_zip.namelist()
339 self.assertTrue('Test1' in entries)
340 self.assertFalse('Test2' in entries)
341 self.assertTrue('Test3' in entries)
342
343 common.ZipDelete(zip_file.name, ['Test3'])
344 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
345 entries = check_zip.namelist()
346 self.assertTrue('Test1' in entries)
347 self.assertFalse('Test2' in entries)
348 self.assertFalse('Test3' in entries)
349
350 common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
351 with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
352 entries = check_zip.namelist()
353 self.assertFalse('Test1' in entries)
354 self.assertFalse('Test2' in entries)
355 self.assertFalse('Test3' in entries)
356 finally:
357 os.remove(zip_file.name)
358
359
Tao Bao818ddf52018-01-05 11:17:34 -0800360class CommonApkUtilsTest(unittest.TestCase):
361 """Tests the APK utils related functions."""
362
363 APKCERTS_TXT1 = (
364 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
365 ' private_key="certs/devkey.pk8"\n'
366 'name="Settings.apk"'
367 ' certificate="build/target/product/security/platform.x509.pem"'
368 ' private_key="build/target/product/security/platform.pk8"\n'
369 'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
370 )
371
372 APKCERTS_CERTMAP1 = {
373 'RecoveryLocalizer.apk' : 'certs/devkey',
374 'Settings.apk' : 'build/target/product/security/platform',
375 'TV.apk' : 'PRESIGNED',
376 }
377
378 APKCERTS_TXT2 = (
379 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
380 ' private_key="certs/compressed1.pk8" compressed="gz"\n'
381 'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
382 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
383 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
384 ' private_key="certs/compressed2.pk8" compressed="gz"\n'
385 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
386 ' private_key="certs/compressed3.pk8" compressed="gz"\n'
387 )
388
389 APKCERTS_CERTMAP2 = {
390 'Compressed1.apk' : 'certs/compressed1',
391 'Compressed2a.apk' : 'certs/compressed2',
392 'Compressed2b.apk' : 'certs/compressed2',
393 'Compressed3.apk' : 'certs/compressed3',
394 }
395
396 APKCERTS_TXT3 = (
397 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
398 ' private_key="certs/compressed4.pk8" compressed="xz"\n'
399 )
400
401 APKCERTS_CERTMAP3 = {
402 'Compressed4.apk' : 'certs/compressed4',
403 }
404
405 def tearDown(self):
406 common.Cleanup()
407
408 @staticmethod
409 def _write_apkcerts_txt(apkcerts_txt, additional=None):
410 if additional is None:
411 additional = []
412 target_files = common.MakeTempFile(suffix='.zip')
413 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
414 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
415 for entry in additional:
416 target_files_zip.writestr(entry, '')
417 return target_files
418
419 def test_ReadApkCerts_NoncompressedApks(self):
420 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
421 with zipfile.ZipFile(target_files, 'r') as input_zip:
422 certmap, ext = common.ReadApkCerts(input_zip)
423
424 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
425 self.assertIsNone(ext)
426
427 def test_ReadApkCerts_CompressedApks(self):
428 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
429 # not stored in '.gz' format, so it shouldn't be considered as installed.
430 target_files = self._write_apkcerts_txt(
431 self.APKCERTS_TXT2,
432 ['Compressed1.apk.gz', 'Compressed3.apk'])
433
434 with zipfile.ZipFile(target_files, 'r') as input_zip:
435 certmap, ext = common.ReadApkCerts(input_zip)
436
437 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
438 self.assertEqual('.gz', ext)
439
440 # Alternative case with '.xz'.
441 target_files = self._write_apkcerts_txt(
442 self.APKCERTS_TXT3, ['Compressed4.apk.xz'])
443
444 with zipfile.ZipFile(target_files, 'r') as input_zip:
445 certmap, ext = common.ReadApkCerts(input_zip)
446
447 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
448 self.assertEqual('.xz', ext)
449
450 def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
451 target_files = self._write_apkcerts_txt(
452 self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
453 ['Compressed1.apk.gz', 'Compressed3.apk'])
454
455 with zipfile.ZipFile(target_files, 'r') as input_zip:
456 certmap, ext = common.ReadApkCerts(input_zip)
457
458 certmap_merged = self.APKCERTS_CERTMAP1.copy()
459 certmap_merged.update(self.APKCERTS_CERTMAP2)
460 self.assertDictEqual(certmap_merged, certmap)
461 self.assertEqual('.gz', ext)
462
463 def test_ReadApkCerts_MultipleCompressionMethods(self):
464 target_files = self._write_apkcerts_txt(
465 self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
466 ['Compressed1.apk.gz', 'Compressed4.apk.xz'])
467
468 with zipfile.ZipFile(target_files, 'r') as input_zip:
469 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
470
471 def test_ReadApkCerts_MismatchingKeys(self):
472 malformed_apkcerts_txt = (
473 'name="App1.apk" certificate="certs/cert1.x509.pem"'
474 ' private_key="certs/cert2.pk8"\n'
475 )
476 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)
477
478 with zipfile.ZipFile(target_files, 'r') as input_zip:
479 self.assertRaises(ValueError, common.ReadApkCerts, input_zip)
480
Tao Bao04e1f012018-02-04 12:13:35 -0800481 def test_ExtractPublicKey(self):
482 testdata_dir = test_utils.get_testdata_dir()
483 cert = os.path.join(testdata_dir, 'testkey.x509.pem')
484 pubkey = os.path.join(testdata_dir, 'testkey.pubkey.pem')
485 with open(pubkey, 'rb') as pubkey_fp:
486 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
487
488 def test_ExtractPublicKey_invalidInput(self):
489 testdata_dir = test_utils.get_testdata_dir()
490 wrong_input = os.path.join(testdata_dir, 'testkey.pk8')
491 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)
492
Tao Bao818ddf52018-01-05 11:17:34 -0800493
Tao Baofc7e0e02018-02-13 13:54:02 -0800494class CommonUtilsTest(unittest.TestCase):
495
496 def tearDown(self):
497 common.Cleanup()
498
499 def test_GetSparseImage_emptyBlockMapFile(self):
500 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
501 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
502 target_files_zip.write(
503 test_utils.construct_sparse_image([
504 (0xCAC1, 6),
505 (0xCAC3, 3),
506 (0xCAC1, 4)]),
507 arcname='IMAGES/system.img')
508 target_files_zip.writestr('IMAGES/system.map', '')
509 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
510 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
511
512 tempdir, input_zip = common.UnzipTemp(target_files)
513 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
514 input_zip.close()
515
516 self.assertDictEqual(
517 {
518 '__COPY': RangeSet("0"),
519 '__NONZERO-0': RangeSet("1-5 9-12"),
520 },
521 sparse_image.file_map)
522
523 def test_GetSparseImage_invalidImageName(self):
524 self.assertRaises(
525 AssertionError, common.GetSparseImage, 'system2', None, None, False)
526 self.assertRaises(
527 AssertionError, common.GetSparseImage, 'unknown', None, None, False)
528
529 def test_GetSparseImage_missingBlockMapFile(self):
530 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
531 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
532 target_files_zip.write(
533 test_utils.construct_sparse_image([
534 (0xCAC1, 6),
535 (0xCAC3, 3),
536 (0xCAC1, 4)]),
537 arcname='IMAGES/system.img')
538 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
539 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
540
541 tempdir, input_zip = common.UnzipTemp(target_files)
542 self.assertRaises(
543 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
544 False)
545 input_zip.close()
546
547 def test_GetSparseImage_sharedBlocks_notAllowed(self):
548 """Tests the case of having overlapping blocks but disallowed."""
549 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
550 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
551 target_files_zip.write(
552 test_utils.construct_sparse_image([(0xCAC2, 16)]),
553 arcname='IMAGES/system.img')
554 # Block 10 is shared between two files.
555 target_files_zip.writestr(
556 'IMAGES/system.map',
557 '\n'.join([
558 '/system/file1 1-5 9-10',
559 '/system/file2 10-12']))
560 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
561 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
562
563 tempdir, input_zip = common.UnzipTemp(target_files)
564 self.assertRaises(
565 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
566 False)
567 input_zip.close()
568
569 def test_GetSparseImage_sharedBlocks_allowed(self):
570 """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
571 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
572 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
573 # Construct an image with a care_map of "0-5 9-12".
574 target_files_zip.write(
575 test_utils.construct_sparse_image([(0xCAC2, 16)]),
576 arcname='IMAGES/system.img')
577 # Block 10 is shared between two files.
578 target_files_zip.writestr(
579 'IMAGES/system.map',
580 '\n'.join([
581 '/system/file1 1-5 9-10',
582 '/system/file2 10-12']))
583 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
584 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
585
586 tempdir, input_zip = common.UnzipTemp(target_files)
587 sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)
588 input_zip.close()
589
590 self.assertDictEqual(
591 {
592 '__COPY': RangeSet("0"),
593 '__NONZERO-0': RangeSet("6-8 13-15"),
594 '/system/file1': RangeSet("1-5 9-10"),
595 '/system/file2': RangeSet("11-12"),
596 },
597 sparse_image.file_map)
598
599 # '/system/file2' should be marked with 'uses_shared_blocks', but not with
600 # 'incomplete'.
601 self.assertTrue(
602 sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
603 self.assertNotIn(
604 'incomplete', sparse_image.file_map['/system/file2'].extra)
605
606 # All other entries should look normal without any tags.
607 self.assertFalse(sparse_image.file_map['__COPY'].extra)
608 self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
609 self.assertFalse(sparse_image.file_map['/system/file1'].extra)
610
611 def test_GetSparseImage_incompleteRanges(self):
612 """Tests the case of ext4 images with holes."""
613 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
614 with zipfile.ZipFile(target_files, 'w') as target_files_zip:
615 target_files_zip.write(
616 test_utils.construct_sparse_image([(0xCAC2, 16)]),
617 arcname='IMAGES/system.img')
618 target_files_zip.writestr(
619 'IMAGES/system.map',
620 '\n'.join([
621 '/system/file1 1-5 9-10',
622 '/system/file2 11-12']))
623 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
624 # '/system/file2' has less blocks listed (2) than actual (3).
625 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
626
627 tempdir, input_zip = common.UnzipTemp(target_files)
628 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)
629 input_zip.close()
630
631 self.assertFalse(sparse_image.file_map['/system/file1'].extra)
632 self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])
633
634
Tianjie Xu9c384d22017-06-20 17:00:55 -0700635class InstallRecoveryScriptFormatTest(unittest.TestCase):
Tao Bao1c830bf2017-12-25 10:43:47 -0800636 """Checks the format of install-recovery.sh.
Tianjie Xu9c384d22017-06-20 17:00:55 -0700637
Tao Bao1c830bf2017-12-25 10:43:47 -0800638 Its format should match between common.py and validate_target_files.py.
639 """
Tianjie Xu9c384d22017-06-20 17:00:55 -0700640
641 def setUp(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800642 self._tempdir = common.MakeTempDir()
Tianjie Xu9c384d22017-06-20 17:00:55 -0700643 # Create a dummy dict that contains the fstab info for boot&recovery.
644 self._info = {"fstab" : {}}
Tao Bao1c830bf2017-12-25 10:43:47 -0800645 dummy_fstab = [
646 "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
647 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
Tao Bao31b08072017-11-08 15:50:59 -0800648 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
Tianjie Xudf055582017-11-07 12:22:58 -0800649 # Construct the gzipped recovery.img and boot.img
650 self.recovery_data = bytearray([
651 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
652 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
653 0x08, 0x00, 0x00, 0x00
654 ])
655 # echo -n "boot" | gzip -f | hd
656 self.boot_data = bytearray([
657 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
658 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
659 ])
Tianjie Xu9c384d22017-06-20 17:00:55 -0700660
661 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
662 loc = os.path.join(self._tempdir, prefix, name)
663 if not os.path.exists(os.path.dirname(loc)):
664 os.makedirs(os.path.dirname(loc))
665 with open(loc, "w+") as f:
666 f.write(data)
667
668 def test_full_recovery(self):
Tao Bao31b08072017-11-08 15:50:59 -0800669 recovery_image = common.File("recovery.img", self.recovery_data)
670 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700671 self._info["full_recovery_image"] = "true"
672
673 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
674 recovery_image, boot_image, self._info)
675 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
676 self._info)
677
678 def test_recovery_from_boot(self):
Tao Bao31b08072017-11-08 15:50:59 -0800679 recovery_image = common.File("recovery.img", self.recovery_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700680 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
Tao Bao31b08072017-11-08 15:50:59 -0800681 boot_image = common.File("boot.img", self.boot_data)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700682 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
683
684 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
685 recovery_image, boot_image, self._info)
686 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
687 self._info)
688 # Validate 'recovery-from-boot' with bonus argument.
689 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
690 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
691 recovery_image, boot_image, self._info)
692 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
693 self._info)
694
695 def tearDown(self):
Tao Bao1c830bf2017-12-25 10:43:47 -0800696 common.Cleanup()