Re-generate 4K boot OTAs using signed boot.img during signing process

Currently, dev option OTAs are generated using dev-key signed boot.img
On release-key devices, OTA will install successfully, but user would be
using dev-key signed boot image after reverting to 4K mode, and
subsequent OTAs would fail. This CL re-generates 4K boot OTA using
release-key signed boot.img , which allows normal OTAs after toggling
dev options.

Test: th
Bug: 354019928
Change-Id: I40811d6ed7a37f50edea77d245bf559b66da5a71
diff --git a/tools/releasetools/sign_target_files_apks.py b/tools/releasetools/sign_target_files_apks.py
index b8f848f..b485440 100755
--- a/tools/releasetools/sign_target_files_apks.py
+++ b/tools/releasetools/sign_target_files_apks.py
@@ -189,6 +189,8 @@
 from xml.etree import ElementTree
 
 import add_img_to_target_files
+import ota_from_raw_img
+import ota_utils
 import apex_utils
 import common
 import payload_signer
@@ -579,7 +581,61 @@
         filename.endswith("/prop.default")
 
 
-def ProcessTargetFiles(input_tf_zip: zipfile.ZipFile, output_tf_zip, misc_info,
+def RegenerateKernelPartitions(input_tf_zip: zipfile.ZipFile, output_tf_zip: zipfile.ZipFile, misc_info):
+  """Re-generate boot and dtbo partitions using new signing configuration"""
+  if OPTIONS.input_tmp is None:
+    OPTIONS.input_tmp = common.UnzipTemp(input_tf_zip.filename, [
+                                "*/boot.img", "*/dtbo.img"])
+  else:
+    common.UnzipToDir(input_tf_zip, OPTIONS.input_tmp, [
+                                "*/boot.img", "*/dtbo.img"])
+  unzip_dir = OPTIONS.input_tmp
+  image_dir = os.path.join(unzip_dir, "IMAGES")
+  shutil.rmtree(image_dir)
+  os.makedirs(image_dir, exist_ok=True)
+
+  boot_image = common.GetBootableImage(
+      "IMAGES/boot.img", "boot.img", unzip_dir, "BOOT", misc_info)
+  if boot_image:
+    boot_image.WriteToDir(unzip_dir)
+    boot_image = os.path.join(unzip_dir, boot_image.name)
+    common.ZipWrite(output_tf_zip, boot_image, "IMAGES/boot.img",
+                    compress_type=zipfile.ZIP_STORED)
+  add_img_to_target_files.AddDtbo(output_tf_zip)
+  return unzip_dir
+
+
+def RegenerateBootOTA(input_tf_zip: zipfile.ZipFile, output_tf_zip: zipfile.ZipFile, misc_info, filename, input_ota):
+  if filename not in ["VENDOR/boot_otas/boot_ota_4k.zip", "SYSTEM/boot_otas/boot_ota_4k.zip"]:
+    # We only need to re-generate 4K boot OTA, for other OTA packages
+    # simply copy as is
+    with input_tf_zip.open(filename, "r") as in_fp:
+      shutil.copyfileobj(in_fp, input_ota)
+      input_ota.flush()
+    return
+  timestamp = misc_info["build.prop"].GetProp(
+      "ro.system.build.date.utc")
+  unzip_dir = RegenerateKernelPartitions(
+      input_tf_zip, output_tf_zip, misc_info)
+  signed_boot_image = os.path.join(unzip_dir, "IMAGES/boot.img")
+  signed_dtbo_image = os.path.join(unzip_dir, "IMAGES/dtbo.img")
+
+  if not os.path.exists(signed_boot_image):
+    logger.warn("Need to re-generate boot OTA {} but failed to get signed boot image. 16K dev option will be impacted, after rolling back to 4K user would need to sideload/flash their device to continue receiving OTAs.")
+    return
+  logger.info(
+      "Re-generating boot OTA {} with timestamp {}".format(filename, timestamp))
+  args = ["ota_from_raw_img", "--package_key", OPTIONS.package_key,
+          "--max_timestamp", timestamp, "--output", input_ota.name]
+  if os.path.exists(signed_dtbo_image):
+    args.extend(["--partition_name", "boot,dtbo",
+                signed_boot_image, signed_dtbo_image])
+  else:
+    args.extend(["--partition_name", "boot", signed_boot_image])
+  ota_from_raw_img.main(args)
+
+
+def ProcessTargetFiles(input_tf_zip: zipfile.ZipFile, output_tf_zip: zipfile.ZipFile, misc_info,
                        apk_keys, apex_keys, key_passwords,
                        platform_api_level, codename_to_api_level_map,
                        compressed_extension):
@@ -593,6 +649,14 @@
     # Sets this to zero for targets without APK files.
     maxsize = 0
 
+  # Replace the AVB signing keys, if any.
+  ReplaceAvbSigningKeys(misc_info)
+  OPTIONS.info_dict = misc_info
+
+  # Rewrite the props in AVB signing args.
+  if misc_info.get('avb_enable') == 'true':
+    RewriteAvbProps(misc_info)
+
   for info in input_tf_zip.infolist():
     filename = info.filename
     if filename.startswith("IMAGES/"):
@@ -670,9 +734,9 @@
     elif filename.endswith(".zip") and IsEntryOtaPackage(input_tf_zip, filename):
       logger.info("Re-signing OTA package {}".format(filename))
       with tempfile.NamedTemporaryFile() as input_ota, tempfile.NamedTemporaryFile() as output_ota:
-        with input_tf_zip.open(filename, "r") as in_fp:
-          shutil.copyfileobj(in_fp, input_ota)
-          input_ota.flush()
+        RegenerateBootOTA(input_tf_zip, output_tf_zip,
+                          misc_info, filename, input_ota)
+
         SignOtaPackage(input_ota.name, output_ota.name)
         common.ZipWrite(output_tf_zip, output_ota.name, filename,
                         compress_type=zipfile.ZIP_STORED)
@@ -811,17 +875,18 @@
         common.ZipWrite(output_tf_zip, image.name, filename)
     # A non-APK file; copy it verbatim.
     else:
-      common.ZipWriteStr(output_tf_zip, out_info, data)
+      try:
+        entry = output_tf_zip.getinfo(filename)
+        if output_tf_zip.read(entry) != data:
+          logger.warn(
+              "Output zip contains duplicate entries for %s with different contents", filename)
+        continue
+      except KeyError:
+        common.ZipWriteStr(output_tf_zip, out_info, data)
 
   if OPTIONS.replace_ota_keys:
     ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info)
 
-  # Replace the AVB signing keys, if any.
-  ReplaceAvbSigningKeys(misc_info)
-
-  # Rewrite the props in AVB signing args.
-  if misc_info.get('avb_enable') == 'true':
-    RewriteAvbProps(misc_info)
 
   # Write back misc_info with the latest values.
   ReplaceMiscInfoTxt(input_tf_zip, output_tf_zip, misc_info)