libsnapshot: Add single threaded compression to v3 writer

Add compression path back into Cow operations. Main change is that the
compression algorithm is stored in the header instead of each individaul
op. Have the writer_v3 set this algorithm when parsing options.

There looks to be a lot of code we'll be able to factor out into the
base class, but we can leave that to a later CL.

Test: cow_api_test
Change-Id: Ie9a8eceb5fbdaecae50911119c75f2e51d776a28
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
index 296987a..cacd29d 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
@@ -685,7 +685,7 @@
     } else {
         offset = GetCowOpSourceInfoData(*op);
     }
-    if (!decompressor) {
+    if (!decompressor || op->data_length == header_.block_size) {
         CowDataStream stream(this, offset + ignore_bytes, op->data_length - ignore_bytes);
         return stream.ReadFully(buffer, buffer_size);
     }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
index d39c4a7..d0af0f9 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
@@ -339,15 +339,16 @@
     ASSERT_EQ(i, 5);
 }
 
-TEST_F(CowTestV3, AllOps) {
+TEST_F(CowTestV3, AllOpsWithCompression) {
     CowOptions options;
+    options.compression = "gz";
     options.op_count_max = 100;
     auto writer = CreateCowWriter(3, options, GetCowFd());
 
     std::string data;
     data.resize(options.block_size * 5);
     for (int i = 0; i < data.size(); i++) {
-        data[i] = char(rand() % 256);
+        data[i] = char(rand() % 4);
     }
 
     ASSERT_TRUE(writer->AddZeroBlocks(10, 5));
@@ -397,7 +398,6 @@
     while (i < 5) {
         auto op = iter->Get();
         ASSERT_EQ(op->type, kCowReplaceOp);
-        ASSERT_EQ(op->data_length, options.block_size);
         ASSERT_EQ(op->new_block, 18 + i);
         ASSERT_TRUE(
                 ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
@@ -411,7 +411,6 @@
     while (i < 5) {
         auto op = iter->Get();
         ASSERT_EQ(op->type, kCowXorOp);
-        ASSERT_EQ(op->data_length, 4096);
         ASSERT_EQ(op->new_block, 50 + i);
         ASSERT_EQ(GetCowOpSourceInfoData(*op), 98314 + (i * options.block_size));  // 4096 * 24 + 10
         ASSERT_TRUE(
@@ -421,5 +420,42 @@
     }
     ASSERT_EQ(sink, data);
 }
+
+TEST_F(CowTestV3, GzCompression) {
+    CowOptions options;
+    options.op_count_max = 100;
+    options.compression = "gz";
+    auto writer = CreateCowWriter(3, options, GetCowFd());
+
+    std::string data = "This is some data, believe it";
+    data.resize(options.block_size, '\0');
+
+    ASSERT_TRUE(writer->AddRawBlocks(50, data.data(), data.size()));
+    ASSERT_TRUE(writer->Finalize());
+
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+
+    auto header = reader.header_v3();
+    ASSERT_EQ(header.compression_algorithm, kCowCompressGz);
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+    ASSERT_FALSE(iter->AtEnd());
+    auto op = iter->Get();
+
+    std::string sink(data.size(), '\0');
+
+    ASSERT_EQ(op->type, kCowReplaceOp);
+    ASSERT_EQ(op->data_length, 56);  // compressed!
+    ASSERT_EQ(op->new_block, 50);
+    ASSERT_TRUE(ReadData(reader, op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
+
+    iter->Next();
+    ASSERT_TRUE(iter->AtEnd());
+}
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 13b6157..86dd9f7 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -97,6 +97,8 @@
         LOG(ERROR) << "unrecognized compression: " << options_.compression;
         return false;
     }
+    header_.compression_algorithm = *algorithm;
+
     if (parts.size() > 1) {
         if (!android::base::ParseUint(parts[1], &compression_.compression_level)) {
             LOG(ERROR) << "failed to parse compression level invalid type: " << parts[1];
@@ -108,6 +110,7 @@
     }
 
     compression_.algorithm = *algorithm;
+    compressor_ = ICompressor::Create(compression_, header_.block_size);
     return true;
 }
 
@@ -191,27 +194,41 @@
 
 bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                              uint64_t old_block, uint16_t offset, uint8_t type) {
-    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
     const size_t num_blocks = (size / header_.block_size);
-
     for (size_t i = 0; i < num_blocks; i++) {
+        const uint8_t* const iter =
+                reinterpret_cast<const uint8_t*>(data) + (header_.block_size * i);
+
         CowOperation op = {};
         op.new_block = new_block_start + i;
 
         op.type = type;
-        op.data_length = static_cast<uint16_t>(header_.block_size);
-
         if (type == kCowXorOp) {
             op.source_info = (old_block + i) * header_.block_size + offset;
         } else {
             op.source_info = next_data_pos_;
         }
-        if (!WriteOperation(op, iter, header_.block_size)) {
-            LOG(ERROR) << "AddRawBlocks: write failed";
+        std::basic_string<uint8_t> compressed_data;
+        const void* out_data = iter;
+
+        op.data_length = header_.block_size;
+
+        if (compression_.algorithm) {
+            if (!compressor_) {
+                PLOG(ERROR) << "Compressor not initialized";
+                return false;
+            }
+            compressed_data = compressor_->Compress(out_data, header_.block_size);
+            if (compressed_data.size() < op.data_length) {
+                out_data = compressed_data.data();
+                op.data_length = compressed_data.size();
+            }
+        }
+        if (!WriteOperation(op, out_data, op.data_length)) {
+            PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: "
+                        << new_block_start << " compression: " << compression_.algorithm;
             return false;
         }
-
-        iter += header_.block_size;
     }
 
     return true;
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index af71a03..2347e91 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -46,6 +46,7 @@
     bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, uint8_t type);
+    bool CompressBlocks(size_t num_blocks, const void* data);
 
     off_t GetOpOffset(uint32_t op_index) const {
         CHECK_LT(op_index, header_.op_count_max);
@@ -56,9 +57,14 @@
   private:
     CowHeaderV3 header_{};
     CowCompression compression_;
+    // in the case that we are using one thread for compression, we can store and re-use the same
+    // compressor
+    std::unique_ptr<ICompressor> compressor_;
+    std::vector<std::unique_ptr<CompressWorker>> compress_threads_;
 
     uint64_t next_op_pos_ = 0;
     uint64_t next_data_pos_ = 0;
+    std::vector<std::basic_string<uint8_t>> compressed_buf_;
 
     // in the case that we are using one thread for compression, we can store and re-use the same
     // compressor