Add multi thread support for A/B generator am: c4ad1ebc33
am: 65e115f0d2
Change-Id: I312819e33f2e85d8a5bd887680fd5f8d3b254747
diff --git a/payload_generator/delta_diff_utils.cc b/payload_generator/delta_diff_utils.cc
index 44aff7a..045d52f 100644
--- a/payload_generator/delta_diff_utils.cc
+++ b/payload_generator/delta_diff_utils.cc
@@ -22,7 +22,7 @@
#pragma clang diagnostic ignored "-Wmacro-redefined"
#include <ext2fs/ext2fs.h>
#pragma clang diagnostic pop
-
+#include <unistd.h>
#include <algorithm>
#include <map>
@@ -30,6 +30,7 @@
#include <base/files/file_util.h>
#include <base/format_macros.h>
#include <base/strings/stringprintf.h>
+#include <base/threading/simple_thread.h>
#include "update_engine/common/hash_calculator.h"
#include "update_engine/common/subprocess.h"
@@ -171,6 +172,81 @@
namespace diff_utils {
+// This class encapsulates the work of one file-delta processing thread. The
+// processor computes the delta between the source and target files and
+// writes the compressed delta to the blob.
+class FileDeltaProcessor : public base::DelegateSimpleThread::Delegate {
+ public:
+ FileDeltaProcessor(const string& old_part,
+ const string& new_part,
+ const PayloadVersion& version,
+ const vector<Extent>& old_extents,
+ const vector<Extent>& new_extents,
+ const string& name,
+ ssize_t chunk_blocks,
+ BlobFileWriter* blob_file)
+ : old_part_(old_part),
+ new_part_(new_part),
+ version_(version),
+ old_extents_(old_extents),
+ new_extents_(new_extents),
+ name_(name),
+ chunk_blocks_(chunk_blocks),
+ blob_file_(blob_file) {}
+
+ FileDeltaProcessor(FileDeltaProcessor&& processor) = default;
+
+ ~FileDeltaProcessor() override = default;
+
+ // Overrides DelegateSimpleThread::Delegate.
+ // Calculate the list of operations and write their corresponding deltas to
+ // the blob_file.
+ void Run() override;
+
+ // Merge each file processor's ops list to aops.
+ void MergeOperation(vector<AnnotatedOperation>* aops);
+
+ private:
+ const string& old_part_;
+ const string& new_part_;
+ const PayloadVersion& version_;
+
+ // The block ranges of the old/new file within the src/tgt image
+ const vector<Extent> old_extents_;
+ const vector<Extent> new_extents_;
+ const string name_;
+ // Block limit of one aop.
+ ssize_t chunk_blocks_;
+ BlobFileWriter* blob_file_;
+
+ // The list of ops to reach the new file from the old file.
+ vector<AnnotatedOperation> file_aops_;
+
+ DISALLOW_COPY_AND_ASSIGN(FileDeltaProcessor);
+};
+
+void FileDeltaProcessor::Run() {
+ TEST_AND_RETURN(blob_file_ != nullptr);
+
+ if (!DeltaReadFile(&file_aops_,
+ old_part_,
+ new_part_,
+ old_extents_,
+ new_extents_,
+ name_,
+ chunk_blocks_,
+ version_,
+ blob_file_)) {
+ LOG(ERROR) << "Failed to generate delta for " << name_ << " ("
+ << BlocksInExtents(new_extents_) << " blocks)";
+ }
+}
+
+void FileDeltaProcessor::MergeOperation(vector<AnnotatedOperation>* aops) {
+ aops->reserve(aops->size() + file_aops_.size());
+ std::move(file_aops_.begin(), file_aops_.end(), std::back_inserter(*aops));
+}
+
bool DeltaReadPartition(vector<AnnotatedOperation>* aops,
const PartitionConfig& old_part,
const PartitionConfig& new_part,
@@ -205,6 +281,8 @@
vector<FilesystemInterface::File> new_files;
new_part.fs_interface->GetFiles(&new_files);
+ vector<FileDeltaProcessor> file_delta_processors;
+
// The processing is very straightforward here, we generate operations for
// every file (and pseudo-file such as the metadata) in the new filesystem
// based on the file with the same name in the old filesystem, if any.
@@ -239,16 +317,29 @@
old_files_map[new_file.name], old_visited_blocks);
old_visited_blocks.AddExtents(old_file_extents);
- TEST_AND_RETURN_FALSE(DeltaReadFile(aops,
- old_part.path,
- new_part.path,
- old_file_extents,
- new_file_extents,
- new_file.name, // operation name
- hard_chunk_blocks,
- version,
- blob_file));
+ file_delta_processors.emplace_back(old_part.path,
+ new_part.path,
+ version,
+ std::move(old_file_extents),
+ std::move(new_file_extents),
+ new_file.name, // operation name
+ hard_chunk_blocks,
+ blob_file);
}
+
+ size_t max_threads = GetMaxThreads();
+ base::DelegateSimpleThreadPool thread_pool("incremental-update-generator",
+ max_threads);
+ thread_pool.Start();
+ for (auto& processor : file_delta_processors) {
+ thread_pool.AddWork(&processor);
+ }
+ thread_pool.JoinAll();
+
+ for (auto& processor : file_delta_processors) {
+ processor.MergeOperation(aops);
+ }
+
// Process all the blocks not included in any file. We provided all the unused
// blocks in the old partition as available data.
vector<Extent> new_unvisited = {
@@ -807,6 +898,11 @@
return true;
}
+// Returns the number of CPUs on the machine, with a minimum of 4 threads.
+size_t GetMaxThreads() {
+ return std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
+}
+
} // namespace diff_utils
} // namespace chromeos_update_engine
diff --git a/payload_generator/delta_diff_utils.h b/payload_generator/delta_diff_utils.h
index 7254bca..c9fef17 100644
--- a/payload_generator/delta_diff_utils.h
+++ b/payload_generator/delta_diff_utils.h
@@ -143,6 +143,9 @@
// false.
bool IsExtFilesystem(const std::string& device);
+// Returns the maximum number of threads used to process the files (chunks) in parallel.
+size_t GetMaxThreads();
+
} // namespace diff_utils
} // namespace chromeos_update_engine
diff --git a/payload_generator/full_update_generator.cc b/payload_generator/full_update_generator.cc
index 8fdb6ec..482a789 100644
--- a/payload_generator/full_update_generator.cc
+++ b/payload_generator/full_update_generator.cc
@@ -139,7 +139,7 @@
TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
size_t chunk_blocks = full_chunk_size / config.block_size;
- size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
+ size_t max_threads = diff_utils::GetMaxThreads();
LOG(INFO) << "Compressing partition " << new_part.name
<< " from " << new_part.path << " splitting in chunks of "
<< chunk_blocks << " blocks (" << config.block_size