update_engine: Replace gthread with libchrome equivalents.

This patch replaces the gthread usage in the multi-threaded full
payload generator with equivalent functions from libchrome. The new
code uses a thread pool instead of creating one thread per chunk of
work and waiting for those threads to finish. Since the worker threads
now log concurrently, delta_generator also locks the log file when
writing to it.
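
In outline, each chunk of work becomes a
base::DelegateSimpleThread::Delegate whose Run() reads and compresses
one chunk, and the chunks are queued on a pool (this mirrors the code
added below):

  base::DelegateSimpleThreadPool thread_pool("full-update-generator",
                                             max_threads);
  thread_pool.Start();
  for (ChunkProcessor& processor : chunk_processors)
    thread_pool.AddWork(&processor);
  thread_pool.JoinAll();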

BUG=chromium:499886
TEST=unittests still pass.

Change-Id: I00fd56de2a789b5f007f1a4ab88680809bbeb5f0
Reviewed-on: https://chromium-review.googlesource.com/287635
Trybot-Ready: Alex Deymo <deymo@chromium.org>
Tested-by: Alex Deymo <deymo@chromium.org>
Reviewed-by: Gilad Arnold <garnold@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
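---
Reviewer note (not part of the commit): below is a minimal,
self-contained sketch of the base/threading/simple_thread.h API this
patch adopts. Worker and Example() are illustrative names, not code
from this patch:

  #include <vector>
  #include <base/threading/simple_thread.h>

  // Each unit of work is a Delegate; Run() executes on a pool thread.
  class Worker : public base::DelegateSimpleThread::Delegate {
   public:
    explicit Worker(int id) : id_(id) {}
    void Run() override {
      // The work for item |id_| runs here, on one of the pool threads.
    }
   private:
    int id_;
  };

  void Example() {
    std::vector<Worker> workers;
    for (int i = 0; i < 10; ++i)
      workers.emplace_back(i);
    base::DelegateSimpleThreadPool pool("example-pool", 4);
    pool.Start();
    for (Worker& w : workers)
      pool.AddWork(&w);  // Work may start running as soon as it is queued.
    pool.JoinAll();      // Blocks until all queued work has finished.
  }
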
diff --git a/payload_generator/full_update_generator.cc b/payload_generator/full_update_generator.cc
index b1e479d..a546081 100644
--- a/payload_generator/full_update_generator.cc
+++ b/payload_generator/full_update_generator.cc
@@ -12,14 +12,15 @@
 #include <memory>
 
 #include <base/format_macros.h>
-#include <base/strings/stringprintf.h>
 #include <base/strings/string_util.h>
+#include <base/strings/stringprintf.h>
+#include <base/synchronization/lock.h>
+#include <base/threading/simple_thread.h>
+#include <chromeos/secure_blob.h>
 
 #include "update_engine/bzip.h"
 #include "update_engine/utils.h"
 
-using std::deque;
-using std::shared_ptr;
 using std::string;
 using std::vector;
 
@@ -29,82 +30,128 @@
 
 const size_t kDefaultFullChunkSize = 1024 * 1024;  // 1 MiB
 
-// This class encapsulates a full update chunk processing thread. The processor
-// reads a chunk of data from the input file descriptor and compresses it. The
-// processor needs to be started through Start() then waited on through Wait().
-class ChunkProcessor {
+class BlobFileWriter {
  public:
-  // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
-  ChunkProcessor(int fd, off_t offset, size_t size)
-      : thread_(nullptr),
-        fd_(fd),
-        offset_(offset),
-        buffer_in_(size) {}
-  ~ChunkProcessor() { Wait(); }
+  // Creates a BlobFileWriter object that will manage the blobs stored to
+  // |blob_fd| in a thread-safe way. |total_blobs| is the total number of
+  // blobs that will be stored; it is used only for logging progress.
+  BlobFileWriter(int blob_fd, off_t* blob_file_size, size_t total_blobs)
+    : total_blobs_(total_blobs),
+      blob_fd_(blob_fd),
+      blob_file_size_(blob_file_size) {}
 
-  off_t offset() const { return offset_; }
-  const chromeos::Blob& buffer_in() const { return buffer_in_; }
-  const chromeos::Blob& buffer_compressed() const { return buffer_compressed_; }
-
-  // Starts the processor. Returns true on success, false on failure.
-  bool Start();
-
-  // Waits for the processor to complete. Returns true on success, false on
-  // failure.
-  bool Wait();
-
-  bool ShouldCompress() const {
-    return buffer_compressed_.size() < buffer_in_.size();
-  }
+  // Stores the passed |blob| in the blob file. Returns the offset at which it
+  // was stored, or -1 in case of failure.
+  off_t StoreBlob(const chromeos::Blob& blob);
 
  private:
-  // Reads the input data into |buffer_in_| and compresses it into
-  // |buffer_compressed_|. Returns true on success, false otherwise.
-  bool ReadAndCompress();
-  static gpointer ReadAndCompressThread(gpointer data);
+  size_t total_blobs_;
+  size_t stored_blobs_{0};
 
-  GThread* thread_;
+  // The file and its size are protected with the |blob_mutex_|.
+  int blob_fd_;
+  off_t* blob_file_size_;
+
+  base::Lock blob_mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(BlobFileWriter);
+};
+
+off_t BlobFileWriter::StoreBlob(const chromeos::Blob& blob) {
+  base::AutoLock auto_lock(blob_mutex_);
+  if (!utils::WriteAll(blob_fd_, blob.data(), blob.size()))
+    return -1;
+
+  off_t result = *blob_file_size_;
+  *blob_file_size_ += blob.size();
+
+  stored_blobs_++;
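+  // Log progress only when crossing a 10% boundary, i.e. when the integer
+  // decile 10 * stored_blobs_ / total_blobs_ changes with this blob.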
+  if (total_blobs_ > 0 &&
+      (10 * (stored_blobs_ - 1) / total_blobs_) !=
+      (10 * stored_blobs_ / total_blobs_)) {
+    LOG(INFO) << (100 * stored_blobs_ / total_blobs_)
+              << "% complete " << stored_blobs_ << "/" << total_blobs_
+              << " ops (output size: " << *blob_file_size_ << ")";
+  }
+  return result;
+}
+
+// This class encapsulates the work performed by one full update chunk
+// processing thread. The processor reads a chunk of data from the input file
+// descriptor, compresses it and stores the result through a BlobFileWriter.
+class ChunkProcessor : public base::DelegateSimpleThread::Delegate {
+ public:
+  // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
+  ChunkProcessor(int fd, off_t offset, size_t size,
+                 BlobFileWriter* blob_file, AnnotatedOperation* aop)
+    : fd_(fd),
+      offset_(offset),
+      size_(size),
+      blob_file_(blob_file),
+      aop_(aop) {}
+  // We use a default move constructor since all the data members are POD types.
+  ChunkProcessor(ChunkProcessor&&) = default;
+  ~ChunkProcessor() override = default;
+
+  // Overrides DelegateSimpleThread::Delegate.
+  // Run() reads the chunk of |size| bytes at |offset| from |fd| and stores
+  // the new operation generating that region in the output |aop|. The
+  // associated blob data is written through |blob_file|, which serializes
+  // access to the blob file and keeps its size up to date.
+  void Run() override;
+
+ private:
+  bool ProcessChunk();
+
+  // Work parameters.
   int fd_;
   off_t offset_;
-  chromeos::Blob buffer_in_;
-  chromeos::Blob buffer_compressed_;
+  size_t size_;
+  BlobFileWriter* blob_file_;
+  AnnotatedOperation* aop_;
 
   DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
 };
 
-bool ChunkProcessor::Start() {
-  // g_thread_create is deprecated since glib 2.32. Use
-  // g_thread_new instead.
-  thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this,
-                             nullptr);
-  TEST_AND_RETURN_FALSE(thread_ != nullptr);
-  return true;
-}
-
-bool ChunkProcessor::Wait() {
-  if (!thread_) {
-    return false;
+void ChunkProcessor::Run() {
+  if (!ProcessChunk()) {
+    LOG(ERROR) << "Error processing region at " << offset_ << " of size "
+               << size_;
   }
-  gpointer result = g_thread_join(thread_);
-  thread_ = nullptr;
-  TEST_AND_RETURN_FALSE(result == this);
-  return true;
 }
 
-gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) {
-  return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ?
-      data : nullptr;
-}
-
-bool ChunkProcessor::ReadAndCompress() {
+bool ChunkProcessor::ProcessChunk() {
+  chromeos::Blob buffer_in_(size_);
+  chromeos::Blob op_blob;
   ssize_t bytes_read = -1;
   TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
                                         buffer_in_.data(),
                                         buffer_in_.size(),
                                         offset_,
                                         &bytes_read));
-  TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size()));
-  TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_));
+  TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(size_));
+  TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &op_blob));
+
+  DeltaArchiveManifest_InstallOperation_Type op_type =
+      DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ;
+
+  if (op_blob.size() >= buffer_in_.size()) {
+    // A REPLACE is cheaper than a REPLACE_BZ in this case.
+    op_blob = std::move(buffer_in_);
+    op_type = DeltaArchiveManifest_InstallOperation_Type_REPLACE;
+  }
+
+  off_t op_offset = blob_file_->StoreBlob(op_blob);
+  TEST_AND_RETURN_FALSE(op_offset >= 0);
+  aop_->op.set_data_offset(op_offset);
+  aop_->op.set_data_length(op_blob.size());
+  aop_->op.set_type(op_type);
   return true;
 }
 
@@ -112,13 +159,11 @@
 
 bool FullUpdateGenerator::GenerateOperations(
     const PayloadGenerationConfig& config,
-    int fd,
+    int data_file_fd,
     off_t* data_file_size,
     vector<AnnotatedOperation>* rootfs_ops,
     vector<AnnotatedOperation>* kernel_ops) {
   TEST_AND_RETURN_FALSE(config.Validate());
-  rootfs_ops->clear();
-  kernel_ops->clear();
 
   // FullUpdateGenerator requires a positive chunk_size, otherwise there will
   // be only one operation with the whole partition which should not be allowed.
@@ -137,81 +182,87 @@
   TEST_AND_RETURN_FALSE(full_chunk_size > 0);
   TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
 
+  TEST_AND_RETURN_FALSE(GenerateOperationsForPartition(
+      config.target.rootfs,
+      config.block_size,
+      full_chunk_size / config.block_size,
+      data_file_fd,
+      data_file_size,
+      rootfs_ops));
+  TEST_AND_RETURN_FALSE(GenerateOperationsForPartition(
+      config.target.kernel,
+      config.block_size,
+      full_chunk_size / config.block_size,
+      data_file_fd,
+      data_file_size,
+      kernel_ops));
+  return true;
+}
+
+bool FullUpdateGenerator::GenerateOperationsForPartition(
+    const PartitionConfig& new_part,
+    size_t block_size,
+    size_t chunk_blocks,
+    int data_file_fd,
+    off_t* data_file_size,
+    vector<AnnotatedOperation>* aops) {
   size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
-  LOG(INFO) << "Max threads: " << max_threads;
+  LOG(INFO) << "Compressing partition " << PartitionNameString(new_part.name)
+            << " from " << new_part.path << ", splitting it in chunks of "
+            << chunk_blocks << " blocks (" << block_size
+            << " bytes each) using " << max_threads << " threads";
 
-  const PartitionConfig* partitions[] = {
-      &config.target.rootfs,
-      &config.target.kernel};
+  int in_fd = open(new_part.path.c_str(), O_RDONLY, 0);
+  TEST_AND_RETURN_FALSE(in_fd >= 0);
+  ScopedFdCloser in_fd_closer(&in_fd);
 
-  for (int part_id = 0; part_id < 2; ++part_id) {
-    const PartitionConfig* partition = partitions[part_id];
-    LOG(INFO) << "compressing " << partition->path;
-    int in_fd = open(partition->path.c_str(), O_RDONLY, 0);
-    TEST_AND_RETURN_FALSE(in_fd >= 0);
-    ScopedFdCloser in_fd_closer(&in_fd);
-    deque<shared_ptr<ChunkProcessor>> threads;
-    int last_progress_update = INT_MIN;
-    size_t bytes_left = partition->size, counter = 0, offset = 0;
-    while (bytes_left > 0 || !threads.empty()) {
-      // Check and start new chunk processors if possible.
-      while (threads.size() < max_threads && bytes_left > 0) {
-        size_t this_chunk_bytes = std::min(bytes_left, full_chunk_size);
-        shared_ptr<ChunkProcessor> processor(
-            new ChunkProcessor(in_fd, offset, this_chunk_bytes));
-        threads.push_back(processor);
-        TEST_AND_RETURN_FALSE(processor->Start());
-        bytes_left -= this_chunk_bytes;
-        offset += this_chunk_bytes;
-      }
+  // All the ChunkProcessors are created up front, but at most |max_threads|
+  // of them hold a chunk of data in memory at any given time while we process.
+  size_t partition_blocks = new_part.size / block_size;
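+  // Round up so that a final, possibly smaller, chunk covers the remainder.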
+  size_t num_chunks = (partition_blocks + chunk_blocks - 1) / chunk_blocks;
+  aops->resize(num_chunks);
+  vector<ChunkProcessor> chunk_processors;
+  chunk_processors.reserve(num_chunks);
 
-      // Need to wait for a chunk processor to complete and process its output
-      // before spawning new processors.
-      shared_ptr<ChunkProcessor> processor = threads.front();
-      threads.pop_front();
-      TEST_AND_RETURN_FALSE(processor->Wait());
+  BlobFileWriter blob_file(data_file_fd, data_file_size, num_chunks);
+  const string part_name_str = PartitionNameString(new_part.name);
 
-      DeltaArchiveManifest_InstallOperation* op = nullptr;
-      if (part_id == 0) {
-        rootfs_ops->emplace_back();
-        rootfs_ops->back().name =
-            base::StringPrintf("<rootfs-operation-%" PRIuS ">", counter++);
-        op = &rootfs_ops->back().op;
-      } else {
-        kernel_ops->emplace_back();
-        kernel_ops->back().name =
-            base::StringPrintf("<kernel-operation-%" PRIuS ">", counter++);
-        op = &kernel_ops->back().op;
-      }
+  for (size_t i = 0; i < num_chunks; ++i) {
+    size_t start_block = i * chunk_blocks;
+    // The last chunk could be smaller.
+    size_t num_blocks = std::min(chunk_blocks,
+                                 partition_blocks - i * chunk_blocks);
 
-      const bool compress = processor->ShouldCompress();
-      const chromeos::Blob& use_buf =
-          compress ? processor->buffer_compressed() : processor->buffer_in();
-      op->set_type(compress ?
-                   DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ :
-                   DeltaArchiveManifest_InstallOperation_Type_REPLACE);
-      op->set_data_offset(*data_file_size);
-      TEST_AND_RETURN_FALSE(utils::WriteAll(fd, use_buf.data(),
-                                            use_buf.size()));
-      *data_file_size += use_buf.size();
-      op->set_data_length(use_buf.size());
-      Extent* dst_extent = op->add_dst_extents();
-      dst_extent->set_start_block(processor->offset() / config.block_size);
-      dst_extent->set_num_blocks(
-          processor->buffer_in().size() / config.block_size);
+    // Preset all the static information about the operations. The
+    // ChunkProcessor will set the rest.
+    AnnotatedOperation* aop = aops->data() + i;
+    aop->name = base::StringPrintf("<%s-operation-%" PRIuS ">",
+                                   part_name_str.c_str(), i);
+    Extent* dst_extent = aop->op.add_dst_extents();
+    dst_extent->set_start_block(start_block);
+    dst_extent->set_num_blocks(num_blocks);
 
-      int progress = static_cast<int>(
-          (processor->offset() + processor->buffer_in().size()) * 100.0 /
-          partition->size);
-      if (last_progress_update < progress &&
-          (last_progress_update + 10 <= progress || progress == 100)) {
-        LOG(INFO) << progress << "% complete (output size: "
-                  << *data_file_size << ")";
-        last_progress_update = progress;
-      }
-    }
+    chunk_processors.emplace_back(
+        in_fd,
+        static_cast<off_t>(start_block) * block_size,
+        num_blocks * block_size,
+        &blob_file,
+        aop);
   }
 
+  // Thread pool used for worker threads.
+  base::DelegateSimpleThreadPool thread_pool("full-update-generator",
+                                             max_threads);
+  thread_pool.Start();
+  for (ChunkProcessor& processor : chunk_processors)
+    thread_pool.AddWork(&processor);
+  thread_pool.JoinAll();
+  // All the operations must have a type set at this point. Otherwise, a
+  // ChunkProcessor failed to complete.
+  for (const AnnotatedOperation& aop : *aops) {
+    if (!aop.op.has_type())
+      return false;
+  }
   return true;
 }
 
diff --git a/payload_generator/full_update_generator.h b/payload_generator/full_update_generator.h
index b4d1135..8d7c62e 100644
--- a/payload_generator/full_update_generator.h
+++ b/payload_generator/full_update_generator.h
@@ -19,6 +19,7 @@
  public:
   FullUpdateGenerator() = default;
 
+  // OperationsGenerator override.
   // Creates a full update for the target image defined in |config|. |config|
   // must be a valid payload generation configuration for a full payload.
   // Populates |rootfs_ops| and |kernel_ops|, with data about the update
@@ -31,6 +32,21 @@
       std::vector<AnnotatedOperation>* rootfs_ops,
       std::vector<AnnotatedOperation>* kernel_ops) override;
 
+  // Generates the list of operations needed to produce the partition
+  // |new_part| from scratch, splitting the work in chunks of (at most)
+  // |chunk_blocks| blocks of |block_size| bytes each. The generated
+  // operations will create blobs in |data_file_fd| and update the file
+  // size pointed to by |data_file_size| as needed.
+  // On success, stores the new operations in |aops| and returns true.
+  static bool GenerateOperationsForPartition(
+      const PartitionConfig& new_part,
+      size_t block_size,
+      size_t chunk_blocks,
+      int data_file_fd,
+      off_t* data_file_size,
+      std::vector<AnnotatedOperation>* aops);
+
  private:
   DISALLOW_COPY_AND_ASSIGN(FullUpdateGenerator);
 };
diff --git a/payload_generator/generate_delta_main.cc b/payload_generator/generate_delta_main.cc
index 469952b..fee3dbb 100644
--- a/payload_generator/generate_delta_main.cc
+++ b/payload_generator/generate_delta_main.cc
@@ -302,7 +302,7 @@
   logging::LoggingSettings log_settings;
   log_settings.log_file     = "delta_generator.log";
   log_settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG;
-  log_settings.lock_log     = logging::DONT_LOCK_LOG_FILE;
+  log_settings.lock_log     = logging::LOCK_LOG_FILE;
   log_settings.delete_old   = logging::APPEND_TO_OLD_LOG_FILE;
 
   logging::InitLogging(log_settings);
diff --git a/update_engine.gyp b/update_engine.gyp
index 89e0b7b..78485d5 100644
--- a/update_engine.gyp
+++ b/update_engine.gyp
@@ -85,7 +85,6 @@
           'dbus-1',
           'dbus-glib-1',
           'glib-2.0',
-          'gthread-2.0',
           'libchrome-<(libbase_ver)',
           'libchromeos-<(libbase_ver)',
           'libcrypto',