releasetools: Separate streaming metadata computation into functions.

And add unittests for ComputeStreamingMetadata().

This prepares for the changes that add additional property-files (for
both A/B and non-A/B).

Bug: 74210298
Bug: 72751683
Test: python -m unittest test_ota_from_target_files
Test: Generate A/B OTA package. Check the ota-streaming-property-files
      in the METADATA entry.
Change-Id: Ib4b069f61c2c06c035c0cff73a55112f3936b969
diff --git a/tools/releasetools/test_ota_from_target_files.py b/tools/releasetools/test_ota_from_target_files.py
index ee5bc53..ed25f13 100644
--- a/tools/releasetools/test_ota_from_target_files.py
+++ b/tools/releasetools/test_ota_from_target_files.py
@@ -23,7 +23,7 @@
 import common
 import test_utils
 from ota_from_target_files import (
-    _LoadOemDicts, BuildInfo, GetPackageMetadata,
+    _LoadOemDicts, BuildInfo, ComputeStreamingMetadata, GetPackageMetadata,
     GetTargetFilesZipForSecondaryImages,
     GetTargetFilesZipWithoutPostinstallConfig,
     Payload, PayloadSigner, POSTINSTALL_CONFIG,
@@ -378,6 +378,9 @@
     common.OPTIONS.timestamp = False
     common.OPTIONS.wipe_user_data = False
 
+  def tearDown(self):  # unittest hook: runs after each test method.
+    common.Cleanup()  # Presumably deletes MakeTempFile() outputs; confirm.
+
   def test_GetPackageMetadata_abOta_full(self):
     target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
     target_info_dict['ab_update'] = 'true'
@@ -586,6 +589,119 @@
     with zipfile.ZipFile(target_file) as verify_zip:
       self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())
 
+  @staticmethod
+  def _construct_zip_package(entries):  # Returns the name of a new temp zip.
+    zip_file = common.MakeTempFile(suffix='.zip')
+    with zipfile.ZipFile(zip_file, 'w') as zip_fp:
+      for entry in entries:
+        zip_fp.writestr(
+            entry,
+            entry.replace('.', '-').upper(),  # Content is derived from name.
+            zipfile.ZIP_STORED)  # Uncompressed, so raw bytes are readable.
+    return zip_file
+
+  @staticmethod
+  def _parse_streaming_metadata_string(data):  # 'a:x,b:y' -> {'a': 'x', ...}
+    result = {}
+    for token in data.split(','):  # Tokens are comma-separated.
+      name, info = token.split(':', 1)  # Split at the first ':' only.
+      result[name] = info
+    return result
+
+  def _verify_entries(self, input_file, tokens, entries):
+    for entry in entries:  # Check each entry's bytes at its recorded location.
+      offset, size = map(int, tokens[entry].split(':'))  # 'offset:size'
+      with open(input_file, 'rb') as input_fp:
+        input_fp.seek(offset)
+        if entry == 'metadata':  # Short key; content comes from the full name.
+          expected = b'META-INF/COM/ANDROID/METADATA'
+        else:
+          expected = entry.replace('.', '-').upper().encode()
+        self.assertEqual(expected, input_fp.read(size))  # Compare raw bytes.
+
+  def test_ComputeStreamingMetadata_reserveSpace(self):
+    entries = (
+        'payload.bin',
+        'payload_properties.txt',
+    )
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      streaming_metadata = ComputeStreamingMetadata(zip_fp, reserve_space=True)
+    tokens = self._parse_streaming_metadata_string(streaming_metadata)
+
+    self.assertEqual(3, len(tokens))  # 2 entries + 1 (reserved slot? confirm).
+    self._verify_entries(zip_file, tokens, entries)  # Only the real entries.
+
+  def test_ComputeStreamingMetadata_reserveSpace_withCareMapTxtAndCompatibilityZip(self):
+    entries = (
+        'payload.bin',
+        'payload_properties.txt',
+        'care_map.txt',
+        'compatibility.zip',
+    )
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      streaming_metadata = ComputeStreamingMetadata(zip_fp, reserve_space=True)
+    tokens = self._parse_streaming_metadata_string(streaming_metadata)
+
+    self.assertEqual(5, len(tokens))  # 4 entries + 1 (reserved slot? confirm).
+    self._verify_entries(zip_file, tokens, entries)  # Only the real entries.
+
+  def test_ComputeStreamingMetadata(self):
+    entries = [  # A list, not a tuple: entries[2] is reassigned below.
+        'payload.bin',
+        'payload_properties.txt',
+        'META-INF/com/android/metadata',
+    ]
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      streaming_metadata = ComputeStreamingMetadata(zip_fp, reserve_space=False)
+    tokens = self._parse_streaming_metadata_string(streaming_metadata)
+
+    self.assertEqual(3, len(tokens))  # One token per zip entry.
+    # 'META-INF/com/android/metadata' will be keyed as 'metadata' in the
+    # streaming metadata.
+    entries[2] = 'metadata'
+    self._verify_entries(zip_file, tokens, entries)
+
+  def test_ComputeStreamingMetadata_withExpectedLength(self):
+    entries = (
+        'payload.bin',
+        'payload_properties.txt',
+        'care_map.txt',
+        'META-INF/com/android/metadata',
+    )
+    zip_file = self._construct_zip_package(entries)
+    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
+      # First get the raw metadata string (i.e. without padding space).
+      raw_metadata = ComputeStreamingMetadata(
+          zip_fp,
+          reserve_space=False)
+      raw_length = len(raw_metadata)  # Baseline for the three cases below.
+
+      # Now pass in the exact expected length.
+      streaming_metadata = ComputeStreamingMetadata(
+          zip_fp,
+          reserve_space=False,
+          expected_length=raw_length)
+      self.assertEqual(raw_length, len(streaming_metadata))  # No padding.
+
+      # Or pass in insufficient length.
+      self.assertRaises(
+          AssertionError,
+          ComputeStreamingMetadata,
+          zip_fp,
+          reserve_space=False,
+          expected_length=raw_length - 1)  # One character too short.
+
+      # Or pass in a much larger size.
+      streaming_metadata = ComputeStreamingMetadata(
+          zip_fp,
+          reserve_space=False,
+          expected_length=raw_length + 20)
+      self.assertEqual(raw_length + 20, len(streaming_metadata))
+      self.assertEqual(' ' * 20, streaming_metadata[raw_length:])  # Spaces.
+
 
 class PayloadSignerTest(unittest.TestCase):