update_engine: Replace FileWatcher.

base::MessageLoop::WatchFileDescriptor() is removed in the next uprev.
This CL replaces the uses (including indirect uses), by FileDescriptorWatcher.

BUG=chromium:909719
TEST=Built locally. Ran cros_run_unit_tests.

Change-Id: Ide9f94daf2be28696ec6bc1f82ab46a2bd2b6c6f
diff --git a/common/subprocess.cc b/common/subprocess.cc
index 0131f10..36655c7 100644
--- a/common/subprocess.cc
+++ b/common/subprocess.cc
@@ -127,8 +127,7 @@
     if (!ok || eof) {
       // There was either an error or an EOF condition, so we are done watching
       // the file descriptor.
-      MessageLoop::current()->CancelTask(record->stdout_task_id);
-      record->stdout_task_id = MessageLoop::kTaskIdNull;
+      record->stdout_controller.reset();
       return;
     }
   } while (bytes_read);
@@ -143,8 +142,7 @@
   // Make sure we read any remaining process output and then close the pipe.
   OnStdoutReady(record);
 
-  MessageLoop::current()->CancelTask(record->stdout_task_id);
-  record->stdout_task_id = MessageLoop::kTaskIdNull;
+  record->stdout_controller.reset();
 
   // Don't print any log if the subprocess exited with exit code 0.
   if (info.si_code != CLD_EXITED) {
@@ -199,12 +197,9 @@
                << record->stdout_fd << ".";
   }
 
-  record->stdout_task_id = MessageLoop::current()->WatchFileDescriptor(
-      FROM_HERE,
+  record->stdout_controller = base::FileDescriptorWatcher::WatchReadable(
       record->stdout_fd,
-      MessageLoop::WatchMode::kWatchRead,
-      true,
-      base::Bind(&Subprocess::OnStdoutReady, record.get()));
+      base::BindRepeating(&Subprocess::OnStdoutReady, record.get()));
 
   subprocess_records_[pid] = std::move(record);
   return pid;
diff --git a/common/subprocess.h b/common/subprocess.h
index bc19d16..bac9e48 100644
--- a/common/subprocess.h
+++ b/common/subprocess.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include <base/callback.h>
+#include <base/files/file_descriptor_watcher_posix.h>
 #include <base/logging.h>
 #include <base/macros.h>
 #include <brillo/asynchronous_signal_handler_interface.h>
@@ -120,8 +121,7 @@
 
     // These are used to monitor the stdout of the running process, including
     // the stderr if it was redirected.
-    brillo::MessageLoop::TaskId stdout_task_id{
-        brillo::MessageLoop::kTaskIdNull};
+    std::unique_ptr<base::FileDescriptorWatcher::Controller> stdout_controller;
     int stdout_fd{-1};
     std::string stdout;
   };
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index 247327a..6b30eeb 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -80,16 +80,8 @@
 #endif  // __ANDROID__
   LibcurlHttpFetcher* fetcher = static_cast<LibcurlHttpFetcher*>(clientp);
   // Stop watching the socket before closing it.
-  for (size_t t = 0; t < arraysize(fetcher->fd_task_maps_); ++t) {
-    const auto fd_task_pair = fetcher->fd_task_maps_[t].find(item);
-    if (fd_task_pair != fetcher->fd_task_maps_[t].end()) {
-      if (!MessageLoop::current()->CancelTask(fd_task_pair->second)) {
-        LOG(WARNING) << "Error canceling the watch task "
-                     << fd_task_pair->second << " for "
-                     << (t ? "writing" : "reading") << " the fd " << item;
-      }
-      fetcher->fd_task_maps_[t].erase(item);
-    }
+  for (size_t t = 0; t < arraysize(fetcher->fd_controller_maps_); ++t) {
+    fetcher->fd_controller_maps_[t].erase(item);
   }
 
   // Documentation for this callback says to return 0 on success or 1 on error.
@@ -701,15 +693,15 @@
 
   // We should iterate through all file descriptors up to libcurl's fd_max or
   // the highest one we're tracking, whichever is larger.
-  for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
-    if (!fd_task_maps_[t].empty())
-      fd_max = max(fd_max, fd_task_maps_[t].rbegin()->first);
+  for (size_t t = 0; t < arraysize(fd_controller_maps_); ++t) {
+    if (!fd_controller_maps_[t].empty())
+      fd_max = max(fd_max, fd_controller_maps_[t].rbegin()->first);
   }
 
   // For each fd, if we're not tracking it, track it. If we are tracking it, but
   // libcurl doesn't care about it anymore, stop tracking it. After this loop,
-  // there should be exactly as many tasks scheduled in fd_task_maps_[0|1] as
-  // there are read/write fds that we're tracking.
+  // there should be exactly as many tasks scheduled in
+  // fd_controller_maps_[0|1] as there are read/write fds that we're tracking.
   for (int fd = 0; fd <= fd_max; ++fd) {
     // Note that fd_exc is unused in the current version of libcurl so is_exc
     // should always be false.
@@ -718,21 +710,14 @@
         is_exc || (FD_ISSET(fd, &fd_read) != 0),  // track 0 -- read
         is_exc || (FD_ISSET(fd, &fd_write) != 0)  // track 1 -- write
     };
-    MessageLoop::WatchMode watch_modes[2] = {
-        MessageLoop::WatchMode::kWatchRead,
-        MessageLoop::WatchMode::kWatchWrite,
-    };
 
-    for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
-      auto fd_task_it = fd_task_maps_[t].find(fd);
-      bool tracked = fd_task_it != fd_task_maps_[t].end();
+    for (size_t t = 0; t < arraysize(fd_controller_maps_); ++t) {
+      bool tracked =
+          fd_controller_maps_[t].find(fd) != fd_controller_maps_[t].end();
 
       if (!must_track[t]) {
         // If we have an outstanding io_channel, remove it.
-        if (tracked) {
-          MessageLoop::current()->CancelTask(fd_task_it->second);
-          fd_task_maps_[t].erase(fd_task_it);
-        }
+        fd_controller_maps_[t].erase(fd);
         continue;
       }
 
@@ -741,14 +726,21 @@
         continue;
 
       // Track a new fd.
-      fd_task_maps_[t][fd] = MessageLoop::current()->WatchFileDescriptor(
-          FROM_HERE,
-          fd,
-          watch_modes[t],
-          true,  // persistent
-          base::Bind(&LibcurlHttpFetcher::CurlPerformOnce,
-                     base::Unretained(this)));
-
+      switch (t) {
+        case 0:  // Read
+          fd_controller_maps_[t][fd] =
+              base::FileDescriptorWatcher::WatchReadable(
+                  fd,
+                  base::BindRepeating(&LibcurlHttpFetcher::CurlPerformOnce,
+                                      base::Unretained(this)));
+          break;
+        case 1:  // Write
+          fd_controller_maps_[t][fd] =
+              base::FileDescriptorWatcher::WatchWritable(
+                  fd,
+                  base::BindRepeating(&LibcurlHttpFetcher::CurlPerformOnce,
+                                      base::Unretained(this)));
+      }
       static int io_counter = 0;
       io_counter++;
       if (io_counter % 50 == 0) {
@@ -800,15 +792,8 @@
   MessageLoop::current()->CancelTask(timeout_id_);
   timeout_id_ = MessageLoop::kTaskIdNull;
 
-  for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
-    for (const auto& fd_taks_pair : fd_task_maps_[t]) {
-      if (!MessageLoop::current()->CancelTask(fd_taks_pair.second)) {
-        LOG(WARNING) << "Error canceling the watch task " << fd_taks_pair.second
-                     << " for " << (t ? "writing" : "reading") << " the fd "
-                     << fd_taks_pair.first;
-      }
-    }
-    fd_task_maps_[t].clear();
+  for (size_t t = 0; t < arraysize(fd_controller_maps_); ++t) {
+    fd_controller_maps_[t].clear();
   }
 
   if (curl_http_headers_) {
diff --git a/libcurl_http_fetcher.h b/libcurl_http_fetcher.h
index cdd489d..8f4258d 100644
--- a/libcurl_http_fetcher.h
+++ b/libcurl_http_fetcher.h
@@ -24,6 +24,7 @@
 
 #include <curl/curl.h>
 
+#include <base/files/file_descriptor_watcher_posix.h>
 #include <base/logging.h>
 #include <base/macros.h>
 #include <brillo/message_loops/message_loop.h>
@@ -215,7 +216,7 @@
   }
 
   // Cleans up the following if they are non-null:
-  // curl(m) handles, fd_task_maps_, timeout_id_.
+  // curl(m) handles, fd_controller_maps_, timeout_id_.
   void CleanUp();
 
   // Force terminate the transfer. This will invoke the delegate's (if any)
@@ -252,7 +253,8 @@
   // the message loop. libcurl may open/close descriptors and switch their
   // directions so maintain two separate lists so that watch conditions can be
   // set appropriately.
-  std::map<int, brillo::MessageLoop::TaskId> fd_task_maps_[2];
+  std::map<int, std::unique_ptr<base::FileDescriptorWatcher::Controller>>
+      fd_controller_maps_[2];
 
   // The TaskId of the timer we're waiting on. kTaskIdNull if we are not waiting
   // on it.
diff --git a/payload_consumer/postinstall_runner_action.cc b/payload_consumer/postinstall_runner_action.cc
index 894ac7d..264161c 100644
--- a/payload_consumer/postinstall_runner_action.cc
+++ b/payload_consumer/postinstall_runner_action.cc
@@ -217,13 +217,10 @@
     PLOG(ERROR) << "Unable to set non-blocking I/O mode on fd " << progress_fd_;
   }
 
-  progress_task_ = MessageLoop::current()->WatchFileDescriptor(
-      FROM_HERE,
+  progress_controller_ = base::FileDescriptorWatcher::WatchReadable(
       progress_fd_,
-      MessageLoop::WatchMode::kWatchRead,
-      true,
-      base::Bind(&PostinstallRunnerAction::OnProgressFdReady,
-                 base::Unretained(this)));
+      base::BindRepeating(&PostinstallRunnerAction::OnProgressFdReady,
+                          base::Unretained(this)));
 }
 
 void PostinstallRunnerAction::OnProgressFdReady() {
@@ -248,8 +245,7 @@
     if (!ok || eof) {
       // There was either an error or an EOF condition, so we are done watching
       // the file descriptor.
-      MessageLoop::current()->CancelTask(progress_task_);
-      progress_task_ = MessageLoop::kTaskIdNull;
+      progress_controller_.reset();
       return;
     }
   } while (bytes_read);
@@ -293,10 +289,7 @@
   fs_mount_dir_.clear();
 
   progress_fd_ = -1;
-  if (progress_task_ != MessageLoop::kTaskIdNull) {
-    MessageLoop::current()->CancelTask(progress_task_);
-    progress_task_ = MessageLoop::kTaskIdNull;
-  }
+  progress_controller_.reset();
   progress_buffer_.clear();
 }
 
diff --git a/payload_consumer/postinstall_runner_action.h b/payload_consumer/postinstall_runner_action.h
index b9b7069..838b235 100644
--- a/payload_consumer/postinstall_runner_action.h
+++ b/payload_consumer/postinstall_runner_action.h
@@ -17,9 +17,11 @@
 #ifndef UPDATE_ENGINE_PAYLOAD_CONSUMER_POSTINSTALL_RUNNER_ACTION_H_
 #define UPDATE_ENGINE_PAYLOAD_CONSUMER_POSTINSTALL_RUNNER_ACTION_H_
 
+#include <memory>
 #include <string>
 #include <vector>
 
+#include <base/files/file_descriptor_watcher_posix.h>
 #include <brillo/message_loops/message_loop.h>
 #include <gtest/gtest_prod.h>
 
@@ -139,7 +141,7 @@
   // The parent progress file descriptor used to watch for progress reports from
   // the postinstall program and the task watching for them.
   int progress_fd_{-1};
-  brillo::MessageLoop::TaskId progress_task_{brillo::MessageLoop::kTaskIdNull};
+  std::unique_ptr<base::FileDescriptorWatcher::Controller> progress_controller_;
 
   // A buffer of a partial read line from the progress file descriptor.
   std::string progress_buffer_;