#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import subprocess
import tempfile
import time
import zipfile
from hashlib import sha1

import common
import test_utils
import validate_target_files
from rangelib import RangeSet

from blockimgdiff import EmptyImage, DataImage

KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB


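# Yields a bit over 2 GiB of data in chunks (small random blocks separated by
# NUL-filled holes), so the large-file tests below can exercise the ZIP64 code
# paths without holding the whole payload in memory.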
def get_2gb_string():
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield '\0' * (step_size - block_size)


class CommonZipTest(test_utils.ReleaseToolsTestCase):

  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify.
    zip_file = zipfile.ZipFile(zip_file_name, "r")

    # Verify the timestamp.
    info = zip_file.getinfo(arcname)
    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

    # Verify the file mode.
    mode = (info.external_attr >> 16) & 0o777
    self.assertEqual(mode, expected_mode)

    # Verify the compress type.
    self.assertEqual(info.compress_type, expected_compress_type)

    # Verify the zip contents.
    entry = zip_file.open(arcname)
    sha1_hash = sha1()
    for chunk in iter(lambda: entry.read(4 * MiB), ''):
      sha1_hash.update(chunk)
    self.assertEqual(expected_hash, sha1_hash.hexdigest())
    self.assertIsNone(zip_file.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in contents:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        arcname = zinfo_or_arcname.filename
        expected_mode = extra_args.get("perms",
                                       zinfo_or_arcname.external_attr >> 16)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
      os.remove(test_file_name)

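  # common.ZipWrite()/ZipWriteStr() are expected to raise zipfile.ZIP64_LIMIT
  # only temporarily while writing large entries; this helper asserts that the
  # module-level default is restored once func() returns.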
  def _test_reset_ZIP64_LIMIT(self, func, *args):
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")

  def test_ZipWriteStr(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "perms": 0o700,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("bar", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o600,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")

  def test_bug21309935(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    try:
      random_string = os.urandom(1024)
      zip_file = zipfile.ZipFile(zip_file_name, "w")
      # Default perms should be 0o644 when passing the filename.
      common.ZipWriteStr(zip_file, "foo", random_string)
      # Honor the specified perms.
      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
      # The perms in zinfo should be untouched.
      zinfo = zipfile.ZipInfo(filename="baz")
      zinfo.external_attr = 0o740 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string)
      # Explicitly specified perms take priority.
      zinfo = zipfile.ZipInfo(filename="qux")
      zinfo.external_attr = 0o700 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, "foo",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o644)
      self._verify(zip_file, zip_file_name, "bar",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o755)
      self._verify(zip_file, zip_file_name, "baz",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o740)
      self._verify(zip_file, zip_file_name, "qux",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o400)
    finally:
      os.remove(zip_file_name)

  def test_ZipDelete(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    output_zip = zipfile.ZipFile(zip_file.name, 'w',
                                 compression=zipfile.ZIP_DEFLATED)
    with tempfile.NamedTemporaryFile() as entry_file:
      entry_file.write(os.urandom(1024))
      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
      common.ZipClose(output_zip)
    zip_file.close()

    try:
      common.ZipDelete(zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertTrue('Test3' in entries)

      self.assertRaises(
          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertTrue('Test3' in entries)

      common.ZipDelete(zip_file.name, ['Test3'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertFalse('Test3' in entries)

      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertFalse('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertFalse('Test3' in entries)
    finally:
      os.remove(zip_file.name)


class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
  """Tests the APK utils related functions."""

  APKCERTS_TXT1 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/target/product/security/platform.x509.pem"'
      ' private_key="build/target/product/security/platform.pk8"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
  )

  APKCERTS_CERTMAP1 = {
      'RecoveryLocalizer.apk' : 'certs/devkey',
      'Settings.apk' : 'build/target/product/security/platform',
      'TV.apk' : 'PRESIGNED',
  }

  APKCERTS_TXT2 = (
      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
  )

  APKCERTS_CERTMAP2 = {
      'Compressed1.apk' : 'certs/compressed1',
      'Compressed2a.apk' : 'certs/compressed2',
      'Compressed2b.apk' : 'certs/compressed2',
      'Compressed3.apk' : 'certs/compressed3',
  }

  APKCERTS_TXT3 = (
      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
  )

  APKCERTS_CERTMAP3 = {
      'Compressed4.apk' : 'certs/compressed4',
  }

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()

  @staticmethod
  def _write_apkcerts_txt(apkcerts_txt, additional=None):
    if additional is None:
      additional = []
    target_files = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
      for entry in additional:
        target_files_zip.writestr(entry, '')
    return target_files

  def test_ReadApkCerts_NoncompressedApks(self):
    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    self.assertIsNone(ext)

  def test_ReadApkCerts_CompressedApks(self):
    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
    # not stored in '.gz' format, so it shouldn't be considered installed.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
    self.assertEqual('.gz', ext)

    # Alternative case with '.xz'.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
    self.assertEqual('.xz', ext)

  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    certmap_merged = self.APKCERTS_CERTMAP1.copy()
    certmap_merged.update(self.APKCERTS_CERTMAP2)
    self.assertDictEqual(certmap_merged, certmap)
    self.assertEqual('.gz', ext)

  def test_ReadApkCerts_MultipleCompressionMethods(self):
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ReadApkCerts_MismatchingKeys(self):
    malformed_apkcerts_txt = (
        'name="App1.apk" certificate="certs/cert1.x509.pem"'
        ' private_key="certs/cert2.pk8"\n'
    )
    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ExtractPublicKey(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(pubkey, 'rb') as pubkey_fp:
      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))

  def test_ExtractPublicKey_invalidInput(self):
    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)

  def test_ExtractAvbPublicKey(self):
    privkey = os.path.join(self.testdata_dir, 'testkey.key')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(common.ExtractAvbPublicKey(privkey)) as privkey_fp, \
        open(common.ExtractAvbPublicKey(pubkey)) as pubkey_fp:
      self.assertEqual(privkey_fp.read(), pubkey_fp.read())

  def test_ParseCertificate(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')

    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    expected, _ = proc.communicate()
    self.assertEqual(0, proc.returncode)

    with open(cert) as cert_fp:
      actual = common.ParseCertificate(cert_fp.read())
    self.assertEqual(expected, actual)

  def test_GetMinSdkVersion(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual('24', common.GetMinSdkVersion(test_app))

  def test_GetMinSdkVersion_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')

  def test_GetMinSdkVersionInt(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))

  def test_GetMinSdkVersionInt_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
        {})


class CommonUtilsTest(test_utils.ReleaseToolsTestCase):

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()

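  # test_utils.construct_sparse_image() takes (chunk_type, block_count) pairs;
  # 0xCAC1, 0xCAC2 and 0xCAC3 below are the sparse-image raw, fill and
  # "don't care" chunk types, respectively.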
  def test_GetSparseImage_emptyBlockMapFile(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([
              (0xCAC1, 6),
              (0xCAC3, 3),
              (0xCAC1, 4)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr('IMAGES/system.map', '')
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertDictEqual(
        {
            '__COPY': RangeSet("0"),
            '__NONZERO-0': RangeSet("1-5 9-12"),
        },
        sparse_image.file_map)

  def test_GetSparseImage_invalidImageName(self):
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'system2', None, None, False)
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'unknown', None, None, False)

  def test_GetSparseImage_missingBlockMapFile(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([
              (0xCAC1, 6),
              (0xCAC3, 3),
              (0xCAC1, 4)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetSparseImage_sharedBlocks_notAllowed(self):
    """Tests the case of having overlapping blocks but disallowed."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      # Block 10 is shared between two files.
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 10-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetSparseImage_sharedBlocks_allowed(self):
    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      # Construct an image with a care_map of "0-5 9-12".
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      # Block 10 is shared between two files.
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 10-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)

    self.assertDictEqual(
        {
            '__COPY': RangeSet("0"),
            '__NONZERO-0': RangeSet("6-8 13-15"),
            '/system/file1': RangeSet("1-5 9-10"),
            '/system/file2': RangeSet("11-12"),
        },
        sparse_image.file_map)

    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
    # 'incomplete'.
    self.assertTrue(
        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
    self.assertNotIn(
        'incomplete', sparse_image.file_map['/system/file2'].extra)

    # All other entries should look normal without any tags.
    self.assertFalse(sparse_image.file_map['__COPY'].extra)
    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
    self.assertFalse(sparse_image.file_map['/system/file1'].extra)

  def test_GetSparseImage_incompleteRanges(self):
    """Tests the case of ext4 images with holes."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 11-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/system/file2' has fewer blocks listed (2) than actual (3).
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['/system/file1'].extra)
    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])

  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//system/file2 11-12',
              '/system/app/file3 13-15']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/system/file2' has fewer blocks listed (2) than actual (3).
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
      # '/system/app/file3' has fewer blocks listed (3) than actual (4).
      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
    self.assertTrue(
        sparse_image.file_map['/system/app/file3'].extra['incomplete'])

  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//init.rc 13-15']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/init.rc' has fewer blocks listed (3) than actual (4).
      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])

  def test_GetSparseImage_fileNotFound(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//system/file2 11-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetAvbChainedPartitionArg(self):
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': pubkey,
        'avb_system_rollback_index_location': 2,
    }
    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('system', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
    key = os.path.join(self.testdata_dir, 'testkey.key')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_product_key_path': key,
        'avb_product_rollback_index_location': 2,
    }
    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('product', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': 'does-not-exist',
        'avb_system_rollback_index_location': 2,
    }
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    args = common.GetAvbChainedPartitionArg(
        'system', info_dict, pubkey).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('system', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_invalidKey(self):
    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': pubkey,
        'avb_system_rollback_index_location': 2,
    }
    self.assertRaises(
        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
        info_dict)

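  # Baseline misc_info for a system-as-root, recovery-as-boot target; the
  # LoadInfoDict tests below copy this dict and delete keys to model other
  # device configurations.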
  INFO_DICT_DEFAULT = {
      'recovery_api_version': 3,
      'fstab_version': 2,
      'system_root_image': 'true',
      'no_recovery' : 'true',
      'recovery_as_boot': 'true',
  }

  @staticmethod
  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      info_values = ''.join(
          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.iteritems())])
      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)

      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
      if info_dict.get('system_root_image') == 'true':
        fstab_values = FSTAB_TEMPLATE.format('/')
      else:
        fstab_values = FSTAB_TEMPLATE.format('/system')
      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)

      common.ZipWriteStr(
          target_files_zip, 'META/file_contexts', 'file-contexts')
    return target_files

  def test_LoadInfoDict(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_dirInput(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_systemRootImageFalse(self):
    # Devices using neither system-as-root nor recovery-as-boot. Non-A/B
    # devices launched prior to P will likely have this config.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['no_recovery']
    del info_dict['system_root_image']
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertNotIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_recoveryAsBootFalse(self):
    # Devices using system-as-root, but with standalone recovery image. Non-A/B
    # devices launched since P will likely have this config.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['no_recovery']
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_noRecoveryTrue(self):
    # Device doesn't have a recovery partition at all.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIsNone(loaded_dict['fstab'])

  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    common.ZipDelete(target_files, 'META/misc_info.txt')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)

  def test_LoadInfoDict_repacking(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped, True)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])
    self.assertEqual(
        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
    self.assertEqual(
        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
        loaded_dict['root_fs_config'])

  def test_LoadInfoDict_repackingWithZipFileInput(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      self.assertRaises(
          AssertionError, common.LoadInfoDict, target_files_zip, True)


class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
  """Checks the format of install-recovery.sh.

  Its format should match between common.py and validate_target_files.py.
  """

  def setUp(self):
    self._tempdir = common.MakeTempDir()
    # Create a dummy dict that contains the fstab info for boot & recovery.
    self._info = {"fstab" : {}}
    dummy_fstab = [
        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
    # Construct the gzipped recovery.img and boot.img
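    # Presumably generated with: echo -n "recovery" | gzip -f | hd (cf. the
    # boot_data comment below).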
    self.recovery_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
        0x08, 0x00, 0x00, 0x00
    ])
    # echo -n "boot" | gzip -f | hd
    self.boot_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
    ])

  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
    loc = os.path.join(self._tempdir, prefix, name)
    if not os.path.exists(os.path.dirname(loc)):
      os.makedirs(os.path.dirname(loc))
    with open(loc, "w+") as f:
      f.write(data)

  def test_full_recovery(self):
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)
    self._info["full_recovery_image"] = "true"

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def test_recovery_from_boot(self):
    recovery_image = common.File("recovery.img", self.recovery_data)
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    boot_image = common.File("boot.img", self.boot_data)
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
    # Validate 'recovery-from-boot' with bonus argument.
    self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)


class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.
  """
  def __init__(self, enable_comments=False):
    self.lines = []
    self.enable_comments = enable_comments
  def Comment(self, comment):
    if self.enable_comments:
      self.lines.append("# {}".format(comment))
  def AppendExtra(self, extra):
    self.lines.append(extra)
  def __str__(self):
    return "\n".join(self.lines)


class MockBlockDifference(object):
  def __init__(self, partition, tgt, src=None):
    self.partition = partition
    self.tgt = tgt
    self.src = src
  def WriteScript(self, script, _, progress=None,
                  write_verify_script=False):
    if progress:
      script.AppendExtra("progress({})".format(progress))
    script.AppendExtra("patch({});".format(self.partition))
    if write_verify_script:
      self.WritePostInstallVerifyScript(script)
  def WritePostInstallVerifyScript(self, script):
    script.AppendExtra("verify({});".format(self.partition))


class FakeSparseImage(object):
  def __init__(self, size):
    self.blocksize = 4096
    self.total_blocks = size // 4096
    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)


class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
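  # Helper that reads back the "dynamic_partitions_op_list" entry written into
  # the output zip, skipping comment lines.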
  @staticmethod
  def get_op_list(output_path):
    with zipfile.ZipFile(output_path, 'r') as output_zip:
      with output_zip.open("dynamic_partitions_op_list") as op_list:
        return [line.strip() for line in op_list.readlines()
                if not line.startswith("#")]

  def setUp(self):
    self.script = MockScriptWriter()
    self.output_path = common.MakeTempFile(suffix='.zip')

  def test_full(self):
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor
super_partition_groups=group_foo
super_group_foo_group_size={group_size}
super_group_foo_partition_list=system vendor
""".format(group_size=4 * GiB).split("\n"))
    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertEqual(str(self.script).strip(), """
assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
patch(vendor);
verify(vendor);
unmap_partition("vendor");
patch(system);
verify(system);
unmap_partition("system");
""".strip())

    lines = self.get_op_list(self.output_path)

    remove_all_groups = lines.index("remove_all_groups")
    add_group = lines.index("add_group group_foo 4294967296")
    add_vendor = lines.index("add vendor group_foo")
    add_system = lines.index("add system group_foo")
    resize_vendor = lines.index("resize vendor 1073741824")
    resize_system = lines.index("resize system 3221225472")

    self.assertLess(remove_all_groups, add_group,
                    "Should add groups after removing all groups")
    self.assertLess(add_group, min(add_vendor, add_system),
                    "Should add partitions after adding group")
    self.assertLess(add_system, resize_system,
                    "Should resize system after adding it")
    self.assertLess(add_vendor, resize_vendor,
                    "Should resize vendor after adding it")

  def test_inc_groups(self):
    source_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_bar group_baz
super_group_foo_group_size={group_foo_size}
super_group_bar_group_size={group_bar_size}
""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_baz group_qux
super_group_foo_group_size={group_foo_size}
super_group_baz_group_size={group_baz_size}
super_group_qux_group_size={group_qux_size}
""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
           group_qux_size=1 * GiB).split("\n"))

    dp_diff = common.DynamicPartitionsDifference(target_info,
                                                 block_diffs=[],
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    lines = self.get_op_list(self.output_path)

    removed = lines.index("remove_group group_bar")
    shrunk = lines.index("resize_group group_foo 3221225472")
    grown = lines.index("resize_group group_baz 4294967296")
    added = lines.index("add_group group_qux 1073741824")

    self.assertLess(max(removed, shrunk), min(grown, added),
                    "ops that remove / shrink partitions must precede ops that "
                    "grow / add partitions")

  def test_incremental(self):
    source_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product product_services
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor product product_services
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product odm
super_partition_groups=group_foo group_bar
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor odm
super_group_bar_group_size={group_bar_size}
super_group_bar_partition_list=product
""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))

    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product_services", None,
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
                                       src=None)]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    metadata_idx = self.script.lines.index(
        'assert(update_dynamic_partitions(package_extract_file('
        '"dynamic_partitions_op_list")));')
    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
    for p in ("product", "system", "odm"):
      patch_idx = self.script.lines.index("patch({});".format(p))
      verify_idx = self.script.lines.index("verify({});".format(p))
      self.assertLess(metadata_idx, patch_idx,
                      "Should patch {} after updating metadata".format(p))
      self.assertLess(patch_idx, verify_idx,
                      "Should verify {} after patching".format(p))

    self.assertNotIn("patch(product_services);", self.script.lines)

    lines = self.get_op_list(self.output_path)

    remove = lines.index("remove product_services")
    move_product_out = lines.index("move product default")
    shrink = lines.index("resize vendor 536870912")
    shrink_group = lines.index("resize_group group_foo 3221225472")
    add_group_bar = lines.index("add_group group_bar 1073741824")
    add_odm = lines.index("add odm group_foo")
    grow_existing = lines.index("resize system 1610612736")
    grow_added = lines.index("resize odm 1073741824")
    move_product_in = lines.index("move product group_bar")

    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)

    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
                    "Must shrink group after partitions inside group are shrunk"
                    " / removed")

    self.assertLess(add_group_bar, move_product_in,
                    "Must add partitions to group after group is added")

    self.assertLess(max_idx_move_partition_out_foo,
                    min_idx_move_partition_in_foo,
                    "Must shrink partitions / remove partitions from group "
                    "before adding / moving partitions into group")

  def test_remove_partition(self):
    source_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
dynamic_partition_list=foo
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=foo
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
""".format(group_foo_size=4 * GiB).split("\n"))

    common.OPTIONS.info_dict = target_info
    common.OPTIONS.target_info_dict = target_info
    common.OPTIONS.source_info_dict = source_info
    common.OPTIONS.cache_size = 4 * 4096

    block_diffs = [common.BlockDifference("foo", EmptyImage(),
                                          src=DataImage("source", pad=True))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertNotIn("block_image_update", str(self.script),
                     "Removed partition should not be patched.")

    lines = self.get_op_list(self.output_path)
    self.assertEqual(lines, ["remove foo"])