Merge "Update return values in main"
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index ba75a8d..69079cb 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -15,7 +15,9 @@
 #pragma once
 
 #include <stdint.h>
-#include <string>
+
+#include <optional>
+#include <string_view>
 
 namespace android {
 namespace snapshot {
@@ -196,5 +198,8 @@
 // Ops that have dependencies on old blocks, and must take care in their merge order
 bool IsOrderedOp(const CowOperation& op);
 
+// Convert compression name to internal value.
+std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name);
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index e8e4d72..8e61a21 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -65,6 +65,7 @@
 
     // Return the file header.
     virtual bool GetHeader(CowHeader* header) = 0;
+    virtual CowHeader& GetHeader() = 0;
 
     // Return the file footer.
     virtual bool GetFooter(CowFooter* footer) = 0;
@@ -85,6 +86,19 @@
     // Get decoded bytes from the data section, handling any decompression.
     // All retrieved data is passed to the sink.
     virtual bool ReadData(const CowOperation& op, IByteSink* sink) = 0;
+
+    // Get decoded bytes from the data section, handling any decompression.
+    //
+    // If ignore_bytes is non-zero, it specifies the initial number of bytes
+    // to skip writing to |buffer|.
+    //
+    // Returns the number of bytes written to |buffer|, or -1 on failure.
+    // errno is NOT set.
+    //
+    // Partial reads are not possible unless |buffer_size| is less than the
+    // operation block size.
+    virtual ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
+                             size_t ignore_bytes = 0) = 0;
 };
 
 // Iterate over a sequence of COW operations.
@@ -140,6 +154,10 @@
     std::unique_ptr<ICowOpIter> GetMergeOpIter(bool ignore_progress = false) override;
 
     bool ReadData(const CowOperation& op, IByteSink* sink) override;
+    ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
+                     size_t ignore_bytes = 0) override;
+
+    CowHeader& GetHeader() override { return header_; }
 
     bool GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read);
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
index 862ce55..f05aeb2 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
@@ -24,6 +24,7 @@
 #include <gtest/gtest.h>
 #include <libsnapshot/cow_reader.h>
 #include <libsnapshot/cow_writer.h>
+#include "cow_decompress.h"
 
 using testing::AssertionFailure;
 using testing::AssertionResult;
@@ -44,23 +45,10 @@
     std::unique_ptr<TemporaryFile> cow_;
 };
 
-// Sink that always appends to the end of a string.
-class StringSink : public IByteSink {
-  public:
-    void* GetBuffer(size_t requested, size_t* actual) override {
-        size_t old_size = stream_.size();
-        stream_.resize(old_size + requested, '\0');
-        *actual = requested;
-        return stream_.data() + old_size;
-    }
-    bool ReturnData(void*, size_t) override { return true; }
-    void Reset() { stream_.clear(); }
-
-    std::string& stream() { return stream_; }
-
-  private:
-    std::string stream_;
-};
+// Test helper: true only when the reader fills the entire |size|-byte buffer for |op|.
+static inline bool ReadData(CowReader& reader, const CowOperation& op, void* buffer, size_t size) {
+    return static_cast<size_t>(reader.ReadData(op, buffer, size)) == size;
+}
 
 TEST_F(CowTest, CopyContiguous) {
     CowOptions options;
@@ -144,7 +132,7 @@
     ASSERT_EQ(op->new_block, 10);
     ASSERT_EQ(op->source, 20);
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -154,8 +142,8 @@
     ASSERT_EQ(op->compression, kCowCompressNone);
     ASSERT_EQ(op->data_length, 4096);
     ASSERT_EQ(op->new_block, 50);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -222,7 +210,7 @@
     ASSERT_EQ(op->new_block, 10);
     ASSERT_EQ(op->source, 20);
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -233,8 +221,8 @@
     ASSERT_EQ(op->data_length, 4096);
     ASSERT_EQ(op->new_block, 50);
     ASSERT_EQ(op->source, 98314);  // 4096 * 24 + 10
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -285,22 +273,22 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     ASSERT_EQ(op->type, kCowReplaceOp);
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->data_length, 56);  // compressed!
     ASSERT_EQ(op->new_block, 50);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_TRUE(iter->Done());
 }
 
-class CompressionRWTest : public CowTest, public testing::WithParamInterface<const char*> {};
+class CompressionTest : public CowTest, public testing::WithParamInterface<const char*> {};
 
-TEST_P(CompressionRWTest, ThreadedBatchWrites) {
+TEST_P(CompressionTest, ThreadedBatchWrites) {
     CowOptions options;
     options.compression = GetParam();
     options.num_compress_threads = 2;
@@ -342,31 +330,32 @@
 
         if (op->type == kCowXorOp) {
             total_blocks += 1;
-            StringSink sink;
+            std::string sink(xor_data.size(), '\0');
             ASSERT_EQ(op->new_block, 50);
             ASSERT_EQ(op->source, 98314);  // 4096 * 24 + 10
-            ASSERT_TRUE(reader.ReadData(*op, &sink));
-            ASSERT_EQ(sink.stream(), xor_data);
+            ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+            ASSERT_EQ(sink, xor_data);
         }
 
         if (op->type == kCowReplaceOp) {
             total_blocks += 1;
             if (op->new_block == 100) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data);
+                std::string sink(data.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink.size(), data.size());
+                ASSERT_EQ(sink, data);
             }
             if (op->new_block == 6000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data2.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data2);
+                std::string sink(data2.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data2);
             }
             if (op->new_block == 9000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
-                ASSERT_EQ(sink.stream(), data3);
+                std::string sink(data3.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data3);
             }
         }
 
@@ -376,7 +365,7 @@
     ASSERT_EQ(total_blocks, expected_blocks);
 }
 
-TEST_P(CompressionRWTest, NoBatchWrites) {
+TEST_P(CompressionTest, NoBatchWrites) {
     CowOptions options;
     options.compression = GetParam();
     options.num_compress_threads = 1;
@@ -416,21 +405,21 @@
         if (op->type == kCowReplaceOp) {
             total_blocks += 1;
             if (op->new_block == 50) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data);
+                std::string sink(data.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data);
             }
             if (op->new_block == 3000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
                 data2.resize(options.block_size);
-                ASSERT_EQ(sink.stream(), data2);
+                std::string sink(data2.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data2);
             }
             if (op->new_block == 5000) {
-                StringSink sink;
-                ASSERT_TRUE(reader.ReadData(*op, &sink));
-                ASSERT_EQ(sink.stream(), data3);
+                std::string sink(data3.size(), '\0');
+                ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+                ASSERT_EQ(sink, data3);
             }
         }
 
@@ -440,7 +429,66 @@
     ASSERT_EQ(total_blocks, expected_blocks);
 }
 
-INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4"));
+// Stream over an in-memory string that returns at most one byte per Read() call.
+template <typename T>
+class HorribleStream : public IByteStream {
+  public:
+    HorribleStream(const std::basic_string<T>& input) : input_(input) {}
+
+    // Returns 1 after storing the next byte, or 0 when the input is exhausted
+    // or |length| is zero, so a byte is never consumed without being stored.
+    ssize_t Read(void* buffer, size_t length) override {
+        if (pos_ >= input_.size() || length == 0) {
+            return 0;
+        }
+        *reinterpret_cast<char*>(buffer) = input_[pos_++];
+        return 1;
+    }
+    size_t Size() const override { return input_.size(); }
+
+  private:
+    std::basic_string<T> input_;
+    size_t pos_ = 0;
+};
+
+TEST(HorribleStream, ReadFully) {
+    std::string expected = "this is some data";
+    HorribleStream<char> stream(expected);
+    std::string buffer(expected.size(), '\0');
+    // ReadFully() reports failure as -1, which ASSERT_TRUE would accept; check the count.
+    const auto wanted = static_cast<ssize_t>(buffer.size());
+    ASSERT_EQ(stream.ReadFully(buffer.data(), buffer.size()), wanted);
+    ASSERT_EQ(buffer, expected);
+}
+
+// Decompresses a window of a compressed block fed through a one-byte-per-read stream.
+TEST_P(CompressionTest, HorribleStream) {
+    const char* const name = GetParam();
+    if (!strcmp(name, "none")) {
+        GTEST_SKIP();
+    }
+    const auto algorithm = CompressionAlgorithmFromString(name);
+    ASSERT_TRUE(algorithm.has_value());
+    // Source block: a short sentence padded with NULs up to 4096 bytes.
+    std::string block = "The quick brown fox jumps over the lazy dog.";
+    block.resize(4096, '\0');
+    const auto compressed = CompressWorker::Compress(*algorithm, block.data(), block.size());
+    ASSERT_FALSE(compressed.empty());
+
+    HorribleStream<uint8_t> stream(compressed);
+    const auto decomp = IDecompressor::FromString(name);
+    ASSERT_NE(decomp, nullptr);
+    decomp->set_stream(&stream);
+
+    // Ignore the first 10 decompressed bytes and fetch the 500 that follow.
+    const std::string expected = block.substr(10, 500);
+    std::string buffer(expected.size(), '\0');
+    ASSERT_EQ(decomp->Decompress(buffer.data(), 500, 4096, 10), 500);
+    ASSERT_EQ(buffer, expected);
+}
+
+INSTANTIATE_TEST_SUITE_P(AllCompressors, CompressionTest,
+                         testing::Values("none", "gz", "brotli", "lz4"));
 
 TEST_F(CowTest, ClusterCompressGz) {
     CowOptions options;
@@ -470,14 +518,14 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     ASSERT_EQ(op->type, kCowReplaceOp);
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->data_length, 56);  // compressed!
     ASSERT_EQ(op->new_block, 50);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -489,12 +537,13 @@
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
 
-    sink.Reset();
+    sink = {};
+    sink.resize(data2.size(), '\0');
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->data_length, 41);  // compressed!
     ASSERT_EQ(op->new_block, 51);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data2);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data2);
 
     iter->Next();
     ASSERT_FALSE(iter->Done());
@@ -531,55 +580,15 @@
     iter->Next();
     ASSERT_FALSE(iter->Done());
 
-    StringSink sink;
+    std::string sink(options.block_size, '\0');
 
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
     ASSERT_EQ(op->compression, kCowCompressGz);
     ASSERT_EQ(op->new_block, 51);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
 }
 
-// Only return 1-byte buffers, to stress test the partial read logic in
-// CowReader.
-class HorribleStringSink : public StringSink {
-  public:
-    void* GetBuffer(size_t, size_t* actual) override { return StringSink::GetBuffer(1, actual); }
-};
-
-class CompressionTest : public CowTest, public testing::WithParamInterface<const char*> {};
-
-TEST_P(CompressionTest, HorribleSink) {
-    CowOptions options;
-    options.compression = GetParam();
-    options.cluster_ops = 0;
-    CowWriter writer(options);
-
-    ASSERT_TRUE(writer.Initialize(cow_->fd));
-
-    std::string data = "This is some data, believe it";
-    data.resize(options.block_size, '\0');
-
-    ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size()));
-    ASSERT_TRUE(writer.Finalize());
-
-    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
-
-    CowReader reader;
-    ASSERT_TRUE(reader.Parse(cow_->fd));
-
-    auto iter = reader.GetOpIter();
-    ASSERT_NE(iter, nullptr);
-    ASSERT_FALSE(iter->Done());
-
-    HorribleStringSink sink;
-    auto op = &iter->Get();
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
-}
-
-INSTANTIATE_TEST_SUITE_P(CowApi, CompressionTest, testing::Values("none", "gz", "brotli"));
-
 TEST_F(CowTest, GetSize) {
     CowOptions options;
     options.cluster_ops = 0;
@@ -641,7 +650,7 @@
     ASSERT_TRUE(reader.GetLastLabel(&label));
     ASSERT_EQ(label, 3);
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -649,11 +658,12 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data);
 
     iter->Next();
-    sink.Reset();
+    sink = {};
+    sink.resize(data2.size(), '\0');
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
@@ -665,8 +675,8 @@
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data2);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data2);
 
     iter->Next();
     ASSERT_TRUE(iter->Done());
@@ -705,8 +715,6 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
-
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
 
@@ -765,8 +773,6 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
-
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
 
@@ -816,7 +822,7 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
+    std::string sink(options.block_size, '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -824,20 +830,20 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data.substr(0, options.block_size));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data.substr(0, options.block_size));
 
     iter->Next();
-    sink.Reset();
+    sink = {};
+    sink.resize(options.block_size, '\0');
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data.substr(options.block_size, 2 * options.block_size));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data.substr(options.block_size, 2 * options.block_size));
 
     iter->Next();
-    sink.Reset();
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
@@ -897,7 +903,7 @@
     CowReader reader;
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
-    StringSink sink;
+    std::string sink(data.size(), '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -905,11 +911,10 @@
     ASSERT_FALSE(iter->Done());
     auto op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data.substr(0, options.block_size));
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data.substr(0, options.block_size));
 
     iter->Next();
-    sink.Reset();
 
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
@@ -997,7 +1002,7 @@
     ASSERT_TRUE(reader.GetLastLabel(&label));
     ASSERT_EQ(label, 50);
 
-    StringSink sink;
+    std::string sink(data2.size(), '\0');
 
     auto iter = reader.GetOpIter();
     ASSERT_NE(iter, nullptr);
@@ -1012,8 +1017,8 @@
     ASSERT_FALSE(iter->Done());
     op = &iter->Get();
     ASSERT_EQ(op->type, kCowReplaceOp);
-    ASSERT_TRUE(reader.ReadData(*op, &sink));
-    ASSERT_EQ(sink.stream(), data2);
+    ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
+    ASSERT_EQ(sink, data2);
 
     iter->Next();
 
@@ -1066,13 +1071,13 @@
     std::string cmp = data;
     cmp.resize(header.block_size, '\0');
 
-    StringSink sink;
-    if (!reader->ReadData(op, &sink)) {
+    std::string sink(cmp.size(), '\0');
+    if (!reader->ReadData(op, sink.data(), sink.size())) {
         return AssertionFailure() << "Failed to read data block";
     }
-    if (cmp != sink.stream()) {
+    if (cmp != sink) {
         return AssertionFailure() << "Data blocks did not match, expected " << cmp << ", got "
-                                  << sink.stream();
+                                  << sink;
     }
 
     return AssertionSuccess();
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 9b50986..d06c904 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -32,6 +32,21 @@
 
 namespace android {
 namespace snapshot {
+
+// Maps a textual compression name onto its CowCompressionAlgorithm value.
+// Returns an empty optional when the name is not recognized.
+std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name) {
+    // A missing (empty) name means the same as an explicit "none".
+    if (name.empty() || name == "none") {
+        return {kCowCompressNone};
+    }
+
+    if (name == "gz") return {kCowCompressGz};
+    if (name == "brotli") return {kCowCompressBrotli};
+    if (name == "lz4") return {kCowCompressLz4};
+    return std::nullopt;
+}
+
 std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
     return Compress(compression_, data, length);
 }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp
index 139a29f..483d559 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp
@@ -16,6 +16,7 @@
 
 #include "cow_decompress.h"
 
+#include <array>
 #include <utility>
 
 #include <android-base/logging.h>
@@ -26,9 +27,50 @@
 namespace android {
 namespace snapshot {
 
+// Fills |buffer| with the whole stream (Size() bytes) using repeated Read() calls.
+// Returns the number of bytes stored, or -1 if a read fails or returns no data.
+ssize_t IByteStream::ReadFully(void* buffer, size_t buffer_size) {
+    char* const first = reinterpret_cast<char*>(buffer);
+    char* cursor = first;
+    size_t space_left = buffer_size;
+
+    for (size_t unread = Size(); unread != 0;) {
+        const size_t to_read = std::min(space_left, unread);
+        const ssize_t actual_read = Read(cursor, to_read);
+        if (actual_read < 0) {
+            return -1;
+        }
+        if (actual_read == 0) {
+            LOG(ERROR) << "Stream ended prematurely";
+            return -1;
+        }
+        CHECK_LE(actual_read, to_read);
+        unread -= actual_read;
+        cursor += actual_read;
+        space_left -= actual_read;
+    }
+    return cursor - first;
+}
+
+// Creates the decompressor registered under |compressor|.
+// Recognized names are "lz4", "brotli" and "gz"; any other name, including
+// "none" and the empty string, yields nullptr.
+std::unique_ptr<IDecompressor> IDecompressor::FromString(std::string_view compressor) {
+    if (compressor == "gz") return IDecompressor::Gz();
+    if (compressor == "brotli") return IDecompressor::Brotli();
+    if (compressor == "lz4") return IDecompressor::Lz4();
+
+    // No decompressor is available for this name.
+    return nullptr;
+}
+
 class NoDecompressor final : public IDecompressor {
   public:
     bool Decompress(size_t) override;
+    ssize_t Decompress(void*, size_t, size_t, size_t) override {
+        LOG(ERROR) << "Not supported";
+        return -1;
+    }
 };
 
 bool NoDecompressor::Decompress(size_t) {
@@ -45,8 +87,8 @@
         uint8_t* buffer_pos = buffer;
         size_t bytes_to_read = std::min(buffer_size, stream_remaining);
         while (bytes_to_read) {
-            size_t read;
-            if (!stream_->Read(buffer_pos, bytes_to_read, &read)) {
+            ssize_t read = stream_->Read(buffer_pos, bytes_to_read);
+            if (read < 0) {
                 return false;
             }
             if (!read) {
@@ -73,10 +115,13 @@
 class StreamDecompressor : public IDecompressor {
   public:
     bool Decompress(size_t output_bytes) override;
+    ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
+                       size_t ignore_bytes) override;
 
     virtual bool Init() = 0;
     virtual bool DecompressInput(const uint8_t* data, size_t length) = 0;
-    virtual bool Done() = 0;
+    virtual bool PartialDecompress(const uint8_t* data, size_t length) = 0;
+    bool OutputFull() const { return !ignore_bytes_ && !output_buffer_remaining_; }
 
   protected:
     bool GetFreshBuffer();
@@ -85,6 +130,8 @@
     size_t stream_remaining_;
     uint8_t* output_buffer_ = nullptr;
     size_t output_buffer_remaining_ = 0;
+    size_t ignore_bytes_ = 0;
+    bool decompressor_ended_ = false;
 };
 
 static constexpr size_t kChunkSize = 4096;
@@ -99,8 +146,9 @@
 
     uint8_t chunk[kChunkSize];
     while (stream_remaining_) {
-        size_t read = std::min(stream_remaining_, sizeof(chunk));
-        if (!stream_->Read(chunk, read, &read)) {
+        size_t max_read = std::min(stream_remaining_, sizeof(chunk));
+        ssize_t read = stream_->Read(chunk, max_read);
+        if (read < 0) {
             return false;
         }
         if (!read) {
@@ -113,18 +161,65 @@
 
         stream_remaining_ -= read;
 
-        if (stream_remaining_ && Done()) {
+        if (stream_remaining_ && decompressor_ended_) {
             LOG(ERROR) << "Decompressor terminated early";
             return false;
         }
     }
-    if (!Done()) {
+    if (!decompressor_ended_) {
         LOG(ERROR) << "Decompressor expected more bytes";
         return false;
     }
     return true;
 }
 
+// Decompresses into |buffer| (up to |buffer_size| bytes), first discarding
+// |ignore_bytes| bytes of output. Returns bytes stored in |buffer|, or -1 on error.
+ssize_t StreamDecompressor::Decompress(void* buffer, size_t buffer_size, size_t,
+                                       size_t ignore_bytes) {
+    if (!Init()) {
+        return -1;  // "false" would convert to 0 and look like an empty success.
+    }
+
+    stream_remaining_ = stream_->Size();
+    output_buffer_ = reinterpret_cast<uint8_t*>(buffer);
+    output_buffer_remaining_ = buffer_size;
+    ignore_bytes_ = ignore_bytes;
+
+    uint8_t chunk[kChunkSize];
+    while (stream_remaining_ && output_buffer_remaining_ && !decompressor_ended_) {
+        size_t max_read = std::min(stream_remaining_, sizeof(chunk));
+        ssize_t read = stream_->Read(chunk, max_read);
+        if (read < 0) {
+            return -1;
+        }
+        if (!read) {
+            LOG(ERROR) << "Stream ended prematurely";
+            return -1;
+        }
+        if (!PartialDecompress(chunk, read)) {
+            return -1;
+        }
+        stream_remaining_ -= read;
+    }
+
+    if (stream_remaining_) {
+        if (decompressor_ended_ && !OutputFull()) {
+            // If there's more input in the stream, but we haven't finished
+            // consuming ignored bytes or available output space yet, then
+            // something weird happened. Report it and fail.
+            LOG(ERROR) << "Decompressor terminated early";
+            return -1;
+        }
+    } else if (!decompressor_ended_ && !OutputFull()) {
+        // The stream ended, but the decoder doesn't think so, and there are
+        // more bytes in the output buffer.
+        LOG(ERROR) << "Decompressor expected more bytes";
+        return -1;
+    }
+    return buffer_size - output_buffer_remaining_;
+}
+
 bool StreamDecompressor::GetFreshBuffer() {
     size_t request_size = std::min(output_bytes_, kChunkSize);
     output_buffer_ =
@@ -142,11 +237,10 @@
 
     bool Init() override;
     bool DecompressInput(const uint8_t* data, size_t length) override;
-    bool Done() override { return ended_; }
+    bool PartialDecompress(const uint8_t* data, size_t length) override;
 
   private:
     z_stream z_ = {};
-    bool ended_ = false;
 };
 
 bool GzDecompressor::Init() {
@@ -198,7 +292,61 @@
                 LOG(ERROR) << "Gz stream ended prematurely";
                 return false;
             }
-            ended_ = true;
+            decompressor_ended_ = true;
+            return true;
+        }
+    }
+    return true;
+}
+
+bool GzDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
+    z_.next_in = reinterpret_cast<Bytef*>(const_cast<uint8_t*>(data));
+    z_.avail_in = length;
+
+    // If we're asked to ignore starting bytes, we sink those into the output
+    // repeatedly until there is nothing left to ignore.
+    while (ignore_bytes_ && z_.avail_in) {
+        std::array<Bytef, kChunkSize> ignore_buffer;
+        size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size());
+        z_.next_out = ignore_buffer.data();
+        z_.avail_out = max_ignore;
+
+        int rv = inflate(&z_, Z_NO_FLUSH);
+        if (rv != Z_OK && rv != Z_STREAM_END) {
+            LOG(ERROR) << "inflate returned error code " << rv;
+            return false;
+        }
+
+        size_t returned = max_ignore - z_.avail_out;
+        CHECK_LE(returned, ignore_bytes_);
+
+        ignore_bytes_ -= returned;
+
+        if (rv == Z_STREAM_END) {
+            decompressor_ended_ = true;
+            return true;
+        }
+    }
+
+    z_.next_out = reinterpret_cast<Bytef*>(output_buffer_);
+    z_.avail_out = output_buffer_remaining_;
+
+    while (z_.avail_in && z_.avail_out) {
+        // Decompress.
+        int rv = inflate(&z_, Z_NO_FLUSH);
+        if (rv != Z_OK && rv != Z_STREAM_END) {
+            LOG(ERROR) << "inflate returned error code " << rv;
+            return false;
+        }
+
+        size_t returned = output_buffer_remaining_ - z_.avail_out;
+        CHECK_LE(returned, output_buffer_remaining_);
+
+        output_buffer_ += returned;
+        output_buffer_remaining_ -= returned;
+
+        if (rv == Z_STREAM_END) {
+            decompressor_ended_ = true;
             return true;
         }
     }
@@ -215,7 +363,7 @@
 
     bool Init() override;
     bool DecompressInput(const uint8_t* data, size_t length) override;
-    bool Done() override { return BrotliDecoderIsFinished(decoder_); }
+    bool PartialDecompress(const uint8_t* data, size_t length) override;
 
   private:
     BrotliDecoderState* decoder_ = nullptr;
@@ -257,6 +405,44 @@
     return true;
 }
 
+// Feeds |length| bytes of compressed input to the brotli decoder: decoded output
+// first satisfies |ignore_bytes_| (discarded), then goes to |output_buffer_|.
+bool BrotliDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
+    size_t available_in = length;
+    const uint8_t* next_in = data;
+    while (available_in && ignore_bytes_ && !BrotliDecoderIsFinished(decoder_)) {
+        std::array<uint8_t, kChunkSize> ignore_buffer;
+        size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size());
+        size_t ignore_size = max_ignore;
+        uint8_t* ignore_buffer_ptr = ignore_buffer.data();
+        auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in, &ignore_size,
+                                               &ignore_buffer_ptr, nullptr);
+        if (r == BROTLI_DECODER_RESULT_ERROR) {
+            LOG(ERROR) << "brotli decode failed";
+            return false;
+        } else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) {
+            LOG(ERROR) << "brotli unexpected needs more input";
+            return false;
+        }
+        ignore_bytes_ -= max_ignore - ignore_size;
+    }
+    while (available_in && !BrotliDecoderIsFinished(decoder_)) {
+        auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in,
+                                               &output_buffer_remaining_, &output_buffer_, nullptr);
+        if (r == BROTLI_DECODER_RESULT_ERROR) {
+            LOG(ERROR) << "brotli decode failed";
+            return false;
+        } else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) {
+            LOG(ERROR) << "brotli unexpected needs more input";
+            return false;
+        } else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT) {
+            break;  // Decoder is blocked on output space; retrying here cannot progress.
+        }
+    }
+    decompressor_ended_ = BrotliDecoderIsFinished(decoder_);
+    return true;
+}
+
 std::unique_ptr<IDecompressor> IDecompressor::Brotli() {
     return std::unique_ptr<IDecompressor>(new BrotliDecompressor());
 }
@@ -275,8 +461,7 @@
         }
         // If input size is same as output size, then input is uncompressed.
         if (stream_->Size() == output_size) {
-            size_t bytes_read = 0;
-            stream_->Read(output_buffer, output_size, &bytes_read);
+            ssize_t bytes_read = stream_->ReadFully(output_buffer, output_size);
             if (bytes_read != output_size) {
                 LOG(ERROR) << "Failed to read all input at once. Expected: " << output_size
                            << " actual: " << bytes_read;
@@ -287,8 +472,7 @@
         }
         std::string input_buffer;
         input_buffer.resize(stream_->Size());
-        size_t bytes_read = 0;
-        stream_->Read(input_buffer.data(), input_buffer.size(), &bytes_read);
+        ssize_t bytes_read = stream_->ReadFully(input_buffer.data(), input_buffer.size());
         if (bytes_read != input_buffer.size()) {
             LOG(ERROR) << "Failed to read all input at once. Expected: " << input_buffer.size()
                        << " actual: " << bytes_read;
@@ -305,6 +489,61 @@
         sink_->ReturnData(output_buffer, output_size);
         return true;
     }
+
+    // Decompress the whole LZ4 stream, then write at most |buffer_size| bytes
+    // to |buffer|, skipping the first |ignore_bytes| of decoded output.
+    // Returns the number of bytes written, or -1 on error.
+    ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
+                       size_t ignore_bytes) override {
+        std::string input_buffer(stream_->Size(), '\0');
+        ssize_t streamed_in = stream_->ReadFully(input_buffer.data(), input_buffer.size());
+        if (streamed_in < 0) {
+            return -1;
+        }
+        CHECK_EQ(streamed_in, stream_->Size());
+
+        char* decode_buffer = reinterpret_cast<char*>(buffer);
+        size_t decode_buffer_size = buffer_size;
+        // It's unclear if LZ4 can exactly satisfy a partial decode request, so
+        // if we get one, decode into a temporary buffer of the full size.
+        std::string temp;
+        if (buffer_size < decompressed_size) {
+            temp.resize(decompressed_size, '\0');
+            decode_buffer = temp.data();
+            decode_buffer_size = temp.size();
+        }
+
+        const int bytes_decompressed = LZ4_decompress_safe(input_buffer.data(), decode_buffer,
+                                                           input_buffer.size(), decode_buffer_size);
+        if (bytes_decompressed < 0) {
+            LOG(ERROR) << "Failed to decompress LZ4 block, code: " << bytes_decompressed;
+            return -1;
+        }
+        if (bytes_decompressed != decompressed_size) {
+            LOG(ERROR) << "Failed to decompress LZ4 block, expected output size: "
+                       << decompressed_size << ", actual: " << bytes_decompressed;
+            return -1;
+        }
+        CHECK_LE(bytes_decompressed, decode_buffer_size);
+        if (ignore_bytes > bytes_decompressed) {
+            LOG(ERROR) << "Ignoring more bytes than exist in stream (ignoring " << ignore_bytes
+                       << ", got " << bytes_decompressed << ")";
+            return -1;
+        }
+
+        if (temp.empty()) {
+            // We decoded straight into |buffer|. LZ4's API has no way to sink
+            // out the first N bytes, so memmove() the tail down over them.
+            if (ignore_bytes) {
+                memmove(decode_buffer, decode_buffer + ignore_bytes,
+                        bytes_decompressed - ignore_bytes);
+            }
+            return bytes_decompressed - ignore_bytes;
+        }
+        size_t max_copy = std::min(bytes_decompressed - ignore_bytes, buffer_size);
+        memcpy(buffer, temp.data() + ignore_bytes, max_copy);
+        return max_copy;
+    }
 };
 
 std::unique_ptr<IDecompressor> IDecompressor::Lz4() {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h
index 7f74eda..09164d3 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h
@@ -26,11 +26,16 @@
     virtual ~IByteStream() {}
 
-    // Read up to |length| bytes, storing the number of bytes read in the out-
-    // parameter. If the end of the stream is reached, 0 is returned.
-    virtual bool Read(void* buffer, size_t length, size_t* read) = 0;
+    // Read up to |length| bytes and return the number of bytes read. If the
+    // end of the stream is reached, 0 is returned. On error, -1 is returned.
+    // errno is NOT set.
+    virtual ssize_t Read(void* buffer, size_t length) = 0;
 
     // Size of the stream.
     virtual size_t Size() const = 0;
+
+    // Helper for Read(). Read the entire stream into |buffer|, up to |length|
+    // bytes.
+    ssize_t ReadFully(void* buffer, size_t length);
 };
 
 class IDecompressor {
@@ -43,9 +48,21 @@
     static std::unique_ptr<IDecompressor> Brotli();
     static std::unique_ptr<IDecompressor> Lz4();
 
+    static std::unique_ptr<IDecompressor> FromString(std::string_view compressor);
+
     // |output_bytes| is the expected total number of bytes to sink.
     virtual bool Decompress(size_t output_bytes) = 0;
 
+    // Decompress at most |buffer_size| bytes, ignoring the first |ignore_bytes|
+    // of the decoded stream. |buffer_size| must be at least one byte.
+    // |decompressed_size| is the expected total size if the entire stream were
+    // decompressed.
+    //
+    // Returns the number of bytes written to |buffer|, or -1 on error. errno
+    // is NOT set.
+    virtual ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
+                               size_t ignore_bytes = 0) = 0;
+
     void set_stream(IByteStream* stream) { stream_ = stream; }
     void set_sink(IByteSink* sink) { sink_ = sink; }
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
index 45be191..e583ff0 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
@@ -747,18 +747,18 @@
         remaining_ = data_length_;
     }
 
-    bool Read(void* buffer, size_t length, size_t* read) override {
+    ssize_t Read(void* buffer, size_t length) override {  // Bytes read; 0 at end; -1 on error.
         size_t to_read = std::min(length, remaining_);
         if (!to_read) {
-            *read = 0;
-            return true;
+            return 0;  // Nothing requested, or no bytes remain in this stream.
         }
-        if (!reader_->GetRawBytes(offset_, buffer, to_read, read)) {
-            return false;
+        size_t read;  // Out-parameter passed to GetRawBytes() below.
+        if (!reader_->GetRawBytes(offset_, buffer, to_read, &read)) {
+            return -1;
         }
-        offset_ += *read;
-        remaining_ -= *read;
-        return true;
+        offset_ += read;  // Advance the cursor past what was just read.
+        remaining_ -= read;
+        return read;
     }
 
     size_t Size() const override { return data_length_; }
@@ -802,5 +802,44 @@
     return decompressor->Decompress(header_.block_size);
 }
 
+// Reads the decoded payload of |op| into |buffer|, skipping the first
+// |ignore_bytes|. Returns the number of bytes written, or -1 on failure.
+ssize_t CowReader::ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
+                            size_t ignore_bytes) {
+    std::unique_ptr<IDecompressor> decompressor;
+    switch (op.compression) {
+        case kCowCompressNone:
+            break;
+        case kCowCompressGz:
+            decompressor = IDecompressor::Gz();
+            break;
+        case kCowCompressBrotli:
+            decompressor = IDecompressor::Brotli();
+            break;
+        case kCowCompressLz4:
+            if (header_.block_size != op.data_length) {
+                decompressor = IDecompressor::Lz4();
+            }
+            break;
+        default:
+            LOG(ERROR) << "Unknown compression type: " << op.compression;
+            return -1;
+    }
+
+    const uint64_t offset = (op.type == kCowXorOp) ? data_loc_->at(op.new_block) : op.source;
+    if (decompressor) {
+        CowDataStream stream(this, offset, op.data_length);
+        decompressor->set_stream(&stream);
+        return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes);
+    }
+    // Raw read: the length subtraction below would wrap if |ignore_bytes| is too large.
+    if (ignore_bytes > op.data_length) {
+        LOG(ERROR) << "Cannot ignore " << ignore_bytes << " of " << op.data_length << " bytes";
+        return -1;
+    }
+    CowDataStream stream(this, offset + ignore_bytes, op.data_length - ignore_bytes);
+    return stream.ReadFully(buffer, buffer_size);
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 56b48f0..042ffb4 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -37,6 +37,14 @@
 #include <sys/ioctl.h>
 #include <unistd.h>
 
+// The info messages here are spammy, but are useful for update_engine. Disable
+// them when running on the host.
+#ifdef __ANDROID__
+#define LOG_INFO LOG(INFO)
+#else
+#define LOG_INFO LOG(VERBOSE)
+#endif
+
 namespace android {
 namespace snapshot {
 
@@ -194,18 +202,13 @@
 }
 
 bool CowWriter::ParseOptions() {
-    if (options_.compression == "gz") {
-        compression_ = kCowCompressGz;
-    } else if (options_.compression == "brotli") {
-        compression_ = kCowCompressBrotli;
-    } else if (options_.compression == "lz4") {
-        compression_ = kCowCompressLz4;
-    } else if (options_.compression == "none") {
-        compression_ = kCowCompressNone;
-    } else if (!options_.compression.empty()) {
+    auto algorithm = CompressionAlgorithmFromString(options_.compression);
+    if (!algorithm) {
         LOG(ERROR) << "unrecognized compression: " << options_.compression;
         return false;
     }
+    compression_ = *algorithm;
+
     if (options_.cluster_ops == 1) {
         LOG(ERROR) << "Clusters must contain at least two operations to function.";
         return false;
@@ -239,10 +242,10 @@
                 return false;
             }
             cow_image_size_ = size_in_bytes;
-            LOG(INFO) << "COW image " << file_path << " has size " << size_in_bytes;
+            LOG_INFO << "COW image " << file_path << " has size " << size_in_bytes;
         } else {
-            LOG(INFO) << "COW image " << file_path
-                      << " is not a block device, assuming unlimited space.";
+            LOG_INFO << "COW image " << file_path
+                     << " is not a block device, assuming unlimited space.";
         }
     }
     return true;
@@ -271,12 +274,12 @@
     }
 
     std::string batch_write = batch_write_ ? "enabled" : "disabled";
-    LOG(INFO) << "Batch writes: " << batch_write;
+    LOG_INFO << "Batch writes: " << batch_write;
 }
 
 void CowWriter::InitWorkers() {
     if (num_compress_threads_ <= 1) {
-        LOG(INFO) << "Not creating new threads for compression.";
+        LOG_INFO << "Not creating new threads for compression.";
         return;
     }
     for (int i = 0; i < num_compress_threads_; i++) {
@@ -285,7 +288,7 @@
         compress_threads_.push_back(std::move(wt));
     }
 
-    LOG(INFO) << num_compress_threads_ << " thread used for compression";
+    LOG_INFO << num_compress_threads_ << " thread used for compression";
 }
 
 bool CowWriter::Initialize(unique_fd&& fd) {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp
index 167ff8c..2716156 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp
@@ -63,24 +63,6 @@
     bool include_merged;
 };
 
-// Sink that always appends to the end of a string.
-class StringSink : public IByteSink {
-  public:
-    void* GetBuffer(size_t requested, size_t* actual) override {
-        size_t old_size = stream_.size();
-        stream_.resize(old_size + requested, '\0');
-        *actual = requested;
-        return stream_.data() + old_size;
-    }
-    bool ReturnData(void*, size_t) override { return true; }
-    void Reset() { stream_.clear(); }
-
-    std::string& stream() { return stream_; }
-
-  private:
-    std::string stream_;
-};
-
 static void ShowBad(CowReader& reader, const struct CowOperation& op) {
     size_t count;
     auto buffer = std::make_unique<uint8_t[]>(op.data_length);
@@ -153,7 +135,9 @@
     } else if (opt.iter_type == Merge) {
         iter = reader.GetMergeOpIter(opt.include_merged);
     }
-    StringSink sink;
+
+    std::string buffer(header.block_size, '\0');
+
     bool success = true;
     uint64_t xor_ops = 0, copy_ops = 0, replace_ops = 0, zero_ops = 0;
     while (!iter->Done()) {
@@ -162,12 +146,11 @@
         if (!opt.silent && opt.show_ops) std::cout << op << "\n";
 
         if (opt.decompress && op.type == kCowReplaceOp && op.compression != kCowCompressNone) {
-            if (!reader.ReadData(op, &sink)) {
+            if (reader.ReadData(op, buffer.data(), buffer.size()) < 0) {
                 std::cerr << "Failed to decompress for :" << op << "\n";
                 success = false;
                 if (opt.show_bad) ShowBad(reader, op);
             }
-            sink.Reset();
         }
 
         if (op.type == kCowSequenceOp && opt.show_seq) {