Add ota script support to generate partial updates

Allow the ota generation script to take a list of partitions;
and thus generating a partial ota update package. This allow the
testing of partial update functionality, e.g. system-only updates.

Bug: 170921953
Test: unit tests, generate and apply a partial update on coral with system, system_ext, vbmeta_system
Change-Id: I0d8f93806dfbf7b781ea911117b02fc5c0971434
diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py
index 18b2b76..7dc648f 100755
--- a/tools/releasetools/ota_from_target_files.py
+++ b/tools/releasetools/ota_from_target_files.py
@@ -202,6 +202,10 @@
       ones. Should only be used if caller knows it's safe to do so (e.g. all the
       postinstall work is to dexopt apps and a data wipe will happen immediately
       after). Only meaningful when generating A/B OTAs.
+
+  --partial "<PARTITION> [<PARTITION>[...]]"
+      Generate partial updates, overriding ab_partitions list with the given
+      list.
 """
 
 from __future__ import print_function
@@ -257,6 +261,7 @@
 OPTIONS.skip_postinstall = False
 OPTIONS.skip_compatibility_check = False
 OPTIONS.disable_fec_computation = False
+OPTIONS.partial = None
 
 
 POSTINSTALL_CONFIG = 'META/postinstall_config.txt'
@@ -593,6 +598,48 @@
     return (payload_offset, metadata_total)
 
 
+def UpdatesInfoForSpecialUpdates(content, partitions_filter,
+                                 delete_keys=None):
+  """ Updates info file for secondary payload generation, partial update, etc.
+
+    Scan each line in the info file, and remove the unwanted partitions from
+    the dynamic partition list in the related properties. e.g.
+    "super_google_dynamic_partitions_partition_list=system vendor product"
+    will become "super_google_dynamic_partitions_partition_list=system".
+
+  Args:
+    content: The content of the input info file. e.g. misc_info.txt.
+    partitions_filter: A function to filter the desired partitions from a given
+      list
+    delete_keys: A list of keys to delete in the info file
+
+  Returns:
+    A string of the updated info content.
+  """
+
+  output_list = []
+  # The suffix in partition_list variables that follows the name of the
+  # partition group.
+  list_suffix = 'partition_list'
+  for line in content.splitlines():
+    if line.startswith('#') or '=' not in line:
+      output_list.append(line)
+      continue
+    key, value = line.strip().split('=', 1)
+
+    if delete_keys and key in delete_keys:
+      pass
+    elif key.endswith(list_suffix):
+      partitions = value.split()
+      # TODO for partial update, partitions in the same group must be all
+      # updated or all omitted
+      partitions = filter(partitions_filter, partitions)
+      output_list.append('{}={}'.format(key, ' '.join(partitions)))
+    else:
+      output_list.append(line)
+  return '\n'.join(output_list)
+
+
 def GetTargetFilesZipForSecondaryImages(input_file, skip_postinstall=False):
   """Returns a target-files.zip file for generating secondary payload.
 
@@ -614,44 +661,15 @@
   """
 
   def GetInfoForSecondaryImages(info_file):
-    """Updates info file for secondary payload generation.
-
-    Scan each line in the info file, and remove the unwanted partitions from
-    the dynamic partition list in the related properties. e.g.
-    "super_google_dynamic_partitions_partition_list=system vendor product"
-    will become "super_google_dynamic_partitions_partition_list=system".
-
-    Args:
-      info_file: The input info file. e.g. misc_info.txt.
-
-    Returns:
-      A string of the updated info content.
-    """
-
-    output_list = []
+    """Updates info file for secondary payload generation."""
     with open(info_file) as f:
-      lines = f.read().splitlines()
-
-    # The suffix in partition_list variables that follows the name of the
-    # partition group.
-    LIST_SUFFIX = 'partition_list'
-    for line in lines:
-      if line.startswith('#') or '=' not in line:
-        output_list.append(line)
-        continue
-      key, value = line.strip().split('=', 1)
-      if key == 'dynamic_partition_list' or key.endswith(LIST_SUFFIX):
-        partitions = value.split()
-        partitions = [partition for partition in partitions if partition
-                      not in SECONDARY_PAYLOAD_SKIPPED_IMAGES]
-        output_list.append('{}={}'.format(key, ' '.join(partitions)))
-      elif key in ['virtual_ab', "virtual_ab_retrofit"]:
-        # Remove virtual_ab flag from secondary payload so that OTA client
-        # don't use snapshots for secondary update
-        pass
-      else:
-        output_list.append(line)
-    return '\n'.join(output_list)
+      content = f.read()
+    # Remove virtual_ab flag from secondary payload so that OTA client
+    # don't use snapshots for secondary update
+    delete_keys = ['virtual_ab', "virtual_ab_retrofit"]
+    return UpdatesInfoForSpecialUpdates(
+        content, lambda p: p not in SECONDARY_PAYLOAD_SKIPPED_IMAGES,
+        delete_keys)
 
   target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
   target_zip = zipfile.ZipFile(target_file, 'w', allowZip64=True)
@@ -729,6 +747,76 @@
   return target_file
 
 
+def GetTargetFilesZipForPartialUpdates(input_file, ab_partitions):
+  """Returns a target-files.zip for partial ota update package generation.
+
+  This function modifies ab_partitions list with the desired partitions before
+  calling the brillo_update_payload script. It also cleans up the reference to
+  the excluded partitions in the info file, e.g misc_info.txt.
+
+  Args:
+    input_file: The input target-files.zip filename.
+    ab_partitions: A list of partitions to include in the partial update
+
+  Returns:
+    The filename of target-files.zip used for partial ota update.
+  """
+
+  def AddImageForPartition(partition_name):
+    """Add the archive name for a given partition to the copy list."""
+    for prefix in ['IMAGES', 'RADIO']:
+      image_path = '{}/{}.img'.format(prefix, partition_name)
+      if image_path in namelist:
+        copy_entries.append(image_path)
+        map_path = '{}/{}.map'.format(prefix, partition_name)
+        if map_path in namelist:
+          copy_entries.append(map_path)
+        return
+
+    raise ValueError("Cannot find {} in input zipfile".format(partition_name))
+
+  with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
+    original_ab_partitions = input_zip.read(AB_PARTITIONS).decode().splitlines()
+    namelist = input_zip.namelist()
+
+  unrecognized_partitions = [partition for partition in ab_partitions if
+                             partition not in original_ab_partitions]
+  if unrecognized_partitions:
+    raise ValueError("Unrecognized partitions when generating partial updates",
+                     unrecognized_partitions)
+
+  logger.info("Generating partial updates for %s", ab_partitions)
+
+  copy_entries = ['META/update_engine_config.txt']
+  for partition_name in ab_partitions:
+    AddImageForPartition(partition_name)
+
+  # Use zip2zip to avoid extracting the zipfile.
+  partial_target_file = common.MakeTempFile(suffix='.zip')
+  cmd = ['zip2zip', '-i', input_file, '-o', partial_target_file]
+  cmd.extend(['{}:{}'.format(name, name) for name in copy_entries])
+  common.RunAndCheckOutput(cmd)
+
+  partial_target_zip = zipfile.ZipFile(partial_target_file, 'a',
+                                       allowZip64=True)
+  with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
+    common.ZipWriteStr(partial_target_zip, 'META/ab_partitions.txt',
+                       '\n'.join(ab_partitions))
+    for info_file in ['META/misc_info.txt', DYNAMIC_PARTITION_INFO]:
+      if info_file not in input_zip.namelist():
+        logger.warning('Cannot find %s in input zipfile', info_file)
+        continue
+      content = input_zip.read(info_file).decode()
+      modified_info = UpdatesInfoForSpecialUpdates(
+          content, lambda p: p in ab_partitions)
+      common.ZipWriteStr(partial_target_zip, info_file, modified_info)
+
+    # TODO(xunchang) handle 'META/care_map.pb', 'META/postinstall_config.txt'
+  common.ZipClose(partial_target_zip)
+
+  return partial_target_file
+
+
 def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
                                                   super_block_devices,
                                                   dynamic_partition_list):
@@ -837,10 +925,16 @@
     target_info = common.BuildInfo(OPTIONS.info_dict, OPTIONS.oem_dicts)
     source_info = None
 
+  additional_args = []
+
   if OPTIONS.retrofit_dynamic_partitions:
     target_file = GetTargetFilesZipForRetrofitDynamicPartitions(
         target_file, target_info.get("super_block_devices").strip().split(),
         target_info.get("dynamic_partition_list").strip().split())
+  elif OPTIONS.partial:
+    target_file = GetTargetFilesZipForPartialUpdates(target_file,
+                                                     OPTIONS.partial)
+    additional_args += ["--is_partial_update", "true"]
   elif OPTIONS.skip_postinstall:
     target_file = GetTargetFilesZipWithoutPostinstallConfig(target_file)
   # Target_file may have been modified, reparse ab_partitions
@@ -862,7 +956,7 @@
     partition_timestamps = [
         part.partition_name + ":" + part.version
         for part in metadata.postcondition.partition_state]
-  additional_args = ["--max_timestamp", max_timestamp]
+  additional_args += ["--max_timestamp", max_timestamp]
   if partition_timestamps:
     additional_args.extend(
         ["--partition_timestamps", ",".join(
@@ -1006,6 +1100,11 @@
       OPTIONS.force_non_ab = True
     elif o == "--boot_variable_file":
       OPTIONS.boot_variable_file = a
+    elif o == "--partial":
+      partitions = a.split()
+      if not partitions:
+        raise ValueError("Cannot parse partitions in {}".format(a))
+      OPTIONS.partial = partitions
     else:
       return False
     return True
@@ -1044,6 +1143,7 @@
                                  "disable_fec_computation",
                                  "force_non_ab",
                                  "boot_variable_file=",
+                                 "partial=",
                              ], extra_option_handler=option_handler)
 
   if len(args) != 2:
@@ -1058,6 +1158,8 @@
     # OTA package.
     if OPTIONS.incremental_source is None:
       raise ValueError("Cannot generate downgradable full OTAs")
+    if OPTIONS.partial:
+      raise ValueError("Cannot generate downgradable partial OTAs")
 
   # Load the build info dicts from the zip directly or the extracted input
   # directory. We don't need to unzip the entire target-files zips, because they
@@ -1072,6 +1174,10 @@
     with zipfile.ZipFile(args[0], 'r', allowZip64=True) as input_zip:
       OPTIONS.info_dict = common.LoadInfoDict(input_zip)
 
+  # TODO(xunchang) for retrofit and partial updates, maybe we should rebuild the
+  # target-file and reload the info_dict. So the info will be consistent with
+  # the modified target-file.
+
   logger.info("--- target info ---")
   common.DumpInfoDict(OPTIONS.info_dict)