Merge changes I3c882c36,I930c668d,I29e2d51d into main

* changes:
  init: Fix a bug in the WaitToBeReaped() logging code
  init/host_init_verifier: Fix a compiler warning
  init: Fix a compiler warning
diff --git a/fs_mgr/libsnapshot/Android.bp b/fs_mgr/libsnapshot/Android.bp
index 6fad662..ac58ba0 100644
--- a/fs_mgr/libsnapshot/Android.bp
+++ b/fs_mgr/libsnapshot/Android.bp
@@ -198,6 +198,7 @@
         "libsnapshot_cow/cow_format.cpp",
         "libsnapshot_cow/cow_reader.cpp",
         "libsnapshot_cow/parser_v2.cpp",
+        "libsnapshot_cow/parser_v3.cpp",
         "libsnapshot_cow/snapshot_reader.cpp",
         "libsnapshot_cow/writer_base.cpp",
         "libsnapshot_cow/writer_v2.cpp",
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index c9777a3..91e0425 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -99,8 +99,10 @@
     uint64_t sequence_buffer_offset;
     // Size, in bytes, of the CowResumePoint buffer.
     uint32_t resume_buffer_size;
-    // Size, in bytes, of the CowOperation buffer.
-    uint32_t op_buffer_size;
+    // Number of CowOperationV3 structs currently in the operation buffer (op_count)
+    // and the maximum number the buffer region can hold (op_count_max).
+    uint32_t op_count;
+    uint32_t op_count_max;
     // Compression Algorithm
     uint32_t compression_algorithm;
 } __attribute__((packed));
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index debaf36..755a3af 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -145,6 +145,7 @@
                      size_t ignore_bytes = 0) override;
 
     CowHeader& GetHeader() override { return header_; }
+    const CowHeaderV3& header_v3() const { return header_; }
 
     bool GetRawBytes(const CowOperation* op, void* buffer, size_t len, size_t* read);
     bool GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read);
@@ -185,7 +186,6 @@
     std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc_;
     ReaderFlags reader_flag_;
     bool is_merge_{};
-    uint8_t compression_type_ = kCowCompressNone;
 };
 
 // Though this function takes in a CowHeaderV3, the struct could be populated as a v1/v2 CowHeader.
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 3016e93..5b1e56c 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -54,6 +54,9 @@
 
     // Batch write cluster ops
     bool batch_write = false;
+
+    // Maximum number of ops the cow operation buffer can hold; used in v3 only.
+    uint32_t op_count_max = 0;
 };
 
 // Interface for writing to a snapuserd COW. All operations are ordered; merges
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
index 3b84c95..c20194d 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
@@ -28,8 +28,8 @@
 #include <zlib.h>
 
 #include "cow_decompress.h"
-#include "libsnapshot/cow_format.h"
 #include "parser_v2.h"
+#include "parser_v3.h"
 
 namespace android {
 namespace snapshot {
@@ -85,7 +85,6 @@
     cow->data_loc_ = data_loc_;
     cow->block_pos_index_ = block_pos_index_;
     cow->is_merge_ = is_merge_;
-    cow->compression_type_ = compression_type_;
     return cow;
 }
 
@@ -104,11 +103,14 @@
         PLOG(ERROR) << "lseek header failed";
         return false;
     }
-    if (!android::base::ReadFully(fd_, &header_, sizeof(header_))) {
+
+    CHECK_GE(header_.prefix.header_size, sizeof(CowHeader));
+    CHECK_LE(header_.prefix.header_size, sizeof(header_));
+
+    if (!android::base::ReadFully(fd_, &header_, header_.prefix.header_size)) {
         PLOG(ERROR) << "read header failed";
         return false;
     }
-
     return true;
 }
 
@@ -124,52 +126,36 @@
         return false;
     }
 
-    CowParserV2 parser;
-    if (!parser.Parse(fd, header_, label)) {
+    std::unique_ptr<CowParserBase> parser;
+    switch (header_.prefix.major_version) {
+        case 1:
+        case 2:
+            parser = std::make_unique<CowParserV2>();
+            break;
+        case 3:
+            parser = std::make_unique<CowParserV3>();
+            break;
+        default:
+            LOG(ERROR) << "Unknown version: " << header_.prefix.major_version;
+            return false;
+    }
+
+    if (!parser->Parse(fd, header_, label)) {
         return false;
     }
 
-    footer_ = parser.footer();
-    fd_size_ = parser.fd_size();
-    last_label_ = parser.last_label();
-    data_loc_ = parser.data_loc();
-    ops_ = std::make_shared<std::vector<CowOperation>>(parser.ops()->size());
-
-    // Translate the operation buffer from on disk to in memory
-    for (size_t i = 0; i < parser.ops()->size(); i++) {
-        const auto& v2_op = parser.ops()->at(i);
-
-        auto& new_op = ops_->at(i);
-        new_op.type = v2_op.type;
-        new_op.data_length = v2_op.data_length;
-
-        if (v2_op.new_block > std::numeric_limits<uint32_t>::max()) {
-            LOG(ERROR) << "Out-of-range new block in COW op: " << v2_op;
-            return false;
-        }
-        new_op.new_block = v2_op.new_block;
-
-        uint64_t source_info = v2_op.source;
-        if (new_op.type != kCowLabelOp) {
-            source_info &= kCowOpSourceInfoDataMask;
-            if (source_info != v2_op.source) {
-                LOG(ERROR) << "Out-of-range source value in COW op: " << v2_op;
-                return false;
-            }
-        }
-        if (v2_op.compression != kCowCompressNone) {
-            if (compression_type_ == kCowCompressNone) {
-                compression_type_ = v2_op.compression;
-            } else if (compression_type_ != v2_op.compression) {
-                LOG(ERROR) << "COW has mixed compression types which is not supported;"
-                           << " previously saw " << compression_type_ << ", got "
-                           << v2_op.compression << ", op: " << v2_op;
-                return false;
-            }
-        }
-        new_op.source_info = source_info;
+    TranslatedCowOps ops_info;
+    if (!parser->Translate(&ops_info)) {
+        return false;
     }
 
+    header_ = ops_info.header;
+    ops_ = std::move(ops_info.ops);
+    footer_ = parser->footer();
+    fd_size_ = parser->fd_size();
+    last_label_ = parser->last_label();
+    data_loc_ = parser->data_loc();
+
     // If we're resuming a write, we're not ready to merge
     if (label.has_value()) return true;
     return PrepMergeOps();
@@ -664,7 +650,7 @@
 };
 
 uint8_t CowReader::GetCompressionType() {
-    return compression_type_;
+    return header_.compression_algorithm;  // uint32_t header field, narrowed to uint8_t here.
 }
 
 ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_size,
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp
index a291469..76509de 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp
@@ -74,13 +74,13 @@
     }
 }
 
-static bool ShowRawOpStreamV2(borrowed_fd fd, const CowHeader& header) {
+static bool ShowRawOpStreamV2(borrowed_fd fd, const CowHeaderV3& header) {
     CowParserV2 parser;
     if (!parser.Parse(fd, header)) {
         LOG(ERROR) << "v2 parser failed";
         return false;
     }
-    for (const auto& op : *parser.ops()) {
+    for (const auto& op : *parser.get_v2ops()) {
         std::cout << op << "\n";
         if (auto iter = parser.data_loc()->find(op.new_block); iter != parser.data_loc()->end()) {
             std::cout << "    data loc: " << iter->second << "\n";
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_base.h b/fs_mgr/libsnapshot/libsnapshot_cow/parser_base.h
new file mode 100644
index 0000000..2783407
--- /dev/null
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_base.h
@@ -0,0 +1,53 @@
+//
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+
+#include <optional>
+#include <unordered_map>
+
+#include <android-base/unique_fd.h>
+#include <libsnapshot/cow_format.h>
+
+namespace android {
+namespace snapshot {
+
+struct TranslatedCowOps {  // Result filled in by CowParserBase::Translate().
+    CowHeaderV3 header;  // Header as held by the parser at translation time.
+    std::shared_ptr<std::vector<CowOperationV3>> ops;  // Ops in CowOperationV3 form.
+};
+
+class CowParserBase {  // Interface shared by the per-version COW parsers.
+  public:
+    virtual ~CowParserBase() = default;
+
+    virtual bool Parse(android::base::borrowed_fd fd, const CowHeaderV3& header,
+                       std::optional<uint64_t> label = {}) = 0;
+    virtual bool Translate(TranslatedCowOps* out) = 0;  // Fills |out| with header and ops.
+    virtual std::optional<CowFooter> footer() const { return std::nullopt; }  // None by default.
+    virtual std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc() const = 0;
+
+    uint64_t fd_size() const { return fd_size_; }
+    const std::optional<uint64_t>& last_label() const { return last_label_; }
+
+  protected:
+    CowHeaderV3 header_ = {};
+    uint64_t fd_size_ = 0;  // Initialized so fd_size() never returns an indeterminate value.
+    std::optional<uint64_t> last_label_;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
index 8f20317..51489c8 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
@@ -18,12 +18,14 @@
 #include <android-base/file.h>
 #include <android-base/logging.h>
 
+#include <libsnapshot/cow_format.h>
+
 namespace android {
 namespace snapshot {
 
 using android::base::borrowed_fd;
 
-bool CowParserV2::Parse(borrowed_fd fd, const CowHeader& header, std::optional<uint64_t> label) {
+bool CowParserV2::Parse(borrowed_fd fd, const CowHeaderV3& header, std::optional<uint64_t> label) {
     auto pos = lseek(fd.get(), 0, SEEK_END);
     if (pos < 0) {
         PLOG(ERROR) << "lseek end failed";
@@ -47,8 +49,7 @@
         return false;
     }
 
-    if ((header_.prefix.major_version > kCowVersionMajor) ||
-        (header_.prefix.minor_version != kCowVersionMinor)) {
+    if (header_.prefix.major_version > 2 || header_.prefix.minor_version != 0) {
         LOG(ERROR) << "Header version mismatch, "
                    << "major version: " << header_.prefix.major_version
                    << ", expected: " << kCowVersionMajor
@@ -190,11 +191,57 @@
         }
     }
 
-    ops_ = ops_buffer;
-    ops_->shrink_to_fit();
+    v2_ops_ = ops_buffer;
+    v2_ops_->shrink_to_fit();
     data_loc_ = data_loc;
     return true;
 }
 
+bool CowParserV2::Translate(TranslatedCowOps* out) {  // Converts parsed v2 ops to CowOperationV3.
+    out->ops = std::make_shared<std::vector<CowOperationV3>>(v2_ops_->size());
+
+    // Convert each parsed CowOperationV2 into the CowOperationV3 at the same index in out->ops.
+    for (size_t i = 0; i < out->ops->size(); i++) {
+        const auto& v2_op = v2_ops_->at(i);
+
+        auto& new_op = out->ops->at(i);
+        new_op.type = v2_op.type;
+        new_op.data_length = v2_op.data_length;
+
+        if (v2_op.new_block > std::numeric_limits<uint32_t>::max()) {  // Must fit in 32 bits.
+            LOG(ERROR) << "Out-of-range new block in COW op: " << v2_op;
+            return false;
+        }
+        new_op.new_block = v2_op.new_block;
+
+        uint64_t source_info = v2_op.source;
+        if (new_op.type != kCowLabelOp) {  // Label ops keep the full 64-bit source value.
+            source_info &= kCowOpSourceInfoDataMask;
+            if (source_info != v2_op.source) {  // Masking dropped bits, so the value is rejected.
+                LOG(ERROR) << "Out-of-range source value in COW op: " << v2_op;
+                return false;
+            }
+        }
+        if (v2_op.compression != kCowCompressNone) {
+            if (header_.compression_algorithm == kCowCompressNone) {
+                header_.compression_algorithm = v2_op.compression;  // First compressed op seen.
+            } else if (header_.compression_algorithm != v2_op.compression) {
+                LOG(ERROR) << "COW has mixed compression types which is not supported;"
+                           << " previously saw " << header_.compression_algorithm << ", got "
+                           << v2_op.compression << ", op: " << v2_op;
+                return false;
+            }
+        }
+        new_op.source_info = source_info;
+    }
+
+    out->header = header_;  // Copied after the loop so it carries the compression type found.
+    return true;
+}
+
+std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> CowParserV2::data_loc() const {
+    return data_loc_;  // Returns a shared_ptr copy; the map itself is not copied.
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.h b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.h
index f51ff88..318a3ac 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.h
@@ -17,36 +17,31 @@
 
 #include <memory>
 #include <optional>
-#include <unordered_map>
 #include <vector>
 
 #include <android-base/unique_fd.h>
 #include <libsnapshot/cow_format.h>
+#include <libsnapshot_cow/parser_base.h>
 
 namespace android {
 namespace snapshot {
 
-class CowParserV2 {
+class CowParserV2 final : public CowParserBase {
   public:
-    bool Parse(android::base::borrowed_fd fd, const CowHeader& header,
-               std::optional<uint64_t> label = {});
+    bool Parse(android::base::borrowed_fd fd, const CowHeaderV3& header,
+               std::optional<uint64_t> label = {}) override;
+    bool Translate(TranslatedCowOps* out) override;
+    std::optional<CowFooter> footer() const override { return footer_; }
 
     const CowHeader& header() const { return header_; }
-    const std::optional<CowFooter>& footer() const { return footer_; }
-    std::shared_ptr<std::vector<CowOperationV2>> ops() { return ops_; }
-    std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc() const { return data_loc_; }
-    uint64_t fd_size() const { return fd_size_; }
-    const std::optional<uint64_t>& last_label() const { return last_label_; }
+    std::shared_ptr<std::vector<CowOperationV2>> get_v2ops() { return v2_ops_; }
+    std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc() const override;
 
   private:
-    bool ParseOps(android::base::borrowed_fd fd, std::optional<uint64_t> label);
-
-    CowHeader header_ = {};
-    std::optional<CowFooter> footer_;
-    std::shared_ptr<std::vector<CowOperationV2>> ops_;
     std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc_;
-    uint64_t fd_size_;
-    std::optional<uint64_t> last_label_;
+    bool ParseOps(android::base::borrowed_fd fd, std::optional<uint64_t> label);
+    std::shared_ptr<std::vector<CowOperationV2>> v2_ops_;
+    std::optional<CowFooter> footer_;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.cpp
new file mode 100644
index 0000000..23ddaae
--- /dev/null
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.cpp
@@ -0,0 +1,88 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "parser_v3.h"
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+
+#include <libsnapshot/cow_format.h>
+
+namespace android {
+namespace snapshot {
+
+using android::base::borrowed_fd;
+
+bool CowParserV3::Parse(borrowed_fd fd, const CowHeaderV3& header, std::optional<uint64_t> label) {
+    auto pos = lseek(fd.get(), 0, SEEK_END);  // The end offset is recorded as the file size.
+    if (pos < 0) {
+        PLOG(ERROR) << "lseek end failed";
+        return false;
+    }
+    fd_size_ = pos;
+    header_ = header;
+    // Validate the supplied header; ops are only read if every check below passes.
+    if (header_.footer_size != 0) {
+        LOG(ERROR) << "Footer size isn't 0, read " << header_.footer_size;
+        return false;
+    }
+    if (header_.op_size != sizeof(CowOperationV3)) {
+        LOG(ERROR) << "Operation size unknown, read " << header_.op_size << ", expected "
+                   << sizeof(CowOperationV3);
+        return false;
+    }
+    if (header_.cluster_ops != 0) {
+        LOG(ERROR) << "Cluster ops not supported in v3";
+        return false;
+    }
+    // Log the version this check actually requires (3.0), not the kCowVersion* constants.
+    if (header_.prefix.major_version != 3 || header_.prefix.minor_version != 0) {
+        LOG(ERROR) << "Header version mismatch, "
+                   << "major version: " << header_.prefix.major_version
+                   << ", expected: " << 3
+                   << ", minor version: " << header_.prefix.minor_version
+                   << ", expected: " << 0;
+        return false;
+    }
+
+    return ParseOps(fd, label);
+}
+
+bool CowParserV3::ParseOps(borrowed_fd fd, std::optional<uint64_t> label) {
+    ops_ = std::make_shared<std::vector<CowOperationV3>>();
+    ops_->resize(header_.op_count);  // One element per op counted in the header.
+    // Ops are read from file offset header_size + buffer_size.
+    const off_t offset = header_.prefix.header_size + header_.buffer_size;
+    if (!android::base::ReadFullyAtOffset(fd, ops_->data(), ops_->size() * sizeof(CowOperationV3),
+                                          offset)) {
+        PLOG(ERROR) << "read ops failed";
+        return false;
+    }
+
+    // :TODO: sequence buffer & resume buffer follow
+    // Once we implement labels, we'll have to discard unused ops and adjust
+    // the header as needed.
+    CHECK(!label);  // Labels are not handled here yet (see the TODO above).
+
+    ops_->shrink_to_fit();
+    return true;
+}
+
+bool CowParserV3::Translate(TranslatedCowOps* out) {  // Ops are already CowOperationV3.
+    out->ops = ops_;  // Shares ownership of the parsed vector; nothing is copied.
+    out->header = header_;
+    return true;
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.h
new file mode 100644
index 0000000..5760cf2
--- /dev/null
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.h
@@ -0,0 +1,60 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+#include <optional>
+#include <unordered_map>
+#include <vector>
+
+#include <android-base/unique_fd.h>
+#include <libsnapshot/cow_format.h>
+#include <libsnapshot_cow/parser_base.h>
+
+namespace android {
+namespace snapshot {
+
+class CowParserV3 final : public CowParserBase {  // Parser for version 3 COW files.
+  public:
+    bool Parse(android::base::borrowed_fd fd, const CowHeaderV3& header,
+               std::optional<uint64_t> label = {}) override;
+    bool Translate(TranslatedCowOps* out) override;
+    std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc() const override {
+        return nullptr;  // This parser keeps no data-location map.
+    }
+
+  private:
+    bool ParseOps(android::base::borrowed_fd fd, std::optional<uint64_t> label);
+
+    // header_ comes from CowParserBase; redeclaring it here would shadow the base member.
+    std::shared_ptr<std::vector<CowOperationV3>> ops_;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
index cc8dd83..07c1d4f 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
@@ -37,7 +37,7 @@
 namespace android {
 namespace snapshot {
 
-class CowOperationV3Test : public ::testing::Test {
+class CowTestV3 : public ::testing::Test {
   protected:
     virtual void SetUp() override {
         cow_ = std::make_unique<TemporaryFile>();
@@ -51,7 +51,7 @@
     std::unique_ptr<TemporaryFile> cow_;
 };
 
-TEST_F(CowOperationV3Test, CowHeaderV2Test) {
+TEST_F(CowTestV3, CowHeaderV2Test) {
     CowOptions options;
     options.cluster_ops = 5;
     options.num_merge_ops = 1;
@@ -67,11 +67,58 @@
 
     const auto& header = reader.GetHeader();
     ASSERT_EQ(header.prefix.magic, kCowMagicNumber);
-    ASSERT_EQ(header.prefix.major_version, kCowVersionMajor);
-    ASSERT_EQ(header.prefix.minor_version, kCowVersionMinor);
+    ASSERT_EQ(header.prefix.major_version, 2);
+    ASSERT_EQ(header.prefix.minor_version, 0);
     ASSERT_EQ(header.block_size, options.block_size);
     ASSERT_EQ(header.cluster_ops, options.cluster_ops);
 }
 
+TEST_F(CowTestV3, Header) {
+    CowOptions options;
+    auto writer = CreateCowWriter(3, options, GetCowFd());  // Request a version 3 writer.
+    ASSERT_TRUE(writer->Finalize());
+    // Read the finalized file back through CowReader.
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+    // The header must report version 3.0 and no cluster ops.
+    const auto& header = reader.GetHeader();
+    ASSERT_EQ(header.prefix.magic, kCowMagicNumber);
+    ASSERT_EQ(header.prefix.major_version, 3);
+    ASSERT_EQ(header.prefix.minor_version, 0);
+    ASSERT_EQ(header.block_size, options.block_size);
+    ASSERT_EQ(header.cluster_ops, 0);
+}
+
+TEST_F(CowTestV3, ZeroOp) {
+    CowOptions options;
+    options.op_count_max = 20;  // Capacity comfortably above the two ops written below.
+    auto writer = CreateCowWriter(3, options, GetCowFd());
+    ASSERT_TRUE(writer->AddZeroBlocks(1, 2));  // Two zero blocks starting at block 1.
+    ASSERT_TRUE(writer->Finalize());
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+    ASSERT_EQ(reader.header_v3().op_count, 2);
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+    ASSERT_FALSE(iter->AtEnd());
+    // First op: zero op for block 1.
+    auto op = iter->Get();
+    ASSERT_EQ(op->type, kCowZeroOp);
+    ASSERT_EQ(op->data_length, 0);
+    ASSERT_EQ(op->new_block, 1);
+    ASSERT_EQ(op->source_info, 0);
+
+    iter->Next();
+    ASSERT_FALSE(iter->AtEnd());
+    op = iter->Get();
+    // Second op: zero op for block 2.
+    ASSERT_EQ(op->type, kCowZeroOp);
+    ASSERT_EQ(op->data_length, 0);
+    ASSERT_EQ(op->new_block, 2);
+    ASSERT_EQ(op->source_info, 0);
+}
+
 }  // namespace snapshot
-}  // namespace android
\ No newline at end of file
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_base.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_base.h
index 709b248..5274456 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_base.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_base.h
@@ -62,6 +62,8 @@
     bool InitFd();
     bool ValidateNewBlock(uint64_t new_block);
 
+    bool IsEstimating() const { return is_dev_null_; }
+
     CowOptions options_;
 
     android::base::unique_fd fd_;
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
index 83a9b1b..37324c7 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
@@ -273,10 +273,11 @@
     if (!ReadCowHeader(fd_, &header_v3)) {
         return false;
     }
+
     header_ = header_v3;
 
     CowParserV2 parser;
-    if (!parser.Parse(fd_, header_, {label})) {
+    if (!parser.Parse(fd_, header_v3, {label})) {
         return false;
     }
     if (header_.prefix.major_version > 2) {
@@ -292,7 +293,7 @@
     footer_.op.num_ops = 0;
     InitPos();
 
-    for (const auto& op : *parser.ops()) {
+    for (const auto& op : *parser.get_v2ops()) {
         AddOperation(op);
     }
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 5ae5f19..ecbf97e 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -78,7 +78,8 @@
     // during COW size estimation
     header_.sequence_buffer_offset = 0;
     header_.resume_buffer_size = 0;
-    header_.op_buffer_size = 0;
+    header_.op_count = 0;
+    header_.op_count_max = 0;
     header_.compression_algorithm = kCowCompressNone;
     return;
 }
@@ -154,15 +155,12 @@
             return false;
         }
     }
+    header_.op_count_max = options_.op_count_max;
 
     if (!Sync()) {
         LOG(ERROR) << "Header sync failed";
         return false;
     }
-
-    next_op_pos_ = 0;
-    next_data_pos_ = 0;
-
     return true;
 }
 
@@ -187,9 +185,17 @@
 }
 
 bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
-    LOG(ERROR) << __LINE__ << " " << __FILE__ << " <- function here should never be called";
-    if (new_block_start && num_blocks) return false;
-    return false;
+    for (uint64_t i = 0; i < num_blocks; i++) {  // One kCowZeroOp per block in the range.
+        CowOperationV3 op;
+        op.type = kCowZeroOp;
+        op.data_length = 0;
+        op.new_block = new_block_start + i;
+        op.source_info = 0;
+        if (!WriteOperation(op)) {  // Stop at the first op that cannot be written.
+            return false;
+        }
+    }
+    return true;
 }
 
 bool CowWriterV3::EmitLabel(uint64_t label) {
@@ -204,9 +210,34 @@
     return false;
 }
 
+bool CowWriterV3::WriteOperation(const CowOperationV3& op) {
+    if (IsEstimating()) {  // Only the counters are bumped; no op is written.
+        header_.op_count++;
+        header_.op_count_max++;
+        return true;
+    }
+    // Compare with >= so the capacity check cannot wrap the way op_count + 1 could.
+    if (header_.op_count >= header_.op_count_max) {
+        LOG(ERROR) << "Maximum number of ops reached: " << header_.op_count_max;
+        return false;
+    }
+
+    const off_t offset = GetOpOffset(header_.op_count);  // Slot for the next op.
+    if (!android::base::WriteFullyAtOffset(fd_, &op, sizeof(op), offset)) {
+        return false;
+    }
+
+    header_.op_count++;  // Counted only after the write succeeded.
+    return true;
+}
+
 bool CowWriterV3::Finalize() {
-    LOG(ERROR) << __LINE__ << " " << __FILE__ << " <- function here should never be called";
-    return false;
+    CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
+    CHECK_LE(header_.prefix.header_size, sizeof(header_));  // Never read past header_.
+    if (!android::base::WriteFullyAtOffset(fd_, &header_, header_.prefix.header_size, 0)) {
+        return false;
+    }
+    return Sync();  // Header was rewritten at offset 0; the result of Sync() is returned.
 }
 
 uint64_t CowWriterV3::GetCowSize() {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index 2a88a12..8717c01 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -14,7 +14,8 @@
 
 #pragma once
 
-#include <future>
+#include <android-base/logging.h>
+
 #include "writer_base.h"
 
 namespace android {
@@ -42,16 +43,20 @@
     void SetupHeaders();
     bool ParseOptions();
     bool OpenForWrite();
+    bool WriteOperation(const CowOperationV3& op);
+
+    off_t GetOpOffset(uint32_t op_index) const {  // File offset of the op at |op_index|.
+        CHECK_LT(op_index, header_.op_count_max);  // Index must be below op_count_max.
+        return header_.prefix.header_size + header_.buffer_size +
+               (op_index * sizeof(CowOperationV3));
+    }
 
   private:
     CowHeaderV3 header_{};
     CowCompression compression_;
+
     // in the case that we are using one thread for compression, we can store and re-use the same
     // compressor
-
-    uint64_t next_op_pos_ = 0;
-    uint64_t next_data_pos_ = 0;
-
     int num_compress_threads_ = 1;
 };
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 8208d67..950d771 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -112,7 +112,7 @@
             return false;
         }
 
-        if (!android::base::WriteFully(cow_fd_, &header, sizeof(CowHeader))) {
+        if (!android::base::WriteFully(cow_fd_, &header, header.prefix.header_size)) {
             SNAP_PLOG(ERROR) << "Write to header failed";
             return false;
         }
diff --git a/init/persistent_properties.cpp b/init/persistent_properties.cpp
index e89244f..6f8a4de 100644
--- a/init/persistent_properties.cpp
+++ b/init/persistent_properties.cpp
@@ -46,6 +46,13 @@
 
 constexpr const char kLegacyPersistentPropertyDir[] = "/data/property";
 
+void AddPersistentProperty(const std::string& name, const std::string& value,
+                           PersistentProperties* persistent_properties) {
+    auto persistent_property_record = persistent_properties->add_properties();  // New record.
+    persistent_property_record->set_name(name);
+    persistent_property_record->set_value(value);
+}
+
 Result<PersistentProperties> LoadLegacyPersistentProperties() {
     std::unique_ptr<DIR, decltype(&closedir)> dir(opendir(kLegacyPersistentPropertyDir), closedir);
     if (!dir) {
@@ -146,13 +153,6 @@
 
 }  // namespace
 
-void AddPersistentProperty(const std::string& name, const std::string& value,
-                           PersistentProperties* persistent_properties) {
-    auto persistent_property_record = persistent_properties->add_properties();
-    persistent_property_record->set_name(name);
-    persistent_property_record->set_value(value);
-}
-
 Result<PersistentProperties> LoadPersistentPropertyFile() {
     auto file_contents = ReadPersistentPropertyFile();
     if (!file_contents.ok()) return file_contents.error();
@@ -266,8 +266,57 @@
         }
     }
 
-    return *persistent_properties;
+    // loop over to find all staged props
+    auto const staged_prefix = std::string_view("next_boot.");
+    auto staged_props = std::unordered_map<std::string, std::string>();
+    for (const auto& property_record : persistent_properties->properties()) {
+        auto const& prop_name = property_record.name();
+        auto const& prop_value = property_record.value();
+        if (StartsWith(prop_name, staged_prefix)) {
+            auto actual_prop_name = prop_name.substr(staged_prefix.size());
+            staged_props[actual_prop_name] = prop_value;
+        }
+    }
+
+    if (staged_props.empty()) {
+        return *persistent_properties;
+    }
+
+    // if there are staged props, apply them while preserving the original prop order
+    PersistentProperties updated_persistent_properties;
+    for (const auto& property_record : persistent_properties->properties()) {
+        auto const& prop_name = property_record.name();
+        auto const& prop_value = property_record.value();
+
+        // don't include staged props anymore
+        if (StartsWith(prop_name, staged_prefix)) {
+            continue;
+        }
+
+        auto iter = staged_props.find(prop_name);
+        if (iter != staged_props.end()) {
+            AddPersistentProperty(prop_name, iter->second, &updated_persistent_properties);
+            staged_props.erase(iter);
+        } else {
+            AddPersistentProperty(prop_name, prop_value, &updated_persistent_properties);
+        }
+    }
+
+    // add any additional staged props
+    for (auto const& [prop_name, prop_value] : staged_props) {
+        AddPersistentProperty(prop_name, prop_value, &updated_persistent_properties);
+    }
+
+    // write current updated persist prop file
+    auto result = WritePersistentPropertyFile(updated_persistent_properties);
+    if (!result.ok()) {
+        LOG(ERROR) << "Could not store persistent property: " << result.error();
+    }
+
+    return updated_persistent_properties;
 }
 
+
+
 }  // namespace init
 }  // namespace android
diff --git a/init/persistent_properties.h b/init/persistent_properties.h
index 7e9d438..11083da 100644
--- a/init/persistent_properties.h
+++ b/init/persistent_properties.h
@@ -25,8 +25,6 @@
 namespace android {
 namespace init {
 
-void AddPersistentProperty(const std::string& name, const std::string& value,
-                           PersistentProperties* persistent_properties);
 PersistentProperties LoadPersistentProperties();
 void WritePersistentProperty(const std::string& name, const std::string& value);
 PersistentProperties LoadPersistentPropertiesFromMemory();
diff --git a/init/persistent_properties_test.cpp b/init/persistent_properties_test.cpp
index e5d26db..5763050 100644
--- a/init/persistent_properties_test.cpp
+++ b/init/persistent_properties_test.cpp
@@ -178,5 +178,37 @@
     EXPECT_FALSE(it == read_back_properties.properties().end());
 }
 
+TEST(persistent_properties, StagedPersistProperty) {  // "next_boot." staged props on load
+    TemporaryFile tf;
+    ASSERT_TRUE(tf.fd != -1);
+    persistent_property_filename = tf.path;  // use the temp file as the property file path
+    // Input: regular props interleaved with staged ("next_boot."-prefixed) ones.
+    std::vector<std::pair<std::string, std::string>> persistent_properties = {
+        {"persist.sys.locale", "en-US"},
+        {"next_boot.persist.test.numbers", "54321"},  // staged; base prop is listed below
+        {"persist.sys.timezone", "America/Los_Angeles"},
+        {"persist.test.numbers", "12345"},
+        {"next_boot.persist.test.extra", "abc"},  // staged; no base prop in the input
+    };
+
+    ASSERT_RESULT_OK(
+            WritePersistentPropertyFile(VectorToPersistentProperties(persistent_properties)));
+    // Expected: staged values appear under their unprefixed names; no "next_boot." keys remain.
+    std::vector<std::pair<std::string, std::string>> expected_persistent_properties = {
+        {"persist.sys.locale", "en-US"},
+        {"persist.sys.timezone", "America/Los_Angeles"},
+        {"persist.test.numbers", "54321"},
+        {"persist.test.extra", "abc"},
+    };
+
+    // Lock down that staged props are applied by the first load.
+    auto first_read_back_properties = LoadPersistentProperties();
+    CheckPropertiesEqual(expected_persistent_properties, first_read_back_properties);
+
+    // Lock down that other props are not overwritten: a second load yields the same result.
+    auto second_read_back_properties = LoadPersistentProperties();
+    CheckPropertiesEqual(expected_persistent_properties, second_read_back_properties);
+}
+
 }  // namespace init
 }  // namespace android
diff --git a/init/property_service.cpp b/init/property_service.cpp
index 9ae9990..bd74358 100644
--- a/init/property_service.cpp
+++ b/init/property_service.cpp
@@ -1406,33 +1406,12 @@
     switch (init_message.msg_case()) {
         case InitMessage::kLoadPersistentProperties: {
             load_override_properties();
-            // Read persistent properties after all default values have been loaded.
-            // Apply staged and persistent properties
-            bool has_staged_prop = false;
-            auto const staged_prefix = std::string_view("next_boot.");
 
             auto persistent_properties = LoadPersistentProperties();
             for (const auto& property_record : persistent_properties.properties()) {
                 auto const& prop_name = property_record.name();
                 auto const& prop_value = property_record.value();
-
-                if (StartsWith(prop_name, staged_prefix)) {
-                  has_staged_prop = true;
-                  auto actual_prop_name = prop_name.substr(staged_prefix.size());
-                  InitPropertySet(actual_prop_name, prop_value);
-                } else {
-                  InitPropertySet(prop_name, prop_value);
-                }
-            }
-
-            // Update persist prop file if there are staged props
-            if (has_staged_prop) {
-                PersistentProperties props = LoadPersistentPropertiesFromMemory();
-                // write current updated persist prop file
-                auto result = WritePersistentPropertyFile(props);
-                if (!result.ok()) {
-                    LOG(ERROR) << "Could not store persistent property: " << result.error();
-                }
+                InitPropertySet(prop_name, prop_value);
             }
 
             // Apply debug ramdisk special settings after persistent properties are loaded.