Merge "snapuserd: Remove IByteSink usage from snapuserd."
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
index 0d0f711..3bc02d4 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
@@ -76,11 +76,15 @@
// internal COW format and if the block is compressed,
// it will be de-compressed.
bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
- if (!reader_->ReadData(*cow_op, &bufsink_)) {
+ void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+ if (!buffer) {
+ SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
+ return false;
+ }
+ if (!reader_->ReadData(*cow_op, buffer, BLOCK_SZ)) {
SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
return false;
}
-
return true;
}
@@ -125,12 +129,26 @@
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
+
xorsink_.Reset();
- if (!reader_->ReadData(*cow_op, &xorsink_)) {
- SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
+
+ size_t actual = 0;
+ void* buffer = xorsink_.GetBuffer(BLOCK_SZ, &actual);
+ if (!buffer || actual < BLOCK_SZ) {
+ SNAP_LOG(ERROR) << "ProcessXorOp failed to get buffer of " << BLOCK_SZ << " size, got "
+ << actual;
return false;
}
-
+ ssize_t size = reader_->ReadData(*cow_op, buffer, BLOCK_SZ);
+ if (size != BLOCK_SZ) {
+ SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block
+ << ", return value: " << size;
+ return false;
+ }
+ if (!xorsink_.ReturnData(buffer, size)) {
+ SNAP_LOG(ERROR) << "ProcessXorOp failed to return data";
+ return false;
+ }
return true;
}
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index fbe57d2..b175fe8 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -114,6 +114,24 @@
return nr_consecutive;
}
+class [[nodiscard]] AutoNotifyReadAheadFailed {
+ public:
+ AutoNotifyReadAheadFailed(std::shared_ptr<SnapshotHandler> snapuserd) : snapuserd_(snapuserd) {}
+
+ ~AutoNotifyReadAheadFailed() {
+ if (cancelled_) {
+ return;
+ }
+ snapuserd_->ReadAheadIOFailed();
+ }
+
+ void Cancel() { cancelled_ = true; }
+
+ private:
+ std::shared_ptr<SnapshotHandler> snapuserd_;
+ bool cancelled_ = false;
+};
+
bool ReadAhead::ReconstructDataFromCow() {
std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
loff_t metadata_offset = 0;
@@ -145,6 +163,8 @@
metadata_offset += sizeof(struct ScratchMetadata);
}
+ AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);
+
// We are done re-constructing the mapping; however, we need to make sure
// all the COW operations to-be merged are present in the re-constructed
// mapping.
@@ -162,7 +182,6 @@
if (!(num_ops == 0)) {
SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recoverd "
<< " Pending ops: " << num_ops;
- snapuserd_->ReadAheadIOFailed();
return false;
}
@@ -175,11 +194,11 @@
if (!snapuserd_->ReadAheadIOCompleted(true)) {
SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
- snapuserd_->ReadAheadIOFailed();
return false;
}
SNAP_LOG(INFO) << "ReconstructDataFromCow success";
+ notify_read_ahead_failed.Cancel();
return true;
}
@@ -467,9 +486,16 @@
if (xor_op_index < xor_op_vec.size()) {
const CowOperation* xor_op = xor_op_vec[xor_op_index];
if (xor_op->new_block == new_block) {
- if (!reader_->ReadData(*xor_op, &bufsink_)) {
+ void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+ if (!buffer) {
+ SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: "
+ << xor_op->new_block;
+ return false;
+ }
+ if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
SNAP_LOG(ERROR)
- << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
+ << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
+ << ", return value: " << rv;
return false;
}
@@ -492,6 +518,8 @@
blocks_.clear();
std::vector<const CowOperation*> xor_op_vec;
+ AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);
+
bufsink_.ResetBufferOffset();
// Number of ops to be merged in this window. This is a fixed size
@@ -518,8 +546,6 @@
<< " offset :" << source_offset % BLOCK_SZ
<< " buffer_offset : " << buffer_offset << " io_size : " << io_size
<< " buf-addr : " << read_ahead_buffer_;
-
- snapuserd_->ReadAheadIOFailed();
return false;
}
@@ -530,6 +556,7 @@
// Done with merging ordered ops
if (RAIterDone() && total_blocks_merged_ == 0) {
+ notify_read_ahead_failed.Cancel();
return true;
}
@@ -560,20 +587,25 @@
// Check if this block is an XOR op
if (xor_op->new_block == new_block) {
// Read the xor'ed data from COW
- if (!reader_->ReadData(*xor_op, &bufsink)) {
+ void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+ if (!buffer) {
+ SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer";
+ return false;
+ }
+ if (ssize_t rv = reader_->ReadData(*xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
SNAP_LOG(ERROR)
- << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
- snapuserd_->ReadAheadIOFailed();
+ << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
+ << ", return value: " << rv;
return false;
}
// Pointer to the data read from base device
- uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
+ uint8_t* read_buffer = reinterpret_cast<uint8_t*>(bufptr);
// Get the xor'ed data read from COW device
uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());
// Retrieve the original data
for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
- buffer[byte_offset] ^= xor_data[byte_offset];
+ read_buffer[byte_offset] ^= xor_data[byte_offset];
}
// Move to next XOR op
@@ -604,6 +636,7 @@
bm->new_block = 0;
bm->file_offset = 0;
+ notify_read_ahead_failed.Cancel();
return true;
}