update_engine: Remove libgio usage from FilesystemVerifierAction.

The FilesystemVerifierAction used gio to read from the partition it
is verifying. This patch makes it use plain file descriptors and watch
them using chromeos::MessageLoop instead.

BUG=chromium:499886
TEST=updated unittests.

Change-Id: I5230399d5ac4777522bd5f53d4f4ade0a29a6c9f
Reviewed-on: https://chromium-review.googlesource.com/284648
Tested-by: Alex Deymo <deymo@chromium.org>
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
Trybot-Ready: Alex Deymo <deymo@chromium.org>
diff --git a/filesystem_verifier_action.cc b/filesystem_verifier_action.cc
index 8810763..70156d9 100644
--- a/filesystem_verifier_action.cc
+++ b/filesystem_verifier_action.cc
@@ -13,33 +13,27 @@
 #include <cstdlib>
 #include <string>
 
-#include <gio/gio.h>
-#include <gio/gunixinputstream.h>
-#include <glib.h>
+#include <base/bind.h>
+#include <base/posix/eintr_wrapper.h>
 
-#include "update_engine/glib_utils.h"
 #include "update_engine/hardware_interface.h"
 #include "update_engine/subprocess.h"
 #include "update_engine/system_state.h"
 #include "update_engine/utils.h"
 
+using chromeos::MessageLoop;
 using std::string;
 
 namespace chromeos_update_engine {
 
 namespace {
-const off_t kCopyFileBufferSize = 128 * 1024;
+const off_t kReadFileBufferSize = 128 * 1024;
 }  // namespace
 
 FilesystemVerifierAction::FilesystemVerifierAction(
     SystemState* system_state,
     PartitionType partition_type)
     : partition_type_(partition_type),
-      src_stream_(nullptr),
-      canceller_(nullptr),
-      read_done_(false),
-      failed_(false),
-      cancelled_(false),
       remaining_size_(kint64max),
       system_state_(system_state) {}
 
@@ -104,39 +98,49 @@
       break;
   }
 
-  int src_fd = open(target_path.c_str(), O_RDONLY);
-  if (src_fd < 0) {
-    PLOG(ERROR) << "Unable to open " << target_path << " for reading:";
+  src_fd_ = HANDLE_EINTR(open(target_path.c_str(), O_RDONLY));
+  if (src_fd_ < 0) {
+    PLOG(ERROR) << "Unable to open " << target_path << " for reading";
     return;
   }
 
-  DetermineFilesystemSize(src_fd);
-  src_stream_ = g_unix_input_stream_new(src_fd, TRUE);
-
-  buffer_.resize(kCopyFileBufferSize);
-  canceller_ = g_cancellable_new();
+  DetermineFilesystemSize(src_fd_);
+  buffer_.resize(kReadFileBufferSize);
 
   // Start the first read.
-  SpawnAsyncActions();
+  read_task_ = MessageLoop::current()->WatchFileDescriptor(
+      FROM_HERE,
+      src_fd_,
+      MessageLoop::WatchMode::kWatchRead,
+      true,  // persistent
+      base::Bind(&FilesystemVerifierAction::OnReadReadyCallback,
+                 base::Unretained(this)));
 
   abort_action_completer.set_should_complete(false);
 }
 
 void FilesystemVerifierAction::TerminateProcessing() {
-  if (canceller_) {
-    g_cancellable_cancel(canceller_);
-  }
+  cancelled_ = true;
+  Cleanup(ErrorCode::kSuccess);  // error code is ignored if cancelled_ is true.
 }
 
 bool FilesystemVerifierAction::IsCleanupPending() const {
-  return (src_stream_ != nullptr);
+  return (src_fd_ != -1);
 }
 
 void FilesystemVerifierAction::Cleanup(ErrorCode code) {
-  g_object_unref(canceller_);
-  canceller_ = nullptr;
-  g_object_unref(src_stream_);
-  src_stream_ = nullptr;
+  MessageLoop::current()->CancelTask(read_task_);
+  read_task_ = MessageLoop::kTaskIdNull;
+
+  if (src_fd_ != -1) {
+    if (IGNORE_EINTR(close(src_fd_)) != 0) {
+      PLOG(ERROR) << "Error closing fd " << src_fd_;
+    }
+    src_fd_ = -1;
+  }
+  // This memory is not used anymore.
+  buffer_.clear();
+
   if (cancelled_)
     return;
   if (code == ErrorCode::kSuccess && HasOutputPipe())
@@ -144,16 +148,16 @@
   processor_->ActionComplete(this, code);
 }
 
-void FilesystemVerifierAction::AsyncReadReadyCallback(GObject *source_object,
-                                                      GAsyncResult *res) {
-  GError* error = nullptr;
-  CHECK(canceller_);
-  cancelled_ = g_cancellable_is_cancelled(canceller_) == TRUE;
+void FilesystemVerifierAction::OnReadReadyCallback() {
+  size_t bytes_to_read = std::min(static_cast<int64_t>(buffer_.size()),
+                                  remaining_size_);
 
-  ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
-
+  ssize_t bytes_read = 0;
+  if (bytes_to_read) {
+    bytes_read = HANDLE_EINTR(read(src_fd_, buffer_.data(), bytes_to_read));
+  }
   if (bytes_read < 0) {
-    LOG(ERROR) << "Read failed: " << utils::GetAndFreeGError(&error);
+    PLOG(ERROR) << "Read failed";
     failed_ = true;
   } else if (bytes_read == 0) {
     read_done_ = true;
@@ -162,43 +166,23 @@
   }
 
   if (bytes_read > 0) {
-    // If read_done_ is set, SpawnAsyncActions may finalize the hash so the hash
-    // update below would happen too late.
     CHECK(!read_done_);
     if (!hasher_.Update(buffer_.data(), bytes_read)) {
       LOG(ERROR) << "Unable to update the hash.";
       failed_ = true;
     }
   }
-  SpawnAsyncActions();
+
+  CheckTerminationConditions();
 }
 
-void FilesystemVerifierAction::StaticAsyncReadReadyCallback(
-    GObject *source_object,
-    GAsyncResult *res,
-    gpointer user_data) {
-  reinterpret_cast<FilesystemVerifierAction*>(user_data)->
-      AsyncReadReadyCallback(source_object, res);
-}
-
-void FilesystemVerifierAction::SpawnAsyncActions() {
+void FilesystemVerifierAction::CheckTerminationConditions() {
   if (failed_ || cancelled_) {
     Cleanup(ErrorCode::kError);
     return;
   }
 
-  if (!read_done_) {
-    int64_t bytes_to_read = std::min(static_cast<int64_t>(buffer_.size()),
-                                     remaining_size_);
-    g_input_stream_read_async(
-        src_stream_,
-        buffer_.data(),
-        bytes_to_read,
-        G_PRIORITY_DEFAULT,
-        canceller_,
-        &FilesystemVerifierAction::StaticAsyncReadReadyCallback,
-        this);
-  } else {
+  if (read_done_) {
     // We're done!
     ErrorCode code = ErrorCode::kSuccess;
     if (hasher_.Finalize()) {