update_engine: Migrate time-based glib main loop calls to MessageLoop.

This patch replaces most calls to g_idle_add* and g_timeout_add* with
the equivalent MessageLoop::Post*Task(). To maintain compatibility with
unittests running the main loop and doing I/O we instantiate a
GlibMessageLoop for those tests.

BUG=chromium:499886
TEST=unittests still pass.

Change-Id: Ic87ba69bc47391ac3c36d1bfc3ca28d069666af1
Reviewed-on: https://chromium-review.googlesource.com/281197
Reviewed-by: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Deymo <deymo@chromium.org>
Commit-Queue: Alex Deymo <deymo@chromium.org>
Trybot-Ready: Alex Deymo <deymo@chromium.org>
diff --git a/chrome_browser_proxy_resolver.cc b/chrome_browser_proxy_resolver.cc
index 7696d4b..116c67a 100644
--- a/chrome_browser_proxy_resolver.cc
+++ b/chrome_browser_proxy_resolver.cc
@@ -22,6 +22,8 @@
 namespace chromeos_update_engine {
 
 using base::StringTokenizer;
+using base::TimeDelta;
+using chromeos::MessageLoop;
 using std::deque;
 using std::make_pair;
 using std::pair;
@@ -52,7 +54,7 @@
 
 ChromeBrowserProxyResolver::ChromeBrowserProxyResolver(
     DBusWrapperInterface* dbus)
-    : dbus_(dbus), proxy_(nullptr), timeout_(kTimeout) {}
+    : dbus_(dbus), timeout_(kTimeout) {}
 
 bool ChromeBrowserProxyResolver::Init() {
   if (proxy_)
@@ -105,8 +107,8 @@
 
   // Kill outstanding timers
   for (auto& timer : timers_) {
-    g_source_destroy(timer.second);
-    timer.second = nullptr;
+    MessageLoop::current()->CancelTask(timer.second);
+    timer.second = MessageLoop::kTaskIdNull;
   }
 }
 
@@ -137,14 +139,12 @@
   }
 
   callbacks_.insert(make_pair(url, make_pair(callback, data)));
-  base::Closure* closure = new base::Closure(base::Bind(
-      &ChromeBrowserProxyResolver::HandleTimeout,
-      base::Unretained(this),
-      url));
-  GSource* timer = g_timeout_source_new_seconds(timeout);
-  g_source_set_callback(
-      timer, utils::GlibRunClosure, closure, utils::GlibDestroyClosure);
-  g_source_attach(timer, nullptr);
+  MessageLoop::TaskId timer = MessageLoop::current()->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&ChromeBrowserProxyResolver::HandleTimeout,
+                 base::Unretained(this),
+                 url),
+      TimeDelta::FromSeconds(timeout));
   timers_.insert(make_pair(url, timer));
   return true;
 }
@@ -196,7 +196,7 @@
     TEST_AND_RETURN_FALSE(it != timers_.end());
     TEST_AND_RETURN_FALSE(it->first == source_url);
     if (delete_timer)
-      g_source_destroy(it->second);
+      MessageLoop::current()->CancelTask(it->second);
     timers_.erase(it);
   }
   return true;
diff --git a/chrome_browser_proxy_resolver.h b/chrome_browser_proxy_resolver.h
index 8b47672..ab0a92c 100644
--- a/chrome_browser_proxy_resolver.h
+++ b/chrome_browser_proxy_resolver.h
@@ -14,6 +14,8 @@
 #include <dbus/dbus-glib-lowlevel.h>
 #include <gtest/gtest_prod.h>  // for FRIEND_TEST
 
+#include <chromeos/message_loops/message_loop.h>
+
 #include "update_engine/dbus_wrapper_interface.h"
 #include "update_engine/proxy_resolver.h"
 
@@ -52,7 +54,7 @@
   FRIEND_TEST(ChromeBrowserProxyResolverTest, SuccessTest);
   typedef std::multimap<std::string, std::pair<ProxiesResolvedFn, void*>>
       CallbacksMap;
-  typedef std::multimap<std::string, GSource*> TimeoutsMap;
+  typedef std::multimap<std::string, chromeos::MessageLoop::TaskId> TimeoutsMap;
 
   // Handle a reply from Chrome:
   void HandleReply(const std::string& source_url,
@@ -68,7 +70,7 @@
   static std::deque<std::string> ParseProxyString(const std::string& input);
 
   // Deletes internal state for the first instance of url in the state.
-  // If delete_timer is set, calls g_source_destroy on the timer source.
+  // If delete_timer is set, calls CancelTask on the timer id.
   // Returns the callback in an out parameter. Returns true on success.
   bool DeleteUrlState(const std::string& url,
                       bool delete_timer,
@@ -78,8 +80,8 @@
   void Shutdown();
 
   DBusWrapperInterface* dbus_;
-  DBusGProxy* proxy_;
-  DBusGProxy* peer_proxy_;
+  DBusGProxy* proxy_{nullptr};
+  DBusGProxy* peer_proxy_{nullptr};
   int timeout_;
   TimeoutsMap timers_;
   CallbacksMap callbacks_;
diff --git a/chrome_browser_proxy_resolver_unittest.cc b/chrome_browser_proxy_resolver_unittest.cc
index d3cf99e..3866358 100644
--- a/chrome_browser_proxy_resolver_unittest.cc
+++ b/chrome_browser_proxy_resolver_unittest.cc
@@ -9,20 +9,36 @@
 
 #include <gtest/gtest.h>
 
+#include <base/bind.h>
+#include <chromeos/message_loops/fake_message_loop.h>
+
 #include "update_engine/mock_dbus_wrapper.h"
 
 using ::testing::Return;
 using ::testing::SetArgPointee;
 using ::testing::StrEq;
 using ::testing::_;
+using chromeos::MessageLoop;
 using std::deque;
 using std::string;
 
 namespace chromeos_update_engine {
 
-class ChromeBrowserProxyResolverTest : public ::testing::Test { };
+class ChromeBrowserProxyResolverTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    loop_.SetAsCurrent();
+  }
 
-TEST(ChromeBrowserProxyResolverTest, ParseTest) {
+  void TearDown() override {
+    EXPECT_FALSE(loop_.PendingTasks());
+  }
+
+ private:
+  chromeos::FakeMessageLoop loop_{nullptr};
+};
+
+TEST_F(ChromeBrowserProxyResolverTest, ParseTest) {
   // Test ideas from
   // http://src.chromium.org/svn/trunk/src/net/proxy/proxy_list_unittest.cc
   const char* inputs[] = {
@@ -79,31 +95,26 @@
 
 namespace {
 void DBusWrapperTestResolved(const deque<string>& proxies,
-                             void* data) {
+                             void* /* pirv_data */) {
   EXPECT_EQ(2, proxies.size());
   EXPECT_EQ("socks5://192.168.52.83:5555", proxies[0]);
   EXPECT_EQ(kNoProxy, proxies[1]);
-  g_main_loop_quit(reinterpret_cast<GMainLoop*>(data));
+  MessageLoop::current()->BreakLoop();
 }
 void DBusWrapperTestResolvedNoReply(const deque<string>& proxies,
-                                    void* data) {
+                                    void* /* pirv_data */) {
   EXPECT_EQ(1, proxies.size());
   EXPECT_EQ(kNoProxy, proxies[0]);
-  g_main_loop_quit(reinterpret_cast<GMainLoop*>(data));
+  MessageLoop::current()->BreakLoop();
 }
-struct SendReplyArgs {
-  DBusConnection* connection;
-  DBusMessage* message;
-  ChromeBrowserProxyResolver* resolver;
-};
-gboolean SendReply(gpointer data) {
+
+void SendReply(DBusConnection* connection,
+               DBusMessage* message,
+               ChromeBrowserProxyResolver* resolver) {
   LOG(INFO) << "Calling SendReply";
-  SendReplyArgs* args = reinterpret_cast<SendReplyArgs*>(data);
-  ChromeBrowserProxyResolver::StaticFilterMessage(
-      args->connection,
-      args->message,
-      args->resolver);
-  return FALSE;  // Don't keep calling this function
+  ChromeBrowserProxyResolver::StaticFilterMessage(connection,
+                                                  message,
+                                                  resolver);
 }
 
 // chrome_replies should be set to whether or not we fake a reply from
@@ -169,37 +180,32 @@
                         Return(TRUE)));
   }
 
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-
   ChromeBrowserProxyResolver resolver(&dbus_iface);
   EXPECT_EQ(true, resolver.Init());
   resolver.set_timeout(1);
-  SendReplyArgs args = {
-    kMockSystemBus,
-    kMockDbusMessage,
-    &resolver
-  };
-  if (chrome_replies)
-    g_idle_add(SendReply, &args);
-  EXPECT_TRUE(resolver.GetProxiesForUrl(kUrl,
-                                        chrome_replies ?
-                                        &DBusWrapperTestResolved :
-                                        &DBusWrapperTestResolvedNoReply,
-                                        loop));
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  ProxiesResolvedFn get_proxies_response = &DBusWrapperTestResolvedNoReply;
+
+  if (chrome_replies) {
+    get_proxies_response = &DBusWrapperTestResolved;
+    MessageLoop::current()->PostTask(
+        FROM_HERE,
+        base::Bind(&SendReply, kMockSystemBus, kMockDbusMessage, &resolver));
+  }
+
+  EXPECT_TRUE(resolver.GetProxiesForUrl(kUrl, get_proxies_response, nullptr));
+  MessageLoop::current()->Run();
 }
 }  // namespace
 
-TEST(ChromeBrowserProxyResolverTest, SuccessTest) {
+TEST_F(ChromeBrowserProxyResolverTest, SuccessTest) {
   RunTest(true, true);
 }
 
-TEST(ChromeBrowserProxyResolverTest, NoReplyTest) {
+TEST_F(ChromeBrowserProxyResolverTest, NoReplyTest) {
   RunTest(false, true);
 }
 
-TEST(ChromeBrowserProxyResolverTest, NoChromeTest) {
+TEST_F(ChromeBrowserProxyResolverTest, NoChromeTest) {
   RunTest(false, false);
 }
 
diff --git a/download_action.cc b/download_action.cc
index 6f55220..8e6061c 100644
--- a/download_action.cc
+++ b/download_action.cc
@@ -8,7 +8,6 @@
 #include <algorithm>
 #include <string>
 #include <vector>
-#include <glib.h>
 
 #include <base/files/file_path.h>
 #include <base/strings/stringprintf.h>
@@ -17,7 +16,6 @@
 #include "update_engine/omaha_request_params.h"
 #include "update_engine/p2p_manager.h"
 #include "update_engine/payload_state_interface.h"
-#include "update_engine/subprocess.h"
 #include "update_engine/utils.h"
 
 using base::FilePath;
@@ -99,7 +97,7 @@
   return true;
 }
 
-void DownloadAction::WriteToP2PFile(const void *data,
+void DownloadAction::WriteToP2PFile(const void* data,
                                     size_t length,
                                     off_t file_offset) {
   if (p2p_sharing_fd_ == -1) {
@@ -247,7 +245,7 @@
   bytes_received_ = offset;
 }
 
-void DownloadAction::ReceivedBytes(HttpFetcher *fetcher,
+void DownloadAction::ReceivedBytes(HttpFetcher* fetcher,
                                    const void* bytes,
                                    size_t length) {
   // Note that bytes_received_ is the current offset.
@@ -281,7 +279,7 @@
   }
 }
 
-void DownloadAction::TransferComplete(HttpFetcher *fetcher, bool successful) {
+void DownloadAction::TransferComplete(HttpFetcher* fetcher, bool successful) {
   if (writer_) {
     LOG_IF(WARNING, writer_->Close() != 0) << "Error closing the writer.";
     writer_ = nullptr;
diff --git a/download_action.h b/download_action.h
index 52efb46..0d51d9a 100644
--- a/download_action.h
+++ b/download_action.h
@@ -69,11 +69,11 @@
   std::string Type() const override { return StaticType(); }
 
   // HttpFetcherDelegate methods (see http_fetcher.h)
-  void ReceivedBytes(HttpFetcher *fetcher,
+  void ReceivedBytes(HttpFetcher* fetcher,
                      const void* bytes, size_t length) override;
   void SeekToOffset(off_t offset) override;
-  void TransferComplete(HttpFetcher *fetcher, bool successful) override;
-  void TransferTerminated(HttpFetcher *fetcher) override;
+  void TransferComplete(HttpFetcher* fetcher, bool successful) override;
+  void TransferTerminated(HttpFetcher* fetcher) override;
 
   DownloadActionDelegate* delegate() const { return delegate_; }
   void set_delegate(DownloadActionDelegate* delegate) {
@@ -103,7 +103,7 @@
   //
   // This method does nothing if SetupP2PSharingFd() hasn't been
   // called or if CloseP2PSharingFd() has been called.
-  void WriteToP2PFile(const void *data, size_t length, off_t file_offset);
+  void WriteToP2PFile(const void* data, size_t length, off_t file_offset);
 
   // The InstallPlan passed in
   InstallPlan install_plan_;
diff --git a/download_action_unittest.cc b/download_action_unittest.cc
index 1dc1beb..254dbd7 100644
--- a/download_action_unittest.cc
+++ b/download_action_unittest.cc
@@ -13,9 +13,14 @@
 #include <utility>
 #include <vector>
 
+#include <base/bind.h>
 #include <base/files/file_path.h>
 #include <base/files/file_util.h>
+#include <base/location.h>
 #include <base/strings/stringprintf.h>
+#include <chromeos/bind_lambda.h>
+#include <chromeos/message_loops/fake_message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
 
 #include "update_engine/action_pipe.h"
 #include "update_engine/fake_p2p_manager_configuration.h"
@@ -53,16 +58,14 @@
 class DownloadActionTestProcessorDelegate : public ActionProcessorDelegate {
  public:
   explicit DownloadActionTestProcessorDelegate(ErrorCode expected_code)
-      : loop_(nullptr),
-        processing_done_called_(false),
+      : processing_done_called_(false),
         expected_code_(expected_code) {}
   ~DownloadActionTestProcessorDelegate() override {
     EXPECT_TRUE(processing_done_called_);
   }
   void ProcessingDone(const ActionProcessor* processor,
                       ErrorCode code) override {
-    ASSERT_TRUE(loop_);
-    g_main_loop_quit(loop_);
+    chromeos::MessageLoop::current()->BreakLoop();
     chromeos::Blob found_data;
     ASSERT_TRUE(utils::ReadFile(path_, &found_data));
     if (expected_code_ != ErrorCode::kDownloadWriteError) {
@@ -85,7 +88,6 @@
     }
   }
 
-  GMainLoop *loop_;
   string path_;
   chromeos::Blob expected_data_;
   bool processing_done_called_;
@@ -110,25 +112,17 @@
   int current_write_;
 };
 
-struct StartProcessorInRunLoopArgs {
-  ActionProcessor* processor;
-  MockHttpFetcher* http_fetcher;
-};
-
-gboolean StartProcessorInRunLoop(gpointer data) {
-  ActionProcessor* processor =
-      reinterpret_cast<StartProcessorInRunLoopArgs*>(data)->processor;
+void StartProcessorInRunLoop(ActionProcessor* processor,
+                             MockHttpFetcher* http_fetcher) {
   processor->StartProcessing();
-  MockHttpFetcher* http_fetcher =
-      reinterpret_cast<StartProcessorInRunLoopArgs*>(data)->http_fetcher;
   http_fetcher->SetOffset(1);
-  return FALSE;
 }
 
 void TestWithData(const chromeos::Blob& data,
                   int fail_write,
                   bool use_download_delegate) {
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
 
   // TODO(adlr): see if we need a different file for build bots
   ScopedTempFile output_temp_file;
@@ -177,7 +171,6 @@
   if (fail_write > 0)
     expected_code = ErrorCode::kDownloadWriteError;
   DownloadActionTestProcessorDelegate delegate(expected_code);
-  delegate.loop_ = loop;
   delegate.expected_data_ = chromeos::Blob(data.begin() + 1, data.end());
   delegate.path_ = output_temp_file.GetPath();
   ActionProcessor processor;
@@ -185,12 +178,10 @@
   processor.EnqueueAction(&feeder_action);
   processor.EnqueueAction(&download_action);
 
-  StartProcessorInRunLoopArgs args;
-  args.processor = &processor;
-  args.http_fetcher = http_fetcher;
-  g_timeout_add(0, &StartProcessorInRunLoop, &args);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  loop.PostTask(FROM_HERE,
+                base::Bind(&StartProcessorInRunLoop, &processor, http_fetcher));
+  loop.Run();
+  EXPECT_FALSE(loop.PendingTasks());
 }
 }  // namespace
 
@@ -240,22 +231,19 @@
 class TerminateEarlyTestProcessorDelegate : public ActionProcessorDelegate {
  public:
   void ProcessingStopped(const ActionProcessor* processor) {
-    ASSERT_TRUE(loop_);
-    g_main_loop_quit(loop_);
+    chromeos::MessageLoop::current()->BreakLoop();
   }
-  GMainLoop *loop_;
 };
 
-gboolean TerminateEarlyTestStarter(gpointer data) {
-  ActionProcessor *processor = reinterpret_cast<ActionProcessor*>(data);
+void TerminateEarlyTestStarter(ActionProcessor* processor) {
   processor->StartProcessing();
   CHECK(processor->IsRunning());
   processor->StopProcessing();
-  return FALSE;
 }
 
 void TestTerminateEarly(bool use_download_delegate) {
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
 
   chromeos::Blob data(kMockHttpFetcherChunkSize +
                       kMockHttpFetcherChunkSize / 2);
@@ -284,16 +272,16 @@
       EXPECT_CALL(download_delegate, SetDownloadStatus(false)).Times(1);
     }
     TerminateEarlyTestProcessorDelegate delegate;
-    delegate.loop_ = loop;
     ActionProcessor processor;
     processor.set_delegate(&delegate);
     processor.EnqueueAction(&feeder_action);
     processor.EnqueueAction(&download_action);
     BondActions(&feeder_action, &download_action);
 
-    g_timeout_add(0, &TerminateEarlyTestStarter, &processor);
-    g_main_loop_run(loop);
-    g_main_loop_unref(loop);
+    loop.PostTask(FROM_HERE,
+                  base::Bind(&TerminateEarlyTestStarter, &processor));
+    loop.Run();
+    EXPECT_FALSE(loop.PendingTasks());
   }
 
   // 1 or 0 chunks should have come through
@@ -350,21 +338,15 @@
 class PassObjectOutTestProcessorDelegate : public ActionProcessorDelegate {
  public:
   void ProcessingDone(const ActionProcessor* processor, ErrorCode code) {
-    ASSERT_TRUE(loop_);
-    g_main_loop_quit(loop_);
+    chromeos::MessageLoop::current()->BreakLoop();
   }
-  GMainLoop *loop_;
 };
 
-gboolean PassObjectOutTestStarter(gpointer data) {
-  ActionProcessor *processor = reinterpret_cast<ActionProcessor*>(data);
-  processor->StartProcessing();
-  return FALSE;
-}
 }  // namespace
 
 TEST(DownloadActionTest, PassObjectOutTest) {
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
 
   DirectFileWriter writer;
 
@@ -395,21 +377,22 @@
 
   ActionProcessor processor;
   PassObjectOutTestProcessorDelegate delegate;
-  delegate.loop_ = loop;
   processor.set_delegate(&delegate);
   processor.EnqueueAction(&feeder_action);
   processor.EnqueueAction(&download_action);
   processor.EnqueueAction(&test_action);
 
-  g_timeout_add(0, &PassObjectOutTestStarter, &processor);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  loop.PostTask(FROM_HERE,
+                base::Bind([&processor] { processor.StartProcessing(); }));
+  loop.Run();
+  EXPECT_FALSE(loop.PendingTasks());
 
   EXPECT_EQ(true, test_action.did_run_);
 }
 
 TEST(DownloadActionTest, BadOutFileTest) {
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
 
   const string path("/fake/path/that/cant/be/created/because/of/missing/dirs");
   DirectFileWriter writer;
@@ -432,34 +415,27 @@
   processor.StartProcessing();
   ASSERT_FALSE(processor.IsRunning());
 
-  g_main_loop_unref(loop);
-}
-
-gboolean StartProcessorInRunLoopForP2P(gpointer user_data) {
-  ActionProcessor* processor = reinterpret_cast<ActionProcessor*>(user_data);
-  processor->StartProcessing();
-  return FALSE;
+  loop.Run();
+  EXPECT_FALSE(loop.PendingTasks());
 }
 
 // Test fixture for P2P tests.
 class P2PDownloadActionTest : public testing::Test {
  protected:
   P2PDownloadActionTest()
-    : loop_(nullptr),
-      start_at_offset_(0),
+    : start_at_offset_(0),
       fake_um_(fake_system_state_.fake_clock()) {}
 
   ~P2PDownloadActionTest() override {}
 
   // Derived from testing::Test.
   void SetUp() override {
-    loop_ = g_main_loop_new(g_main_context_default(), FALSE);
+    loop_.SetAsCurrent();
   }
 
   // Derived from testing::Test.
   void TearDown() override {
-    if (loop_ != nullptr)
-      g_main_loop_unref(loop_);
+    EXPECT_FALSE(loop_.PendingTasks());
   }
 
   // To be called by tests to setup the download. The
@@ -513,7 +489,6 @@
     download_action_->SetTestFileWriter(&writer);
     BondActions(&feeder_action, download_action_.get());
     DownloadActionTestProcessorDelegate delegate(ErrorCode::kSuccess);
-    delegate.loop_ = loop_;
     delegate.expected_data_ = chromeos::Blob(data_.begin() + start_at_offset_,
                                              data_.end());
     delegate.path_ = output_temp_file.GetPath();
@@ -521,10 +496,15 @@
     processor_.EnqueueAction(&feeder_action);
     processor_.EnqueueAction(download_action_.get());
 
-    g_timeout_add(0, &StartProcessorInRunLoopForP2P, this);
-    g_main_loop_run(loop_);
+    loop_.PostTask(FROM_HERE, base::Bind(
+        &P2PDownloadActionTest::StartProcessorInRunLoopForP2P,
+        base::Unretained(this)));
+    loop_.Run();
   }
 
+  // Mainloop used to make StartDownload() synchronous.
+  chromeos::FakeMessageLoop loop_{nullptr};
+
   // The DownloadAction instance under test.
   unique_ptr<DownloadAction> download_action_;
 
@@ -545,17 +525,11 @@
 
  private:
   // Callback used in StartDownload() method.
-  static gboolean StartProcessorInRunLoopForP2P(gpointer user_data) {
-    class P2PDownloadActionTest *test =
-        reinterpret_cast<P2PDownloadActionTest*>(user_data);
-    test->processor_.StartProcessing();
-    test->http_fetcher_->SetOffset(test->start_at_offset_);
-    return FALSE;
+  void StartProcessorInRunLoopForP2P() {
+    processor_.StartProcessing();
+    http_fetcher_->SetOffset(start_at_offset_);
   }
 
-  // Mainloop used to make StartDownload() synchronous.
-  GMainLoop *loop_;
-
   // The requested starting offset passed to SetupDownload().
   off_t start_at_offset_;
 
diff --git a/http_fetcher.cc b/http_fetcher.cc
index 3caadf0..b355fdb 100644
--- a/http_fetcher.cc
+++ b/http_fetcher.cc
@@ -4,16 +4,19 @@
 
 #include "update_engine/http_fetcher.h"
 
+#include <base/bind.h>
+
 using base::Closure;
+using chromeos::MessageLoop;
 using std::deque;
 using std::string;
 
 namespace chromeos_update_engine {
 
 HttpFetcher::~HttpFetcher() {
-  if (no_resolver_idle_id_) {
-    g_source_remove(no_resolver_idle_id_);
-    no_resolver_idle_id_ = 0;
+  if (no_resolver_idle_id_ != MessageLoop::kTaskIdNull) {
+    MessageLoop::current()->CancelTask(no_resolver_idle_id_);
+    no_resolver_idle_id_ = MessageLoop::kTaskIdNull;
   }
 }
 
@@ -21,7 +24,7 @@
                               HttpContentType type) {
   post_data_set_ = true;
   post_data_.clear();
-  const char *char_data = reinterpret_cast<const char*>(data);
+  const char* char_data = reinterpret_cast<const char*>(data);
   post_data_.insert(post_data_.end(), char_data, char_data + size);
   post_content_type_ = type;
 }
@@ -31,30 +34,34 @@
 }
 
 // Proxy methods to set the proxies, then to pop them off.
-bool HttpFetcher::ResolveProxiesForUrl(const string& url, Closure* callback) {
+bool HttpFetcher::ResolveProxiesForUrl(const string& url,
+                                       const Closure& callback) {
+  CHECK_EQ(static_cast<Closure*>(nullptr), callback_.get());
+  callback_.reset(new Closure(callback));
+
   if (!proxy_resolver_) {
     LOG(INFO) << "Not resolving proxies (no proxy resolver).";
-    no_resolver_idle_id_ = g_idle_add_full(
-        G_PRIORITY_DEFAULT,
-        utils::GlibRunClosure,
-        callback,
-        utils::GlibDestroyClosure);
+    no_resolver_idle_id_ = MessageLoop::current()->PostTask(
+        FROM_HERE,
+        base::Bind(&HttpFetcher::NoProxyResolverCallback,
+                   base::Unretained(this)));
     return true;
   }
-  CHECK_EQ(static_cast<Closure*>(nullptr), callback_);
-  callback_ = callback;
   return proxy_resolver_->GetProxiesForUrl(url,
                                            &HttpFetcher::StaticProxiesResolved,
                                            this);
 }
 
+void HttpFetcher::NoProxyResolverCallback() {
+  ProxiesResolved(deque<string>());
+}
+
 void HttpFetcher::ProxiesResolved(const deque<string>& proxies) {
-  no_resolver_idle_id_ = 0;
+  no_resolver_idle_id_ = MessageLoop::kTaskIdNull;
   if (!proxies.empty())
     SetProxies(proxies);
-  CHECK_NE(static_cast<Closure*>(nullptr), callback_);
-  Closure* callback = callback_;
-  callback_ = nullptr;
+  CHECK_NE(static_cast<Closure*>(nullptr), callback_.get());
+  Closure* callback = callback_.release();
   // This may indirectly call back into ResolveProxiesForUrl():
   callback->Run();
   delete callback;
diff --git a/http_fetcher.h b/http_fetcher.h
index 9eb477d..0d19a9a 100644
--- a/http_fetcher.h
+++ b/http_fetcher.h
@@ -9,11 +9,10 @@
 #include <string>
 #include <vector>
 
-#include <glib.h>
-
 #include <base/callback.h>
 #include <base/logging.h>
 #include <base/macros.h>
+#include <chromeos/message_loops/message_loop.h>
 
 #include "update_engine/http_common.h"
 #include "update_engine/proxy_resolver.h"
@@ -23,8 +22,7 @@
 // easily mock out this interface for testing.
 
 // Implementations of this class should use asynchronous i/o. They can access
-// the glib main loop to request callbacks when timers or file descriptors
-// change.
+// the MessageLoop to request callbacks when timers or file descriptors change.
 
 namespace chromeos_update_engine {
 
@@ -41,7 +39,6 @@
         delegate_(nullptr),
         proxies_(1, kNoProxy),
         proxy_resolver_(proxy_resolver),
-        no_resolver_idle_id_(0),
         callback_(nullptr),
         system_state_(system_state) {}
   virtual ~HttpFetcher();
@@ -60,7 +57,8 @@
 
   // Proxy methods to set the proxies, then to pop them off.
   // Returns true on success.
-  bool ResolveProxiesForUrl(const std::string& url, base::Closure* callback);
+  bool ResolveProxiesForUrl(const std::string& url,
+                            const base::Closure& callback);
 
   void SetProxies(const std::deque<std::string>& proxies) {
     proxies_ = proxies;
@@ -147,10 +145,11 @@
   ProxyResolver* const proxy_resolver_;
 
   // The ID of the idle callback, used when we have no proxy resolver.
-  guint no_resolver_idle_id_;
+  chromeos::MessageLoop::TaskId no_resolver_idle_id_{
+      chromeos::MessageLoop::kTaskIdNull};
 
   // Callback for when we are resolving proxies
-  base::Closure* callback_;
+  std::unique_ptr<base::Closure> callback_;
 
   // Global system context.
   SystemState* system_state_;
@@ -163,6 +162,10 @@
     reinterpret_cast<HttpFetcher*>(data)->ProxiesResolved(proxies);
   }
 
+  // Callback used to run the proxy resolver callback when there is no
+  // |proxy_resolver_|.
+  void NoProxyResolverCallback();
+
   DISALLOW_COPY_AND_ASSIGN(HttpFetcher);
 };
 
diff --git a/http_fetcher_unittest.cc b/http_fetcher_unittest.cc
index 119c3d9..fd520e9 100644
--- a/http_fetcher_unittest.cc
+++ b/http_fetcher_unittest.cc
@@ -12,11 +12,14 @@
 #include <utility>
 #include <vector>
 
+#include <base/location.h>
 #include <base/logging.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
 #include <base/time/time.h>
-#include <chromeos/dbus/service_constants.h>
+#include <chromeos/message_loops/glib_message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
+#include <chromeos/message_loops/message_loop_utils.h>
 #include <glib.h>
 #include <gtest/gtest.h>
 
@@ -28,13 +31,13 @@
 #include "update_engine/proxy_resolver.h"
 #include "update_engine/utils.h"
 
+using chromeos::MessageLoop;
 using std::make_pair;
 using std::pair;
 using std::string;
 using std::unique_ptr;
 using std::vector;
 
-
 namespace {
 
 const int kBigLength           = 100000;
@@ -213,7 +216,7 @@
 
   virtual void IgnoreServerAborting(HttpServer* server) const {}
 
-  virtual HttpServer *CreateServer() = 0;
+  virtual HttpServer* CreateServer() = 0;
 
  protected:
   DirectProxyResolver proxy_resolver_;
@@ -248,7 +251,7 @@
   bool IsMock() const override { return true; }
   bool IsMulti() const override { return false; }
 
-  HttpServer *CreateServer() override {
+  HttpServer* CreateServer() override {
     return new NullHttpServer;
   }
 };
@@ -295,7 +298,7 @@
     // Nothing to do.
   }
 
-  HttpServer *CreateServer() override {
+  HttpServer* CreateServer() override {
     return new PythonHttpServer;
   }
 };
@@ -342,10 +345,24 @@
 template <typename T>
 class HttpFetcherTest : public ::testing::Test {
  public:
+  // TODO(deymo): Replace this with a FakeMessageLoop. We can't do that yet
+  // because these tests use g_spawn_async_with_pipes() to launch the
+  // http_test_server.
+  chromeos::GlibMessageLoop loop_;
+
   T test_;
 
+ protected:
+  HttpFetcherTest() {
+    loop_.SetAsCurrent();
+  }
+
+  void TearDown() override {
+    EXPECT_EQ(0, chromeos::MessageLoopRunMaxIterations(&loop_, 1));
+  }
+
  private:
-  static void TypeConstraint(T *a) {
+  static void TypeConstraint(T* a) {
     AnyHttpFetcherTest *b = a;
     if (b == 0)  // Silence compiler warning of unused variable.
       *b = a;
@@ -377,7 +394,7 @@
       EXPECT_EQ(kHttpResponseNotFound, fetcher->http_response_code());
     else
       EXPECT_EQ(kHttpResponseOk, fetcher->http_response_code());
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
 
     // Update counter
     times_transfer_complete_called_++;
@@ -388,8 +405,6 @@
     times_transfer_terminated_called_++;
   }
 
-  GMainLoop* loop_;
-
   // Are we expecting an error response? (default: no)
   bool is_expect_error_;
 
@@ -399,56 +414,40 @@
   int times_received_bytes_called_;
 };
 
-struct StartTransferArgs {
-  HttpFetcher *http_fetcher;
-  string url;
-};
 
-gboolean StartTransfer(gpointer data) {
-  StartTransferArgs *args = reinterpret_cast<StartTransferArgs*>(data);
-  args->http_fetcher->BeginTransfer(args->url);
-  return FALSE;
+void StartTransfer(HttpFetcher* http_fetcher, const string& url) {
+  http_fetcher->BeginTransfer(url);
 }
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, SimpleTest) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-  {
-    HttpFetcherTestDelegate delegate;
-    delegate.loop_ = loop;
-    unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
-    fetcher->set_delegate(&delegate);
+  HttpFetcherTestDelegate delegate;
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
+  fetcher->set_delegate(&delegate);
 
-    unique_ptr<HttpServer> server(this->test_.CreateServer());
-    ASSERT_TRUE(server->started_);
+  unique_ptr<HttpServer> server(this->test_.CreateServer());
+  ASSERT_TRUE(server->started_);
 
-    StartTransferArgs start_xfer_args = {
-      fetcher.get(), this->test_.SmallUrl(server->GetPort())};
-
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
-  }
-  g_main_loop_unref(loop);
+  this->loop_.PostTask(FROM_HERE, base::Bind(
+      StartTransfer,
+      fetcher.get(),
+      this->test_.SmallUrl(server->GetPort())));
+  this->loop_.Run();
 }
 
 TYPED_TEST(HttpFetcherTest, SimpleBigTest) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-  {
-    HttpFetcherTestDelegate delegate;
-    delegate.loop_ = loop;
-    unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher());
-    fetcher->set_delegate(&delegate);
+  HttpFetcherTestDelegate delegate;
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher());
+  fetcher->set_delegate(&delegate);
 
-    unique_ptr<HttpServer> server(this->test_.CreateServer());
-    ASSERT_TRUE(server->started_);
+  unique_ptr<HttpServer> server(this->test_.CreateServer());
+  ASSERT_TRUE(server->started_);
 
-    StartTransferArgs start_xfer_args = {
-      fetcher.get(), this->test_.BigUrl(server->GetPort())};
-
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
-  }
-  g_main_loop_unref(loop);
+  this->loop_.PostTask(FROM_HERE, base::Bind(
+      StartTransfer,
+      fetcher.get(),
+      this->test_.BigUrl(server->GetPort())));
+  this->loop_.Run();
 }
 
 // Issue #9648: when server returns an error HTTP response, the fetcher needs to
@@ -456,38 +455,31 @@
 TYPED_TEST(HttpFetcherTest, ErrorTest) {
   if (this->test_.IsMock() || this->test_.IsMulti())
     return;
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-  {
-    HttpFetcherTestDelegate delegate;
-    delegate.loop_ = loop;
+  HttpFetcherTestDelegate delegate;
 
-    // Delegate should expect an error response.
-    delegate.is_expect_error_ = true;
+  // Delegate should expect an error response.
+  delegate.is_expect_error_ = true;
 
-    unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
-    fetcher->set_delegate(&delegate);
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
+  fetcher->set_delegate(&delegate);
 
-    unique_ptr<HttpServer> server(this->test_.CreateServer());
-    ASSERT_TRUE(server->started_);
+  unique_ptr<HttpServer> server(this->test_.CreateServer());
+  ASSERT_TRUE(server->started_);
 
-    StartTransferArgs start_xfer_args = {
+  this->loop_.PostTask(FROM_HERE, base::Bind(
+      StartTransfer,
       fetcher.get(),
-      this->test_.ErrorUrl(server->GetPort())
-    };
+      this->test_.ErrorUrl(server->GetPort())));
+  this->loop_.Run();
 
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
+  // Make sure that no bytes were received.
+  CHECK_EQ(delegate.times_received_bytes_called_, 0);
+  CHECK_EQ(fetcher->GetBytesDownloaded(), static_cast<size_t>(0));
 
-    // Make sure that no bytes were received.
-    CHECK_EQ(delegate.times_received_bytes_called_, 0);
-    CHECK_EQ(fetcher->GetBytesDownloaded(), static_cast<size_t>(0));
-
-    // Make sure that transfer completion was signaled once, and no termination
-    // was signaled.
-    CHECK_EQ(delegate.times_transfer_complete_called_, 1);
-    CHECK_EQ(delegate.times_transfer_terminated_called_, 0);
-  }
-  g_main_loop_unref(loop);
+  // Make sure that transfer completion was signaled once, and no termination
+  // was signaled.
+  CHECK_EQ(delegate.times_transfer_complete_called_, 1);
+  CHECK_EQ(delegate.times_transfer_terminated_called_, 0);
 }
 
 namespace {
@@ -500,7 +492,7 @@
     fetcher->Pause();
   }
   void TransferComplete(HttpFetcher* fetcher, bool successful) override {
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   void TransferTerminated(HttpFetcher* fetcher) override {
     ADD_FAILURE();
@@ -512,39 +504,41 @@
   }
   bool paused_;
   HttpFetcher* fetcher_;
-  GMainLoop* loop_;
 };
 
-gboolean UnpausingTimeoutCallback(gpointer data) {
-  PausingHttpFetcherTestDelegate *delegate =
-      reinterpret_cast<PausingHttpFetcherTestDelegate*>(data);
+void UnpausingTimeoutCallback(PausingHttpFetcherTestDelegate* delegate,
+                              MessageLoop::TaskId* my_id) {
   if (delegate->paused_)
     delegate->Unpause();
-  return TRUE;
+  // Update the task id with the new scheduled callback.
+  *my_id = MessageLoop::current()->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&UnpausingTimeoutCallback, delegate, my_id),
+      base::TimeDelta::FromMilliseconds(200));
 }
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, PauseTest) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
   {
     PausingHttpFetcherTestDelegate delegate;
     unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher());
     delegate.paused_ = false;
-    delegate.loop_ = loop;
     delegate.fetcher_ = fetcher.get();
     fetcher->set_delegate(&delegate);
 
     unique_ptr<HttpServer> server(this->test_.CreateServer());
     ASSERT_TRUE(server->started_);
 
-    guint callback_id = g_timeout_add(kHttpResponseInternalServerError,
-                                      UnpausingTimeoutCallback, &delegate);
+    MessageLoop::TaskId callback_id;
+    callback_id = this->loop_.PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&UnpausingTimeoutCallback, &delegate, &callback_id),
+        base::TimeDelta::FromMilliseconds(200));
     fetcher->BeginTransfer(this->test_.BigUrl(server->GetPort()));
 
-    g_main_loop_run(loop);
-    g_source_remove(callback_id);
+    this->loop_.Run();
+    EXPECT_TRUE(this->loop_.CancelTask(callback_id));
   }
-  g_main_loop_unref(loop);
 }
 
 namespace {
@@ -554,7 +548,7 @@
                      const void* bytes, size_t length) override {}
   void TransferComplete(HttpFetcher* fetcher, bool successful) override {
     ADD_FAILURE();  // We should never get here
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   void TransferTerminated(HttpFetcher* fetcher) override {
     EXPECT_EQ(fetcher, fetcher_.get());
@@ -571,54 +565,49 @@
     fetcher_->TerminateTransfer();
   }
   void EndLoop() {
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   bool once_;
   bool callback_once_;
   unique_ptr<HttpFetcher> fetcher_;
-  GMainLoop* loop_;
 };
 
-gboolean AbortingTimeoutCallback(gpointer data) {
-  AbortingHttpFetcherTestDelegate *delegate =
-      reinterpret_cast<AbortingHttpFetcherTestDelegate*>(data);
+void AbortingTimeoutCallback(AbortingHttpFetcherTestDelegate* delegate,
+                             MessageLoop::TaskId* my_id) {
   if (delegate->once_) {
     delegate->TerminateTransfer();
-    return TRUE;
+    *my_id = MessageLoop::current()->PostTask(
+        FROM_HERE,
+        base::Bind(AbortingTimeoutCallback, delegate, my_id));
   } else {
     delegate->EndLoop();
-    return FALSE;
+    *my_id = MessageLoop::kTaskIdNull;
   }
 }
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, AbortTest) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-  {
-    AbortingHttpFetcherTestDelegate delegate;
-    delegate.fetcher_.reset(this->test_.NewLargeFetcher());
-    delegate.once_ = true;
-    delegate.callback_once_ = true;
-    delegate.loop_ = loop;
-    delegate.fetcher_->set_delegate(&delegate);
+  AbortingHttpFetcherTestDelegate delegate;
+  delegate.fetcher_.reset(this->test_.NewLargeFetcher());
+  delegate.once_ = true;
+  delegate.callback_once_ = true;
+  delegate.fetcher_->set_delegate(&delegate);
 
-    unique_ptr<HttpServer> server(this->test_.CreateServer());
-    this->test_.IgnoreServerAborting(server.get());
-    ASSERT_TRUE(server->started_);
+  unique_ptr<HttpServer> server(this->test_.CreateServer());
+  this->test_.IgnoreServerAborting(server.get());
+  ASSERT_TRUE(server->started_);
 
-    GSource* timeout_source_;
-    timeout_source_ = g_timeout_source_new(0);  // ms
-    g_source_set_callback(timeout_source_, AbortingTimeoutCallback, &delegate,
-                          nullptr);
-    g_source_attach(timeout_source_, nullptr);
-    delegate.fetcher_->BeginTransfer(this->test_.BigUrl(server->GetPort()));
+  MessageLoop::TaskId task_id = MessageLoop::kTaskIdNull;
 
-    g_main_loop_run(loop);
-    CHECK(!delegate.once_);
-    CHECK(!delegate.callback_once_);
-    g_source_destroy(timeout_source_);
-  }
-  g_main_loop_unref(loop);
+  task_id = this->loop_.PostTask(
+      FROM_HERE,
+      base::Bind(AbortingTimeoutCallback, &delegate, &task_id));
+  delegate.fetcher_->BeginTransfer(this->test_.BigUrl(server->GetPort()));
+
+  this->loop_.Run();
+  CHECK(!delegate.once_);
+  CHECK(!delegate.callback_once_);
+  this->loop_.CancelTask(task_id);
 }
 
 namespace {
@@ -631,40 +620,36 @@
   void TransferComplete(HttpFetcher* fetcher, bool successful) override {
     EXPECT_TRUE(successful);
     EXPECT_EQ(kHttpResponsePartialContent, fetcher->http_response_code());
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   void TransferTerminated(HttpFetcher* fetcher) override {
     ADD_FAILURE();
   }
   string data;
-  GMainLoop* loop_;
 };
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, FlakyTest) {
   if (this->test_.IsMock())
     return;
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
   {
     FlakyHttpFetcherTestDelegate delegate;
-    delegate.loop_ = loop;
     unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
     fetcher->set_delegate(&delegate);
 
     unique_ptr<HttpServer> server(this->test_.CreateServer());
     ASSERT_TRUE(server->started_);
 
-    StartTransferArgs start_xfer_args = {
-      fetcher.get(),
-      LocalServerUrlForPath(server->GetPort(),
-                            base::StringPrintf("/flaky/%d/%d/%d/%d", kBigLength,
-                                               kFlakyTruncateLength,
-                                               kFlakySleepEvery,
-                                               kFlakySleepSecs))
-    };
-
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
+    this->loop_.PostTask(FROM_HERE, base::Bind(
+        &StartTransfer,
+        fetcher.get(),
+        LocalServerUrlForPath(server->GetPort(),
+                              base::StringPrintf("/flaky/%d/%d/%d/%d",
+                                                 kBigLength,
+                                                 kFlakyTruncateLength,
+                                                 kFlakySleepEvery,
+                                                 kFlakySleepSecs))));
+    this->loop_.Run();
 
     // verify the data we get back
     ASSERT_EQ(kBigLength, delegate.data.size());
@@ -673,7 +658,6 @@
       ASSERT_EQ(delegate.data.substr(i, 10), "abcdefghij");
     }
   }
-  g_main_loop_unref(loop);
 }
 
 namespace {
@@ -683,8 +667,7 @@
 class FailureHttpFetcherTestDelegate : public HttpFetcherDelegate {
  public:
   explicit FailureHttpFetcherTestDelegate(PythonHttpServer* server)
-      : loop_(nullptr),
-        server_(server) {}
+      : server_(server) {}
 
   ~FailureHttpFetcherTestDelegate() override {
     if (server_) {
@@ -706,12 +689,11 @@
   void TransferComplete(HttpFetcher* fetcher, bool successful) override {
     EXPECT_FALSE(successful);
     EXPECT_EQ(0, fetcher->http_response_code());
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   void TransferTerminated(HttpFetcher* fetcher) override {
     ADD_FAILURE();
   }
-  GMainLoop* loop_;
   PythonHttpServer* server_;
 };
 }  // namespace
@@ -722,24 +704,19 @@
   // available at all.
   if (this->test_.IsMock())
     return;
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
   {
     FailureHttpFetcherTestDelegate delegate(nullptr);
-    delegate.loop_ = loop;
     unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
     fetcher->set_delegate(&delegate);
 
-    StartTransferArgs start_xfer_args = {
-      fetcher.get(),
-      "http://host_doesnt_exist99999999",
-    };
-
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
+    this->loop_.PostTask(FROM_HERE,
+                         base::Bind(StartTransfer,
+                                    fetcher.get(),
+                                    "http://host_doesnt_exist99999999"));
+    this->loop_.Run();
 
     // Exiting and testing happens in the delegate
   }
-  g_main_loop_unref(loop);
 }
 
 TYPED_TEST(HttpFetcherTest, ServerDiesTest) {
@@ -748,7 +725,6 @@
   // retries and aborts correctly.
   if (this->test_.IsMock())
     return;
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
   {
     PythonHttpServer* server = new PythonHttpServer();
     int port = server->GetPort();
@@ -756,24 +732,22 @@
 
     // Handles destruction and claims ownership.
     FailureHttpFetcherTestDelegate delegate(server);
-    delegate.loop_ = loop;
     unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
     fetcher->set_delegate(&delegate);
 
-    StartTransferArgs start_xfer_args = {
-      fetcher.get(),
-      LocalServerUrlForPath(port,
-                            base::StringPrintf("/flaky/%d/%d/%d/%d", kBigLength,
-                                               kFlakyTruncateLength,
-                                               kFlakySleepEvery,
-                                               kFlakySleepSecs))
-    };
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
+    this->loop_.PostTask(FROM_HERE, base::Bind(
+        StartTransfer,
+        fetcher.get(),
+        LocalServerUrlForPath(port,
+                              base::StringPrintf("/flaky/%d/%d/%d/%d",
+                                                 kBigLength,
+                                                 kFlakyTruncateLength,
+                                                 kFlakySleepEvery,
+                                                 kFlakySleepSecs))));
+    this->loop_.Run();
 
     // Exiting and testing happens in the delegate
   }
-  g_main_loop_unref(loop);
 }
 
 namespace {
@@ -798,14 +772,13 @@
       EXPECT_GE(fetcher->http_response_code(), kHttpResponseMovedPermanently);
       EXPECT_LE(fetcher->http_response_code(), kHttpResponseTempRedirect);
     }
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   void TransferTerminated(HttpFetcher* fetcher) override {
     ADD_FAILURE();
   }
   bool expected_successful_;
   string data;
-  GMainLoop* loop_;
 };
 
 // RedirectTest takes ownership of |http_fetcher|.
@@ -813,28 +786,23 @@
                   bool expected_successful,
                   const string& url,
                   HttpFetcher* http_fetcher) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-  {
-    RedirectHttpFetcherTestDelegate delegate(expected_successful);
-    delegate.loop_ = loop;
-    unique_ptr<HttpFetcher> fetcher(http_fetcher);
-    fetcher->set_delegate(&delegate);
+  RedirectHttpFetcherTestDelegate delegate(expected_successful);
+  unique_ptr<HttpFetcher> fetcher(http_fetcher);
+  fetcher->set_delegate(&delegate);
 
-    StartTransferArgs start_xfer_args =
-        { fetcher.get(), LocalServerUrlForPath(server->GetPort(), url) };
-
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
-    if (expected_successful) {
-      // verify the data we get back
-      ASSERT_EQ(kMediumLength, delegate.data.size());
-      for (int i = 0; i < kMediumLength; i += 10) {
-        // Assert so that we don't flood the screen w/ EXPECT errors on failure.
-        ASSERT_EQ(delegate.data.substr(i, 10), "abcdefghij");
-      }
+  MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
+      StartTransfer,
+      fetcher.get(),
+      LocalServerUrlForPath(server->GetPort(), url)));
+  MessageLoop::current()->Run();
+  if (expected_successful) {
+    // verify the data we get back
+    ASSERT_EQ(kMediumLength, delegate.data.size());
+    for (int i = 0; i < kMediumLength; i += 10) {
+      // Assert so that we don't flood the screen w/ EXPECT errors on failure.
+      ASSERT_EQ(delegate.data.substr(i, 10), "abcdefghij");
     }
   }
-  g_main_loop_unref(loop);
 }
 }  // namespace
 
@@ -904,7 +872,7 @@
       EXPECT_EQ(expected_response_code_, fetcher->http_response_code());
     // Destroy the fetcher (because we're allowed to).
     fetcher_.reset(nullptr);
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
 
   void TransferTerminated(HttpFetcher* fetcher) override {
@@ -914,7 +882,6 @@
   unique_ptr<HttpFetcher> fetcher_;
   int expected_response_code_;
   string data;
-  GMainLoop* loop_;
 };
 
 void MultiTest(HttpFetcher* fetcher_in,
@@ -923,42 +890,37 @@
                const string& expected_prefix,
                off_t expected_size,
                HttpResponseCode expected_response_code) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-  {
-    MultiHttpFetcherTestDelegate delegate(expected_response_code);
-    delegate.loop_ = loop;
-    delegate.fetcher_.reset(fetcher_in);
+  MultiHttpFetcherTestDelegate delegate(expected_response_code);
+  delegate.fetcher_.reset(fetcher_in);
 
-    MultiRangeHttpFetcher* multi_fetcher =
-        dynamic_cast<MultiRangeHttpFetcher*>(fetcher_in);
-    ASSERT_TRUE(multi_fetcher);
-    multi_fetcher->ClearRanges();
-    for (vector<pair<off_t, off_t>>::const_iterator it = ranges.begin(),
-             e = ranges.end(); it != e; ++it) {
-      string tmp_str = base::StringPrintf("%jd+", it->first);
-      if (it->second > 0) {
-        base::StringAppendF(&tmp_str, "%jd", it->second);
-        multi_fetcher->AddRange(it->first, it->second);
-      } else {
-        base::StringAppendF(&tmp_str, "?");
-        multi_fetcher->AddRange(it->first);
-      }
-      LOG(INFO) << "added range: " << tmp_str;
+  MultiRangeHttpFetcher* multi_fetcher =
+      dynamic_cast<MultiRangeHttpFetcher*>(fetcher_in);
+  ASSERT_TRUE(multi_fetcher);
+  multi_fetcher->ClearRanges();
+  for (vector<pair<off_t, off_t>>::const_iterator it = ranges.begin(),
+           e = ranges.end(); it != e; ++it) {
+    string tmp_str = base::StringPrintf("%jd+", it->first);
+    if (it->second > 0) {
+      base::StringAppendF(&tmp_str, "%jd", it->second);
+      multi_fetcher->AddRange(it->first, it->second);
+    } else {
+      base::StringAppendF(&tmp_str, "?");
+      multi_fetcher->AddRange(it->first);
     }
-    dynamic_cast<FakeSystemState*>(fetcher_in->GetSystemState())
-        ->fake_hardware()->SetIsOfficialBuild(false);
-    multi_fetcher->set_delegate(&delegate);
-
-    StartTransferArgs start_xfer_args = {multi_fetcher, url};
-
-    g_timeout_add(0, StartTransfer, &start_xfer_args);
-    g_main_loop_run(loop);
-
-    EXPECT_EQ(expected_size, delegate.data.size());
-    EXPECT_EQ(expected_prefix,
-              string(delegate.data.data(), expected_prefix.size()));
+    LOG(INFO) << "added range: " << tmp_str;
   }
-  g_main_loop_unref(loop);
+  dynamic_cast<FakeSystemState*>(fetcher_in->GetSystemState())
+      ->fake_hardware()->SetIsOfficialBuild(false);
+  multi_fetcher->set_delegate(&delegate);
+
+  MessageLoop::current()->PostTask(
+      FROM_HERE,
+      base::Bind(StartTransfer, multi_fetcher, url));
+  MessageLoop::current()->Run();
+
+  EXPECT_EQ(expected_size, delegate.data.size());
+  EXPECT_EQ(expected_prefix,
+            string(delegate.data.data(), expected_prefix.size()));
 }
 }  // namespace
 
@@ -1094,50 +1056,44 @@
   }
   void TransferComplete(HttpFetcher* fetcher, bool successful) override {
     EXPECT_FALSE(successful);
-    g_main_loop_quit(loop_);
+    MessageLoop::current()->BreakLoop();
   }
   void TransferTerminated(HttpFetcher* fetcher) override {
     ADD_FAILURE();
   }
-  GMainLoop* loop_;
 };
 
+void BlockedTransferTestHelper(AnyHttpFetcherTest* fetcher_test,
+                               bool is_official_build) {
+  if (fetcher_test->IsMock() || fetcher_test->IsMulti())
+    return;
+
+  unique_ptr<HttpServer> server(fetcher_test->CreateServer());
+  ASSERT_TRUE(server->started_);
+
+  BlockedTransferTestDelegate delegate;
+  unique_ptr<HttpFetcher> fetcher(fetcher_test->NewLargeFetcher());
+  LOG(INFO) << "is_official_build: " << is_official_build;
+  // NewLargeFetcher creates the HttpFetcher* with a FakeSystemState.
+  dynamic_cast<FakeSystemState*>(fetcher->GetSystemState())
+      ->fake_hardware()->SetIsOfficialBuild(is_official_build);
+  fetcher->set_delegate(&delegate);
+
+  MessageLoop::current()->PostTask(FROM_HERE, base::Bind(
+      StartTransfer,
+      fetcher.get(),
+      LocalServerUrlForPath(server->GetPort(),
+                            fetcher_test->SmallUrl(server->GetPort()))));
+  MessageLoop::current()->Run();
+}
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, BlockedTransferTest) {
-  if (this->test_.IsMock() || this->test_.IsMulti())
-    return;
+  BlockedTransferTestHelper(&this->test_, false);
+}
 
-  for (int i = 0; i < 2; i++) {
-    unique_ptr<HttpServer> server(this->test_.CreateServer());
-    ASSERT_TRUE(server->started_);
-
-    GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
-    {
-      BlockedTransferTestDelegate delegate;
-      delegate.loop_ = loop;
-
-      bool is_allowed = (i != 0);
-      unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher());
-
-      bool is_official_build = (i == 1);
-      LOG(INFO) << "is_update_allowed_over_connection: " << is_allowed;
-      LOG(INFO) << "is_official_build: " << is_official_build;
-      // NewLargeFetcher creates the HttpFetcher* with a FakeSystemState.
-      dynamic_cast<FakeSystemState*>(fetcher->GetSystemState())
-          ->fake_hardware()->SetIsOfficialBuild(is_official_build);
-      fetcher->set_delegate(&delegate);
-
-      StartTransferArgs start_xfer_args =
-          {fetcher.get(),
-           LocalServerUrlForPath(server->GetPort(),
-                                 this->test_.SmallUrl(server->GetPort()))};
-
-      g_timeout_add(0, StartTransfer, &start_xfer_args);
-      g_main_loop_run(loop);
-    }
-    g_main_loop_unref(loop);
-  }
+TYPED_TEST(HttpFetcherTest, BlockedTransferOfficialBuildTest) {
+  BlockedTransferTestHelper(&this->test_, true);
 }
 
 }  // namespace chromeos_update_engine
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index ee8820b..da69281 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -8,6 +8,7 @@
 #include <string>
 
 #include <base/bind.h>
+#include <base/location.h>
 #include <base/logging.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
@@ -15,6 +16,8 @@
 #include "update_engine/certificate_checker.h"
 #include "update_engine/hardware_interface.h"
 
+using base::TimeDelta;
+using chromeos::MessageLoop;
 using std::make_pair;
 using std::max;
 using std::string;
@@ -223,7 +226,7 @@
   url_ = url;
   auto closure = base::Bind(&LibcurlHttpFetcher::ProxiesResolved,
                             base::Unretained(this));
-  if (!ResolveProxiesForUrl(url_, new base::Closure(closure))) {
+  if (!ResolveProxiesForUrl(url_, closure)) {
     LOG(ERROR) << "Couldn't resolve proxies";
     if (delegate_)
       delegate_->TransferComplete(this, false);
@@ -292,9 +295,11 @@
         http_response_code_ == 0 &&
         no_network_retry_count_ < no_network_max_retries_) {
       no_network_retry_count_++;
-      g_timeout_add_seconds(kNoNetworkRetrySeconds,
-                            &LibcurlHttpFetcher::StaticRetryTimeoutCallback,
-                            this);
+      MessageLoop::current()->PostDelayedTask(
+          FROM_HERE,
+          base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
+                     base::Unretained(this)),
+          TimeDelta::FromSeconds(kNoNetworkRetrySeconds));
       LOG(INFO) << "No HTTP response, retry " << no_network_retry_count_;
       return;
     }
@@ -318,7 +323,10 @@
       if (HasProxy()) {
         // We have another proxy. Retry immediately.
         LOG(INFO) << "Retrying with next proxy setting";
-        g_idle_add(&LibcurlHttpFetcher::StaticRetryTimeoutCallback, this);
+        MessageLoop::current()->PostTask(
+            FROM_HERE,
+            base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
+                       base::Unretained(this)));
       } else {
         // Out of proxies. Give up.
         LOG(INFO) << "No further proxies, indicating transfer complete";
@@ -339,9 +347,11 @@
       } else {
         // Need to restart transfer
         LOG(INFO) << "Restarting transfer to download the remaining bytes";
-        g_timeout_add_seconds(retry_seconds_,
-                              &LibcurlHttpFetcher::StaticRetryTimeoutCallback,
-                              this);
+        MessageLoop::current()->PostDelayedTask(
+            FROM_HERE,
+            base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
+                       base::Unretained(this)),
+            TimeDelta::FromSeconds(retry_seconds_));
       }
     } else {
       LOG(INFO) << "Transfer completed (" << http_response_code_
@@ -473,12 +483,13 @@
   }
 
   // Set up a timeout callback for libcurl.
-  if (!timeout_source_) {
+  if (timeout_id_ == MessageLoop::kTaskIdNull) {
     LOG(INFO) << "Setting up timeout source: " << idle_seconds_ << " seconds.";
-    timeout_source_ = g_timeout_source_new_seconds(idle_seconds_);
-    g_source_set_callback(timeout_source_, StaticTimeoutCallback, this,
-                          nullptr);
-    g_source_attach(timeout_source_, nullptr);
+    timeout_id_ = MessageLoop::current()->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&LibcurlHttpFetcher::TimeoutCallback,
+                   base::Unretained(this)),
+        TimeDelta::FromSeconds(idle_seconds_));
   }
 }
 
@@ -492,27 +503,27 @@
   return true;
 }
 
-gboolean LibcurlHttpFetcher::RetryTimeoutCallback() {
+void LibcurlHttpFetcher::RetryTimeoutCallback() {
   ResumeTransfer(url_);
   CurlPerformOnce();
-  return FALSE;  // Don't have glib auto call this callback again
 }
 
-gboolean LibcurlHttpFetcher::TimeoutCallback() {
-  // We always return true, even if we don't want glib to call us back.
-  // We will remove the event source separately if we don't want to
+void LibcurlHttpFetcher::TimeoutCallback() {
+  if (transfer_in_progress_)
+    CurlPerformOnce();
+
+  // We always re-schedule the callback, even if we don't want to be called
+  // anymore. We will remove the event source separately if we don't want to
   // be called back.
-  if (!transfer_in_progress_)
-    return TRUE;
-  CurlPerformOnce();
-  return TRUE;
+  timeout_id_ = MessageLoop::current()->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&LibcurlHttpFetcher::TimeoutCallback, base::Unretained(this)),
+      TimeDelta::FromSeconds(idle_seconds_));
 }
 
 void LibcurlHttpFetcher::CleanUp() {
-  if (timeout_source_) {
-    g_source_destroy(timeout_source_);
-    timeout_source_ = nullptr;
-  }
+  MessageLoop::current()->CancelTask(timeout_id_);
+  timeout_id_ = MessageLoop::kTaskIdNull;
 
   for (size_t t = 0; t < arraysize(io_channels_); ++t) {
     for (IOChannels::iterator it = io_channels_[t].begin();
diff --git a/libcurl_http_fetcher.h b/libcurl_http_fetcher.h
index b341f6e..d52b783 100644
--- a/libcurl_http_fetcher.h
+++ b/libcurl_http_fetcher.h
@@ -14,6 +14,7 @@
 
 #include <base/logging.h>
 #include <base/macros.h>
+#include <chromeos/message_loops/message_loop.h>
 
 #include "update_engine/certificate_checker.h"
 #include "update_engine/hardware_interface.h"
@@ -30,29 +31,7 @@
  public:
   LibcurlHttpFetcher(ProxyResolver* proxy_resolver,
                      SystemState* system_state)
-      : HttpFetcher(proxy_resolver, system_state),
-        curl_multi_handle_(nullptr),
-        curl_handle_(nullptr),
-        curl_http_headers_(nullptr),
-        timeout_source_(nullptr),
-        transfer_in_progress_(false),
-        transfer_size_(0),
-        bytes_downloaded_(0),
-        download_length_(0),
-        resume_offset_(0),
-        retry_count_(0),
-        max_retry_count_(kDownloadMaxRetryCount),
-        retry_seconds_(20),
-        no_network_retry_count_(0),
-        no_network_max_retries_(0),
-        idle_seconds_(1),
-        in_write_callback_(false),
-        sent_byte_(false),
-        terminate_requested_(false),
-        check_certificate_(CertificateChecker::kNone),
-        low_speed_limit_bps_(kDownloadLowSpeedLimitBps),
-        low_speed_time_seconds_(kDownloadLowSpeedTimeSeconds),
-        connect_timeout_seconds_(kDownloadConnectTimeoutSeconds) {
+      : HttpFetcher(proxy_resolver, system_state) {
     // Dev users want a longer timeout (180 seconds) because they may
     // be waiting on the dev server to build an image.
     if (!system_state->hardware()->IsOfficialBuild())
@@ -156,15 +135,8 @@
     return reinterpret_cast<LibcurlHttpFetcher*>(data)->FDCallback(source,
                                                                    condition);
   }
-  gboolean TimeoutCallback();
-  static gboolean StaticTimeoutCallback(gpointer data) {
-    return reinterpret_cast<LibcurlHttpFetcher*>(data)->TimeoutCallback();
-  }
-
-  gboolean RetryTimeoutCallback();
-  static gboolean StaticRetryTimeoutCallback(void* arg) {
-    return static_cast<LibcurlHttpFetcher*>(arg)->RetryTimeoutCallback();
-  }
+  void TimeoutCallback();
+  void RetryTimeoutCallback();
 
   // Calls into curl_multi_perform to let libcurl do its work. Returns after
   // curl_multi_perform is finished, which may actually be after more than
@@ -208,9 +180,9 @@
   bool GetProxyType(const std::string& proxy, curl_proxytype* out_type);
 
   // Handles for the libcurl library
-  CURLM *curl_multi_handle_;
-  CURL *curl_handle_;
-  struct curl_slist *curl_http_headers_;
+  CURLM* curl_multi_handle_{nullptr};
+  CURL* curl_handle_{nullptr};
+  struct curl_slist* curl_http_headers_{nullptr};
 
   // Lists of all read(0)/write(1) file descriptors that we're waiting on from
   // the glib main loop. libcurl may open/close descriptors and switch their
@@ -219,61 +191,62 @@
   typedef std::map<int, std::pair<GIOChannel*, guint>> IOChannels;
   IOChannels io_channels_[2];
 
-  // if non-null, a timer we're waiting on. glib main loop will call us back
-  // when it fires.
-  GSource* timeout_source_;
+  // The TaskId of the timer we're waiting on. kTaskIdNull if we are not waiting
+  // on it.
+  chromeos::MessageLoop::TaskId timeout_id_{chromeos::MessageLoop::kTaskIdNull};
 
-  bool transfer_in_progress_;
+  bool transfer_in_progress_ = false;
 
   // The transfer size. -1 if not known.
-  off_t transfer_size_;
+  off_t transfer_size_{0};
 
   // How many bytes have been downloaded and sent to the delegate.
-  off_t bytes_downloaded_;
+  off_t bytes_downloaded_{0};
 
   // The remaining maximum number of bytes to download. Zero represents an
   // unspecified length.
-  size_t download_length_;
+  size_t download_length_{0};
 
   // If we resumed an earlier transfer, data offset that we used for the
   // new connection.  0 otherwise.
   // In this class, resume refers to resuming a dropped HTTP connection,
   // not to resuming an interrupted download.
-  off_t resume_offset_;
+  off_t resume_offset_{0};
 
   // Number of resumes performed so far and the max allowed.
-  int retry_count_;
-  int max_retry_count_;
+  int retry_count_{0};
+  int max_retry_count_{kDownloadMaxRetryCount};
 
   // Seconds to wait before retrying a resume.
-  int retry_seconds_;
+  int retry_seconds_{20};
 
   // Number of resumes due to no network (e.g., HTTP response code 0).
-  int no_network_retry_count_;
-  int no_network_max_retries_;
+  int no_network_retry_count_{0};
+  int no_network_max_retries_{0};
 
   // Seconds to wait before asking libcurl to "perform".
-  int idle_seconds_;
+  int idle_seconds_{1};
 
   // If true, we are currently performing a write callback on the delegate.
-  bool in_write_callback_;
+  bool in_write_callback_{false};
 
   // If true, we have returned at least one byte in the write callback
   // to the delegate.
-  bool sent_byte_;
+  bool sent_byte_{false};
 
   // We can't clean everything up while we're in a write callback, so
   // if we get a terminate request, queue it until we can handle it.
-  bool terminate_requested_;
+  bool terminate_requested_{false};
 
   // Represents which server certificate to be checked against this
   // connection's certificate. If no certificate check needs to be performed,
   // this should be kNone.
-  CertificateChecker::ServerToCheck check_certificate_;
+  CertificateChecker::ServerToCheck check_certificate_{
+      CertificateChecker::kNone};
 
-  int low_speed_limit_bps_;
-  int low_speed_time_seconds_;
-  int connect_timeout_seconds_;
+  int low_speed_limit_bps_{kDownloadLowSpeedLimitBps};
+  int low_speed_time_seconds_{kDownloadLowSpeedTimeSeconds};
+  int connect_timeout_seconds_{kDownloadConnectTimeoutSeconds};
   int num_max_retries_;
 
   DISALLOW_COPY_AND_ASSIGN(LibcurlHttpFetcher);
diff --git a/mock_http_fetcher.cc b/mock_http_fetcher.cc
index e178a50..a8dd1ae 100644
--- a/mock_http_fetcher.cc
+++ b/mock_http_fetcher.cc
@@ -11,12 +11,14 @@
 
 // This is a mock implementation of HttpFetcher which is useful for testing.
 
+using chromeos::MessageLoop;
 using std::min;
 
 namespace chromeos_update_engine {
 
 MockHttpFetcher::~MockHttpFetcher() {
-  CHECK(!timeout_source_) << "Call TerminateTransfer() before dtor.";
+  CHECK(timeout_id_ == MessageLoop::kTaskIdNull) <<
+      "Call TerminateTransfer() before dtor.";
 }
 
 void MockHttpFetcher::BeginTransfer(const std::string& url) {
@@ -30,13 +32,13 @@
     SendData(true);
 }
 
-// Returns false on one condition: If timeout_source_ was already set
-// and it needs to be deleted by the caller. If timeout_source_ is null
+// Returns false on one condition: If timeout_id_ was already set
+// and it needs to be deleted by the caller. If timeout_id_ is null
 // when this function is called, this function will always return true.
 bool MockHttpFetcher::SendData(bool skip_delivery) {
   if (fail_transfer_) {
     SignalTransferComplete();
-    return timeout_source_;
+    return timeout_id_ != MessageLoop::kTaskIdNull;
   }
 
   CHECK_LT(sent_size_, data_.size());
@@ -48,7 +50,7 @@
     // We may get terminated in the callback.
     if (sent_size_ == data_.size()) {
       LOG(INFO) << "Terminated in the ReceivedBytes callback.";
-      return timeout_source_;
+      return timeout_id_ != MessageLoop::kTaskIdNull;
     }
     sent_size_ += chunk_size;
     CHECK_LE(sent_size_, data_.size());
@@ -59,32 +61,35 @@
   }
 
   if (paused_) {
-    // If we're paused, we should return true if timeout_source_ is set,
+    // If we're paused, we should return true if timeout_id_ is set,
     // since we need the caller to delete it.
-    return timeout_source_;
+    return timeout_id_ != MessageLoop::kTaskIdNull;
   }
 
-  if (timeout_source_) {
+  if (timeout_id_ != MessageLoop::kTaskIdNull) {
     // we still need a timeout if there's more data to send
     return sent_size_ < data_.size();
   } else if (sent_size_ < data_.size()) {
     // we don't have a timeout source and we need one
-    timeout_source_ = g_timeout_source_new(10);
-    CHECK(timeout_source_);
-    g_source_set_callback(timeout_source_, StaticTimeoutCallback, this,
-                          nullptr);
-    timout_tag_ = g_source_attach(timeout_source_, nullptr);
+    timeout_id_ = MessageLoop::current()->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&MockHttpFetcher::TimeoutCallback, base::Unretained(this)),
+        base::TimeDelta::FromMilliseconds(10));
   }
   return true;
 }
 
-bool MockHttpFetcher::TimeoutCallback() {
+void MockHttpFetcher::TimeoutCallback() {
   CHECK(!paused_);
-  bool ret = SendData(false);
-  if (false == ret) {
-    timeout_source_ = nullptr;
+  if (SendData(false)) {
+    // We need to re-schedule the timeout.
+    timeout_id_ = MessageLoop::current()->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&MockHttpFetcher::TimeoutCallback, base::Unretained(this)),
+        base::TimeDelta::FromMilliseconds(10));
+  } else {
+    timeout_id_ = MessageLoop::kTaskIdNull;
   }
-  return ret;
 }
 
 // If the transfer is in progress, aborts the transfer early.
@@ -92,23 +97,17 @@
 void MockHttpFetcher::TerminateTransfer() {
   LOG(INFO) << "Terminating transfer.";
   sent_size_ = data_.size();
-  // kill any timeout
-  if (timeout_source_) {
-    g_source_remove(timout_tag_);
-    g_source_destroy(timeout_source_);
-    timeout_source_ = nullptr;
-  }
+  // Kill any timeout, it is ok to call with kTaskIdNull.
+  MessageLoop::current()->CancelTask(timeout_id_);
+  timeout_id_ = MessageLoop::kTaskIdNull;
   delegate_->TransferTerminated(this);
 }
 
 void MockHttpFetcher::Pause() {
   CHECK(!paused_);
   paused_ = true;
-  if (timeout_source_) {
-    g_source_remove(timout_tag_);
-    g_source_destroy(timeout_source_);
-    timeout_source_ = nullptr;
-  }
+  MessageLoop::current()->CancelTask(timeout_id_);
+  timeout_id_ = MessageLoop::kTaskIdNull;
 }
 
 void MockHttpFetcher::Unpause() {
diff --git a/mock_http_fetcher.h b/mock_http_fetcher.h
index c97eeb3..89f5d21 100644
--- a/mock_http_fetcher.h
+++ b/mock_http_fetcher.h
@@ -9,6 +9,7 @@
 #include <vector>
 
 #include <base/logging.h>
+#include <chromeos/message_loops/message_loop.h>
 #include <glib.h>
 
 #include "update_engine/fake_system_state.h"
@@ -36,8 +37,7 @@
                   ProxyResolver* proxy_resolver)
       : HttpFetcher(proxy_resolver, &fake_system_state_),
         sent_size_(0),
-        timeout_source_(nullptr),
-        timout_tag_(0),
+        timeout_id_(chromeos::MessageLoop::kTaskIdNull),
         paused_(false),
         fail_transfer_(false),
         never_use_(false),
@@ -97,7 +97,7 @@
   }
 
  private:
-  // Sends data to the delegate and sets up a glib timeout callback if needed.
+  // Sends data to the delegate and sets up a timeout callback if needed.
   // There must be a delegate and there must be data to send. If there is
   // already a timeout callback, and it should be deleted by the caller,
   // this will return false; otherwise true is returned.
@@ -105,11 +105,8 @@
   // still be set if needed.
   bool SendData(bool skip_delivery);
 
-  // Callback for when our glib main loop callback is called
-  bool TimeoutCallback();
-  static gboolean StaticTimeoutCallback(gpointer data) {
-    return reinterpret_cast<MockHttpFetcher*>(data)->TimeoutCallback();
-  }
+  // Callback for when our message loop timeout expires.
+  void TimeoutCallback();
 
   // Sets the HTTP response code and signals to the delegate that the transfer
   // is complete.
@@ -121,12 +118,9 @@
   // The number of bytes we've sent so far
   size_t sent_size_;
 
-  // The glib main loop timeout source. After each chunk of data sent, we
+  // The TaskId of the timeout callback. After each chunk of data sent, we
   // time out for 0s just to make sure that run loop services other clients.
-  GSource* timeout_source_;
-
-  // ID of the timeout source, valid only if timeout_source_ != null
-  guint timout_tag_;
+  chromeos::MessageLoop::TaskId timeout_id_;
 
   // True iff the fetcher is paused.
   bool paused_;
diff --git a/omaha_request_action_unittest.cc b/omaha_request_action_unittest.cc
index 04c230d..c6355b1 100644
--- a/omaha_request_action_unittest.cc
+++ b/omaha_request_action_unittest.cc
@@ -10,11 +10,16 @@
 #include <string>
 #include <vector>
 
+#include <base/bind.h>
 #include <base/strings/string_number_conversions.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
 #include <base/time/time.h>
+#include <chromeos/bind_lambda.h>
 #include <chromeos/dbus/service_constants.h>
+#include <chromeos/message_loops/fake_message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
+#include <chromeos/message_loops/message_loop_utils.h>
 #include <gtest/gtest.h>
 
 #include "update_engine/action_pipe.h"
@@ -216,14 +221,12 @@
 class OmahaRequestActionTestProcessorDelegate : public ActionProcessorDelegate {
  public:
   OmahaRequestActionTestProcessorDelegate()
-      : loop_(nullptr),
-        expected_code_(ErrorCode::kSuccess) {}
+      : expected_code_(ErrorCode::kSuccess) {}
   ~OmahaRequestActionTestProcessorDelegate() override {
   }
   void ProcessingDone(const ActionProcessor* processor,
                       ErrorCode code) override {
-    ASSERT_TRUE(loop_);
-    g_main_loop_quit(loop_);
+    chromeos::MessageLoop::current()->BreakLoop();
   }
 
   void ActionCompleted(ActionProcessor* processor,
@@ -235,15 +238,8 @@
     else
       EXPECT_EQ(ErrorCode::kSuccess, code);
   }
-  GMainLoop *loop_;
   ErrorCode expected_code_;
 };
-
-gboolean StartProcessorInRunLoop(gpointer data) {
-  ActionProcessor *processor = reinterpret_cast<ActionProcessor*>(data);
-  processor->StartProcessing();
-  return FALSE;
-}
 }  // namespace
 
 class OutputObjectCollectorAction;
@@ -291,7 +287,8 @@
     metrics::DownloadErrorCode expected_download_error_code,
     OmahaResponse* out_response,
     chromeos::Blob* out_post_data) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
   MockHttpFetcher* fetcher = new MockHttpFetcher(http_response.data(),
                                                  http_response.size(),
                                                  nullptr);
@@ -305,7 +302,6 @@
                             fetcher,
                             ping_only);
   OmahaRequestActionTestProcessorDelegate delegate;
-  delegate.loop_ = loop;
   delegate.expected_code_ = expected_code;
 
   ActionProcessor processor;
@@ -334,9 +330,11 @@
       .Times(expected_download_error_code == metrics::DownloadErrorCode::kUnset
              ? 0 : 1);
 
-  g_timeout_add(0, &StartProcessorInRunLoop, &processor);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  loop.PostTask(base::Bind([&processor] { processor.StartProcessing(); }));
+  LOG(INFO) << "loop.PendingTasks() = " << loop.PendingTasks();
+  loop.Run();
+  LOG(INFO) << "loop.PendingTasks() = " << loop.PendingTasks();
+  EXPECT_FALSE(loop.PendingTasks());
   if (collector_action.has_input_object_ && out_response)
     *out_response = collector_action.omaha_response_;
   if (out_post_data)
@@ -351,7 +349,8 @@
                OmahaEvent* event,
                const string& http_response,
                chromeos::Blob* out_post_data) {
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
   MockHttpFetcher* fetcher = new MockHttpFetcher(http_response.data(),
                                                  http_response.size(),
                                                  nullptr);
@@ -359,14 +358,17 @@
   fake_system_state.set_request_params(&params);
   OmahaRequestAction action(&fake_system_state, event, fetcher, false);
   OmahaRequestActionTestProcessorDelegate delegate;
-  delegate.loop_ = loop;
   ActionProcessor processor;
   processor.set_delegate(&delegate);
   processor.EnqueueAction(&action);
 
-  g_timeout_add(0, &StartProcessorInRunLoop, &processor);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  loop.PostTask(base::Bind([&processor] { processor.StartProcessing(); }));
+  loop.Run();
+
+  // This test should schedule a callback to notify the crash reporter if
+  // the passed event is an error.
+  EXPECT_EQ(event->result == OmahaEvent::kResultError, loop.PendingTasks());
+
   if (out_post_data)
     *out_post_data = fetcher->post_data();
 }
@@ -784,7 +786,8 @@
 TEST_F(OmahaRequestActionTest, NoOutputPipeTest) {
   const string http_response(fake_update_response_.GetNoUpdateResponse());
 
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
 
   OmahaRequestParams params = request_params_;
   fake_system_state_.set_request_params(&params);
@@ -794,14 +797,13 @@
                                                 nullptr),
                             false);
   OmahaRequestActionTestProcessorDelegate delegate;
-  delegate.loop_ = loop;
   ActionProcessor processor;
   processor.set_delegate(&delegate);
   processor.EnqueueAction(&action);
 
-  g_timeout_add(0, &StartProcessorInRunLoop, &processor);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  loop.PostTask(base::Bind([&processor] { processor.StartProcessing(); }));
+  loop.Run();
+  EXPECT_FALSE(loop.PendingTasks());
   EXPECT_FALSE(processor.IsRunning());
 }
 
@@ -942,39 +944,35 @@
 class TerminateEarlyTestProcessorDelegate : public ActionProcessorDelegate {
  public:
   void ProcessingStopped(const ActionProcessor* processor) {
-    ASSERT_TRUE(loop_);
-    g_main_loop_quit(loop_);
+    chromeos::MessageLoop::current()->BreakLoop();
   }
-  GMainLoop *loop_;
 };
 
-gboolean TerminateTransferTestStarter(gpointer data) {
-  ActionProcessor *processor = reinterpret_cast<ActionProcessor*>(data);
+void TerminateTransferTestStarter(ActionProcessor* processor) {
   processor->StartProcessing();
   CHECK(processor->IsRunning());
   processor->StopProcessing();
-  return FALSE;
 }
 }  // namespace
 
 TEST_F(OmahaRequestActionTest, TerminateTransferTest) {
-  string http_response("doesn't matter");
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
+  chromeos::FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
 
+  string http_response("doesn't matter");
   OmahaRequestAction action(&fake_system_state_, nullptr,
                             new MockHttpFetcher(http_response.data(),
                                                 http_response.size(),
                                                 nullptr),
                             false);
   TerminateEarlyTestProcessorDelegate delegate;
-  delegate.loop_ = loop;
   ActionProcessor processor;
   processor.set_delegate(&delegate);
   processor.EnqueueAction(&action);
 
-  g_timeout_add(0, &TerminateTransferTestStarter, &processor);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  loop.PostTask(base::Bind(&TerminateTransferTestStarter, &processor));
+  loop.Run();
+  EXPECT_FALSE(loop.PendingTasks());
 }
 
 TEST_F(OmahaRequestActionTest, XmlEncodeTest) {
diff --git a/p2p_manager_unittest.cc b/p2p_manager_unittest.cc
index 990fb3e..7c4cfd9 100644
--- a/p2p_manager_unittest.cc
+++ b/p2p_manager_unittest.cc
@@ -16,7 +16,7 @@
 #include <base/bind.h>
 #include <base/callback.h>
 #include <base/strings/stringprintf.h>
-#include <chromeos/message_loops/fake_message_loop.h>
+#include <chromeos/message_loops/glib_message_loop.h>
 #include <chromeos/message_loops/message_loop.h>
 #include <chromeos/message_loops/message_loop_utils.h>
 #include <gmock/gmock.h>
@@ -70,13 +70,17 @@
   }
 
   void TearDown() override {
-    EXPECT_FALSE(loop_.PendingTasks());
+    EXPECT_EQ(0, chromeos::MessageLoopRunMaxIterations(&loop_, 1));
   }
 
+  // TODO(deymo): Replace this with a FakeMessageLoop. P2PManager uses glib to
+  // interact with the p2p-client tool, so we need to run a GlibMessageLoop
+  // here.
+  chromeos::GlibMessageLoop loop_;
+
   // The P2PManager::Configuration instance used for testing.
   FakeP2PManagerConfiguration *test_conf_;
 
-  chromeos::FakeMessageLoop loop_{nullptr};
   FakeClock fake_clock_;
   chromeos_update_manager::MockPolicy *mock_policy_ = nullptr;
   chromeos_update_manager::FakeUpdateManager fake_um_;
@@ -478,59 +482,53 @@
 }
 
 static void ExpectUrl(const string& expected_url,
-                      GMainLoop* loop,
                       const string& url) {
   EXPECT_EQ(url, expected_url);
-  g_main_loop_quit(loop);
+  MessageLoop::current()->BreakLoop();
 }
 
 // Like StartP2P, we're mocking the different results that p2p-client
 // can return. It's not pretty but it works.
 TEST_F(P2PManagerTest, LookupURL) {
-  GMainLoop *loop = g_main_loop_new(nullptr, FALSE);
-
   // Emulate p2p-client returning valid URL with "fooX", 42 and "cros_au"
   // being propagated in the right places.
   test_conf_->SetP2PClientCommandLine(
       "echo 'http://1.2.3.4/{file_id}_{minsize}'");
   manager_->LookupUrlForFile("fooX", 42, TimeDelta(),
                              base::Bind(ExpectUrl,
-                                        "http://1.2.3.4/fooX.cros_au_42",
-                                        loop));
-  g_main_loop_run(loop);
+                                        "http://1.2.3.4/fooX.cros_au_42"));
+  loop_.Run();
 
   // Emulate p2p-client returning invalid URL.
   test_conf_->SetP2PClientCommandLine("echo 'not_a_valid_url'");
   manager_->LookupUrlForFile("foobar", 42, TimeDelta(),
-                             base::Bind(ExpectUrl, "", loop));
-  g_main_loop_run(loop);
+                             base::Bind(ExpectUrl, ""));
+  loop_.Run();
 
   // Emulate p2p-client conveying failure.
   test_conf_->SetP2PClientCommandLine("false");
   manager_->LookupUrlForFile("foobar", 42, TimeDelta(),
-                             base::Bind(ExpectUrl, "", loop));
-  g_main_loop_run(loop);
+                             base::Bind(ExpectUrl, ""));
+  loop_.Run();
 
   // Emulate p2p-client not existing.
   test_conf_->SetP2PClientCommandLine("/path/to/non/existent/helper/program");
   manager_->LookupUrlForFile("foobar", 42,
                              TimeDelta(),
-                             base::Bind(ExpectUrl, "", loop));
-  g_main_loop_run(loop);
+                             base::Bind(ExpectUrl, ""));
+  loop_.Run();
 
   // Emulate p2p-client crashing.
   test_conf_->SetP2PClientCommandLine("sh -c 'kill -SEGV $$'");
   manager_->LookupUrlForFile("foobar", 42, TimeDelta(),
-                             base::Bind(ExpectUrl, "", loop));
-  g_main_loop_run(loop);
+                             base::Bind(ExpectUrl, ""));
+  loop_.Run();
 
   // Emulate p2p-client exceeding its timeout.
   test_conf_->SetP2PClientCommandLine("sh -c 'echo http://1.2.3.4/; sleep 2'");
   manager_->LookupUrlForFile("foobar", 42, TimeDelta::FromMilliseconds(500),
-                             base::Bind(ExpectUrl, "", loop));
-  g_main_loop_run(loop);
-
-  g_main_loop_unref(loop);
+                             base::Bind(ExpectUrl, ""));
+  loop_.Run();
 }
 
 }  // namespace chromeos_update_engine
diff --git a/proxy_resolver.cc b/proxy_resolver.cc
index c6ba03b..8b8a9c8 100644
--- a/proxy_resolver.cc
+++ b/proxy_resolver.cc
@@ -5,7 +5,9 @@
 #include "update_engine/proxy_resolver.h"
 
 #include <base/bind.h>
+#include <base/location.h>
 
+using chromeos::MessageLoop;
 using std::deque;
 using std::string;
 
@@ -14,31 +16,33 @@
 const char kNoProxy[] = "direct://";
 
 DirectProxyResolver::~DirectProxyResolver() {
-  if (idle_callback_id_) {
-    g_source_remove(idle_callback_id_);
-    idle_callback_id_ = 0;
+  if (idle_callback_id_ != MessageLoop::kTaskIdNull) {
+    // The DirectProxyResolver is instantiated as part of the UpdateAttempter
+    // which is also instantiated by default by the FakeSystemState, even when
+    // it is not used. We check the manage_shares_id_ before calling the
+    // MessageLoop::current() since the unit test using a FakeSystemState may
+    // have not define a MessageLoop for the current thread.
+    MessageLoop::current()->CancelTask(idle_callback_id_);
+    idle_callback_id_ = MessageLoop::kTaskIdNull;
   }
 }
 
 bool DirectProxyResolver::GetProxiesForUrl(const string& url,
                                            ProxiesResolvedFn callback,
                                            void* data) {
-  base::Closure* closure = new base::Closure(base::Bind(
-      &DirectProxyResolver::ReturnCallback,
-      base::Unretained(this),
-      callback,
-      data));
-  idle_callback_id_ = g_idle_add_full(
-      G_PRIORITY_DEFAULT,
-      utils::GlibRunClosure,
-      closure,
-      utils::GlibDestroyClosure);
+  idle_callback_id_ = MessageLoop::current()->PostTask(
+      FROM_HERE,
+      base::Bind(
+            &DirectProxyResolver::ReturnCallback,
+            base::Unretained(this),
+            callback,
+            data));
   return true;
 }
 
 void DirectProxyResolver::ReturnCallback(ProxiesResolvedFn callback,
                                          void* data) {
-  idle_callback_id_ = 0;
+  idle_callback_id_ = MessageLoop::kTaskIdNull;
 
   // Initialize proxy pool with as many proxies as indicated (all identical).
   deque<string> proxies(num_proxies_, kNoProxy);
diff --git a/proxy_resolver.h b/proxy_resolver.h
index d59b888..9d6b73a 100644
--- a/proxy_resolver.h
+++ b/proxy_resolver.h
@@ -5,11 +5,11 @@
 #ifndef UPDATE_ENGINE_PROXY_RESOLVER_H_
 #define UPDATE_ENGINE_PROXY_RESOLVER_H_
 
-
 #include <deque>
 #include <string>
 
 #include <base/logging.h>
+#include <chromeos/message_loops/message_loop.h>
 
 #include "update_engine/utils.h"
 
@@ -45,7 +45,7 @@
 // Always says to not use a proxy
 class DirectProxyResolver : public ProxyResolver {
  public:
-  DirectProxyResolver() : idle_callback_id_(0), num_proxies_(1) {}
+  DirectProxyResolver() = default;
   ~DirectProxyResolver() override;
   bool GetProxiesForUrl(const std::string& url,
                         ProxiesResolvedFn callback,
@@ -58,12 +58,13 @@
   }
 
  private:
-  // The ID of the idle main loop callback
-  guint idle_callback_id_;
+  // The ID of the main loop callback.
+  chromeos::MessageLoop::TaskId idle_callback_id_{
+      chromeos::MessageLoop::kTaskIdNull};
 
   // Number of direct proxies to return on resolved list; currently used for
   // testing.
-  size_t num_proxies_;
+  size_t num_proxies_{1};
 
   // The MainLoop callback, from here we return to the client.
   void ReturnCallback(ProxiesResolvedFn callback, void* data);
diff --git a/subprocess.h b/subprocess.h
index 1b11494..02746fe 100644
--- a/subprocess.h
+++ b/subprocess.h
@@ -14,6 +14,7 @@
 
 #include <base/logging.h>
 #include <base/macros.h>
+#include <gtest/gtest_prod.h>  // for FRIEND_TEST
 
 // The Subprocess class is a singleton. It's used to spawn off a subprocess
 // and get notified when the subprocess exits. The result of Exec() can
@@ -61,6 +62,8 @@
   bool SubprocessInFlight();
 
  private:
+  FRIEND_TEST(SubprocessTest, CancelTest);
+
   struct SubprocessRecord {
     SubprocessRecord()
         : tag(0),
diff --git a/subprocess_unittest.cc b/subprocess_unittest.cc
index 4e7ebf6..37dea05 100644
--- a/subprocess_unittest.cc
+++ b/subprocess_unittest.cc
@@ -16,9 +16,15 @@
 #include <string>
 #include <vector>
 
+#include <base/bind.h>
+#include <base/location.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
 #include <base/time/time.h>
+#include <chromeos/bind_lambda.h>
+#include <chromeos/message_loops/glib_message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
+#include <chromeos/message_loops/message_loop_utils.h>
 #include <glib.h>
 #include <gtest/gtest.h>
 
@@ -26,6 +32,7 @@
 #include "update_engine/utils.h"
 
 using base::TimeDelta;
+using chromeos::MessageLoop;
 using std::string;
 using std::vector;
 
@@ -33,62 +40,65 @@
 
 class SubprocessTest : public ::testing::Test {
  protected:
-  bool callback_done;
+  void SetUp() override {
+    loop_.SetAsCurrent();
+  }
+
+  void TearDown() override {
+    EXPECT_EQ(0, chromeos::MessageLoopRunMaxIterations(&loop_, 1));
+  }
+
+  // TODO(deymo): Replace this with a FakeMessageLoop. Subprocess uses glib to
+  // asynchronously spawn a process, so we need to run a GlibMessageLoop here.
+  chromeos::GlibMessageLoop loop_;
 };
 
 namespace {
 int local_server_port = 0;
 
-void Callback(int return_code, const string& output, void *p) {
+void Callback(int return_code, const string& output, void* /* unused */) {
   EXPECT_EQ(1, return_code);
-  GMainLoop* loop = reinterpret_cast<GMainLoop*>(p);
-  g_main_loop_quit(loop);
+  MessageLoop::current()->BreakLoop();
 }
 
-void CallbackEcho(int return_code, const string& output, void *p) {
+void CallbackEcho(int return_code, const string& output, void* /* unused */) {
   EXPECT_EQ(0, return_code);
   EXPECT_NE(string::npos, output.find("this is stdout"));
   EXPECT_NE(string::npos, output.find("this is stderr"));
-  GMainLoop* loop = reinterpret_cast<GMainLoop*>(p);
-  g_main_loop_quit(loop);
+  MessageLoop::current()->BreakLoop();
 }
 
-gboolean LaunchFalseInMainLoop(gpointer data) {
-  vector<string> cmd;
-  cmd.push_back("/bin/false");
-  Subprocess::Get().Exec(cmd, Callback, data);
-  return FALSE;
-}
-
-gboolean LaunchEchoInMainLoop(gpointer data) {
-  vector<string> cmd;
-  cmd.push_back("/bin/sh");
-  cmd.push_back("-c");
-  cmd.push_back("echo this is stdout; echo this is stderr > /dev/stderr");
-  Subprocess::Get().Exec(cmd, CallbackEcho, data);
-  return FALSE;
-}
 }  // namespace
 
-TEST(SubprocessTest, SimpleTest) {
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
-  g_timeout_add(0, &LaunchFalseInMainLoop, loop);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+TEST_F(SubprocessTest, SimpleTest) {
+  loop_.PostTask(
+      FROM_HERE,
+      base::Bind([] {
+        Subprocess::Get().Exec(vector<string>{"/bin/false"}, Callback, nullptr);
+      }));
+  loop_.Run();
 }
 
-TEST(SubprocessTest, EchoTest) {
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
-  g_timeout_add(0, &LaunchEchoInMainLoop, loop);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+TEST_F(SubprocessTest, EchoTest) {
+  loop_.PostTask(
+      FROM_HERE,
+      base::Bind([] {
+        Subprocess::Get().Exec(
+            vector<string>{
+                "/bin/sh",
+                "-c",
+                "echo this is stdout; echo this is stderr > /dev/stderr"},
+            CallbackEcho,
+            nullptr);
+      }));
+  loop_.Run();
 }
 
-TEST(SubprocessTest, SynchronousEchoTest) {
-  vector<string> cmd;
-  cmd.push_back("/bin/sh");
-  cmd.push_back("-c");
-  cmd.push_back("echo -n stdout-here; echo -n stderr-there > /dev/stderr");
+TEST_F(SubprocessTest, SynchronousEchoTest) {
+  vector<string> cmd = {
+    "/bin/sh",
+    "-c",
+    "echo -n stdout-here; echo -n stderr-there > /dev/stderr"};
   int rc = -1;
   string stdout;
   ASSERT_TRUE(Subprocess::SynchronousExec(cmd, &rc, &stdout));
@@ -96,26 +106,21 @@
   EXPECT_EQ("stdout-herestderr-there", stdout);
 }
 
-TEST(SubprocessTest, SynchronousEchoNoOutputTest) {
-  vector<string> cmd;
-  cmd.push_back("/bin/sh");
-  cmd.push_back("-c");
-  cmd.push_back("echo test");
+TEST_F(SubprocessTest, SynchronousEchoNoOutputTest) {
+  vector<string> cmd = {
+      "/bin/sh",
+      "-c",
+      "echo test"};
   int rc = -1;
   ASSERT_TRUE(Subprocess::SynchronousExec(cmd, &rc, nullptr));
   EXPECT_EQ(0, rc);
 }
 
 namespace {
-void CallbackBad(int return_code, const string& output, void *p) {
+void CallbackBad(int return_code, const string& output, void* p) {
   CHECK(false) << "should never be called.";
 }
 
-struct CancelTestData {
-  bool spawned;
-  GMainLoop *loop;
-};
-
 // TODO(garnold) this test method uses test_http_server as a representative for
 // interactive processes that can be spawned/terminated at will. This causes us
 // to go through hoops when spawning this process (e.g. obtaining the port
@@ -124,9 +129,7 @@
 // (doesn't have to be able to communicate through a temp file) and the test
 // code below; for example, it sounds like a brain dead sleep loop with proper
 // signal handlers could be used instead.
-gboolean StartAndCancelInRunLoop(gpointer data) {
-  CancelTestData* cancel_test_data = reinterpret_cast<CancelTestData*>(data);
-
+void StartAndCancelInRunLoop(bool* spawned) {
   // Create a temp file for test_http_server to communicate its port number.
   char temp_file_name[] = "/tmp/subprocess_unittest-test_http_server-XXXXXX";
   int temp_fd = mkstemp(temp_file_name);
@@ -139,7 +142,7 @@
   cmd.push_back(temp_file_name);
   uint32_t tag = Subprocess::Get().Exec(cmd, CallbackBad, nullptr);
   EXPECT_NE(0, tag);
-  cancel_test_data->spawned = true;
+  *spawned = true;
   printf("test http server spawned\n");
   // Wait for server to be up and running
   TimeDelta total_wait_time;
@@ -174,13 +177,10 @@
   CHECK_GT(local_server_port, 0);
   LOG(INFO) << "server listening on port " << local_server_port;
   Subprocess::Get().CancelExec(tag);
-  return FALSE;
 }
-}  // namespace
 
-gboolean ExitWhenDone(gpointer data) {
-  CancelTestData* cancel_test_data = reinterpret_cast<CancelTestData*>(data);
-  if (cancel_test_data->spawned && !Subprocess::Get().SubprocessInFlight()) {
+void ExitWhenDone(bool* spawned) {
+  if (*spawned && !Subprocess::Get().SubprocessInFlight()) {
     // tear down the sub process
     printf("tear down time\n");
     int status = test_utils::System(
@@ -189,21 +189,37 @@
     EXPECT_NE(-1, status) << "system() failed";
     EXPECT_TRUE(WIFEXITED(status))
         << "command failed to run or died abnormally";
-    g_main_loop_quit(cancel_test_data->loop);
-    return FALSE;
+    MessageLoop::current()->BreakLoop();
+  } else {
+    // Re-run this callback again in 10 ms.
+    MessageLoop::current()->PostDelayedTask(
+        FROM_HERE,
+        base::Bind(&ExitWhenDone, spawned),
+        TimeDelta::FromMilliseconds(10));
   }
-  return TRUE;
 }
 
-TEST(SubprocessTest, CancelTest) {
-  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
-  CancelTestData cancel_test_data;
-  cancel_test_data.spawned = false;
-  cancel_test_data.loop = loop;
-  g_timeout_add(100, &StartAndCancelInRunLoop, &cancel_test_data);
-  g_timeout_add(10, &ExitWhenDone, &cancel_test_data);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+}  // namespace
+
+TEST_F(SubprocessTest, CancelTest) {
+  bool spawned = false;
+  loop_.PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&StartAndCancelInRunLoop, &spawned),
+      TimeDelta::FromMilliseconds(100));
+  loop_.PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&ExitWhenDone, &spawned),
+      TimeDelta::FromMilliseconds(10));
+  loop_.Run();
+  // This test would leak a callback that runs when the child process exits
+  // unless we wait for it to run.
+  chromeos::MessageLoopRunUntil(
+      &loop_,
+      TimeDelta::FromSeconds(10),
+      base::Bind([] {
+        return Subprocess::Get().subprocess_records_.empty();
+      }));
 }
 
 }  // namespace chromeos_update_engine
diff --git a/update_attempter.cc b/update_attempter.cc
index 678f38c..a1b2f11 100644
--- a/update_attempter.cc
+++ b/update_attempter.cc
@@ -19,7 +19,9 @@
 #include <base/rand_util.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
+#include <chromeos/bind_lambda.h>
 #include <chromeos/dbus/service_constants.h>
+#include <chromeos/message_loops/message_loop.h>
 
 #include <glib.h>
 #include <metrics/metrics_library.h>
@@ -57,6 +59,7 @@
 using base::Time;
 using base::TimeDelta;
 using base::TimeTicks;
+using chromeos::MessageLoop;
 using chromeos_update_manager::EvalStatus;
 using chromeos_update_manager::Policy;
 using chromeos_update_manager::UpdateCheckParams;
@@ -1374,47 +1377,41 @@
 }
 
 void UpdateAttempter::SetupCpuSharesManagement() {
-  if (manage_shares_source_) {
+  if (manage_shares_id_ != MessageLoop::kTaskIdNull) {
     LOG(ERROR) << "Cpu shares timeout source hasn't been destroyed.";
     CleanupCpuSharesManagement();
   }
   const int kCpuSharesTimeout = 2 * 60 * 60;  // 2 hours
-  manage_shares_source_ = g_timeout_source_new_seconds(kCpuSharesTimeout);
-  g_source_set_callback(manage_shares_source_,
-                        StaticManageCpuSharesCallback,
-                        this,
-                        nullptr);
-  g_source_attach(manage_shares_source_, nullptr);
+  manage_shares_id_ = MessageLoop::current()->PostDelayedTask(
+      FROM_HERE,
+      Bind(&UpdateAttempter::ManageCpuSharesCallback, base::Unretained(this)),
+      TimeDelta::FromSeconds(kCpuSharesTimeout));
   SetCpuShares(utils::kCpuSharesLow);
 }
 
 void UpdateAttempter::CleanupCpuSharesManagement() {
-  if (manage_shares_source_) {
-    g_source_destroy(manage_shares_source_);
-    manage_shares_source_ = nullptr;
+  if (manage_shares_id_ != MessageLoop::kTaskIdNull) {
+    // The UpdateAttempter is instantiated by default by the FakeSystemState,
+    // even when it is not used. We check the manage_shares_id_ before calling
+    // the MessageLoop::current() since the unit test using a FakeSystemState
+    // may have not define a MessageLoop for the current thread.
+    MessageLoop::current()->CancelTask(manage_shares_id_);
+    manage_shares_id_ = MessageLoop::kTaskIdNull;
   }
   SetCpuShares(utils::kCpuSharesNormal);
 }
 
-gboolean UpdateAttempter::StaticManageCpuSharesCallback(gpointer data) {
-  return reinterpret_cast<UpdateAttempter*>(data)->ManageCpuSharesCallback();
-}
-
-gboolean UpdateAttempter::StaticStartProcessing(gpointer data) {
-  reinterpret_cast<UpdateAttempter*>(data)->processor_->StartProcessing();
-  return FALSE;  // Don't call this callback again.
-}
-
 void UpdateAttempter::ScheduleProcessingStart() {
   LOG(INFO) << "Scheduling an action processor start.";
   start_action_processor_ = false;
-  g_idle_add(&StaticStartProcessing, this);
+  MessageLoop::current()->PostTask(
+      FROM_HERE,
+      Bind([this] { this->processor_->StartProcessing(); }));
 }
 
-bool UpdateAttempter::ManageCpuSharesCallback() {
+void UpdateAttempter::ManageCpuSharesCallback() {
   SetCpuShares(utils::kCpuSharesNormal);
-  manage_shares_source_ = nullptr;
-  return false;  // Destroy the timeout source.
+  manage_shares_id_ = MessageLoop::kTaskIdNull;
 }
 
 void UpdateAttempter::DisableDeltaUpdateIfNeeded() {
diff --git a/update_attempter.h b/update_attempter.h
index c3495da..1326365 100644
--- a/update_attempter.h
+++ b/update_attempter.h
@@ -283,12 +283,8 @@
   void CleanupCpuSharesManagement();
 
   // The cpu shares timeout source callback sets the current cpu shares to
-  // normal. Returns false so that GLib destroys the timeout source.
-  static gboolean StaticManageCpuSharesCallback(gpointer data);
-  bool ManageCpuSharesCallback();
-
-  // Callback to start the action processor.
-  static gboolean StaticStartProcessing(gpointer data);
+  // normal.
+  void ManageCpuSharesCallback();
 
   // Schedules an event loop callback to start the action processor. This is
   // scheduled asynchronously to unblock the event loop.
@@ -432,8 +428,9 @@
   // Current cpu shares.
   utils::CpuShares shares_ = utils::kCpuSharesNormal;
 
-  // The cpu shares management timeout source.
-  GSource* manage_shares_source_ = nullptr;
+  // The cpu shares management timeout task id.
+  chromeos::MessageLoop::TaskId manage_shares_id_{
+      chromeos::MessageLoop::kTaskIdNull};
 
   // Set to true if an update download is active (and BytesReceived
   // will be called), set to false otherwise.
diff --git a/update_attempter_unittest.cc b/update_attempter_unittest.cc
index 6ef1d7a..41a8232 100644
--- a/update_attempter_unittest.cc
+++ b/update_attempter_unittest.cc
@@ -9,7 +9,11 @@
 #include <memory>
 
 #include <base/files/file_util.h>
+#include <chromeos/bind_lambda.h>
 #include <chromeos/dbus/service_constants.h>
+#include <chromeos/message_loops/glib_message_loop.h>
+#include <chromeos/message_loops/message_loop.h>
+#include <chromeos/message_loops/message_loop_utils.h>
 #include <gtest/gtest.h>
 #include <policy/libpolicy.h>
 #include <policy/mock_device_policy.h>
@@ -33,6 +37,7 @@
 
 using base::Time;
 using base::TimeDelta;
+using chromeos::MessageLoop;
 using std::string;
 using std::unique_ptr;
 using testing::A;
@@ -84,7 +89,7 @@
   bool schedule_updates_called() const { return schedule_updates_called_; }
 
   // Need to expose forced_omaha_url_ so we can test it.
-  const std::string& forced_omaha_url() const { return forced_omaha_url_; }
+  const string& forced_omaha_url() const { return forced_omaha_url_; }
 
  private:
   bool schedule_updates_called_ = false;
@@ -96,12 +101,12 @@
   UpdateAttempterTest()
       : attempter_(&fake_system_state_, &dbus_),
         mock_connection_manager(&fake_system_state_),
-        loop_(nullptr),
         fake_dbus_system_bus_(reinterpret_cast<DBusGConnection*>(1)),
         fake_dbus_debugd_proxy_(reinterpret_cast<DBusGProxy*>(2)) {
     // Override system state members.
     fake_system_state_.set_connection_manager(&mock_connection_manager);
     fake_system_state_.set_update_attempter(&attempter_);
+    loop_.SetAsCurrent();
 
     // Finish initializing the attempter.
     attempter_.Init();
@@ -120,7 +125,7 @@
     EXPECT_NE(nullptr, attempter_.system_state_);
     EXPECT_EQ(0, attempter_.http_response_code_);
     EXPECT_EQ(utils::kCpuSharesNormal, attempter_.shares_);
-    EXPECT_EQ(nullptr, attempter_.manage_shares_source_);
+    EXPECT_EQ(MessageLoop::kTaskIdNull, attempter_.manage_shares_id_);
     EXPECT_FALSE(attempter_.download_active_);
     EXPECT_EQ(UPDATE_STATUS_IDLE, attempter_.status_);
     EXPECT_EQ(0.0, attempter_.download_progress_);
@@ -160,52 +165,26 @@
 
   void TearDown() override {
     test_utils::RecursiveUnlinkDir(test_dir_);
+    EXPECT_EQ(0, MessageLoopRunMaxIterations(&loop_, 1));
   }
 
-  void QuitMainLoop();
-  static gboolean StaticQuitMainLoop(gpointer data);
+ public:
+  void ScheduleQuitMainLoop();
 
+  // Callbacks to run the different tests from the main loop.
   void UpdateTestStart();
   void UpdateTestVerify();
-  void RollbackTestStart(bool enterprise_rollback,
-                         bool valid_slot);
+  void RollbackTestStart(bool enterprise_rollback, bool valid_slot);
   void RollbackTestVerify();
-  static gboolean StaticUpdateTestStart(gpointer data);
-  static gboolean StaticUpdateTestVerify(gpointer data);
-  static gboolean StaticRollbackTestStart(gpointer data);
-  static gboolean StaticInvalidSlotRollbackTestStart(gpointer data);
-  static gboolean StaticEnterpriseRollbackTestStart(gpointer data);
-  static gboolean StaticRollbackTestVerify(gpointer data);
-
   void PingOmahaTestStart();
-  static gboolean StaticPingOmahaTestStart(gpointer data);
-
   void ReadScatterFactorFromPolicyTestStart();
-  static gboolean StaticReadScatterFactorFromPolicyTestStart(
-      gpointer data);
-
   void DecrementUpdateCheckCountTestStart();
-  static gboolean StaticDecrementUpdateCheckCountTestStart(
-      gpointer data);
-
   void NoScatteringDoneDuringManualUpdateTestStart();
-  static gboolean StaticNoScatteringDoneDuringManualUpdateTestStart(
-      gpointer data);
-
   void P2PNotEnabledStart();
-  static gboolean StaticP2PNotEnabled(gpointer data);
-
   void P2PEnabledStart();
-  static gboolean StaticP2PEnabled(gpointer data);
-
   void P2PEnabledInteractiveStart();
-  static gboolean StaticP2PEnabledInteractive(gpointer data);
-
   void P2PEnabledStartingFailsStart();
-  static gboolean StaticP2PEnabledStartingFails(gpointer data);
-
   void P2PEnabledHousekeepingFailsStart();
-  static gboolean StaticP2PEnabledHousekeepingFails(gpointer data);
 
   bool actual_using_p2p_for_downloading() {
     return actual_using_p2p_for_downloading_;
@@ -214,13 +193,16 @@
     return actual_using_p2p_for_sharing_;
   }
 
+  // TODO(deymo): Replace this with a FakeMessageLoop. Some of these tests use a
+  // real LibcurlHttpFetcher, which still requires a GlibMessageLoop.
+  chromeos::GlibMessageLoop loop_;
+
   FakeSystemState fake_system_state_;
   NiceMock<MockDBusWrapper> dbus_;
   UpdateAttempterUnderTest attempter_;
   NiceMock<MockActionProcessor>* processor_;
   NiceMock<MockPrefs>* prefs_;  // Shortcut to fake_system_state_->mock_prefs().
   NiceMock<MockConnectionManager> mock_connection_manager;
-  GMainLoop* loop_;
   // fake_dbus_xxx pointers will be non-null for comparison purposes, but won't
   // be valid objects so don't try to use them.
   DBusGConnection* fake_dbus_system_bus_;
@@ -232,6 +214,10 @@
   bool actual_using_p2p_for_sharing_;
 };
 
+void UpdateAttempterTest::ScheduleQuitMainLoop() {
+  loop_.PostTask(FROM_HERE, base::Bind([this] { this->loop_.BreakLoop(); }));
+}
+
 TEST_F(UpdateAttempterTest, ActionCompletedDownloadTest) {
   unique_ptr<MockHttpFetcher> fetcher(new MockHttpFetcher("", 0, nullptr));
   fetcher->FailTransfer(503);  // Sets the HTTP response code.
@@ -385,75 +371,6 @@
   EXPECT_EQ(UPDATE_STATUS_REPORTING_ERROR_EVENT, attempter_.status());
 }
 
-void UpdateAttempterTest::QuitMainLoop() {
-  g_main_loop_quit(loop_);
-}
-
-gboolean UpdateAttempterTest::StaticQuitMainLoop(gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->QuitMainLoop();
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticUpdateTestStart(gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->UpdateTestStart();
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticUpdateTestVerify(gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->UpdateTestVerify();
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticRollbackTestStart(gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->RollbackTestStart(
-      false, true);
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticInvalidSlotRollbackTestStart(
-    gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->RollbackTestStart(
-      false, false);
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticEnterpriseRollbackTestStart(gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->RollbackTestStart(
-      true, true);
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticRollbackTestVerify(gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->RollbackTestVerify();
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticPingOmahaTestStart(gpointer data) {
-  reinterpret_cast<UpdateAttempterTest*>(data)->PingOmahaTestStart();
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticReadScatterFactorFromPolicyTestStart(
-    gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->ReadScatterFactorFromPolicyTestStart();
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticDecrementUpdateCheckCountTestStart(
-    gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->DecrementUpdateCheckCountTestStart();
-  return FALSE;
-}
-
-gboolean UpdateAttempterTest::StaticNoScatteringDoneDuringManualUpdateTestStart(
-    gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->NoScatteringDoneDuringManualUpdateTestStart();
-  return FALSE;
-}
-
 namespace {
 // Actions that will be built as part of an update check.
 const string kUpdateActionTypes[] = {  // NOLINT(runtime/string)
@@ -499,7 +416,9 @@
   }
 
   attempter_.Update("", "", "", "", false, false);
-  g_idle_add(&StaticUpdateTestVerify, this);
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::UpdateTestVerify,
+                            base::Unretained(this)));
 }
 
 void UpdateAttempterTest::UpdateTestVerify() {
@@ -516,7 +435,7 @@
   ASSERT_NE(nullptr, download_action);
   EXPECT_EQ(&attempter_, download_action->delegate());
   EXPECT_EQ(UPDATE_STATUS_CHECKING_FOR_UPDATE, attempter_.status());
-  g_main_loop_quit(loop_);
+  loop_.BreakLoop();
 }
 
 void UpdateAttempterTest::RollbackTestStart(
@@ -566,10 +485,12 @@
     EXPECT_CALL(*processor_, StartProcessing());
 
     EXPECT_TRUE(attempter_.Rollback(true));
-    g_idle_add(&StaticRollbackTestVerify, this);
+    loop_.PostTask(FROM_HERE,
+                   base::Bind(&UpdateAttempterTest::RollbackTestVerify,
+                              base::Unretained(this)));
   } else {
     EXPECT_FALSE(attempter_.Rollback(true));
-    g_main_loop_quit(loop_);
+    loop_.BreakLoop();
   }
 }
 
@@ -589,39 +510,38 @@
   EXPECT_EQ(install_plan->install_path, string("/dev/sdz3"));
   EXPECT_EQ(install_plan->kernel_install_path, string("/dev/sdz2"));
   EXPECT_EQ(install_plan->powerwash_required, true);
-  g_main_loop_quit(loop_);
+  loop_.BreakLoop();
 }
 
 TEST_F(UpdateAttempterTest, UpdateTest) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticUpdateTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::UpdateTestStart,
+                            base::Unretained(this)));
+  loop_.Run();
 }
 
 TEST_F(UpdateAttempterTest, RollbackTest) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticRollbackTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::RollbackTestStart,
+                            base::Unretained(this),
+                            false, true));
+  loop_.Run();
 }
 
 TEST_F(UpdateAttempterTest, InvalidSlotRollbackTest) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticInvalidSlotRollbackTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::RollbackTestStart,
+                            base::Unretained(this),
+                            false, false));
+  loop_.Run();
 }
 
 TEST_F(UpdateAttempterTest, EnterpriseRollbackTest) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticEnterpriseRollbackTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::RollbackTestStart,
+                            base::Unretained(this),
+                            true, true));
+  loop_.Run();
 }
 
 void UpdateAttempterTest::PingOmahaTestStart() {
@@ -630,7 +550,7 @@
                                      OmahaRequestAction::StaticType())));
   EXPECT_CALL(*processor_, StartProcessing());
   attempter_.PingOmaha();
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, PingOmahaTest) {
@@ -639,11 +559,10 @@
   // Disable scheduling of subsequnet checks; we're using the DefaultPolicy in
   // testing, which is more permissive than we want to handle here.
   attempter_.DisableScheduleUpdates();
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticPingOmahaTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::PingOmahaTestStart,
+                            base::Unretained(this)));
+  chromeos::MessageLoopRunMaxIterations(&loop_, 100);
   EXPECT_EQ(UPDATE_STATUS_UPDATED_NEED_REBOOT, attempter_.status());
   EXPECT_TRUE(attempter_.schedule_updates_called());
 }
@@ -706,17 +625,12 @@
 }
 
 TEST_F(UpdateAttempterTest, P2PNotEnabled) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticP2PNotEnabled, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::P2PNotEnabledStart,
+                            base::Unretained(this)));
+  loop_.Run();
 }
-gboolean UpdateAttempterTest::StaticP2PNotEnabled(gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->P2PNotEnabledStart();
-  return FALSE;
-}
+
 void UpdateAttempterTest::P2PNotEnabledStart() {
   // If P2P is not enabled, check that we do not attempt housekeeping
   // and do not convey that p2p is to be used.
@@ -725,24 +639,18 @@
   mock_p2p_manager.fake().SetP2PEnabled(false);
   EXPECT_CALL(mock_p2p_manager, PerformHousekeeping()).Times(0);
   attempter_.Update("", "", "", "", false, false);
-  EXPECT_FALSE(actual_using_p2p_for_downloading());
+  EXPECT_FALSE(actual_using_p2p_for_downloading_);
   EXPECT_FALSE(actual_using_p2p_for_sharing());
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, P2PEnabledStartingFails) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticP2PEnabledStartingFails, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::P2PEnabledStartingFailsStart,
+                            base::Unretained(this)));
+  loop_.Run();
 }
-gboolean UpdateAttempterTest::StaticP2PEnabledStartingFails(
-    gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->P2PEnabledStartingFailsStart();
-  return FALSE;
-}
+
 void UpdateAttempterTest::P2PEnabledStartingFailsStart() {
   // If p2p is enabled, but starting it fails ensure we don't do
   // any housekeeping and do not convey that p2p should be used.
@@ -755,22 +663,17 @@
   attempter_.Update("", "", "", "", false, false);
   EXPECT_FALSE(actual_using_p2p_for_downloading());
   EXPECT_FALSE(actual_using_p2p_for_sharing());
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, P2PEnabledHousekeepingFails) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticP2PEnabledHousekeepingFails, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(
+      FROM_HERE,
+      base::Bind(&UpdateAttempterTest::P2PEnabledHousekeepingFailsStart,
+                 base::Unretained(this)));
+  loop_.Run();
 }
-gboolean UpdateAttempterTest::StaticP2PEnabledHousekeepingFails(
-    gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->P2PEnabledHousekeepingFailsStart();
-  return FALSE;
-}
+
 void UpdateAttempterTest::P2PEnabledHousekeepingFailsStart() {
   // If p2p is enabled, starting it works but housekeeping fails, ensure
   // we do not convey p2p is to be used.
@@ -783,21 +686,16 @@
   attempter_.Update("", "", "", "", false, false);
   EXPECT_FALSE(actual_using_p2p_for_downloading());
   EXPECT_FALSE(actual_using_p2p_for_sharing());
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, P2PEnabled) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticP2PEnabled, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::P2PEnabledStart,
+                            base::Unretained(this)));
+  loop_.Run();
 }
-gboolean UpdateAttempterTest::StaticP2PEnabled(gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->P2PEnabledStart();
-  return FALSE;
-}
+
 void UpdateAttempterTest::P2PEnabledStart() {
   MockP2PManager mock_p2p_manager;
   fake_system_state_.set_p2p_manager(&mock_p2p_manager);
@@ -810,21 +708,16 @@
   attempter_.Update("", "", "", "", false, false);
   EXPECT_TRUE(actual_using_p2p_for_downloading());
   EXPECT_TRUE(actual_using_p2p_for_sharing());
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, P2PEnabledInteractive) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticP2PEnabledInteractive, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE,
+                 base::Bind(&UpdateAttempterTest::P2PEnabledInteractiveStart,
+                            base::Unretained(this)));
+  loop_.Run();
 }
-gboolean UpdateAttempterTest::StaticP2PEnabledInteractive(gpointer data) {
-  UpdateAttempterTest* ua_test = reinterpret_cast<UpdateAttempterTest*>(data);
-  ua_test->P2PEnabledInteractiveStart();
-  return FALSE;
-}
+
 void UpdateAttempterTest::P2PEnabledInteractiveStart() {
   MockP2PManager mock_p2p_manager;
   fake_system_state_.set_p2p_manager(&mock_p2p_manager);
@@ -838,15 +731,15 @@
   attempter_.Update("", "", "", "", false, true /* interactive */);
   EXPECT_FALSE(actual_using_p2p_for_downloading());
   EXPECT_TRUE(actual_using_p2p_for_sharing());
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, ReadScatterFactorFromPolicy) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticReadScatterFactorFromPolicyTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(
+      FROM_HERE,
+      base::Bind(&UpdateAttempterTest::ReadScatterFactorFromPolicyTestStart,
+                 base::Unretained(this)));
+  loop_.Run();
 }
 
 // Tests that the scatter_factor_in_seconds value is properly fetched
@@ -868,15 +761,15 @@
   attempter_.Update("", "", "", "", false, false);
   EXPECT_EQ(scatter_factor_in_seconds, attempter_.scatter_factor_.InSeconds());
 
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, DecrementUpdateCheckCountTest) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticDecrementUpdateCheckCountTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(
+      FROM_HERE,
+      base::Bind(&UpdateAttempterTest::DecrementUpdateCheckCountTestStart,
+                 base::Unretained(this)));
+  loop_.Run();
 }
 
 void UpdateAttempterTest::DecrementUpdateCheckCountTestStart() {
@@ -924,15 +817,14 @@
   EXPECT_TRUE(fake_prefs.GetInt64(kPrefsUpdateCheckCount, &new_value));
   EXPECT_EQ(initial_value, new_value);
 
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 TEST_F(UpdateAttempterTest, NoScatteringDoneDuringManualUpdateTestStart) {
-  loop_ = g_main_loop_new(g_main_context_default(), FALSE);
-  g_idle_add(&StaticNoScatteringDoneDuringManualUpdateTestStart, this);
-  g_main_loop_run(loop_);
-  g_main_loop_unref(loop_);
-  loop_ = nullptr;
+  loop_.PostTask(FROM_HERE, base::Bind(
+      &UpdateAttempterTest::NoScatteringDoneDuringManualUpdateTestStart,
+      base::Unretained(this)));
+  loop_.Run();
 }
 
 void UpdateAttempterTest::NoScatteringDoneDuringManualUpdateTestStart() {
@@ -977,7 +869,7 @@
       attempter_.omaha_request_params_->update_check_count_wait_enabled());
   EXPECT_FALSE(fake_prefs.Exists(kPrefsUpdateCheckCount));
 
-  g_idle_add(&StaticQuitMainLoop, this);
+  ScheduleQuitMainLoop();
 }
 
 // Checks that we only report daily metrics at most every 24 hours.
diff --git a/utils.cc b/utils.cc
index ec05062..03f642a 100644
--- a/utils.cc
+++ b/utils.cc
@@ -29,6 +29,7 @@
 #include <base/files/file_path.h>
 #include <base/files/file_util.h>
 #include <base/files/scoped_file.h>
+#include <base/location.h>
 #include <base/logging.h>
 #include <base/posix/eintr_wrapper.h>
 #include <base/rand_util.h>
@@ -38,6 +39,7 @@
 #include <base/strings/stringprintf.h>
 #include <chromeos/data_encoding.h>
 #include <chromeos/key_value_store.h>
+#include <chromeos/message_loops/message_loop.h>
 #include <glib.h>
 
 #include "update_engine/clock_interface.h"
@@ -796,7 +798,7 @@
   return true;
 }
 
-bool IsExtFilesystem(const std::string& device) {
+bool IsExtFilesystem(const string& device) {
   chromeos::Blob header;
   // The first 2 KiB is enough to read the ext2 superblock (located at offset
   // 1024).
@@ -805,7 +807,7 @@
   return GetExt3Size(header.data(), header.size(), nullptr, nullptr);
 }
 
-bool IsSquashfsFilesystem(const std::string& device) {
+bool IsSquashfsFilesystem(const string& device) {
   chromeos::Blob header;
   // The first 96 is enough to read the squashfs superblock.
   const ssize_t kSquashfsSuperBlockSize = 96;
@@ -900,7 +902,7 @@
 namespace {
 // Do the actual trigger. We do it as a main-loop callback to (try to) get a
 // consistent stack trace.
-gboolean TriggerCrashReporterUpload(void* unused) {
+void TriggerCrashReporterUpload() {
   pid_t pid = fork();
   CHECK_GE(pid, 0) << "fork failed";  // fork() failed. Something is very wrong.
   if (pid == 0) {
@@ -910,12 +912,13 @@
   // We are the parent. Wait for child to terminate.
   pid_t result = waitpid(pid, nullptr, 0);
   LOG_IF(ERROR, result < 0) << "waitpid() failed";
-  return FALSE;  // Don't call this callback again
 }
 }  // namespace
 
 void ScheduleCrashReporterUpload() {
-  g_idle_add(&TriggerCrashReporterUpload, nullptr);
+  chromeos::MessageLoop::current()->PostTask(
+      FROM_HERE,
+      base::Bind(&TriggerCrashReporterUpload));
 }
 
 bool SetCpuShares(CpuShares shares) {
@@ -938,16 +941,6 @@
   return base::RandInt(min, max);
 }
 
-gboolean GlibRunClosure(gpointer data) {
-  base::Closure* callback = reinterpret_cast<base::Closure*>(data);
-  callback->Run();
-  return FALSE;
-}
-
-void GlibDestroyClosure(gpointer data) {
-  delete reinterpret_cast<base::Closure*>(data);
-}
-
 string FormatSecs(unsigned secs) {
   return FormatTimeDelta(TimeDelta::FromSeconds(secs));
 }
@@ -1647,7 +1640,7 @@
 
 bool GetMinorVersion(base::FilePath path, uint32_t* minor_version) {
   chromeos::KeyValueStore store;
-  std::string result;
+  string result;
   if (base::PathExists(path) && store.Load(path) &&
       store.GetString("PAYLOAD_MINOR_VERSION", &result)) {
     if (!base::StringToUint(result, minor_version)) {
@@ -1659,7 +1652,7 @@
   return false;
 }
 
-bool ReadExtents(const std::string& path, const vector<Extent>& extents,
+bool ReadExtents(const string& path, const vector<Extent>& extents,
                  chromeos::Blob* out_data, ssize_t out_data_size,
                  size_t block_size) {
   chromeos::Blob data(out_data_size);
diff --git a/utils.h b/utils.h
index 7a506c5..f33c011 100644
--- a/utils.h
+++ b/utils.h
@@ -329,12 +329,6 @@
 // success, false otherwise.
 bool SetCpuShares(CpuShares shares);
 
-// Assumes |data| points to a Closure. Runs it and returns FALSE;
-gboolean GlibRunClosure(gpointer data);
-
-// Destroys the Closure pointed by |data|.
-void GlibDestroyClosure(gpointer data);
-
 // Converts seconds into human readable notation including days, hours, minutes
 // and seconds. For example, 185 will yield 3m5s, 4300 will yield 1h11m40s, and
 // 360000 will yield 4d4h0m0s.  Zero padding not applied. Seconds are always
diff --git a/utils_unittest.cc b/utils_unittest.cc
index 2c30d30..308345a 100644
--- a/utils_unittest.cc
+++ b/utils_unittest.cc
@@ -18,6 +18,8 @@
 #include <base/files/scoped_temp_dir.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
+#include <chromeos/message_loops/fake_message_loop.h>
+#include <chromeos/message_loops/message_loop_utils.h>
 #include <gtest/gtest.h>
 
 #include "update_engine/fake_clock.h"
@@ -26,6 +28,7 @@
 #include "update_engine/prefs.h"
 #include "update_engine/test_utils.h"
 
+using chromeos::FakeMessageLoop;
 using std::map;
 using std::string;
 using std::vector;
@@ -411,22 +414,15 @@
                       0xb0, 0x04, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00});
 }
 
-namespace {
-gboolean  TerminateScheduleCrashReporterUploadTest(void* arg) {
-  GMainLoop* loop = reinterpret_cast<GMainLoop*>(arg);
-  g_main_loop_quit(loop);
-  return FALSE;  // Don't call this callback again
-}
-}  // namespace
-
 TEST(UtilsTest, ScheduleCrashReporterUploadTest) {
   // Not much to test. At least this tests for memory leaks, crashes,
   // log errors.
-  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
+  FakeMessageLoop loop(nullptr);
+  loop.SetAsCurrent();
   utils::ScheduleCrashReporterUpload();
-  g_timeout_add_seconds(1, &TerminateScheduleCrashReporterUploadTest, loop);
-  g_main_loop_run(loop);
-  g_main_loop_unref(loop);
+  // Test that we scheduled one callback from the crash reporter.
+  EXPECT_EQ(1, chromeos::MessageLoopRunMaxIterations(&loop, 100));
+  EXPECT_FALSE(loop.PendingTasks());
 }
 
 TEST(UtilsTest, FormatTimeDeltaTest) {