Limit partition parallelization to number of CPU cores

When running on machines with <4 CPUs,
delta_generator tend to use 4 threads for partition pool. This
can overload the CPU can cause unnecessary context switching.
Make partition pool respect the --max_threads parameter.

Test: th
Change-Id: I1f63606be43cbbcc91e146c586ecd4faf9fd75ca
diff --git a/lz4diff/lz4diff_compress_unittest.cc b/lz4diff/lz4diff_compress_unittest.cc
index 9caa9a3..2466c82 100644
--- a/lz4diff/lz4diff_compress_unittest.cc
+++ b/lz4diff/lz4diff_compress_unittest.cc
@@ -21,7 +21,6 @@
 #include <string>
 #include <vector>
 
-#include <base/format_macros.h>
 #include <base/logging.h>
 #include <base/strings/string_number_conversions.h>
 #include <base/strings/string_util.h>
@@ -35,7 +34,6 @@
 #include "update_engine/lz4diff/lz4diff_compress.h"
 #include "update_engine/payload_generator/delta_diff_generator.h"
 #include "update_engine/payload_generator/erofs_filesystem.h"
-#include "update_engine/payload_generator/extent_utils.h"
 
 using std::string;
 using std::vector;
diff --git a/payload_consumer/delta_performer_integration_test.cc b/payload_consumer/delta_performer_integration_test.cc
index ffc753a..e765f54 100644
--- a/payload_consumer/delta_performer_integration_test.cc
+++ b/payload_consumer/delta_performer_integration_test.cc
@@ -904,12 +904,11 @@
     // no need to verify new partition if VerifyPayload failed.
     return;
   }
-
-  CompareFilesByBlock(state->result_kernel->path(),
-                      state->new_kernel->path(),
-                      state->kernel_size);
-  CompareFilesByBlock(
-      state->result_img->path(), state->b_img->path(), state->image_size);
+  ASSERT_NO_FATAL_FAILURE(CompareFilesByBlock(state->result_kernel->path(),
+                                              state->new_kernel->path(),
+                                              state->kernel_size));
+  ASSERT_NO_FATAL_FAILURE(CompareFilesByBlock(
+      state->result_img->path(), state->b_img->path(), state->image_size));
 
   brillo::Blob updated_kernel_partition;
   ASSERT_TRUE(
@@ -955,7 +954,8 @@
       break;  // appease gcc
   }
 
-  VerifyPayloadResult(performer, state, expected_result, minor_version);
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyPayloadResult(performer, state, expected_result, minor_version));
 }
 
 void DoSmallImageTest(bool full_kernel,
@@ -966,22 +966,23 @@
                       uint32_t minor_version) {
   DeltaState state;
   DeltaPerformer* performer = nullptr;
-  GenerateDeltaFile(full_kernel,
-                    full_rootfs,
-                    chunk_size,
-                    signature_test,
-                    &state,
-                    minor_version);
+  ASSERT_NO_FATAL_FAILURE(GenerateDeltaFile(full_kernel,
+                                            full_rootfs,
+                                            chunk_size,
+                                            signature_test,
+                                            &state,
+                                            minor_version));
 
-  ApplyDeltaFile(full_kernel,
-                 full_rootfs,
-                 signature_test,
-                 &state,
-                 hash_checks_mandatory,
-                 kValidOperationData,
-                 &performer,
-                 minor_version);
-  VerifyPayload(performer, &state, signature_test, minor_version);
+  ASSERT_NO_FATAL_FAILURE(ApplyDeltaFile(full_kernel,
+                                         full_rootfs,
+                                         signature_test,
+                                         &state,
+                                         hash_checks_mandatory,
+                                         kValidOperationData,
+                                         &performer,
+                                         minor_version));
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyPayload(performer, &state, signature_test, minor_version));
   delete performer;
 }
 
@@ -991,14 +992,14 @@
   uint64_t minor_version = kFullPayloadMinorVersion;
   GenerateDeltaFile(true, true, -1, kSignatureGenerated, &state, minor_version);
   DeltaPerformer* performer = nullptr;
-  ApplyDeltaFile(true,
-                 true,
-                 kSignatureGenerated,
-                 &state,
-                 hash_checks_mandatory,
-                 op_hash_test,
-                 &performer,
-                 minor_version);
+  ASSERT_NO_FATAL_FAILURE(ApplyDeltaFile(true,
+                                         true,
+                                         kSignatureGenerated,
+                                         &state,
+                                         hash_checks_mandatory,
+                                         op_hash_test,
+                                         &performer,
+                                         minor_version));
   delete performer;
 }
 
diff --git a/payload_generator/delta_diff_generator.cc b/payload_generator/delta_diff_generator.cc
index 4abff92..86238f8 100644
--- a/payload_generator/delta_diff_generator.cc
+++ b/payload_generator/delta_diff_generator.cc
@@ -16,9 +16,7 @@
 
 #include "update_engine/payload_generator/delta_diff_generator.h"
 
-#include <errno.h>
 #include <fcntl.h>
-#include <inttypes.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 
@@ -32,9 +30,7 @@
 #include <base/threading/simple_thread.h>
 
 #include "update_engine/common/utils.h"
-#include "update_engine/payload_consumer/delta_performer.h"
 #include "update_engine/payload_consumer/file_descriptor.h"
-#include "update_engine/payload_consumer/payload_constants.h"
 #include "update_engine/payload_generator/ab_generator.h"
 #include "update_engine/payload_generator/annotated_operation.h"
 #include "update_engine/payload_generator/blob_file_writer.h"
@@ -186,8 +182,7 @@
     off_t data_file_size = 0;
     BlobFileWriter blob_file(data_file.fd(), &data_file_size);
     if (config.is_delta) {
-      TEST_AND_RETURN_FALSE(config.source.partitions.size() ==
-                            config.target.partitions.size());
+      TEST_EQ(config.source.partitions.size(), config.target.partitions.size());
     }
     PartitionConfig empty_part("");
     std::vector<std::vector<AnnotatedOperation>> all_aops;
@@ -200,10 +195,16 @@
         config.target.partitions.size());
 
     std::vector<PartitionProcessor> partition_tasks{};
-    auto thread_count = std::min<int>(diff_utils::GetMaxThreads(),
-                                      config.target.partitions.size());
+    auto thread_count = std::min<size_t>(diff_utils::GetMaxThreads(),
+                                         config.target.partitions.size());
+    if (thread_count > config.max_threads) {
+      thread_count = config.max_threads;
+    }
+    if (thread_count < 1) {
+      thread_count = 1;
+    }
     base::DelegateSimpleThreadPool thread_pool{"partition-thread-pool",
-                                               thread_count};
+                                               static_cast<int>(thread_count)};
     for (size_t i = 0; i < config.target.partitions.size(); i++) {
       const PartitionConfig& old_part =
           config.is_delta ? config.source.partitions[i] : empty_part;
diff --git a/payload_generator/delta_diff_utils.cc b/payload_generator/delta_diff_utils.cc
index 152da4d..26b3d76 100644
--- a/payload_generator/delta_diff_utils.cc
+++ b/payload_generator/delta_diff_utils.cc
@@ -1208,9 +1208,9 @@
   return true;
 }
 
-// Return the number of CPUs on the machine, and 4 threads in minimum.
+// Return the number of CPUs on the machine, and 1 threads in minimum.
 size_t GetMaxThreads() {
-  return std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
+  return std::max(sysconf(_SC_NPROCESSORS_ONLN), 1L);
 }
 
 }  // namespace diff_utils
diff --git a/payload_generator/generate_delta_main.cc b/payload_generator/generate_delta_main.cc
index 5ffd05e..40dae87 100644
--- a/payload_generator/generate_delta_main.cc
+++ b/payload_generator/generate_delta_main.cc
@@ -764,6 +764,9 @@
 
   payload_config.security_patch_level = FLAGS_security_patch_level;
 
+  if (FLAGS_max_threads < 1) {
+    FLAGS_max_threads = 1;
+  }
   payload_config.max_threads = FLAGS_max_threads;
 
   if (!FLAGS_partition_timestamps.empty()) {
diff --git a/payload_generator/payload_generation_config.h b/payload_generator/payload_generation_config.h
index 0256a9d..dc18641 100644
--- a/payload_generator/payload_generation_config.h
+++ b/payload_generator/payload_generation_config.h
@@ -266,7 +266,7 @@
 
   std::string security_patch_level;
 
-  uint32_t max_threads = 0;
+  uint32_t max_threads = 1;
 
   std::vector<bsdiff::CompressorType> compressors{
       bsdiff::CompressorType::kBZ2, bsdiff::CompressorType::kBrotli};