Resolve conflicting AVB rollback index locations

Add an `--avb-resolve-rollback-index-location-conflict`
option in merge_target_files. When this option is set,
the merge tool will resolve conflicting index locations
by assigning the smallest unused index location.

This is to support merging system and vendor target files
from two different targets. In this case, the two target
files may have conflicting rollback index locations because
they were built independently.

Test: atest releasetools_test
Test: validate_target_files *-target_files-*.zip
Test: merge_target_files &&
        add_img_to_target_files &&
        img_from_target_files &&
        flash device
Bug: 300604688
Change-Id: Ibd18ef2a9f3784157fe17966f5364c3c81c9bd9f
diff --git a/tools/releasetools/add_img_to_target_files.py b/tools/releasetools/add_img_to_target_files.py
index 31f8736..fc4ab68 100644
--- a/tools/releasetools/add_img_to_target_files.py
+++ b/tools/releasetools/add_img_to_target_files.py
@@ -42,6 +42,10 @@
   --is_signing
       Skip building & adding the images for "userdata" and "cache" if we
       are signing the target files.
+
+  --avb_resolve_rollback_index_location_conflict
+      If provided, resolve conflicting AVB rollback index locations when
+      necessary.
 """
 
 from __future__ import print_function
@@ -81,6 +85,7 @@
 OPTIONS.rebuild_recovery = False
 OPTIONS.replace_updated_files_list = []
 OPTIONS.is_signing = False
+OPTIONS.avb_resolve_rollback_index_location_conflict = False
 
 
 def ParseAvbFooter(img_path) -> avbtool.AvbFooter:
@@ -682,7 +687,8 @@
     logger.info("%s.img already exists; not rebuilding...", name)
     return img.name
 
-  common.BuildVBMeta(img.name, partitions, name, needed_partitions)
+  common.BuildVBMeta(img.name, partitions, name, needed_partitions,
+                     OPTIONS.avb_resolve_rollback_index_location_conflict)
   img.Write()
   return img.name
 
@@ -1224,6 +1230,8 @@
                        " please switch to AVB")
     elif o == "--is_signing":
       OPTIONS.is_signing = True
+    elif o == "--avb_resolve_rollback_index_location_conflict":
+      OPTIONS.avb_resolve_rollback_index_location_conflict = True
     else:
       return False
     return True
@@ -1233,7 +1241,8 @@
       extra_long_opts=["add_missing", "rebuild_recovery",
                        "replace_verity_public_key=",
                        "replace_verity_private_key=",
-                       "is_signing"],
+                       "is_signing",
+                       "avb_resolve_rollback_index_location_conflict"],
       extra_option_handler=option_handler)
 
   if len(args) != 1:
diff --git a/tools/releasetools/common.py b/tools/releasetools/common.py
index c5a3fe7..f9a471a 100644
--- a/tools/releasetools/common.py
+++ b/tools/releasetools/common.py
@@ -39,6 +39,7 @@
 import threading
 import time
 import zipfile
+from dataclasses import dataclass
 from genericpath import isdir
 from hashlib import sha1, sha256
 
@@ -144,6 +145,19 @@
 RAMDISK_BUILD_PROP_REL_PATHS = ['system/etc/ramdisk/build.prop']
 
 
+@dataclass
+class AvbChainedPartitionArg:
+  """The required arguments for avbtool --chain_partition."""
+  partition: str
+  rollback_index_location: int
+  pubkey_path: str
+
+  def to_string(self):
+    """Convert to string command arguments."""
+    return '{}:{}:{}'.format(
+        self.partition, self.rollback_index_location, self.pubkey_path)
+
+
 class ErrorCode(object):
   """Define error_codes for failures that happen during the actual
   update package installation.
@@ -1452,7 +1466,7 @@
 
 
 def GetAvbPartitionArg(partition, image, info_dict=None):
-  """Returns the VBMeta arguments for partition.
+  """Returns the VBMeta arguments for one partition.
 
   It sets up the VBMeta argument by including the partition descriptor from the
   given 'image', or by configuring the partition as a chained partition.
@@ -1464,7 +1478,7 @@
         OPTIONS.info_dict if None has been given.
 
   Returns:
-    A list of VBMeta arguments.
+    A list of VBMeta arguments for one partition.
   """
   if info_dict is None:
     info_dict = OPTIONS.info_dict
@@ -1487,6 +1501,61 @@
   return [AVB_ARG_NAME_CHAIN_PARTITION, chained_partition_arg]
 
 
+def GetAvbPartitionsArg(partitions,
+                        resolve_rollback_index_location_conflict=False,
+                        info_dict=None):
+  """Returns the VBMeta arguments for all AVB partitions.
+
+  It sets up the VBMeta argument by calling GetAvbPartitionArg of all
+  partitions.
+
+  Args:
+    partitions: A dict of all AVB partitions.
+    resolve_rollback_index_location_conflict: If true, resolve conflicting avb
+        rollback index locations by bumping them to the next unused value.
+    info_dict: A dict returned by common.LoadInfoDict().
+
+  Returns:
+    A list of VBMeta arguments for all partitions.
+  """
+  # An AVB partition will be linked into a vbmeta partition by either
+  # AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG or AVB_ARG_NAME_CHAIN_PARTITION, there
+  # should be no other cases.
+  valid_args = {
+      AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG: [],
+      AVB_ARG_NAME_CHAIN_PARTITION: []
+  }
+
+  for partition, path in partitions.items():
+    avb_partition_arg = GetAvbPartitionArg(partition, path, info_dict)
+    if not avb_partition_arg:
+      continue
+    arg_name, arg_value = avb_partition_arg
+    assert arg_name in valid_args
+    valid_args[arg_name].append(arg_value)
+
+  # Copy the arguments for non-chained AVB partitions directly without
+  # intervention.
+  avb_args = []
+  for image in valid_args[AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG]:
+    avb_args.extend([AVB_ARG_NAME_INCLUDE_DESC_FROM_IMG, image])
+
+  # Handle chained AVB partitions. The rollback index location might be
+  # adjusted if two partitions use the same value. This may happen when mixing
+  # a shared system image with other vendor images.
+  used_index_loc = set()
+  for chained_partition_arg in valid_args[AVB_ARG_NAME_CHAIN_PARTITION]:
+    if resolve_rollback_index_location_conflict:
+      while chained_partition_arg.rollback_index_location in used_index_loc:
+        chained_partition_arg.rollback_index_location += 1
+
+    used_index_loc.add(chained_partition_arg.rollback_index_location)
+    avb_args.extend([AVB_ARG_NAME_CHAIN_PARTITION,
+                     chained_partition_arg.to_string()])
+
+  return avb_args
+
+
 def GetAvbChainedPartitionArg(partition, info_dict, key=None):
   """Constructs and returns the arg to build or verify a chained partition.
 
@@ -1498,8 +1567,8 @@
         the key listed in info_dict.
 
   Returns:
-    A string of form "partition:rollback_index_location:key" that can be used to
-    build or verify vbmeta image.
+    An AvbChainedPartitionArg object with rollback_index_location and
+    pubkey_path that can be used to build or verify vbmeta image.
   """
   if key is None:
     key = info_dict["avb_" + partition + "_key_path"]
@@ -1507,7 +1576,10 @@
   pubkey_path = ExtractAvbPublicKey(info_dict["avb_avbtool"], key)
   rollback_index_location = info_dict[
       "avb_" + partition + "_rollback_index_location"]
-  return "{}:{}:{}".format(partition, rollback_index_location, pubkey_path)
+  return AvbChainedPartitionArg(
+      partition=partition,
+      rollback_index_location=int(rollback_index_location),
+      pubkey_path=pubkey_path)
 
 
 def _HasGkiCertificationArgs():
@@ -1554,7 +1626,8 @@
   return data
 
 
-def BuildVBMeta(image_path, partitions, name, needed_partitions):
+def BuildVBMeta(image_path, partitions, name, needed_partitions,
+                resolve_rollback_index_location_conflict=False):
   """Creates a VBMeta image.
 
   It generates the requested VBMeta image. The requested image could be for
@@ -1569,6 +1642,8 @@
     name: Name of the VBMeta partition, e.g. 'vbmeta', 'vbmeta_system'.
     needed_partitions: Partitions whose descriptors should be included into the
         generated VBMeta image.
+    resolve_rollback_index_location_conflict: If true, resolve conflicting avb
+        rollback index locations by bumping them to the next unused value.
 
   Raises:
     AssertionError: On invalid input args.
@@ -1582,6 +1657,7 @@
   custom_avb_partitions = ["vbmeta_" + part for part in OPTIONS.info_dict.get(
       "avb_custom_vbmeta_images_partition_list", "").strip().split()]
 
+  avb_partitions = {}
   for partition, path in partitions.items():
     if partition not in needed_partitions:
       continue
@@ -1592,7 +1668,9 @@
         'Unknown partition: {}'.format(partition)
     assert os.path.exists(path), \
         'Failed to find {} for {}'.format(path, partition)
-    cmd.extend(GetAvbPartitionArg(partition, path))
+    avb_partitions[partition] = path
+  cmd.extend(GetAvbPartitionsArg(avb_partitions,
+                                 resolve_rollback_index_location_conflict))
 
   args = OPTIONS.info_dict.get("avb_{}_args".format(name))
   if args and args.strip():
diff --git a/tools/releasetools/merge/merge_target_files.py b/tools/releasetools/merge/merge_target_files.py
index a0d3a1c..6bf1b49 100755
--- a/tools/releasetools/merge/merge_target_files.py
+++ b/tools/releasetools/merge/merge_target_files.py
@@ -90,6 +90,10 @@
   --keep-tmp
       Keep tempoary files for debugging purposes.
 
+  --avb-resolve-rollback-index-location-conflict
+      If provided, resolve conflicting AVB rollback index locations when
+      necessary.
+
   The following only apply when using the VSDK to perform dexopt on vendor apps:
 
   --framework-dexpreopt-config
@@ -144,6 +148,7 @@
 OPTIONS.vendor_otatools = None
 OPTIONS.rebuild_sepolicy = False
 OPTIONS.keep_tmp = False
+OPTIONS.avb_resolve_rollback_index_location_conflict = False
 OPTIONS.framework_dexpreopt_config = None
 OPTIONS.framework_dexpreopt_tools = None
 OPTIONS.vendor_dexpreopt_config = None
@@ -230,6 +235,8 @@
   ]
   if OPTIONS.rebuild_recovery:
     add_img_args.append('--rebuild_recovery')
+  if OPTIONS.avb_resolve_rollback_index_location_conflict:
+    add_img_args.append('--avb_resolve_rollback_index_location_conflict')
   add_img_args.append(target_files_dir)
 
   add_img_to_target_files.main(add_img_args)
@@ -554,6 +561,8 @@
       OPTIONS.rebuild_sepolicy = True
     elif o == '--keep-tmp':
       OPTIONS.keep_tmp = True
+    elif o == '--avb-resolve-rollback-index-location-conflict':
+      OPTIONS.avb_resolve_rollback_index_location_conflict = True
     elif o == '--framework-dexpreopt-config':
       OPTIONS.framework_dexpreopt_config = a
     elif o == '--framework-dexpreopt-tools':
@@ -593,6 +602,7 @@
           'vendor-otatools=',
           'rebuild-sepolicy',
           'keep-tmp',
+          'avb-resolve-rollback-index-location-conflict',
       ],
       extra_option_handler=option_handler)
 
diff --git a/tools/releasetools/test_common.py b/tools/releasetools/test_common.py
index 2bd4f36..14f0e88 100644
--- a/tools/releasetools/test_common.py
+++ b/tools/releasetools/test_common.py
@@ -1299,11 +1299,11 @@
         'avb_system_key_path': pubkey,
         'avb_system_rollback_index_location': 2,
     }
-    args = common.GetAvbChainedPartitionArg('system', info_dict).split(':')
-    self.assertEqual(3, len(args))
-    self.assertEqual('system', args[0])
-    self.assertEqual('2', args[1])
-    self.assertTrue(os.path.exists(args[2]))
+    chained_partition_args = common.GetAvbChainedPartitionArg(
+        'system', info_dict)
+    self.assertEqual('system', chained_partition_args.partition)
+    self.assertEqual(2, chained_partition_args.rollback_index_location)
+    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
 
   @test_utils.SkipIfExternalToolsUnavailable()
   def test_GetAvbChainedPartitionArg_withPrivateKey(self):
@@ -1313,11 +1313,11 @@
         'avb_product_key_path': key,
         'avb_product_rollback_index_location': 2,
     }
-    args = common.GetAvbChainedPartitionArg('product', info_dict).split(':')
-    self.assertEqual(3, len(args))
-    self.assertEqual('product', args[0])
-    self.assertEqual('2', args[1])
-    self.assertTrue(os.path.exists(args[2]))
+    chained_partition_args = common.GetAvbChainedPartitionArg(
+        'product', info_dict)
+    self.assertEqual('product', chained_partition_args.partition)
+    self.assertEqual(2, chained_partition_args.rollback_index_location)
+    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
 
   @test_utils.SkipIfExternalToolsUnavailable()
   def test_GetAvbChainedPartitionArg_withSpecifiedKey(self):
@@ -1327,12 +1327,11 @@
         'avb_system_rollback_index_location': 2,
     }
     pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
-    args = common.GetAvbChainedPartitionArg(
-        'system', info_dict, pubkey).split(':')
-    self.assertEqual(3, len(args))
-    self.assertEqual('system', args[0])
-    self.assertEqual('2', args[1])
-    self.assertTrue(os.path.exists(args[2]))
+    chained_partition_args = common.GetAvbChainedPartitionArg(
+        'system', info_dict, pubkey)
+    self.assertEqual('system', chained_partition_args.partition)
+    self.assertEqual(2, chained_partition_args.rollback_index_location)
+    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
 
   @test_utils.SkipIfExternalToolsUnavailable()
   def test_GetAvbChainedPartitionArg_invalidKey(self):
@@ -1600,11 +1599,10 @@
     cmd = common.GetAvbPartitionArg('vendor', '/path/to/vendor.img', info_dict)
     self.assertEqual(2, len(cmd))
     self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
-    chained_partition_args = cmd[1].split(':')
-    self.assertEqual(3, len(chained_partition_args))
-    self.assertEqual('vendor', chained_partition_args[0])
-    self.assertEqual('5', chained_partition_args[1])
-    self.assertTrue(os.path.exists(chained_partition_args[2]))
+    chained_partition_args = cmd[1]
+    self.assertEqual('vendor', chained_partition_args.partition)
+    self.assertEqual(5, chained_partition_args.rollback_index_location)
+    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
 
   @test_utils.SkipIfExternalToolsUnavailable()
   def test_AppendVBMetaArgsForPartition_recoveryAsChainedPartition_nonAb(self):
@@ -1633,11 +1631,10 @@
         'recovery', '/path/to/recovery.img', info_dict)
     self.assertEqual(2, len(cmd))
     self.assertEqual(common.AVB_ARG_NAME_CHAIN_PARTITION, cmd[0])
-    chained_partition_args = cmd[1].split(':')
-    self.assertEqual(3, len(chained_partition_args))
-    self.assertEqual('recovery', chained_partition_args[0])
-    self.assertEqual('3', chained_partition_args[1])
-    self.assertTrue(os.path.exists(chained_partition_args[2]))
+    chained_partition_args = cmd[1]
+    self.assertEqual('recovery', chained_partition_args.partition)
+    self.assertEqual(3, chained_partition_args.rollback_index_location)
+    self.assertTrue(os.path.exists(chained_partition_args.pubkey_path))
 
   def test_GenerateGkiCertificate_KeyPathNotFound(self):
     pubkey = os.path.join(self.testdata_dir, 'no_testkey_gki.pem')
diff --git a/tools/releasetools/validate_target_files.py b/tools/releasetools/validate_target_files.py
index beb9e75..82b3107 100755
--- a/tools/releasetools/validate_target_files.py
+++ b/tools/releasetools/validate_target_files.py
@@ -430,7 +430,8 @@
         key_file = options.get(key_name, info_dict[key_name])
         chained_partition_arg = common.GetAvbChainedPartitionArg(
             partition, info_dict, key_file)
-        cmd.extend(['--expected_chain_partition', chained_partition_arg])
+        cmd.extend(['--expected_chain_partition',
+                    chained_partition_arg.to_string()])
 
     # Handle the boot image with a non-default name, e.g. boot-5.4.img
     boot_images = info_dict.get("boot_images")