update_engine: Watch file descriptors using chromeos::MessageLoop.

This patch removes all calls to the g_io_*() functions that were used to
create io_channels from file descriptors and watch them in the main loop.
Instead, we use chromeos::MessageLoop, backed by its glib
implementation.

This patch also removes the duplicated process-handling work done in
P2PManager and uses the common Subprocess class instead.

BUG=chromium:499886
TEST=Added and updated unittests.

Change-Id: Ia093b060d2396325fce69b2bbdb62957ba7bfbc6
Reviewed-on: https://chromium-review.googlesource.com/284593
Tested-by: Alex Deymo <deymo@chromium.org>
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
Trybot-Ready: Alex Deymo <deymo@chromium.org>
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index da69281..9d32293 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -18,7 +18,6 @@
 
 using base::TimeDelta;
 using chromeos::MessageLoop;
-using std::make_pair;
 using std::max;
 using std::string;
 
@@ -363,7 +362,7 @@
     }
   } else {
     // set up callback
-    SetupMainloopSources();
+    SetupMessageLoopSources();
   }
 }
 
@@ -410,8 +409,8 @@
   CHECK_EQ(curl_easy_pause(curl_handle_, CURLPAUSE_CONT), CURLE_OK);
 }
 
-// This method sets up callbacks with the glib main loop.
-void LibcurlHttpFetcher::SetupMainloopSources() {
+// This method sets up callbacks with the MessageLoop.
+void LibcurlHttpFetcher::SetupMessageLoopSources() {
   fd_set fd_read;
   fd_set fd_write;
   fd_set fd_exc;
@@ -429,14 +428,14 @@
 
   // We should iterate through all file descriptors up to libcurl's fd_max or
   // the highest one we're tracking, whichever is larger.
-  for (size_t t = 0; t < arraysize(io_channels_); ++t) {
-    if (!io_channels_[t].empty())
-      fd_max = max(fd_max, io_channels_[t].rbegin()->first);
+  for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+    if (!fd_task_maps_[t].empty())
+      fd_max = max(fd_max, fd_task_maps_[t].rbegin()->first);
   }
 
   // For each fd, if we're not tracking it, track it. If we are tracking it, but
   // libcurl doesn't care about it anymore, stop tracking it. After this loop,
-  // there should be exactly as many GIOChannel objects in io_channels_[0|1] as
+  // there should be exactly as many tasks scheduled in fd_task_maps_[0|1] as
   // there are read/write fds that we're tracking.
   for (int fd = 0; fd <= fd_max; ++fd) {
     // Note that fd_exc is unused in the current version of libcurl so is_exc
@@ -446,16 +445,20 @@
       is_exc || (FD_ISSET(fd, &fd_read) != 0),  // track 0 -- read
       is_exc || (FD_ISSET(fd, &fd_write) != 0)  // track 1 -- write
     };
+    MessageLoop::WatchMode watch_modes[2] = {
+      MessageLoop::WatchMode::kWatchRead,
+      MessageLoop::WatchMode::kWatchWrite,
+    };
 
-    for (size_t t = 0; t < arraysize(io_channels_); ++t) {
-      bool tracked = io_channels_[t].find(fd) != io_channels_[t].end();
+    for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+      auto fd_task_it = fd_task_maps_[t].find(fd);
+      bool tracked = fd_task_it != fd_task_maps_[t].end();
 
       if (!must_track[t]) {
         // If we have an outstanding io_channel, remove it.
         if (tracked) {
-          g_source_remove(io_channels_[t][fd].second);
-          g_io_channel_unref(io_channels_[t][fd].first);
-          io_channels_[t].erase(io_channels_[t].find(fd));
+          MessageLoop::current()->CancelTask(fd_task_it->second);
+          fd_task_maps_[t].erase(fd_task_it);
         }
         continue;
       }
@@ -464,16 +467,15 @@
       if (tracked)
         continue;
 
-      // Set conditions appropriately -- read for track 0, write for track 1.
-      GIOCondition condition = static_cast<GIOCondition>(
-          ((t == 0) ? (G_IO_IN | G_IO_PRI) : G_IO_OUT) | G_IO_ERR | G_IO_HUP);
-
       // Track a new fd.
-      GIOChannel* io_channel = g_io_channel_unix_new(fd);
-      guint tag =
-          g_io_add_watch(io_channel, condition, &StaticFDCallback, this);
+      fd_task_maps_[t][fd] = MessageLoop::current()->WatchFileDescriptor(
+          FROM_HERE,
+          fd,
+          watch_modes[t],
+          true,  // persistent
+          base::Bind(&LibcurlHttpFetcher::CurlPerformOnce,
+                     base::Unretained(this)));
 
-      io_channels_[t][fd] = make_pair(io_channel, tag);
       static int io_counter = 0;
       io_counter++;
       if (io_counter % 50 == 0) {
@@ -493,16 +495,6 @@
   }
 }
 
-bool LibcurlHttpFetcher::FDCallback(GIOChannel *source,
-                                    GIOCondition condition) {
-  CurlPerformOnce();
-  // We handle removing of this source elsewhere, so we always return true.
-  // The docs say, "the function should return FALSE if the event source
-  // should be removed."
-  // http://www.gtk.org/api/2.6/glib/glib-IO-Channels.html#GIOFunc
-  return true;
-}
-
 void LibcurlHttpFetcher::RetryTimeoutCallback() {
   ResumeTransfer(url_);
   CurlPerformOnce();
@@ -525,13 +517,16 @@
   MessageLoop::current()->CancelTask(timeout_id_);
   timeout_id_ = MessageLoop::kTaskIdNull;
 
-  for (size_t t = 0; t < arraysize(io_channels_); ++t) {
-    for (IOChannels::iterator it = io_channels_[t].begin();
-         it != io_channels_[t].end(); ++it) {
-      g_source_remove(it->second.second);
-      g_io_channel_unref(it->second.first);
+  for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+    for (const auto& fd_taks_pair : fd_task_maps_[t]) {
+      if (!MessageLoop::current()->CancelTask(fd_taks_pair.second)) {
+        LOG(WARNING) << "Error canceling the watch task "
+                     << fd_taks_pair.second << " for "
+                     << (t ? "writing" : "reading") << " the fd "
+                     << fd_taks_pair.first;
+      }
     }
-    io_channels_[t].clear();
+    fd_task_maps_[t].clear();
   }
 
   if (curl_http_headers_) {