blob: 3dac58950d91cb27b34a8f14b8d96ccb6276b685 [file] [log] [blame]
Dan Albert8e0178d2015-01-27 15:53:15 -08001#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import os
Tianjie Xu9c384d22017-06-20 17:00:55 -070017import shutil
Dan Albert8e0178d2015-01-27 15:53:15 -080018import tempfile
19import time
20import unittest
21import zipfile
22
23import common
Tianjie Xu9c384d22017-06-20 17:00:55 -070024import validate_target_files
Dan Albert8e0178d2015-01-27 15:53:15 -080025
26
27def random_string_with_holes(size, block_size, step_size):
28 data = ["\0"] * size
29 for begin in range(0, size, step_size):
30 end = begin + block_size
31 data[begin:end] = os.urandom(block_size)
32 return "".join(data)
33
Tao Baof3282b42015-04-01 11:21:55 -070034def get_2gb_string():
35 kilobytes = 1024
36 megabytes = 1024 * kilobytes
37 gigabytes = 1024 * megabytes
38
39 size = int(2 * gigabytes + 1)
40 block_size = 4 * kilobytes
41 step_size = 4 * megabytes
42 two_gb_string = random_string_with_holes(
43 size, block_size, step_size)
44 return two_gb_string
45
Dan Albert8e0178d2015-01-27 15:53:15 -080046
47class CommonZipTest(unittest.TestCase):
Tao Baof3282b42015-04-01 11:21:55 -070048 def _verify(self, zip_file, zip_file_name, arcname, contents,
49 test_file_name=None, expected_stat=None, expected_mode=0o644,
50 expected_compress_type=zipfile.ZIP_STORED):
51 # Verify the stat if present.
52 if test_file_name is not None:
53 new_stat = os.stat(test_file_name)
54 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
55 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
56
57 # Reopen the zip file to verify.
58 zip_file = zipfile.ZipFile(zip_file_name, "r")
59
60 # Verify the timestamp.
61 info = zip_file.getinfo(arcname)
62 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
63
64 # Verify the file mode.
65 mode = (info.external_attr >> 16) & 0o777
66 self.assertEqual(mode, expected_mode)
67
68 # Verify the compress type.
69 self.assertEqual(info.compress_type, expected_compress_type)
70
71 # Verify the zip contents.
72 self.assertEqual(zip_file.read(arcname), contents)
73 self.assertIsNone(zip_file.testzip())
74
Dan Albert8e0178d2015-01-27 15:53:15 -080075 def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
76 extra_zipwrite_args = dict(extra_zipwrite_args or {})
77
78 test_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080079 test_file_name = test_file.name
Tao Baof3282b42015-04-01 11:21:55 -070080
81 zip_file = tempfile.NamedTemporaryFile(delete=False)
Dan Albert8e0178d2015-01-27 15:53:15 -080082 zip_file_name = zip_file.name
83
84 # File names within an archive strip the leading slash.
85 arcname = extra_zipwrite_args.get("arcname", test_file_name)
86 if arcname[0] == "/":
87 arcname = arcname[1:]
88
89 zip_file.close()
90 zip_file = zipfile.ZipFile(zip_file_name, "w")
91
92 try:
93 test_file.write(contents)
94 test_file.close()
95
Tao Baof3282b42015-04-01 11:21:55 -070096 expected_stat = os.stat(test_file_name)
Dan Albert8e0178d2015-01-27 15:53:15 -080097 expected_mode = extra_zipwrite_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -070098 expected_compress_type = extra_zipwrite_args.get("compress_type",
99 zipfile.ZIP_STORED)
Dan Albert8e0178d2015-01-27 15:53:15 -0800100 time.sleep(5) # Make sure the atime/mtime will change measurably.
101
102 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
Tao Baof3282b42015-04-01 11:21:55 -0700103 common.ZipClose(zip_file)
Dan Albert8e0178d2015-01-27 15:53:15 -0800104
Tao Baof3282b42015-04-01 11:21:55 -0700105 self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
106 expected_stat, expected_mode, expected_compress_type)
Dan Albert8e0178d2015-01-27 15:53:15 -0800107 finally:
108 os.remove(test_file_name)
109 os.remove(zip_file_name)
110
Tao Baof3282b42015-04-01 11:21:55 -0700111 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
112 extra_args = dict(extra_args or {})
113
114 zip_file = tempfile.NamedTemporaryFile(delete=False)
115 zip_file_name = zip_file.name
116 zip_file.close()
117
118 zip_file = zipfile.ZipFile(zip_file_name, "w")
119
120 try:
121 expected_compress_type = extra_args.get("compress_type",
122 zipfile.ZIP_STORED)
123 time.sleep(5) # Make sure the atime/mtime will change measurably.
124
125 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
Tao Bao58c1b962015-05-20 09:32:18 -0700126 arcname = zinfo_or_arcname
127 expected_mode = extra_args.get("perms", 0o644)
Tao Baof3282b42015-04-01 11:21:55 -0700128 else:
Tao Bao58c1b962015-05-20 09:32:18 -0700129 arcname = zinfo_or_arcname.filename
130 expected_mode = extra_args.get("perms",
131 zinfo_or_arcname.external_attr >> 16)
Tao Baof3282b42015-04-01 11:21:55 -0700132
Tao Bao58c1b962015-05-20 09:32:18 -0700133 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
Tao Baof3282b42015-04-01 11:21:55 -0700134 common.ZipClose(zip_file)
135
136 self._verify(zip_file, zip_file_name, arcname, contents,
Tao Bao58c1b962015-05-20 09:32:18 -0700137 expected_mode=expected_mode,
Tao Baof3282b42015-04-01 11:21:55 -0700138 expected_compress_type=expected_compress_type)
139 finally:
140 os.remove(zip_file_name)
141
142 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
143 extra_args = dict(extra_args or {})
144
145 zip_file = tempfile.NamedTemporaryFile(delete=False)
146 zip_file_name = zip_file.name
147
148 test_file = tempfile.NamedTemporaryFile(delete=False)
149 test_file_name = test_file.name
150
151 arcname_large = test_file_name
152 arcname_small = "bar"
153
154 # File names within an archive strip the leading slash.
155 if arcname_large[0] == "/":
156 arcname_large = arcname_large[1:]
157
158 zip_file.close()
159 zip_file = zipfile.ZipFile(zip_file_name, "w")
160
161 try:
162 test_file.write(large)
163 test_file.close()
164
165 expected_stat = os.stat(test_file_name)
166 expected_mode = 0o644
167 expected_compress_type = extra_args.get("compress_type",
168 zipfile.ZIP_STORED)
169 time.sleep(5) # Make sure the atime/mtime will change measurably.
170
171 common.ZipWrite(zip_file, test_file_name, **extra_args)
172 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
173 common.ZipClose(zip_file)
174
175 # Verify the contents written by ZipWrite().
176 self._verify(zip_file, zip_file_name, arcname_large, large,
177 test_file_name, expected_stat, expected_mode,
178 expected_compress_type)
179
180 # Verify the contents written by ZipWriteStr().
181 self._verify(zip_file, zip_file_name, arcname_small, small,
182 expected_compress_type=expected_compress_type)
183 finally:
184 os.remove(zip_file_name)
185 os.remove(test_file_name)
186
187 def _test_reset_ZIP64_LIMIT(self, func, *args):
188 default_limit = (1 << 31) - 1
189 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
190 func(*args)
191 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
192
Dan Albert8e0178d2015-01-27 15:53:15 -0800193 def test_ZipWrite(self):
194 file_contents = os.urandom(1024)
195 self._test_ZipWrite(file_contents)
196
197 def test_ZipWrite_with_opts(self):
198 file_contents = os.urandom(1024)
199 self._test_ZipWrite(file_contents, {
200 "arcname": "foobar",
201 "perms": 0o777,
202 "compress_type": zipfile.ZIP_DEFLATED,
203 })
Tao Baof3282b42015-04-01 11:21:55 -0700204 self._test_ZipWrite(file_contents, {
205 "arcname": "foobar",
206 "perms": 0o700,
207 "compress_type": zipfile.ZIP_STORED,
208 })
Dan Albert8e0178d2015-01-27 15:53:15 -0800209
210 def test_ZipWrite_large_file(self):
Tao Baof3282b42015-04-01 11:21:55 -0700211 file_contents = get_2gb_string()
Dan Albert8e0178d2015-01-27 15:53:15 -0800212 self._test_ZipWrite(file_contents, {
213 "compress_type": zipfile.ZIP_DEFLATED,
214 })
215
216 def test_ZipWrite_resets_ZIP64_LIMIT(self):
Tao Baof3282b42015-04-01 11:21:55 -0700217 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
218
219 def test_ZipWriteStr(self):
220 random_string = os.urandom(1024)
221 # Passing arcname
222 self._test_ZipWriteStr("foo", random_string)
223
224 # Passing zinfo
225 zinfo = zipfile.ZipInfo(filename="foo")
226 self._test_ZipWriteStr(zinfo, random_string)
227
228 # Timestamp in the zinfo should be overwritten.
229 zinfo.date_time = (2015, 3, 1, 15, 30, 0)
230 self._test_ZipWriteStr(zinfo, random_string)
231
232 def test_ZipWriteStr_with_opts(self):
233 random_string = os.urandom(1024)
234 # Passing arcname
235 self._test_ZipWriteStr("foo", random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700236 "perms": 0o700,
Tao Baof3282b42015-04-01 11:21:55 -0700237 "compress_type": zipfile.ZIP_DEFLATED,
238 })
Tao Bao58c1b962015-05-20 09:32:18 -0700239 self._test_ZipWriteStr("bar", random_string, {
Tao Baof3282b42015-04-01 11:21:55 -0700240 "compress_type": zipfile.ZIP_STORED,
241 })
242
243 # Passing zinfo
244 zinfo = zipfile.ZipInfo(filename="foo")
245 self._test_ZipWriteStr(zinfo, random_string, {
246 "compress_type": zipfile.ZIP_DEFLATED,
247 })
248 self._test_ZipWriteStr(zinfo, random_string, {
Tao Bao58c1b962015-05-20 09:32:18 -0700249 "perms": 0o600,
Tao Baof3282b42015-04-01 11:21:55 -0700250 "compress_type": zipfile.ZIP_STORED,
251 })
252
253 def test_ZipWriteStr_large_file(self):
254 # zipfile.writestr() doesn't work when the str size is over 2GiB even with
255 # the workaround. We will only test the case of writing a string into a
256 # large archive.
257 long_string = get_2gb_string()
258 short_string = os.urandom(1024)
259 self._test_ZipWriteStr_large_file(long_string, short_string, {
260 "compress_type": zipfile.ZIP_DEFLATED,
261 })
262
263 def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
264 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
265 zinfo = zipfile.ZipInfo(filename="foo")
266 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
Tao Bao58c1b962015-05-20 09:32:18 -0700267
268 def test_bug21309935(self):
269 zip_file = tempfile.NamedTemporaryFile(delete=False)
270 zip_file_name = zip_file.name
271 zip_file.close()
272
273 try:
274 random_string = os.urandom(1024)
275 zip_file = zipfile.ZipFile(zip_file_name, "w")
276 # Default perms should be 0o644 when passing the filename.
277 common.ZipWriteStr(zip_file, "foo", random_string)
278 # Honor the specified perms.
279 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
280 # The perms in zinfo should be untouched.
281 zinfo = zipfile.ZipInfo(filename="baz")
282 zinfo.external_attr = 0o740 << 16
283 common.ZipWriteStr(zip_file, zinfo, random_string)
284 # Explicitly specified perms has the priority.
285 zinfo = zipfile.ZipInfo(filename="qux")
286 zinfo.external_attr = 0o700 << 16
287 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
288 common.ZipClose(zip_file)
289
290 self._verify(zip_file, zip_file_name, "foo", random_string,
291 expected_mode=0o644)
292 self._verify(zip_file, zip_file_name, "bar", random_string,
293 expected_mode=0o755)
294 self._verify(zip_file, zip_file_name, "baz", random_string,
295 expected_mode=0o740)
296 self._verify(zip_file, zip_file_name, "qux", random_string,
297 expected_mode=0o400)
298 finally:
299 os.remove(zip_file_name)
Tianjie Xu9c384d22017-06-20 17:00:55 -0700300
301class InstallRecoveryScriptFormatTest(unittest.TestCase):
302 """Check the format of install-recovery.sh
303
304 Its format should match between common.py and validate_target_files.py."""
305
306 def setUp(self):
307 self._tempdir = tempfile.mkdtemp()
308 # Create a dummy dict that contains the fstab info for boot&recovery.
309 self._info = {"fstab" : {}}
310 dummy_fstab = \
311 ["/dev/soc.0/by-name/boot /boot emmc defaults defaults",
312 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
313 self._info["fstab"] = common.LoadRecoveryFSTab(lambda x : "\n".join(x),
314 2, dummy_fstab)
Tianjie Xudf055582017-11-07 12:22:58 -0800315 # Construct the gzipped recovery.img and boot.img
316 self.recovery_data = bytearray([
317 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
318 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
319 0x08, 0x00, 0x00, 0x00
320 ])
321 # echo -n "boot" | gzip -f | hd
322 self.boot_data = bytearray([
323 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
324 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
325 ])
Tianjie Xu9c384d22017-06-20 17:00:55 -0700326
327 def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
328 loc = os.path.join(self._tempdir, prefix, name)
329 if not os.path.exists(os.path.dirname(loc)):
330 os.makedirs(os.path.dirname(loc))
331 with open(loc, "w+") as f:
332 f.write(data)
333
334 def test_full_recovery(self):
Tianjie Xudf055582017-11-07 12:22:58 -0800335 recovery_image = common.File("recovery.img", self.recovery_data);
336 boot_image = common.File("boot.img", self.boot_data);
Tianjie Xu9c384d22017-06-20 17:00:55 -0700337 self._info["full_recovery_image"] = "true"
338
339 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
340 recovery_image, boot_image, self._info)
341 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
342 self._info)
343
344 def test_recovery_from_boot(self):
Tianjie Xudf055582017-11-07 12:22:58 -0800345 recovery_image = common.File("recovery.img", self.recovery_data);
Tianjie Xu9c384d22017-06-20 17:00:55 -0700346 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
Tianjie Xudf055582017-11-07 12:22:58 -0800347 boot_image = common.File("boot.img", self.boot_data);
Tianjie Xu9c384d22017-06-20 17:00:55 -0700348 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
349
350 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
351 recovery_image, boot_image, self._info)
352 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
353 self._info)
354 # Validate 'recovery-from-boot' with bonus argument.
355 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM")
356 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
357 recovery_image, boot_image, self._info)
358 validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
359 self._info)
360
361 def tearDown(self):
362 shutil.rmtree(self._tempdir)