update_engine: Watch file descriptors using chromeos::MessageLoop.

This patch removes all the calls to the g_io_*() functions that were
used to create io_channels from file descriptors and watch them in the
main loop. Instead, we use chromeos::MessageLoop backed by the glib
implementation.

This patch also removes the duplicated process handling work done in
P2PManager and uses the common Subprocess class instead.

BUG=chromium:499886
TEST=Added and updated unittests.

Change-Id: Ia093b060d2396325fce69b2bbdb62957ba7bfbc6
Reviewed-on: https://chromium-review.googlesource.com/284593
Tested-by: Alex Deymo <deymo@chromium.org>
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
Trybot-Ready: Alex Deymo <deymo@chromium.org>
diff --git a/p2p_manager.cc b/p2p_manager.cc
index c1a2738..ee0f583 100644
--- a/p2p_manager.cc
+++ b/p2p_manager.cc
@@ -35,6 +35,7 @@
 #include <base/strings/stringprintf.h>
 
 #include "update_engine/glib_utils.h"
+#include "update_engine/subprocess.h"
 #include "update_engine/update_manager/policy.h"
 #include "update_engine/update_manager/update_manager.h"
 #include "update_engine/utils.h"
@@ -45,6 +46,7 @@
 using base::StringPrintf;
 using base::Time;
 using base::TimeDelta;
+using chromeos::MessageLoop;
 using chromeos_update_manager::EvalStatus;
 using chromeos_update_manager::Policy;
 using chromeos_update_manager::UpdateManager;
@@ -105,7 +107,7 @@
                  UpdateManager* update_manager,
                  const string& file_extension,
                  const int num_files_to_keep,
-                 const base::TimeDelta& max_file_age);
+                 const TimeDelta& max_file_age);
 
   // P2PManager methods.
   void SetDevicePolicy(const policy::DevicePolicy* device_policy) override;
@@ -147,7 +149,7 @@
 
   // Utility function to delete a file given by |path| and log the
   // path as well as |reason|. Returns false on failure.
-  bool DeleteP2PFile(const FilePath& path, const std::string& reason);
+  bool DeleteP2PFile(const FilePath& path, const string& reason);
 
   // Schedules an async request for tracking changes in P2P enabled status.
   void ScheduleEnabledStatusChange();
@@ -178,7 +180,7 @@
 
   // If non-zero, files older than this will not be kept after
   // performing housekeeping.
-  const base::TimeDelta max_file_age_;
+  const TimeDelta max_file_age_;
 
   // The string ".p2p".
   static const char kP2PExtension[];
@@ -206,7 +208,7 @@
                                UpdateManager* update_manager,
                                const string& file_extension,
                                const int num_files_to_keep,
-                               const base::TimeDelta& max_file_age)
+                               const TimeDelta& max_file_age)
   : clock_(clock),
     update_manager_(update_manager),
     file_extension_(file_extension),
@@ -238,34 +240,14 @@
 }
 
 bool P2PManagerImpl::EnsureP2P(bool should_be_running) {
-  gchar *standard_error = nullptr;
-  GError *error = nullptr;
-  gint exit_status = 0;
+  int return_code = 0;
+  string output;
 
   may_be_running_ = true;  // Unless successful, we must be conservative.
 
   vector<string> args = configuration_->GetInitctlArgs(should_be_running);
-  unique_ptr<gchar*, utils::GLibStrvFreeDeleter> argv(
-      utils::StringVectorToGStrv(args));
-  if (!g_spawn_sync(nullptr,  // working_directory
-                    argv.get(),
-                    nullptr,  // envp
-                    static_cast<GSpawnFlags>(G_SPAWN_SEARCH_PATH),
-                    nullptr, nullptr,  // child_setup, user_data
-                    nullptr,  // standard_output
-                    &standard_error,
-                    &exit_status,
-                    &error)) {
-    LOG(ERROR) << "Error spawning " << utils::StringVectorToString(args)
-               << ": " << utils::GetAndFreeGError(&error);
-    return false;
-  }
-  unique_ptr<gchar, utils::GLibFreeDeleter> standard_error_deleter(
-      standard_error);
-
-  if (!WIFEXITED(exit_status)) {
-    LOG(ERROR) << "Error spawning '" << utils::StringVectorToString(args)
-               << "': WIFEXITED is false";
+  if (!Subprocess::SynchronousExec(args, &return_code, &output)) {
+    LOG(ERROR) << "Error spawning " << utils::StringVectorToString(args);
     return false;
   }
 
@@ -274,11 +256,11 @@
   // necessity because initctl does not offer actions such as "start if not
   // running" or "stop if running".
   // TODO(zeuthen,chromium:277051): Avoid doing this.
-  if (WEXITSTATUS(exit_status) != 0) {
-    const gchar *expected_error_message = should_be_running ?
+  if (return_code != 0) {
+    const char *expected_error_message = should_be_running ?
       "initctl: Job is already running: p2p\n" :
       "initctl: Unknown instance \n";
-    if (g_strcmp0(standard_error, expected_error_message) != 0)
+    if (output != expected_error_message)
       return false;
   }
 
@@ -322,7 +304,7 @@
 }
 
 bool P2PManagerImpl::DeleteP2PFile(const FilePath& path,
-                                   const std::string& reason) {
+                                   const string& reason) {
   LOG(INFO) << "Deleting p2p file " << path.value()
             << " (reason: " << reason << ")";
   if (unlink(path.value().c_str()) != 0) {
@@ -367,7 +349,7 @@
     // If instructed to keep only files younger than a given age
     // (|max_file_age_| != 0), delete files satisfying this criteria
     // right now. Otherwise add it to a list we'll consider for later.
-    if (clock_ != nullptr && max_file_age_ != base::TimeDelta() &&
+    if (clock_ != nullptr && max_file_age_ != TimeDelta() &&
         clock_->GetWallclockTime() - time > max_file_age_) {
       if (!DeleteP2PFile(file, "file too old"))
         deletion_failed = true;
@@ -396,81 +378,50 @@
 class LookupData {
  public:
   explicit LookupData(P2PManager::LookupCallback callback)
-    : callback_(callback),
-      pid_(0),
-      stdout_fd_(-1),
-      stdout_channel_source_id_(0),
-      child_watch_source_id_(0),
-      timeout_source_id_(0),
-      reported_(false) {}
+    : callback_(callback) {}
 
   ~LookupData() {
-    if (child_watch_source_id_ != 0)
-      g_source_remove(child_watch_source_id_);
-    if (stdout_channel_source_id_ != 0)
-      g_source_remove(stdout_channel_source_id_);
-    if (timeout_source_id_ != 0)
-      g_source_remove(timeout_source_id_);
-    if (stdout_fd_ != -1)
-      close(stdout_fd_);
-    if (pid_ != 0)
-      kill(pid_, SIGTERM);
+    if (timeout_task_ != MessageLoop::kTaskIdNull)
+      MessageLoop::current()->CancelTask(timeout_task_);
+    if (child_tag_)
+      Subprocess::Get().KillExec(child_tag_);
   }
 
-  void InitiateLookup(gchar **argv, TimeDelta timeout) {
+  void InitiateLookup(const vector<string>& cmd, TimeDelta timeout) {
     // NOTE: if we fail early (i.e. in this method), we need to schedule
     // an idle to report the error. This is because we guarantee that
-    // the callback is always called from the GLib mainloop (this
+    // the callback is always called from the message loop (this
     // guarantee is useful for testing).
 
-    GError *error = nullptr;
-    if (!g_spawn_async_with_pipes(nullptr,  // working_directory
-                                  argv,
-                                  nullptr,  // envp
-                                  static_cast<GSpawnFlags>(G_SPAWN_SEARCH_PATH |
-                                                     G_SPAWN_DO_NOT_REAP_CHILD),
-                                  nullptr,  // child_setup
-                                  this,
-                                  &pid_,
-                                  nullptr,  // standard_input
-                                  &stdout_fd_,
-                                  nullptr,  // standard_error
-                                  &error)) {
-      LOG(ERROR) << "Error spawning p2p-client: "
-                 << utils::GetAndFreeGError(&error);
+    // We expect to run just "p2p-client" and find it in the path.
+    child_tag_ = Subprocess::Get().ExecFlags(
+        cmd, G_SPAWN_SEARCH_PATH, false /* redirect stderr */, OnLookupDone,
+        this);
+
+    if (!child_tag_) {
+      LOG(ERROR) << "Error spawning " << utils::StringVectorToString(cmd);
       ReportErrorAndDeleteInIdle();
       return;
     }
 
-    GIOChannel* io_channel = g_io_channel_unix_new(stdout_fd_);
-    stdout_channel_source_id_ = g_io_add_watch(
-        io_channel,
-        static_cast<GIOCondition>(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),
-        OnIOChannelActivity, this);
-    CHECK_NE(stdout_channel_source_id_, 0u);
-    g_io_channel_unref(io_channel);
-
-    child_watch_source_id_ = g_child_watch_add(pid_, OnChildWatchActivity,
-                                               this);
-    CHECK_NE(child_watch_source_id_, 0u);
-
-    if (timeout.ToInternalValue() > 0) {
-      timeout_source_id_ = g_timeout_add(timeout.InMilliseconds(),
-                                         OnTimeout, this);
-      CHECK_NE(timeout_source_id_, 0u);
+    if (timeout > TimeDelta()) {
+      timeout_task_ = MessageLoop::current()->PostDelayedTask(
+          FROM_HERE,
+          Bind(&LookupData::OnTimeout, base::Unretained(this)),
+          timeout);
     }
   }
 
  private:
   void ReportErrorAndDeleteInIdle() {
-    g_idle_add(static_cast<GSourceFunc>(OnIdleForReportErrorAndDelete), this);
+    MessageLoop::current()->PostTask(FROM_HERE, Bind(
+        &LookupData::OnIdleForReportErrorAndDelete,
+        base::Unretained(this)));
   }
 
-  static gboolean OnIdleForReportErrorAndDelete(gpointer user_data) {
-    LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-    lookup_data->ReportError();
-    delete lookup_data;
-    return FALSE;  // Remove source.
+  void OnIdleForReportErrorAndDelete() {
+    ReportError();
+    delete this;
   }
 
   void IssueCallback(const string& url) {
@@ -485,11 +436,10 @@
     reported_ = true;
   }
 
-  void ReportSuccess() {
+  void ReportSuccess(const string& output) {
     if (reported_)
       return;
-
-    string url = stdout_;
+    string url = output;
     size_t newline_pos = url.find('\n');
     if (newline_pos != string::npos)
       url.resize(newline_pos);
@@ -503,72 +453,40 @@
       LOG(ERROR) << "p2p URL '" << url << "' does not look right. Ignoring.";
       ReportError();
     }
-
     reported_ = true;
   }
 
-  static gboolean OnIOChannelActivity(GIOChannel *source,
-                                      GIOCondition condition,
-                                      gpointer user_data) {
+  static void OnLookupDone(int return_code,
+                           const string& output,
+                           void *user_data) {
     LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-    gchar* str = nullptr;
-    GError* error = nullptr;
-    GIOStatus status = g_io_channel_read_line(source,
-                                              &str,
-                                              nullptr,  // len
-                                              nullptr,  // line_terminator
-                                              &error);
-    if (status != G_IO_STATUS_NORMAL) {
-      // Ignore EOF since we usually get that before SIGCHLD and we
-      // need to examine exit status there.
-      if (status != G_IO_STATUS_EOF) {
-        LOG(ERROR) << "Error reading a line from p2p-client: "
-                   << utils::GetAndFreeGError(&error);
-        lookup_data->ReportError();
-        delete lookup_data;
-      }
-    } else {
-      if (str != nullptr) {
-        lookup_data->stdout_ += str;
-        g_free(str);
-      }
-    }
-    return TRUE;  // Don't remove source.
-  }
-
-  static void OnChildWatchActivity(GPid pid,
-                                   gint status,
-                                   gpointer user_data) {
-    LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-
-    if (!WIFEXITED(status)) {
-      LOG(ERROR) << "Child didn't exit normally";
-      lookup_data->ReportError();
-    } else if (WEXITSTATUS(status) != 0) {
+    lookup_data->child_tag_ = 0;
+    if (return_code != 0) {
       LOG(INFO) << "Child exited with non-zero exit code "
-                << WEXITSTATUS(status);
+                << return_code;
       lookup_data->ReportError();
     } else {
-      lookup_data->ReportSuccess();
+      lookup_data->ReportSuccess(output);
     }
     delete lookup_data;
   }
 
-  static gboolean OnTimeout(gpointer user_data) {
-    LookupData *lookup_data = reinterpret_cast<LookupData*>(user_data);
-    lookup_data->ReportError();
-    delete lookup_data;
-    return TRUE;  // Don't remove source.
+  void OnTimeout() {
+    timeout_task_ = MessageLoop::kTaskIdNull;
+    ReportError();
+    delete this;
   }
 
   P2PManager::LookupCallback callback_;
-  GPid pid_;
-  gint stdout_fd_;
-  guint stdout_channel_source_id_;
-  guint child_watch_source_id_;
-  guint timeout_source_id_;
-  string stdout_;
-  bool reported_;
+
+  // The Subprocess tag of the running process. A value of 0 means that the
+  // process is not running.
+  uint32_t child_tag_{0};
+
+  // The timeout task_id we are waiting on, if any.
+  MessageLoop::TaskId timeout_task_{MessageLoop::kTaskIdNull};
+
+  bool reported_{false};
 };
 
 void P2PManagerImpl::LookupUrlForFile(const string& file_id,
@@ -579,9 +497,7 @@
   string file_id_with_ext = file_id + "." + file_extension_;
   vector<string> args = configuration_->GetP2PClientArgs(file_id_with_ext,
                                                          minimum_size);
-  gchar **argv = utils::StringVectorToGStrv(args);
-  lookup_data->InitiateLookup(argv, max_time_to_wait);
-  g_strfreev(argv);
+  lookup_data->InitiateLookup(args, max_time_to_wait);
 }
 
 bool P2PManagerImpl::FileShare(const string& file_id,
@@ -823,7 +739,7 @@
     UpdateManager* update_manager,
     const string& file_extension,
     const int num_files_to_keep,
-    const base::TimeDelta& max_file_age) {
+    const TimeDelta& max_file_age) {
   return new P2PManagerImpl(configuration,
                             clock,
                             update_manager,