Refactor ota_utils to reduce dependency on global OPTIONS
Move some depency on global OPTIONS to parameters. This makes it easier
for other modules to re-use these functions w/o having to modify OPTIONS
variables before/after calling.
Test: th
Bug: 227536004
Change-Id: I3bab292f65a4222f0c9502bcf04fa7a85f8124c6
diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py
index ab65ee2..12444cd 100755
--- a/tools/releasetools/ota_from_target_files.py
+++ b/tools/releasetools/ota_from_target_files.py
@@ -263,7 +263,7 @@
import common
import ota_utils
from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata,
- PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME, StreamingPropertyFiles, AbOtaPropertyFiles)
+ PayloadGenerator, SECURITY_PATCH_LEVEL_PROP_NAME)
from common import IsSparseImage
import target_files_diff
from check_target_files_vintf import CheckVintfIfTrebleEnabled
@@ -893,7 +893,7 @@
# Metadata to comply with Android OTA package format.
metadata = GetPackageMetadata(target_info, source_info)
# Generate payload.
- payload = PayloadGenerator()
+ payload = PayloadGenerator(OPTIONS.include_secondary, OPTIONS.wipe_user_data)
partition_timestamps_flags = []
# Enforce a max timestamp this payload can be applied on top of.
@@ -1010,15 +1010,8 @@
# FinalizeMetadata().
common.ZipClose(output_zip)
- # AbOtaPropertyFiles intends to replace StreamingPropertyFiles, as it covers
- # all the info of the latter. However, system updaters and OTA servers need to
- # take time to switch to the new flag. We keep both of the flags for
- # P-timeframe, and will remove StreamingPropertyFiles in later release.
- needed_property_files = (
- AbOtaPropertyFiles(),
- StreamingPropertyFiles(),
- )
- FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
+ FinalizeMetadata(metadata, staging_file, output_file,
+ package_key=OPTIONS.package_key)
def main(argv):
diff --git a/tools/releasetools/ota_utils.py b/tools/releasetools/ota_utils.py
index 4ff5027..e1529c7 100644
--- a/tools/releasetools/ota_utils.py
+++ b/tools/releasetools/ota_utils.py
@@ -48,7 +48,7 @@
SECURITY_PATCH_LEVEL_PROP_NAME = "ro.build.version.security_patch"
-def FinalizeMetadata(metadata, input_file, output_file, needed_property_files):
+def FinalizeMetadata(metadata, input_file, output_file, needed_property_files=None, package_key=None, pw=None):
"""Finalizes the metadata and signs an A/B OTA package.
In order to stream an A/B OTA package, we need 'ota-streaming-property-files'
@@ -66,8 +66,21 @@
input_file: The input ZIP filename that doesn't contain the package METADATA
entry yet.
output_file: The final output ZIP filename.
- needed_property_files: The list of PropertyFiles' to be generated.
+ needed_property_files: The list of PropertyFiles' to be generated. Default is [AbOtaPropertyFiles(), StreamingPropertyFiles()]
+ package_key: The key used to sign this OTA package
+ pw: Password for the package_key
"""
+ no_signing = package_key is None
+
+ if needed_property_files is None:
+ # AbOtaPropertyFiles intends to replace StreamingPropertyFiles, as it covers
+ # all the info of the latter. However, system updaters and OTA servers need to
+ # take time to switch to the new flag. We keep both of the flags for
+ # P-timeframe, and will remove StreamingPropertyFiles in later release.
+ needed_property_files = (
+ AbOtaPropertyFiles(),
+ StreamingPropertyFiles(),
+ )
def ComputeAllPropertyFiles(input_file, needed_property_files):
# Write the current metadata entry with placeholders.
@@ -83,11 +96,11 @@
WriteMetadata(metadata, output_zip)
ZipClose(output_zip)
- if OPTIONS.no_signing:
+ if no_signing:
return input_file
prelim_signing = MakeTempFile(suffix='.zip')
- SignOutput(input_file, prelim_signing)
+ SignOutput(input_file, prelim_signing, package_key, pw)
return prelim_signing
def FinalizeAllPropertyFiles(prelim_signing, needed_property_files):
@@ -122,10 +135,10 @@
ZipClose(output_zip)
# Re-sign the package after updating the metadata entry.
- if OPTIONS.no_signing:
+ if no_signing:
shutil.copy(prelim_signing, output_file)
else:
- SignOutput(prelim_signing, output_file)
+ SignOutput(prelim_signing, output_file, package_key, pw)
# Reopen the final signed zip to double check the streaming metadata.
with zipfile.ZipFile(output_file, allowZip64=True) as output_zip:
@@ -578,7 +591,7 @@
else:
tokens.append(ComputeEntryOffsetSize(METADATA_NAME))
if METADATA_PROTO_NAME in zip_file.namelist():
- tokens.append(ComputeEntryOffsetSize(METADATA_PROTO_NAME))
+ tokens.append(ComputeEntryOffsetSize(METADATA_PROTO_NAME))
return ','.join(tokens)
@@ -600,10 +613,13 @@
return []
-def SignOutput(temp_zip_name, output_zip_name):
- pw = OPTIONS.key_passwords[OPTIONS.package_key]
+def SignOutput(temp_zip_name, output_zip_name, package_key=None, pw=None):
+ if package_key is None:
+ package_key = OPTIONS.package_key
+ if pw is None and OPTIONS.key_passwords:
+ pw = OPTIONS.key_passwords[package_key]
- SignFile(temp_zip_name, output_zip_name, OPTIONS.package_key, pw,
+ SignFile(temp_zip_name, output_zip_name, package_key, pw,
whole_file=True)
@@ -715,7 +731,7 @@
SECONDARY_PAYLOAD_BIN = 'secondary/payload.bin'
SECONDARY_PAYLOAD_PROPERTIES_TXT = 'secondary/payload_properties.txt'
- def __init__(self, secondary=False):
+ def __init__(self, secondary=False, wipe_user_data=False):
"""Initializes a Payload instance.
Args:
@@ -724,6 +740,7 @@
self.payload_file = None
self.payload_properties = None
self.secondary = secondary
+ self.wipe_user_data = wipe_user_data
def _Run(self, cmd): # pylint: disable=no-self-use
# Don't pipe (buffer) the output if verbose is set. Let
@@ -785,8 +802,8 @@
self._Run(cmd)
# 2. Sign the hashes.
- signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
- signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
+ signed_payload_sig_file = payload_signer.SignHashFile(payload_sig_file)
+ signed_metadata_sig_file = payload_signer.SignHashFile(metadata_sig_file)
# 3. Insert the signatures back into the payload file.
signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
@@ -799,24 +816,7 @@
"--payload_signature_file", signed_payload_sig_file]
self._Run(cmd)
- # 4. Dump the signed payload properties.
- properties_file = common.MakeTempFile(prefix="payload-properties-",
- suffix=".txt")
- cmd = ["brillo_update_payload", "properties",
- "--payload", signed_payload_file,
- "--properties_file", properties_file]
- self._Run(cmd)
-
- if self.secondary:
- with open(properties_file, "a") as f:
- f.write("SWITCH_SLOT_ON_REBOOT=0\n")
-
- if OPTIONS.wipe_user_data:
- with open(properties_file, "a") as f:
- f.write("POWERWASH=1\n")
-
self.payload_file = signed_payload_file
- self.payload_properties = properties_file
def WriteToZip(self, output_zip):
"""Writes the payload to the given zip.
@@ -825,7 +825,23 @@
output_zip: The output ZipFile instance.
"""
assert self.payload_file is not None
- assert self.payload_properties is not None
+ # 4. Dump the signed payload properties.
+ properties_file = common.MakeTempFile(prefix="payload-properties-",
+ suffix=".txt")
+ cmd = ["brillo_update_payload", "properties",
+ "--payload", self.payload_file,
+ "--properties_file", properties_file]
+ self._Run(cmd)
+
+ if self.secondary:
+ with open(properties_file, "a") as f:
+ f.write("SWITCH_SLOT_ON_REBOOT=0\n")
+
+ if self.wipe_user_data:
+ with open(properties_file, "a") as f:
+ f.write("POWERWASH=1\n")
+
+ self.payload_properties = properties_file
if self.secondary:
payload_arcname = PayloadGenerator.SECONDARY_PAYLOAD_BIN
@@ -946,6 +962,6 @@
manifest_size = header[2]
metadata_signature_size = header[3]
metadata_total = 24 + manifest_size + metadata_signature_size
- assert metadata_total < payload_size
+ assert metadata_total <= payload_size
return (payload_offset, metadata_total)
diff --git a/tools/releasetools/payload_signer.py b/tools/releasetools/payload_signer.py
index 6a643de..4f342ac 100644
--- a/tools/releasetools/payload_signer.py
+++ b/tools/releasetools/payload_signer.py
@@ -81,7 +81,40 @@
signature_size)
return int(signature_size)
- def Sign(self, in_file):
+ @staticmethod
+ def _Run(cmd):
+ common.RunAndCheckOutput(cmd, stdout=None, stderr=None)
+
+ def SignPayload(self, unsigned_payload):
+
+ # 1. Generate hashes of the payload and metadata files.
+ payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
+ metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
+ cmd = ["brillo_update_payload", "hash",
+ "--unsigned_payload", unsigned_payload,
+ "--signature_size", str(self.maximum_signature_size),
+ "--metadata_hash_file", metadata_sig_file,
+ "--payload_hash_file", payload_sig_file]
+ self._Run(cmd)
+
+ # 2. Sign the hashes.
+ signed_payload_sig_file = self.SignHashFile(payload_sig_file)
+ signed_metadata_sig_file = self.SignHashFile(metadata_sig_file)
+
+ # 3. Insert the signatures back into the payload file.
+ signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
+ suffix=".bin")
+ cmd = ["brillo_update_payload", "sign",
+ "--unsigned_payload", unsigned_payload,
+ "--payload", signed_payload_file,
+ "--signature_size", str(self.maximum_signature_size),
+ "--metadata_signature_file", signed_metadata_sig_file,
+ "--payload_signature_file", signed_payload_sig_file]
+ self._Run(cmd)
+ return signed_payload_file
+
+
+ def SignHashFile(self, in_file):
"""Signs the given input file. Returns the output filename."""
out_file = common.MakeTempFile(prefix="signed-", suffix=".bin")
cmd = [self.signer] + self.signer_args + ['-in', in_file, '-out', out_file]
diff --git a/tools/releasetools/test_ota_from_target_files.py b/tools/releasetools/test_ota_from_target_files.py
index 161bec3..ad0f7a8 100644
--- a/tools/releasetools/test_ota_from_target_files.py
+++ b/tools/releasetools/test_ota_from_target_files.py
@@ -1030,7 +1030,7 @@
0, proc.returncode,
'Failed to run brillo_update_payload:\n{}'.format(stdoutdata))
- signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
+ signed_metadata_sig_file = payload_signer.SignHashFile(metadata_sig_file)
# Finally we can compare the two signatures.
with open(signed_metadata_sig_file, 'rb') as verify_fp:
@@ -1170,7 +1170,7 @@
def test_Sign(self):
payload_signer = PayloadSigner()
input_file = os.path.join(self.testdata_dir, self.SIGFILE)
- signed_file = payload_signer.Sign(input_file)
+ signed_file = payload_signer.SignHashFile(input_file)
verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
self._assertFilesEqual(verify_file, signed_file)
@@ -1184,7 +1184,7 @@
payload_signer = PayloadSigner(
OPTIONS.package_key, OPTIONS.private_key_suffix, payload_signer="openssl")
input_file = os.path.join(self.testdata_dir, self.SIGFILE)
- signed_file = payload_signer.Sign(input_file)
+ signed_file = payload_signer.SignHashFile(input_file)
verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
self._assertFilesEqual(verify_file, signed_file)
@@ -1199,7 +1199,7 @@
payload_signer = PayloadSigner(
OPTIONS.package_key, OPTIONS.private_key_suffix, payload_signer=external_signer)
input_file = os.path.join(self.testdata_dir, self.SIGFILE)
- signed_file = payload_signer.Sign(input_file)
+ signed_file = payload_signer.SignHashFile(input_file)
verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
self._assertFilesEqual(verify_file, signed_file)
@@ -1222,7 +1222,7 @@
@staticmethod
def _create_payload_full(secondary=False):
target_file = construct_target_files(secondary)
- payload = PayloadGenerator(secondary)
+ payload = PayloadGenerator(secondary, OPTIONS.wipe_user_data)
payload.Generate(target_file)
return payload
@@ -1295,6 +1295,9 @@
common.OPTIONS.wipe_user_data = True
payload = self._create_payload_full()
payload.Sign(PayloadSigner())
+ with tempfile.NamedTemporaryFile() as fp:
+ with zipfile.ZipFile(fp, "w") as zfp:
+ payload.WriteToZip(zfp)
with open(payload.payload_properties) as properties_fp:
self.assertIn("POWERWASH=1", properties_fp.read())
@@ -1303,6 +1306,9 @@
def test_Sign_secondary(self):
payload = self._create_payload_full(secondary=True)
payload.Sign(PayloadSigner())
+ with tempfile.NamedTemporaryFile() as fp:
+ with zipfile.ZipFile(fp, "w") as zfp:
+ payload.WriteToZip(zfp)
with open(payload.payload_properties) as properties_fp:
self.assertIn("SWITCH_SLOT_ON_REBOOT=0", properties_fp.read())
@@ -1338,22 +1344,6 @@
self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
@test_utils.SkipIfExternalToolsUnavailable()
- def test_WriteToZip_unsignedPayload(self):
- """Unsigned payloads should not be allowed to be written to zip."""
- payload = self._create_payload_full()
-
- output_file = common.MakeTempFile(suffix='.zip')
- with zipfile.ZipFile(output_file, 'w', allowZip64=True) as output_zip:
- self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
-
- # Also test with incremental payload.
- payload = self._create_payload_incremental()
-
- output_file = common.MakeTempFile(suffix='.zip')
- with zipfile.ZipFile(output_file, 'w', allowZip64=True) as output_zip:
- self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
-
- @test_utils.SkipIfExternalToolsUnavailable()
def test_WriteToZip_secondary(self):
payload = self._create_payload_full(secondary=True)
payload.Sign(PayloadSigner())