Write verity first, then do fs verification

Old behavior:
   Read partition, for each block:
    Update hasher
    Update verity writer
   before reading hashtree/verity:
    write hashtree/verity to disk
   Read the last verity blocks.
   Finalize hasher, verity hashes.

The old behavior tries to minimize fs reads by reading only once and
feeding the data to both the hasher and the verity writer. However, in
VABC, reads and writes are handled very differently. Reads can be done
via a regular fd, but writes must go through the special COW API. As we
have seen in b/186196758, using the COW API in filesystem hashing can
lead to inconsistent reads and boot failure. Therefore, we've decided to
write verity first using the COW API, then read/hash the partition using
a regular fd. This does mean that we need to read everything twice, but
we think this is a worthwhile tradeoff, as verity writes can take 5
minutes, while reading the entire partition again only takes <10 seconds.

New behavior:
    Read partition, for each block:
      Update verity writer
    Finalize verity writer, write verity to disk
    launch snapuserd, open a regular fd.
    Read partition, for each block:
      Update hasher
    Finalize hasher, verity hashes.

Test: th
Test: Manual testing on pixel of the following scenario:
        1. Verity enabled, VABC enabled, pause/resume multiple times
        2. Verity disabled, VABC enabled, pause/resume multiple times
        3. Verity Enabled, VABC enabled, pause/resume multiple times

Bug: 186196758
Change-Id: I2477c2dc4da5b921e84b48a54d0d8a877c1a52ef
diff --git a/common/scoped_task_id.h b/common/scoped_task_id.h
new file mode 100644
index 0000000..5c89935
--- /dev/null
+++ b/common/scoped_task_id.h
@@ -0,0 +1,121 @@
+//
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_SCOPED_TASK_ID_H_
+#define UPDATE_ENGINE_SCOPED_TASK_ID_H_
+
+#include <type_traits>
+#include <utility>
+
+#include <base/bind.h>
+#include <brillo/message_loops/message_loop.h>
+
+namespace chromeos_update_engine {
+
+// Move-only, unique_ptr-like owner of a |MessageLoop::TaskId|: destroying an
+// instance cancels its pending task. A posted task keeps a raw pointer to the
+// instance that posted it, so never move an instance while a task is pending.
+class ScopedTaskId {
+  using MessageLoop = brillo::MessageLoop;
+
+ public:
+  constexpr ScopedTaskId() = default;
+
+  constexpr ScopedTaskId(ScopedTaskId&& other) noexcept {
+    *this = std::move(other);
+  }
+
+  constexpr ScopedTaskId& operator=(ScopedTaskId&& other) noexcept {
+    std::swap(task_id_, other.task_id_);
+    return *this;
+  }
+
+  // Post a callback on the current message loop. Returns false if the previous
+  // callback hasn't run yet or if MessageLoop failed to schedule the task.
+  [[nodiscard]] bool PostTask(const base::Location& from_here,
+                              base::OnceClosure&& callback,
+                              base::TimeDelta delay = {}) noexcept {
+    return PostTask<decltype(callback)>(from_here, std::move(callback), delay);
+  }
+  [[nodiscard]] bool PostTask(const base::Location& from_here,
+                              std::function<void()>&& callback,
+                              base::TimeDelta delay = {}) noexcept {
+    return PostTask<decltype(callback)>(from_here, std::move(callback), delay);
+  }
+
+  ~ScopedTaskId() noexcept { Cancel(); }
+
+  // Cancel the managed task. Returns true if a pending task was cancelled and
+  // false if none was scheduled or cancellation failed. The id is forgotten in
+  // every case, so a stale id is never retried or reported as still scheduled.
+  bool Cancel() noexcept {
+    const auto id = std::exchange(task_id_, MessageLoop::kTaskIdNull);
+    if (id == MessageLoop::kTaskIdNull ||
+        !MessageLoop::current()->CancelTask(id)) {
+      return false;
+    }
+    LOG(INFO) << "Cancelled task id " << id;
+    return true;
+  }
+
+  [[nodiscard]] constexpr bool IsScheduled() const noexcept {
+    return task_id_ != MessageLoop::kTaskIdNull;
+  }
+
+  [[nodiscard]] constexpr bool operator==(const ScopedTaskId& other) const
+      noexcept {
+    return other.task_id_ == task_id_;
+  }
+
+  [[nodiscard]] constexpr bool operator<(const ScopedTaskId& other) const
+      noexcept {
+    return task_id_ < other.task_id_;
+  }
+
+ private:
+  ScopedTaskId(const ScopedTaskId&) = delete;
+  ScopedTaskId& operator=(const ScopedTaskId&) = delete;
+  template <typename Callable>
+  [[nodiscard]] bool PostTask(const base::Location& from_here,
+                              Callable&& callback,
+                              base::TimeDelta delay) noexcept {
+    if (task_id_ != MessageLoop::kTaskIdNull) {
+      LOG(ERROR) << "Scheduling another task but task id " << task_id_
+                 << " isn't executed yet! This can cause the old task to leak.";
+      return false;
+    }
+    task_id_ = MessageLoop::current()->PostDelayedTask(
+        from_here,
+        base::BindOnce(&ScopedTaskId::ExecuteTask<decltype(callback)>,
+                       base::Unretained(this),
+                       std::move(callback)),
+        delay);
+    return task_id_ != MessageLoop::kTaskIdNull;
+  }
+  template <typename Callable>
+  void ExecuteTask(Callable&& callback) {
+    task_id_ = MessageLoop::kTaskIdNull;  // Lets |callback| re-post.
+    if constexpr (std::is_same_v<Callable&&, base::OnceClosure&&>) {
+      std::move(callback).Run();
+    } else {
+      std::move(callback)();
+    }
+  }
+  MessageLoop::TaskId task_id_{MessageLoop::kTaskIdNull};
+};
+}  // namespace chromeos_update_engine
+
+#endif