Fetch local files asynchronously.

This patch implements a new fetcher that only handles local files.
While libcurl supports file:// URLs, its transfers can't be suspended when
accessing local files.

This new FileFetcher is based on the brillo::FileStream class which
properly handles the asynchronous reads from regular files.

Bug: 28866512
TEST=Added unittest. Deployed an update from a file:// URL.

(cherry picked from commit 2c131bbf81d8c02ade163b939c96e44aa93765e9)

Change-Id: I9949a0f214de992c2fd86c1d73aca1c1792f0de0
diff --git a/common/file_fetcher.cc b/common/file_fetcher.cc
new file mode 100644
index 0000000..77dadd1
--- /dev/null
+++ b/common/file_fetcher.cc
@@ -0,0 +1,187 @@
+//
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/common/file_fetcher.h"
+
+#include <algorithm>
+#include <string>
+
+#include <base/bind.h>
+#include <base/format_macros.h>
+#include <base/location.h>
+#include <base/logging.h>
+#include <base/strings/string_util.h>
+#include <base/strings/stringprintf.h>
+#include <brillo/streams/file_stream.h>
+
+#include "update_engine/common/certificate_checker.h"
+#include "update_engine/common/hardware_interface.h"
+#include "update_engine/common/platform_constants.h"
+
+using std::string;
+
+namespace {
+// Size, in bytes, of the buffer handed to each asynchronous stream read.
+constexpr size_t kReadBufferSize = 16 * 1024;
+
+}  // namespace
+
+namespace chromeos_update_engine {
+
+// static
+bool FileFetcher::SupportedUrl(const string& url) {
+  // Note that we require the file path to start with a "/".
+  return base::StartsWith(
+      url, "file:///", base::CompareCase::INSENSITIVE_ASCII);
+}
+
+FileFetcher::~FileFetcher() {
+  LOG_IF(ERROR, transfer_in_progress_)
+      << "Destroying the fetcher while a transfer is in progress.";
+  CleanUp();
+}
+
+// Begins the transfer, which must not have already been started.
+void FileFetcher::BeginTransfer(const string& url) {
+  CHECK(!transfer_in_progress_);
+
+  if (!SupportedUrl(url)) {
+    LOG(ERROR) << "Unsupported file URL: " << url;
+    // No HTTP error code when the URL is not supported.
+    http_response_code_ = 0;
+    CleanUp();
+    if (delegate_)
+      delegate_->TransferComplete(this, false);
+    return;
+  }
+
+  string file_path = url.substr(strlen("file://"));
+  stream_ =
+      brillo::FileStream::Open(base::FilePath(file_path),
+                               brillo::Stream::AccessMode::READ,
+                               brillo::FileStream::Disposition::OPEN_EXISTING,
+                               nullptr);
+
+  if (!stream_) {
+    LOG(ERROR) << "Couldn't open " << file_path;
+    http_response_code_ = kHttpResponseNotFound;
+    CleanUp();
+    if (delegate_)
+      delegate_->TransferComplete(this, false);
+    return;
+  }
+  http_response_code_ = kHttpResponseOk;
+
+  if (offset_)
+    stream_->SetPosition(offset_, nullptr);
+  bytes_copied_ = 0;
+  transfer_in_progress_ = true;
+  ScheduleRead();
+}
+
+void FileFetcher::TerminateTransfer() {
+  CleanUp();
+  if (delegate_) {
+    // Note that after the callback returns this object may be destroyed.
+    delegate_->TransferTerminated(this);
+  }
+}
+
+void FileFetcher::ScheduleRead() {
+  if (transfer_paused_ || ongoing_read_ || !transfer_in_progress_)
+    return;
+
+  buffer_.resize(kReadBufferSize);
+  size_t bytes_to_read = buffer_.size();
+  if (data_length_ >= 0) {
+    bytes_to_read = std::min(static_cast<uint64_t>(bytes_to_read),
+                             data_length_ - bytes_copied_);
+  }
+
+  if (!bytes_to_read) {
+    OnReadDoneCallback(0);
+    return;
+  }
+
+  ongoing_read_ = stream_->ReadAsync(
+      buffer_.data(),
+      bytes_to_read,
+      base::Bind(&FileFetcher::OnReadDoneCallback, base::Unretained(this)),
+      base::Bind(&FileFetcher::OnReadErrorCallback, base::Unretained(this)),
+      nullptr);
+
+  if (!ongoing_read_) {
+    LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
+    CleanUp();
+    if (delegate_)
+      delegate_->TransferComplete(this, false);
+  }
+}
+
+void FileFetcher::OnReadDoneCallback(size_t bytes_read) {
+  ongoing_read_ = false;
+  if (bytes_read == 0) {
+    CleanUp();
+    if (delegate_)
+      delegate_->TransferComplete(this, true);
+  } else {
+    bytes_copied_ += bytes_read;
+    if (delegate_)
+      delegate_->ReceivedBytes(this, buffer_.data(), bytes_read);
+    ScheduleRead();
+  }
+}
+
+void FileFetcher::OnReadErrorCallback(const brillo::Error* error) {
+  LOG(ERROR) << "Asynchronous read failed: " << error->GetMessage();
+  CleanUp();
+  if (delegate_)
+    delegate_->TransferComplete(this, false);
+}
+
+void FileFetcher::Pause() {
+  if (transfer_paused_) {
+    LOG(ERROR) << "Fetcher already paused.";
+    return;
+  }
+  transfer_paused_ = true;
+}
+
+void FileFetcher::Unpause() {
+  if (!transfer_paused_) {
+    LOG(ERROR) << "Resume attempted when fetcher not paused.";
+    return;
+  }
+  transfer_paused_ = false;
+  ScheduleRead();
+}
+
+void FileFetcher::CleanUp() {
+  if (stream_) {
+    stream_->CancelPendingAsyncOperations();
+    stream_->CloseBlocking(nullptr);
+    stream_.reset();
+  }
+  // Destroying the |stream_| releases the callback, so we don't have any
+  // ongoing read at this point.
+  ongoing_read_ = false;
+  buffer_ = brillo::Blob();
+
+  transfer_in_progress_ = false;
+  transfer_paused_ = false;
+}
+
+}  // namespace chromeos_update_engine
diff --git a/common/file_fetcher.h b/common/file_fetcher.h
new file mode 100644
index 0000000..2368b1d
--- /dev/null
+++ b/common/file_fetcher.h
@@ -0,0 +1,119 @@
+//
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_COMMON_FILE_FETCHER_H_
+#define UPDATE_ENGINE_COMMON_FILE_FETCHER_H_
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <base/logging.h>
+#include <base/macros.h>
+#include <brillo/message_loops/message_loop.h>
+#include <brillo/streams/stream.h>
+
+#include "update_engine/common/http_fetcher.h"
+
+// This is a concrete implementation of HttpFetcher that reads files
+// asynchronously.
+
+namespace chromeos_update_engine {
+
+class FileFetcher : public HttpFetcher {
+ public:
+  // Returns whether the passed url is supported.
+  static bool SupportedUrl(const std::string& url);
+
+  FileFetcher() : HttpFetcher(nullptr) {}
+
+  // Cleans up all internal state. Does not notify delegate.
+  ~FileFetcher() override;
+
+  // HttpFetcher overrides. A length of -1 (unset) means "read until EOF".
+  void SetOffset(off_t offset) override { offset_ = offset; }
+  void SetLength(size_t length) override { data_length_ = length; }
+  void UnsetLength() override { data_length_ = -1; }
+
+  // Begins the transfer if it hasn't already begun.
+  void BeginTransfer(const std::string& url) override;
+
+  // If the transfer is in progress, aborts the transfer early. The transfer
+  // cannot be resumed.
+  void TerminateTransfer() override;
+
+  // Ignore all extra headers for files.
+  void SetHeader(const std::string& header_name,
+                 const std::string& header_value) override {}
+
+  // Suspend the asynchronous file read.
+  void Pause() override;
+
+  // Resume the suspended file read.
+  void Unpause() override;
+
+  size_t GetBytesDownloaded() override {
+    return static_cast<size_t>(bytes_copied_);
+  }
+
+  // Ignore all the time limits for files.
+  void set_low_speed_limit(int low_speed_bps, int low_speed_sec) override {}
+  void set_connect_timeout(int connect_timeout_seconds) override {}
+  void set_max_retry_count(int max_retry_count) override {}
+
+ private:
+  // Cleans up the fetcher, resetting its status to a newly constructed one.
+  void CleanUp();
+
+  // Schedule a new asynchronous read if the stream is not paused and no other
+  // read is in process. This method can be called at any point.
+  void ScheduleRead();
+
+  // Called from the main loop when a single read from |stream_| succeeds or
+  // fails, calling OnReadDoneCallback() and OnReadErrorCallback() respectively.
+  void OnReadDoneCallback(size_t bytes_read);
+  void OnReadErrorCallback(const brillo::Error* error);
+
+  // Whether the transfer was started and didn't finish yet.
+  bool transfer_in_progress_{false};
+
+  // Whether the transfer is paused.
+  bool transfer_paused_{false};
+
+  // Whether there's an ongoing asynchronous read. When this value is true, the
+  // |buffer_| is being used by the |stream_|.
+  bool ongoing_read_{false};
+
+  // Total number of bytes copied.
+  uint64_t bytes_copied_{0};
+
+  // The offset inside the file where the read should start.
+  uint64_t offset_{0};
+
+  // The length of the data or -1 if unknown (will read until EOF).
+  int64_t data_length_{-1};
+
+  brillo::StreamPtr stream_;
+
+  // The buffer used for reading from the stream.
+  brillo::Blob buffer_;
+
+  DISALLOW_COPY_AND_ASSIGN(FileFetcher);
+};
+
+}  // namespace chromeos_update_engine
+
+#endif  // UPDATE_ENGINE_COMMON_FILE_FETCHER_H_
diff --git a/common/file_fetcher_unittest.cc b/common/file_fetcher_unittest.cc
new file mode 100644
index 0000000..9c6b0ec
--- /dev/null
+++ b/common/file_fetcher_unittest.cc
@@ -0,0 +1,37 @@
+//
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/common/file_fetcher.h"
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "update_engine/common/test_utils.h"
+
+namespace chromeos_update_engine {
+
+class FileFetcherUnitTest : public ::testing::Test {};
+
+TEST_F(FileFetcherUnitTest, SupportedUrlsTest) {
+  EXPECT_TRUE(FileFetcher::SupportedUrl("file:///path/to/somewhere.bin"));
+  EXPECT_TRUE(FileFetcher::SupportedUrl("FILE:///I/LIKE/TO/SHOUT"));
+  // URLs lacking the "file:///" prefix (any letter case) must be rejected.
+  EXPECT_FALSE(FileFetcher::SupportedUrl("file://relative"));
+  EXPECT_FALSE(FileFetcher::SupportedUrl("http:///no_http_here"));
+}
+
+}  // namespace chromeos_update_engine
diff --git a/common/http_fetcher_unittest.cc b/common/http_fetcher_unittest.cc
index 0d4b5da..9bc4373 100644
--- a/common/http_fetcher_unittest.cc
+++ b/common/http_fetcher_unittest.cc
@@ -42,6 +42,7 @@
 #include <gtest/gtest.h>
 
 #include "update_engine/common/fake_hardware.h"
+#include "update_engine/common/file_fetcher.h"
 #include "update_engine/common/http_common.h"
 #include "update_engine/common/libcurl_http_fetcher.h"
 #include "update_engine/common/mock_http_fetcher.h"
@@ -219,6 +220,7 @@
 
   virtual bool IsMock() const = 0;
   virtual bool IsMulti() const = 0;
+  virtual bool IsHttpSupported() const = 0;
 
   virtual void IgnoreServerAborting(HttpServer* server) const {}
 
@@ -251,6 +253,7 @@
 
   bool IsMock() const override { return true; }
   bool IsMulti() const override { return false; }
+  bool IsHttpSupported() const override { return true; }
 
   HttpServer* CreateServer() override {
     return new NullHttpServer;
@@ -291,6 +294,7 @@
 
   bool IsMock() const override { return false; }
   bool IsMulti() const override { return false; }
+  bool IsHttpSupported() const override { return true; }
 
   void IgnoreServerAborting(HttpServer* server) const override {
     // Nothing to do.
@@ -326,6 +330,42 @@
   bool IsMulti() const override { return true; }
 };
 
+class FileFetcherTest : public AnyHttpFetcherTest {
+ public:
+  // Necessary to unhide the definition in the base class.
+  using AnyHttpFetcherTest::NewLargeFetcher;
+  HttpFetcher* NewLargeFetcher(ProxyResolver* /* proxy_resolver */) override {
+    return new FileFetcher();
+  }
+
+  // Necessary to unhide the definition in the base class.
+  using AnyHttpFetcherTest::NewSmallFetcher;
+  HttpFetcher* NewSmallFetcher(ProxyResolver* proxy_resolver) override {
+    return NewLargeFetcher(proxy_resolver);
+  }
+
+  string BigUrl(in_port_t port) const override {
+    return "file://" + temp_file_.path();
+  }
+  string SmallUrl(in_port_t port) const override {
+    test_utils::WriteFileString(temp_file_.path(), "small contents");
+    return "file://" + temp_file_.path();
+  }
+  string ErrorUrl(in_port_t port) const override {
+    return "file:///path/to/non-existing-file";
+  }
+
+  bool IsMock() const override { return false; }
+  bool IsMulti() const override { return false; }
+  bool IsHttpSupported() const override { return false; }
+
+  void IgnoreServerAborting(HttpServer* server) const override {}
+
+  HttpServer* CreateServer() override { return new NullHttpServer; }
+
+ private:
+  test_utils::ScopedTempFile temp_file_{"ue_file_fetcher.XXXXXX"};
+};
 
 //
 // Infrastructure for type tests of HTTP fetcher.
@@ -363,7 +403,9 @@
 // Test case types list.
 typedef ::testing::Types<LibcurlHttpFetcherTest,
                          MockHttpFetcherTest,
-                         MultiRangeHttpFetcherTest> HttpFetcherTestTypes;
+                         MultiRangeHttpFetcherTest,
+                         FileFetcherTest>
+    HttpFetcherTestTypes;
 TYPED_TEST_CASE(HttpFetcherTest, HttpFetcherTestTypes);
 
 
@@ -394,6 +436,7 @@
   void TransferTerminated(HttpFetcher* fetcher) override {
     ADD_FAILURE();
     times_transfer_terminated_called_++;
+    MessageLoop::current()->BreakLoop();
   }
 
   // Are we expecting an error response? (default: no)
@@ -477,7 +520,7 @@
 }
 
 TYPED_TEST(HttpFetcherTest, ExtraHeadersInRequestTest) {
-  if (this->test_.IsMock())
+  if (this->test_.IsMock() || !this->test_.IsHttpSupported())
     return;
 
   HttpFetcherTestDelegate delegate;
@@ -498,7 +541,11 @@
   int port = server.GetPort();
   ASSERT_TRUE(server.started_);
 
-  StartTransfer(fetcher.get(), LocalServerUrlForPath(port, "/echo-headers"));
+  this->loop_.PostTask(
+      FROM_HERE,
+      base::Bind(StartTransfer,
+                 fetcher.get(),
+                 LocalServerUrlForPath(port, "/echo-headers")));
   this->loop_.Run();
 
   EXPECT_NE(string::npos,
@@ -571,7 +618,7 @@
 // This test will pause the fetcher while the download is not yet started
 // because it is waiting for the proxy to be resolved.
 TYPED_TEST(HttpFetcherTest, PauseWhileResolvingProxyTest) {
-  if (this->test_.IsMock())
+  if (this->test_.IsMock() || !this->test_.IsHttpSupported())
     return;
   MockProxyResolver mock_resolver;
   unique_ptr<HttpFetcher> fetcher(this->test_.NewLargeFetcher(&mock_resolver));
@@ -684,7 +731,7 @@
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, FlakyTest) {
-  if (this->test_.IsMock())
+  if (this->test_.IsMock() || !this->test_.IsHttpSupported())
     return;
   {
     FlakyHttpFetcherTestDelegate delegate;
@@ -897,7 +944,7 @@
 }  // namespace
 
 TYPED_TEST(HttpFetcherTest, SimpleRedirectTest) {
-  if (this->test_.IsMock())
+  if (this->test_.IsMock() || !this->test_.IsHttpSupported())
     return;
 
   unique_ptr<HttpServer> server(this->test_.CreateServer());
@@ -912,7 +959,7 @@
 }
 
 TYPED_TEST(HttpFetcherTest, MaxRedirectTest) {
-  if (this->test_.IsMock())
+  if (this->test_.IsMock() || !this->test_.IsHttpSupported())
     return;
 
   unique_ptr<HttpServer> server(this->test_.CreateServer());
@@ -928,7 +975,7 @@
 }
 
 TYPED_TEST(HttpFetcherTest, BeyondMaxRedirectTest) {
-  if (this->test_.IsMock())
+  if (this->test_.IsMock() || !this->test_.IsHttpSupported())
     return;
 
   unique_ptr<HttpServer> server(this->test_.CreateServer());
diff --git a/common/test_utils.h b/common/test_utils.h
index 60ec90e..ed64c80 100644
--- a/common/test_utils.h
+++ b/common/test_utils.h
@@ -174,7 +174,7 @@
     unlinker_.reset(new ScopedPathUnlinker(path_));
   }
 
-  const std::string& path() { return path_; }
+  const std::string& path() const { return path_; }
 
  private:
   std::string path_;