Add ota script support to generate partial updates
Allow the ota generation script to take a list of partitions,
and thus generate a partial ota update package. This allows the
testing of partial update functionality, e.g. system-only updates.
Bug: 170921953
Test: unit tests, generate and apply a partial update on coral with system, system_ext, vbmeta_system
Change-Id: I0d8f93806dfbf7b781ea911117b02fc5c0971434
diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py
index 18b2b76..7dc648f 100755
--- a/tools/releasetools/ota_from_target_files.py
+++ b/tools/releasetools/ota_from_target_files.py
@@ -202,6 +202,10 @@
ones. Should only be used if caller knows it's safe to do so (e.g. all the
postinstall work is to dexopt apps and a data wipe will happen immediately
after). Only meaningful when generating A/B OTAs.
+
+ --partial "<PARTITION> [<PARTITION>[...]]"
+ Generate partial updates, overriding ab_partitions list with the given
+ list.
"""
from __future__ import print_function
@@ -257,6 +261,7 @@
OPTIONS.skip_postinstall = False
OPTIONS.skip_compatibility_check = False
OPTIONS.disable_fec_computation = False
+OPTIONS.partial = None
POSTINSTALL_CONFIG = 'META/postinstall_config.txt'
@@ -593,6 +598,48 @@
return (payload_offset, metadata_total)
+def UpdatesInfoForSpecialUpdates(content, partitions_filter,
+ delete_keys=None):
+ """ Updates info file for secondary payload generation, partial update, etc.
+
+ Scan each line in the info file, and remove the unwanted partitions from
+ the dynamic partition list in the related properties. e.g.
+ "super_google_dynamic_partitions_partition_list=system vendor product"
+ will become "super_google_dynamic_partitions_partition_list=system".
+
+ Args:
+ content: The content of the input info file. e.g. misc_info.txt.
+ partitions_filter: A function to filter the desired partitions from a given
+ list
+ delete_keys: A list of keys to delete in the info file
+
+ Returns:
+ A string of the updated info content.
+ """
+
+ output_list = []
+ # The suffix in partition_list variables that follows the name of the
+ # partition group.
+ list_suffix = 'partition_list'
+ for line in content.splitlines():
+ if line.startswith('#') or '=' not in line:
+ output_list.append(line)
+ continue
+ key, value = line.strip().split('=', 1)
+
+ if delete_keys and key in delete_keys:
+ pass
+ elif key.endswith(list_suffix):
+ partitions = value.split()
+ # TODO: for partial updates, partitions in the same group must either
+ # all be updated or all be omitted.
+ partitions = filter(partitions_filter, partitions)
+ output_list.append('{}={}'.format(key, ' '.join(partitions)))
+ else:
+ output_list.append(line)
+ return '\n'.join(output_list)
+
+
def GetTargetFilesZipForSecondaryImages(input_file, skip_postinstall=False):
"""Returns a target-files.zip file for generating secondary payload.
@@ -614,44 +661,15 @@
"""
def GetInfoForSecondaryImages(info_file):
- """Updates info file for secondary payload generation.
-
- Scan each line in the info file, and remove the unwanted partitions from
- the dynamic partition list in the related properties. e.g.
- "super_google_dynamic_partitions_partition_list=system vendor product"
- will become "super_google_dynamic_partitions_partition_list=system".
-
- Args:
- info_file: The input info file. e.g. misc_info.txt.
-
- Returns:
- A string of the updated info content.
- """
-
- output_list = []
+ """Updates info file for secondary payload generation."""
with open(info_file) as f:
- lines = f.read().splitlines()
-
- # The suffix in partition_list variables that follows the name of the
- # partition group.
- LIST_SUFFIX = 'partition_list'
- for line in lines:
- if line.startswith('#') or '=' not in line:
- output_list.append(line)
- continue
- key, value = line.strip().split('=', 1)
- if key == 'dynamic_partition_list' or key.endswith(LIST_SUFFIX):
- partitions = value.split()
- partitions = [partition for partition in partitions if partition
- not in SECONDARY_PAYLOAD_SKIPPED_IMAGES]
- output_list.append('{}={}'.format(key, ' '.join(partitions)))
- elif key in ['virtual_ab', "virtual_ab_retrofit"]:
- # Remove virtual_ab flag from secondary payload so that OTA client
- # don't use snapshots for secondary update
- pass
- else:
- output_list.append(line)
- return '\n'.join(output_list)
+ content = f.read()
+ # Remove virtual_ab flag from secondary payload so that OTA client
+ # doesn't use snapshots for secondary update
+ delete_keys = ['virtual_ab', "virtual_ab_retrofit"]
+ return UpdatesInfoForSpecialUpdates(
+ content, lambda p: p not in SECONDARY_PAYLOAD_SKIPPED_IMAGES,
+ delete_keys)
target_file = common.MakeTempFile(prefix="targetfiles-", suffix=".zip")
target_zip = zipfile.ZipFile(target_file, 'w', allowZip64=True)
@@ -729,6 +747,76 @@
return target_file
+def GetTargetFilesZipForPartialUpdates(input_file, ab_partitions):
+ """Returns a target-files.zip for partial ota update package generation.
+
+ This function modifies ab_partitions list with the desired partitions before
+ calling the brillo_update_payload script. It also cleans up the reference to
+ the excluded partitions in the info file, e.g. misc_info.txt.
+
+ Args:
+ input_file: The input target-files.zip filename.
+ ab_partitions: A list of partitions to include in the partial update
+
+ Returns:
+ The filename of target-files.zip used for partial ota update.
+ """
+
+ def AddImageForPartition(partition_name):
+ """Add the archive name for a given partition to the copy list."""
+ for prefix in ['IMAGES', 'RADIO']:
+ image_path = '{}/{}.img'.format(prefix, partition_name)
+ if image_path in namelist:
+ copy_entries.append(image_path)
+ map_path = '{}/{}.map'.format(prefix, partition_name)
+ if map_path in namelist:
+ copy_entries.append(map_path)
+ return
+
+ raise ValueError("Cannot find {} in input zipfile".format(partition_name))
+
+ with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
+ original_ab_partitions = input_zip.read(AB_PARTITIONS).decode().splitlines()
+ namelist = input_zip.namelist()
+
+ unrecognized_partitions = [partition for partition in ab_partitions if
+ partition not in original_ab_partitions]
+ if unrecognized_partitions:
+ raise ValueError("Unrecognized partitions when generating partial updates",
+ unrecognized_partitions)
+
+ logger.info("Generating partial updates for %s", ab_partitions)
+
+ copy_entries = ['META/update_engine_config.txt']
+ for partition_name in ab_partitions:
+ AddImageForPartition(partition_name)
+
+ # Use zip2zip to avoid extracting the zipfile.
+ partial_target_file = common.MakeTempFile(suffix='.zip')
+ cmd = ['zip2zip', '-i', input_file, '-o', partial_target_file]
+ cmd.extend(['{}:{}'.format(name, name) for name in copy_entries])
+ common.RunAndCheckOutput(cmd)
+
+ partial_target_zip = zipfile.ZipFile(partial_target_file, 'a',
+ allowZip64=True)
+ with zipfile.ZipFile(input_file, allowZip64=True) as input_zip:
+ common.ZipWriteStr(partial_target_zip, 'META/ab_partitions.txt',
+ '\n'.join(ab_partitions))
+ for info_file in ['META/misc_info.txt', DYNAMIC_PARTITION_INFO]:
+ if info_file not in input_zip.namelist():
+ logger.warning('Cannot find %s in input zipfile', info_file)
+ continue
+ content = input_zip.read(info_file).decode()
+ modified_info = UpdatesInfoForSpecialUpdates(
+ content, lambda p: p in ab_partitions)
+ common.ZipWriteStr(partial_target_zip, info_file, modified_info)
+
+ # TODO(xunchang) handle 'META/care_map.pb', 'META/postinstall_config.txt'
+ common.ZipClose(partial_target_zip)
+
+ return partial_target_file
+
+
def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
super_block_devices,
dynamic_partition_list):
@@ -837,10 +925,16 @@
target_info = common.BuildInfo(OPTIONS.info_dict, OPTIONS.oem_dicts)
source_info = None
+ additional_args = []
+
if OPTIONS.retrofit_dynamic_partitions:
target_file = GetTargetFilesZipForRetrofitDynamicPartitions(
target_file, target_info.get("super_block_devices").strip().split(),
target_info.get("dynamic_partition_list").strip().split())
+ elif OPTIONS.partial:
+ target_file = GetTargetFilesZipForPartialUpdates(target_file,
+ OPTIONS.partial)
+ additional_args += ["--is_partial_update", "true"]
elif OPTIONS.skip_postinstall:
target_file = GetTargetFilesZipWithoutPostinstallConfig(target_file)
# Target_file may have been modified, reparse ab_partitions
@@ -862,7 +956,7 @@
partition_timestamps = [
part.partition_name + ":" + part.version
for part in metadata.postcondition.partition_state]
- additional_args = ["--max_timestamp", max_timestamp]
+ additional_args += ["--max_timestamp", max_timestamp]
if partition_timestamps:
additional_args.extend(
["--partition_timestamps", ",".join(
@@ -1006,6 +1100,11 @@
OPTIONS.force_non_ab = True
elif o == "--boot_variable_file":
OPTIONS.boot_variable_file = a
+ elif o == "--partial":
+ partitions = a.split()
+ if not partitions:
+ raise ValueError("Cannot parse partitions in {}".format(a))
+ OPTIONS.partial = partitions
else:
return False
return True
@@ -1044,6 +1143,7 @@
"disable_fec_computation",
"force_non_ab",
"boot_variable_file=",
+ "partial=",
], extra_option_handler=option_handler)
if len(args) != 2:
@@ -1058,6 +1158,8 @@
# OTA package.
if OPTIONS.incremental_source is None:
raise ValueError("Cannot generate downgradable full OTAs")
+ if OPTIONS.partial:
+ raise ValueError("Cannot generate downgradable partial OTAs")
# Load the build info dicts from the zip directly or the extracted input
# directory. We don't need to unzip the entire target-files zips, because they
@@ -1072,6 +1174,10 @@
with zipfile.ZipFile(args[0], 'r', allowZip64=True) as input_zip:
OPTIONS.info_dict = common.LoadInfoDict(input_zip)
+ # TODO(xunchang) for retrofit and partial updates, maybe we should rebuild the
+ # target-file and reload the info_dict, so that the info will be consistent
+ # with the modified target-file.
+
logger.info("--- target info ---")
common.DumpInfoDict(OPTIONS.info_dict)