update_engine: Watch file descriptors using chromeos::MessageLoop.
This patch removes all the calls to the g_io_*() functions that were used
to create io_channels from file descriptors and watch them in the main loop.
Instead, we use chromeos::MessageLoop, backed by the glib
implementation.
This patch also removes the duplicated process handling work done in
P2PManager and uses the common Subprocess class instead.
BUG=chromium:499886
TEST=Added and updated unittests.
Change-Id: Ia093b060d2396325fce69b2bbdb62957ba7bfbc6
Reviewed-on: https://chromium-review.googlesource.com/284593
Tested-by: Alex Deymo <deymo@chromium.org>
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
Trybot-Ready: Alex Deymo <deymo@chromium.org>
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index da69281..9d32293 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -18,7 +18,6 @@
using base::TimeDelta;
using chromeos::MessageLoop;
-using std::make_pair;
using std::max;
using std::string;
@@ -363,7 +362,7 @@
}
} else {
// set up callback
- SetupMainloopSources();
+ SetupMessageLoopSources();
}
}
@@ -410,8 +409,8 @@
CHECK_EQ(curl_easy_pause(curl_handle_, CURLPAUSE_CONT), CURLE_OK);
}
-// This method sets up callbacks with the glib main loop.
-void LibcurlHttpFetcher::SetupMainloopSources() {
+// This method sets up callbacks with the MessageLoop.
+void LibcurlHttpFetcher::SetupMessageLoopSources() {
fd_set fd_read;
fd_set fd_write;
fd_set fd_exc;
@@ -429,14 +428,14 @@
// We should iterate through all file descriptors up to libcurl's fd_max or
// the highest one we're tracking, whichever is larger.
- for (size_t t = 0; t < arraysize(io_channels_); ++t) {
- if (!io_channels_[t].empty())
- fd_max = max(fd_max, io_channels_[t].rbegin()->first);
+ for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+ if (!fd_task_maps_[t].empty())
+ fd_max = max(fd_max, fd_task_maps_[t].rbegin()->first);
}
// For each fd, if we're not tracking it, track it. If we are tracking it, but
// libcurl doesn't care about it anymore, stop tracking it. After this loop,
- // there should be exactly as many GIOChannel objects in io_channels_[0|1] as
+ // there should be exactly as many tasks scheduled in fd_task_maps_[0|1] as
// there are read/write fds that we're tracking.
for (int fd = 0; fd <= fd_max; ++fd) {
// Note that fd_exc is unused in the current version of libcurl so is_exc
@@ -446,16 +445,20 @@
is_exc || (FD_ISSET(fd, &fd_read) != 0), // track 0 -- read
is_exc || (FD_ISSET(fd, &fd_write) != 0) // track 1 -- write
};
+ MessageLoop::WatchMode watch_modes[2] = {
+ MessageLoop::WatchMode::kWatchRead,
+ MessageLoop::WatchMode::kWatchWrite,
+ };
- for (size_t t = 0; t < arraysize(io_channels_); ++t) {
- bool tracked = io_channels_[t].find(fd) != io_channels_[t].end();
+ for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+ auto fd_task_it = fd_task_maps_[t].find(fd);
+ bool tracked = fd_task_it != fd_task_maps_[t].end();
if (!must_track[t]) {
// If we have an outstanding io_channel, remove it.
if (tracked) {
- g_source_remove(io_channels_[t][fd].second);
- g_io_channel_unref(io_channels_[t][fd].first);
- io_channels_[t].erase(io_channels_[t].find(fd));
+ MessageLoop::current()->CancelTask(fd_task_it->second);
+ fd_task_maps_[t].erase(fd_task_it);
}
continue;
}
@@ -464,16 +467,15 @@
if (tracked)
continue;
- // Set conditions appropriately -- read for track 0, write for track 1.
- GIOCondition condition = static_cast<GIOCondition>(
- ((t == 0) ? (G_IO_IN | G_IO_PRI) : G_IO_OUT) | G_IO_ERR | G_IO_HUP);
-
// Track a new fd.
- GIOChannel* io_channel = g_io_channel_unix_new(fd);
- guint tag =
- g_io_add_watch(io_channel, condition, &StaticFDCallback, this);
+ fd_task_maps_[t][fd] = MessageLoop::current()->WatchFileDescriptor(
+ FROM_HERE,
+ fd,
+ watch_modes[t],
+ true, // persistent
+ base::Bind(&LibcurlHttpFetcher::CurlPerformOnce,
+ base::Unretained(this)));
- io_channels_[t][fd] = make_pair(io_channel, tag);
static int io_counter = 0;
io_counter++;
if (io_counter % 50 == 0) {
@@ -493,16 +495,6 @@
}
}
-bool LibcurlHttpFetcher::FDCallback(GIOChannel *source,
- GIOCondition condition) {
- CurlPerformOnce();
- // We handle removing of this source elsewhere, so we always return true.
- // The docs say, "the function should return FALSE if the event source
- // should be removed."
- // http://www.gtk.org/api/2.6/glib/glib-IO-Channels.html#GIOFunc
- return true;
-}
-
void LibcurlHttpFetcher::RetryTimeoutCallback() {
ResumeTransfer(url_);
CurlPerformOnce();
@@ -525,13 +517,16 @@
MessageLoop::current()->CancelTask(timeout_id_);
timeout_id_ = MessageLoop::kTaskIdNull;
- for (size_t t = 0; t < arraysize(io_channels_); ++t) {
- for (IOChannels::iterator it = io_channels_[t].begin();
- it != io_channels_[t].end(); ++it) {
- g_source_remove(it->second.second);
- g_io_channel_unref(it->second.first);
+ for (size_t t = 0; t < arraysize(fd_task_maps_); ++t) {
+ for (const auto& fd_taks_pair : fd_task_maps_[t]) {
+ if (!MessageLoop::current()->CancelTask(fd_taks_pair.second)) {
+ LOG(WARNING) << "Error canceling the watch task "
+ << fd_taks_pair.second << " for "
+ << (t ? "writing" : "reading") << " the fd "
+ << fd_taks_pair.first;
+ }
}
- io_channels_[t].clear();
+ fd_task_maps_[t].clear();
}
if (curl_http_headers_) {