snapuserd: Remove IByteSink usage from snapuserd.

This added some new failure paths, so the patch also includes an RAII
helper to assist with handling those.

Bug: 278637212
Test: vts_libsnapshot_test
      apply ota on CF
Change-Id: Ide59c83cb34b6d448fe9afdc1d15c6139a0c99fa
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
index 0d0f711..3bc02d4 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
@@ -76,11 +76,15 @@
 // internal COW format and if the block is compressed,
 // it will be de-compressed.
 bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
-    if (!reader_->ReadData(*cow_op, &bufsink_)) {
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+    if (!buffer) {
+        SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
+        return false;
+    }
+    if (!reader_->ReadData(*cow_op, buffer, BLOCK_SZ)) {
         SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
         return false;
     }
-
     return true;
 }
 
@@ -125,12 +129,26 @@
     if (!ReadFromSourceDevice(cow_op)) {
         return false;
     }
+
     xorsink_.Reset();
-    if (!reader_->ReadData(*cow_op, &xorsink_)) {
-        SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
+
+    size_t actual = 0;
+    void* buffer = xorsink_.GetBuffer(BLOCK_SZ, &actual);
+    if (!buffer || actual < BLOCK_SZ) {
+        SNAP_LOG(ERROR) << "ProcessXorOp failed to get buffer of " << BLOCK_SZ << " size, got "
+                        << actual;
         return false;
     }
-
+    ssize_t size = reader_->ReadData(*cow_op, buffer, BLOCK_SZ);
+    if (size != BLOCK_SZ) {
+        SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block
+                        << ", return value: " << size;
+        return false;
+    }
+    if (!xorsink_.ReturnData(buffer, size)) {
+        SNAP_LOG(ERROR) << "ProcessXorOp failed to return data";
+        return false;
+    }
     return true;
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index fbe57d2..b175fe8 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -114,6 +114,24 @@
     return nr_consecutive;
 }
 
+class [[nodiscard]] AutoNotifyReadAheadFailed {
+  public:
+    AutoNotifyReadAheadFailed(std::shared_ptr<SnapshotHandler> snapuserd) : snapuserd_(snapuserd) {}
+
+    ~AutoNotifyReadAheadFailed() {
+        if (cancelled_) {
+            return;
+        }
+        snapuserd_->ReadAheadIOFailed();
+    }
+
+    void Cancel() { cancelled_ = true; }
+
+  private:
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+    bool cancelled_ = false;
+};
+
 bool ReadAhead::ReconstructDataFromCow() {
     std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
     loff_t metadata_offset = 0;
@@ -145,6 +163,8 @@
         metadata_offset += sizeof(struct ScratchMetadata);
     }
 
+    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);
+
     // We are done re-constructing the mapping; however, we need to make sure
     // all the COW operations to-be merged are present in the re-constructed
     // mapping.
@@ -162,7 +182,6 @@
         if (!(num_ops == 0)) {
             SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recoverd "
                             << " Pending ops: " << num_ops;
-            snapuserd_->ReadAheadIOFailed();
             return false;
         }
 
@@ -175,11 +194,11 @@
 
     if (!snapuserd_->ReadAheadIOCompleted(true)) {
         SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
-        snapuserd_->ReadAheadIOFailed();
         return false;
     }
 
     SNAP_LOG(INFO) << "ReconstructDataFromCow success";
+    notify_read_ahead_failed.Cancel();
     return true;
 }
 
@@ -467,9 +486,16 @@
         if (xor_op_index < xor_op_vec.size()) {
             const CowOperation* xor_op = xor_op_vec[xor_op_index];
             if (xor_op->new_block == new_block) {
-                if (!reader_->ReadData(*xor_op, &bufsink_)) {
+                void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+                if (!buffer) {
+                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: "
+                                    << xor_op->new_block;
+                    return false;
+                }
+                if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                     SNAP_LOG(ERROR)
-                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
+                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
+                            << ", return value: " << rv;
                     return false;
                 }
 
@@ -492,6 +518,8 @@
     blocks_.clear();
     std::vector<const CowOperation*> xor_op_vec;
 
+    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);
+
     bufsink_.ResetBufferOffset();
 
     // Number of ops to be merged in this window. This is a fixed size
@@ -518,8 +546,6 @@
                              << " offset :" << source_offset % BLOCK_SZ
                              << " buffer_offset : " << buffer_offset << " io_size : " << io_size
                              << " buf-addr : " << read_ahead_buffer_;
-
-            snapuserd_->ReadAheadIOFailed();
             return false;
         }
 
@@ -530,6 +556,7 @@
 
     // Done with merging ordered ops
     if (RAIterDone() && total_blocks_merged_ == 0) {
+        notify_read_ahead_failed.Cancel();
         return true;
     }
 
@@ -560,20 +587,25 @@
             // Check if this block is an XOR op
             if (xor_op->new_block == new_block) {
                 // Read the xor'ed data from COW
-                if (!reader_->ReadData(*xor_op, &bufsink)) {
+                void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+                if (!buffer) {
+                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer";
+                    return false;
+                }
+                if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                     SNAP_LOG(ERROR)
-                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
-                    snapuserd_->ReadAheadIOFailed();
+                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
+                            << ", return value: " << rv;
                     return false;
                 }
                 // Pointer to the data read from base device
-                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
+                uint8_t* read_buffer = reinterpret_cast<uint8_t*>(bufptr);
                 // Get the xor'ed data read from COW device
                 uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());
 
                 // Retrieve the original data
                 for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
-                    buffer[byte_offset] ^= xor_data[byte_offset];
+                    read_buffer[byte_offset] ^= xor_data[byte_offset];
                 }
 
                 // Move to next XOR op
@@ -604,6 +636,7 @@
     bm->new_block = 0;
     bm->file_offset = 0;
 
+    notify_read_ahead_failed.Cancel();
     return true;
 }