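update_engine: Change OperationsGenerator to use BlobFileWriter

Move BlobFileWriter out of full_update_generator.cc into its own file and
pass it to the operations generators in place of the raw
(data_fd, data_file_size) pair, so that all blob writes go through a single
writer. ABGenerator::AddDataAndSetType no longer reads back the existing
blob to compare contents; it only stores a new blob when the operation type
or data length differs.
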
BUG=chromium:517280
TEST=Unit test for BlobFileWriter

Change-Id: Ib49925676331acee97ff6b4cec38a81ca8b157a1
Reviewed-on: https://chromium-review.googlesource.com/291441
Tested-by: Sen Jiang <senj@chromium.org>
Reviewed-by: Alex Deymo <deymo@chromium.org>
Commit-Queue: Sen Jiang <senj@chromium.org>
diff --git a/payload_generator/ab_generator.cc b/payload_generator/ab_generator.cc
index 1f7b14e..8d2736a 100644
--- a/payload_generator/ab_generator.cc
+++ b/payload_generator/ab_generator.cc
@@ -22,8 +22,7 @@
 
 bool ABGenerator::GenerateOperations(
     const PayloadGenerationConfig& config,
-    int data_file_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<AnnotatedOperation>* rootfs_ops,
     vector<AnnotatedOperation>* kernel_ops) {
 
@@ -38,8 +37,7 @@
       config.target.rootfs,
       hard_chunk_blocks,
       soft_chunk_blocks,
-      data_file_fd,
-      data_file_size,
+      blob_file,
       true));  // src_ops_allowed
   LOG(INFO) << "done reading normal files";
 
@@ -50,19 +48,16 @@
       config.target.kernel,
       hard_chunk_blocks,
       soft_chunk_blocks,
-      data_file_fd,
-      data_file_size,
+      blob_file,
       true));  // src_ops_allowed
   LOG(INFO) << "done reading kernel";
 
   TEST_AND_RETURN_FALSE(FragmentOperations(rootfs_ops,
                                            config.target.rootfs.path,
-                                           data_file_fd,
-                                           data_file_size));
+                                           blob_file));
   TEST_AND_RETURN_FALSE(FragmentOperations(kernel_ops,
                                            config.target.kernel.path,
-                                           data_file_fd,
-                                           data_file_size));
+                                           blob_file));
   SortOperationsByDestination(rootfs_ops);
   SortOperationsByDestination(kernel_ops);
 
@@ -77,13 +72,11 @@
   TEST_AND_RETURN_FALSE(MergeOperations(rootfs_ops,
                                         merge_chunk_blocks,
                                         config.target.rootfs.path,
-                                        data_file_fd,
-                                        data_file_size));
+                                        blob_file));
   TEST_AND_RETURN_FALSE(MergeOperations(kernel_ops,
                                         merge_chunk_blocks,
                                         config.target.kernel.path,
-                                        data_file_fd,
-                                        data_file_size));
+                                        blob_file));
   return true;
 }
 
@@ -95,8 +88,7 @@
 bool ABGenerator::FragmentOperations(
     vector<AnnotatedOperation>* aops,
     const string& target_part_path,
-    int data_fd,
-    off_t* data_file_size) {
+    BlobFileWriter* blob_file) {
   vector<AnnotatedOperation> fragmented_aops;
   for (const AnnotatedOperation& aop : *aops) {
     if (aop.op.type() ==
@@ -107,8 +99,8 @@
                (aop.op.type() ==
                 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ)) {
       TEST_AND_RETURN_FALSE(SplitReplaceOrReplaceBz(aop, &fragmented_aops,
-                                                    target_part_path, data_fd,
-                                                    data_file_size));
+                                                    target_part_path,
+                                                    blob_file));
     } else {
       fragmented_aops.push_back(aop);
     }
@@ -175,8 +167,7 @@
     const AnnotatedOperation& original_aop,
     vector<AnnotatedOperation>* result_aops,
     const string& target_part_path,
-    int data_fd,
-    off_t* data_file_size) {
+    BlobFileWriter* blob_file) {
   DeltaArchiveManifest_InstallOperation original_op = original_aop.op;
   const bool is_replace =
       original_op.type() == DeltaArchiveManifest_InstallOperation_Type_REPLACE;
@@ -204,8 +195,8 @@
     AnnotatedOperation new_aop;
     new_aop.op = new_op;
     new_aop.name = base::StringPrintf("%s:%d", original_aop.name.c_str(), i);
-    TEST_AND_RETURN_FALSE(AddDataAndSetType(&new_aop, target_part_path, data_fd,
-                                            data_file_size));
+    TEST_AND_RETURN_FALSE(AddDataAndSetType(&new_aop, target_part_path,
+                                            blob_file));
 
     result_aops->push_back(new_aop);
   }
@@ -215,8 +206,7 @@
 bool ABGenerator::MergeOperations(vector<AnnotatedOperation>* aops,
                                   size_t chunk_blocks,
                                   const string& target_part_path,
-                                  int data_fd,
-                                  off_t* data_file_size) {
+                                  BlobFileWriter* blob_file) {
   vector<AnnotatedOperation> new_aops;
   for (const AnnotatedOperation& curr_aop : *aops) {
     if (new_aops.empty()) {
@@ -288,7 +278,7 @@
          curr_aop.op.type() ==
             DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ)) {
       TEST_AND_RETURN_FALSE(AddDataAndSetType(&curr_aop, target_part_path,
-                                              data_fd, data_file_size));
+                                              blob_file));
     }
   }
 
@@ -298,8 +288,7 @@
 
 bool ABGenerator::AddDataAndSetType(AnnotatedOperation* aop,
                                     const string& target_part_path,
-                                    int data_fd,
-                                    off_t* data_file_size) {
+                                    BlobFileWriter* blob_file) {
   TEST_AND_RETURN_FALSE(
       aop->op.type() == DeltaArchiveManifest_InstallOperation_Type_REPLACE ||
       aop->op.type() == DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ);
@@ -327,26 +316,11 @@
     data_p = &data;
   }
 
-  // If the operation already points to a data blob, check whether it's
-  // identical to the new one, in which case don't add it.
-  if (aop->op.type() == new_op_type &&
-      aop->op.data_length() == data_p->size()) {
-    chromeos::Blob current_data(data_p->size());
-    ssize_t bytes_read;
-    TEST_AND_RETURN_FALSE(utils::PReadAll(data_fd,
-                                          current_data.data(),
-                                          aop->op.data_length(),
-                                          aop->op.data_offset(),
-                                          &bytes_read));
-    TEST_AND_RETURN_FALSE(bytes_read ==
-                          static_cast<ssize_t>(aop->op.data_length()));
-    if (current_data == *data_p)
-      data_p = nullptr;
-  }
-
-  if (data_p) {
+  // Store the blob unless the op already has this type and blob length.
+  if (aop->op.type() != new_op_type ||
+      aop->op.data_length() != data_p->size()) {
     aop->op.set_type(new_op_type);
-    aop->SetOperationBlob(data_p, data_fd, data_file_size);
+    TEST_AND_RETURN_FALSE(aop->SetOperationBlob(data_p, blob_file));
   }
 
   return true;
diff --git a/payload_generator/ab_generator.h b/payload_generator/ab_generator.h
index 6450776..1aa74af 100644
--- a/payload_generator/ab_generator.h
+++ b/payload_generator/ab_generator.h
@@ -12,6 +12,7 @@
 #include <chromeos/secure_blob.h>
 
 #include "update_engine/payload_constants.h"
+#include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/payload_generator/extent_utils.h"
 #include "update_engine/payload_generator/filesystem_interface.h"
 #include "update_engine/payload_generator/operations_generator.h"
@@ -34,13 +35,10 @@
   // write the new image on the target partition, also possibly in random order.
   // The rootfs operations are stored in |rootfs_ops| and should be executed in
   // that order. The kernel operations are stored in |kernel_ops|. All
-  // the offsets in the operations reference the data written to |data_file_fd|.
-  // The total amount of data written to that file is stored in
-  // |data_file_size|.
+  // the offsets in the operations reference the data written to |blob_file|.
   bool GenerateOperations(
       const PayloadGenerationConfig& config,
-      int data_file_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<AnnotatedOperation>* rootfs_ops,
       std::vector<AnnotatedOperation>* kernel_ops) override;
 
@@ -50,13 +48,10 @@
   // fragmented except BSDIFF and SOURCE_BSDIFF operations.
   // The |target_rootfs_part| is the filename of the new image, where the
   // destination extents refer to. The blobs of the operations in |aops| should
-  // reference the file |data_fd| whose initial size is |*data_file_size|. The
-  // file contents and the value pointed by |data_file_size| are updated if
-  // needed.
+  // reference |blob_file|. |blob_file| is updated if needed.
   static bool FragmentOperations(std::vector<AnnotatedOperation>* aops,
                                  const std::string& target_rootfs_part,
-                                 int data_fd,
-                                 off_t* data_file_size);
+                                 BlobFileWriter* blob_file);
 
   // Takes a vector of AnnotatedOperations |aops| and sorts them by the first
   // start block in their destination extents. Sets |aops| to a vector of the
@@ -84,8 +79,7 @@
       const AnnotatedOperation& original_aop,
       std::vector<AnnotatedOperation>* result_aops,
       const std::string& target_part,
-      int data_fd,
-      off_t* data_file_size);
+      BlobFileWriter* blob_file);
 
   // Takes a sorted (by first destination extent) vector of operations |aops|
   // and merges SOURCE_COPY, REPLACE, and REPLACE_BZ operations in that vector.
@@ -98,21 +92,19 @@
   static bool MergeOperations(std::vector<AnnotatedOperation>* aops,
                               size_t chunk_blocks,
                               const std::string& target_part,
-                              int data_fd,
-                              off_t* data_file_size);
+                              BlobFileWriter* blob_file);
 
  private:
   // Adds the data payload for a REPLACE/REPLACE_BZ operation |aop| by reading
   // its output extents from |target_part_path| and appending a corresponding
-  // data blob to |data_fd|. The blob will be compressed if this is smaller than
-  // the uncompressed form, and the operation type will be set accordingly.
-  // |*data_file_size| will be updated as well. If the operation happens to have
-  // the right type and already points to a data blob, we check whether its
-  // content is identical to the new one, in which case nothing is written.
+  // data blob to |blob_file|. The blob will be compressed if that is smaller
+  // than the uncompressed form, and the operation type will be set accordingly.
+  // If the operation already has the right type and a data blob of the same
+  // length, nothing is written. Callers should therefore only pre-set the type
+  // and data blob when they are valid.
   static bool AddDataAndSetType(AnnotatedOperation* aop,
                                 const std::string& target_part_path,
-                                int data_fd,
-                                off_t* data_file_size);
+                                BlobFileWriter* blob_file);
 
   DISALLOW_COPY_AND_ASSIGN(ABGenerator);
 };
diff --git a/payload_generator/ab_generator_unittest.cc b/payload_generator/ab_generator_unittest.cc
index 466b699..c87324c 100644
--- a/payload_generator/ab_generator_unittest.cc
+++ b/payload_generator/ab_generator_unittest.cc
@@ -105,11 +105,12 @@
   EXPECT_TRUE(utils::WriteFile(data_path.c_str(), op_blob.data(),
                                op_blob.size()));
   off_t data_file_size = op_blob.size();
+  BlobFileWriter blob_file(data_fd, &data_file_size);
 
   // Split the operation.
   vector<AnnotatedOperation> result_ops;
   ASSERT_TRUE(ABGenerator::SplitReplaceOrReplaceBz(
-          aop, &result_ops, part_path, data_fd, &data_file_size));
+          aop, &result_ops, part_path, &blob_file));
 
   // Check the result.
   DeltaArchiveManifest_InstallOperation_Type expected_type =
@@ -275,10 +276,11 @@
   EXPECT_TRUE(utils::WriteFile(data_path.c_str(), blob_data.data(),
                                blob_data.size()));
   off_t data_file_size = blob_data.size();
+  BlobFileWriter blob_file(data_fd, &data_file_size);
 
   // Merge the operations.
   EXPECT_TRUE(ABGenerator::MergeOperations(
-      &aops, 5, part_path, data_fd, &data_file_size));
+      &aops, 5, part_path, &blob_file));
 
   // Check the result.
   DeltaArchiveManifest_InstallOperation_Type expected_op_type =
@@ -471,7 +473,8 @@
   third_aop.name = "3";
   aops.push_back(third_aop);
 
-  EXPECT_TRUE(ABGenerator::MergeOperations(&aops, 5, "", 0, nullptr));
+  BlobFileWriter blob_file(0, nullptr);
+  EXPECT_TRUE(ABGenerator::MergeOperations(&aops, 5, "", &blob_file));
 
   EXPECT_EQ(aops.size(), 1);
   DeltaArchiveManifest_InstallOperation first_result_op = aops[0].op;
@@ -547,7 +550,8 @@
   fourth_aop.op = fourth_op;
   aops.push_back(fourth_aop);
 
-  EXPECT_TRUE(ABGenerator::MergeOperations(&aops, 4, "", 0, nullptr));
+  BlobFileWriter blob_file(0, nullptr);
+  EXPECT_TRUE(ABGenerator::MergeOperations(&aops, 4, "", &blob_file));
 
   // No operations were merged, the number of ops is the same.
   EXPECT_EQ(aops.size(), 4);
diff --git a/payload_generator/annotated_operation.cc b/payload_generator/annotated_operation.cc
index 2aa79fc..2a0a0de 100644
--- a/payload_generator/annotated_operation.cc
+++ b/payload_generator/annotated_operation.cc
@@ -25,15 +25,14 @@
 }
 }  // namespace
 
-bool AnnotatedOperation::SetOperationBlob(chromeos::Blob* blob, int data_fd,
-                                          off_t* data_file_size) {
-  TEST_AND_RETURN_FALSE(utils::PWriteAll(data_fd,
-                                         blob->data(),
-                                         blob->size(),
-                                         *data_file_size));
+bool AnnotatedOperation::SetOperationBlob(chromeos::Blob* blob,
+                                          BlobFileWriter* blob_file) {
+  // Store the blob first so that a failed write leaves |op| unmodified.
+  off_t data_offset = blob_file->StoreBlob(*blob);
+  if (data_offset == -1)
+    return false;
   op.set_data_length(blob->size());
-  op.set_data_offset(*data_file_size);
-  *data_file_size += blob->size();
+  op.set_data_offset(data_offset);
   return true;
 }
 
diff --git a/payload_generator/annotated_operation.h b/payload_generator/annotated_operation.h
index bacc0ab..2a88bef 100644
--- a/payload_generator/annotated_operation.h
+++ b/payload_generator/annotated_operation.h
@@ -9,6 +9,8 @@
 #include <string>
 
 #include <chromeos/secure_blob.h>
+
+#include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/update_metadata.pb.h"
 
 namespace chromeos_update_engine {
@@ -24,8 +26,7 @@
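-  // Writes |blob| to the end of |data_fd|, and updates |data_file_size| to
-  // match the new size of |data_fd|. It sets the data_offset and data_length
-  // in AnnotatedOperation to match the offset and size of |blob| in |data_fd|.
+  // Writes |blob| to the end of |blob_file| and sets the data_offset and
+  // data_length in AnnotatedOperation to match the offset and size of |blob|
+  // in |blob_file|. Returns false if the blob could not be stored.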
-  bool SetOperationBlob(chromeos::Blob* blob, int data_fd,
-                        off_t* data_file_size);
+  bool SetOperationBlob(chromeos::Blob* blob, BlobFileWriter* blob_file);
 };
 
 // For logging purposes.
diff --git a/payload_generator/blob_file_writer.cc b/payload_generator/blob_file_writer.cc
new file mode 100644
index 0000000..2032e14
--- /dev/null
+++ b/payload_generator/blob_file_writer.cc
@@ -0,0 +1,44 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "update_engine/payload_generator/blob_file_writer.h"
+
+#include "update_engine/utils.h"
+
+namespace chromeos_update_engine {
+
+// Writes |blob| at the offset currently tracked by |*blob_file_size_| and
+// returns that offset, or -1 if the write failed. The tracked size is only
+// advanced on success. The whole call runs while holding |blob_mutex_|.
+off_t BlobFileWriter::StoreBlob(const chromeos::Blob& blob) {
+  base::AutoLock auto_lock(blob_mutex_);
+  if (!utils::PWriteAll(blob_fd_, blob.data(), blob.size(), *blob_file_size_))
+    return -1;
+
+  off_t result = *blob_file_size_;
+  *blob_file_size_ += blob.size();
+
+  stored_blobs_++;
+  // Log progress whenever storing this blob crosses a 10% step of the total.
+  // A zero |total_blobs_| means no total was set, so logging is skipped.
+  if (total_blobs_ > 0 &&
+      (10 * (stored_blobs_ - 1) / total_blobs_) !=
+      (10 * stored_blobs_ / total_blobs_)) {
+    LOG(INFO) << (100 * stored_blobs_ / total_blobs_)
+              << "% complete " << stored_blobs_ << "/" << total_blobs_
+              << " ops (output size: " << *blob_file_size_ << ")";
+  }
+  return result;
+}
+
+// Sets the number of blobs expected for the next batch of StoreBlob() calls.
+// The stored-blob counter is restarted so that progress is reported relative
+// to the new total when the same writer is reused (e.g. once per partition).
+void BlobFileWriter::SetTotalBlobs(size_t total_blobs) {
+  base::AutoLock auto_lock(blob_mutex_);
+  total_blobs_ = total_blobs;
+  stored_blobs_ = 0;
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/blob_file_writer.h b/payload_generator/blob_file_writer.h
new file mode 100644
index 0000000..be8fd0d
--- /dev/null
+++ b/payload_generator/blob_file_writer.h
@@ -0,0 +1,53 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef UPDATE_ENGINE_PAYLOAD_GENERATOR_BLOB_FILE_WRITER_H_
+#define UPDATE_ENGINE_PAYLOAD_GENERATOR_BLOB_FILE_WRITER_H_
+
+#include <sys/types.h>
+
+#include <base/macros.h>
+#include <base/synchronization/lock.h>
+#include <chromeos/secure_blob.h>
+
+namespace chromeos_update_engine {
+
+// Appends data blobs to an already open file and hands back the offset of each
+// one. StoreBlob() serializes writers with an internal lock.
+class BlobFileWriter {
+ public:
+  // Create the BlobFileWriter object that will manage the blobs stored to
+  // |blob_fd| in a thread safe way. |blob_file_size| points to the current
+  // size of the file; every StoreBlob() call reads and advances it, so it must
+  // stay valid for as long as blobs are being stored.
+  BlobFileWriter(int blob_fd, off_t* blob_file_size)
+    : blob_fd_(blob_fd),
+      blob_file_size_(blob_file_size) {}
+
+  // Store the passed |blob| in the blob file. Returns the offset at which it
+  // was stored, or -1 in case of failure.
+  off_t StoreBlob(const chromeos::Blob& blob);
+
+  // The number of |total_blobs| is the number of blobs that will be stored but
+  // is only used for logging purposes. If not set, logging will be skipped.
+  void SetTotalBlobs(size_t total_blobs);
+
+ private:
+  // Progress counters, used only for logging; StoreBlob() touches them while
+  // holding |blob_mutex_|.
+  size_t total_blobs_{0};
+  size_t stored_blobs_{0};
+
+  // The file and its size are protected with the |blob_mutex_|.
+  int blob_fd_;
+  off_t* blob_file_size_;
+
+  base::Lock blob_mutex_;
+
+  DISALLOW_COPY_AND_ASSIGN(BlobFileWriter);
+};
+
+}  // namespace chromeos_update_engine
+
+#endif  // UPDATE_ENGINE_PAYLOAD_GENERATOR_BLOB_FILE_WRITER_H_
diff --git a/payload_generator/blob_file_writer_unittest.cc b/payload_generator/blob_file_writer_unittest.cc
new file mode 100644
index 0000000..c845f97
--- /dev/null
+++ b/payload_generator/blob_file_writer_unittest.cc
@@ -0,0 +1,58 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "update_engine/payload_generator/blob_file_writer.h"
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "update_engine/test_utils.h"
+#include "update_engine/utils.h"
+
+using chromeos_update_engine::test_utils::FillWithData;
+using std::string;
+
+namespace chromeos_update_engine {
+
+class BlobFileWriterTest : public ::testing::Test {};
+
+// Stores the same blob twice and checks the returned offsets, the tracked file
+// size and the bytes that ended up in the file.
+TEST_F(BlobFileWriterTest, SimpleTest) {
+  string blob_path;
+  int blob_fd;
+  // Nothing below is meaningful without the temp file, so stop on failure.
+  ASSERT_TRUE(utils::MakeTempFile("BlobFileWriterTest.XXXXXX",
+                                  &blob_path,
+                                  &blob_fd));
+  // Release the descriptor and remove the temp file when the test ends.
+  ScopedFdCloser blob_fd_closer(&blob_fd);
+  ScopedPathUnlinker blob_path_unlinker(blob_path);
+  off_t blob_file_size = 0;
+  BlobFileWriter blob_file(blob_fd, &blob_file_size);
+
+  off_t blob_size = 1024;
+  chromeos::Blob blob(blob_size);
+  FillWithData(&blob);
+  EXPECT_EQ(0, blob_file.StoreBlob(blob));
+  EXPECT_EQ(blob_size, blob_file.StoreBlob(blob));
+  EXPECT_EQ(2 * blob_size, blob_file_size);
+
+  // Both copies must be readable back at the offsets StoreBlob() returned.
+  const off_t offsets[] = {0, blob_size};
+  for (off_t offset : offsets) {
+    chromeos::Blob stored_blob(blob_size);
+    ssize_t bytes_read;
+    ASSERT_TRUE(utils::PReadAll(blob_fd,
+                                stored_blob.data(),
+                                blob_size,
+                                offset,
+                                &bytes_read));
+    EXPECT_EQ(bytes_read, blob_size);
+    EXPECT_EQ(blob, stored_blob);
+  }
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/delta_diff_generator.cc b/payload_generator/delta_diff_generator.cc
index dbb63a8..ca8fb38 100644
--- a/payload_generator/delta_diff_generator.cc
+++ b/payload_generator/delta_diff_generator.cc
@@ -21,6 +21,7 @@
 #include "update_engine/delta_performer.h"
 #include "update_engine/payload_constants.h"
 #include "update_engine/payload_generator/ab_generator.h"
+#include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/payload_generator/delta_diff_utils.h"
 #include "update_engine/payload_generator/full_update_generator.h"
 #include "update_engine/payload_generator/inplace_generator.h"
@@ -115,10 +116,10 @@
 
   {
     ScopedFdCloser data_file_fd_closer(&data_file_fd);
+    BlobFileWriter blob_file(data_file_fd, &data_file_size);
     // Generate the operations using the strategy we selected above.
     TEST_AND_RETURN_FALSE(strategy->GenerateOperations(config,
-                                                       data_file_fd,
-                                                       &data_file_size,
+                                                       &blob_file,
                                                        &rootfs_ops,
                                                        &kernel_ops));
   }
diff --git a/payload_generator/delta_diff_utils.cc b/payload_generator/delta_diff_utils.cc
index 53396b0..68462ae 100644
--- a/payload_generator/delta_diff_utils.cc
+++ b/payload_generator/delta_diff_utils.cc
@@ -144,8 +144,7 @@
     const PartitionConfig& new_part,
     ssize_t hard_chunk_blocks,
     size_t soft_chunk_blocks,
-    int data_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     bool src_ops_allowed) {
   ExtentRanges old_visited_blocks;
   ExtentRanges new_visited_blocks;
@@ -158,8 +157,7 @@
       new_part.fs_interface->GetBlockCount(),
       soft_chunk_blocks,
       src_ops_allowed,
-      data_fd,
-      data_file_size,
+      blob_file,
       &old_visited_blocks,
       &new_visited_blocks));
 
@@ -217,8 +215,7 @@
         new_file_extents,
         new_file.name,  // operation name
         hard_chunk_blocks,
-        data_fd,
-        data_file_size,
+        blob_file,
         src_ops_allowed));
   }
   // Process all the blocks not included in any file. We provided all the unused
@@ -249,8 +246,7 @@
       new_unvisited,
       "<non-file-data>",  // operation name
       soft_chunk_blocks,
-      data_fd,
-      data_file_size,
+      blob_file,
       src_ops_allowed));
 
   return true;
@@ -264,8 +260,7 @@
     size_t new_num_blocks,
     ssize_t chunk_blocks,
     bool src_ops_allowed,
-    int data_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     ExtentRanges* old_visited_blocks,
     ExtentRanges* new_visited_blocks) {
   vector<BlockMapping::BlockId> old_block_ids;
@@ -348,8 +343,7 @@
         vector<Extent>{extent},  // new_extents
         "<zeros>",
         chunk_blocks,
-        data_fd,
-        data_file_size,
+        blob_file,
         src_ops_allowed));
   }
   LOG(INFO) << "Produced " << (aops->size() - num_ops) << " operations for "
@@ -410,8 +404,7 @@
     const vector<Extent>& new_extents,
     const string& name,
     ssize_t chunk_blocks,
-    int data_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     bool src_ops_allowed) {
   chromeos::Blob data;
   DeltaArchiveManifest_InstallOperation operation;
@@ -468,17 +461,6 @@
       }
     }
 
-    // Write the data
-    if (operation.type() != DeltaArchiveManifest_InstallOperation_Type_MOVE &&
-        operation.type() !=
-            DeltaArchiveManifest_InstallOperation_Type_SOURCE_COPY) {
-      operation.set_data_offset(*data_file_size);
-      operation.set_data_length(data.size());
-    }
-
-    TEST_AND_RETURN_FALSE(utils::WriteAll(data_fd, data.data(), data.size()));
-    *data_file_size += data.size();
-
     // Now, insert into the list of operations.
     AnnotatedOperation aop;
     aop.name = name;
@@ -487,6 +469,15 @@
                                     name.c_str(), block_offset / chunk_blocks);
     }
     aop.op = operation;
+
+    // Write the data
+    if (operation.type() != DeltaArchiveManifest_InstallOperation_Type_MOVE &&
+        operation.type() !=
+            DeltaArchiveManifest_InstallOperation_Type_SOURCE_COPY) {
+      TEST_AND_RETURN_FALSE(aop.SetOperationBlob(&data, blob_file));
+    } else {
+      TEST_AND_RETURN_FALSE(blob_file->StoreBlob(data) != -1);
+    }
     aops->emplace_back(aop);
   }
   return true;
diff --git a/payload_generator/delta_diff_utils.h b/payload_generator/delta_diff_utils.h
index dc837b0..e3acdf4 100644
--- a/payload_generator/delta_diff_utils.h
+++ b/payload_generator/delta_diff_utils.h
@@ -35,8 +35,7 @@
                         const PartitionConfig& new_part,
                         ssize_t hard_chunk_blocks,
                         size_t soft_chunk_blocks,
-                        int data_fd,
-                        off_t* data_file_size,
+                        BlobFileWriter* blob_file,
                         bool src_ops_allowed);
 
 // Create operations in |aops| for identical blocks that moved around in the old
@@ -57,8 +56,7 @@
                              size_t new_num_blocks,
                              ssize_t chunk_blocks,
                              bool src_ops_allowed,
-                             int data_fd,
-                             off_t* data_file_size,
+                             BlobFileWriter* blob_file,
                              ExtentRanges* old_visited_blocks,
                              ExtentRanges* new_visited_blocks);
 
@@ -77,8 +75,7 @@
                    const std::vector<Extent>& new_extents,
                    const std::string& name,
                    ssize_t chunk_blocks,
-                   int data_fd,
-                   off_t* data_file_size,
+                   BlobFileWriter* blob_file,
                    bool src_ops_allowed);
 
 // Reads the blocks |old_extents| from |old_part| (if it exists) and the
diff --git a/payload_generator/delta_diff_utils_unittest.cc b/payload_generator/delta_diff_utils_unittest.cc
index 9566fec..d7243c0 100644
--- a/payload_generator/delta_diff_utils_unittest.cc
+++ b/payload_generator/delta_diff_utils_unittest.cc
@@ -115,6 +115,7 @@
   // members. This simply avoid repeating all the arguments that never change.
   bool RunDeltaMovedAndZeroBlocks(ssize_t chunk_blocks,
                                   bool src_ops_allowed) {
+    BlobFileWriter blob_file(blob_fd_, &blob_size_);
     return diff_utils::DeltaMovedAndZeroBlocks(
         &aops_,
         old_part_.path,
@@ -123,8 +124,7 @@
         new_part_.size / block_size_,
         chunk_blocks,
         src_ops_allowed,
-        blob_fd_,
-        &blob_size_,
+        &blob_file,
         &old_visited_blocks_,
         &new_visited_blocks_);
   }
diff --git a/payload_generator/full_update_generator.cc b/payload_generator/full_update_generator.cc
index a546081..9cf3cc1 100644
--- a/payload_generator/full_update_generator.cc
+++ b/payload_generator/full_update_generator.cc
@@ -30,52 +30,6 @@
 
 const size_t kDefaultFullChunkSize = 1024 * 1024;  // 1 MiB
 
-class BlobFileWriter {
- public:
-  // Create the BlobFileWriter object that will manage the blobs stored to
-  // |blob_fd| in a thread safe way. The number of |total_blobs| is the number
-  // of blobs that will be stored but is only used for logging purposes.
-  BlobFileWriter(int blob_fd, off_t* blob_file_size, size_t total_blobs)
-    : total_blobs_(total_blobs),
-      blob_fd_(blob_fd),
-      blob_file_size_(blob_file_size) {}
-
-  // Store the passed |blob| in the blob file. Returns the offset at which it
-  // was stored, or -1 in case of failure.
-  off_t StoreBlob(const chromeos::Blob& blob);
-
- private:
-  size_t total_blobs_;
-  size_t stored_blobs_{0};
-
-  // The file and its size are protected with the |blob_mutex_|.
-  int blob_fd_;
-  off_t* blob_file_size_;
-
-  base::Lock blob_mutex_;
-
-  DISALLOW_COPY_AND_ASSIGN(BlobFileWriter);
-};
-
-off_t BlobFileWriter::StoreBlob(const chromeos::Blob& blob) {
-  base::AutoLock auto_lock(blob_mutex_);
-  if (!utils::WriteAll(blob_fd_, blob.data(), blob.size()))
-    return -1;
-
-  off_t result = *blob_file_size_;
-  *blob_file_size_ += blob.size();
-
-  stored_blobs_++;
-  if (total_blobs_ > 0 &&
-      (10 * (stored_blobs_ - 1) / total_blobs_) !=
-      (10 * stored_blobs_ / total_blobs_)) {
-    LOG(INFO) << (100 * stored_blobs_ / total_blobs_)
-              << "% complete " << stored_blobs_ << "/" << total_blobs_
-              << " ops (output size: " << *blob_file_size_ << ")";
-  }
-  return result;
-}
-
 // This class encapsulates a full update chunk processing thread work. The
 // processor reads a chunk of data from the input file descriptor and compresses
 // it. The processor will destroy itself when the work is done.
@@ -159,8 +113,7 @@
 
 bool FullUpdateGenerator::GenerateOperations(
     const PayloadGenerationConfig& config,
-    int data_file_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<AnnotatedOperation>* rootfs_ops,
     vector<AnnotatedOperation>* kernel_ops) {
   TEST_AND_RETURN_FALSE(config.Validate());
@@ -186,15 +139,13 @@
       config.target.rootfs,
       config.block_size,
       full_chunk_size / config.block_size,
-      data_file_fd,
-      data_file_size,
+      blob_file,
       rootfs_ops));
   TEST_AND_RETURN_FALSE(GenerateOperationsForPartition(
       config.target.kernel,
       config.block_size,
       full_chunk_size / config.block_size,
-      data_file_fd,
-      data_file_size,
+      blob_file,
       kernel_ops));
   return true;
 }
@@ -203,8 +154,7 @@
     const PartitionConfig& new_part,
     size_t block_size,
     size_t chunk_blocks,
-    int data_file_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<AnnotatedOperation>* aops) {
   size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
   LOG(INFO) << "Compressing partition " << PartitionNameString(new_part.name)
@@ -223,8 +173,8 @@
   aops->resize(num_chunks);
   vector<ChunkProcessor> chunk_processors;
   chunk_processors.reserve(num_chunks);
+  blob_file->SetTotalBlobs(num_chunks);
 
-  BlobFileWriter blob_file(data_file_fd, data_file_size, num_chunks);
   const string part_name_str = PartitionNameString(new_part.name);
 
   for (size_t i = 0; i < num_chunks; ++i) {
@@ -246,7 +196,7 @@
         in_fd,
         static_cast<off_t>(start_block) * block_size,
         num_blocks * block_size,
-        &blob_file,
+        blob_file,
         aop);
   }
 
diff --git a/payload_generator/full_update_generator.h b/payload_generator/full_update_generator.h
index 8d7c62e..4d75b22 100644
--- a/payload_generator/full_update_generator.h
+++ b/payload_generator/full_update_generator.h
@@ -10,6 +10,7 @@
 
 #include <base/macros.h>
 
+#include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/payload_generator/operations_generator.h"
 #include "update_engine/payload_generator/payload_generation_config.h"
 
@@ -27,8 +28,7 @@
   // |data_file_size| as it does.
   bool GenerateOperations(
       const PayloadGenerationConfig& config,
-      int data_file_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<AnnotatedOperation>* rootfs_ops,
       std::vector<AnnotatedOperation>* kernel_ops) override;
 
@@ -43,8 +43,7 @@
       const PartitionConfig& new_part,
       size_t block_size,
       size_t chunk_blocks,
-      int data_file_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<AnnotatedOperation>* aops);
 
  private:
diff --git a/payload_generator/full_update_generator_unittest.cc b/payload_generator/full_update_generator_unittest.cc
index 8afbb03..28cbe36 100644
--- a/payload_generator/full_update_generator_unittest.cc
+++ b/payload_generator/full_update_generator_unittest.cc
@@ -37,6 +37,7 @@
                                     &out_blobs_path_,
                                     &out_blobs_fd_));
 
+    blob_file_.reset(new BlobFileWriter(out_blobs_fd_, &out_blobs_length_));
     rootfs_part_unlinker_.reset(
         new ScopedPathUnlinker(config_.target.rootfs.path));
     kernel_part_unlinker_.reset(
@@ -49,8 +50,10 @@
   // Output file holding the payload blobs.
   string out_blobs_path_;
   int out_blobs_fd_{-1};
+  off_t out_blobs_length_{0};
   ScopedFdCloser out_blobs_fd_closer_{&out_blobs_fd_};
 
+  std::unique_ptr<BlobFileWriter> blob_file_;
   std::unique_ptr<ScopedPathUnlinker> rootfs_part_unlinker_;
   std::unique_ptr<ScopedPathUnlinker> kernel_part_unlinker_;
   std::unique_ptr<ScopedPathUnlinker> out_blobs_unlinker_;
@@ -75,13 +78,11 @@
   EXPECT_TRUE(test_utils::WriteFileVector(config_.target.kernel.path,
                                           new_kern));
 
-  off_t out_blobs_length = 0;
   vector<AnnotatedOperation> rootfs_ops;
   vector<AnnotatedOperation> kernel_ops;
 
   EXPECT_TRUE(generator_.GenerateOperations(config_,
-                                            out_blobs_fd_,
-                                            &out_blobs_length,
+                                            blob_file_.get(),
                                             &rootfs_ops,
                                             &kernel_ops));
   int64_t target_rootfs_chunks =
@@ -118,13 +119,11 @@
   EXPECT_TRUE(test_utils::WriteFileVector(config_.target.kernel.path,
                                           new_kern));
 
-  off_t out_blobs_length = 0;
   vector<AnnotatedOperation> rootfs_ops;
   vector<AnnotatedOperation> kernel_ops;
 
   EXPECT_TRUE(generator_.GenerateOperations(config_,
-                                            out_blobs_fd_,
-                                            &out_blobs_length,
+                                            blob_file_.get(),
                                             &rootfs_ops,
                                             &kernel_ops));
   // rootfs has one chunk and a half.
diff --git a/payload_generator/inplace_generator.cc b/payload_generator/inplace_generator.cc
index dc4802a..47bb1a7 100644
--- a/payload_generator/inplace_generator.cc
+++ b/payload_generator/inplace_generator.cc
@@ -309,8 +309,7 @@
 bool ConvertCutsToFull(
     Graph* graph,
     const string& new_part,
-    int data_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<Vertex::Index>* op_indexes,
     vector<vector<Vertex::Index>::size_type>* reverse_op_indexes,
     const vector<CutEdgeVertexes>& cuts) {
@@ -321,8 +320,7 @@
         graph,
         cut,
         new_part,
-        data_fd,
-        data_file_size));
+        blob_file));
     deleted_nodes.insert(cut.new_vertex);
   }
   deleted_nodes.insert(cuts[0].old_dst);
@@ -349,8 +347,7 @@
 bool AssignBlockForAdjoiningCuts(
     Graph* graph,
     const string& new_part,
-    int data_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<Vertex::Index>* op_indexes,
     vector<vector<Vertex::Index>::size_type>* reverse_op_indexes,
     const vector<CutEdgeVertexes>& cuts) {
@@ -421,8 +418,7 @@
     LOG(INFO) << "Unable to find sufficient scratch";
     TEST_AND_RETURN_FALSE(ConvertCutsToFull(graph,
                                             new_part,
-                                            data_fd,
-                                            data_file_size,
+                                            blob_file,
                                             op_indexes,
                                             reverse_op_indexes,
                                             cuts));
@@ -468,8 +464,7 @@
 bool InplaceGenerator::AssignTempBlocks(
     Graph* graph,
     const string& new_part,
-    int data_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<Vertex::Index>* op_indexes,
     vector<vector<Vertex::Index>::size_type>* reverse_op_indexes,
     const vector<CutEdgeVertexes>& cuts) {
@@ -491,8 +486,7 @@
       CHECK(!cuts_group.empty());
       TEST_AND_RETURN_FALSE(AssignBlockForAdjoiningCuts(graph,
                                                         new_part,
-                                                        data_fd,
-                                                        data_file_size,
+                                                        blob_file,
                                                         op_indexes,
                                                         reverse_op_indexes,
                                                         cuts_group));
@@ -508,8 +502,7 @@
   CHECK(!cuts_group.empty());
   TEST_AND_RETURN_FALSE(AssignBlockForAdjoiningCuts(graph,
                                                     new_part,
-                                                    data_fd,
-                                                    data_file_size,
+                                                    blob_file,
                                                     op_indexes,
                                                     reverse_op_indexes,
                                                     cuts_group));
@@ -546,8 +539,7 @@
 bool InplaceGenerator::ConvertCutToFullOp(Graph* graph,
                                           const CutEdgeVertexes& cut,
                                           const string& new_part,
-                                          int data_fd,
-                                          off_t* data_file_size) {
+                                          BlobFileWriter* blob_file) {
   // Drop all incoming edges, keep all outgoing edges
 
   // Keep all outgoing edges
@@ -572,8 +564,7 @@
         new_extents,
         (*graph)[cut.old_dst].aop.name,
         -1,  // chunk_blocks, forces to have a single operation.
-        data_fd,
-        data_file_size,
+        blob_file,
         false));  // src_ops_allowed
     TEST_AND_RETURN_FALSE(new_aop.size() == 1);
     TEST_AND_RETURN_FALSE(AddInstallOpToGraph(
@@ -597,8 +588,7 @@
 
 bool InplaceGenerator::ConvertGraphToDag(Graph* graph,
                                          const string& new_part,
-                                         int fd,
-                                         off_t* data_file_size,
+                                         BlobFileWriter* blob_file,
                                          vector<Vertex::Index>* final_order,
                                          Vertex::Index scratch_vertex) {
   CycleBreaker cycle_breaker;
@@ -634,8 +624,7 @@
   if (!cuts.empty())
     TEST_AND_RETURN_FALSE(AssignTempBlocks(graph,
                                            new_part,
-                                           fd,
-                                           data_file_size,
+                                           blob_file,
                                            final_order,
                                            &inverse_final_order,
                                            cuts));
@@ -743,8 +732,7 @@
     const PartitionConfig& new_part,
     uint64_t partition_size,
     size_t block_size,
-    int data_file_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<AnnotatedOperation>* aops) {
   // Convert the operations to the graph.
   Graph graph;
@@ -776,8 +764,7 @@
   TEST_AND_RETURN_FALSE(ConvertGraphToDag(
       &graph,
       new_part.path,
-      data_file_fd,
-      data_file_size,
+      blob_file,
       &final_order,
       scratch_vertex));
 
@@ -799,8 +786,7 @@
     size_t block_size,
     ssize_t hard_chunk_blocks,
     size_t soft_chunk_blocks,
-    int data_file_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<AnnotatedOperation>* aops) {
   const string part_name = PartitionNameString(new_part.name);
   LOG(INFO) << "Delta compressing " << part_name << " partition...";
@@ -810,8 +796,7 @@
                                      new_part,
                                      hard_chunk_blocks,
                                      soft_chunk_blocks,
-                                     data_file_fd,
-                                     data_file_size,
+                                     blob_file,
                                      false));  // src_ops_allowed
   LOG(INFO) << "Done reading " << part_name;
 
@@ -819,8 +804,7 @@
       ResolveReadAfterWriteDependencies(new_part,
                                         partition_size,
                                         block_size,
-                                        data_file_fd,
-                                        data_file_size,
+                                        blob_file,
                                         aops));
   LOG(INFO) << "Done reordering " << part_name;
   return true;
@@ -828,8 +812,7 @@
 
 bool InplaceGenerator::GenerateOperations(
     const PayloadGenerationConfig& config,
-    int data_file_fd,
-    off_t* data_file_size,
+    BlobFileWriter* blob_file,
     vector<AnnotatedOperation>* rootfs_ops,
     vector<AnnotatedOperation>* kernel_ops) {
   ssize_t hard_chunk_blocks = (config.hard_chunk_size == -1 ? -1 :
@@ -843,8 +826,7 @@
       config.block_size,
       hard_chunk_blocks,
       soft_chunk_blocks,
-      data_file_fd,
-      data_file_size,
+      blob_file,
       rootfs_ops));
 
   TEST_AND_RETURN_FALSE(GenerateOperationsForPartition(
@@ -854,8 +836,7 @@
       config.block_size,
       hard_chunk_blocks,
       soft_chunk_blocks,
-      data_file_fd,
-      data_file_size,
+      blob_file,
       kernel_ops));
 
   return true;
diff --git a/payload_generator/inplace_generator.h b/payload_generator/inplace_generator.h
index a1245f3..591f042 100644
--- a/payload_generator/inplace_generator.h
+++ b/payload_generator/inplace_generator.h
@@ -10,6 +10,7 @@
 #include <string>
 #include <vector>
 
+#include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/payload_generator/delta_diff_generator.h"
 #include "update_engine/payload_generator/graph_types.h"
 #include "update_engine/payload_generator/operations_generator.h"
@@ -120,8 +121,7 @@
   static bool AssignTempBlocks(
       Graph* graph,
       const std::string& new_part,
-      int data_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<Vertex::Index>* op_indexes,
       std::vector<std::vector<Vertex::Index>::size_type>* reverse_op_indexes,
       const std::vector<CutEdgeVertexes>& cuts);
@@ -136,8 +136,7 @@
   static bool ConvertCutToFullOp(Graph* graph,
                                  const CutEdgeVertexes& cut,
                                  const std::string& new_part,
-                                 int data_fd,
-                                 off_t* data_file_size);
+                                 BlobFileWriter* blob_file);
 
   // Takes a graph, which is not a DAG, which represents the files just
   // read from disk, and converts it into a DAG by breaking all cycles
@@ -150,8 +149,7 @@
   // Returns true on success.
   static bool ConvertGraphToDag(Graph* graph,
                                 const std::string& new_part,
-                                int fd,
-                                off_t* data_file_size,
+                                BlobFileWriter* blob_file,
                                 std::vector<Vertex::Index>* final_order,
                                 Vertex::Index scratch_vertex);
 
@@ -207,8 +205,7 @@
       const PartitionConfig& new_part,
       uint64_t partition_size,
       size_t block_size,
-      int data_file_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<AnnotatedOperation>* aops);
 
   // Generates the list of operations to update inplace from the partition
@@ -225,8 +222,7 @@
       size_t block_size,
       ssize_t hard_chunk_blocks,
       size_t soft_chunk_blocks,
-      int data_file_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<AnnotatedOperation>* aops);
 
   // Generate the update payload operations for the kernel and rootfs using
@@ -242,8 +238,7 @@
   // |data_file_size|.
   bool GenerateOperations(
       const PayloadGenerationConfig& config,
-      int data_file_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<AnnotatedOperation>* rootfs_ops,
       std::vector<AnnotatedOperation>* kernel_ops) override;
 
diff --git a/payload_generator/inplace_generator_unittest.cc b/payload_generator/inplace_generator_unittest.cc
index 0c22664..59e2f77 100644
--- a/payload_generator/inplace_generator_unittest.cc
+++ b/payload_generator/inplace_generator_unittest.cc
@@ -110,6 +110,7 @@
     blob_fd_closer_.reset(new ScopedFdCloser(&blob_fd_));
     blob_file_size_ = 0;
     EXPECT_GE(blob_fd_, 0);
+    blob_file_.reset(new BlobFileWriter(blob_fd_, &blob_file_size_));
   }
 
   // Blob file name, file descriptor and file size used to store operation
@@ -117,6 +118,7 @@
   string blob_path_;
   int blob_fd_{-1};
   off_t blob_file_size_{0};
+  std::unique_ptr<BlobFileWriter> blob_file_;
   std::unique_ptr<ScopedPathUnlinker> blob_path_unlinker_;
   std::unique_ptr<ScopedFdCloser> blob_fd_closer_;
 };
@@ -347,8 +349,7 @@
   CreateBlobFile();
   EXPECT_TRUE(InplaceGenerator::AssignTempBlocks(&graph,
                                                  "/dev/zero",
-                                                 blob_fd_,
-                                                 &blob_file_size_,
+                                                 blob_file_.get(),
                                                  &op_indexes,
                                                  &reverse_op_indexes,
                                                  cuts));
@@ -427,8 +428,7 @@
   CreateBlobFile();
   EXPECT_TRUE(InplaceGenerator::ConvertGraphToDag(&graph,
                                                   "/dev/zero",
-                                                  blob_fd_,
-                                                  &blob_file_size_,
+                                                  blob_file_.get(),
                                                   &final_order,
                                                   Vertex::kInvalidIndex));
 
@@ -563,7 +563,7 @@
                                     part_blocks));
     vector<AnnotatedOperation> result_aops = aops;
     EXPECT_TRUE(InplaceGenerator::ResolveReadAfterWriteDependencies(
-      part, part_blocks * block_size, block_size, blob_fd_, &blob_file_size_,
+      part, part_blocks * block_size, block_size, blob_file_.get(),
       &result_aops));
 
     size_t full_ops = 0;
diff --git a/payload_generator/operations_generator.h b/payload_generator/operations_generator.h
index 31d57b3..677f766 100644
--- a/payload_generator/operations_generator.h
+++ b/payload_generator/operations_generator.h
@@ -10,6 +10,7 @@
 #include <base/macros.h>
 
 #include "update_engine/payload_generator/annotated_operation.h"
+#include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/payload_generator/payload_generation_config.h"
 
 namespace chromeos_update_engine {
@@ -31,8 +32,7 @@
   // |data_file_size|.
   virtual bool GenerateOperations(
       const PayloadGenerationConfig& config,
-      int data_file_fd,
-      off_t* data_file_size,
+      BlobFileWriter* blob_file,
       std::vector<AnnotatedOperation>* rootfs_ops,
       std::vector<AnnotatedOperation>* kernel_ops) = 0;