Parallelize delta generation across partitions

On my machine, this change alone reduces incremental
OTA generation time from 56 minutes to 29 minutes.

Test: Generate and serve an OTA
Change-Id: Id4ffc6f02f28594eb60cb934777b82f1899bbbc2
diff --git a/payload_generator/delta_diff_generator.cc b/payload_generator/delta_diff_generator.cc
index 595a41e..aa49252 100644
--- a/payload_generator/delta_diff_generator.cc
+++ b/payload_generator/delta_diff_generator.cc
@@ -29,11 +29,13 @@
 #include <vector>
 
 #include <base/logging.h>
+#include <base/threading/simple_thread.h>
 
 #include "update_engine/common/utils.h"
 #include "update_engine/payload_consumer/delta_performer.h"
 #include "update_engine/payload_consumer/payload_constants.h"
 #include "update_engine/payload_generator/ab_generator.h"
+#include "update_engine/payload_generator/annotated_operation.h"
 #include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/payload_generator/delta_diff_utils.h"
 #include "update_engine/payload_generator/full_update_generator.h"
@@ -49,6 +51,45 @@
 const size_t kRootFSPartitionSize = static_cast<size_t>(2) * 1024 * 1024 * 1024;
 const size_t kBlockSize = 4096;  // bytes
 
+// Delegate task that runs GenerateOperations() for a single partition.
+class PartitionProcessor : public base::DelegateSimpleThread::Delegate {
+ public:
+  explicit PartitionProcessor(
+      const PayloadGenerationConfig& config,
+      const PartitionConfig& old_part,
+      const PartitionConfig& new_part,
+      BlobFileWriter* file_writer,
+      std::vector<AnnotatedOperation>* aops,
+      std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy)
+      : config_(config),
+        old_part_(old_part),
+        new_part_(new_part),
+        file_writer_(file_writer),
+        aops_(aops),
+        strategy_(std::move(strategy)) {}
+  PartitionProcessor(PartitionProcessor&&) noexcept = default;
+  void Run() override {
+    // Log the target name; |old_part_| may be an unnamed placeholder.
+    LOG(INFO) << "Started an async task to process partition "
+              << new_part_.name;
+    if (!strategy_->GenerateOperations(
+            config_, old_part_, new_part_, file_writer_, aops_)) {
+      // Abort the whole process so the developer can inspect the recent logs.
+      LOG(FATAL) << "GenerateOperations(" << old_part_.name << ", "
+                 << new_part_.name << ") failed";
+    }
+  }
+
+ private:
+  const PayloadGenerationConfig& config_;
+  const PartitionConfig& old_part_;
+  const PartitionConfig& new_part_;
+  BlobFileWriter* file_writer_;
+  std::vector<AnnotatedOperation>* aops_;
+  std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy_;
+  DISALLOW_COPY_AND_ASSIGN(PartitionProcessor);
+};
+
 bool GenerateUpdatePayloadFile(const PayloadGenerationConfig& config,
                                const string& output_path,
                                const string& private_key_path,
@@ -80,6 +121,13 @@
                             config.target.partitions.size());
     }
     PartitionConfig empty_part("");
+    std::vector<std::vector<AnnotatedOperation>> all_aops;
+    all_aops.resize(config.target.partitions.size());
+    std::vector<PartitionProcessor> partition_tasks{};
+    auto thread_count = std::min<int>(diff_utils::GetMaxThreads(),
+                                      config.target.partitions.size());
+    base::DelegateSimpleThreadPool thread_pool{"partition-thread-pool",
+                                               thread_count};
     for (size_t i = 0; i < config.target.partitions.size(); i++) {
       const PartitionConfig& old_part =
           config.is_delta ? config.source.partitions[i] : empty_part;
@@ -99,12 +147,26 @@
         strategy.reset(new FullUpdateGenerator());
       }
 
-      vector<AnnotatedOperation> aops;
       // Generate the operations using the strategy we selected above.
-      TEST_AND_RETURN_FALSE(strategy->GenerateOperations(
-          config, old_part, new_part, &blob_file, &aops));
+      partition_tasks.push_back(PartitionProcessor(config,
+                                                   old_part,
+                                                   new_part,
+                                                   &blob_file,
+                                                   &all_aops[i],
+                                                   std::move(strategy)));
+    }
+    thread_pool.Start();
+    for (auto& processor : partition_tasks) {
+      thread_pool.AddWork(&processor);
+    }
+    thread_pool.JoinAll();
 
-      TEST_AND_RETURN_FALSE(payload.AddPartition(old_part, new_part, aops));
+    for (size_t i = 0; i < config.target.partitions.size(); i++) {
+      const PartitionConfig& old_part =
+          config.is_delta ? config.source.partitions[i] : empty_part;
+      const PartitionConfig& new_part = config.target.partitions[i];
+      TEST_AND_RETURN_FALSE(
+          payload.AddPartition(old_part, new_part, std::move(all_aops[i])));
     }
   }
 
diff --git a/payload_generator/payload_file.cc b/payload_generator/payload_file.cc
index 940e9bd..c1594c7 100644
--- a/payload_generator/payload_file.cc
+++ b/payload_generator/payload_file.cc
@@ -86,10 +86,10 @@
 
 bool PayloadFile::AddPartition(const PartitionConfig& old_conf,
                                const PartitionConfig& new_conf,
-                               const vector<AnnotatedOperation>& aops) {
+                               vector<AnnotatedOperation> aops) {
   Partition part;
   part.name = new_conf.name;
-  part.aops = aops;
+  part.aops = std::move(aops);
   part.postinstall = new_conf.postinstall;
   part.verity = new_conf.verity;
   // Initialize the PartitionInfo objects if present.
@@ -172,9 +172,7 @@
     TEST_AND_RETURN_FALSE(PayloadSigner::SignatureBlobLength(
         {private_key_path}, &signature_blob_length));
     PayloadSigner::AddSignatureToManifest(
-        next_blob_offset,
-        signature_blob_length,
-        &manifest_);
+        next_blob_offset, signature_blob_length, &manifest_);
   }
 
   // Serialize protobuf
diff --git a/payload_generator/payload_file.h b/payload_generator/payload_file.h
index 9dc80a7..0c3e9d5 100644
--- a/payload_generator/payload_file.h
+++ b/payload_generator/payload_file.h
@@ -43,7 +43,7 @@
   // reference a blob stored in the file provided to WritePayload().
   bool AddPartition(const PartitionConfig& old_conf,
                     const PartitionConfig& new_conf,
-                    const std::vector<AnnotatedOperation>& aops);
+                    std::vector<AnnotatedOperation> aops);
 
   // Write the payload to the |payload_file| file. The operations reference
   // blobs in the |data_blobs_path| file and the blobs will be reordered in the