libsnapshot: Deprecate the IByteSink API.

This introduces a replacement for the IByteSink API, which was never
really used in any advantageous way, and is not expected to be useful.
The new API attempts to fill an entire buffer rather than request
individual slices of a buffer.

This patch simply introduces the API and refactors tests. Subsequent
patches will replace IByteSink callers and remove the API.

Bug: 278637212
Test: cow_api_test
Change-Id: Ib740de5e65fee8d61f603b106752338cc8e95967
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index ba75a8d..69079cb 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -15,7 +15,9 @@
 #pragma once
 
 #include <stdint.h>
-#include <string>
+
+#include <optional>
+#include <string_view>
 
 namespace android {
 namespace snapshot {
@@ -196,5 +198,8 @@
 // Ops that have dependencies on old blocks, and must take care in their merge order
 bool IsOrderedOp(const CowOperation& op);
 
+// Convert compression name to internal value.
+std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name);
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index e8e4d72..8e61a21 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -65,6 +65,7 @@
 
     // Return the file header.
     virtual bool GetHeader(CowHeader* header) = 0;
+    virtual CowHeader& GetHeader() = 0;
 
     // Return the file footer.
     virtual bool GetFooter(CowFooter* footer) = 0;
@@ -85,6 +86,19 @@
     // Get decoded bytes from the data section, handling any decompression.
     // All retrieved data is passed to the sink.
     virtual bool ReadData(const CowOperation& op, IByteSink* sink) = 0;
+
+    // Get decoded bytes from the data section, handling any decompression.
+    //
+    // If ignore_bytes is non-zero, it specifies the initial number of bytes
+    // to skip writing to |buffer|.
+    //
+    // Returns the number of bytes written to |buffer|, or -1 on failure.
+    // errno is NOT set.
+    //
+    // Partial reads are not possible unless |buffer_size| is less than the
+    // operation block size.
+    virtual ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
+                             size_t ignore_bytes = 0) = 0;
 };
 
 // Iterate over a sequence of COW operations.
@@ -140,6 +154,10 @@
     std::unique_ptr<ICowOpIter> GetMergeOpIter(bool ignore_progress = false) override;
 
     bool ReadData(const CowOperation& op, IByteSink* sink) override;
+    ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
+                     size_t ignore_bytes = 0) override;
+
+    CowHeader& GetHeader() override { return header_; }
 
     bool GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read);
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
index 862ce55..f05aeb2 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
@@ -24,6 +24,7 @@
 #include <gtest/gtest.h>
 #include <libsnapshot/cow_reader.h>
 #include <libsnapshot/cow_writer.h>
+#include "cow_decompress.h"
 
 using testing::AssertionFailure;
 using testing::AssertionResult;
@@ -44,23 +45,10 @@
     std::unique_ptr<TemporaryFile> cow_;
 };
 
-// Sink that always appends to the end of a string.
-class StringSink : public IByteSink {
-  public:
-    void* GetBuffer(size_t requested, size_t* actual) override {
-        size_t old_size = stream_.size();
-        stream_.resize(old_size + requested, '\0');
-        *actual = requested;
-        return stream_.data() + old_size;
-    }
-    bool ReturnData(void*, size_t) override { return true; }
-    void Reset() { stream_.clear(); }
-
-    std::string& stream() { return stream_; }
-
-  private:
-    std::string stream_;
-};
+// Helper to check read sizes.
+static inline bool ReadData(CowReader& reader, const CowOperation& op, void* buffer, size_t size) {
+    return reader.ReadData(op, buffer, size) == size;
+}
 
 TEST_F(CowTest, CopyContiguous) {
     CowOptions options;
@@ -144,7 +132,7 @@
     ASSERT_EQ(op->new_block, 10);
     ASSERT_EQ(op->source, 20);
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -154,8 +142,8 @@
     ASSERT_EQ(op->compression, kCowCompressNone);
     ASSERT_EQ(op->data_length, 4096);
     ASSERT_EQ(op->new_block, 50);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -222,7 +210,7 @@
     ASSERT_EQ(op->new_block, 10);
     ASSERT_EQ(op->source, 20);
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -233,8 +221,8 @@
     ASSERT_EQ(op->data_length, 4096);
     ASSERT_EQ(op->new_block, 50);
     ASSERT_EQ(op->source, 98314);  // 4096 * 24 + 10
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -285,22 +273,22 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     ASSERT_EQ(op->type, kCowReplaceOp);
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->data_length, 56);  // compressed!
     ASSERT_EQ(op->new_block, 50);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_TRUE(iter->Done());
 }
 
-class CompressionRWTest : public CowTest, public testing::WithParamInterface<const char*> {};
+class CompressionTest : public CowTest, public testing::WithParamInterface<const char*> {};
 
-TEST_P(CompressionRWTest, ThreadedBatchWrites) {
+TEST_P(CompressionTest, ThreadedBatchWrites) {
     CowOptions options;
     options.compression = GetParam();
     options.num_compress_threads = 2;
@@ -342,31 +330,32 @@
 
         if (op->type == kCowXorOp) {
             total_blocks += 1;
-            StringSink sink;
+            std::string sink(xor_data.size(), '\0');
             ASSERT_EQ(op->new_block, 50);
             ASSERT_EQ(op->source, 98314);  // 4096 * 24 + 10
-            ASSERT_TRUE(reader.ReadData(*op, &sink));
-            ASSERT_EQ(sink.stream(), xor_data);
+            ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+            ASSERT_EQ(sink, xor_data);
         }
 
         if (op->type == kCowReplaceOp) {
             total_blocks += 1;
             if (op->new_block == 100) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data);
+                std::string sink(data.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink.size(), data.size());
+                ASSERT_EQ(sink, data);
             }
             if (op->new_block == 6000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data2.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data2);
+                std::string sink(data2.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data2);
             }
             if (op->new_block == 9000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
-                ASSERT_EQ(sink.stream(), data3);
+                std::string sink(data3.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data3);
             }
         }
 
@@ -376,7 +365,7 @@
     ASSERT_EQ(total_blocks, expected_blocks);
 }
 
-TEST_P(CompressionRWTest, NoBatchWrites) {
+TEST_P(CompressionTest, NoBatchWrites) {
     CowOptions options;
     options.compression = GetParam();
     options.num_compress_threads = 1;
@@ -416,21 +405,21 @@
         if (op->type == kCowReplaceOp) {
             total_blocks += 1;
             if (op->new_block == 50) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data);
+                std::string sink(data.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data);
             }
             if (op->new_block == 3000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data2.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data2);
+                std::string sink(data2.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data2);
             }
             if (op->new_block == 5000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
-                ASSERT_EQ(sink.stream(), data3);
+                std::string sink(data3.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data3);
             }
         }
 
@@ -440,7 +429,66 @@
     ASSERT_EQ(total_blocks, expected_blocks);
 }
 
-INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4"));
+template <typename T>
+class HorribleStream : public IByteStream {
+  public:
+    HorribleStream(const std::basic_string<T>& input) : input_(input) {}
+
+    ssize_t Read(void* buffer, size_t length) override {
+        if (pos_ >= input_.size()) {
+            return 0;
+        }
+        if (length) {
+            *reinterpret_cast<char*>(buffer) = input_[pos_];
+        }
+        pos_++;
+        return 1;
+    }
+    size_t Size() const override { return input_.size(); }
+
+  private:
+    std::basic_string<T> input_;
+    size_t pos_ = 0;
+};
+
+TEST(HorribleStream, ReadFully) {
+    std::string expected = "this is some data";
+
+    HorribleStream<char> stream(expected);
+
+    std::string buffer(expected.size(), '\0');
+    ASSERT_TRUE(stream.ReadFully(buffer.data(), buffer.size()));
+    ASSERT_EQ(buffer, expected);
+}
+
+TEST_P(CompressionTest, HorribleStream) {
+    if (strcmp(GetParam(), "none") == 0) {
+        GTEST_SKIP();
+    }
+
+    auto algorithm = CompressionAlgorithmFromString(GetParam());
+    ASSERT_TRUE(algorithm.has_value());
+
+    std::string expected = "The quick brown fox jumps over the lazy dog.";
+    expected.resize(4096, '\0');
+
+    auto result = CompressWorker::Compress(*algorithm, expected.data(), expected.size());
+    ASSERT_FALSE(result.empty());
+
+    HorribleStream<uint8_t> stream(result);
+    auto decomp = IDecompressor::FromString(GetParam());
+    ASSERT_NE(decomp, nullptr);
+    decomp->set_stream(&stream);
+
+    expected = expected.substr(10, 500);
+
+    std::string buffer(expected.size(), '\0');
+    ASSERT_EQ(decomp->Decompress(buffer.data(), 500, 4096, 10), 500);
+    ASSERT_EQ(buffer, expected);
+}
+
+INSTANTIATE_TEST_SUITE_P(AllCompressors, CompressionTest,
+                         testing::Values("none", "gz", "brotli", "lz4"));
 
 TEST_F(CowTest, ClusterCompressGz) {
     CowOptions options;
@@ -470,14 +518,14 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     ASSERT_EQ(op->type, kCowReplaceOp);
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->data_length, 56);  // compressed!
     ASSERT_EQ(op->new_block, 50);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -489,12 +537,13 @@
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
 
-    sink.Reset();
+    sink = {};
+    sink.resize(data2.size(), '\0');
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->data_length, 41);  // compressed!
     ASSERT_EQ(op->new_block, 51);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data2);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data2);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -531,55 +580,15 @@
     iter->Next();
     ASSERT_FALSE(iter->Done());
 
-    StringSink sink;
+    std::string sink(options.block_size, '\0');
 
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->new_block, 51);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
 }
 
-// Only return 1-byte buffers, to stress test the partial read logic in
-// CowReader.
-class HorribleStringSink : public StringSink {
-  public:
-    void* GetBuffer(size_t, size_t* actual) override { return StringSink::GetBuffer(1, actual); }
-};
-
-class CompressionTest : public CowTest, public testing::WithParamInterface<const char*> {};
-
-TEST_P(CompressionTest, HorribleSink) {
-    CowOptions options;
-    options.compression = GetParam();
-    options.cluster_ops = 0;
-    CowWriter writer(options);
-
-    ASSERT_TRUE(writer.Initialize(cow_->fd));
-
-    std::string data = "This is some data, believe it";
-    data.resize(options.block_size, '\0');
-
-    ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size()));
-    ASSERT_TRUE(writer.Finalize());
-
-    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
-
-    CowReader reader;
-    ASSERT_TRUE(reader.Parse(cow_->fd));
-
-    auto iter = reader.GetOpIter();
-    ASSERT_NE(iter, nullptr);
-    ASSERT_FALSE(iter->Done());
-
-    HorribleStringSink sink;
-    auto op = &iter->Get();
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
-}
-
-INSTANTIATE_TEST_SUITE_P(CowApi, CompressionTest, testing::Values("none", "gz", "brotli"));
-
 TEST_F(CowTest, GetSize) {
     CowOptions options;
     options.cluster_ops = 0;
@@ -641,7 +650,7 @@
     ASSERT_TRUE(reader.GetLastLabel(&label));
     ASSERT_EQ(label, 3);
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -649,11 +658,12 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
-    sink.Reset();
+    sink = {};
+    sink.resize(data2.size(), '\0');
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
@@ -665,8 +675,8 @@
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data2);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data2);
 
     iter->Next();
     ASSERT_TRUE(iter->Done());
@@ -705,8 +715,6 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
-
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
 
@@ -765,8 +773,6 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
-
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
 
@@ -816,7 +822,7 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
+    std::string sink(options.block_size, '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -824,20 +830,20 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data.substr(0, options.block_size));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data.substr(0, options.block_size));
 
     iter->Next();
-    sink.Reset();
+    sink = {};
+    sink.resize(options.block_size, '\0');
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data.substr(options.block_size, 2 * options.block_size));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data.substr(options.block_size, 2 * options.block_size));
 
     iter->Next();
-    sink.Reset();
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
@@ -897,7 +903,7 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -905,11 +911,10 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data.substr(0, options.block_size));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data.substr(0, options.block_size));
 
     iter->Next();
-    sink.Reset();
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
@@ -997,7 +1002,7 @@
     ASSERT_TRUE(reader.GetLastLabel(&label));
     ASSERT_EQ(label, 50);
 
-    StringSink sink;
+    std::string sink(data2.size(), '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -1012,8 +1017,8 @@
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data2);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data2);
 
     iter->Next();
 
@@ -1066,13 +1071,13 @@
     std::string cmp = data;
     cmp.resize(header.block_size, '\0');
 
-    StringSink sink;
-    if (!reader->ReadData(op, &sink)) {
+    std::string sink(cmp.size(), '\0');
+    if (!reader->ReadData(op, sink.data(), sink.size())) {
         return AssertionFailure() << "Failed to read data block";
     }
-    if (cmp != sink.stream()) {
+    if (cmp != sink) {
         return AssertionFailure() << "Data blocks did not match, expected " << cmp << ", got "
-                                  << sink.stream();
+                                  << sink;
     }
 
     return AssertionSuccess();
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 9b50986..d06c904 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -32,6 +32,21 @@
 
 namespace android {
 namespace snapshot {
+
+std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name) {
+    if (name == "gz") {
+        return {kCowCompressGz};
+    } else if (name == "brotli") {
+        return {kCowCompressBrotli};
+    } else if (name == "lz4") {
+        return {kCowCompressLz4};
+    } else if (name == "none" || name.empty()) {
+        return {kCowCompressNone};
+    } else {
+        return {};
+    }
+}
+
 std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
     return Compress(compression_, data, length);
 }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp
index 139a29f..483d559 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp
@@ -16,6 +16,7 @@
 
 #include "cow_decompress.h"
 
+#include <array>
 #include <utility>
 
 #include <android-base/logging.h>
@@ -26,9 +27,50 @@
 namespace android {
 namespace snapshot {
 
+ssize_t IByteStream::ReadFully(void* buffer, size_t buffer_size) {
+    size_t stream_remaining = Size();
+
+    char* buffer_start = reinterpret_cast<char*>(buffer);
+    char* buffer_pos = buffer_start;
+    size_t buffer_remaining = buffer_size;
+    while (stream_remaining) {
+        const size_t to_read = std::min(buffer_remaining, stream_remaining);
+        const ssize_t actual_read = Read(buffer_pos, to_read);
+        if (actual_read < 0) {
+            return -1;
+        }
+        if (!actual_read) {
+            LOG(ERROR) << "Stream ended prematurely";
+            return -1;
+        }
+        CHECK_LE(actual_read, to_read);
+
+        stream_remaining -= actual_read;
+        buffer_pos += actual_read;
+        buffer_remaining -= actual_read;
+    }
+    return buffer_pos - buffer_start;
+}
+
+std::unique_ptr<IDecompressor> IDecompressor::FromString(std::string_view compressor) {
+    if (compressor == "lz4") {
+        return IDecompressor::Lz4();
+    } else if (compressor == "brotli") {
+        return IDecompressor::Brotli();
+    } else if (compressor == "gz") {
+        return IDecompressor::Gz();
+    } else {
+        return nullptr;
+    }
+}
+
 class NoDecompressor final : public IDecompressor {
   public:
     bool Decompress(size_t) override;
+    ssize_t Decompress(void*, size_t, size_t, size_t) override {
+        LOG(ERROR) << "Not supported";
+        return -1;
+    }
 };
 
 bool NoDecompressor::Decompress(size_t) {
@@ -45,8 +87,8 @@
         uint8_t* buffer_pos = buffer;
         size_t bytes_to_read = std::min(buffer_size, stream_remaining);
         while (bytes_to_read) {
-            size_t read;
-            if (!stream_->Read(buffer_pos, bytes_to_read, &read)) {
+            ssize_t read = stream_->Read(buffer_pos, bytes_to_read);
+            if (read < 0) {
                 return false;
             }
             if (!read) {
@@ -73,10 +115,13 @@
 class StreamDecompressor : public IDecompressor {
   public:
     bool Decompress(size_t output_bytes) override;
+    ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
+                       size_t ignore_bytes) override;
 
     virtual bool Init() = 0;
     virtual bool DecompressInput(const uint8_t* data, size_t length) = 0;
-    virtual bool Done() = 0;
+    virtual bool PartialDecompress(const uint8_t* data, size_t length) = 0;
+    bool OutputFull() const { return !ignore_bytes_ && !output_buffer_remaining_; }
 
   protected:
     bool GetFreshBuffer();
@@ -85,6 +130,8 @@
     size_t stream_remaining_;
     uint8_t* output_buffer_ = nullptr;
     size_t output_buffer_remaining_ = 0;
+    size_t ignore_bytes_ = 0;
+    bool decompressor_ended_ = false;
 };
 
 static constexpr size_t kChunkSize = 4096;
@@ -99,8 +146,9 @@
 
     uint8_t chunk[kChunkSize];
     while (stream_remaining_) {
-        size_t read = std::min(stream_remaining_, sizeof(chunk));
-        if (!stream_->Read(chunk, read, &read)) {
+        size_t max_read = std::min(stream_remaining_, sizeof(chunk));
+        ssize_t read = stream_->Read(chunk, max_read);
+        if (read < 0) {
             return false;
         }
         if (!read) {
@@ -113,18 +161,65 @@
 
         stream_remaining_ -= read;
 
-        if (stream_remaining_ && Done()) {
+        if (stream_remaining_ && decompressor_ended_) {
             LOG(ERROR) << "Decompressor terminated early";
             return false;
         }
     }
-    if (!Done()) {
+    if (!decompressor_ended_) {
         LOG(ERROR) << "Decompressor expected more bytes";
         return false;
     }
     return true;
 }
 
+ssize_t StreamDecompressor::Decompress(void* buffer, size_t buffer_size, size_t,
+                                       size_t ignore_bytes) {
+    if (!Init()) {
+        return false;
+    }
+
+    stream_remaining_ = stream_->Size();
+    output_buffer_ = reinterpret_cast<uint8_t*>(buffer);
+    output_buffer_remaining_ = buffer_size;
+    ignore_bytes_ = ignore_bytes;
+
+    uint8_t chunk[kChunkSize];
+    while (stream_remaining_ && output_buffer_remaining_ && !decompressor_ended_) {
+        size_t max_read = std::min(stream_remaining_, sizeof(chunk));
+        ssize_t read = stream_->Read(chunk, max_read);
+        if (read < 0) {
+            return -1;
+        }
+        if (!read) {
+            LOG(ERROR) << "Stream ended prematurely";
+            return -1;
+        }
+        if (!PartialDecompress(chunk, read)) {
+            return -1;
+        }
+        stream_remaining_ -= read;
+    }
+
+    if (stream_remaining_) {
+        if (decompressor_ended_ && !OutputFull()) {
+            // If there's more input in the stream, but we haven't finished
+            // consuming ignored bytes or available output space yet, then
+            // something weird happened. Report it and fail.
+            LOG(ERROR) << "Decompressor terminated early";
+            return -1;
+        }
+    } else {
+        if (!decompressor_ended_ && !OutputFull()) {
+            // The stream ended, but the decoder doesn't think so, and there are
+            // more bytes in the output buffer.
+            LOG(ERROR) << "Decompressor expected more bytes";
+            return -1;
+        }
+    }
+    return buffer_size - output_buffer_remaining_;
+}
+
 bool StreamDecompressor::GetFreshBuffer() {
     size_t request_size = std::min(output_bytes_, kChunkSize);
     output_buffer_ =
@@ -142,11 +237,10 @@
 
     bool Init() override;
     bool DecompressInput(const uint8_t* data, size_t length) override;
-    bool Done() override { return ended_; }
+    bool PartialDecompress(const uint8_t* data, size_t length) override;
 
   private:
     z_stream z_ = {};
-    bool ended_ = false;
 };
 
 bool GzDecompressor::Init() {
@@ -198,7 +292,61 @@
                 LOG(ERROR) << "Gz stream ended prematurely";
                 return false;
             }
-            ended_ = true;
+            decompressor_ended_ = true;
+            return true;
+        }
+    }
+    return true;
+}
+
+bool GzDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
+    z_.next_in = reinterpret_cast<Bytef*>(const_cast<uint8_t*>(data));
+    z_.avail_in = length;
+
+    // If we're asked to ignore starting bytes, we sink those into the output
+    // repeatedly until there is nothing left to ignore.
+    while (ignore_bytes_ && z_.avail_in) {
+        std::array<Bytef, kChunkSize> ignore_buffer;
+        size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size());
+        z_.next_out = ignore_buffer.data();
+        z_.avail_out = max_ignore;
+
+        int rv = inflate(&z_, Z_NO_FLUSH);
+        if (rv != Z_OK && rv != Z_STREAM_END) {
+            LOG(ERROR) << "inflate returned error code " << rv;
+            return false;
+        }
+
+        size_t returned = max_ignore - z_.avail_out;
+        CHECK_LE(returned, ignore_bytes_);
+
+        ignore_bytes_ -= returned;
+
+        if (rv == Z_STREAM_END) {
+            decompressor_ended_ = true;
+            return true;
+        }
+    }
+
+    z_.next_out = reinterpret_cast<Bytef*>(output_buffer_);
+    z_.avail_out = output_buffer_remaining_;
+
+    while (z_.avail_in && z_.avail_out) {
+        // Decompress.
+        int rv = inflate(&z_, Z_NO_FLUSH);
+        if (rv != Z_OK && rv != Z_STREAM_END) {
+            LOG(ERROR) << "inflate returned error code " << rv;
+            return false;
+        }
+
+        size_t returned = output_buffer_remaining_ - z_.avail_out;
+        CHECK_LE(returned, output_buffer_remaining_);
+
+        output_buffer_ += returned;
+        output_buffer_remaining_ -= returned;
+
+        if (rv == Z_STREAM_END) {
+            decompressor_ended_ = true;
             return true;
         }
     }
@@ -215,7 +363,7 @@
 
     bool Init() override;
     bool DecompressInput(const uint8_t* data, size_t length) override;
-    bool Done() override { return BrotliDecoderIsFinished(decoder_); }
+    bool PartialDecompress(const uint8_t* data, size_t length) override;
 
   private:
     BrotliDecoderState* decoder_ = nullptr;
@@ -257,6 +405,44 @@
     return true;
 }
 
+bool BrotliDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
+    size_t available_in = length;
+    const uint8_t* next_in = data;
+
+    while (available_in && ignore_bytes_ && !BrotliDecoderIsFinished(decoder_)) {
+        std::array<uint8_t, kChunkSize> ignore_buffer;
+        size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size());
+        size_t ignore_size = max_ignore;
+
+        uint8_t* ignore_buffer_ptr = ignore_buffer.data();
+        auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in, &ignore_size,
+                                               &ignore_buffer_ptr, nullptr);
+        if (r == BROTLI_DECODER_RESULT_ERROR) {
+            LOG(ERROR) << "brotli decode failed";
+            return false;
+        } else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) {
+            LOG(ERROR) << "brotli unexpected needs more input";
+            return false;
+        }
+        ignore_bytes_ -= max_ignore - ignore_size;
+    }
+
+    while (available_in && !BrotliDecoderIsFinished(decoder_)) {
+        auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in,
+                                               &output_buffer_remaining_, &output_buffer_, nullptr);
+        if (r == BROTLI_DECODER_RESULT_ERROR) {
+            LOG(ERROR) << "brotli decode failed";
+            return false;
+        } else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) {
+            LOG(ERROR) << "brotli unexpected needs more input";
+            return false;
+        }
+    }
+
+    decompressor_ended_ = BrotliDecoderIsFinished(decoder_);
+    return true;
+}
+
 std::unique_ptr<IDecompressor> IDecompressor::Brotli() {
     return std::unique_ptr<IDecompressor>(new BrotliDecompressor());
 }
@@ -275,8 +461,7 @@
         }
         // If input size is same as output size, then input is uncompressed.
         if (stream_->Size() == output_size) {
-            size_t bytes_read = 0;
-            stream_->Read(output_buffer, output_size, &bytes_read);
+            ssize_t bytes_read = stream_->ReadFully(output_buffer, output_size);
             if (bytes_read != output_size) {
                 LOG(ERROR) << "Failed to read all input at once. Expected: " << output_size
                            << " actual: " << bytes_read;
@@ -287,8 +472,7 @@
         }
         std::string input_buffer;
         input_buffer.resize(stream_->Size());
-        size_t bytes_read = 0;
-        stream_->Read(input_buffer.data(), input_buffer.size(), &bytes_read);
+        ssize_t bytes_read = stream_->ReadFully(input_buffer.data(), input_buffer.size());
         if (bytes_read != input_buffer.size()) {
             LOG(ERROR) << "Failed to read all input at once. Expected: " << input_buffer.size()
                        << " actual: " << bytes_read;
@@ -305,6 +489,61 @@
         sink_->ReturnData(output_buffer, output_size);
         return true;
     }
+
+    ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
+                       size_t ignore_bytes) override {
+        std::string input_buffer(stream_->Size(), '\0');
+        ssize_t streamed_in = stream_->ReadFully(input_buffer.data(), input_buffer.size());
+        if (streamed_in < 0) {
+            return -1;
+        }
+        CHECK_EQ(streamed_in, stream_->Size());
+
+        char* decode_buffer = reinterpret_cast<char*>(buffer);
+        size_t decode_buffer_size = buffer_size;
+
+        // It's unclear if LZ4 can exactly satisfy a partial decode request, so
+        // if we get one, create a temporary buffer.
+        std::string temp;
+        if (buffer_size < decompressed_size) {
+            temp.resize(decompressed_size, '\0');
+            decode_buffer = temp.data();
+            decode_buffer_size = temp.size();
+        }
+
+        const int bytes_decompressed = LZ4_decompress_safe(input_buffer.data(), decode_buffer,
+                                                           input_buffer.size(), decode_buffer_size);
+        if (bytes_decompressed < 0) {
+            LOG(ERROR) << "Failed to decompress LZ4 block, code: " << bytes_decompressed;
+            return -1;
+        }
+        if (bytes_decompressed != decompressed_size) {
+            LOG(ERROR) << "Failed to decompress LZ4 block, expected output size: "
+                       << bytes_decompressed << ", actual: " << bytes_decompressed;
+            return -1;
+        }
+        CHECK_LE(bytes_decompressed, decode_buffer_size);
+
+        if (ignore_bytes > bytes_decompressed) {
+            LOG(ERROR) << "Ignoring more bytes than exist in stream (ignoring " << ignore_bytes
+                       << ", got " << bytes_decompressed << ")";
+            return -1;
+        }
+
+        if (temp.empty()) {
+            // LZ4's API has no way to sink out the first N bytes of decoding,
+            // so we read them all in and memmove() to drop the partial read.
+            if (ignore_bytes) {
+                memmove(decode_buffer, decode_buffer + ignore_bytes,
+                        bytes_decompressed - ignore_bytes);
+            }
+            return bytes_decompressed - ignore_bytes;
+        }
+
+        size_t max_copy = std::min(bytes_decompressed - ignore_bytes, buffer_size);
+        memcpy(buffer, temp.data() + ignore_bytes, max_copy);
+        return max_copy;
+    }
 };
 
 std::unique_ptr<IDecompressor> IDecompressor::Lz4() {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h
index 7f74eda..09164d3 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h
@@ -26,11 +26,16 @@
     virtual ~IByteStream() {}
 
     // Read up to |length| bytes, storing the number of bytes read in the out-
-    // parameter. If the end of the stream is reached, 0 is returned.
-    virtual bool Read(void* buffer, size_t length, size_t* read) = 0;
+    // parameter. If the end of the stream is reached, 0 is returned. On error,
+    // -1 is returned. errno is NOT set.
+    virtual ssize_t Read(void* buffer, size_t length) = 0;
 
     // Size of the stream.
     virtual size_t Size() const = 0;
+
+    // Helper for Read(). Read the entire stream into |buffer|, up to |length|
+    // bytes.
+    ssize_t ReadFully(void* buffer, size_t length);
 };
 
 class IDecompressor {
@@ -43,9 +48,21 @@
     static std::unique_ptr<IDecompressor> Brotli();
     static std::unique_ptr<IDecompressor> Lz4();
 
+    static std::unique_ptr<IDecompressor> FromString(std::string_view compressor);
+
     // |output_bytes| is the expected total number of bytes to sink.
     virtual bool Decompress(size_t output_bytes) = 0;
 
+    // Decompress at most |buffer_size| bytes, ignoring the first |ignore_bytes|
+    // of the decoded stream. |buffer_size| must be at least one byte.
+    // |decompressed_size| is the expected total size if the entire stream were
+    // decompressed.
+    //
+    // Returns the number of bytes written to |buffer|, or -1 on error. errno
+    // is NOT set.
+    virtual ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
+                               size_t ignore_bytes = 0) = 0;
+
     void set_stream(IByteStream* stream) { stream_ = stream; }
     void set_sink(IByteSink* sink) { sink_ = sink; }
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
index 45be191..e583ff0 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
@@ -747,18 +747,18 @@
         remaining_ = data_length_;
     }
 
-    bool Read(void* buffer, size_t length, size_t* read) override {
+    ssize_t Read(void* buffer, size_t length) override {
         size_t to_read = std::min(length, remaining_);
         if (!to_read) {
-            *read = 0;
-            return true;
+            return 0;
         }
-        if (!reader_->GetRawBytes(offset_, buffer, to_read, read)) {
-            return false;
+        size_t read;
+        if (!reader_->GetRawBytes(offset_, buffer, to_read, &read)) {
+            return -1;
         }
-        offset_ += *read;
-        remaining_ -= *read;
-        return true;
+        offset_ += read;
+        remaining_ -= read;
+        return read;
     }
 
     size_t Size() const override { return data_length_; }
@@ -802,5 +802,44 @@
     return decompressor->Decompress(header_.block_size);
 }
 
+ssize_t CowReader::ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
+                            size_t ignore_bytes) {
+    std::unique_ptr<IDecompressor> decompressor;
+    switch (op.compression) {
+        case kCowCompressNone:
+            break;
+        case kCowCompressGz:
+            decompressor = IDecompressor::Gz();
+            break;
+        case kCowCompressBrotli:
+            decompressor = IDecompressor::Brotli();
+            break;
+        case kCowCompressLz4:
+            if (header_.block_size != op.data_length) {
+                decompressor = IDecompressor::Lz4();
+            }
+            break;
+        default:
+            LOG(ERROR) << "Unknown compression type: " << op.compression;
+            return -1;
+    }
+
+    uint64_t offset;
+    if (op.type == kCowXorOp) {
+        offset = data_loc_->at(op.new_block);
+    } else {
+        offset = op.source;
+    }
+
+    if (!decompressor) {
+        CowDataStream stream(this, offset + ignore_bytes, op.data_length - ignore_bytes);
+        return stream.ReadFully(buffer, buffer_size);
+    }
+
+    CowDataStream stream(this, offset, op.data_length);
+    decompressor->set_stream(&stream);
+    return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes);
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 56b48f0..042ffb4 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -37,6 +37,14 @@
 #include <sys/ioctl.h>
 #include <unistd.h>
 
+// The info messages here are spammy, but as useful for update_engine. Disable
+// them when running on the host.
+#ifdef __ANDROID__
+#define LOG_INFO LOG(INFO)
+#else
+#define LOG_INFO LOG(VERBOSE)
+#endif
+
 namespace android {
 namespace snapshot {
 
@@ -194,18 +202,13 @@
 }
 
 bool CowWriter::ParseOptions() {
-    if (options_.compression == "gz") {
-        compression_ = kCowCompressGz;
-    } else if (options_.compression == "brotli") {
-        compression_ = kCowCompressBrotli;
-    } else if (options_.compression == "lz4") {
-        compression_ = kCowCompressLz4;
-    } else if (options_.compression == "none") {
-        compression_ = kCowCompressNone;
-    } else if (!options_.compression.empty()) {
+    auto algorithm = CompressionAlgorithmFromString(options_.compression);
+    if (!algorithm) {
         LOG(ERROR) << "unrecognized compression: " << options_.compression;
         return false;
     }
+    compression_ = *algorithm;
+
     if (options_.cluster_ops == 1) {
         LOG(ERROR) << "Clusters must contain at least two operations to function.";
         return false;
@@ -239,10 +242,10 @@
                 return false;
             }
             cow_image_size_ = size_in_bytes;
-            LOG(INFO) << "COW image " << file_path << " has size " << size_in_bytes;
+            LOG_INFO << "COW image " << file_path << " has size " << size_in_bytes;
         } else {
-            LOG(INFO) << "COW image " << file_path
-                      << " is not a block device, assuming unlimited space.";
+            LOG_INFO << "COW image " << file_path
+                     << " is not a block device, assuming unlimited space.";
         }
     }
     return true;
@@ -271,12 +274,12 @@
     }
 
     std::string batch_write = batch_write_ ? "enabled" : "disabled";
-    LOG(INFO) << "Batch writes: " << batch_write;
+    LOG_INFO << "Batch writes: " << batch_write;
 }
 
 void CowWriter::InitWorkers() {
     if (num_compress_threads_ <= 1) {
-        LOG(INFO) << "Not creating new threads for compression.";
+        LOG_INFO << "Not creating new threads for compression.";
         return;
     }
     for (int i = 0; i < num_compress_threads_; i++) {
@@ -285,7 +288,7 @@
         compress_threads_.push_back(std::move(wt));
     }
 
-    LOG(INFO) << num_compress_threads_ << " thread used for compression";
+    LOG_INFO << num_compress_threads_ << " thread used for compression";
 }
 
 bool CowWriter::Initialize(unique_fd&& fd) {