update_engine: Replace gthread with libchrome equivalents.
This patch replaces the gthread usage in the multi-threaded full
payload generator with equivalent functions from libchrome. In the
new code, a thread-pool is used instead of creating one thread per
work and waiting for the threads to finish.
BUG=chromium:499886
TEST=unittests still pass.
Change-Id: I00fd56de2a789b5f007f1a4ab88680809bbeb5f0
Reviewed-on: https://chromium-review.googlesource.com/287635
Trybot-Ready: Alex Deymo <deymo@chromium.org>
Tested-by: Alex Deymo <deymo@chromium.org>
Reviewed-by: Gilad Arnold <garnold@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
diff --git a/payload_generator/full_update_generator.cc b/payload_generator/full_update_generator.cc
index b1e479d..a546081 100644
--- a/payload_generator/full_update_generator.cc
+++ b/payload_generator/full_update_generator.cc
@@ -12,14 +12,15 @@
#include <memory>
#include <base/format_macros.h>
-#include <base/strings/stringprintf.h>
#include <base/strings/string_util.h>
+#include <base/strings/stringprintf.h>
+#include <base/synchronization/lock.h>
+#include <base/threading/simple_thread.h>
+#include <chromeos/secure_blob.h>
#include "update_engine/bzip.h"
#include "update_engine/utils.h"
-using std::deque;
-using std::shared_ptr;
using std::string;
using std::vector;
@@ -29,82 +30,128 @@
const size_t kDefaultFullChunkSize = 1024 * 1024; // 1 MiB
-// This class encapsulates a full update chunk processing thread. The processor
-// reads a chunk of data from the input file descriptor and compresses it. The
-// processor needs to be started through Start() then waited on through Wait().
-class ChunkProcessor {
+class BlobFileWriter {
public:
- // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
- ChunkProcessor(int fd, off_t offset, size_t size)
- : thread_(nullptr),
- fd_(fd),
- offset_(offset),
- buffer_in_(size) {}
- ~ChunkProcessor() { Wait(); }
+ // Create the BlobFileWriter object that will manage the blobs stored to
+ // |blob_fd| in a thread safe way. The number of |total_blobs| is the number
+ // of blobs that will be stored but is only used for logging purposes.
+ BlobFileWriter(int blob_fd, off_t* blob_file_size, size_t total_blobs)
+ : total_blobs_(total_blobs),
+ blob_fd_(blob_fd),
+ blob_file_size_(blob_file_size) {}
- off_t offset() const { return offset_; }
- const chromeos::Blob& buffer_in() const { return buffer_in_; }
- const chromeos::Blob& buffer_compressed() const { return buffer_compressed_; }
-
- // Starts the processor. Returns true on success, false on failure.
- bool Start();
-
- // Waits for the processor to complete. Returns true on success, false on
- // failure.
- bool Wait();
-
- bool ShouldCompress() const {
- return buffer_compressed_.size() < buffer_in_.size();
- }
+ // Store the passed |blob| in the blob file. Returns the offset at which it
+ // was stored, or -1 in case of failure.
+ off_t StoreBlob(const chromeos::Blob& blob);
private:
- // Reads the input data into |buffer_in_| and compresses it into
- // |buffer_compressed_|. Returns true on success, false otherwise.
- bool ReadAndCompress();
- static gpointer ReadAndCompressThread(gpointer data);
+ size_t total_blobs_;
+ size_t stored_blobs_{0};
- GThread* thread_;
+ // The file and its size are protected with the |blob_mutex_|.
+ int blob_fd_;
+ off_t* blob_file_size_;
+
+ base::Lock blob_mutex_;
+
+ DISALLOW_COPY_AND_ASSIGN(BlobFileWriter);
+};
+
+off_t BlobFileWriter::StoreBlob(const chromeos::Blob& blob) {
+ base::AutoLock auto_lock(blob_mutex_);
+ if (!utils::WriteAll(blob_fd_, blob.data(), blob.size()))
+ return -1;
+
+ off_t result = *blob_file_size_;
+ *blob_file_size_ += blob.size();
+
+ stored_blobs_++;
+ if (total_blobs_ > 0 &&
+ (10 * (stored_blobs_ - 1) / total_blobs_) !=
+ (10 * stored_blobs_ / total_blobs_)) {
+ LOG(INFO) << (100 * stored_blobs_ / total_blobs_)
+ << "% complete " << stored_blobs_ << "/" << total_blobs_
+ << " ops (output size: " << *blob_file_size_ << ")";
+ }
+ return result;
+}
+
+// This class encapsulates a full update chunk processing thread work. The
+// processor reads a chunk of data from the input file descriptor and compresses
+// it. The processor will destroy itself when the work is done.
+class ChunkProcessor : public base::DelegateSimpleThread::Delegate {
+ public:
+ // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
+ ChunkProcessor(int fd, off_t offset, size_t size,
+ BlobFileWriter* blob_file, AnnotatedOperation* aop)
+ : fd_(fd),
+ offset_(offset),
+ size_(size),
+ blob_file_(blob_file),
+ aop_(aop) {}
+ // We use a default move constructor since all the data members are POD types.
+ ChunkProcessor(ChunkProcessor&&) = default;
+ ~ChunkProcessor() override = default;
+
+ // Overrides DelegateSimpleThread::Delegate.
+ // Run() handles the read from |fd| in a thread-safe way, and stores the
+ // new operation to generate the region starting at |offset| of size |size|
+ // in the output operation |aop|. The associated blob data is stored in
+ // |blob_fd| and |blob_file_size| is updated.
+ void Run() override;
+
+ private:
+ bool ProcessChunk();
+
+ // Stores the operation blob in the |blob_fd_| and updates the
+ // |blob_file_size| accordingly.
+ // This method is thread-safe since it uses a mutex to access the file.
+ // Returns the data offset where the data was written to.
+ off_t StoreBlob(const chromeos::Blob& blob);
+
+ // Work parameters.
int fd_;
off_t offset_;
- chromeos::Blob buffer_in_;
- chromeos::Blob buffer_compressed_;
+ size_t size_;
+ BlobFileWriter* blob_file_;
+ AnnotatedOperation* aop_;
DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
};
-bool ChunkProcessor::Start() {
- // g_thread_create is deprecated since glib 2.32. Use
- // g_thread_new instead.
- thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this,
- nullptr);
- TEST_AND_RETURN_FALSE(thread_ != nullptr);
- return true;
-}
-
-bool ChunkProcessor::Wait() {
- if (!thread_) {
- return false;
+void ChunkProcessor::Run() {
+ if (!ProcessChunk()) {
+ LOG(ERROR) << "Error processing region at " << offset_ << " of size "
+ << size_;
}
- gpointer result = g_thread_join(thread_);
- thread_ = nullptr;
- TEST_AND_RETURN_FALSE(result == this);
- return true;
}
-gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) {
- return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ?
- data : nullptr;
-}
-
-bool ChunkProcessor::ReadAndCompress() {
+bool ChunkProcessor::ProcessChunk() {
+ chromeos::Blob buffer_in_(size_);
+ chromeos::Blob op_blob;
ssize_t bytes_read = -1;
TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
buffer_in_.data(),
buffer_in_.size(),
offset_,
&bytes_read));
- TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size()));
- TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_));
+ TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(size_));
+ TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &op_blob));
+
+ DeltaArchiveManifest_InstallOperation_Type op_type =
+ DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ;
+
+ if (op_blob.size() >= buffer_in_.size()) {
+ // A REPLACE is cheaper than a REPLACE_BZ in this case.
+ op_blob = std::move(buffer_in_);
+ op_type = DeltaArchiveManifest_InstallOperation_Type_REPLACE;
+ }
+
+ off_t op_offset = blob_file_->StoreBlob(op_blob);
+ TEST_AND_RETURN_FALSE(op_offset >= 0);
+ aop_->op.set_data_offset(op_offset);
+ aop_->op.set_data_length(op_blob.size());
+ aop_->op.set_type(op_type);
return true;
}
@@ -112,13 +159,11 @@
bool FullUpdateGenerator::GenerateOperations(
const PayloadGenerationConfig& config,
- int fd,
+ int data_file_fd,
off_t* data_file_size,
vector<AnnotatedOperation>* rootfs_ops,
vector<AnnotatedOperation>* kernel_ops) {
TEST_AND_RETURN_FALSE(config.Validate());
- rootfs_ops->clear();
- kernel_ops->clear();
// FullUpdateGenerator requires a positive chunk_size, otherwise there will
// be only one operation with the whole partition which should not be allowed.
@@ -137,81 +182,87 @@
TEST_AND_RETURN_FALSE(full_chunk_size > 0);
TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
+ TEST_AND_RETURN_FALSE(GenerateOperationsForPartition(
+ config.target.rootfs,
+ config.block_size,
+ full_chunk_size / config.block_size,
+ data_file_fd,
+ data_file_size,
+ rootfs_ops));
+ TEST_AND_RETURN_FALSE(GenerateOperationsForPartition(
+ config.target.kernel,
+ config.block_size,
+ full_chunk_size / config.block_size,
+ data_file_fd,
+ data_file_size,
+ kernel_ops));
+ return true;
+}
+
+bool FullUpdateGenerator::GenerateOperationsForPartition(
+ const PartitionConfig& new_part,
+ size_t block_size,
+ size_t chunk_blocks,
+ int data_file_fd,
+ off_t* data_file_size,
+ vector<AnnotatedOperation>* aops) {
size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
- LOG(INFO) << "Max threads: " << max_threads;
+ LOG(INFO) << "Compressing partition " << PartitionNameString(new_part.name)
+ << " from " << new_part.path << " splitting in chunks of "
+ << chunk_blocks << " blocks (" << block_size
+ << " bytes each) using " << max_threads << " threads";
- const PartitionConfig* partitions[] = {
- &config.target.rootfs,
- &config.target.kernel};
+ int in_fd = open(new_part.path.c_str(), O_RDONLY, 0);
+ TEST_AND_RETURN_FALSE(in_fd >= 0);
+ ScopedFdCloser in_fd_closer(&in_fd);
- for (int part_id = 0; part_id < 2; ++part_id) {
- const PartitionConfig* partition = partitions[part_id];
- LOG(INFO) << "compressing " << partition->path;
- int in_fd = open(partition->path.c_str(), O_RDONLY, 0);
- TEST_AND_RETURN_FALSE(in_fd >= 0);
- ScopedFdCloser in_fd_closer(&in_fd);
- deque<shared_ptr<ChunkProcessor>> threads;
- int last_progress_update = INT_MIN;
- size_t bytes_left = partition->size, counter = 0, offset = 0;
- while (bytes_left > 0 || !threads.empty()) {
- // Check and start new chunk processors if possible.
- while (threads.size() < max_threads && bytes_left > 0) {
- size_t this_chunk_bytes = std::min(bytes_left, full_chunk_size);
- shared_ptr<ChunkProcessor> processor(
- new ChunkProcessor(in_fd, offset, this_chunk_bytes));
- threads.push_back(processor);
- TEST_AND_RETURN_FALSE(processor->Start());
- bytes_left -= this_chunk_bytes;
- offset += this_chunk_bytes;
- }
+ // We potentially have all the ChunkProcessors in memory but only
+ // |max_threads| will actually hold a block in memory while we process.
+ size_t partition_blocks = new_part.size / block_size;
+ size_t num_chunks = (partition_blocks + chunk_blocks - 1) / chunk_blocks;
+ aops->resize(num_chunks);
+ vector<ChunkProcessor> chunk_processors;
+ chunk_processors.reserve(num_chunks);
- // Need to wait for a chunk processor to complete and process its output
- // before spawning new processors.
- shared_ptr<ChunkProcessor> processor = threads.front();
- threads.pop_front();
- TEST_AND_RETURN_FALSE(processor->Wait());
+ BlobFileWriter blob_file(data_file_fd, data_file_size, num_chunks);
+ const string part_name_str = PartitionNameString(new_part.name);
- DeltaArchiveManifest_InstallOperation* op = nullptr;
- if (part_id == 0) {
- rootfs_ops->emplace_back();
- rootfs_ops->back().name =
- base::StringPrintf("<rootfs-operation-%" PRIuS ">", counter++);
- op = &rootfs_ops->back().op;
- } else {
- kernel_ops->emplace_back();
- kernel_ops->back().name =
- base::StringPrintf("<kernel-operation-%" PRIuS ">", counter++);
- op = &kernel_ops->back().op;
- }
+ for (size_t i = 0; i < num_chunks; ++i) {
+ size_t start_block = i * chunk_blocks;
+ // The last chunk could be smaller.
+ size_t num_blocks = std::min(chunk_blocks,
+ partition_blocks - i * chunk_blocks);
- const bool compress = processor->ShouldCompress();
- const chromeos::Blob& use_buf =
- compress ? processor->buffer_compressed() : processor->buffer_in();
- op->set_type(compress ?
- DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ :
- DeltaArchiveManifest_InstallOperation_Type_REPLACE);
- op->set_data_offset(*data_file_size);
- TEST_AND_RETURN_FALSE(utils::WriteAll(fd, use_buf.data(),
- use_buf.size()));
- *data_file_size += use_buf.size();
- op->set_data_length(use_buf.size());
- Extent* dst_extent = op->add_dst_extents();
- dst_extent->set_start_block(processor->offset() / config.block_size);
- dst_extent->set_num_blocks(
- processor->buffer_in().size() / config.block_size);
+ // Preset all the static information about the operations. The
+ // ChunkProcessor will set the rest.
+ AnnotatedOperation* aop = aops->data() + i;
+ aop->name = base::StringPrintf("<%s-operation-%" PRIuS ">",
+ part_name_str.c_str(), i);
+ Extent* dst_extent = aop->op.add_dst_extents();
+ dst_extent->set_start_block(start_block);
+ dst_extent->set_num_blocks(num_blocks);
- int progress = static_cast<int>(
- (processor->offset() + processor->buffer_in().size()) * 100.0 /
- partition->size);
- if (last_progress_update < progress &&
- (last_progress_update + 10 <= progress || progress == 100)) {
- LOG(INFO) << progress << "% complete (output size: "
- << *data_file_size << ")";
- last_progress_update = progress;
- }
- }
+ chunk_processors.emplace_back(
+ in_fd,
+ static_cast<off_t>(start_block) * block_size,
+ num_blocks * block_size,
+ &blob_file,
+ aop);
}
+ // Thread pool used for worker threads.
+ base::DelegateSimpleThreadPool thread_pool("full-update-generator",
+ max_threads);
+ thread_pool.Start();
+ for (ChunkProcessor& processor : chunk_processors)
+ thread_pool.AddWork(&processor);
+ thread_pool.JoinAll();
+ // All the operations must have a type set at this point. Otherwise, a
+ // ChunkProcessor failed to complete.
+ for (const AnnotatedOperation& aop : *aops) {
+ if (!aop.op.has_type())
+ return false;
+ }
return true;
}
diff --git a/payload_generator/full_update_generator.h b/payload_generator/full_update_generator.h
index b4d1135..8d7c62e 100644
--- a/payload_generator/full_update_generator.h
+++ b/payload_generator/full_update_generator.h
@@ -19,6 +19,7 @@
public:
FullUpdateGenerator() = default;
+ // OperationsGenerator override.
// Creates a full update for the target image defined in |config|. |config|
// must be a valid payload generation configuration for a full payload.
// Populates |rootfs_ops| and |kernel_ops|, with data about the update
@@ -31,6 +32,21 @@
std::vector<AnnotatedOperation>* rootfs_ops,
std::vector<AnnotatedOperation>* kernel_ops) override;
+ // Generates the list of operations to update inplace from the partition
+ // |old_part| to |new_part|. The |partition_size| should be at least
+ // |new_part.size| and any extra space there could be used as scratch space.
+ // The operations generated will not write more than |chunk_blocks| blocks.
+ // The new operations will create blobs in |data_file_fd| and update
+ // the file size pointed by |data_file_size| if needed.
+ // On success, stores the new operations in |aops| and returns true.
+ static bool GenerateOperationsForPartition(
+ const PartitionConfig& new_part,
+ size_t block_size,
+ size_t chunk_blocks,
+ int data_file_fd,
+ off_t* data_file_size,
+ std::vector<AnnotatedOperation>* aops);
+
private:
DISALLOW_COPY_AND_ASSIGN(FullUpdateGenerator);
};
diff --git a/payload_generator/generate_delta_main.cc b/payload_generator/generate_delta_main.cc
index 469952b..fee3dbb 100644
--- a/payload_generator/generate_delta_main.cc
+++ b/payload_generator/generate_delta_main.cc
@@ -302,7 +302,7 @@
logging::LoggingSettings log_settings;
log_settings.log_file = "delta_generator.log";
log_settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG;
- log_settings.lock_log = logging::DONT_LOCK_LOG_FILE;
+ log_settings.lock_log = logging::LOCK_LOG_FILE;
log_settings.delete_old = logging::APPEND_TO_OLD_LOG_FILE;
logging::InitLogging(log_settings);
diff --git a/update_engine.gyp b/update_engine.gyp
index 89e0b7b..78485d5 100644
--- a/update_engine.gyp
+++ b/update_engine.gyp
@@ -85,7 +85,6 @@
'dbus-1',
'dbus-glib-1',
'glib-2.0',
- 'gthread-2.0',
'libchrome-<(libbase_ver)',
'libchromeos-<(libbase_ver)',
'libcrypto',