Use FileDescriptorPtr to implement async reads in verify stage

During the FileSystemVerify stage, update_engine needs to read from the
source or target partition to verify hashes and write verity. Previously
we used brillo's file stream to implement async reads. With Virtual AB
Compression, reading from the target partition must go through
libsnapshot's interface (FileDescriptorPtr). So we replace
brillo::FileStream with FileDescriptorPtr for ease of integrating with
VABC.

Test: serve an OTA update, verify: slot switch resume, regular resume

Change-Id: Id8531757468f60e3e21667b7761b83f7c2af2dbf
diff --git a/payload_consumer/delta_performer.cc b/payload_consumer/delta_performer.cc
index 30bd1ef..ada1372 100644
--- a/payload_consumer/delta_performer.cc
+++ b/payload_consumer/delta_performer.cc
@@ -489,6 +489,9 @@
     // We know there are more operations to perform because we didn't reach the
     // |num_total_operations_| limit yet.
     if (next_operation_num_ >= acc_num_operations_[current_partition_]) {
+      if (partition_writer_) {
+        TEST_AND_RETURN_FALSE(partition_writer_->FinishedInstallOps());
+      }
       CloseCurrentPartition();
       // Skip until there are operations for current_partition_.
       while (next_operation_num_ >= acc_num_operations_[current_partition_]) {
diff --git a/payload_consumer/file_descriptor.cc b/payload_consumer/file_descriptor.cc
index 6101c68..5b27c38 100644
--- a/payload_consumer/file_descriptor.cc
+++ b/payload_consumer/file_descriptor.cc
@@ -29,6 +29,12 @@
 
 namespace chromeos_update_engine {
 
+EintrSafeFileDescriptor::~EintrSafeFileDescriptor() {
+  if (IsOpen()) {
+    Close();
+  }
+}
+
 bool EintrSafeFileDescriptor::Open(const char* path, int flags, mode_t mode) {
   CHECK_EQ(fd_, -1);
   return ((fd_ = HANDLE_EINTR(open(path, flags, mode))) >= 0);
diff --git a/payload_consumer/file_descriptor.h b/payload_consumer/file_descriptor.h
index fb07ff0..faebcc1 100644
--- a/payload_consumer/file_descriptor.h
+++ b/payload_consumer/file_descriptor.h
@@ -111,6 +111,7 @@
 class EintrSafeFileDescriptor : public FileDescriptor {
  public:
   EintrSafeFileDescriptor() : fd_(-1) {}
+  ~EintrSafeFileDescriptor();
 
   // Interface methods.
   bool Open(const char* path, int flags, mode_t mode) override;
diff --git a/payload_consumer/filesystem_verifier_action.cc b/payload_consumer/filesystem_verifier_action.cc
index 61917ea..634f03f 100644
--- a/payload_consumer/filesystem_verifier_action.cc
+++ b/payload_consumer/filesystem_verifier_action.cc
@@ -20,17 +20,22 @@
 #include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <unistd.h>
 
 #include <algorithm>
 #include <cstdlib>
+#include <memory>
 #include <string>
+#include <utility>
 
 #include <base/bind.h>
-#include <brillo/data_encoding.h>
-#include <brillo/streams/file_stream.h>
 #include <base/strings/string_util.h>
+#include <brillo/data_encoding.h>
+#include <brillo/message_loops/message_loop.h>
+#include <brillo/streams/file_stream.h>
 
 #include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 
 using brillo::data_encoding::Base64Encode;
 using std::string;
@@ -59,18 +64,19 @@
     return;
   }
   install_plan_.Dump();
-
   StartPartitionHashing();
   abort_action_completer.set_should_complete(false);
 }
 
 void FilesystemVerifierAction::TerminateProcessing() {
+  brillo::MessageLoop::current()->CancelTask(pending_task_id_);
   cancelled_ = true;
   Cleanup(ErrorCode::kSuccess);  // error code is ignored if canceled_ is true.
 }
 
 void FilesystemVerifierAction::Cleanup(ErrorCode code) {
-  src_stream_.reset();
+  read_fd_.reset();
+  write_fd_.reset();
   // This memory is not used anymore.
   buffer_.clear();
 
@@ -88,6 +94,43 @@
   }
 }
 
+bool FilesystemVerifierAction::InitializeFdVABC() {
+  const InstallPlan::Partition& partition =
+      install_plan_.partitions[partition_index_];
+
+  read_fd_ = dynamic_control_->OpenCowReader(
+      partition.name, partition.source_path, true);
+  if (!read_fd_) {
+    LOG(ERROR) << "OpenCowReader(" << partition.name << ", "
+               << partition.source_path << ") failed.";
+    return false;
+  }
+  partition_size_ = partition.target_size;
+  // TODO(b/173432386): Support Verity writes for VABC.
+  CHECK_EQ(partition.fec_size, 0U);
+  CHECK_EQ(partition.hash_tree_size, 0U);
+  return true;
+}
+
+bool FilesystemVerifierAction::InitializeFd(const std::string& part_path) {
+  read_fd_ = FileDescriptorPtr(new EintrSafeFileDescriptor());
+  if (!read_fd_->Open(part_path.c_str(), O_RDONLY)) {
+    LOG(ERROR) << "Unable to open " << part_path << " for reading.";
+    return false;
+  }
+
+  // Can't re-use |read_fd_|, as verity writer may call `seek` to modify state
+  // of a file descriptor.
+  if (ShouldWriteVerity()) {
+    write_fd_ = FileDescriptorPtr(new EintrSafeFileDescriptor());
+    if (!write_fd_->Open(part_path.c_str(), O_RDWR)) {
+      LOG(ERROR) << "Unable to open " << part_path << " for Read/Write.";
+      return false;
+    }
+  }
+  return true;
+}
+
 void FilesystemVerifierAction::StartPartitionHashing() {
   if (partition_index_ == install_plan_.partitions.size()) {
     if (!install_plan_.untouched_dynamic_partitions.empty()) {
@@ -109,7 +152,6 @@
   }
   const InstallPlan::Partition& partition =
       install_plan_.partitions[partition_index_];
-
   string part_path;
   switch (verifier_step_) {
     case VerifierStep::kVerifySourceHash:
@@ -122,44 +164,40 @@
       break;
   }
 
-  if (part_path.empty()) {
-    if (partition_size_ == 0) {
-      LOG(INFO) << "Skip hashing partition " << partition_index_ << " ("
-                << partition.name << ") because size is 0.";
-      partition_index_++;
-      StartPartitionHashing();
-      return;
-    }
-    LOG(ERROR) << "Cannot hash partition " << partition_index_ << " ("
-               << partition.name
-               << ") because its device path cannot be determined.";
-    Cleanup(ErrorCode::kFilesystemVerifierError);
-    return;
-  }
-
   LOG(INFO) << "Hashing partition " << partition_index_ << " ("
             << partition.name << ") on device " << part_path;
-
-  brillo::ErrorPtr error;
-  src_stream_ =
-      brillo::FileStream::Open(base::FilePath(part_path),
-                               brillo::Stream::AccessMode::READ,
-                               brillo::FileStream::Disposition::OPEN_EXISTING,
-                               &error);
-
-  if (!src_stream_) {
-    LOG(ERROR) << "Unable to open " << part_path << " for reading";
+  auto success = false;
+  if (dynamic_control_->GetVirtualAbCompressionFeatureFlag().IsEnabled() &&
+      dynamic_control_->IsDynamicPartition(partition.name) &&
+      verifier_step_ == VerifierStep::kVerifyTargetHash) {
+    success = InitializeFdVABC();
+  } else {
+    if (part_path.empty()) {
+      if (partition_size_ == 0) {
+        LOG(INFO) << "Skip hashing partition " << partition_index_ << " ("
+                  << partition.name << ") because size is 0.";
+        partition_index_++;
+        StartPartitionHashing();
+        return;
+      }
+      LOG(ERROR) << "Cannot hash partition " << partition_index_ << " ("
+                 << partition.name
+                 << ") because its device path cannot be determined.";
+      Cleanup(ErrorCode::kFilesystemVerifierError);
+      return;
+    }
+    success = InitializeFd(part_path);
+  }
+  if (!success) {
     Cleanup(ErrorCode::kFilesystemVerifierError);
     return;
   }
-
   buffer_.resize(kReadFileBufferSize);
   hasher_ = std::make_unique<HashCalculator>();
 
   offset_ = 0;
-  if (verifier_step_ == VerifierStep::kVerifyTargetHash &&
-      install_plan_.write_verity) {
-    if (!verity_writer_->Init(partition)) {
+  if (ShouldWriteVerity()) {
+    if (!verity_writer_->Init(partition, read_fd_, write_fd_)) {
       Cleanup(ErrorCode::kVerityCalculationError);
       return;
     }
@@ -169,6 +207,14 @@
   ScheduleRead();
 }
 
+bool FilesystemVerifierAction::ShouldWriteVerity() {
+  const InstallPlan::Partition& partition =
+      install_plan_.partitions[partition_index_];
+  return verifier_step_ == VerifierStep::kVerifyTargetHash &&
+         install_plan_.write_verity &&
+         (partition.hash_tree_size > 0 || partition.fec_size > 0);
+}
+
 void FilesystemVerifierAction::ScheduleRead() {
   const InstallPlan::Partition& partition =
       install_plan_.partitions[partition_index_];
@@ -190,22 +236,21 @@
     return;
   }
 
-  bool read_async_ok = src_stream_->ReadAsync(
-      buffer_.data(),
-      bytes_to_read,
-      base::Bind(&FilesystemVerifierAction::OnReadDoneCallback,
-                 base::Unretained(this)),
-      base::Bind(&FilesystemVerifierAction::OnReadErrorCallback,
-                 base::Unretained(this)),
-      nullptr);
-
-  if (!read_async_ok) {
+  auto bytes_read = read_fd_->Read(buffer_.data(), bytes_to_read);
+  if (bytes_read < 0) {
     LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
     Cleanup(ErrorCode::kError);
+  } else {
+    // We could just invoke |OnReadDone()| directly, and it would work. But
+    // |PostTask| is used so that users can cancel updates.
+    pending_task_id_ = brillo::MessageLoop::current()->PostTask(
+        base::Bind(&FilesystemVerifierAction::OnReadDone,
+                   base::Unretained(this),
+                   bytes_read));
   }
 }
 
-void FilesystemVerifierAction::OnReadDoneCallback(size_t bytes_read) {
+void FilesystemVerifierAction::OnReadDone(size_t bytes_read) {
   if (cancelled_) {
     Cleanup(ErrorCode::kError);
     return;
@@ -231,8 +276,7 @@
   UpdateProgress(
       (static_cast<double>(offset_) / partition_size_ + partition_index_) /
       install_plan_.partitions.size());
-  if (verifier_step_ == VerifierStep::kVerifyTargetHash &&
-      install_plan_.write_verity) {
+  if (ShouldWriteVerity()) {
     if (!verity_writer_->Update(offset_, buffer_.data(), bytes_read)) {
       Cleanup(ErrorCode::kVerityCalculationError);
       return;
@@ -249,12 +293,6 @@
   ScheduleRead();
 }
 
-void FilesystemVerifierAction::OnReadErrorCallback(const brillo::Error* error) {
-  // TODO(deymo): Transform the read-error into an specific ErrorCode.
-  LOG(ERROR) << "Asynchronous read failed.";
-  Cleanup(ErrorCode::kError);
-}
-
 void FilesystemVerifierAction::FinishPartitionHashing() {
   if (!hasher_->Finalize()) {
     LOG(ERROR) << "Unable to finalize the hash.";
@@ -278,8 +316,8 @@
         }
         // If we have not verified source partition yet, now that the target
         // partition does not match, and it's not a full payload, we need to
-        // switch to kVerifySourceHash step to check if it's because the source
-        // partition does not match either.
+        // switch to kVerifySourceHash step to check if it's because the
+        // source partition does not match either.
         verifier_step_ = VerifierStep::kVerifySourceHash;
       } else {
         partition_index_++;
@@ -315,17 +353,22 @@
       }
       // The action will skip kVerifySourceHash step if target partition hash
       // matches, if we are in this step, it means target hash does not match,
-      // and now that the source partition hash matches, we should set the error
-      // code to reflect the error in target partition.
-      // We only need to verify the source partition which the target hash does
-      // not match, the rest of the partitions don't matter.
+      // and now that the source partition hash matches, we should set the
+      // error code to reflect the error in target partition. We only need to
+      // verify the source partition which the target hash does not match, the
+      // rest of the partitions don't matter.
       Cleanup(ErrorCode::kNewRootfsVerificationError);
       return;
   }
   // Start hashing the next partition, if any.
   hasher_.reset();
   buffer_.clear();
-  src_stream_->CloseBlocking(nullptr);
+  if (read_fd_) {
+    read_fd_.reset();
+  }
+  if (write_fd_) {
+    write_fd_.reset();
+  }
   StartPartitionHashing();
 }
 
diff --git a/payload_consumer/filesystem_verifier_action.h b/payload_consumer/filesystem_verifier_action.h
index 6a8823a..b6df4b8 100644
--- a/payload_consumer/filesystem_verifier_action.h
+++ b/payload_consumer/filesystem_verifier_action.h
@@ -24,10 +24,11 @@
 #include <string>
 #include <vector>
 
-#include <brillo/streams/stream.h>
+#include <brillo/message_loops/message_loop.h>
 
 #include "update_engine/common/action.h"
 #include "update_engine/common/hash_calculator.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/install_plan.h"
 #include "update_engine/payload_consumer/verity_writer_interface.h"
 
@@ -83,6 +84,9 @@
 
  private:
   friend class FilesystemVerifierActionTestDelegate;
+
+  // Return true if we need to write verity bytes.
+  bool ShouldWriteVerity();
   // Starts the hashing of the current partition. If there aren't any partitions
   // remaining to be hashed, it finishes the action.
   void StartPartitionHashing();
@@ -92,8 +96,7 @@
 
   // Called from the main loop when a single read from |src_stream_| succeeds or
   // fails, calling OnReadDoneCallback() and OnReadErrorCallback() respectively.
-  void OnReadDoneCallback(size_t bytes_read);
-  void OnReadErrorCallback(const brillo::Error* error);
+  void OnReadDone(size_t bytes_read);
 
   // When the read is done, finalize the hash checking of the current partition
   // and continue checking the next one.
@@ -107,6 +110,10 @@
   // Invoke delegate callback to report progress, if delegate is not null
   void UpdateProgress(double progress);
 
+  // Initialize read_fd_ and write_fd_
+  bool InitializeFd(const std::string& part_path);
+  bool InitializeFdVABC();
+
   // The type of the partition that we are verifying.
   VerifierStep verifier_step_ = VerifierStep::kVerifyTargetHash;
 
@@ -114,8 +121,15 @@
   // being hashed.
   size_t partition_index_{0};
 
-  // If not null, the FileStream used to read from the device.
-  brillo::StreamPtr src_stream_;
+  // If not null, the FileDescriptor used to read from the device.
+  // |read_fd_| and |write_fd_| will be initialized when we begin hashing a
+  // partition. They will be deallocated once we encounter an error or
+  // successfully finish hashing.
+  FileDescriptorPtr read_fd_;
+  // If not null, the FileDescriptor used to write to the device. It is opened
+  // separately from |read_fd_| (the verity writer may seek on it) and only
+  // when verity must be written; the VABC path leaves it unset for now.
+  FileDescriptorPtr write_fd_;
 
   // Buffer for storing data we read.
   brillo::Blob buffer_;
@@ -144,6 +158,11 @@
   // An observer that observes progress updates of this action.
   FilesystemVerifyDelegate* delegate_{};
 
+  // Task that should be cancelled on |TerminateProcessing|. This is the id of
+  // the |OnReadDone| callback most recently posted by |ScheduleRead|.
+  brillo::MessageLoop::TaskId pending_task_id_{
+      brillo::MessageLoop::kTaskIdNull};
+
   DISALLOW_COPY_AND_ASSIGN(FilesystemVerifierAction);
 };
 
diff --git a/payload_consumer/filesystem_verifier_action_unittest.cc b/payload_consumer/filesystem_verifier_action_unittest.cc
index 2c29b44..925fdab 100644
--- a/payload_consumer/filesystem_verifier_action_unittest.cc
+++ b/payload_consumer/filesystem_verifier_action_unittest.cc
@@ -72,7 +72,7 @@
     if (action->Type() == FilesystemVerifierAction::StaticType()) {
       ran_ = true;
       code_ = code;
-      EXPECT_FALSE(static_cast<FilesystemVerifierAction*>(action)->src_stream_);
+      EXPECT_FALSE(static_cast<FilesystemVerifierAction*>(action)->read_fd_);
     } else if (action->Type() ==
                ObjectCollectorAction<InstallPlan>::StaticType()) {
       auto collector_action =
@@ -384,4 +384,5 @@
   EXPECT_TRUE(delegate.ran());
   EXPECT_EQ(ErrorCode::kSuccess, delegate.code());
 }
+
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/partition_writer.h b/payload_consumer/partition_writer.h
index 1acbddc..1c8ffbd 100644
--- a/payload_consumer/partition_writer.h
+++ b/payload_consumer/partition_writer.h
@@ -48,6 +48,9 @@
   [[nodiscard]] virtual bool Init(const InstallPlan* install_plan,
                                   bool source_may_exist);
 
+  // Close partition writer. When calling this function there's no guarantee
+  // that all |InstallOperations| are sent to |PartitionWriter|. This function
+  // will be called even if we are pausing/aborting the update.
   int Close();
 
   // These perform a specific type of operation and return true on success.
@@ -72,6 +75,11 @@
       size_t count);
   [[nodiscard]] virtual bool Flush();
 
+  // |DeltaPerformer| calls this when all Install Ops are sent to partition
+  // writer. No |Perform*Operation| methods will be called in the future, and
+  // the partition writer is expected to be closed soon.
+  [[nodiscard]] virtual bool FinishedInstallOps() { return true; }
+
  protected:
   friend class PartitionWriterTest;
   FRIEND_TEST(PartitionWriterTest, ChooseSourceFDTest);
diff --git a/payload_consumer/vabc_partition_writer.cc b/payload_consumer/vabc_partition_writer.cc
index d95103b..980f2ca 100644
--- a/payload_consumer/vabc_partition_writer.cc
+++ b/payload_consumer/vabc_partition_writer.cc
@@ -111,6 +111,12 @@
   return true;
 }
 
+[[nodiscard]] bool VABCPartitionWriter::FinishedInstallOps() {
+  // Add a hardcoded magic label to indicate end of all install ops. This label
+  // is needed by filesystem verification, don't remove.
+  return cow_writer_->AddLabel(kEndOfInstallLabel);
+}
+
 VABCPartitionWriter::~VABCPartitionWriter() {
   cow_writer_->Finalize();
 }
diff --git a/payload_consumer/vabc_partition_writer.h b/payload_consumer/vabc_partition_writer.h
index 7657cb4..5eab23f 100644
--- a/payload_consumer/vabc_partition_writer.h
+++ b/payload_consumer/vabc_partition_writer.h
@@ -50,6 +50,8 @@
                              android::snapshot::ICowWriter* cow_writer,
                              FileDescriptorPtr source_fd);
 
+  [[nodiscard]] bool FinishedInstallOps() override;
+
  private:
   std::unique_ptr<android::snapshot::ISnapshotWriter> cow_writer_;
 };