releasetools: Separate streaming metadata computation into functions.

And add unittests for ComputeStreamingMetadata().

This prepares for the changes that add additional property-files (for
both of A/B and non-A/B).

Bug: 74210298
Bug: 72751683
Test: python -m unittest test_ota_from_target_files
Test: Generate A/B OTA package. Check the ota-streaming-property-files
      in the METADATA entry.
Change-Id: Ib4b069f61c2c06c035c0cff73a55112f3936b969
diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py
index dd8dcd0..7ef522c 100755
--- a/tools/releasetools/ota_from_target_files.py
+++ b/tools/releasetools/ota_from_target_files.py
@@ -955,6 +955,119 @@
   return metadata
 
 
+def ComputeStreamingMetadata(zip_file, reserve_space=False,
+                             expected_length=None):
+  """Computes the streaming metadata for a given zip.
+
+  When 'reserve_space' is True, we reserve extra space for the offset and
+  length of the metadata entry itself, although we don't know the final
+  values until the package gets signed. This function will be called again
+  after signing. We then write the actual values and pad the string to the
+  length we set earlier. Note that we can't use the actual length of the
+  metadata entry in the second run. Otherwise the offsets for other entries
+  will be changing again.
+  """
+
+  def ComputeEntryOffsetSize(name):
+    """Compute the zip entry offset and size."""
+    info = zip_file.getinfo(name)
+    offset = info.header_offset + len(info.FileHeader())
+    size = info.file_size
+    return '%s:%d:%d' % (os.path.basename(name), offset, size)
+
+  # payload.bin and payload_properties.txt must exist.
+  offsets = [ComputeEntryOffsetSize('payload.bin'),
+             ComputeEntryOffsetSize('payload_properties.txt')]
+
+  # care_map.txt is available only if dm-verity is enabled.
+  if 'care_map.txt' in zip_file.namelist():
+    offsets.append(ComputeEntryOffsetSize('care_map.txt'))
+
+  if 'compatibility.zip' in zip_file.namelist():
+    offsets.append(ComputeEntryOffsetSize('compatibility.zip'))
+
+  # 'META-INF/com/android/metadata' is required. We don't know its actual
+  # offset and length (as well as the values for other entries). So we
+  # reserve 10-byte as a placeholder, which is to cover the space for metadata
+  # entry ('xx:xxx', since it's ZIP_STORED which should appear at the
+  # beginning of the zip), as well as the possible value changes in other
+  # entries.
+  if reserve_space:
+    offsets.append('metadata:' + ' ' * 10)
+  else:
+    offsets.append(ComputeEntryOffsetSize(METADATA_NAME))
+
+  value = ','.join(offsets)
+  if expected_length is not None:
+    assert len(value) <= expected_length, \
+        'Insufficient reserved space: reserved=%d, actual=%d' % (
+            expected_length, len(value))
+    value += ' ' * (expected_length - len(value))
+  return value
+
+
+def FinalizeMetadata(metadata, input_file, output_file):
+  """Finalizes the metadata and signs an A/B OTA package.
+
+  In order to stream an A/B OTA package, we need 'ota-streaming-property-files'
+  that contains the offsets and sizes for the ZIP entries. An example
+  property-files string is as follows.
+
+    "payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379"
+
+  OTA server can pass down this string, in addition to the package URL, to the
+  system update client. System update client can then fetch individual ZIP
+  entries (ZIP_STORED) directly at the given offset of the URL.
+
+  Args:
+    metadata: The metadata dict for the package.
+    input_file: The input ZIP filename that doesn't contain the package METADATA
+        entry yet.
+    output_file: The final output ZIP filename.
+  """
+  output_zip = zipfile.ZipFile(
+      input_file, 'a', compression=zipfile.ZIP_DEFLATED)
+
+  # Write the current metadata entry with placeholders.
+  metadata['ota-streaming-property-files'] = ComputeStreamingMetadata(
+      output_zip, reserve_space=True)
+  WriteMetadata(metadata, output_zip)
+  common.ZipClose(output_zip)
+
+  # SignOutput(), which in turn calls signapk.jar, will possibly reorder the
+  # ZIP entries, as well as padding the entry headers. We do a preliminary
+  # signing (with an incomplete metadata entry) to allow that to happen. Then
+  # compute the ZIP entry offsets, write back the final metadata and do the
+  # final signing.
+  prelim_signing = common.MakeTempFile(suffix='.zip')
+  SignOutput(input_file, prelim_signing)
+
+  # Open the signed zip. Compute the final metadata that's needed for streaming.
+  prelim_signing_zip = zipfile.ZipFile(prelim_signing, 'r')
+  expected_length = len(metadata['ota-streaming-property-files'])
+  metadata['ota-streaming-property-files'] = ComputeStreamingMetadata(
+      prelim_signing_zip, reserve_space=False, expected_length=expected_length)
+  common.ZipClose(prelim_signing_zip)
+
+  # Replace the METADATA entry.
+  common.ZipDelete(prelim_signing, METADATA_NAME)
+  output_zip = zipfile.ZipFile(prelim_signing, 'a',
+                               compression=zipfile.ZIP_DEFLATED)
+  WriteMetadata(metadata, output_zip)
+  common.ZipClose(output_zip)
+
+  # Re-sign the package after updating the metadata entry.
+  SignOutput(prelim_signing, output_file)
+
+  # Reopen the final signed zip to double check the streaming metadata.
+  output_zip = zipfile.ZipFile(output_file, 'r')
+  actual = metadata['ota-streaming-property-files'].strip()
+  expected = ComputeStreamingMetadata(output_zip)
+  assert actual == expected, \
+      "Mismatching streaming metadata: %s vs %s." % (actual, expected)
+  common.ZipClose(output_zip)
+
+
 def WriteBlockIncrementalOTAPackage(target_zip, source_zip, output_zip):
   target_info = BuildInfo(OPTIONS.target_info_dict, OPTIONS.oem_dicts)
   source_info = BuildInfo(OPTIONS.source_info_dict, OPTIONS.oem_dicts)
@@ -1301,58 +1414,7 @@
 
 def WriteABOTAPackageWithBrilloScript(target_file, output_file,
                                       source_file=None):
-  """Generate an Android OTA package that has A/B update payload."""
-
-  def ComputeStreamingMetadata(zip_file, reserve_space=False,
-                               expected_length=None):
-    """Compute the streaming metadata for a given zip.
-
-    When 'reserve_space' is True, we reserve extra space for the offset and
-    length of the metadata entry itself, although we don't know the final
-    values until the package gets signed. This function will be called again
-    after signing. We then write the actual values and pad the string to the
-    length we set earlier. Note that we can't use the actual length of the
-    metadata entry in the second run. Otherwise the offsets for other entries
-    will be changing again.
-    """
-
-    def ComputeEntryOffsetSize(name):
-      """Compute the zip entry offset and size."""
-      info = zip_file.getinfo(name)
-      offset = info.header_offset + len(info.FileHeader())
-      size = info.file_size
-      return '%s:%d:%d' % (os.path.basename(name), offset, size)
-
-    # payload.bin and payload_properties.txt must exist.
-    offsets = [ComputeEntryOffsetSize('payload.bin'),
-               ComputeEntryOffsetSize('payload_properties.txt')]
-
-    # care_map.txt is available only if dm-verity is enabled.
-    if 'care_map.txt' in zip_file.namelist():
-      offsets.append(ComputeEntryOffsetSize('care_map.txt'))
-
-    if 'compatibility.zip' in zip_file.namelist():
-      offsets.append(ComputeEntryOffsetSize('compatibility.zip'))
-
-    # 'META-INF/com/android/metadata' is required. We don't know its actual
-    # offset and length (as well as the values for other entries). So we
-    # reserve 10-byte as a placeholder, which is to cover the space for metadata
-    # entry ('xx:xxx', since it's ZIP_STORED which should appear at the
-    # beginning of the zip), as well as the possible value changes in other
-    # entries.
-    if reserve_space:
-      offsets.append('metadata:' + ' ' * 10)
-    else:
-      offsets.append(ComputeEntryOffsetSize(METADATA_NAME))
-
-    value = ','.join(offsets)
-    if expected_length is not None:
-      assert len(value) <= expected_length, \
-          'Insufficient reserved space: reserved=%d, actual=%d' % (
-              expected_length, len(value))
-      value += ' ' * (expected_length - len(value))
-    return value
-
+  """Generates an Android OTA package that has A/B update payload."""
   # Stage the output zip package for package signing.
   staging_file = common.MakeTempFile(suffix='.zip')
   output_zip = zipfile.ZipFile(staging_file, "w",
@@ -1415,44 +1477,11 @@
 
   common.ZipClose(target_zip)
 
-  # Write the current metadata entry with placeholders.
-  metadata['ota-streaming-property-files'] = ComputeStreamingMetadata(
-      output_zip, reserve_space=True)
-  WriteMetadata(metadata, output_zip)
+  # We haven't written the metadata entry yet, which will be handled in
+  # FinalizeMetadata().
   common.ZipClose(output_zip)
 
-  # SignOutput(), which in turn calls signapk.jar, will possibly reorder the
-  # ZIP entries, as well as padding the entry headers. We do a preliminary
-  # signing (with an incomplete metadata entry) to allow that to happen. Then
-  # compute the ZIP entry offsets, write back the final metadata and do the
-  # final signing.
-  prelim_signing = common.MakeTempFile(suffix='.zip')
-  SignOutput(staging_file, prelim_signing)
-
-  # Open the signed zip. Compute the final metadata that's needed for streaming.
-  prelim_signing_zip = zipfile.ZipFile(prelim_signing, 'r')
-  expected_length = len(metadata['ota-streaming-property-files'])
-  metadata['ota-streaming-property-files'] = ComputeStreamingMetadata(
-      prelim_signing_zip, reserve_space=False, expected_length=expected_length)
-  common.ZipClose(prelim_signing_zip)
-
-  # Replace the METADATA entry.
-  common.ZipDelete(prelim_signing, METADATA_NAME)
-  output_zip = zipfile.ZipFile(prelim_signing, 'a',
-                               compression=zipfile.ZIP_DEFLATED)
-  WriteMetadata(metadata, output_zip)
-  common.ZipClose(output_zip)
-
-  # Re-sign the package after updating the metadata entry.
-  SignOutput(prelim_signing, output_file)
-
-  # Reopen the final signed zip to double check the streaming metadata.
-  output_zip = zipfile.ZipFile(output_file, 'r')
-  actual = metadata['ota-streaming-property-files'].strip()
-  expected = ComputeStreamingMetadata(output_zip)
-  assert actual == expected, \
-      "Mismatching streaming metadata: %s vs %s." % (actual, expected)
-  common.ZipClose(output_zip)
+  FinalizeMetadata(metadata, staging_file, output_file)
 
 
 def main(argv):
diff --git a/tools/releasetools/test_ota_from_target_files.py b/tools/releasetools/test_ota_from_target_files.py
index ee5bc53..ed25f13 100644
--- a/tools/releasetools/test_ota_from_target_files.py
+++ b/tools/releasetools/test_ota_from_target_files.py
@@ -23,7 +23,7 @@
 import common
 import test_utils
 from ota_from_target_files import (
-    _LoadOemDicts, BuildInfo, GetPackageMetadata,
+    _LoadOemDicts, BuildInfo, ComputeStreamingMetadata, GetPackageMetadata,
     GetTargetFilesZipForSecondaryImages,
     GetTargetFilesZipWithoutPostinstallConfig,
     Payload, PayloadSigner, POSTINSTALL_CONFIG,
@@ -378,6 +378,9 @@
     common.OPTIONS.timestamp = False
     common.OPTIONS.wipe_user_data = False
 
+  def tearDown(self):
+    common.Cleanup()
+
   def test_GetPackageMetadata_abOta_full(self):
     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
     target_info_dict['ab_update'] = 'true'
@@ -586,6 +589,119 @@
     with zipfile.ZipFile(target_file) as verify_zip:
       self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
 
+  @staticmethod
+  def _construct_zip_package(entries):
+    zip_file = common.MakeTempFile(suffix='.zip')
+    with zipfile.ZipFile(zip_file, 'w') as zip_fp:
+      for entry in entries:
+        zip_fp.writestr(
+            entry,
+            entry.replace('.', '-').upper(),
+            zipfile.ZIP_STORED)
+    return zip_file
+
+  @staticmethod
+  def _parse_streaming_metadata_string(data):
+    result = {}
+    for token in data.split(','):
+      name, info = token.split(':', 1)
+      result[name] = info
+    return result
+
+  def _verify_entries(self, input_file, tokens, entries):
+    for entry in entries:
+      offset, size = map(int, tokens[entry].split(':'))
+      with open(input_file, 'rb') as input_fp:
+        input_fp.seek(offset)
+        if entry == 'metadata':
+          expected = b'META-INF/COM/ANDROID/METADATA'
+        else:
+          expected = entry.replace('.', '-').upper().encode()
+        self.assertEqual(expected, input_fp.read(size))
+
+  def test_ComputeStreamingMetadata_reserveSpace(self):
+    entries = (
+        'payload.bin',
+        'payload_properties.txt',
+    )
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      streaming_metadata = ComputeStreamingMetadata(zip_fp, reserve_space=True)
+    tokens = self._parse_streaming_metadata_string(streaming_metadata)
+
+    self.assertEqual(3, len(tokens))
+    self._verify_entries(zip_file, tokens, entries)
+
+  def test_ComputeStreamingMetadata_reserveSpace_withCareMapTxtAndCompatibilityZip(self):
+    entries = (
+        'payload.bin',
+        'payload_properties.txt',
+        'care_map.txt',
+        'compatibility.zip',
+    )
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      streaming_metadata = ComputeStreamingMetadata(zip_fp, reserve_space=True)
+    tokens = self._parse_streaming_metadata_string(streaming_metadata)
+
+    self.assertEqual(5, len(tokens))
+    self._verify_entries(zip_file, tokens, entries)
+
+  def test_ComputeStreamingMetadata(self):
+    entries = [
+        'payload.bin',
+        'payload_properties.txt',
+        'META-INF/com/android/metadata',
+    ]
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      streaming_metadata = ComputeStreamingMetadata(zip_fp, reserve_space=False)
+    tokens = self._parse_streaming_metadata_string(streaming_metadata)
+
+    self.assertEqual(3, len(tokens))
+    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
+    # streaming metadata.
+    entries[2] = 'metadata'
+    self._verify_entries(zip_file, tokens, entries)
+
+  def test_ComputeStreamingMetadata_withExpectedLength(self):
+    entries = (
+        'payload.bin',
+        'payload_properties.txt',
+        'care_map.txt',
+        'META-INF/com/android/metadata',
+    )
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      # First get the raw metadata string (i.e. without padding space).
+      raw_metadata = ComputeStreamingMetadata(
+          zip_fp,
+          reserve_space=False)
+      raw_length = len(raw_metadata)
+
+      # Now pass in the exact expected length.
+      streaming_metadata = ComputeStreamingMetadata(
+          zip_fp,
+          reserve_space=False,
+          expected_length=raw_length)
+      self.assertEqual(raw_length, len(streaming_metadata))
+
+      # Or pass in insufficient length.
+      self.assertRaises(
+          AssertionError,
+          ComputeStreamingMetadata,
+          zip_fp,
+          reserve_space=False,
+          expected_length=raw_length - 1)
+
+      # Or pass in a much larger size.
+      streaming_metadata = ComputeStreamingMetadata(
+          zip_fp,
+          reserve_space=False,
+          expected_length=raw_length + 20)
+      self.assertEqual(raw_length + 20, len(streaming_metadata))
+      self.assertEqual(' ' * 20, streaming_metadata[raw_length:])
+
 
 class PayloadSignerTest(unittest.TestCase):