Use FileDescriptorPtr to implement async reads in verify stage

During FileSystemVerify stage, update_engine needs to read from source
or target partition to verify hashes && write verity. Previously we use
brillow's file stream to implement async reads. WIth Virtual AB
Compression, reading from target partition must go through libsnapshot's
interface(FileDescriptorPtr). So we replace brillo::FileStream with
FileDescriptorPtr for ease of integrating with VABC.

Test: serve an OTA update, verify: slot switch resume, regular resume

Change-Id: Id8531757468f60e3e21667b7761b83f7c2af2dbf
diff --git a/payload_consumer/filesystem_verifier_action.cc b/payload_consumer/filesystem_verifier_action.cc
index 61917ea..634f03f 100644
--- a/payload_consumer/filesystem_verifier_action.cc
+++ b/payload_consumer/filesystem_verifier_action.cc
@@ -20,17 +20,22 @@
 #include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <unistd.h>
 
 #include <algorithm>
 #include <cstdlib>
+#include <memory>
 #include <string>
+#include <utility>
 
 #include <base/bind.h>
-#include <brillo/data_encoding.h>
-#include <brillo/streams/file_stream.h>
 #include <base/strings/string_util.h>
+#include <brillo/data_encoding.h>
+#include <brillo/message_loops/message_loop.h>
+#include <brillo/streams/file_stream.h>
 
 #include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 
 using brillo::data_encoding::Base64Encode;
 using std::string;
@@ -59,18 +64,19 @@
     return;
   }
   install_plan_.Dump();
-
   StartPartitionHashing();
   abort_action_completer.set_should_complete(false);
 }
 
 void FilesystemVerifierAction::TerminateProcessing() {
+  brillo::MessageLoop::current()->CancelTask(pending_task_id_);
   cancelled_ = true;
   Cleanup(ErrorCode::kSuccess);  // error code is ignored if canceled_ is true.
 }
 
 void FilesystemVerifierAction::Cleanup(ErrorCode code) {
-  src_stream_.reset();
+  read_fd_.reset();
+  write_fd_.reset();
   // This memory is not used anymore.
   buffer_.clear();
 
@@ -88,6 +94,43 @@
   }
 }
 
+bool FilesystemVerifierAction::InitializeFdVABC() {
+  const InstallPlan::Partition& partition =
+      install_plan_.partitions[partition_index_];
+
+  read_fd_ = dynamic_control_->OpenCowReader(
+      partition.name, partition.source_path, true);
+  if (!read_fd_) {
+    LOG(ERROR) << "OpenCowReader(" << partition.name << ", "
+               << partition.source_path << ") failed.";
+    return false;
+  }
+  partition_size_ = partition.target_size;
+  // TODO(b/173432386): Support Verity writes for VABC.
+  CHECK_EQ(partition.fec_size, 0U);
+  CHECK_EQ(partition.hash_tree_size, 0U);
+  return true;
+}
+
+bool FilesystemVerifierAction::InitializeFd(const std::string& part_path) {
+  read_fd_ = FileDescriptorPtr(new EintrSafeFileDescriptor());
+  if (!read_fd_->Open(part_path.c_str(), O_RDONLY)) {
+    LOG(ERROR) << "Unable to open " << part_path << " for reading.";
+    return false;
+  }
+
+  // Can't re-use |read_fd_|, as verity writer may call `seek` to modify state
+  // of a file descriptor.
+  if (ShouldWriteVerity()) {
+    write_fd_ = FileDescriptorPtr(new EintrSafeFileDescriptor());
+    if (!write_fd_->Open(part_path.c_str(), O_RDWR)) {
+      LOG(ERROR) << "Unable to open " << part_path << " for Read/Write.";
+      return false;
+    }
+  }
+  return true;
+}
+
 void FilesystemVerifierAction::StartPartitionHashing() {
   if (partition_index_ == install_plan_.partitions.size()) {
     if (!install_plan_.untouched_dynamic_partitions.empty()) {
@@ -109,7 +152,6 @@
   }
   const InstallPlan::Partition& partition =
       install_plan_.partitions[partition_index_];
-
   string part_path;
   switch (verifier_step_) {
     case VerifierStep::kVerifySourceHash:
@@ -122,44 +164,40 @@
       break;
   }
 
-  if (part_path.empty()) {
-    if (partition_size_ == 0) {
-      LOG(INFO) << "Skip hashing partition " << partition_index_ << " ("
-                << partition.name << ") because size is 0.";
-      partition_index_++;
-      StartPartitionHashing();
-      return;
-    }
-    LOG(ERROR) << "Cannot hash partition " << partition_index_ << " ("
-               << partition.name
-               << ") because its device path cannot be determined.";
-    Cleanup(ErrorCode::kFilesystemVerifierError);
-    return;
-  }
-
   LOG(INFO) << "Hashing partition " << partition_index_ << " ("
             << partition.name << ") on device " << part_path;
-
-  brillo::ErrorPtr error;
-  src_stream_ =
-      brillo::FileStream::Open(base::FilePath(part_path),
-                               brillo::Stream::AccessMode::READ,
-                               brillo::FileStream::Disposition::OPEN_EXISTING,
-                               &error);
-
-  if (!src_stream_) {
-    LOG(ERROR) << "Unable to open " << part_path << " for reading";
+  auto success = false;
+  if (dynamic_control_->GetVirtualAbCompressionFeatureFlag().IsEnabled() &&
+      dynamic_control_->IsDynamicPartition(partition.name) &&
+      verifier_step_ == VerifierStep::kVerifyTargetHash) {
+    success = InitializeFdVABC();
+  } else {
+    if (part_path.empty()) {
+      if (partition_size_ == 0) {
+        LOG(INFO) << "Skip hashing partition " << partition_index_ << " ("
+                  << partition.name << ") because size is 0.";
+        partition_index_++;
+        StartPartitionHashing();
+        return;
+      }
+      LOG(ERROR) << "Cannot hash partition " << partition_index_ << " ("
+                 << partition.name
+                 << ") because its device path cannot be determined.";
+      Cleanup(ErrorCode::kFilesystemVerifierError);
+      return;
+    }
+    success = InitializeFd(part_path);
+  }
+  if (!success) {
     Cleanup(ErrorCode::kFilesystemVerifierError);
     return;
   }
-
   buffer_.resize(kReadFileBufferSize);
   hasher_ = std::make_unique<HashCalculator>();
 
   offset_ = 0;
-  if (verifier_step_ == VerifierStep::kVerifyTargetHash &&
-      install_plan_.write_verity) {
-    if (!verity_writer_->Init(partition)) {
+  if (ShouldWriteVerity()) {
+    if (!verity_writer_->Init(partition, read_fd_, write_fd_)) {
       Cleanup(ErrorCode::kVerityCalculationError);
       return;
     }
@@ -169,6 +207,14 @@
   ScheduleRead();
 }
 
+bool FilesystemVerifierAction::ShouldWriteVerity() {
+  const InstallPlan::Partition& partition =
+      install_plan_.partitions[partition_index_];
+  return verifier_step_ == VerifierStep::kVerifyTargetHash &&
+         install_plan_.write_verity &&
+         (partition.hash_tree_size > 0 || partition.fec_size > 0);
+}
+
 void FilesystemVerifierAction::ScheduleRead() {
   const InstallPlan::Partition& partition =
       install_plan_.partitions[partition_index_];
@@ -190,22 +236,21 @@
     return;
   }
 
-  bool read_async_ok = src_stream_->ReadAsync(
-      buffer_.data(),
-      bytes_to_read,
-      base::Bind(&FilesystemVerifierAction::OnReadDoneCallback,
-                 base::Unretained(this)),
-      base::Bind(&FilesystemVerifierAction::OnReadErrorCallback,
-                 base::Unretained(this)),
-      nullptr);
-
-  if (!read_async_ok) {
+  auto bytes_read = read_fd_->Read(buffer_.data(), bytes_to_read);
+  if (bytes_read < 0) {
     LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
     Cleanup(ErrorCode::kError);
+  } else {
+    // We could just invoke |OnReadDoneCallback()|, it works. But |PostTask|
+    // is used so that users can cancel updates.
+    pending_task_id_ = brillo::MessageLoop::current()->PostTask(
+        base::Bind(&FilesystemVerifierAction::OnReadDone,
+                   base::Unretained(this),
+                   bytes_read));
   }
 }
 
-void FilesystemVerifierAction::OnReadDoneCallback(size_t bytes_read) {
+void FilesystemVerifierAction::OnReadDone(size_t bytes_read) {
   if (cancelled_) {
     Cleanup(ErrorCode::kError);
     return;
@@ -231,8 +276,7 @@
   UpdateProgress(
       (static_cast<double>(offset_) / partition_size_ + partition_index_) /
       install_plan_.partitions.size());
-  if (verifier_step_ == VerifierStep::kVerifyTargetHash &&
-      install_plan_.write_verity) {
+  if (ShouldWriteVerity()) {
     if (!verity_writer_->Update(offset_, buffer_.data(), bytes_read)) {
       Cleanup(ErrorCode::kVerityCalculationError);
       return;
@@ -249,12 +293,6 @@
   ScheduleRead();
 }
 
-void FilesystemVerifierAction::OnReadErrorCallback(const brillo::Error* error) {
-  // TODO(deymo): Transform the read-error into an specific ErrorCode.
-  LOG(ERROR) << "Asynchronous read failed.";
-  Cleanup(ErrorCode::kError);
-}
-
 void FilesystemVerifierAction::FinishPartitionHashing() {
   if (!hasher_->Finalize()) {
     LOG(ERROR) << "Unable to finalize the hash.";
@@ -278,8 +316,8 @@
         }
         // If we have not verified source partition yet, now that the target
         // partition does not match, and it's not a full payload, we need to
-        // switch to kVerifySourceHash step to check if it's because the source
-        // partition does not match either.
+        // switch to kVerifySourceHash step to check if it's because the
+        // source partition does not match either.
         verifier_step_ = VerifierStep::kVerifySourceHash;
       } else {
         partition_index_++;
@@ -315,17 +353,22 @@
       }
       // The action will skip kVerifySourceHash step if target partition hash
       // matches, if we are in this step, it means target hash does not match,
-      // and now that the source partition hash matches, we should set the error
-      // code to reflect the error in target partition.
-      // We only need to verify the source partition which the target hash does
-      // not match, the rest of the partitions don't matter.
+      // and now that the source partition hash matches, we should set the
+      // error code to reflect the error in target partition. We only need to
+      // verify the source partition which the target hash does not match, the
+      // rest of the partitions don't matter.
       Cleanup(ErrorCode::kNewRootfsVerificationError);
       return;
   }
   // Start hashing the next partition, if any.
   hasher_.reset();
   buffer_.clear();
-  src_stream_->CloseBlocking(nullptr);
+  if (read_fd_) {
+    read_fd_.reset();
+  }
+  if (write_fd_) {
+    write_fd_.reset();
+  }
   StartPartitionHashing();
 }