#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import subprocess
import tempfile
import time
import zipfile
from hashlib import sha1

import common
import test_utils
import validate_target_files
from rangelib import RangeSet

from blockimgdiff import EmptyImage, DataImage

KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB


def get_2gb_string():
  size = int(2 * GiB + 1)
  block_size = 4 * KiB
  step_size = 4 * MiB
  # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
  for _ in range(0, size, step_size):
    yield os.urandom(block_size)
    yield '\0' * (step_size - block_size)
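  # Note (added): each 4 MiB step above yields 4 KiB of random data followed
  # by roughly 4 MiB of NUL bytes, so the generated stream totals just over
  # 2 GiB while staying cheap to produce and to compress.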


class CommonZipTest(test_utils.ReleaseToolsTestCase):

  def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
              test_file_name=None, expected_stat=None, expected_mode=0o644,
              expected_compress_type=zipfile.ZIP_STORED):
    # Verify the stat if present.
    if test_file_name is not None:
      new_stat = os.stat(test_file_name)
      self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
      self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))

    # Reopen the zip file to verify.
    zip_file = zipfile.ZipFile(zip_file_name, "r")

    # Verify the timestamp.
    info = zip_file.getinfo(arcname)
    self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

    # Verify the file mode.
    mode = (info.external_attr >> 16) & 0o777
    self.assertEqual(mode, expected_mode)

    # Verify the compress type.
    self.assertEqual(info.compress_type, expected_compress_type)

    # Verify the zip contents.
    entry = zip_file.open(arcname)
    sha1_hash = sha1()
    for chunk in iter(lambda: entry.read(4 * MiB), ''):
      sha1_hash.update(chunk)
    self.assertEqual(expected_hash, sha1_hash.hexdigest())
    self.assertIsNone(zip_file.testzip())

  def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
    extra_zipwrite_args = dict(extra_zipwrite_args or {})

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    # File names within an archive strip the leading slash.
    arcname = extra_zipwrite_args.get("arcname", test_file_name)
    if arcname[0] == "/":
      arcname = arcname[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in contents:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = extra_zipwrite_args.get("perms", 0o644)
      expected_compress_type = extra_zipwrite_args.get("compress_type",
                                                       zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
                   test_file_name, expected_stat, expected_mode,
                   expected_compress_type)
    finally:
      os.remove(test_file_name)
      os.remove(zip_file_name)

  def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
        arcname = zinfo_or_arcname
        expected_mode = extra_args.get("perms", 0o644)
      else:
        arcname = zinfo_or_arcname.filename
        expected_mode = extra_args.get("perms",
                                       zinfo_or_arcname.external_attr >> 16)

      common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
                   expected_mode=expected_mode,
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)

  def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
    extra_args = dict(extra_args or {})

    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name

    test_file = tempfile.NamedTemporaryFile(delete=False)
    test_file_name = test_file.name

    arcname_large = test_file_name
    arcname_small = "bar"

    # File names within an archive strip the leading slash.
    if arcname_large[0] == "/":
      arcname_large = arcname_large[1:]

    zip_file.close()
    zip_file = zipfile.ZipFile(zip_file_name, "w")

    try:
      sha1_hash = sha1()
      for data in large:
        sha1_hash.update(data)
        test_file.write(data)
      test_file.close()

      expected_stat = os.stat(test_file_name)
      expected_mode = 0o644
      expected_compress_type = extra_args.get("compress_type",
                                              zipfile.ZIP_STORED)
      time.sleep(5)  # Make sure the atime/mtime will change measurably.

      common.ZipWrite(zip_file, test_file_name, **extra_args)
      common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
      common.ZipClose(zip_file)

      # Verify the contents written by ZipWrite().
      self._verify(zip_file, zip_file_name, arcname_large,
                   sha1_hash.hexdigest(), test_file_name, expected_stat,
                   expected_mode, expected_compress_type)

      # Verify the contents written by ZipWriteStr().
      self._verify(zip_file, zip_file_name, arcname_small,
                   sha1(small).hexdigest(),
                   expected_compress_type=expected_compress_type)
    finally:
      os.remove(zip_file_name)
      os.remove(test_file_name)

  def _test_reset_ZIP64_LIMIT(self, func, *args):
    default_limit = (1 << 31) - 1
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
    func(*args)
    self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
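
  # Note (added): the helper above only asserts that zipfile.ZIP64_LIMIT is
  # back at CPython's stock value of (1 << 31) - 1 after the call; the
  # common.ZipWrite()/ZipWriteStr() helpers presumably bump the limit
  # temporarily to handle >2 GiB entries and are expected to restore it.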

  def test_ZipWrite(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents)

  def test_ZipWrite_with_opts(self):
    file_contents = os.urandom(1024)
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o777,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWrite(file_contents, {
        "arcname": "foobar",
        "perms": 0o700,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWrite_large_file(self):
    file_contents = get_2gb_string()
    self._test_ZipWrite(file_contents, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWrite_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")

  def test_ZipWriteStr(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string)

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string)

    # Timestamp in the zinfo should be overwritten.
    zinfo.date_time = (2015, 3, 1, 15, 30, 0)
    self._test_ZipWriteStr(zinfo, random_string)

  def test_ZipWriteStr_with_opts(self):
    random_string = os.urandom(1024)
    # Passing arcname
    self._test_ZipWriteStr("foo", random_string, {
        "perms": 0o700,
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr("bar", random_string, {
        "compress_type": zipfile.ZIP_STORED,
    })

    # Passing zinfo
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_ZipWriteStr(zinfo, random_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })
    self._test_ZipWriteStr(zinfo, random_string, {
        "perms": 0o600,
        "compress_type": zipfile.ZIP_STORED,
    })

  def test_ZipWriteStr_large_file(self):
    # zipfile.writestr() doesn't work when the str size is over 2GiB even with
    # the workaround. We will only test the case of writing a string into a
    # large archive.
    long_string = get_2gb_string()
    short_string = os.urandom(1024)
    self._test_ZipWriteStr_large_file(long_string, short_string, {
        "compress_type": zipfile.ZIP_DEFLATED,
    })

  def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
    zinfo = zipfile.ZipInfo(filename="foo")
    self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")

  def test_bug21309935(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False)
    zip_file_name = zip_file.name
    zip_file.close()

    try:
      random_string = os.urandom(1024)
      zip_file = zipfile.ZipFile(zip_file_name, "w")
      # Default perms should be 0o644 when passing the filename.
      common.ZipWriteStr(zip_file, "foo", random_string)
      # Honor the specified perms.
      common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
      # The perms in zinfo should be untouched.
      zinfo = zipfile.ZipInfo(filename="baz")
      zinfo.external_attr = 0o740 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string)
      # Explicitly specified perms take priority.
      zinfo = zipfile.ZipInfo(filename="qux")
      zinfo.external_attr = 0o700 << 16
      common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
      common.ZipClose(zip_file)

      self._verify(zip_file, zip_file_name, "foo",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o644)
      self._verify(zip_file, zip_file_name, "bar",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o755)
      self._verify(zip_file, zip_file_name, "baz",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o740)
      self._verify(zip_file, zip_file_name, "qux",
                   sha1(random_string).hexdigest(),
                   expected_mode=0o400)
    finally:
      os.remove(zip_file_name)

  def test_ZipDelete(self):
    zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
    output_zip = zipfile.ZipFile(zip_file.name, 'w',
                                 compression=zipfile.ZIP_DEFLATED)
    with tempfile.NamedTemporaryFile() as entry_file:
      entry_file.write(os.urandom(1024))
      common.ZipWrite(output_zip, entry_file.name, arcname='Test1')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test2')
      common.ZipWrite(output_zip, entry_file.name, arcname='Test3')
      common.ZipClose(output_zip)
    zip_file.close()

    try:
      common.ZipDelete(zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertTrue('Test3' in entries)

      self.assertRaises(
          common.ExternalError, common.ZipDelete, zip_file.name, 'Test2')
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertTrue('Test3' in entries)

      common.ZipDelete(zip_file.name, ['Test3'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertTrue('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertFalse('Test3' in entries)

      common.ZipDelete(zip_file.name, ['Test1', 'Test2'])
      with zipfile.ZipFile(zip_file.name, 'r') as check_zip:
        entries = check_zip.namelist()
        self.assertFalse('Test1' in entries)
        self.assertFalse('Test2' in entries)
        self.assertFalse('Test3' in entries)
    finally:
      os.remove(zip_file.name)


class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
  """Tests the APK utils related functions."""

  APKCERTS_TXT1 = (
      'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"'
      ' private_key="certs/devkey.pk8"\n'
      'name="Settings.apk"'
      ' certificate="build/target/product/security/platform.x509.pem"'
      ' private_key="build/target/product/security/platform.pk8"\n'
      'name="TV.apk" certificate="PRESIGNED" private_key=""\n'
  )

  APKCERTS_CERTMAP1 = {
      'RecoveryLocalizer.apk' : 'certs/devkey',
      'Settings.apk' : 'build/target/product/security/platform',
      'TV.apk' : 'PRESIGNED',
  }

  APKCERTS_TXT2 = (
      'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"'
      ' private_key="certs/compressed1.pk8" compressed="gz"\n'
      'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"'
      ' private_key="certs/compressed2.pk8" compressed="gz"\n'
      'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"'
      ' private_key="certs/compressed3.pk8" compressed="gz"\n'
  )

  APKCERTS_CERTMAP2 = {
      'Compressed1.apk' : 'certs/compressed1',
      'Compressed2a.apk' : 'certs/compressed2',
      'Compressed2b.apk' : 'certs/compressed2',
      'Compressed3.apk' : 'certs/compressed3',
  }

  APKCERTS_TXT3 = (
      'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"'
      ' private_key="certs/compressed4.pk8" compressed="xz"\n'
  )

  APKCERTS_CERTMAP3 = {
      'Compressed4.apk' : 'certs/compressed4',
  }
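
  # Format note (added): each apkcerts.txt line parsed by common.ReadApkCerts()
  # follows the pattern seen in the samples above, i.e.
  #   name="Foo.apk" certificate="path.x509.pem" private_key="path.pk8"
  # with an optional compressed="gz"/"xz" attribute; "PRESIGNED" entries carry
  # an empty private_key.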

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()

  @staticmethod
  def _write_apkcerts_txt(apkcerts_txt, additional=None):
    if additional is None:
      additional = []
    target_files = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt)
      for entry in additional:
        target_files_zip.writestr(entry, '')
    return target_files

  def test_ReadApkCerts_NoncompressedApks(self):
    target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap)
    self.assertIsNone(ext)

  def test_ReadApkCerts_CompressedApks(self):
    # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is
    # not stored in '.gz' format, so it shouldn't be considered installed.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap)
    self.assertEqual('.gz', ext)

    # Alternative case with '.xz'.
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT3, ['Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap)
    self.assertEqual('.xz', ext)

  def test_ReadApkCerts_CompressedAndNoncompressedApks(self):
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT1 + self.APKCERTS_TXT2,
        ['Compressed1.apk.gz', 'Compressed3.apk'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      certmap, ext = common.ReadApkCerts(input_zip)

    certmap_merged = self.APKCERTS_CERTMAP1.copy()
    certmap_merged.update(self.APKCERTS_CERTMAP2)
    self.assertDictEqual(certmap_merged, certmap)
    self.assertEqual('.gz', ext)

  def test_ReadApkCerts_MultipleCompressionMethods(self):
    target_files = self._write_apkcerts_txt(
        self.APKCERTS_TXT2 + self.APKCERTS_TXT3,
        ['Compressed1.apk.gz', 'Compressed4.apk.xz'])

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ReadApkCerts_MismatchingKeys(self):
    malformed_apkcerts_txt = (
        'name="App1.apk" certificate="certs/cert1.x509.pem"'
        ' private_key="certs/cert2.pk8"\n'
    )
    target_files = self._write_apkcerts_txt(malformed_apkcerts_txt)

    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(ValueError, common.ReadApkCerts, input_zip)

  def test_ExtractPublicKey(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    with open(pubkey, 'rb') as pubkey_fp:
      self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))

  def test_ExtractPublicKey_invalidInput(self):
    wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8')
    self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input)

  def test_ParseCertificate(self):
    cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')

    cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    expected, _ = proc.communicate()
    self.assertEqual(0, proc.returncode)

    with open(cert) as cert_fp:
      actual = common.ParseCertificate(cert_fp.read())
    self.assertEqual(expected, actual)

  def test_GetMinSdkVersion(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual('24', common.GetMinSdkVersion(test_app))

  def test_GetMinSdkVersion_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersion, 'does-not-exist.apk')

  def test_GetMinSdkVersionInt(self):
    test_app = os.path.join(self.testdata_dir, 'TestApp.apk')
    self.assertEqual(24, common.GetMinSdkVersionInt(test_app, {}))

  def test_GetMinSdkVersionInt_invalidInput(self):
    self.assertRaises(
        common.ExternalError, common.GetMinSdkVersionInt, 'does-not-exist.apk',
        {})


class CommonUtilsTest(test_utils.ReleaseToolsTestCase):

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()

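  # Note on the sparse images constructed below (added): the tuples passed to
  # test_utils.construct_sparse_image() appear to be (chunk_type, block_count)
  # pairs, where 0xCAC1, 0xCAC2 and 0xCAC3 correspond to the Android
  # sparse-image RAW, FILL and DONT_CARE chunk types respectively.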
  def test_GetSparseImage_emptyBlockMapFile(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([
              (0xCAC1, 6),
              (0xCAC3, 3),
              (0xCAC1, 4)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr('IMAGES/system.map', '')
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertDictEqual(
        {
            '__COPY': RangeSet("0"),
            '__NONZERO-0': RangeSet("1-5 9-12"),
        },
        sparse_image.file_map)

  def test_GetSparseImage_invalidImageName(self):
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'system2', None, None, False)
    self.assertRaises(
        AssertionError, common.GetSparseImage, 'unknown', None, None, False)

  def test_GetSparseImage_missingBlockMapFile(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([
              (0xCAC1, 6),
              (0xCAC3, 3),
              (0xCAC1, 4)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetSparseImage_sharedBlocks_notAllowed(self):
    """Tests the case of overlapping blocks when sharing is not allowed."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      # Block 10 is shared between two files.
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 10-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetSparseImage_sharedBlocks_allowed(self):
    """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      # Construct an image with a care_map of "0-5 9-12".
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      # Block 10 is shared between two files.
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 10-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, True)

    self.assertDictEqual(
        {
            '__COPY': RangeSet("0"),
            '__NONZERO-0': RangeSet("6-8 13-15"),
            '/system/file1': RangeSet("1-5 9-10"),
            '/system/file2': RangeSet("11-12"),
        },
        sparse_image.file_map)

    # '/system/file2' should be marked with 'uses_shared_blocks', but not with
    # 'incomplete'.
    self.assertTrue(
        sparse_image.file_map['/system/file2'].extra['uses_shared_blocks'])
    self.assertNotIn(
        'incomplete', sparse_image.file_map['/system/file2'].extra)

    # All other entries should look normal without any tags.
    self.assertFalse(sparse_image.file_map['__COPY'].extra)
    self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra)
    self.assertFalse(sparse_image.file_map['/system/file1'].extra)

  def test_GetSparseImage_incompleteRanges(self):
    """Tests the case of ext4 images with holes."""
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '/system/file1 1-5 9-10',
              '/system/file2 11-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/system/file2' has fewer blocks listed (2) than actual (3).
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['/system/file1'].extra)
    self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete'])

  def test_GetSparseImage_systemRootImage_filenameWithExtraLeadingSlash(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//system/file2 11-12',
              '/system/app/file3 13-15']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/system/file2' has fewer blocks listed (2) than actual (3).
      target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3))
      # '/system/app/file3' has fewer blocks listed (3) than actual (4).
      target_files_zip.writestr('SYSTEM/app/file3', os.urandom(4096 * 4))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    self.assertTrue(sparse_image.file_map['//system/file2'].extra['incomplete'])
    self.assertTrue(
        sparse_image.file_map['/system/app/file3'].extra['incomplete'])

  def test_GetSparseImage_systemRootImage_nonSystemFiles(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//init.rc 13-15']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))
      # '/init.rc' has fewer blocks listed (3) than actual (4).
      target_files_zip.writestr('ROOT/init.rc', os.urandom(4096 * 4))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      sparse_image = common.GetSparseImage('system', tempdir, input_zip, False)

    self.assertFalse(sparse_image.file_map['//system/file1'].extra)
    self.assertTrue(sparse_image.file_map['//init.rc'].extra['incomplete'])

  def test_GetSparseImage_fileNotFound(self):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      target_files_zip.write(
          test_utils.construct_sparse_image([(0xCAC2, 16)]),
          arcname='IMAGES/system.img')
      target_files_zip.writestr(
          'IMAGES/system.map',
          '\n'.join([
              '//system/file1 1-5 9-10',
              '//system/file2 11-12']))
      target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7))

    tempdir = common.UnzipTemp(target_files)
    with zipfile.ZipFile(target_files, 'r') as input_zip:
      self.assertRaises(
          AssertionError, common.GetSparseImage, 'system', tempdir, input_zip,
          False)

  def test_GetAvbChainedPartitionArg(self):
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': pubkey,
        'avb_system_rollback_index_location': 2,
    }
    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('system', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))
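
  # Note (added): judging from the assertions above and below, the returned
  # chained-partition argument is the colon-separated string
  #   <partition>:<rollback_index_location>:<path_to_avb_public_key>
  # where the third field points at an existing public-key file, apparently
  # extracted from the configured certificate or private key when needed.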

  def test_GetAvbChainedPartitionArg_withPrivateKey(self):
    key = os.path.join(self.testdata_dir, 'testkey.key')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_product_key_path': key,
        'avb_product_rollback_index_location': 2,
    }
    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('product', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': 'does-not-exist',
        'avb_system_rollback_index_location': 2,
    }
    pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
    args = common.GetAvbChainedPartitionArg(
        'system', info_dict, pubkey).split(':')
    self.assertEqual(3, len(args))
    self.assertEqual('system', args[0])
    self.assertEqual('2', args[1])
    self.assertTrue(os.path.exists(args[2]))

  def test_GetAvbChainedPartitionArg_invalidKey(self):
    pubkey = os.path.join(self.testdata_dir, 'testkey_with_passwd.x509.pem')
    info_dict = {
        'avb_avbtool': 'avbtool',
        'avb_system_key_path': pubkey,
        'avb_system_rollback_index_location': 2,
    }
    self.assertRaises(
        common.ExternalError, common.GetAvbChainedPartitionArg, 'system',
        info_dict)

  INFO_DICT_DEFAULT = {
      'recovery_api_version': 3,
      'fstab_version': 2,
      'system_root_image': 'true',
      'no_recovery' : 'true',
      'recovery_as_boot': 'true',
  }

  @staticmethod
  def _test_LoadInfoDict_createTargetFiles(info_dict, fstab_path):
    target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
    with zipfile.ZipFile(target_files, 'w') as target_files_zip:
      info_values = ''.join(
          ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.iteritems())])
      common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)

      FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
      if info_dict.get('system_root_image') == 'true':
        fstab_values = FSTAB_TEMPLATE.format('/')
      else:
        fstab_values = FSTAB_TEMPLATE.format('/system')
      common.ZipWriteStr(target_files_zip, fstab_path, fstab_values)

      common.ZipWriteStr(
          target_files_zip, 'META/file_contexts', 'file-contexts')
    return target_files
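
  # Note (added): with INFO_DICT_DEFAULT the generated META/misc_info.txt is a
  # sorted key=value dump, roughly:
  #   fstab_version=2
  #   no_recovery=true
  #   recovery_api_version=3
  #   recovery_as_boot=true
  #   system_root_image=true
  # and the single-line fstab mounts either '/' or '/system' depending on
  # system_root_image.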

  def test_LoadInfoDict(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_legacyRecoveryFstabPath(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_dirInput(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_dirInput_legacyRecoveryFstabPath(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_systemRootImageFalse(self):
    # Devices using neither system-as-root nor recovery-as-boot. Non-A/B
    # devices launched prior to P will likely have this config.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['no_recovery']
    del info_dict['system_root_image']
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertNotIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_recoveryAsBootFalse(self):
    # Devices using system-as-root, but with a standalone recovery image.
    # Non-A/B devices launched since P will likely have this config.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['no_recovery']
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIn('/', loaded_dict['fstab'])
      self.assertIn('/system', loaded_dict['fstab'])

  def test_LoadInfoDict_noRecoveryTrue(self):
    # Device doesn't have a recovery partition at all.
    info_dict = copy.copy(self.INFO_DICT_DEFAULT)
    del info_dict['recovery_as_boot']
    target_files = self._test_LoadInfoDict_createTargetFiles(
        info_dict,
        'RECOVERY/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      loaded_dict = common.LoadInfoDict(target_files_zip)
      self.assertEqual(3, loaded_dict['recovery_api_version'])
      self.assertEqual(2, loaded_dict['fstab_version'])
      self.assertIsNone(loaded_dict['fstab'])

  def test_LoadInfoDict_missingMetaMiscInfoTxt(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    common.ZipDelete(target_files, 'META/misc_info.txt')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      self.assertRaises(ValueError, common.LoadInfoDict, target_files_zip)

  def test_LoadInfoDict_repacking(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    unzipped = common.UnzipTemp(target_files)
    loaded_dict = common.LoadInfoDict(unzipped, True)
    self.assertEqual(3, loaded_dict['recovery_api_version'])
    self.assertEqual(2, loaded_dict['fstab_version'])
    self.assertIn('/', loaded_dict['fstab'])
    self.assertIn('/system', loaded_dict['fstab'])
    self.assertEqual(
        os.path.join(unzipped, 'ROOT'), loaded_dict['root_dir'])
    self.assertEqual(
        os.path.join(unzipped, 'META', 'root_filesystem_config.txt'),
        loaded_dict['root_fs_config'])

  def test_LoadInfoDict_repackingWithZipFileInput(self):
    target_files = self._test_LoadInfoDict_createTargetFiles(
        self.INFO_DICT_DEFAULT,
        'BOOT/RAMDISK/system/etc/recovery.fstab')
    with zipfile.ZipFile(target_files, 'r') as target_files_zip:
      self.assertRaises(
          AssertionError, common.LoadInfoDict, target_files_zip, True)


class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
  """Checks the format of install-recovery.sh.

  Its format should match between common.py and validate_target_files.py.
  """

  def setUp(self):
    self._tempdir = common.MakeTempDir()
    # Create a dummy dict that contains the fstab info for boot & recovery.
    self._info = {"fstab" : {}}
    dummy_fstab = [
        "/dev/soc.0/by-name/boot /boot emmc defaults defaults",
        "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
    self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
    # Construct the gzipped recovery.img and boot.img.
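    # (Added note) The bytes below are presumably the output of
    #   echo -n "recovery" | gzip -f | hd
    # mirroring the documented command for boot_data further down.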
    self.recovery_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
        0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
        0x08, 0x00, 0x00, 0x00
    ])
    # echo -n "boot" | gzip -f | hd
    self.boot_data = bytearray([
        0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
        0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
    ])

  def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
    loc = os.path.join(self._tempdir, prefix, name)
    if not os.path.exists(os.path.dirname(loc)):
      os.makedirs(os.path.dirname(loc))
    with open(loc, "w+") as f:
      f.write(data)

  def test_full_recovery(self):
    recovery_image = common.File("recovery.img", self.recovery_data)
    boot_image = common.File("boot.img", self.boot_data)
    self._info["full_recovery_image"] = "true"

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)

  def test_recovery_from_boot(self):
    recovery_image = common.File("recovery.img", self.recovery_data)
    self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
    boot_image = common.File("boot.img", self.boot_data)
    self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")

    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)
    # Validate 'recovery-from-boot' with bonus argument.
    self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
    common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
                             recovery_image, boot_image, self._info)
    validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
                                                        self._info)


class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.
  """
  def __init__(self, enable_comments=False):
    self.lines = []
    self.enable_comments = enable_comments
  def Comment(self, comment):
    if self.enable_comments:
      self.lines.append("# {}".format(comment))
  def AppendExtra(self, extra):
    self.lines.append(extra)
  def __str__(self):
    return "\n".join(self.lines)


class MockBlockDifference(object):
  def __init__(self, partition, tgt, src=None):
    self.partition = partition
    self.tgt = tgt
    self.src = src
  def WriteScript(self, script, _, progress=None,
                  write_verify_script=False):
    if progress:
      script.AppendExtra("progress({})".format(progress))
    script.AppendExtra("patch({});".format(self.partition))
    if write_verify_script:
      self.WritePostInstallVerifyScript(script)
  def WritePostInstallVerifyScript(self, script):
    script.AppendExtra("verify({});".format(self.partition))


class FakeSparseImage(object):
  def __init__(self, size):
    self.blocksize = 4096
    self.total_blocks = size // 4096
    assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)


class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
  @staticmethod
  def get_op_list(output_path):
    with zipfile.ZipFile(output_path, 'r') as output_zip:
      with output_zip.open("dynamic_partitions_op_list") as op_list:
        return [line.strip() for line in op_list.readlines()
                if not line.startswith("#")]

  def setUp(self):
    self.script = MockScriptWriter()
    self.output_path = common.MakeTempFile(suffix='.zip')
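
  # Note (added): the assertions below treat dynamic_partitions_op_list as a
  # plain-text script with one operation per line, e.g. "add_group <name>
  # <size>", "add <partition> <group>" or "resize <partition> <size_in_bytes>";
  # comment lines starting with '#' are filtered out by get_op_list().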

  def test_full(self):
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor
super_partition_groups=group_foo
super_group_foo_group_size={group_size}
super_group_foo_partition_list=system vendor
""".format(group_size=4 * GiB).split("\n"))
    block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
                   MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertEqual(str(self.script).strip(), """
assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
patch(vendor);
verify(vendor);
unmap_partition("vendor");
patch(system);
verify(system);
unmap_partition("system");
""".strip())

    lines = self.get_op_list(self.output_path)

    remove_all_groups = lines.index("remove_all_groups")
    add_group = lines.index("add_group group_foo 4294967296")
    add_vendor = lines.index("add vendor group_foo")
    add_system = lines.index("add system group_foo")
    resize_vendor = lines.index("resize vendor 1073741824")
    resize_system = lines.index("resize system 3221225472")

    self.assertLess(remove_all_groups, add_group,
                    "Should add groups after removing all groups")
    self.assertLess(add_group, min(add_vendor, add_system),
                    "Should add partitions after adding group")
    self.assertLess(add_system, resize_system,
                    "Should resize system after adding it")
    self.assertLess(add_vendor, resize_vendor,
                    "Should resize vendor after adding it")

  def test_inc_groups(self):
    source_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_bar group_baz
super_group_foo_group_size={group_foo_size}
super_group_bar_group_size={group_bar_size}
""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_baz group_qux
super_group_foo_group_size={group_foo_size}
super_group_baz_group_size={group_baz_size}
super_group_qux_group_size={group_qux_size}
""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
           group_qux_size=1 * GiB).split("\n"))

    dp_diff = common.DynamicPartitionsDifference(target_info,
                                                 block_diffs=[],
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    lines = self.get_op_list(self.output_path)

    removed = lines.index("remove_group group_bar")
    shrunk = lines.index("resize_group group_foo 3221225472")
    grown = lines.index("resize_group group_baz 4294967296")
    added = lines.index("add_group group_qux 1073741824")

    self.assertLess(max(removed, shrunk), min(grown, added),
                    "ops that remove / shrink partitions must precede ops that "
                    "grow / add partitions")
  def test_incremental(self):
    source_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product product_services
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor product product_services
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product odm
super_partition_groups=group_foo group_bar
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor odm
super_group_bar_group_size={group_bar_size}
super_group_bar_partition_list=product
""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))

    block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product", FakeSparseImage(1024 * MiB),
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("product_services", None,
                                       src=FakeSparseImage(1024 * MiB)),
                   MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
                                       src=None)]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    metadata_idx = self.script.lines.index(
        'assert(update_dynamic_partitions(package_extract_file('
        '"dynamic_partitions_op_list")));')
    self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
    self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
    for p in ("product", "system", "odm"):
      patch_idx = self.script.lines.index("patch({});".format(p))
      verify_idx = self.script.lines.index("verify({});".format(p))
      self.assertLess(metadata_idx, patch_idx,
                      "Should patch {} after updating metadata".format(p))
      self.assertLess(patch_idx, verify_idx,
                      "Should verify {} after patching".format(p))

    self.assertNotIn("patch(product_services);", self.script.lines)

    lines = self.get_op_list(self.output_path)

    remove = lines.index("remove product_services")
    move_product_out = lines.index("move product default")
    shrink = lines.index("resize vendor 536870912")
    shrink_group = lines.index("resize_group group_foo 3221225472")
    add_group_bar = lines.index("add_group group_bar 1073741824")
    add_odm = lines.index("add odm group_foo")
    grow_existing = lines.index("resize system 1610612736")
    grow_added = lines.index("resize odm 1073741824")
    move_product_in = lines.index("move product group_bar")

    max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
    min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)

    self.assertLess(max_idx_move_partition_out_foo, shrink_group,
                    "Must shrink group after partitions inside group are shrunk"
                    " / removed")

    self.assertLess(add_group_bar, move_product_in,
                    "Must add partitions to group after group is added")

    self.assertLess(max_idx_move_partition_out_foo,
                    min_idx_move_partition_in_foo,
                    "Must shrink partitions / remove partitions from group "
                    "before adding / moving partitions into group")

  def test_remove_partition(self):
    source_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
dynamic_partition_list=foo
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=foo
""".format(group_foo_size=4 * GiB).split("\n"))
    target_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
""".format(group_foo_size=4 * GiB).split("\n"))

    common.OPTIONS.info_dict = target_info
    common.OPTIONS.target_info_dict = target_info
    common.OPTIONS.source_info_dict = source_info
    common.OPTIONS.cache_size = 4 * 4096

    block_diffs = [common.BlockDifference("foo", EmptyImage(),
                                          src=DataImage("source", pad=True))]

    dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
                                                 source_info_dict=source_info)
    with zipfile.ZipFile(self.output_path, 'w') as output_zip:
      dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)

    self.assertNotIn("block_image_update", str(self.script),
                     "Removed partition should not be patched.")

    lines = self.get_op_list(self.output_path)
    self.assertEqual(lines, ["remove foo"])