Cleanup the RetryTimeoutCallback().

When canceling a request or destroying the LibcurlHttpFetcher, a
RetryTimeoutCallback callback could be leaked if the fetcher was
waiting on a network retry.

This patch keeps track of the retry callback and cancels it on CleanUp,
making sure the callback is not leaked.

Bug: 34178297
Test: Added unittest to trigger this case.
Change-Id: I7016641a7f31429933779e55c77cbabb6289c3dd
diff --git a/common/http_fetcher_unittest.cc b/common/http_fetcher_unittest.cc
index f6a76ae..dcc1573 100644
--- a/common/http_fetcher_unittest.cc
+++ b/common/http_fetcher_unittest.cc
@@ -795,7 +795,7 @@
   ~FailureHttpFetcherTestDelegate() override {
     if (server_) {
       LOG(INFO) << "Stopping server in destructor";
-      delete server_;
+      server_.reset();
       LOG(INFO) << "server stopped";
     }
   }
@@ -804,20 +804,23 @@
                      const void* bytes, size_t length) override {
     if (server_) {
       LOG(INFO) << "Stopping server in ReceivedBytes";
-      delete server_;
+      server_.reset();
       LOG(INFO) << "server stopped";
-      server_ = nullptr;
     }
   }
   void TransferComplete(HttpFetcher* fetcher, bool successful) override {
     EXPECT_FALSE(successful);
     EXPECT_EQ(0, fetcher->http_response_code());
+    times_transfer_complete_called_++;
     MessageLoop::current()->BreakLoop();
   }
   void TransferTerminated(HttpFetcher* fetcher) override {
-    ADD_FAILURE();
+    times_transfer_terminated_called_++;
+    MessageLoop::current()->BreakLoop();
   }
-  PythonHttpServer* server_;
+  unique_ptr<PythonHttpServer> server_;
+  int times_transfer_terminated_called_{0};
+  int times_transfer_complete_called_{0};
 };
 }  // namespace
 
@@ -827,19 +830,19 @@
   // available at all.
   if (this->test_.IsMock())
     return;
-  {
-    FailureHttpFetcherTestDelegate delegate(nullptr);
-    unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
-    fetcher->set_delegate(&delegate);
+  FailureHttpFetcherTestDelegate delegate(nullptr);
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
+  fetcher->set_delegate(&delegate);
 
-    this->loop_.PostTask(FROM_HERE,
-                         base::Bind(StartTransfer,
-                                    fetcher.get(),
-                                    "http://host_doesnt_exist99999999"));
-    this->loop_.Run();
+  this->loop_.PostTask(
+      FROM_HERE,
+      base::Bind(
+          StartTransfer, fetcher.get(), "http://host_doesnt_exist99999999"));
+  this->loop_.Run();
+  EXPECT_EQ(1, delegate.times_transfer_complete_called_);
+  EXPECT_EQ(0, delegate.times_transfer_terminated_called_);
 
-    // Exiting and testing happens in the delegate
-  }
+  // Exiting and testing happens in the delegate
 }
 
 TYPED_TEST(HttpFetcherTest, NoResponseTest) {
@@ -867,6 +870,8 @@
       fetcher.get(),
       LocalServerUrlForPath(port, "/hang")));
   this->loop_.Run();
+  EXPECT_EQ(1, delegate.times_transfer_complete_called_);
+  EXPECT_EQ(0, delegate.times_transfer_terminated_called_);
 
   // Check that no other callback runs in the next two seconds. That would
   // indicate a leaked callback.
@@ -885,29 +890,78 @@
   // retries and aborts correctly.
   if (this->test_.IsMock())
     return;
-  {
-    PythonHttpServer* server = new PythonHttpServer();
-    int port = server->GetPort();
-    ASSERT_TRUE(server->started_);
+  PythonHttpServer* server = new PythonHttpServer();
+  int port = server->GetPort();
+  ASSERT_TRUE(server->started_);
 
-    // Handles destruction and claims ownership.
-    FailureHttpFetcherTestDelegate delegate(server);
-    unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
-    fetcher->set_delegate(&delegate);
+  // Handles destruction and claims ownership.
+  FailureHttpFetcherTestDelegate delegate(server);
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
+  fetcher->set_delegate(&delegate);
 
-    this->loop_.PostTask(FROM_HERE, base::Bind(
-        StartTransfer,
-        fetcher.get(),
-        LocalServerUrlForPath(port,
-                              base::StringPrintf("/flaky/%d/%d/%d/%d",
-                                                 kBigLength,
-                                                 kFlakyTruncateLength,
-                                                 kFlakySleepEvery,
-                                                 kFlakySleepSecs))));
-    this->loop_.Run();
+  this->loop_.PostTask(
+      FROM_HERE,
+      base::Bind(StartTransfer,
+                 fetcher.get(),
+                 LocalServerUrlForPath(port,
+                                       base::StringPrintf("/flaky/%d/%d/%d/%d",
+                                                          kBigLength,
+                                                          kFlakyTruncateLength,
+                                                          kFlakySleepEvery,
+                                                          kFlakySleepSecs))));
+  this->loop_.Run();
+  EXPECT_EQ(1, delegate.times_transfer_complete_called_);
+  EXPECT_EQ(0, delegate.times_transfer_terminated_called_);
 
-    // Exiting and testing happens in the delegate
-  }
+  // Exiting and testing happens in the delegate
+}
+
+// Test that we can cancel a transfer while it is still trying to connect to the
+// server. This test kills the server after a few bytes are received.
+TYPED_TEST(HttpFetcherTest, TerminateTransferWhenServerDiedTest) {
+  if (this->test_.IsMock() || !this->test_.IsHttpSupported())
+    return;
+
+  PythonHttpServer* server = new PythonHttpServer();
+  int port = server->GetPort();
+  ASSERT_TRUE(server->started_);
+
+  // Handles destruction and claims ownership.
+  FailureHttpFetcherTestDelegate delegate(server);
+  unique_ptr<HttpFetcher> fetcher(this->test_.NewSmallFetcher());
+  fetcher->set_delegate(&delegate);
+
+  this->loop_.PostTask(
+      FROM_HERE,
+      base::Bind(StartTransfer,
+                 fetcher.get(),
+                 LocalServerUrlForPath(port,
+                                       base::StringPrintf("/flaky/%d/%d/%d/%d",
+                                                          kBigLength,
+                                                          kFlakyTruncateLength,
+                                                          kFlakySleepEvery,
+                                                          kFlakySleepSecs))));
+  // Terminating the transfer after 3 seconds gives it a chance to contact the
+  // server and enter the retry loop.
+  this->loop_.PostDelayedTask(FROM_HERE,
+                              base::Bind(&HttpFetcher::TerminateTransfer,
+                                         base::Unretained(fetcher.get())),
+                              base::TimeDelta::FromSeconds(3));
+
+  // Exiting and testing happens in the delegate.
+  this->loop_.Run();
+  EXPECT_EQ(0, delegate.times_transfer_complete_called_);
+  EXPECT_EQ(1, delegate.times_transfer_terminated_called_);
+
+  // Check that no other callback runs in the next two seconds. That would
+  // indicate a leaked callback.
+  bool timeout = false;
+  auto callback = base::Bind([](bool* timeout) { *timeout = true; },
+                             base::Unretained(&timeout));
+  this->loop_.PostDelayedTask(
+      FROM_HERE, callback, base::TimeDelta::FromSeconds(2));
+  EXPECT_TRUE(this->loop_.RunOnce(true));
+  EXPECT_TRUE(timeout);
 }
 
 namespace {
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index ee31c8d..63c67c0 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -393,7 +393,7 @@
       http_response_code_ == 0 &&
       no_network_retry_count_ < no_network_max_retries_) {
     no_network_retry_count_++;
-    MessageLoop::current()->PostDelayedTask(
+    retry_task_id_ = MessageLoop::current()->PostDelayedTask(
         FROM_HERE,
         base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
                    base::Unretained(this)),
@@ -419,7 +419,7 @@
     if (HasProxy()) {
       // We have another proxy. Retry immediately.
       LOG(INFO) << "Retrying with next proxy setting";
-      MessageLoop::current()->PostTask(
+      retry_task_id_ = MessageLoop::current()->PostTask(
           FROM_HERE,
           base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
                      base::Unretained(this)));
@@ -446,7 +446,7 @@
     }
     // Need to restart transfer
     LOG(INFO) << "Restarting transfer to download the remaining bytes";
-    MessageLoop::current()->PostDelayedTask(
+    retry_task_id_ = MessageLoop::current()->PostDelayedTask(
         FROM_HERE,
         base::Bind(&LibcurlHttpFetcher::RetryTimeoutCallback,
                    base::Unretained(this)),
@@ -624,6 +624,7 @@
 }
 
 void LibcurlHttpFetcher::RetryTimeoutCallback() {
+  retry_task_id_ = MessageLoop::kTaskIdNull;
   if (transfer_paused_) {
     restart_transfer_on_unpause_ = true;
     return;
@@ -648,6 +649,9 @@
 }
 
 void LibcurlHttpFetcher::CleanUp() {
+  MessageLoop::current()->CancelTask(retry_task_id_);
+  retry_task_id_ = MessageLoop::kTaskIdNull;
+
   MessageLoop::current()->CancelTask(timeout_id_);
   timeout_id_ = MessageLoop::kTaskIdNull;
 
diff --git a/libcurl_http_fetcher.h b/libcurl_http_fetcher.h
index 1541ea4..5b16811 100644
--- a/libcurl_http_fetcher.h
+++ b/libcurl_http_fetcher.h
@@ -233,6 +233,9 @@
   // Seconds to wait before retrying a resume.
   int retry_seconds_{20};
 
+  // When waiting for a retry, the task id of the retry callback.
+  brillo::MessageLoop::TaskId retry_task_id_{brillo::MessageLoop::kTaskIdNull};
+
   // Number of resumes due to no network (e.g., HTTP response code 0).
   int no_network_retry_count_{0};
   int no_network_max_retries_{0};