Merge changes I25fb5fce,I86cffff6,I031eb1a1,Ie40633c0,I08562b89
* changes:
snapuserd: Sort REPLACE ops for batch merge
snapuserd: API to query snapshot and merge status
snapuserd: Wire up APIs for initiating and tracking merge
snapuserd: Handle I/O requests which are not block-aligned
snapuserd: Service I/O requests from dm-user
diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index 5306b28..20030b9 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -34,11 +34,12 @@
namespace android {
namespace snapshot {
-CowReader::CowReader()
+CowReader::CowReader(ReaderFlags reader_flag)
: fd_(-1),
header_(),
fd_size_(0),
- merge_op_blocks_(std::make_shared<std::vector<uint32_t>>()) {}
+ merge_op_blocks_(std::make_shared<std::vector<uint32_t>>()),
+ reader_flag_(reader_flag) {}
static void SHA256(const void*, size_t, uint8_t[]) {
#if 0
@@ -415,7 +416,7 @@
//==============================================================
bool CowReader::PrepMergeOps() {
auto merge_op_blocks = std::make_shared<std::vector<uint32_t>>();
- std::set<int, std::greater<int>> other_ops;
+ std::vector<int> other_ops;
auto seq_ops_set = std::unordered_set<uint32_t>();
auto block_map = std::make_shared<std::unordered_map<uint32_t, int>>();
size_t num_seqs = 0;
@@ -446,7 +447,7 @@
if (!has_seq_ops_ && IsOrderedOp(current_op)) {
merge_op_blocks->emplace_back(current_op.new_block);
} else if (seq_ops_set.count(current_op.new_block) == 0) {
- other_ops.insert(current_op.new_block);
+ other_ops.push_back(current_op.new_block);
}
block_map->insert({current_op.new_block, i});
}
@@ -462,6 +463,18 @@
} else {
num_ordered_ops_to_merge_ = 0;
}
+
+    // Sort the vector in increasing order if merging in user-space, as
+    // we can then batch-merge the blocks while iterating forward.
+ //
+ // dm-snapshot-merge requires decreasing order as we iterate the blocks
+ // in reverse order.
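+    //
+    // e.g., for new_block values {7, 3, 5}: user-space merge sorts them to
+    // {3, 5, 7} (merged forward in batches), while dm-snapshot-merge gets {7, 5, 3}.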
+ if (reader_flag_ == ReaderFlags::USERSPACE_MERGE) {
+ std::sort(other_ops.begin(), other_ops.end());
+ } else {
+ std::sort(other_ops.begin(), other_ops.end(), std::greater<int>());
+ }
+
merge_op_blocks->reserve(merge_op_blocks->size() + other_ops.size());
for (auto block : other_ops) {
merge_op_blocks->emplace_back(block);
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index c15682a..9f4ddbb 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -26,8 +26,8 @@
static constexpr uint32_t kCowVersionManifest = 2;
-static constexpr uint32_t BLOCK_SZ = 4096;
-static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
+static constexpr size_t BLOCK_SZ = 4096;
+static constexpr size_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
// This header appears as the first sequence of bytes in the COW. All fields
// in the layout are little-endian encoded. The on-disk layout is:
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index 63a9e68..d5b4335 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -104,7 +104,12 @@
class CowReader final : public ICowReader {
public:
- CowReader();
+ enum class ReaderFlags {
+ DEFAULT = 0,
+ USERSPACE_MERGE = 1,
+ };
+
+ CowReader(ReaderFlags reader_flag = ReaderFlags::DEFAULT);
~CowReader() { owned_fd_ = {}; }
// Parse the COW, optionally, up to the given label. If no label is
@@ -166,6 +171,7 @@
uint64_t num_ordered_ops_to_merge_;
bool has_seq_ops_;
std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc_;
+ ReaderFlags reader_flag_;
};
} // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 837f33a..c9b0512 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -56,7 +56,7 @@
"fs_mgr_defaults",
],
srcs: [
- "snapuserd_server.cpp",
+ "dm-snapshot-merge/snapuserd_server.cpp",
"dm-snapshot-merge/snapuserd.cpp",
"dm-snapshot-merge/snapuserd_worker.cpp",
"dm-snapshot-merge/snapuserd_readahead.cpp",
@@ -67,6 +67,7 @@
"user-space-merge/snapuserd_merge.cpp",
"user-space-merge/snapuserd_readahead.cpp",
"user-space-merge/snapuserd_transitions.cpp",
+ "user-space-merge/snapuserd_server.cpp",
],
cflags: [
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp
similarity index 99%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp
index 91b4190..9ddc963 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp
@@ -31,6 +31,7 @@
#include <android-base/scopeguard.h>
#include <fs_mgr/file_wait.h>
#include <snapuserd/snapuserd_client.h>
+
#include "snapuserd_server.h"
#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h
similarity index 98%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h
index 14e5de6..3b6ff15 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h
@@ -28,7 +28,7 @@
#include <vector>
#include <android-base/unique_fd.h>
-#include "dm-snapshot-merge/snapuserd.h"
+#include "snapuserd.h"
namespace android {
namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
index aeecf41..6ed55af 100644
--- a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
+++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
@@ -79,6 +79,15 @@
// Returns true if the snapuserd instance supports bridging a socket to second-stage init.
bool SupportsSecondStageSocketHandoff();
+
+    // Returns true if the merge is started (or resumed after a crash).
+ bool InitiateMerge(const std::string& misc_name);
+
+    // Returns the merge completion percentage.
+ double GetMergePercent();
+
+    // Returns the status of the snapshot.
+ std::string QuerySnapshotStatus(const std::string& misc_name);
};
} // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
index 1ea05a3..e345269 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
@@ -231,5 +231,35 @@
return true;
}
+bool SnapuserdClient::InitiateMerge(const std::string& misc_name) {
+ std::string msg = "initiate_merge," + misc_name;
+ if (!Sendmsg(msg)) {
+ LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
+ return false;
+ }
+ std::string response = Receivemsg();
+ return response == "success";
+}
+
+double SnapuserdClient::GetMergePercent() {
+ std::string msg = "merge_percent";
+ if (!Sendmsg(msg)) {
+ LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
+        return 0.0;
+ }
+ std::string response = Receivemsg();
+
+ return std::stod(response);
+}
+
+std::string SnapuserdClient::QuerySnapshotStatus(const std::string& misc_name) {
+ std::string msg = "getstatus," + misc_name;
+ if (!Sendmsg(msg)) {
+ LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
+ return "snapshot-merge-failed";
+ }
+ return Receivemsg();
+}
+
} // namespace snapshot
} // namespace android
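
A minimal usage sketch of the new client calls (illustrative only: SnapuserdClient::Connect() is assumed from the existing client header, and the misc name "system_a" and 2-second polling interval are made up; kSnapuserdSocket is the socket name used by snapuserd_daemon.cpp):

    #include <chrono>
    #include <string>
    #include <thread>

    #include <android-base/logging.h>
    #include <snapuserd/snapuserd_client.h>

    // Hypothetical caller (e.g. on the libsnapshot side): start the merge for one
    // snapshot and poll until the daemon reports a terminal state.
    bool WaitForMerge(const std::string& misc_name) {
        using namespace std::chrono_literals;
        auto client = android::snapshot::SnapuserdClient::Connect(
                android::snapshot::kSnapuserdSocket, 10s);
        if (!client || !client->InitiateMerge(misc_name)) {
            return false;
        }
        while (client->QuerySnapshotStatus(misc_name) == "snapshot-merge") {
            LOG(INFO) << "Merge progress: " << client->GetMergePercent() << "%";
            std::this_thread::sleep_for(2s);
        }
        return client->QuerySnapshotStatus(misc_name) == "snapshot-merge-complete";
    }
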
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index e05822e..912884f 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -21,8 +21,6 @@
#include <gflags/gflags.h>
#include <snapuserd/snapuserd_client.h>
-#include "snapuserd_server.h"
-
DEFINE_string(socket, android::snapshot::kSnapuserdSocket, "Named socket or socket path.");
DEFINE_bool(no_socket, false,
"If true, no socket is used. Each additional argument is an INIT message.");
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h
index b660ba2..fbf57d9 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h
@@ -19,7 +19,7 @@
#include <string>
#include <vector>
-#include "snapuserd_server.h"
+#include "dm-snapshot-merge/snapuserd_server.h"
namespace android {
namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index a2538d2..57e47e7 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -59,6 +59,15 @@
return reader_->CloneCowReader();
}
+void SnapshotHandler::UpdateMergeCompletionPercentage() {
+ struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+ merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
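+    // e.g., 1500 merged ops out of 6000 total data ops -> 25.0% complete.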
+
+ SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
+ << " num_merge_ops: " << ch->num_merge_ops
+ << " total-ops: " << reader_->get_num_total_data_ops();
+}
+
bool SnapshotHandler::CommitMerge(int num_merge_ops) {
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
ch->num_merge_ops += num_merge_ops;
@@ -95,6 +104,12 @@
}
}
+ // Update the merge completion - this is used by update engine
+    // to track merge progress. No need to take a lock; it is okay
+    // if a reader occasionally misses the latest updated value.
+ // Subsequent polling will eventually converge to completion.
+ UpdateMergeCompletionPercentage();
+
return true;
}
@@ -124,7 +139,7 @@
}
bool SnapshotHandler::ReadMetadata() {
- reader_ = std::make_unique<CowReader>();
+ reader_ = std::make_unique<CowReader>(CowReader::ReaderFlags::USERSPACE_MERGE);
CowHeader header;
CowOptions options;
@@ -152,16 +167,48 @@
return false;
}
+ UpdateMergeCompletionPercentage();
+
// Initialize the iterator for reading metadata
std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+ int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
+ int ra_index = 0;
+
+ size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;
+
while (!cowop_iter->Done()) {
const CowOperation* cow_op = &cowop_iter->Get();
+ if (cow_op->type == kCowCopyOp) {
+ copy_ops += 1;
+ } else if (cow_op->type == kCowReplaceOp) {
+ replace_ops += 1;
+ } else if (cow_op->type == kCowZeroOp) {
+ zero_ops += 1;
+ } else if (cow_op->type == kCowXorOp) {
+ xor_ops += 1;
+ }
+
chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
- if (!ra_thread_ && IsOrderedOp(*cow_op)) {
+ if (IsOrderedOp(*cow_op)) {
ra_thread_ = true;
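+            // Each ordered (copy/xor) op falls into an RA group of
+            // GetBufferDataSize()/BLOCK_SZ ops; this map lets an I/O on a block
+            // be matched to the merge state of its group.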
+ block_to_ra_index_[cow_op->new_block] = ra_index;
+ num_ra_ops_per_iter -= 1;
+
+ if ((ra_index + 1) - merge_blk_state_.size() == 1) {
+ std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
+ MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);
+
+ merge_blk_state_.push_back(std::move(blk_state));
+ }
+
+ // Move to next RA block
+ if (num_ra_ops_per_iter == 0) {
+ num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
+ ra_index += 1;
+ }
}
cowop_iter->Next();
}
@@ -173,6 +220,12 @@
PrepareReadAhead();
+ SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
+ << " Total-data-ops: " << reader_->get_num_total_data_ops()
+ << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
+ << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
+ << " Xor-ops: " << xor_ops;
+
return true;
}
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index c171eda..13b56fa 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -67,6 +67,27 @@
class SnapshotHandler;
+enum class MERGE_GROUP_STATE {
+ GROUP_MERGE_PENDING,
+ GROUP_MERGE_RA_READY,
+ GROUP_MERGE_IN_PROGRESS,
+ GROUP_MERGE_COMPLETED,
+ GROUP_MERGE_FAILED,
+ GROUP_INVALID,
+};
+
+struct MergeGroupState {
+ MERGE_GROUP_STATE merge_state_;
+    // Ref count of in-flight I/Os while the group state
+    // is "GROUP_MERGE_PENDING"
+ size_t num_ios_in_progress;
+ std::mutex m_lock;
+ std::condition_variable m_cv;
+
+ MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
+ : merge_state_(state), num_ios_in_progress(n_ios) {}
+};
+
class ReadAhead {
public:
ReadAhead(const std::string& cow_device, const std::string& backing_device,
@@ -133,16 +154,33 @@
base_path_merge_fd_ = {};
}
+ // Functions interacting with dm-user
+ bool ReadDmUserHeader();
+ bool WriteDmUserPayload(size_t size, bool header_response);
+ bool DmuserReadRequest();
+
// IO Path
bool ProcessIORequest();
+ bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
+
+ bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
+ bool ReadFromSourceDevice(const CowOperation* cow_op);
+
+ bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response);
+ bool ReadUnalignedSector(sector_t sector, size_t size);
+ int ReadUnalignedSector(sector_t sector, size_t size,
+ std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
+ bool RespondIOError(bool header_response);
// Processing COW operations
+ bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessReplaceOp(const CowOperation* cow_op);
bool ProcessZeroOp();
// Handles Copy and Xor
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessXorOp(const CowOperation* cow_op);
+ bool ProcessOrderedOp(const CowOperation* cow_op);
// Merge related ops
bool Merge();
@@ -152,6 +190,9 @@
const std::unique_ptr<ICowOpIter>& cowop_iter,
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
+ sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+ chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
+
std::unique_ptr<CowReader> reader_;
BufferSink bufsink_;
XorSink xorsink_;
@@ -210,6 +251,7 @@
// Read-ahead related functions
void* GetMappedAddr() { return mapped_addr_; }
void PrepareReadAhead();
+ std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
// State transitions for merge
void InitiateMerge();
@@ -226,6 +268,8 @@
bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
+    // Returns the merge status of the snapshot
+ std::string GetMergeStatus();
// RA related functions
uint64_t GetBufferMetadataOffset();
@@ -238,12 +282,23 @@
int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
bool MergeInitiated() { return merge_initiated_; }
+ double GetMergePercentage() { return merge_completion_percentage_; }
+
+ // Merge Block State Transitions
+ void SetMergeCompleted(size_t block_index);
+ void SetMergeInProgress(size_t block_index);
+ void SetMergeFailed(size_t block_index);
+ void NotifyIOCompletion(uint64_t new_block);
+ bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
+ MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);
private:
bool ReadMetadata();
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
+ bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
struct BufferState* GetBufferState();
+ void UpdateMergeCompletionPercentage();
void ReadBlocks(const std::string partition_name, const std::string& dm_block_device);
void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
@@ -261,7 +316,6 @@
unique_fd cow_fd_;
- // Number of sectors required when initializing dm-user
uint64_t num_sectors_;
std::unique_ptr<CowReader> reader_;
@@ -283,8 +337,16 @@
int total_ra_blocks_merged_ = 0;
MERGE_IO_TRANSITION io_state_;
std::unique_ptr<ReadAhead> read_ahead_thread_;
+ std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
+
+ // user-space-merging
+ std::unordered_map<uint64_t, int> block_to_ra_index_;
+
+ // Merge Block state
+ std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
std::unique_ptr<Worker> merge_thread_;
+ double merge_completion_percentage_;
bool merge_initiated_ = false;
bool attached_ = false;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
index 18c7f2c..bfbacf9 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
@@ -84,6 +84,56 @@
return true;
}
+bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
+ void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+ if (buffer == nullptr) {
+ SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
+ return false;
+ }
+ SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
+ << " Source: " << cow_op->source;
+ uint64_t offset = cow_op->source;
+ if (cow_op->type == kCowCopyOp) {
+ offset *= BLOCK_SZ;
+ }
+ if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
+ std::string op;
+        if (cow_op->type == kCowCopyOp) {
+            op = "Copy-op";
+        } else {
+            op = "Xor-op";
+        }
+        SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_
+                         << " at block: " << offset / BLOCK_SZ << " offset: " << offset % BLOCK_SZ;
+ return false;
+ }
+
+ return true;
+}
+
+// Start the copy operation. This will read the backing
+// block device which is represented by cow_op->source.
+bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
+ if (!ReadFromSourceDevice(cow_op)) {
+ return false;
+ }
+
+ return true;
+}
+
+bool Worker::ProcessXorOp(const CowOperation* cow_op) {
+ if (!ReadFromSourceDevice(cow_op)) {
+ return false;
+ }
+ xorsink_.Reset();
+ if (!reader_->ReadData(*cow_op, &xorsink_)) {
+ SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
+ return false;
+ }
+
+ return true;
+}
+
bool Worker::ProcessZeroOp() {
// Zero out the entire block
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@@ -96,12 +146,85 @@
return true;
}
-bool Worker::ProcessCopyOp(const CowOperation*) {
- return true;
+bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
+ void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+ if (buffer == nullptr) {
+ SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
+ return false;
+ }
+
+ MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);
+
+ switch (state) {
+ case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
+ // Merge is completed for this COW op; just read directly from
+ // the base device
+ SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
+ << (cow_op->new_block >> SECTOR_SHIFT)
+ << " Block-number: " << cow_op->new_block;
+ if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) {
+ SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
+ << (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
+ return false;
+ }
+ return true;
+ }
+ case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
+ bool ret;
+ if (cow_op->type == kCowCopyOp) {
+ ret = ProcessCopyOp(cow_op);
+ } else {
+ ret = ProcessXorOp(cow_op);
+ }
+
+ // I/O is complete - decrement the refcount irrespective of the return
+ // status
+ snapuserd_->NotifyIOCompletion(cow_op->new_block);
+ return ret;
+ }
+        // We already have the data in the buffer, retrieved by the RA thread.
+        // Nothing further to process.
+ case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
+ [[fallthrough]];
+ }
+ case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
+ return true;
+ }
+ default: {
+            // All other states (viz. GROUP_MERGE_FAILED and GROUP_INVALID): fail the I/O.
+ return false;
+ }
+ }
+
+ return false;
}
-bool Worker::ProcessXorOp(const CowOperation*) {
- return true;
+bool Worker::ProcessCowOp(const CowOperation* cow_op) {
+ if (cow_op == nullptr) {
+ SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
+ return false;
+ }
+
+ switch (cow_op->type) {
+ case kCowReplaceOp: {
+ return ProcessReplaceOp(cow_op);
+ }
+
+ case kCowZeroOp: {
+ return ProcessZeroOp();
+ }
+
+ case kCowCopyOp:
+ [[fallthrough]];
+ case kCowXorOp: {
+ return ProcessOrderedOp(cow_op);
+ }
+
+ default: {
+ SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
+ }
+ }
+ return false;
}
void Worker::InitializeBufsink() {
@@ -129,7 +252,7 @@
}
bool Worker::RunThread() {
- SNAP_LOG(DEBUG) << "Processing snapshot I/O requests...";
+ SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
// Start serving IO
while (true) {
if (!ProcessIORequest()) {
@@ -143,8 +266,378 @@
return true;
}
+// Read the header from the dm-user misc device. This gives us the
+// sector number for which the I/O was issued on the dm-user block device.
+bool Worker::ReadDmUserHeader() {
+ if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
+ if (errno != ENOTBLK) {
+ SNAP_PLOG(ERROR) << "Control-read failed";
+ }
+
+ SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed....";
+ return false;
+ }
+
+ return true;
+}
+
+// Send the payload/data back to dm-user misc device.
+bool Worker::WriteDmUserPayload(size_t size, bool header_response) {
+ size_t payload_size = size;
+ void* buf = bufsink_.GetPayloadBufPtr();
+ if (header_response) {
+ payload_size += sizeof(struct dm_user_header);
+ buf = bufsink_.GetBufPtr();
+ }
+
+ if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
+ SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
+ return false;
+ }
+
+ return true;
+}
+
+bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
+ CHECK(read_size <= BLOCK_SZ);
+
+ void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+ if (buffer == nullptr) {
+ SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
+ return false;
+ }
+
+ loff_t offset = sector << SECTOR_SHIFT;
+ if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
+ SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
+ << "at sector :" << sector << " size: " << read_size;
+ return false;
+ }
+
+ return true;
+}
+
+bool Worker::ReadAlignedSector(sector_t sector, size_t sz, bool header_response) {
+ struct dm_user_header* header = bufsink_.GetHeaderPtr();
+ size_t remaining_size = sz;
+ std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
+ bool io_error = false;
+ int ret = 0;
+
+ do {
+ // Process 1MB payload at a time
+ size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
+
+ header->type = DM_USER_RESP_SUCCESS;
+ size_t total_bytes_read = 0;
+ io_error = false;
+ bufsink_.ResetBufferOffset();
+
+ while (read_size) {
+ // We need to check every 4k block to verify if it is
+ // present in the mapping.
+ size_t size = std::min(BLOCK_SZ, read_size);
+
+ auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
+ std::make_pair(sector, nullptr), SnapshotHandler::compare);
+ bool not_found = (it == chunk_vec.end() || it->first != sector);
+
+ if (not_found) {
+ // Block not found in map - which means this block was not
+ // changed as per the OTA. Just route the I/O to the base
+ // device.
+ if (!ReadDataFromBaseDevice(sector, size)) {
+ SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
+ header->type = DM_USER_RESP_ERROR;
+ }
+
+ ret = size;
+ } else {
+ // We found the sector in mapping. Check the type of COW OP and
+ // process it.
+ if (!ProcessCowOp(it->second)) {
+ SNAP_LOG(ERROR) << "ProcessCowOp failed";
+ header->type = DM_USER_RESP_ERROR;
+ }
+
+ ret = BLOCK_SZ;
+ }
+
+ // Just return the header if it is an error
+ if (header->type == DM_USER_RESP_ERROR) {
+ if (!RespondIOError(header_response)) {
+ return false;
+ }
+
+ io_error = true;
+ break;
+ }
+
+ read_size -= ret;
+ total_bytes_read += ret;
+ sector += (ret >> SECTOR_SHIFT);
+ bufsink_.UpdateBufferOffset(ret);
+ }
+
+ if (!io_error) {
+ if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+ return false;
+ }
+
+ SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read
+ << " header-response: " << header_response
+ << " remaining_size: " << remaining_size;
+ header_response = false;
+ remaining_size -= total_bytes_read;
+ }
+ } while (remaining_size > 0 && !io_error);
+
+ return true;
+}
+
+int Worker::ReadUnalignedSector(
+ sector_t sector, size_t size,
+ std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
+ size_t skip_sector_size = 0;
+
+ SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
+ << " Aligned sector: " << it->first;
+
+ if (!ProcessCowOp(it->second)) {
+ SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
+ << " Aligned sector: " << it->first;
+ return -1;
+ }
+
+ int num_sectors_skip = sector - it->first;
+
+ if (num_sectors_skip > 0) {
+ skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
+ char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
+ struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
+
+ if (skip_sector_size == BLOCK_SZ) {
+ SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
+ << " Base-sector: " << it->first;
+ return -1;
+ }
+
+ memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
+ (BLOCK_SZ - skip_sector_size));
+ }
+
+ bufsink_.ResetBufferOffset();
+ return std::min(size, (BLOCK_SZ - skip_sector_size));
+}
+
+bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
+ struct dm_user_header* header = bufsink_.GetHeaderPtr();
+ header->type = DM_USER_RESP_SUCCESS;
+ bufsink_.ResetBufferOffset();
+ std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
+
+ auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
+ SnapshotHandler::compare);
+
+ // |-------|-------|-------|
+ // 0 1 2 3
+ //
+ // Block 0 - op 1
+ // Block 1 - op 2
+ // Block 2 - op 3
+ //
+    // chunk_vec will have blocks 0, 1 and 2, which map to the relevant COW ops.
+ //
+    // Each block is 4k bytes. Thus, the last block will span 8 sectors,
+    // ranging up to block 3 (however, block 3 won't be in chunk_vec as
+    // it doesn't have any mapping to COW ops). Now, if we get an I/O request
+    // for a sector spanning between block 2 and block 3, we need to step
+    // back and get hold of the last element.
+ //
+ // Additionally, we need to make sure that the requested sector is
+ // indeed within the range of the final sector. It is perfectly valid
+ // to get an I/O request for block 3 and beyond which are not mapped
+ // to any COW ops. In that case, we just need to read from the base
+ // device.
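+    //
+    // e.g., with blocks 0..2 mapped (sectors 0, 8 and 16 in chunk_vec), a request
+    // at sector 20 lands inside block 2; std::lower_bound() returns end(), so we
+    // step back to the entry for sector 16.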
+ bool merge_complete = false;
+ bool header_response = true;
+ if (it == chunk_vec.end()) {
+ if (chunk_vec.size() > 0) {
+ // I/O request beyond the last mapped sector
+ it = std::prev(chunk_vec.end());
+ } else {
+            // This can happen when a partition merge is complete but the snapshot
+            // state in /metadata has not yet been deleted; if the device reboots
+            // during this window, the next boot will mount the snapshot again.
+            // However, since the merge was completed, there is no mapping to COW
+            // ops and chunk_vec will be empty. In that case, mark merge_complete
+            // and route the I/O to the base device.
+ merge_complete = true;
+ }
+ } else if (it->first != sector) {
+ if (it != chunk_vec.begin()) {
+ --it;
+ }
+ } else {
+ return ReadAlignedSector(sector, size, header_response);
+ }
+
+ loff_t requested_offset = sector << SECTOR_SHIFT;
+
+ loff_t final_offset = 0;
+ if (!merge_complete) {
+ final_offset = it->first << SECTOR_SHIFT;
+ }
+
+    // Since a COW op spans a 4k block, we need to make sure that the requested
+ // offset is within the 4k region. Consider the following case:
+ //
+ // |-------|-------|-------|
+ // 0 1 2 3
+ //
+ // Block 0 - op 1
+ // Block 1 - op 2
+ //
+ // We have an I/O request for a sector between block 2 and block 3. However,
+ // we have mapping to COW ops only for block 0 and block 1. Thus, the
+    // requested offset in this case is beyond the end of the last mapped
+    // COW op (block 1 in this case).
+
+ size_t total_bytes_read = 0;
+ size_t remaining_size = size;
+ int ret = 0;
+ if (!merge_complete && (requested_offset >= final_offset) &&
+ (requested_offset - final_offset) < BLOCK_SZ) {
+ // Read the partial un-aligned data
+ ret = ReadUnalignedSector(sector, remaining_size, it);
+ if (ret < 0) {
+ SNAP_LOG(ERROR) << "ReadUnalignedSector failed for sector: " << sector
+ << " size: " << size << " it->sector: " << it->first;
+ return RespondIOError(header_response);
+ }
+
+ remaining_size -= ret;
+ total_bytes_read += ret;
+ sector += (ret >> SECTOR_SHIFT);
+
+ // Send the data back
+ if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+ return false;
+ }
+
+ header_response = false;
+ // If we still have pending data to be processed, this will be aligned I/O
+ if (remaining_size) {
+ return ReadAlignedSector(sector, remaining_size, header_response);
+ }
+ } else {
+        // Handle I/O requests that must be routed to the base device,
+        // as the I/O is not mapped to any of the COW ops.
+ loff_t aligned_offset = requested_offset;
+ // Align to nearest 4k
+ aligned_offset += BLOCK_SZ - 1;
+ aligned_offset &= ~(BLOCK_SZ - 1);
+ // Find the diff of the aligned offset
+ size_t diff_size = aligned_offset - requested_offset;
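+        // e.g., for sector 11: requested_offset = 5632, aligned_offset = 8192,
+        // so diff_size = 2560 bytes up to the next 4k boundary.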
+ CHECK(diff_size <= BLOCK_SZ);
+ if (remaining_size < diff_size) {
+ if (!ReadDataFromBaseDevice(sector, remaining_size)) {
+ return RespondIOError(header_response);
+ }
+ total_bytes_read += remaining_size;
+
+ if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+ return false;
+ }
+ } else {
+ if (!ReadDataFromBaseDevice(sector, diff_size)) {
+ return RespondIOError(header_response);
+ }
+
+ total_bytes_read += diff_size;
+
+ if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+ return false;
+ }
+
+ remaining_size -= diff_size;
+ size_t num_sectors_read = (diff_size >> SECTOR_SHIFT);
+ sector += num_sectors_read;
+ CHECK(IsBlockAligned(sector << SECTOR_SHIFT));
+ header_response = false;
+
+ // If we still have pending data to be processed, this will be aligned I/O
+ return ReadAlignedSector(sector, remaining_size, header_response);
+ }
+ }
+
+ return true;
+}
+
+bool Worker::RespondIOError(bool header_response) {
+ struct dm_user_header* header = bufsink_.GetHeaderPtr();
+ header->type = DM_USER_RESP_ERROR;
+ // This is an issue with the dm-user interface. There
+ // is no way to propagate the I/O error back to dm-user
+    // if we have already sent the header back. The header
+    // is sent only once, at the beginning; however, the I/O can
+ // be processed in chunks. If we encounter an I/O error
+ // somewhere in the middle of the processing, we can't communicate
+ // this back to dm-user.
+ //
+ // TODO: Fix the interface
+ CHECK(header_response);
+
+ if (!WriteDmUserPayload(0, header_response)) {
+ return false;
+ }
+
+ // There is no need to process further as we have already seen
+ // an I/O error
+ return true;
+}
+
+bool Worker::DmuserReadRequest() {
+ struct dm_user_header* header = bufsink_.GetHeaderPtr();
+
+ // Unaligned I/O request
+ if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) {
+ return ReadUnalignedSector(header->sector, header->len);
+ }
+
+ return ReadAlignedSector(header->sector, header->len, true);
+}
+
bool Worker::ProcessIORequest() {
- // No communication with dm-user yet
+ struct dm_user_header* header = bufsink_.GetHeaderPtr();
+
+ if (!ReadDmUserHeader()) {
+ return false;
+ }
+
+ SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
+ SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
+ SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
+ SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
+ SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
+
+ switch (header->type) {
+ case DM_USER_REQ_MAP_READ: {
+ if (!DmuserReadRequest()) {
+ return false;
+ }
+ break;
+ }
+
+ case DM_USER_REQ_MAP_WRITE: {
+ // TODO: We should not get any write request
+ // to dm-user as we mount all partitions
+            // as read-only. Need to verify how TRIM commands are
+ // handled during mount.
+ return false;
+ }
+ }
+
return true;
}
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index 696ede7..47fc7db 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -52,7 +52,6 @@
break;
}
- // Check for consecutive blocks
uint64_t next_offset = op->new_block * BLOCK_SZ;
if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
break;
@@ -177,6 +176,7 @@
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
+ size_t block_index = 0;
SNAP_LOG(INFO) << "MergeOrderedOps started....";
@@ -190,9 +190,12 @@
// Wait for RA thread to notify that the merge window
// is ready for merging.
if (!snapuserd_->WaitForMergeBegin()) {
+ snapuserd_->SetMergeFailed(block_index);
return false;
}
+ snapuserd_->SetMergeInProgress(block_index);
+
loff_t offset = 0;
int num_ops = snapuserd_->GetTotalBlocksToMerge();
SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
@@ -213,6 +216,7 @@
if (ret < 0 || ret != io_size) {
SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
<< " at offset: " << source_offset << " io_size: " << io_size;
+ snapuserd_->SetMergeFailed(block_index);
return false;
}
@@ -226,6 +230,7 @@
// Flush the data
if (fsync(base_path_merge_fd_.get()) < 0) {
SNAP_LOG(ERROR) << " Failed to fsync merged data";
+ snapuserd_->SetMergeFailed(block_index);
return false;
}
@@ -233,14 +238,20 @@
// the merge completion
if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+ snapuserd_->SetMergeFailed(block_index);
return false;
}
SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+ // Mark the block as merge complete
+ snapuserd_->SetMergeCompleted(block_index);
// Notify RA thread that the merge thread is ready to merge the next
// window
snapuserd_->NotifyRAForMergeReady();
+
+ // Get the next block
+ block_index += 1;
}
return true;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 319755b..0bcf26e 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -115,7 +115,7 @@
}
bool ReadAhead::ReconstructDataFromCow() {
- std::unordered_map<uint64_t, void*> read_ahead_buffer_map;
+ std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
loff_t metadata_offset = 0;
loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
int num_ops = 0;
@@ -319,6 +319,18 @@
memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize());
memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ);
+ offset = 0;
+ std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
+ read_ahead_buffer_map.clear();
+
+ for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
+ void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
+ uint64_t new_block = blocks[block_index];
+
+ read_ahead_buffer_map[new_block] = bufptr;
+ offset += BLOCK_SZ;
+ }
+
snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
// Flush the data only if we have a overlapping blocks in the region
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
similarity index 66%
copy from fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
copy to fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index 91b4190..a4fd5a0 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -44,26 +44,29 @@
using android::base::borrowed_fd;
using android::base::unique_fd;
-DaemonOperations SnapuserdServer::Resolveop(std::string& input) {
- if (input == "init") return DaemonOperations::INIT;
- if (input == "start") return DaemonOperations::START;
- if (input == "stop") return DaemonOperations::STOP;
- if (input == "query") return DaemonOperations::QUERY;
- if (input == "delete") return DaemonOperations::DELETE;
- if (input == "detach") return DaemonOperations::DETACH;
- if (input == "supports") return DaemonOperations::SUPPORTS;
+DaemonOps SnapuserServer::Resolveop(std::string& input) {
+ if (input == "init") return DaemonOps::INIT;
+ if (input == "start") return DaemonOps::START;
+ if (input == "stop") return DaemonOps::STOP;
+ if (input == "query") return DaemonOps::QUERY;
+ if (input == "delete") return DaemonOps::DELETE;
+ if (input == "detach") return DaemonOps::DETACH;
+ if (input == "supports") return DaemonOps::SUPPORTS;
+ if (input == "initiate_merge") return DaemonOps::INITIATE;
+ if (input == "merge_percent") return DaemonOps::PERCENTAGE;
+ if (input == "getstatus") return DaemonOps::GETSTATUS;
- return DaemonOperations::INVALID;
+ return DaemonOps::INVALID;
}
-SnapuserdServer::~SnapuserdServer() {
+SnapuserServer::~SnapuserServer() {
// Close any client sockets that were added via AcceptClient().
for (size_t i = 1; i < watched_fds_.size(); i++) {
close(watched_fds_[i].fd);
}
}
-std::string SnapuserdServer::GetDaemonStatus() {
+std::string SnapuserServer::GetDaemonStatus() {
std::string msg = "";
if (IsTerminating())
@@ -74,8 +77,8 @@
return msg;
}
-void SnapuserdServer::Parsemsg(std::string const& msg, const char delim,
- std::vector<std::string>& out) {
+void SnapuserServer::Parsemsg(std::string const& msg, const char delim,
+ std::vector<std::string>& out) {
std::stringstream ss(msg);
std::string s;
@@ -84,15 +87,15 @@
}
}
-void SnapuserdServer::ShutdownThreads() {
- StopThreads();
+void SnapuserServer::ShutdownThreads() {
+ terminating_ = true;
JoinAllThreads();
}
-DmUserHandler::DmUserHandler(std::shared_ptr<Snapuserd> snapuserd)
+DmUserHandler::DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd)
: snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
-bool SnapuserdServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) {
+bool SnapuserServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) {
ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL));
if (ret < 0) {
PLOG(ERROR) << "Snapuserd:server: send() failed";
@@ -106,7 +109,7 @@
return true;
}
-bool SnapuserdServer::Recv(android::base::borrowed_fd fd, std::string* data) {
+bool SnapuserServer::Recv(android::base::borrowed_fd fd, std::string* data) {
char msg[MAX_PACKET_SIZE];
ssize_t rv = TEMP_FAILURE_RETRY(recv(fd.get(), msg, sizeof(msg), 0));
if (rv < 0) {
@@ -117,25 +120,25 @@
return true;
}
-bool SnapuserdServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) {
+bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) {
const char delim = ',';
std::vector<std::string> out;
Parsemsg(str, delim, out);
- DaemonOperations op = Resolveop(out[0]);
+ DaemonOps op = Resolveop(out[0]);
switch (op) {
- case DaemonOperations::INIT: {
+ case DaemonOps::INIT: {
// Message format:
- // init,<misc_name>,<cow_device_path>,<backing_device>
+ // init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
//
// Reads the metadata and send the number of sectors
- if (out.size() != 4) {
+ if (out.size() != 5) {
LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
- auto handler = AddHandler(out[1], out[2], out[3]);
+ auto handler = AddHandler(out[1], out[2], out[3], out[4]);
if (!handler) {
return Sendmsg(fd, "fail");
}
@@ -143,7 +146,7 @@
auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
return Sendmsg(fd, retval);
}
- case DaemonOperations::START: {
+ case DaemonOps::START: {
// Message format:
// start,<misc_name>
//
@@ -168,7 +171,7 @@
}
return Sendmsg(fd, "success");
}
- case DaemonOperations::STOP: {
+ case DaemonOps::STOP: {
// Message format: stop
//
// Stop all the threads gracefully and then shutdown the
@@ -177,7 +180,7 @@
ShutdownThreads();
return true;
}
- case DaemonOperations::QUERY: {
+ case DaemonOps::QUERY: {
// Message format: query
//
// As part of transition, Second stage daemon will be
@@ -189,23 +192,41 @@
// be ready to receive control message.
return Sendmsg(fd, GetDaemonStatus());
}
- case DaemonOperations::DELETE: {
+ case DaemonOps::DELETE: {
// Message format:
// delete,<misc_name>
if (out.size() != 2) {
LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ auto iter = FindHandler(&lock, out[1]);
+ if (iter == dm_users_.end()) {
+ // After merge is completed, we swap dm-user table with
+ // the underlying dm-linear base device. Hence, worker
+                // threads would have terminated and been removed from
+ // the list.
+ LOG(DEBUG) << "Could not find handler: " << out[1];
+ return Sendmsg(fd, "success");
+ }
+
+ if (!(*iter)->ThreadTerminated()) {
+ (*iter)->snapuserd()->NotifyIOTerminated();
+ }
+ }
if (!RemoveAndJoinHandler(out[1])) {
return Sendmsg(fd, "fail");
}
return Sendmsg(fd, "success");
}
- case DaemonOperations::DETACH: {
+ case DaemonOps::DETACH: {
+ std::lock_guard<std::mutex> lock(lock_);
+ TerminateMergeThreads(&lock);
terminating_ = true;
return true;
}
- case DaemonOperations::SUPPORTS: {
+ case DaemonOps::SUPPORTS: {
if (out.size() != 2) {
LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
@@ -215,6 +236,52 @@
}
return Sendmsg(fd, "fail");
}
+ case DaemonOps::INITIATE: {
+ if (out.size() != 2) {
+ LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
+ return Sendmsg(fd, "fail");
+ }
+ if (out[0] == "initiate_merge") {
+ std::lock_guard<std::mutex> lock(lock_);
+ auto iter = FindHandler(&lock, out[1]);
+ if (iter == dm_users_.end()) {
+ LOG(ERROR) << "Could not find handler: " << out[1];
+ return Sendmsg(fd, "fail");
+ }
+
+ if (!StartMerge(*iter)) {
+ return Sendmsg(fd, "fail");
+ }
+
+ return Sendmsg(fd, "success");
+ }
+ return Sendmsg(fd, "fail");
+ }
+ case DaemonOps::PERCENTAGE: {
+ std::lock_guard<std::mutex> lock(lock_);
+ double percentage = GetMergePercentage(&lock);
+
+ return Sendmsg(fd, std::to_string(percentage));
+ }
+ case DaemonOps::GETSTATUS: {
+ // Message format:
+ // getstatus,<misc_name>
+ if (out.size() != 2) {
+ LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
+ return Sendmsg(fd, "snapshot-merge-failed");
+ }
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ auto iter = FindHandler(&lock, out[1]);
+ if (iter == dm_users_.end()) {
+ LOG(ERROR) << "Could not find handler: " << out[1];
+ return Sendmsg(fd, "snapshot-merge-failed");
+ }
+
+ std::string merge_status = GetMergeStatus(*iter);
+ return Sendmsg(fd, merge_status);
+ }
+ }
default: {
LOG(ERROR) << "Received unknown message type from client";
Sendmsg(fd, "fail");
@@ -223,7 +290,7 @@
}
}
-void SnapuserdServer::RunThread(std::shared_ptr<DmUserHandler> handler) {
+void SnapuserServer::RunThread(std::shared_ptr<DmUserHandler> handler) {
LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
handler->snapuserd()->SetSocketPresent(is_socket_present_);
@@ -240,6 +307,8 @@
{
std::lock_guard<std::mutex> lock(lock_);
+ num_partitions_merge_complete_ += 1;
+ handler->SetThreadTerminated();
auto iter = FindHandler(&lock, handler->misc_name());
if (iter == dm_users_.end()) {
// RemoveAndJoinHandler() already removed us from the list, and is
@@ -264,10 +333,11 @@
// WaitForDelete() is called, the handler is either in the list, or
// it's not and its resources are guaranteed to be freed.
handler->FreeResources();
+ dm_users_.erase(iter);
}
}
-bool SnapuserdServer::Start(const std::string& socketname) {
+bool SnapuserServer::Start(const std::string& socketname) {
bool start_listening = true;
sockfd_.reset(android_get_control_socket(socketname.c_str()));
@@ -283,7 +353,7 @@
return StartWithSocket(start_listening);
}
-bool SnapuserdServer::StartWithSocket(bool start_listening) {
+bool SnapuserServer::StartWithSocket(bool start_listening) {
if (start_listening && listen(sockfd_.get(), 4) < 0) {
PLOG(ERROR) << "listen socket failed";
return false;
@@ -304,7 +374,7 @@
return true;
}
-bool SnapuserdServer::Run() {
+bool SnapuserServer::Run() {
LOG(INFO) << "Now listening on snapuserd socket";
while (!IsTerminating()) {
@@ -336,7 +406,7 @@
return true;
}
-void SnapuserdServer::JoinAllThreads() {
+void SnapuserServer::JoinAllThreads() {
// Acquire the thread list within the lock.
std::vector<std::shared_ptr<DmUserHandler>> dm_users;
{
@@ -351,14 +421,14 @@
}
}
-void SnapuserdServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
+void SnapuserServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
struct pollfd p = {};
p.fd = fd.get();
p.events = events;
watched_fds_.emplace_back(std::move(p));
}
-void SnapuserdServer::AcceptClient() {
+void SnapuserServer::AcceptClient() {
int fd = TEMP_FAILURE_RETRY(accept4(sockfd_.get(), nullptr, nullptr, SOCK_CLOEXEC));
if (fd < 0) {
PLOG(ERROR) << "accept4 failed";
@@ -368,7 +438,7 @@
AddWatchedFd(fd, POLLIN);
}
-bool SnapuserdServer::HandleClient(android::base::borrowed_fd fd, int revents) {
+bool SnapuserServer::HandleClient(android::base::borrowed_fd fd, int revents) {
if (revents & POLLHUP) {
LOG(DEBUG) << "Snapuserd client disconnected";
return false;
@@ -385,16 +455,18 @@
return true;
}
-void SnapuserdServer::Interrupt() {
+void SnapuserServer::Interrupt() {
// Force close the socket so poll() fails.
sockfd_ = {};
SetTerminating();
}
-std::shared_ptr<DmUserHandler> SnapuserdServer::AddHandler(const std::string& misc_name,
- const std::string& cow_device_path,
- const std::string& backing_device) {
- auto snapuserd = std::make_shared<Snapuserd>(misc_name, cow_device_path, backing_device);
+std::shared_ptr<DmUserHandler> SnapuserServer::AddHandler(const std::string& misc_name,
+ const std::string& cow_device_path,
+ const std::string& backing_device,
+ const std::string& base_path_merge) {
+ auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
+ base_path_merge);
if (!snapuserd->InitCowDevice()) {
LOG(ERROR) << "Failed to initialize Snapuserd";
return nullptr;
@@ -417,7 +489,7 @@
return handler;
}
-bool SnapuserdServer::StartHandler(const std::shared_ptr<DmUserHandler>& handler) {
+bool SnapuserServer::StartHandler(const std::shared_ptr<DmUserHandler>& handler) {
if (handler->snapuserd()->IsAttached()) {
LOG(ERROR) << "Handler already attached";
return false;
@@ -425,12 +497,22 @@
handler->snapuserd()->AttachControlDevice();
- handler->thread() = std::thread(std::bind(&SnapuserdServer::RunThread, this, handler));
+ handler->thread() = std::thread(std::bind(&SnapuserServer::RunThread, this, handler));
return true;
}
-auto SnapuserdServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
- const std::string& misc_name) -> HandlerList::iterator {
+bool SnapuserServer::StartMerge(const std::shared_ptr<DmUserHandler>& handler) {
+ if (!handler->snapuserd()->IsAttached()) {
+ LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
+ return false;
+ }
+
+ handler->snapuserd()->InitiateMerge();
+ return true;
+}
+
+auto SnapuserServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+ const std::string& misc_name) -> HandlerList::iterator {
CHECK(proof_of_lock);
for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
@@ -441,7 +523,51 @@
return dm_users_.end();
}
-bool SnapuserdServer::RemoveAndJoinHandler(const std::string& misc_name) {
+void SnapuserServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
+ CHECK(proof_of_lock);
+
+ for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+ if (!(*iter)->ThreadTerminated()) {
+ (*iter)->snapuserd()->NotifyIOTerminated();
+ }
+ }
+}
+
+std::string SnapuserServer::GetMergeStatus(const std::shared_ptr<DmUserHandler>& handler) {
+ return handler->snapuserd()->GetMergeStatus();
+}
+
+double SnapuserServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
+ CHECK(proof_of_lock);
+ double percentage = 0.0;
+ int n = 0;
+
+ for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+ auto& th = (*iter)->thread();
+ if (th.joinable()) {
+            // Merge percentage of the individual partitions whose merge is
+            // still in progress
+ percentage += (*iter)->snapuserd()->GetMergePercentage();
+ n += 1;
+ }
+ }
+
+    // Calculate the final merge percentage, including partitions whose merge
+    // has already completed - num_partitions_merge_complete_ tracks them as
+    // each thread exits in RunThread.
+ int total_partitions = n + num_partitions_merge_complete_;
+
+ if (total_partitions) {
+ percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
+ }
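+    // e.g., two partitions already merged and one at 40%: (2 * 100.0 + 40.0) / 3 = 80.0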
+
+ LOG(DEBUG) << "Merge %: " << percentage
+ << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
+ << " total_partitions: " << total_partitions << " n: " << n;
+ return percentage;
+}
+
+bool SnapuserServer::RemoveAndJoinHandler(const std::string& misc_name) {
std::shared_ptr<DmUserHandler> handler;
{
std::lock_guard<std::mutex> lock(lock_);
@@ -462,7 +588,7 @@
return true;
}
-bool SnapuserdServer::WaitForSocket() {
+bool SnapuserServer::WaitForSocket() {
auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
@@ -516,7 +642,7 @@
return Run();
}
-bool SnapuserdServer::RunForSocketHandoff() {
+bool SnapuserServer::RunForSocketHandoff() {
unique_fd proxy_fd(android_get_control_socket(kSnapuserdSocketProxy));
if (proxy_fd < 0) {
PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocketProxy;
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
similarity index 76%
copy from fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
copy to fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 14e5de6..e93621c 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -28,14 +28,14 @@
#include <vector>
#include <android-base/unique_fd.h>
-#include "dm-snapshot-merge/snapuserd.h"
+#include "snapuserd_core.h"
namespace android {
namespace snapshot {
static constexpr uint32_t MAX_PACKET_SIZE = 512;
-enum class DaemonOperations {
+enum class DaemonOps {
INIT,
START,
QUERY,
@@ -43,12 +43,15 @@
DELETE,
DETACH,
SUPPORTS,
+ INITIATE,
+ PERCENTAGE,
+ GETSTATUS,
INVALID,
};
class DmUserHandler {
public:
- explicit DmUserHandler(std::shared_ptr<Snapuserd> snapuserd);
+ explicit DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd);
void FreeResources() {
// Each worker thread holds a reference to snapuserd.
@@ -59,44 +62,28 @@
snapuserd_ = nullptr;
}
}
- const std::shared_ptr<Snapuserd>& snapuserd() const { return snapuserd_; }
+ const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
std::thread& thread() { return thread_; }
const std::string& misc_name() const { return misc_name_; }
+ bool ThreadTerminated() { return thread_terminated_; }
+ void SetThreadTerminated() { thread_terminated_ = true; }
private:
std::thread thread_;
- std::shared_ptr<Snapuserd> snapuserd_;
+ std::shared_ptr<SnapshotHandler> snapuserd_;
std::string misc_name_;
+ bool thread_terminated_ = false;
};
-class Stoppable {
- std::promise<void> exitSignal_;
- std::future<void> futureObj_;
-
- public:
- Stoppable() : futureObj_(exitSignal_.get_future()) {}
-
- virtual ~Stoppable() {}
-
- bool StopRequested() {
- // checks if value in future object is available
- if (futureObj_.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) {
- return false;
- }
- return true;
- }
- // Request the thread to stop by setting value in promise object
- void StopThreads() { exitSignal_.set_value(); }
-};
-
-class SnapuserdServer : public Stoppable {
+class SnapuserServer {
private:
android::base::unique_fd sockfd_;
bool terminating_;
volatile bool received_socket_signal_ = false;
std::vector<struct pollfd> watched_fds_;
bool is_socket_present_ = false;
+ int num_partitions_merge_complete_ = 0;
std::mutex lock_;
@@ -112,7 +99,7 @@
void ShutdownThreads();
bool RemoveAndJoinHandler(const std::string& control_device);
- DaemonOperations Resolveop(std::string& input);
+ DaemonOps Resolveop(std::string& input);
std::string GetDaemonStatus();
void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
@@ -126,9 +113,12 @@
HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
const std::string& misc_name);
+ double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
+ void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
+
public:
- SnapuserdServer() { terminating_ = false; }
- ~SnapuserdServer();
+ SnapuserServer() { terminating_ = false; }
+ ~SnapuserServer();
bool Start(const std::string& socketname);
bool Run();
@@ -138,8 +128,11 @@
std::shared_ptr<DmUserHandler> AddHandler(const std::string& misc_name,
const std::string& cow_device_path,
- const std::string& backing_device);
+ const std::string& backing_device,
+ const std::string& base_path_merge);
bool StartHandler(const std::shared_ptr<DmUserHandler>& handler);
+ bool StartMerge(const std::shared_ptr<DmUserHandler>& handler);
+ std::string GetMergeStatus(const std::shared_ptr<DmUserHandler>& handler);
void SetTerminating() { terminating_ = true; }
void ReceivedSocketSignal() { received_socket_signal_ = true; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index 97418bd..6c91fde 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -359,5 +359,289 @@
}
}
+std::string SnapshotHandler::GetMergeStatus() {
+ bool merge_not_initiated = false;
+ bool merge_failed = false;
+
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (!MergeInitiated()) {
+ merge_not_initiated = true;
+ }
+
+ if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED) {
+ merge_failed = true;
+ }
+ }
+
+ struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+ bool merge_complete = (ch->num_merge_ops == reader_->get_num_total_data_ops());
+
+ if (merge_not_initiated) {
+        // Merge was not initiated yet; however, merge completion may already
+        // be recorded in the COW header. This can happen if the device was
+        // rebooted during the merge. On the next boot, libsnapshot will query
+        // the status and, if the merge is complete, the snapshot-status file
+        // will be deleted.
+ if (merge_complete) {
+ return "snapshot-merge-complete";
+ }
+
+ // Return the state as "snapshot". If the device was rebooted during
+ // merge, we will return the status as "snapshot". This is ok, as
+ // libsnapshot will explicitly resume the merge. This is slightly
+ // different from kernel snapshot wherein once the snapshot was switched
+ // to merge target, during next boot, we immediately switch to merge
+ // target. We don't do that here because, during first stage init, we
+ // don't want to initiate the merge. The problem is that we have daemon
+ // transition between first and second stage init. If the merge was
+ // started, then we will have to quiesce the merge before switching
+ // the dm tables. Instead, we just wait until second stage daemon is up
+ // before resuming the merge.
+ return "snapshot";
+ }
+
+ if (merge_failed) {
+ return "snapshot-merge-failed";
+ }
+
+ // Merge complete
+ if (merge_complete) {
+ return "snapshot-merge-complete";
+ }
+
+ // Merge is in-progress
+ return "snapshot-merge";
+}
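For illustration only (not part of the change above), a minimal sketch of how a libsnapshot-side caller might act on the strings GetMergeStatus() returns; HandleMergeStatus(), ResumeMerge(), and CleanupSnapshot() are hypothetical names, not APIs from this change.

#include <string>

void ResumeMerge();      // hypothetical stand-in
void CleanupSnapshot();  // hypothetical stand-in

void HandleMergeStatus(const std::string& status) {
    if (status == "snapshot") {
        // Merge not initiated (e.g. the device rebooted before it started);
        // the client is expected to explicitly resume the merge.
        ResumeMerge();
    } else if (status == "snapshot-merge") {
        // Merge is in progress; poll again later.
    } else if (status == "snapshot-merge-complete") {
        // All COW ops are merged; snapshot metadata can be cleaned up.
        CleanupSnapshot();
    } else if (status == "snapshot-merge-failed") {
        // Surface the failure to the caller.
    }
}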
+
+//========== End of Read-ahead state transition functions ====================
+
+/*
+ * Root partitions are mounted off dm-user, and the I/Os are served by the
+ * snapuserd worker threads.
+ *
+ * When there is an I/O request to be served by a worker thread, we check
+ * whether the corresponding sector was "changed" by the OTA by doing a
+ * lookup. If the lookup succeeds, the sector has been changed and maps to
+ * one of four COW operations: COPY, XOR, REPLACE, or ZERO.
+ *
+ * REPLACE and ZERO ops are not much of a concern as there is no dependency
+ * between blocks; every I/O request mapped to these two COW operations is
+ * served by reading the COW device.
+ *
+ * COPY and XOR ops, however, are tricky. Since merge operations are
+ * in-progress, we cannot simply read from the source device. We need to be
+ * in sync with the state of the merge thread before serving the I/O.
+ *
+ * The merge thread processes COW ops in fixed-size sets called RA
+ * (read-ahead) Blocks, where each Block comprises 510 COW ops.
+ *
+ * +--------------------------+
+ * |op-1|op-2|op-3|....|op-510|
+ * +--------------------------+
+ *
+ * <------ Merge Group Block N ------>
+ *
+ * Thus, a Merge Group Block N will be in one of the following states,
+ * transitioning through them in this order (ending in either COMPLETED
+ * or FAILED):
+ *
+ * 1: GROUP_MERGE_PENDING
+ * 2: GROUP_MERGE_RA_READY
+ * 3: GROUP_MERGE_IN_PROGRESS
+ * 4: GROUP_MERGE_COMPLETED
+ * 5: GROUP_MERGE_FAILED
+ *
+ * Say we have an I/O request from dm-user whose sector maps to a COPY
+ * operation, op-10, in the "Merge Group Block N" above.
+ *
+ * 1: If the Group is in "GROUP_MERGE_PENDING" state:
+ *
+ * Just read the data from the source block based on the COW op->source
+ * field. Note that we take a ref count on "Group N"; this ref count
+ * prevents the merge thread from starting to merge the group while there
+ * are pending I/Os. Once the I/O completes, the ref count on "Group N" is
+ * decremented, and the merge thread resumes merging "Group N" once there
+ * are no pending I/Os.
+ *
+ * 2: If the Group is in "GROUP_MERGE_IN_PROGRESS" or "GROUP_MERGE_RA_READY" state:
+ *
+ * When the merge thread is ready to process a "Group", it first moves the
+ * state from GROUP_MERGE_PENDING to GROUP_MERGE_RA_READY. From this point
+ * onwards, I/O is served from the read-ahead buffer. However, the merge
+ * thread cannot start merging this "Group" immediately: if there are any
+ * in-flight I/O requests, it must wait and allow them to drain. Once all
+ * in-flight I/Os are completed, the merge thread moves the state from
+ * GROUP_MERGE_RA_READY to GROUP_MERGE_IN_PROGRESS. I/O continues to be
+ * served from the read-ahead buffer for the entire duration of the merge.
+ *
+ * See SetMergeInProgress().
+ *
+ * 3: If the Group is in "GROUP_MERGE_COMPLETED" state:
+ *
+ * This is straightforward: we read the data directly from the base
+ * device. We should not be reading from the COW op->source field.
+ *
+ * 4: If the Group is in "GROUP_MERGE_FAILED" state:
+ *
+ * Terminate the I/O with an I/O error, as we don't know which "op" in
+ * the "Group" failed.
+ *
+ * These transitions ensure that I/O from the root partitions is never
+ * made to wait and is processed immediately. Thus, the state transitions
+ * for any "Group" are:
+ *
+ * GROUP_MERGE_PENDING
+ * |
+ * |
+ * v
+ * GROUP_MERGE_RA_READY
+ * |
+ * |
+ * v
+ * GROUP_MERGE_IN_PROGRESS
+ * |
+ * |----------------------------(on failure)
+ * | |
+ * v v
+ * GROUP_MERGE_COMPLETED GROUP_MERGE_FAILED
+ *
+ */
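The transition functions below all manipulate a small per-group record. As a point of reference, here is a minimal sketch of that bookkeeping, assuming only the members this file actually touches; the real MergeGroupState and MERGE_GROUP_STATE definitions live in the snapuserd headers, and this sketch merely mirrors how they are used here.

#include <condition_variable>
#include <cstddef>
#include <mutex>

// Illustrative sketch only; the real types are declared elsewhere in snapuserd.
enum class MERGE_GROUP_STATE {
    GROUP_MERGE_PENDING,
    GROUP_MERGE_RA_READY,
    GROUP_MERGE_IN_PROGRESS,
    GROUP_MERGE_COMPLETED,
    GROUP_MERGE_FAILED,
    GROUP_INVALID,
};

struct MergeGroupState {
    std::mutex m_lock;             // Guards merge_state_ and num_ios_in_progress.
    std::condition_variable m_cv;  // Merge thread waits here for in-flight I/O to drain.
    MERGE_GROUP_STATE merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_PENDING;
    size_t num_ios_in_progress = 0;  // Ref count taken by readers while the group is PENDING.
};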
+
+// Invoked by Merge thread
+void SnapshotHandler::SetMergeCompleted(size_t ra_index) {
+ MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+ {
+ std::lock_guard<std::mutex> lock(blk_state->m_lock);
+
+ CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS);
+ CHECK(blk_state->num_ios_in_progress == 0);
+
+ // Merge is complete - all I/O henceforth should be read directly
+ // from the base device
+ blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED;
+ }
+}
+
+// Invoked by the merge thread just before it begins merging a given Group
+// of 510 ops. If there are any in-flight I/Os from dm-user, wait for them
+// to complete.
+void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
+ MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+ {
+ std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+ CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
+
+ // First set the state to RA_READY so that in-flight I/O will drain
+ // and any new I/O will start reading from RA buffer
+ blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_RA_READY;
+
+ // Wait if there are any in-flight I/O's - we cannot merge at this point
+ while (blk_state->num_ios_in_progress != 0) {
+ blk_state->m_cv.wait(lock);
+ }
+
+ blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS;
+ }
+}
+
+// Invoked by Merge thread on failure
+void SnapshotHandler::SetMergeFailed(size_t ra_index) {
+ MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+ {
+ std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+ blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_FAILED;
+ }
+}
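Tying the three setters together, a hedged sketch of how a merge thread might drive one RA group after another; MergeOneGroup() is a hypothetical stand-in for the actual batch merge of the group's 510 COW ops.

// Hypothetical merge-thread loop, for illustration only.
bool MergeOneGroup(size_t ra_index);  // stand-in for merging the group's COW ops

bool MergeAllGroups(SnapshotHandler* handler, size_t num_groups) {
    for (size_t i = 0; i < num_groups; i++) {
        handler->SetMergeInProgress(i);  // Drains in-flight I/O, then marks IN_PROGRESS.
        if (!MergeOneGroup(i)) {
            handler->SetMergeFailed(i);  // Subsequent I/O to this group is failed.
            return false;
        }
        handler->SetMergeCompleted(i);   // Reads now go straight to the base device.
    }
    return true;
}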
+
+// Invoked by worker threads when I/O completes on a "MERGE_PENDING"
+// Group. If there are no more in-flight I/Os, wake up the merge thread
+// to resume merging.
+void SnapshotHandler::NotifyIOCompletion(uint64_t new_block) {
+ auto it = block_to_ra_index_.find(new_block);
+ CHECK(it != block_to_ra_index_.end()) << " invalid block: " << new_block;
+
+ bool pending_ios = true;
+
+ int ra_index = it->second;
+ MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+ {
+ std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+ CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
+ blk_state->num_ios_in_progress -= 1;
+ if (blk_state->num_ios_in_progress == 0) {
+ pending_ios = false;
+ }
+ }
+
+ // Give the merge thread a chance to resume the merge,
+ // as there are no pending I/Os.
+ if (!pending_ios) {
+ blk_state->m_cv.notify_all();
+ }
+}
+
+bool SnapshotHandler::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block,
+ void* buffer) {
+ if (!lock->owns_lock()) {
+ SNAP_LOG(ERROR) << "GetRABuffer - Lock not held";
+ return false;
+ }
+ std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
+
+ if (it == read_ahead_buffer_map_.end()) {
+ SNAP_LOG(ERROR) << "Block: " << block << " not found in RA buffer";
+ return false;
+ }
+
+ memcpy(buffer, it->second, BLOCK_SZ);
+ return true;
+}
+
+// Invoked by worker threads in the I/O path. This is called when a sector
+// is mapped to a COPY/XOR COW op.
+MERGE_GROUP_STATE SnapshotHandler::ProcessMergingBlock(uint64_t new_block, void* buffer) {
+ auto it = block_to_ra_index_.find(new_block);
+ if (it == block_to_ra_index_.end()) {
+ return MERGE_GROUP_STATE::GROUP_INVALID;
+ }
+
+ int ra_index = it->second;
+ MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+ {
+ std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+ MERGE_GROUP_STATE state = blk_state->merge_state_;
+ switch (state) {
+ case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
+ blk_state->num_ios_in_progress += 1; // ref count
+ [[fallthrough]];
+ }
+ case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
+ [[fallthrough]];
+ }
+ case MERGE_GROUP_STATE::GROUP_MERGE_FAILED: {
+ return state;
+ }
+ // Fetch the data from RA buffer.
+ case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
+ [[fallthrough]];
+ }
+ case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
+ if (!GetRABuffer(&lock, new_block, buffer)) {
+ return MERGE_GROUP_STATE::GROUP_INVALID;
+ }
+ return state;
+ }
+ default: {
+ return MERGE_GROUP_STATE::GROUP_INVALID;
+ }
+ }
+ }
+}
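For completeness, a hedged sketch of the worker-thread side, pairing ProcessMergingBlock() with NotifyIOCompletion() as the comments above describe; ReadFromSourceDevice() and ReadFromBaseDevice() are hypothetical stand-ins for the actual block reads.

// Hypothetical worker-thread read path for a sector mapped to a COPY/XOR op.
bool ReadFromSourceDevice(uint64_t new_block, void* buffer);  // stand-in
bool ReadFromBaseDevice(uint64_t new_block, void* buffer);    // stand-in

bool ServeOrderedOpRead(SnapshotHandler* handler, uint64_t new_block, void* buffer) {
    switch (handler->ProcessMergingBlock(new_block, buffer)) {
        case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
            // A ref count was taken; read from the op's source, then drop the
            // ref so the merge thread can proceed with this group.
            bool ok = ReadFromSourceDevice(new_block, buffer);
            handler->NotifyIOCompletion(new_block);
            return ok;
        }
        case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED:
            // Group already merged; the data lives in the base device.
            return ReadFromBaseDevice(new_block, buffer);
        case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY:
        case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS:
            // ProcessMergingBlock() already copied the data from the RA buffer.
            return true;
        default:
            // GROUP_MERGE_FAILED / GROUP_INVALID: this sketch fails the I/O.
            return false;
    }
}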
+
} // namespace snapshot
} // namespace android