repopick: Apply change in parallel

Change-Id: Iafd803422082bcc17f2ad3300df3882d689f3674
diff --git a/build/tools/repopick.py b/build/tools/repopick.py
index 3ea8cef..4806b8f 100755
--- a/build/tools/repopick.py
+++ b/build/tools/repopick.py
@@ -30,7 +30,8 @@
 import urllib.parse
 import urllib.request
 from collections import defaultdict
-from functools import cmp_to_key
+from concurrent.futures import ThreadPoolExecutor
+from functools import cmp_to_key, partial
 from xml.etree import ElementTree
 
 # Default to LineageOS Gerrit
@@ -186,6 +187,17 @@
     return status not in ("OPEN", "NEW", "DRAFT")
 
 
+def commit_exists(project_path, revision):
+    return (
+        subprocess.call(
+            ["git", "cat-file", "-e", revision],
+            cwd=project_path,
+            stderr=subprocess.DEVNULL,
+        )
+        == 0
+    )
+
+
 def main():
     parser = argparse.ArgumentParser(
         formatter_class=argparse.RawDescriptionHelpFormatter,
@@ -292,6 +304,14 @@
         metavar="",
         help="pass the amount of commits to check for already picked changes",
     )
+    parser.add_argument(
+        "-j",
+        "--jobs",
+        type=int,
+        default=4,
+        metavar="",
+        help="max number of changes to pick in parallel",
+    )
     args = parser.parse_args()
     if not args.start_branch and args.abandon_first:
         parser.error(
@@ -502,18 +522,18 @@
             "patchset": review["revisions"][review["current_revision"]]["_number"],
             "fetch": review["revisions"][review["current_revision"]]["fetch"],
             "id": change,
+            "revision": review["current_revision"],
         }
 
         if patchset:
-            try:
-                item["fetch"] = [
-                    review["revisions"][x]["fetch"]
-                    for x in review["revisions"]
-                    if review["revisions"][x]["_number"] == patchset
-                ][0]
-                item["id"] = "{0}/{1}".format(change, patchset)
-                item["patchset"] = patchset
-            except (IndexError, ValueError):
+            for x in review["revisions"]:
+                if review["revisions"][x]["_number"] == patchset:
+                    item["fetch"] = review["revisions"][x]["fetch"]
+                    item["id"] = "{0}/{1}".format(change, patchset)
+                    item["patchset"] = patchset
+                    item["revision"] = x
+                    break
+            else:
                 args.quiet or print(
                     "ERROR: The patch set {0}/{1} could not be found, using CURRENT_REVISION instead.".format(
                         change, patchset
@@ -522,6 +542,7 @@
 
         mergables[project_path].append(item)
 
+    # round 1: start branch and drop picked changes
     for project_path, per_path_mergables in mergables.items():
         # If --start-branch is given, create the branch (more than once per path is okay; repo ignores gracefully)
         if args.start_branch:
@@ -548,11 +569,7 @@
 
         picked_change_ids = []
         for i in range(check_picked_count):
-            if subprocess.call(
-                ["git", "cat-file", "-e", "HEAD~{0}".format(i)],
-                cwd=project_path,
-                stderr=open(os.devnull, "wb"),
-            ):
+            if not commit_exists(project_path, "HEAD~{0}".format(i)):
                 continue
             output = subprocess.check_output(
                 ["git", "show", "-q", f"HEAD~{i}"], cwd=project_path, text=True
@@ -573,27 +590,34 @@
                         item["id"], project_path
                     )
                 )
-                continue
+                per_path_mergables.remove(item)
 
-            apply_change(args, item)
+    # round 2: fetch changes in parallel if not pull
+    if not args.pull:
+        with ThreadPoolExecutor(max_workers=args.jobs) as e:
+            for per_path_mergables in mergables.values():
+                # changes are sorted so loop in reversed order to fetch top commits first
+                for item in reversed(per_path_mergables):
+                    e.submit(partial(do_git_fetch_pull, args), item)
+
+    # round 3: apply changes in parallel for different projects, but sequential
+    # within each project
+    with ThreadPoolExecutor(max_workers=args.jobs) as e:
+
+        def bulk_pick_change(per_path_mergables):
+            for item in per_path_mergables:
+                apply_change(args, item)
+
+        for per_path_mergables in mergables.values():
+            e.submit(bulk_pick_change, per_path_mergables)
 
 
-def apply_change(args, item):
-    args.quiet or print("Applying change number {0}...".format(item["id"]))
-    if is_closed(item["status"]):
-        print("!! Force-picking a closed change !!\n")
-
+def do_git_fetch_pull(args, item):
     project_path = item["project_path"]
 
-    # Print out some useful info
-    if not args.quiet:
-        print('--> Subject:       "{0}"'.format(item["subject"]))
-        print("--> Project path:  {0}".format(project_path))
-        print(
-            "--> Change number: {0} (Patch Set {1})".format(
-                item["id"], item["patchset"]
-            )
-        )
+    # commit object already exists, no need to fetch
+    if commit_exists(project_path, item["revision"]):
+        return
 
     if "anonymous http" in item["fetch"]:
         method = "anonymous http"
@@ -639,8 +663,29 @@
         if result != 0:
             print("ERROR: git command failed")
             sys.exit(result)
-    # Perform the cherry-pick
-    if not args.pull:
+
+
+def apply_change(args, item):
+    args.quiet or print("Applying change number {0}...".format(item["id"]))
+    if is_closed(item["status"]):
+        print("!! Force-picking a closed change !!\n")
+
+    project_path = item["project_path"]
+
+    # Print out some useful info
+    if not args.quiet:
+        print('--> Subject:       "{0}"'.format(item["subject"]))
+        print("--> Project path:  {0}".format(project_path))
+        print(
+            "--> Change number: {0} (Patch Set {1})".format(
+                item["id"], item["patchset"]
+            )
+        )
+
+    if args.pull:
+        do_git_fetch_pull(args, item)
+    else:
+        # Perform the cherry-pick
         if args.quiet:
             cmd_out = subprocess.DEVNULL
         else: