libsnapshot: Test batch writes and threaded compression

Test all compression algorithms.

Bug: 254188450
Test: cow_api_test
Change-Id: I977e631402eb2dfaa76205f5d8cb955e6d3bddbb
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
index 2c1187f..862ce55 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
@@ -298,6 +298,150 @@
     ASSERT_TRUE(iter->Done());
 }
 
+class CompressionRWTest : public CowTest, public testing::WithParamInterface<const char*> {};
+
+TEST_P(CompressionRWTest, ThreadedBatchWrites) {
+    CowOptions options;
+    options.compression = GetParam();
+    options.num_compress_threads = 2;
+
+    CowWriter writer(options);
+
+    ASSERT_TRUE(writer.Initialize(cow_->fd));
+
+    std::string xor_data = "This is test data-1. Testing xor";
+    xor_data.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10));
+
+    std::string data = "This is test data-2. Testing replace ops";
+    data.resize(options.block_size * 2048, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size()));
+
+    std::string data2 = "This is test data-3. Testing replace ops";
+    data2.resize(options.block_size * 259, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size()));
+
+    std::string data3 = "This is test data-4. Testing replace ops";
+    data3.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size()));
+
+    ASSERT_TRUE(writer.Finalize());
+
+    int expected_blocks = (1 + 2048 + 259 + 1);
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+
+    int total_blocks = 0;
+    while (!iter->Done()) {
+        auto op = &iter->Get();
+
+        if (op->type == kCowXorOp) {
+            total_blocks += 1;
+            StringSink sink;
+            ASSERT_EQ(op->new_block, 50);
+            ASSERT_EQ(op->source, 98314);  // 4096 * 24 + 10
+            ASSERT_TRUE(reader.ReadData(*op, &sink));
+            ASSERT_EQ(sink.stream(), xor_data);
+        }
+
+        if (op->type == kCowReplaceOp) {
+            total_blocks += 1;
+            if (op->new_block == 100) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data);
+            }
+            if (op->new_block == 6000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data2.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data2);
+            }
+            if (op->new_block == 9000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                ASSERT_EQ(sink.stream(), data3);
+            }
+        }
+
+        iter->Next();
+    }
+
+    ASSERT_EQ(total_blocks, expected_blocks);
+}
+
+TEST_P(CompressionRWTest, NoBatchWrites) {
+    CowOptions options;
+    options.compression = GetParam();
+    options.num_compress_threads = 1;
+    options.cluster_ops = 0;
+
+    CowWriter writer(options);
+
+    ASSERT_TRUE(writer.Initialize(cow_->fd));
+
+    std::string data = "Testing replace ops without batch writes";
+    data.resize(options.block_size * 1024, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size()));
+
+    std::string data2 = "Testing odd blocks without batch writes";
+    data2.resize(options.block_size * 111, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(3000, data2.data(), data2.size()));
+
+    std::string data3 = "Testing single 4k block";
+    data3.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(5000, data3.data(), data3.size()));
+
+    ASSERT_TRUE(writer.Finalize());
+
+    int expected_blocks = (1024 + 111 + 1);
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+
+    int total_blocks = 0;
+    while (!iter->Done()) {
+        auto op = &iter->Get();
+
+        if (op->type == kCowReplaceOp) {
+            total_blocks += 1;
+            if (op->new_block == 50) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data);
+            }
+            if (op->new_block == 3000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data2.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data2);
+            }
+            if (op->new_block == 5000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                ASSERT_EQ(sink.stream(), data3);
+            }
+        }
+
+        iter->Next();
+    }
+
+    ASSERT_EQ(total_blocks, expected_blocks);
+}
+
+INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4"));
+
 TEST_F(CowTest, ClusterCompressGz) {
     CowOptions options;
     options.compression = "gz";