Restructure FilesystemCopierAction to use ping-pong buffers.

This would allow some parallel processing -- calculating hashes in parallel with
read in parallel with write. Performance benefit at this stage seems
marginal. The major reason for the change is to reuse the FilesystemCopierAction
for applied update verification -- hopefully speeding it up and making AU
responsive to D-Bus signals during that stage. That would be a separate CL
though.

BUG=9140
TEST=unit tests, tested update on the device

Change-Id: I9d07ebbf4d536362c0a955addcaa1d48d5e9b115

Review URL: http://codereview.chromium.org/5603001
diff --git a/filesystem_copier_action.cc b/filesystem_copier_action.cc
index 1452b4d..f3f71bd 100755
--- a/filesystem_copier_action.cc
+++ b/filesystem_copier_action.cc
@@ -32,9 +32,32 @@
 namespace chromeos_update_engine {
 
 namespace {
-const off_t kCopyFileBufferSize = 2 * 1024 * 1024;
+const off_t kCopyFileBufferSize = 512 * 1024;
 }  // namespace {}
 
+FilesystemCopierAction::FilesystemCopierAction(
+    bool copying_kernel_install_path)
+    : copying_kernel_install_path_(copying_kernel_install_path),
+      src_stream_(NULL),
+      dst_stream_(NULL),
+      read_done_(false),
+      failed_(false),
+      cancelled_(false),
+      filesystem_size_(kint64max) {
+  // A lot of code works on the implicit assumption that processing is done on
+  // exactly 2 ping-pong buffers.
+  COMPILE_ASSERT(arraysize(buffer_) == 2 &&
+                 arraysize(buffer_state_) == 2 &&
+                 arraysize(buffer_valid_size_) == 2 &&
+                 arraysize(canceller_) == 2,
+                 ping_pong_buffers_not_two);
+  for (int i = 0; i < 2; ++i) {
+    buffer_state_[i] = kBufferStateEmpty;
+    buffer_valid_size_[i] = 0;
+    canceller_[i] = NULL;
+  }
+}
+
 void FilesystemCopierAction::PerformAction() {
   // Will tell the ActionProcessor we've failed if we return.
   ScopedActionCompleter abort_action_completer(processor_, this);
@@ -84,35 +107,35 @@
   src_stream_ = g_unix_input_stream_new(src_fd, TRUE);
   dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE);
 
-  buffer_.resize(kCopyFileBufferSize);
+  for (int i = 0; i < 2; i++) {
+    buffer_[i].resize(kCopyFileBufferSize);
+    canceller_[i] = g_cancellable_new();
+  }
 
-  // Set up the first read
-  canceller_ = g_cancellable_new();
-
-  g_input_stream_read_async(src_stream_,
-                            &buffer_[0],
-                            GetBytesToRead(),
-                            G_PRIORITY_DEFAULT,
-                            canceller_,
-                            &FilesystemCopierAction::StaticAsyncReadyCallback,
-                            this);
-  read_in_flight_ = true;
+  // Start the first read.
+  SpawnAsyncActions();
 
   abort_action_completer.set_should_complete(false);
 }
 
 void FilesystemCopierAction::TerminateProcessing() {
-  if (canceller_) {
-    g_cancellable_cancel(canceller_);
+  for (int i = 0; i < 2; i++) {
+    if (canceller_[i]) {
+      g_cancellable_cancel(canceller_[i]);
+    }
   }
 }
 
-void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) {
+void FilesystemCopierAction::Cleanup(bool success) {
+  for (int i = 0; i < 2; i++) {
+    g_object_unref(canceller_[i]);
+    canceller_[i] = NULL;
+  }
   g_object_unref(src_stream_);
   src_stream_ = NULL;
   g_object_unref(dst_stream_);
   dst_stream_ = NULL;
-  if (was_cancelled)
+  if (cancelled_)
     return;
   if (success && HasOutputPipe())
     SetOutputObject(install_plan_);
@@ -121,85 +144,141 @@
       success ? kActionCodeSuccess : kActionCodeError);
 }
 
-void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object,
-                                                GAsyncResult *res) {
+void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object,
+                                                    GAsyncResult *res) {
+  int index = buffer_state_[0] == kBufferStateReading ? 0 : 1;
+  CHECK(buffer_state_[index] == kBufferStateReading);
+
   GError* error = NULL;
-  CHECK(canceller_);
-  bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE;
-  g_object_unref(canceller_);
-  canceller_ = NULL;
+  CHECK(canceller_[index]);
+  cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
 
-  if (read_in_flight_) {
-    ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
-    if (bytes_read < 0) {
-      LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error);
-      Cleanup(false, was_cancelled);
-      return;
+  ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
+  if (bytes_read < 0) {
+    LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error);
+    failed_ = true;
+    buffer_state_[index] = kBufferStateEmpty;
+  } else if (bytes_read == 0) {
+    read_done_ = true;
+    buffer_state_[index] = kBufferStateEmpty;
+  } else {
+    buffer_valid_size_[index] = bytes_read;
+    buffer_state_[index] = kBufferStateFull;
+  }
+
+  if (bytes_read > 0) {
+    filesystem_size_ -= bytes_read;
+  }
+
+  SpawnAsyncActions();
+
+  if (bytes_read > 0) {
+    if (!hasher_.Update(buffer_[index].data(), bytes_read)) {
+      LOG(ERROR) << "Unable to update the hash.";
+      failed_ = true;
     }
+  }
+}
 
-    if (bytes_read == 0) {
-      // We're done!
-      if (!hasher_.Finalize()) {
-        LOG(ERROR) << "Unable to finalize the hash.";
-        Cleanup(false, was_cancelled);
-        return;
-      }
+void FilesystemCopierAction::StaticAsyncReadReadyCallback(
+    GObject *source_object,
+    GAsyncResult *res,
+    gpointer user_data) {
+  reinterpret_cast<FilesystemCopierAction*>(user_data)->
+      AsyncReadReadyCallback(source_object, res);
+}
+
+void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object,
+                                                     GAsyncResult *res) {
+  int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1;
+  CHECK(buffer_state_[index] == kBufferStateWriting);
+  buffer_state_[index] = kBufferStateEmpty;
+
+  GError* error = NULL;
+  CHECK(canceller_[index]);
+  cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
+
+  ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
+                                                       res,
+                                                       &error);
+  if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) {
+    if (bytes_written < 0) {
+      LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error);
+    } else {
+      LOG(ERROR) << "Write was short: wrote " << bytes_written
+                 << " but expected to write " << buffer_valid_size_[index];
+    }
+    failed_ = true;
+  }
+
+  SpawnAsyncActions();
+}
+
+void FilesystemCopierAction::StaticAsyncWriteReadyCallback(
+    GObject *source_object,
+    GAsyncResult *res,
+    gpointer user_data) {
+  reinterpret_cast<FilesystemCopierAction*>(user_data)->
+      AsyncWriteReadyCallback(source_object, res);
+}
+
+void FilesystemCopierAction::SpawnAsyncActions() {
+  bool reading = false;
+  bool writing = false;
+  for (int i = 0; i < 2; i++) {
+    if (buffer_state_[i] == kBufferStateReading) {
+      reading = true;
+    }
+    if (buffer_state_[i] == kBufferStateWriting) {
+      writing = true;
+    }
+  }
+  if (failed_ || cancelled_) {
+    if (!reading && !writing) {
+      Cleanup(false);
+    }
+    return;
+  }
+  for (int i = 0; i < 2; i++) {
+    if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) {
+      g_input_stream_read_async(
+          src_stream_,
+          buffer_[i].data(),
+          GetBytesToRead(),
+          G_PRIORITY_DEFAULT,
+          canceller_[i],
+          &FilesystemCopierAction::StaticAsyncReadReadyCallback,
+          this);
+      reading = true;
+      buffer_state_[i] = kBufferStateReading;
+    } else if (!writing && buffer_state_[i] == kBufferStateFull) {
+      g_output_stream_write_async(
+          dst_stream_,
+          buffer_[i].data(),
+          buffer_valid_size_[i],
+          G_PRIORITY_DEFAULT,
+          canceller_[i],
+          &FilesystemCopierAction::StaticAsyncWriteReadyCallback,
+          this);
+      writing = true;
+      buffer_state_[i] = kBufferStateWriting;
+    }
+  }
+  if (!reading && !writing) {
+    // We're done!
+    if (hasher_.Finalize()) {
       LOG(INFO) << "hash: " << hasher_.hash();
       if (copying_kernel_install_path_) {
         install_plan_.current_kernel_hash = hasher_.raw_hash();
       } else {
         install_plan_.current_rootfs_hash = hasher_.raw_hash();
       }
-      Cleanup(true, was_cancelled);
-      return;
-    }
-    if (!hasher_.Update(buffer_.data(), bytes_read)) {
-      LOG(ERROR) << "Unable to update the hash.";
-      Cleanup(false, was_cancelled);
-      return;
-    }
-    filesystem_size_ -= bytes_read;
-
-    // Kick off a write
-    read_in_flight_ = false;
-    buffer_valid_size_ = bytes_read;
-    canceller_ = g_cancellable_new();
-    g_output_stream_write_async(
-        dst_stream_,
-        &buffer_[0],
-        bytes_read,
-        G_PRIORITY_DEFAULT,
-        canceller_,
-        &FilesystemCopierAction::StaticAsyncReadyCallback,
-        this);
-    return;
-  }
-
-  ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
-                                                       res,
-                                                       &error);
-  if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) {
-    if (bytes_written < 0) {
-      LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error);
+      Cleanup(true);
     } else {
-      LOG(ERROR) << "Write was short: wrote " << bytes_written
-                 << " but expected to write " << buffer_valid_size_;
+      LOG(ERROR) << "Unable to finalize the hash.";
+      Cleanup(false);
     }
-    Cleanup(false, was_cancelled);
-    return;
   }
-
-  // Kick off a read
-  read_in_flight_ = true;
-  canceller_ = g_cancellable_new();
-  g_input_stream_read_async(
-      src_stream_,
-      &buffer_[0],
-      GetBytesToRead(),
-      G_PRIORITY_DEFAULT,
-      canceller_,
-      &FilesystemCopierAction::StaticAsyncReadyCallback,
-      this);
 }
 
 void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
@@ -214,7 +293,7 @@
 }
 
 int64_t FilesystemCopierAction::GetBytesToRead() {
-  return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_);
+  return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_);
 }
 
 }  // namespace chromeos_update_engine
diff --git a/filesystem_copier_action.h b/filesystem_copier_action.h
index 6720a47..6dbde41 100644
--- a/filesystem_copier_action.h
+++ b/filesystem_copier_action.h
@@ -37,14 +37,8 @@
 
 class FilesystemCopierAction : public Action<FilesystemCopierAction> {
  public:
-  explicit FilesystemCopierAction(bool copying_kernel_install_path)
-      : copying_kernel_install_path_(copying_kernel_install_path),
-        src_stream_(NULL),
-        dst_stream_(NULL),
-        canceller_(NULL),
-        read_in_flight_(false),
-        buffer_valid_size_(0),
-        filesystem_size_(kint64max) {}
+  explicit FilesystemCopierAction(bool copying_kernel_install_path);
+
   typedef ActionTraits<FilesystemCopierAction>::InputObjectType
   InputObjectType;
   typedef ActionTraits<FilesystemCopierAction>::OutputObjectType
@@ -53,9 +47,7 @@
   void TerminateProcessing();
 
   // Used for testing, so we can copy from somewhere other than root
-  void set_copy_source(const std::string& path) {
-    copy_source_ = path;
-  }
+  void set_copy_source(const std::string& path) { copy_source_ = path; }
 
   // Debugging/logging
   static std::string StaticType() { return "FilesystemCopierAction"; }
@@ -65,19 +57,34 @@
   friend class FilesystemCopierActionTest;
   FRIEND_TEST(FilesystemCopierActionTest, RunAsRootDetermineFilesystemSizeTest);
 
-  // Callback from glib when the copy operation is done.
-  void AsyncReadyCallback(GObject *source_object, GAsyncResult *res);
-  static void StaticAsyncReadyCallback(GObject *source_object,
-                                       GAsyncResult *res,
-                                       gpointer user_data) {
-    reinterpret_cast<FilesystemCopierAction*>(user_data)->AsyncReadyCallback(
-        source_object, res);
-  }
+  // Ping-pong buffers generally cycle through the following states:
+  // Empty->Reading->Full->Writing->Empty.
+  enum BufferState {
+    kBufferStateEmpty,
+    kBufferStateReading,
+    kBufferStateFull,
+    kBufferStateWriting
+  };
 
-  // Cleans up all the variables we use for async operations and tells
-  // the ActionProcessor we're done w/ success as passed in.
-  // was_cancelled should be true if TerminateProcessing() was called.
-  void Cleanup(bool success, bool was_cancelled);
+  // Callbacks from glib when the read/write operation is done.
+  void AsyncReadReadyCallback(GObject *source_object, GAsyncResult *res);
+  static void StaticAsyncReadReadyCallback(GObject *source_object,
+                                           GAsyncResult *res,
+                                           gpointer user_data);
+
+  void AsyncWriteReadyCallback(GObject *source_object, GAsyncResult *res);
+  static void StaticAsyncWriteReadyCallback(GObject *source_object,
+                                            GAsyncResult *res,
+                                            gpointer user_data);
+
+  // Based on the state of the ping-pong buffers spawns appropriate read/write
+  // actions asynchronously.
+  void SpawnAsyncActions();
+
+  // Cleans up all the variables we use for async operations and tells the
+  // ActionProcessor we're done w/ success as passed in. |cancelled_| should be
+  // true if TerminateProcessing() was called.
+  void Cleanup(bool success);
 
   // Determine, if possible, the source file system size to avoid copying the
   // whole partition. Currently this supports only the root file system assuming
@@ -101,18 +108,22 @@
   GInputStream* src_stream_;
   GOutputStream* dst_stream_;
 
-  // If non-NULL, the cancellable object for the in-flight async call.
-  GCancellable* canceller_;
+  // Ping-pong buffers for storing data we read/write. Only one buffer is being
+  // read at a time and only one buffer is being written at a time.
+  std::vector<char> buffer_[2];
 
-  // True if we're waiting on a read to complete; false if we're
-  // waiting on a write.
-  bool read_in_flight_;
+  // The state of each buffer.
+  BufferState buffer_state_[2];
 
-  // The buffer for storing data we read/write.
-  std::vector<char> buffer_;
+  // Number of valid elements in |buffer_| if its state is kBufferStateFull.
+  std::vector<char>::size_type buffer_valid_size_[2];
 
-  // Number of valid elements in buffer_.
-  std::vector<char>::size_type buffer_valid_size_;
+  // The cancellable objects for the in-flight async calls.
+  GCancellable* canceller_[2];
+
+  bool read_done_;  // true if reached EOF on the input stream.
+  bool failed_;  // true if the action has failed.
+  bool cancelled_;  // true if the action has been cancelled.
 
   // The install plan we're passed in via the input pipe.
   InstallPlan install_plan_;