libsnapshot_cow: Support multi-block compression

This patch supports compression for bigger block size.

3 bits [57-59] in the COW Operation "source_info_" field is used to store
the compression factor. Supported compression factors are power of 2
viz: 4k, 8k, 16k, 32k, 64k, 128k, 256k.

Only REPLACE operations will have the bigger block size support for now.
This can be extended to other operations later.

The write path in EmitBlocks() has the core logic wherein consecutive
sequence of REPLACE ops are compressed based on the compression factor
settings. Thus, for a 64k compression factor, there will be just one
COW operation which encodes all the 16 operation and the entire 64k
block is compressed in one shot.

NOTE: There is no read I/O path support in this patch. Subsequent patch
will have the read support.

Performance data (with read I/O path support in subsequent patch):

go/variable-block-vabc-perf covers detail performance runs
on Pixel 6 for full and incremental OTA.

TL;DR:

Performance of a full OTA (All numbers are compared against 4k block
size)
=======================================
Snapshot-size:

~10-11% decrease in snapshot-size (disk-space) for zstd with 256k block
size.

~8% decrease in snapshot-size (disk-space) for lz4

Install time:

~13% decrease in OTA install time for zstd with 256k block size.

Snapshot-merge:

~50% decrease in snapshot-merge time with 256k block size for zstd

Post OTA boot-time:

~10.5 decrease in boot time for 64k block size for zstd

In-memory footprint for COW operations:

~80% decrease in memory footprint for 256k block size. (58MB -> 9.2MB)

============================================

For more improvements, further tuning of zstd/lz4 is
required primariy the compression levels, zstd compression window,
performance of gz with compression levels.

Bug: 319309466

Test: cow_api test covering all the supported block sizes for v3 writer.

On Pixel 6:

=======================================
COW Writer V3:

for OTA in full, incremental OTA
   for block_size in 4k, 16k, 32k, 64k, 128k, 256k
       for compression_algo in lz4, zstd, gz, none
          install OTA, reboot, verify merge
=======================================
COW Writer V2:

for OTA in full, incremental OTA
   for block_size in 4k
      for compression_algo in lz4, zstd, gz, none
          install OTA, reboot, verity merge

=====================================
Change-Id: I96201f1609582aa9d44d8085852e284b0c4a426d
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/fs_mgr/libsnapshot/Android.bp b/fs_mgr/libsnapshot/Android.bp
index a8a7716..5ceaf28 100644
--- a/fs_mgr/libsnapshot/Android.bp
+++ b/fs_mgr/libsnapshot/Android.bp
@@ -108,7 +108,7 @@
     ],
     srcs: [":libsnapshot_sources"],
     static_libs: [
-        "libfs_mgr_binder"
+        "libfs_mgr_binder",
     ],
 }
 
@@ -128,12 +128,12 @@
     static_libs: [
         "libc++fs",
         "libsnapshot_cow",
-    ]
+    ],
 }
 
 cc_library_static {
     name: "libsnapshot_init",
-    native_coverage : true,
+    native_coverage: true,
     defaults: ["libsnapshot_defaults"],
     srcs: [":libsnapshot_sources"],
     ramdisk_available: true,
@@ -204,6 +204,10 @@
         "libsnapshot_cow/writer_v2.cpp",
         "libsnapshot_cow/writer_v3.cpp",
     ],
+
+    header_libs: [
+        "libstorage_literals_headers",
+    ],
     export_include_dirs: ["include"],
     host_supported: true,
     recovery_available: true,
@@ -243,7 +247,10 @@
 
 cc_defaults {
     name: "libsnapshot_test_defaults",
-    defaults: ["libsnapshot_defaults", "libsnapshot_cow_defaults"],
+    defaults: [
+        "libsnapshot_defaults",
+        "libsnapshot_cow_defaults",
+    ],
     srcs: [
         "partition_cow_creator_test.cpp",
         "snapshot_metadata_updater_test.cpp",
@@ -283,10 +290,13 @@
 
 cc_test {
     name: "vts_libsnapshot_test",
-    defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_test_defaults",
+        "libsnapshot_hal_deps",
+    ],
     test_suites: [
         "vts",
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         min_shipping_api_level: 30,
@@ -295,12 +305,15 @@
 
 cc_test {
     name: "vab_legacy_tests",
-    defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_test_defaults",
+        "libsnapshot_hal_deps",
+    ],
     cppflags: [
         "-DLIBSNAPSHOT_TEST_VAB_LEGACY",
     ],
     test_suites: [
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         // Legacy VAB launched in Android R.
@@ -310,12 +323,15 @@
 
 cc_test {
     name: "vabc_legacy_tests",
-    defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_test_defaults",
+        "libsnapshot_hal_deps",
+    ],
     cppflags: [
         "-DLIBSNAPSHOT_TEST_VABC_LEGACY",
     ],
     test_suites: [
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         // Legacy VABC launched in Android S.
@@ -343,7 +359,10 @@
 
 cc_binary {
     name: "snapshotctl",
-    defaults: ["libsnapshot_cow_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_cow_defaults",
+        "libsnapshot_hal_deps",
+    ],
     srcs: [
         "snapshotctl.cpp",
     ],
@@ -412,8 +431,11 @@
         "libgtest",
         "libsnapshot_cow",
     ],
+    header_libs: [
+        "libstorage_literals_headers",
+    ],
     test_suites: [
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         min_shipping_api_level: 30,
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index d410c14..8f62234 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -201,6 +201,12 @@
 static constexpr uint64_t kCowOpSourceInfoTypeBit = 60;
 static constexpr uint64_t kCowOpSourceInfoTypeNumBits = 4;
 static constexpr uint64_t kCowOpSourceInfoTypeMask = (1ULL << kCowOpSourceInfoTypeNumBits) - 1;
+
+static constexpr uint64_t kCowOpSourceInfoCompressionBit = 57;
+static constexpr uint64_t kCowOpSourceInfoCompressionNumBits = 3;
+static constexpr uint64_t kCowOpSourceInfoCompressionMask =
+        ((1ULL << kCowOpSourceInfoCompressionNumBits) - 1);
+
 // The on disk format of cow (currently ==  CowOperation)
 struct CowOperationV3 {
     // If this operation reads from the data section of the COW, this contains
@@ -211,8 +217,8 @@
     uint32_t new_block;
 
     // source_info with have the following layout
-    // |---4 bits ---| ---12 bits---| --- 48 bits ---|
-    // |--- type --- | -- unused -- | --- source --- |
+    // |--- 4 bits -- | --------- 3 bits ------ | --- 9 bits --- | --- 48 bits ---|
+    // |--- type ---  | -- compression factor --| --- unused --- | --- source --- |
     //
     // The value of |source| depends on the operation code.
     //
@@ -225,6 +231,17 @@
     // For ops other than Label:
     //  Bits 47-62 are reserved and must be zero.
     // A block is compressed if it’s data is < block_sz
+    //
+    // Bits [57-59] represents the compression factor.
+    //
+    //       Compression - factor
+    // ==========================
+    // 000 -  4k
+    // 001 -  8k
+    // 010 -  16k
+    // ...
+    // 110 -  256k
+    //
     uint64_t source_info_;
     constexpr uint64_t source() const { return source_info_ & kCowOpSourceInfoDataMask; }
     constexpr void set_source(uint64_t source) {
@@ -245,6 +262,20 @@
         source_info_ |= (static_cast<uint64_t>(type) & kCowOpSourceInfoTypeMask)
                         << kCowOpSourceInfoTypeBit;
     }
+    constexpr void set_compression_bits(uint8_t compression_factor) {
+        // Clear the 3 bits from bit 57 - [57-59]
+        source_info_ &= ~(kCowOpSourceInfoCompressionMask << kCowOpSourceInfoCompressionBit);
+        // Set the actual compression factor
+        source_info_ |=
+                (static_cast<uint64_t>(compression_factor) & kCowOpSourceInfoCompressionMask)
+                << kCowOpSourceInfoCompressionBit;
+    }
+    constexpr uint8_t compression_bits() const {
+        // Grab the 3 bits from [57-59]
+        const auto compression_factor =
+                (source_info_ >> kCowOpSourceInfoCompressionBit) & kCowOpSourceInfoCompressionMask;
+        return static_cast<uint8_t>(compression_factor);
+    }
 } __attribute__((packed));
 
 // Ensure that getters/setters added to CowOperationV3 does not increases size
@@ -326,5 +357,7 @@
 // Convert compression name to internal value.
 std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name);
 
+// Return block size used for compression
+size_t CowOpCompressionSize(const CowOperation* op, size_t block_size);
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 0194ffd..89699dc 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -119,9 +119,9 @@
 
 class CompressWorker {
   public:
-    CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size);
+    CompressWorker(std::unique_ptr<ICompressor>&& compressor);
     bool RunThread();
-    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
+    void EnqueueCompressBlocks(const void* buffer, size_t block_size, size_t num_blocks);
     bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
     void Finalize();
     static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression);
@@ -134,20 +134,22 @@
     struct CompressWork {
         const void* buffer;
         size_t num_blocks;
+        size_t block_size;
         bool compression_status = false;
         std::vector<std::basic_string<uint8_t>> compressed_data;
     };
 
     std::unique_ptr<ICompressor> compressor_;
-    uint32_t block_size_;
 
     std::queue<CompressWork> work_queue_;
     std::queue<CompressWork> compressed_queue_;
     std::mutex lock_;
     std::condition_variable cv_;
     bool stopped_ = false;
+    size_t total_submitted_ = 0;
+    size_t total_processed_ = 0;
 
-    bool CompressBlocks(const void* buffer, size_t num_blocks,
+    bool CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size,
                         std::vector<std::basic_string<uint8_t>>* compressed_data);
 };
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index abc7d33..c7a5f05 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -208,9 +208,9 @@
     std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> zstd_context_;
 };
 
-bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
+bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size,
                                     std::vector<std::basic_string<uint8_t>>* compressed_data) {
-    return CompressBlocks(compressor_.get(), block_size_, buffer, num_blocks, compressed_data);
+    return CompressBlocks(compressor_.get(), block_size, buffer, num_blocks, compressed_data);
 }
 
 bool CompressWorker::CompressBlocks(ICompressor* compressor, size_t block_size, const void* buffer,
@@ -254,7 +254,8 @@
         }
 
         // Compress blocks
-        bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
+        bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, blocks.block_size,
+                                  &blocks.compressed_data);
         blocks.compression_status = ret;
         {
             std::lock_guard<std::mutex> lock(lock_);
@@ -273,35 +274,31 @@
     return true;
 }
 
-void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
+void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t block_size,
+                                           size_t num_blocks) {
     {
         std::lock_guard<std::mutex> lock(lock_);
 
         CompressWork blocks = {};
         blocks.buffer = buffer;
+        blocks.block_size = block_size;
         blocks.num_blocks = num_blocks;
         work_queue_.push(std::move(blocks));
+        total_submitted_ += 1;
     }
     cv_.notify_all();
 }
 
 bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
-    {
+    while (true) {
         std::unique_lock<std::mutex> lock(lock_);
-        while (compressed_queue_.empty() && !stopped_) {
+        while ((total_submitted_ != total_processed_) && compressed_queue_.empty() && !stopped_) {
             cv_.wait(lock);
         }
-
-        if (stopped_) {
-            return true;
-        }
-    }
-
-    {
-        std::lock_guard<std::mutex> lock(lock_);
         while (compressed_queue_.size() > 0) {
             CompressWork blocks = std::move(compressed_queue_.front());
             compressed_queue_.pop();
+            total_processed_ += 1;
 
             if (blocks.compression_status) {
                 compressed_buf->insert(compressed_buf->end(),
@@ -312,9 +309,12 @@
                 return false;
             }
         }
+        if ((total_submitted_ == total_processed_) || stopped_) {
+            total_submitted_ = 0;
+            total_processed_ = 0;
+            return true;
+        }
     }
-
-    return true;
 }
 
 std::unique_ptr<ICompressor> ICompressor::Brotli(uint32_t compression_level,
@@ -344,8 +344,8 @@
     cv_.notify_all();
 }
 
-CompressWorker::CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size)
-    : compressor_(std::move(compressor)), block_size_(block_size) {}
+CompressWorker::CompressWorker(std::unique_ptr<ICompressor>&& compressor)
+    : compressor_(std::move(compressor)) {}
 
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
index 8d1786c..4e1c666 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
@@ -21,6 +21,7 @@
 #include <android-base/logging.h>
 #include <android-base/stringprintf.h>
 #include <libsnapshot/cow_format.h>
+#include <storage_literals/storage_literals.h>
 #include "writer_v2.h"
 #include "writer_v3.h"
 
@@ -28,6 +29,7 @@
 namespace snapshot {
 
 using android::base::unique_fd;
+using namespace android::storage_literals;
 
 std::ostream& EmitCowTypeString(std::ostream& os, CowOperationType cow_type) {
     switch (cow_type) {
@@ -174,5 +176,10 @@
     return CreateCowWriter(version, options, unique_fd{-1}, std::nullopt);
 }
 
+size_t CowOpCompressionSize(const CowOperation* op, size_t block_size) {
+    uint8_t compression_bits = op->compression_bits();
+    return (block_size << compression_bits);
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
index 1b4a971..d6a03ca 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
@@ -26,6 +26,7 @@
 #include <android-base/logging.h>
 #include <libsnapshot/cow_format.h>
 #include <libsnapshot/cow_reader.h>
+#include <storage_literals/storage_literals.h>
 #include <zlib.h>
 
 #include "cow_decompress.h"
@@ -35,6 +36,8 @@
 namespace android {
 namespace snapshot {
 
+using namespace android::storage_literals;
+
 bool ReadCowHeader(android::base::borrowed_fd fd, CowHeaderV3* header) {
     if (lseek(fd.get(), 0, SEEK_SET) < 0) {
         PLOG(ERROR) << "lseek header failed";
@@ -705,6 +708,11 @@
 ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_size,
                             size_t ignore_bytes) {
     std::unique_ptr<IDecompressor> decompressor;
+    const size_t op_buf_size = CowOpCompressionSize(op, header_.block_size);
+    if (!op_buf_size) {
+        LOG(ERROR) << "Compression size is zero. op: " << *op;
+        return -1;
+    }
     switch (GetCompressionType()) {
         case kCowCompressNone:
             break;
@@ -715,12 +723,12 @@
             decompressor = IDecompressor::Brotli();
             break;
         case kCowCompressZstd:
-            if (header_.block_size != op->data_length) {
+            if (op_buf_size != op->data_length) {
                 decompressor = IDecompressor::Zstd();
             }
             break;
         case kCowCompressLz4:
-            if (header_.block_size != op->data_length) {
+            if (op_buf_size != op->data_length) {
                 decompressor = IDecompressor::Lz4();
             }
             break;
@@ -736,14 +744,14 @@
         offset = op->source();
     }
     if (!decompressor ||
-        ((op->data_length == header_.block_size) && (header_.prefix.major_version == 3))) {
+        ((op->data_length == op_buf_size) && (header_.prefix.major_version == 3))) {
         CowDataStream stream(this, offset + ignore_bytes, op->data_length - ignore_bytes);
         return stream.ReadFully(buffer, buffer_size);
     }
 
     CowDataStream stream(this, offset, op->data_length);
     decompressor->set_stream(&stream);
-    return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes);
+    return decompressor->Decompress(buffer, buffer_size, op_buf_size, ignore_bytes);
 }
 
 bool CowReader::GetSourceOffset(const CowOperation* op, uint64_t* source_offset) {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
index fe977b7..a35b614 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
@@ -206,6 +206,8 @@
 
         auto& new_op = out->ops->at(i);
         new_op.set_type(v2_op.type);
+        // v2 ops always have 4k compression
+        new_op.set_compression_bits(0);
         new_op.data_length = v2_op.data_length;
 
         if (v2_op.new_block > std::numeric_limits<uint32_t>::max()) {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
index de60213..3c5b394 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
@@ -18,6 +18,7 @@
 #include <libsnapshot/cow_format.h>
 #include <libsnapshot/cow_reader.h>
 #include <libsnapshot/cow_writer.h>
+#include <storage_literals/storage_literals.h>
 #include "writer_v2.h"
 #include "writer_v3.h"
 
@@ -29,6 +30,9 @@
 namespace android {
 namespace snapshot {
 
+using namespace android::storage_literals;
+using ::testing::TestWithParam;
+
 class CowTestV3 : public ::testing::Test {
   protected:
     virtual void SetUp() override {
@@ -484,14 +488,14 @@
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
     auto header = reader.header_v3();
-    ASSERT_EQ(header.sequence_data_count, 0);
+    ASSERT_EQ(header.sequence_data_count, static_cast<uint64_t>(0));
     ASSERT_EQ(header.resume_point_count, 0);
     ASSERT_EQ(header.resume_point_max, 4);
 
     writer->AddLabel(0);
     ASSERT_TRUE(reader.Parse(cow_->fd));
     header = reader.header_v3();
-    ASSERT_EQ(header.sequence_data_count, 0);
+    ASSERT_EQ(header.sequence_data_count, static_cast<uint64_t>(0));
     ASSERT_EQ(header.resume_point_count, 1);
     ASSERT_EQ(header.resume_point_max, 4);
 
@@ -699,5 +703,189 @@
     ASSERT_FALSE(writer->AddZeroBlocks(0, 19));
 }
 
+struct TestParam {
+    std::string compression;
+    int block_size;
+    int num_threads;
+    size_t cluster_ops;
+};
+
+class VariableBlockTest : public ::testing::TestWithParam<TestParam> {
+  protected:
+    virtual void SetUp() override {
+        cow_ = std::make_unique<TemporaryFile>();
+        ASSERT_GE(cow_->fd, 0) << strerror(errno);
+    }
+
+    virtual void TearDown() override { cow_ = nullptr; }
+
+    unique_fd GetCowFd() { return unique_fd{dup(cow_->fd)}; }
+
+    std::unique_ptr<TemporaryFile> cow_;
+};
+
+// Helper to check read sizes.
+static inline void ReadBlockData(CowReader& reader, const CowOperation* op, void* buffer,
+                                 size_t size) {
+    size_t block_size = CowOpCompressionSize(op, 4096);
+    std::string data(block_size, '\0');
+    size_t value = reader.ReadData(op, data.data(), block_size);
+    ASSERT_TRUE(value == block_size);
+    std::memcpy(buffer, data.data(), size);
+}
+
+TEST_P(VariableBlockTest, VariableBlockCompressionTest) {
+    const TestParam params = GetParam();
+
+    CowOptions options;
+    options.op_count_max = 100000;
+    options.compression = params.compression;
+    options.num_compress_threads = params.num_threads;
+    options.batch_write = true;
+    options.compression_factor = params.block_size;
+    options.cluster_ops = params.cluster_ops;
+
+    CowWriterV3 writer(options, GetCowFd());
+
+    ASSERT_TRUE(writer.Initialize());
+
+    std::string xor_data = "This is test data-1. Testing xor";
+    xor_data.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10));
+
+    // Large number of blocks
+    std::string data = "This is test data-2. Testing replace ops";
+    data.resize(options.block_size * 2048, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size()));
+
+    std::string data2 = "This is test data-3. Testing replace ops";
+    data2.resize(options.block_size * 259, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size()));
+
+    // Test data size is smaller than block size
+
+    // 4k block
+    std::string data3 = "This is test data-4. Testing replace ops";
+    data3.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size()));
+
+    // 8k block
+    std::string data4;
+    data4.resize(options.block_size * 2, '\0');
+    for (size_t i = 0; i < data4.size(); i++) {
+        data4[i] = static_cast<char>('A' + i / options.block_size);
+    }
+    ASSERT_TRUE(writer.AddRawBlocks(10000, data4.data(), data4.size()));
+
+    // 16k block
+    std::string data5;
+    data.resize(options.block_size * 4, '\0');
+    for (int i = 0; i < data5.size(); i++) {
+        data5[i] = static_cast<char>('C' + i / options.block_size);
+    }
+    ASSERT_TRUE(writer.AddRawBlocks(11000, data5.data(), data5.size()));
+
+    // 64k Random buffer which cannot be compressed
+    unique_fd rnd_fd(open("/dev/random", O_RDONLY));
+    ASSERT_GE(rnd_fd, 0);
+    std::string random_buffer;
+    random_buffer.resize(65536, '\0');
+    ASSERT_EQ(android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), 65536, 0), true);
+    ASSERT_TRUE(writer.AddRawBlocks(12000, random_buffer.data(), 65536));
+
+    ASSERT_TRUE(writer.Finalize());
+
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+
+    while (!iter->AtEnd()) {
+        auto op = iter->Get();
+
+        if (op->type() == kCowXorOp) {
+            std::string sink(xor_data.size(), '\0');
+            ASSERT_EQ(op->new_block, 50);
+            ASSERT_EQ(op->source(), 98314);  // 4096 * 24 + 10
+            ReadBlockData(reader, op, sink.data(), sink.size());
+            ASSERT_EQ(sink, xor_data);
+        }
+        if (op->type() == kCowReplaceOp) {
+            if (op->new_block == 100) {
+                data.resize(options.block_size);
+                std::string sink(data.size(), '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink.size(), data.size());
+                ASSERT_EQ(sink, data);
+            }
+            if (op->new_block == 6000) {
+                data2.resize(options.block_size);
+                std::string sink(data2.size(), '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data2);
+            }
+            if (op->new_block == 9000) {
+                std::string sink(data3.size(), '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data3);
+            }
+            if (op->new_block == 10000) {
+                data4.resize(options.block_size);
+                std::string sink(options.block_size, '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data4);
+            }
+            if (op->new_block == 11000) {
+                data5.resize(options.block_size);
+                std::string sink(options.block_size, '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data5);
+            }
+            if (op->new_block == 12000) {
+                random_buffer.resize(options.block_size);
+                std::string sink(options.block_size, '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, random_buffer);
+            }
+        }
+
+        iter->Next();
+    }
+}
+
+std::vector<TestParam> GetTestConfigs() {
+    std::vector<TestParam> testParams;
+
+    std::vector<int> block_sizes = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB};
+    std::vector<std::string> compression_algo = {"none", "lz4", "zstd", "gz"};
+    std::vector<int> threads = {1, 2};
+    // This will also test batch size
+    std::vector<size_t> cluster_ops = {1, 256};
+
+    // This should test 112 combination
+    for (auto block : block_sizes) {
+        for (auto compression : compression_algo) {
+            for (auto thread : threads) {
+                for (auto cluster : cluster_ops) {
+                    TestParam param;
+                    param.block_size = block;
+                    param.compression = compression;
+                    param.num_threads = thread;
+                    param.cluster_ops = cluster;
+                    testParams.push_back(std::move(param));
+                }
+            }
+        }
+    }
+
+    return testParams;
+}
+
+INSTANTIATE_TEST_SUITE_P(CompressorsWithVariableBlocks, VariableBlockTest,
+                         ::testing::ValuesIn(GetTestConfigs()));
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
index 75cd111..d0864e0 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
@@ -185,7 +185,7 @@
     for (int i = 0; i < num_compress_threads_; i++) {
         std::unique_ptr<ICompressor> compressor =
                 ICompressor::Create(compression_, header_.block_size);
-        auto wt = std::make_unique<CompressWorker>(std::move(compressor), header_.block_size);
+        auto wt = std::make_unique<CompressWorker>(std::move(compressor));
         threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
         compress_threads_.push_back(std::move(wt));
     }
@@ -353,7 +353,7 @@
         if (i == num_threads - 1) {
             num_blocks_per_thread = num_blocks;
         }
-        worker->EnqueueCompressBlocks(iter, num_blocks_per_thread);
+        worker->EnqueueCompressBlocks(iter, header_.block_size, num_blocks_per_thread);
         iter += (num_blocks_per_thread * header_.block_size);
         num_blocks -= num_blocks_per_thread;
     }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 251b24e..c92460a 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -37,6 +37,7 @@
 #include <libsnapshot/cow_compress.h>
 #include <libsnapshot_cow/parser_v3.h>
 #include <linux/fs.h>
+#include <storage_literals/storage_literals.h>
 #include <sys/ioctl.h>
 #include <unistd.h>
 #include <numeric>
@@ -54,6 +55,7 @@
 
 static_assert(sizeof(off_t) == sizeof(uint64_t));
 
+using namespace android::storage_literals;
 using android::base::unique_fd;
 
 // Divide |x| by |y| and round up to the nearest integer.
@@ -77,9 +79,9 @@
     threads_.clear();
     for (size_t i = 0; i < num_compress_threads_; i++) {
         std::unique_ptr<ICompressor> compressor =
-                ICompressor::Create(compression_, header_.block_size);
+                ICompressor::Create(compression_, header_.max_compression_size);
         auto&& wt = compress_threads_.emplace_back(
-                std::make_unique<CompressWorker>(std::move(compressor), header_.block_size));
+                std::make_unique<CompressWorker>(std::move(compressor)));
         threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
     }
     LOG(INFO) << num_compress_threads_ << " thread used for compression";
@@ -111,10 +113,15 @@
     header_.op_count_max = 0;
     header_.compression_algorithm = kCowCompressNone;
     header_.max_compression_size = options_.compression_factor;
-    return;
 }
 
 bool CowWriterV3::ParseOptions() {
+    if (!header_.max_compression_size || !IsBlockAligned(header_.max_compression_size)) {
+        LOG(ERROR) << "Invalid compression factor: " << header_.max_compression_size;
+        return false;
+    }
+
+    LOG(INFO) << "Compression factor: " << header_.max_compression_size;
     num_compress_threads_ = std::max(int(options_.num_compress_threads), 1);
     auto parts = android::base::Split(options_.compression, ",");
     if (parts.size() > 2) {
@@ -154,20 +161,22 @@
 
     compression_.algorithm = *algorithm;
     if (compression_.algorithm != kCowCompressNone) {
-        compressor_ = ICompressor::Create(compression_, header_.block_size);
+        compressor_ = ICompressor::Create(compression_, header_.max_compression_size);
         if (compressor_ == nullptr) {
             LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
             return false;
         }
-        if (options_.cluster_ops &&
-            (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
-             options_.batch_write)) {
-            batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
-            data_vec_.reserve(batch_size_);
-            cached_data_.reserve(batch_size_);
-            cached_ops_.reserve(batch_size_);
-        }
     }
+
+    if (options_.cluster_ops &&
+        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
+         options_.batch_write)) {
+        batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
+        data_vec_.reserve(batch_size_);
+        cached_data_.reserve(batch_size_);
+        cached_ops_.reserve(batch_size_);
+    }
+
     if (batch_size_ > 1) {
         LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_;
     } else {
@@ -178,6 +187,7 @@
         num_compress_threads_ = options_.num_compress_threads;
     }
     InitWorkers();
+
     return true;
 }
 
@@ -206,6 +216,14 @@
         }
     }
 
+    // TODO: b/322279333
+    // Set compression factor to 4k during estimation.
+    // Once COW estimator is ready to support variable
+    // block size, this check has to be removed.
+    if (IsEstimating()) {
+        header_.max_compression_size = header_.block_size;
+    }
+
     return true;
 }
 
@@ -328,6 +346,46 @@
     return cached_data_.size() >= batch_size_ || cached_ops_.size() >= batch_size_ * 16;
 }
 
+bool CowWriterV3::ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
+                                                  uint64_t old_block, uint16_t offset,
+                                                  CowOperationType type, size_t blocks_to_write) {
+    size_t compressed_bytes = 0;
+    auto&& blocks = CompressBlocks(blocks_to_write, data, type);
+    if (blocks.empty()) {
+        LOG(ERROR) << "Failed to compress blocks " << new_block_start << ", " << blocks_to_write
+                   << ", actual number of blocks received from compressor " << blocks.size();
+        return false;
+    }
+    size_t blocks_written = 0;
+    for (size_t blk_index = 0; blk_index < blocks.size(); blk_index++) {
+        CowOperation& op = cached_ops_.emplace_back();
+        auto& vec = data_vec_.emplace_back();
+        CompressedBuffer buffer = std::move(blocks[blk_index]);
+        auto& compressed_data = cached_data_.emplace_back(std::move(buffer.compressed_data));
+        op.new_block = new_block_start + blocks_written;
+
+        op.set_type(type);
+        op.set_compression_bits(std::log2(buffer.compression_factor / header_.block_size));
+
+        if (type == kCowXorOp) {
+            op.set_source((old_block + blocks_written) * header_.block_size + offset);
+        } else {
+            op.set_source(next_data_pos_ + compressed_bytes);
+        }
+
+        vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
+        op.data_length = vec.iov_len;
+        compressed_bytes += op.data_length;
+        blocks_written += (buffer.compression_factor / header_.block_size);
+    }
+    if (blocks_written != blocks_to_write) {
+        LOG(ERROR) << "Total compressed blocks: " << blocks_written
+                   << " Expected: " << blocks_to_write;
+        return false;
+    }
+    return true;
+}
+
 bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                              uint64_t old_block, uint16_t offset, CowOperationType type) {
     if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
@@ -341,38 +399,21 @@
         return false;
     }
     for (size_t i = 0; i < num_blocks;) {
-        const auto blocks_to_write =
+        const size_t blocks_to_write =
                 std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
-        size_t compressed_bytes = 0;
-        auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i);
-        if (blocks.size() != blocks_to_write) {
-            LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", "
-                       << blocks_to_write << ", actual number of blocks received from compressor "
-                       << blocks.size();
+
+        if (!ConstructCowOpCompressedBuffers(new_block_start + i, bytes + header_.block_size * i,
+                                             old_block + i, offset, type, blocks_to_write)) {
             return false;
         }
-        for (size_t j = 0; j < blocks_to_write; j++) {
-            CowOperation& op = cached_ops_.emplace_back();
-            auto& vec = data_vec_.emplace_back();
-            auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j]));
-            op.new_block = new_block_start + i + j;
 
-            op.set_type(type);
-            if (type == kCowXorOp) {
-                op.set_source((old_block + i + j) * header_.block_size + offset);
-            } else {
-                op.set_source(next_data_pos_ + compressed_bytes);
-            }
-            vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
-            op.data_length = vec.iov_len;
-            compressed_bytes += op.data_length;
-        }
         if (NeedsFlush() && !FlushCacheOps()) {
             LOG(ERROR) << "EmitBlocks with compression: write failed. new block: "
                        << new_block_start << " compression: " << compression_.algorithm
                        << ", op type: " << type;
             return false;
         }
+
         i += blocks_to_write;
     }
 
@@ -482,55 +523,165 @@
     return true;
 }
 
-std::vector<std::basic_string<uint8_t>> CowWriterV3::CompressBlocks(const size_t num_blocks,
-                                                                    const void* data) {
-    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
-    const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
-    std::vector<std::basic_string<uint8_t>> compressed_buf;
-    compressed_buf.clear();
-    const uint8_t* const iter = reinterpret_cast<const uint8_t*>(data);
-    if (compression_.algorithm == kCowCompressNone) {
-        for (size_t i = 0; i < num_blocks; i++) {
-            auto& buf = compressed_buf.emplace_back();
-            buf.resize(header_.block_size);
-            std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size);
-        }
-        return compressed_buf;
+size_t CowWriterV3::GetCompressionFactor(const size_t blocks_to_compress,
+                                         CowOperationType type) const {
+    // For XOR ops, we don't support bigger block size compression yet.
+    // For bigger block size support, snapshot-merge also has to changed. We
+    // aren't there yet; hence, just stick to 4k for now until
+    // snapshot-merge is ready for XOR operation.
+    if (type == kCowXorOp) {
+        return header_.block_size;
     }
-    if (num_threads <= 1) {
-        if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks,
-                                            &compressed_buf)) {
+
+    size_t compression_factor = header_.max_compression_size;
+    while (compression_factor > header_.block_size) {
+        size_t num_blocks = compression_factor / header_.block_size;
+        if (blocks_to_compress >= num_blocks) {
+            return compression_factor;
+        }
+        compression_factor >>= 1;
+    }
+    return header_.block_size;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithNoCompression(
+        const size_t num_blocks, const void* data, CowOperationType type) {
+    size_t blocks_to_compress = num_blocks;
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+    std::vector<CompressedBuffer> compressed_vec;
+
+    while (blocks_to_compress) {
+        CompressedBuffer buffer;
+
+        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
+        size_t num_blocks = compression_factor / header_.block_size;
+
+        buffer.compression_factor = compression_factor;
+        buffer.compressed_data.resize(compression_factor);
+
+        // No compression. Just copy the data as-is.
+        std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
+
+        compressed_vec.push_back(std::move(buffer));
+        blocks_to_compress -= num_blocks;
+        iter += compression_factor;
+    }
+    return compressed_vec;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompression(
+        const size_t num_blocks, const void* data, CowOperationType type) {
+    size_t blocks_to_compress = num_blocks;
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+    std::vector<CompressedBuffer> compressed_vec;
+
+    while (blocks_to_compress) {
+        CompressedBuffer buffer;
+
+        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
+        size_t num_blocks = compression_factor / header_.block_size;
+
+        buffer.compression_factor = compression_factor;
+        // Compress the blocks
+        buffer.compressed_data = compressor_->Compress(iter, compression_factor);
+        if (buffer.compressed_data.empty()) {
+            PLOG(ERROR) << "Compression failed";
             return {};
         }
-    } else {
-        // Submit the blocks per thread. The retrieval of
-        // compressed buffers has to be done in the same order.
-        // We should not poll for completed buffers in a different order as the
-        // buffers are tightly coupled with block ordering.
-        for (size_t i = 0; i < num_threads; i++) {
-            CompressWorker* worker = compress_threads_[i].get();
-            const auto blocks_in_batch =
-                    std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
-            worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size,
-                                          blocks_in_batch);
+
+        // Check if the buffer was indeed compressed
+        if (buffer.compressed_data.size() >= compression_factor) {
+            buffer.compressed_data.resize(compression_factor);
+            std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
         }
 
-        for (size_t i = 0; i < num_threads; i++) {
-            CompressWorker* worker = compress_threads_[i].get();
-            if (!worker->GetCompressedBuffers(&compressed_buf)) {
-                return {};
-            }
+        compressed_vec.push_back(std::move(buffer));
+        blocks_to_compress -= num_blocks;
+        iter += compression_factor;
+    }
+    return compressed_vec;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression(
+        const size_t num_blocks, const void* data, CowOperationType type) {
+    const size_t num_threads = num_compress_threads_;
+    const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+
+    std::vector<CompressedBuffer> compressed_vec;
+    // Submit the blocks per thread. The retrieval of
+    // compressed buffers has to be done in the same order.
+    // We should not poll for completed buffers in a different order as the
+    // buffers are tightly coupled with block ordering.
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
+        // Enqueue the blocks to be compressed for each thread.
+        while (blocks_in_batch) {
+            CompressedBuffer buffer;
+
+            const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type);
+            size_t num_blocks = compression_factor / header_.block_size;
+
+            buffer.compression_factor = compression_factor;
+            worker->EnqueueCompressBlocks(iter, compression_factor, 1);
+            compressed_vec.push_back(std::move(buffer));
+            blocks_in_batch -= num_blocks;
+            iter += compression_factor;
         }
     }
-    for (size_t i = 0; i < num_blocks; i++) {
+
+    // Fetch compressed buffers from the threads
+    std::vector<std::basic_string<uint8_t>> compressed_buf;
+    compressed_buf.clear();
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        if (!worker->GetCompressedBuffers(&compressed_buf)) {
+            return {};
+        }
+    }
+
+    if (compressed_vec.size() != compressed_buf.size()) {
+        LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size()
+                   << " - Expected: " << compressed_vec.size();
+        return {};
+    }
+
+    iter = reinterpret_cast<const uint8_t*>(data);
+    // Walk through all the compressed buffers
+    for (size_t i = 0; i < compressed_buf.size(); i++) {
+        auto& buffer = compressed_vec[i];
         auto& block = compressed_buf[i];
-        if (block.size() >= header_.block_size) {
-            block.resize(header_.block_size);
-            std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size);
+        size_t block_size = buffer.compression_factor;
+        // Check if the blocks was indeed compressed
+        if (block.size() >= block_size) {
+            buffer.compressed_data.resize(block_size);
+            std::memcpy(buffer.compressed_data.data(), iter, block_size);
+        } else {
+            // Compressed block
+            buffer.compressed_data.resize(block.size());
+            std::memcpy(buffer.compressed_data.data(), block.data(), block.size());
         }
+        iter += block_size;
+    }
+    return compressed_vec;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::CompressBlocks(const size_t num_blocks,
+                                                                       const void* data,
+                                                                       CowOperationType type) {
+    if (compression_.algorithm == kCowCompressNone) {
+        return ProcessBlocksWithNoCompression(num_blocks, data, type);
     }
 
-    return compressed_buf;
+    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
+
+    // If no threads are required, just compress the blocks inline.
+    if (num_threads <= 1) {
+        return ProcessBlocksWithCompression(num_blocks, data, type);
+    }
+
+    return ProcessBlocksWithThreadedCompression(num_blocks, data, type);
 }
 
 bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index b19af60..4915e9c 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -19,11 +19,15 @@
 #include <thread>
 #include <vector>
 
+#include <libsnapshot/cow_format.h>
+#include <storage_literals/storage_literals.h>
 #include "writer_base.h"
 
 namespace android {
 namespace snapshot {
 
+using namespace android::storage_literals;
+
 class CowWriterV3 : public CowWriterBase {
   public:
     explicit CowWriterV3(const CowOptions& options, android::base::unique_fd&& fd);
@@ -43,6 +47,10 @@
     virtual bool EmitSequenceData(size_t num_ops, const uint32_t* data) override;
 
   private:
+    struct CompressedBuffer {
+        size_t compression_factor;
+        std::basic_string<uint8_t> compressed_data;
+    };
     void SetupHeaders();
     bool NeedsFlush() const;
     bool ParseOptions();
@@ -52,11 +60,38 @@
                         std::basic_string_view<struct iovec> data);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, CowOperationType type);
+    bool ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
+                                         uint64_t old_block, uint16_t offset, CowOperationType type,
+                                         size_t blocks_to_write);
     bool CheckOpCount(size_t op_count);
 
   private:
-    std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks,
-                                                           const void* data);
+    std::vector<CompressedBuffer> ProcessBlocksWithNoCompression(const size_t num_blocks,
+                                                                 const void* data,
+                                                                 CowOperationType type);
+    std::vector<CompressedBuffer> ProcessBlocksWithCompression(const size_t num_blocks,
+                                                               const void* data,
+                                                               CowOperationType type);
+    std::vector<CompressedBuffer> ProcessBlocksWithThreadedCompression(const size_t num_blocks,
+                                                                       const void* data,
+                                                                       CowOperationType type);
+    std::vector<CompressedBuffer> CompressBlocks(const size_t num_blocks, const void* data,
+                                                 CowOperationType type);
+    size_t GetCompressionFactor(const size_t blocks_to_compress, CowOperationType type) const;
+
+    constexpr bool IsBlockAligned(const size_t size) {
+        // These are the only block size supported. Block size beyond 256k
+        // may impact random read performance post OTA boot.
+        const size_t values[] = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB};
+
+        auto it = std::lower_bound(std::begin(values), std::end(values), size);
+
+        if (it != std::end(values) && *it == size) {
+            return true;
+        }
+        return false;
+    }
+
     bool ReadBackVerification();
     bool FlushCacheOps();
     void InitWorkers();