Split the huge merge_target_files script into multiple files.
Bug: 221858722
Test: m otatools; Use to create merged builds
Test: atest --host releasetools_test
Change-Id: I5f932f160d3f6405b41a7721b1c75cc96749e77b
diff --git a/tools/releasetools/merge/merge_utils.py b/tools/releasetools/merge/merge_utils.py
new file mode 100644
index 0000000..e822061
--- /dev/null
+++ b/tools/releasetools/merge/merge_utils.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2022 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+"""Common utility functions shared by merge_* scripts.
+
+Expects items in OPTIONS prepared by merge_target_files.py.
+"""
+
+import fnmatch
+import logging
+import os
+import re
+import shutil
+import zipfile
+
+import common
+
+# Module-level logger, named after this module.
+logger = logging.getLogger(__name__)
+# Shared options object owned by common. Per the module docstring, the items
+# read from it here are expected to be prepared by merge_target_files.py.
+OPTIONS = common.OPTIONS
+
+
+def ExtractItems(input_zip, output_dir, extract_item_list):
+  """Unzips the entries of input_zip selected by extract_item_list.
+
+  Args:
+    input_zip: The zip to read; handed to zipfile.ZipFile() and to
+      common.UnzipToDir().
+    output_dir: Destination directory handed to common.UnzipToDir().
+    extract_item_list: fnmatch-style patterns selecting the entries to
+      extract.
+  """
+  with zipfile.ZipFile(input_zip, allowZip64=True) as zip_file:
+    names_in_zip = zip_file.namelist()
+
+  # Drop every pattern that matches nothing in the zip. Otherwise, the
+  # extraction step will fail.
+  patterns_present = [
+      pattern for pattern in extract_item_list
+      if fnmatch.filter(names_in_zip, pattern)
+  ]
+
+  common.UnzipToDir(input_zip, output_dir, patterns_present)
+
+
+def CopyItems(from_dir, to_dir, patterns):
+  """Copies the entries under from_dir that match patterns into to_dir.
+
+  Similar to ExtractItems() except uses an input dir instead of a zip.
+  Symlinks are recreated as symlinks rather than followed. This includes
+  symlinks to directories, which os.walk() reports in dirnames instead of
+  filenames and which would otherwise be silently skipped.
+
+  Args:
+    from_dir: Directory to copy from.
+    to_dir: Directory to copy into. Missing parent directories are created.
+    patterns: fnmatch-style patterns matched against paths relative to
+      from_dir.
+  """
+  item_paths = []
+  for dirpath, dirnames, filenames in os.walk(from_dir):
+    # os.walk() does not descend into symlinked directories and lists them in
+    # dirnames, so pick them up explicitly alongside the regular files.
+    dir_links = [
+        name for name in dirnames
+        if os.path.islink(os.path.join(dirpath, name))
+    ]
+    item_paths.extend(
+        os.path.relpath(os.path.join(dirpath, name), from_dir)
+        for name in filenames + dir_links)
+
+  # A set, so a path matched by several patterns is copied only once.
+  matched_paths = set()
+  for pattern in patterns:
+    matched_paths.update(fnmatch.filter(item_paths, pattern))
+
+  for item_path in matched_paths:
+    src_path = os.path.join(from_dir, item_path)
+    dst_path = os.path.join(to_dir, item_path)
+    dst_dir = os.path.dirname(dst_path)
+    if not os.path.exists(dst_dir):
+      os.makedirs(dst_dir)
+    if os.path.islink(src_path):
+      # Preserve the link itself (target text unchanged) instead of its target.
+      os.symlink(os.readlink(src_path), dst_path)
+    else:
+      shutil.copyfile(src_path, dst_path)
+
+
+def WriteSortedData(data, path):
+  """Writes the entries of a list or dict to a file in sorted order.
+
+  A dict is written as one 'key=value' line per key, ordered by key. Anything
+  else is written as one entry per line, in sorted order.
+
+  Args:
+    data: The list or dict to sort and write.
+    path: Path of the output file. An existing file at this path is
+      overwritten.
+  """
+  is_mapping = isinstance(data, dict)
+  with open(path, 'w') as out_file:
+    for item in sorted(data):
+      if is_mapping:
+        line = '{}={}\n'.format(item, data[item])
+      else:
+        line = '{}\n'.format(item)
+      out_file.write(line)
+
+
+# The merge config lists should not attempt to extract items from both
+# builds for any of the following partitions. The partitions in
+# _SINGLE_BUILD_PARTITIONS should come entirely from a single build (either
+# framework or vendor, but not both). ValidateConfigLists() checks this.
+#
+# Each entry keeps its trailing '/' because ValidateConfigLists() uses it as a
+# path prefix when testing item list entries with str.startswith().
+
+_SINGLE_BUILD_PARTITIONS = (
+ 'BOOT/',
+ 'DATA/',
+ 'ODM/',
+ 'PRODUCT/',
+ 'SYSTEM_EXT/',
+ 'RADIO/',
+ 'RECOVERY/',
+ 'ROOT/',
+ 'SYSTEM/',
+ 'SYSTEM_OTHER/',
+ 'VENDOR/',
+ 'VENDOR_DLKM/',
+ 'ODM_DLKM/',
+ 'SYSTEM_DLKM/',
+)
+
+
+def ValidateConfigLists():
+  """Checks the merge config lists in OPTIONS for conflicting entries.
+
+  Two checks are made, and each failure is logged as an error:
+    * No partition in _SINGLE_BUILD_PARTITIONS may be selected by both
+      OPTIONS.framework_item_list and OPTIONS.vendor_item_list, whether
+      through an item under the partition directory or through the
+      partition's IMAGES/<name>.img entry.
+    * OPTIONS.framework_misc_info_keys may not contain the dynamic partition
+      keys 'dynamic_partition_list' or 'super_partition_groups'.
+
+  Returns:
+    True if every validation passes, otherwise False.
+  """
+
+  def _Selects(item_list, partition_dir, image_name):
+    """Returns whether item_list picks the partition's files or its image."""
+    return (any(entry.startswith(partition_dir) for entry in item_list) or
+            image_name in item_list)
+
+  is_valid = True
+
+  # Check that partitions only come from one input.
+  for partition in _SINGLE_BUILD_PARTITIONS:
+    # 'SYSTEM_EXT/' -> 'IMAGES/system_ext.img'
+    image_path = 'IMAGES/{}.img'.format(partition.lower().replace('/', ''))
+    in_framework = _Selects(OPTIONS.framework_item_list, partition, image_path)
+    in_vendor = _Selects(OPTIONS.vendor_item_list, partition, image_path)
+    if not (in_framework and in_vendor):
+      continue
+    logger.error(
+        'Cannot extract items from %s for both the framework and vendor '
+        'builds. Please ensure only one merge config item list includes %s.',
+        partition, partition)
+    is_valid = False
+
+  dynamic_keys = ('dynamic_partition_list', 'super_partition_groups')
+  misplaced_keys = [
+      key for key in dynamic_keys if key in OPTIONS.framework_misc_info_keys
+  ]
+  if misplaced_keys:
+    logger.error('Dynamic partition misc info keys should come from '
+                 'the vendor instance of META/misc_info.txt.')
+    is_valid = False
+
+  return is_valid
+
+
+# In an item list (framework or vendor), we may see entries that select whole
+# partitions. Such an entry looks like 'SYSTEM/*' (e.g., for the system
+# partition). The following regex matches such entries and captures the
+# partition name.
+
+_PARTITION_ITEM_PATTERN = re.compile(r'^([A-Z_]+)/\*$')
+
+
+def ItemListToPartitionSet(item_list):
+  """Returns the partitions that item_list selects as whole directories.
+
+  An item such as 'SYSTEM/*' or 'VENDOR/*' selects an entire directory whose
+  name identifies a partition, so it contributes 'system' or 'vendor' to the
+  result. Items of any other form, such as 'OTA/android-info.txt', are
+  ignored. Surrounding whitespace is stripped from each item before it is
+  matched.
+
+  Args:
+    item_list: A list of items in a target files package.
+
+  Returns:
+    A set of lower-case partition names extracted from the list of items.
+  """
+  stripped_items = (entry.strip() for entry in item_list)
+  matches = (_PARTITION_ITEM_PATTERN.search(entry) for entry in stripped_items)
+  return {match.group(1).lower() for match in matches if match}