Merge changes I1625d1a6,I2db9cfa2,I59c31318,Ic0ed1a8d,I612374bb into main
* changes:
snapuserd: Move Process ops out of Worker.
snapuserd: Move more fields out of Worker.
snapuserd: Split more methods out of Worker.
snapuserd: Create a ReadWorker class.
snapuserd: Create a MergeWorker class.
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index e5b561b..a9b96e2 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -64,12 +64,13 @@
"dm-snapshot-merge/snapuserd_readahead.cpp",
"snapuserd_buffer.cpp",
"user-space-merge/handler_manager.cpp",
+ "user-space-merge/read_worker.cpp",
"user-space-merge/snapuserd_core.cpp",
- "user-space-merge/snapuserd_dm_user.cpp",
"user-space-merge/snapuserd_merge.cpp",
"user-space-merge/snapuserd_readahead.cpp",
"user-space-merge/snapuserd_transitions.cpp",
"user-space-merge/snapuserd_verify.cpp",
+ "user-space-merge/worker.cpp",
],
static_libs: [
"libbase",
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
index bdba5c0..4105b4b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -18,7 +18,9 @@
#include <android-base/logging.h>
+#include "read_worker.h"
#include "snapuserd_core.h"
+#include "snapuserd_merge.h"
namespace android {
namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
similarity index 88%
rename from fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
rename to fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
index 2b9d14e..dd2996b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
@@ -14,6 +14,8 @@
* limitations under the License.
*/
+#include "read_worker.h"
+
#include "snapuserd_core.h"
namespace android {
@@ -23,59 +25,24 @@
using namespace android::dm;
using android::base::unique_fd;
-Worker::Worker(const std::string& cow_device, const std::string& backing_device,
- const std::string& control_device, const std::string& misc_name,
- const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
- cow_device_ = cow_device;
- backing_store_device_ = backing_device;
- control_device_ = control_device;
- misc_name_ = misc_name;
- base_path_merge_ = base_path_merge;
- snapuserd_ = snapuserd;
+void ReadWorker::CloseFds() {
+ ctrl_fd_ = {};
+ backing_store_fd_ = {};
+ Worker::CloseFds();
}
-bool Worker::InitializeFds() {
- backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
- if (backing_store_fd_ < 0) {
- SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
- return false;
- }
-
- cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
- if (cow_fd_ < 0) {
- SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
- return false;
- }
-
- ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
- if (ctrl_fd_ < 0) {
- SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
- return false;
- }
-
- // Base device used by merge thread
- base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
- if (base_path_merge_fd_ < 0) {
- SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
- return false;
- }
-
- return true;
-}
-
-bool Worker::InitReader() {
- reader_ = snapuserd_->CloneReaderForWorker();
-
- if (!reader_->InitForMerge(std::move(cow_fd_))) {
- return false;
- }
- return true;
-}
+ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
+ const std::string& control_device, const std::string& misc_name,
+ const std::string& base_path_merge,
+ std::shared_ptr<SnapshotHandler> snapuserd)
+ : Worker(cow_device, misc_name, base_path_merge, snapuserd),
+ backing_store_device_(backing_device),
+ control_device_(control_device) {}
// Start the replace operation. This will read the
// internal COW format and if the block is compressed,
// it will be de-compressed.
-bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (!buffer) {
SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
@@ -88,7 +55,7 @@
return true;
}
-bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
+bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
@@ -118,7 +85,7 @@
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
-bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
@@ -126,7 +93,7 @@
return true;
}
-bool Worker::ProcessXorOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
@@ -153,7 +120,7 @@
return true;
}
-bool Worker::ProcessZeroOp() {
+bool ReadWorker::ProcessZeroOp() {
// Zero out the entire block
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
@@ -165,7 +132,7 @@
return true;
}
-bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
@@ -218,7 +185,7 @@
return false;
}
-bool Worker::ProcessCowOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
if (cow_op == nullptr) {
SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
return false;
@@ -246,31 +213,28 @@
return false;
}
-void Worker::InitializeBufsink() {
- // Allocate the buffer which is used to communicate between
- // daemon and dm-user. The buffer comprises of header and a fixed payload.
- // If the dm-user requests a big IO, the IO will be broken into chunks
- // of PAYLOAD_BUFFER_SZ.
- size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
- bufsink_.Initialize(buf_size);
-}
+bool ReadWorker::Init() {
+ if (!Worker::Init()) {
+ return false;
+ }
-bool Worker::Init() {
- InitializeBufsink();
+ backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
+ if (backing_store_fd_ < 0) {
+ SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
+ return false;
+ }
+
+ ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
+ if (ctrl_fd_ < 0) {
+ SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
+ return false;
+ }
+
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
-
- if (!InitializeFds()) {
- return false;
- }
-
- if (!InitReader()) {
- return false;
- }
-
return true;
}
-bool Worker::RunThread() {
+bool ReadWorker::Run() {
SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
@@ -291,7 +255,7 @@
}
// Send the payload/data back to dm-user misc device.
-bool Worker::WriteDmUserPayload(size_t size) {
+bool ReadWorker::WriteDmUserPayload(size_t size) {
size_t payload_size = size;
void* buf = bufsink_.GetPayloadBufPtr();
if (header_response_) {
@@ -310,7 +274,7 @@
return true;
}
-bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
+bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
CHECK(read_size <= BLOCK_SZ);
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@@ -329,7 +293,7 @@
return true;
}
-bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
+bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
size_t remaining_size = sz;
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
int ret = 0;
@@ -389,7 +353,7 @@
return true;
}
-int Worker::ReadUnalignedSector(
+int ReadWorker::ReadUnalignedSector(
sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
size_t skip_sector_size = 0;
@@ -424,7 +388,7 @@
return std::min(size, (BLOCK_SZ - skip_sector_size));
}
-bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
+bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
bufsink_.ResetBufferOffset();
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
@@ -563,7 +527,7 @@
return true;
}
-void Worker::RespondIOError() {
+void ReadWorker::RespondIOError() {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
header->type = DM_USER_RESP_ERROR;
// This is an issue with the dm-user interface. There
@@ -580,7 +544,7 @@
WriteDmUserPayload(0);
}
-bool Worker::DmuserReadRequest() {
+bool ReadWorker::DmuserReadRequest() {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
// Unaligned I/O request
@@ -591,7 +555,7 @@
return ReadAlignedSector(header->sector, header->len);
}
-bool Worker::ProcessIORequest() {
+bool ReadWorker::ProcessIORequest() {
// Read Header from dm-user misc device. This gives
// us the sector number for which IO is issued by dm-snapshot device
struct dm_user_header* header = bufsink_.GetHeaderPtr();
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
new file mode 100644
index 0000000..c3a4c34
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
@@ -0,0 +1,70 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <utility>
+#include <vector>
+
+#include "worker.h"
+
+namespace android {
+namespace snapshot {
+
+class ReadWorker : public Worker {
+ public:
+ ReadWorker(const std::string& cow_device, const std::string& backing_device,
+ const std::string& control_device, const std::string& misc_name,
+ const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+
+ bool Run();
+ bool Init() override;
+ void CloseFds() override;
+
+ private:
+ // Functions interacting with dm-user
+ bool ProcessIORequest();
+ bool WriteDmUserPayload(size_t size);
+ bool DmuserReadRequest();
+ void RespondIOError();
+
+ bool ProcessCowOp(const CowOperation* cow_op);
+ bool ProcessXorOp(const CowOperation* cow_op);
+ bool ProcessOrderedOp(const CowOperation* cow_op);
+ bool ProcessCopyOp(const CowOperation* cow_op);
+ bool ProcessReplaceOp(const CowOperation* cow_op);
+ bool ProcessZeroOp();
+
+ bool ReadAlignedSector(sector_t sector, size_t sz);
+ bool ReadUnalignedSector(sector_t sector, size_t size);
+ int ReadUnalignedSector(sector_t sector, size_t size,
+ std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
+ bool ReadFromSourceDevice(const CowOperation* cow_op);
+ bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
+
+ constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
+ constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+
+ std::string backing_store_device_;
+ unique_fd backing_store_fd_;
+
+ std::string control_device_;
+ unique_fd ctrl_fd_;
+
+ XorSink xorsink_;
+ bool header_response_ = false;
+};
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 8e1212b..e52d752 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -23,6 +23,9 @@
#include <android-base/scopeguard.h>
#include <android-base/strings.h>
+#include "read_worker.h"
+#include "snapuserd_merge.h"
+
namespace android {
namespace snapshot {
@@ -46,9 +49,8 @@
bool SnapshotHandler::InitializeWorkers() {
for (int i = 0; i < num_worker_threads_; i++) {
- std::unique_ptr<Worker> wt =
- std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
- misc_name_, base_path_merge_, GetSharedPtr());
+ auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
+ misc_name_, base_path_merge_, GetSharedPtr());
if (!wt->Init()) {
SNAP_LOG(ERROR) << "Thread initialization failed";
return false;
@@ -57,8 +59,8 @@
worker_threads_.push_back(std::move(wt));
}
- merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
- misc_name_, base_path_merge_, GetSharedPtr());
+ merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
+ GetSharedPtr());
read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
GetSharedPtr());
@@ -312,11 +314,11 @@
// Launch worker threads
for (int i = 0; i < worker_threads_.size(); i++) {
threads.emplace_back(
- std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
+ std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
}
std::future<bool> merge_thread =
- std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
+ std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
// Now that the worker threads are up, scan the partitions.
if (perform_verification_) {
@@ -452,5 +454,11 @@
return update_verify_->CheckPartitionVerification();
}
+void SnapshotHandler::FreeResources() {
+ worker_threads_.clear();
+ read_ahead_thread_ = nullptr;
+ merge_thread_ = nullptr;
+}
+
} // namespace snapshot
} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index c984a61..cdc38c0 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -75,7 +75,8 @@
READ_AHEAD_FAILURE,
};
-class SnapshotHandler;
+class MergeWorker;
+class ReadWorker;
enum class MERGE_GROUP_STATE {
GROUP_MERGE_PENDING,
@@ -98,102 +99,6 @@
: merge_state_(state), num_ios_in_progress(n_ios) {}
};
-class Worker {
- public:
- Worker(const std::string& cow_device, const std::string& backing_device,
- const std::string& control_device, const std::string& misc_name,
- const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
- bool RunThread();
- bool RunMergeThread();
- bool Init();
-
- private:
- // Initialization
- void InitializeBufsink();
- bool InitializeFds();
- bool InitReader();
- void CloseFds() {
- ctrl_fd_ = {};
- backing_store_fd_ = {};
- base_path_merge_fd_ = {};
- }
-
- // Functions interacting with dm-user
- bool WriteDmUserPayload(size_t size);
- bool DmuserReadRequest();
-
- // IO Path
- bool ProcessIORequest();
- bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
-
- bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
- bool ReadFromSourceDevice(const CowOperation* cow_op);
-
- bool ReadAlignedSector(sector_t sector, size_t sz);
- bool ReadUnalignedSector(sector_t sector, size_t size);
- int ReadUnalignedSector(sector_t sector, size_t size,
- std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
- void RespondIOError();
-
- // Processing COW operations
- bool ProcessCowOp(const CowOperation* cow_op);
- bool ProcessReplaceOp(const CowOperation* cow_op);
- bool ProcessZeroOp();
-
- // Handles Copy and Xor
- bool ProcessCopyOp(const CowOperation* cow_op);
- bool ProcessXorOp(const CowOperation* cow_op);
- bool ProcessOrderedOp(const CowOperation* cow_op);
-
- // Merge related ops
- bool Merge();
- bool AsyncMerge();
- bool SyncMerge();
- bool MergeOrderedOps();
- bool MergeOrderedOpsAsync();
- bool MergeReplaceZeroOps();
- int PrepareMerge(uint64_t* source_offset, int* pending_ops,
- std::vector<const CowOperation*>* replace_zero_vec = nullptr);
-
- sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
- chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
-
- bool InitializeIouring();
- void FinalizeIouring();
-
- std::unique_ptr<CowReader> reader_;
- BufferSink bufsink_;
- XorSink xorsink_;
-
- std::string cow_device_;
- std::string backing_store_device_;
- std::string control_device_;
- std::string misc_name_;
- std::string base_path_merge_;
-
- unique_fd cow_fd_;
- unique_fd backing_store_fd_;
- unique_fd base_path_merge_fd_;
- unique_fd ctrl_fd_;
- bool header_response_ = false;
-
- std::unique_ptr<ICowOpIter> cowop_iter_;
- size_t ra_block_index_ = 0;
- uint64_t blocks_merged_in_group_ = 0;
- bool merge_async_ = false;
- // Queue depth of 8 seems optimal. We don't want
- // to have a huge depth as it may put more memory pressure
- // on the kernel worker threads given that we use
- // IOSQE_ASYNC flag - ASYNC flags can potentially
- // result in EINTR; Since we don't restart
- // syscalls and fallback to synchronous I/O, we
- // don't want huge queue depth
- int queue_depth_ = 8;
- std::unique_ptr<struct io_uring> ring_;
-
- std::shared_ptr<SnapshotHandler> snapuserd_;
-};
-
class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
public:
SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
@@ -212,11 +117,7 @@
bool CommitMerge(int num_merge_ops);
void CloseFds() { cow_fd_ = {}; }
- void FreeResources() {
- worker_threads_.clear();
- read_ahead_thread_ = nullptr;
- merge_thread_ = nullptr;
- }
+ void FreeResources();
bool InitializeWorkers();
std::unique_ptr<CowReader> CloneReaderForWorker();
@@ -315,7 +216,7 @@
void* mapped_addr_;
size_t total_mapped_addr_length_;
- std::vector<std::unique_ptr<Worker>> worker_threads_;
+ std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
// Read-ahead related
bool populate_data_from_cow_ = false;
bool ra_thread_ = false;
@@ -330,7 +231,7 @@
// Merge Block state
std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
- std::unique_ptr<Worker> merge_thread_;
+ std::unique_ptr<MergeWorker> merge_thread_;
double merge_completion_percentage_;
bool merge_initiated_ = false;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index ce95b76..563f6ad 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include "snapuserd_merge.h"
#include "snapuserd_core.h"
@@ -23,8 +24,13 @@
using namespace android::dm;
using android::base::unique_fd;
-int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
- std::vector<const CowOperation*>* replace_zero_vec) {
+MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
+ const std::string& base_path_merge,
+ std::shared_ptr<SnapshotHandler> snapuserd)
+ : Worker(cow_device, misc_name, base_path_merge, snapuserd) {}
+
+int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
+ std::vector<const CowOperation*>* replace_zero_vec) {
int num_ops = *pending_ops;
int nr_consecutive = 0;
bool checkOrderedOp = (replace_zero_vec == nullptr);
@@ -70,7 +76,7 @@
return nr_consecutive;
}
-bool Worker::MergeReplaceZeroOps() {
+bool MergeWorker::MergeReplaceZeroOps() {
// Flush after merging 2MB. Since all ops are independent and there is no
// dependency between COW ops, we will flush the data and the number
// of ops merged in COW block device. If there is a crash, we will
@@ -99,17 +105,20 @@
for (size_t i = 0; i < replace_zero_vec.size(); i++) {
const CowOperation* cow_op = replace_zero_vec[i];
+
+ void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+ if (!buffer) {
+ SNAP_LOG(ERROR) << "Failed to acquire buffer in merge";
+ return false;
+ }
if (cow_op->type == kCowReplaceOp) {
- if (!ProcessReplaceOp(cow_op)) {
- SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
+ if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
+ SNAP_LOG(ERROR) << "Failed to read COW in merge";
return false;
}
} else {
CHECK(cow_op->type == kCowZeroOp);
- if (!ProcessZeroOp()) {
- SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
- return false;
- }
+ memset(buffer, 0, BLOCK_SZ);
}
bufsink_.UpdateBufferOffset(BLOCK_SZ);
@@ -149,7 +158,7 @@
if (snapuserd_->IsIOTerminated()) {
SNAP_LOG(ERROR)
- << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
+ << "MergeReplaceZeroOps: MergeWorker threads terminated - shutting down merge";
return false;
}
}
@@ -173,7 +182,7 @@
return true;
}
-bool Worker::MergeOrderedOpsAsync() {
+bool MergeWorker::MergeOrderedOpsAsync() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@@ -354,7 +363,7 @@
return true;
}
-bool Worker::MergeOrderedOps() {
+bool MergeWorker::MergeOrderedOps() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@@ -439,7 +448,7 @@
return true;
}
-bool Worker::AsyncMerge() {
+bool MergeWorker::AsyncMerge() {
if (!MergeOrderedOpsAsync()) {
SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
// Reset the iter so that we retry the merge
@@ -455,7 +464,7 @@
return true;
}
-bool Worker::SyncMerge() {
+bool MergeWorker::SyncMerge() {
if (!MergeOrderedOps()) {
SNAP_LOG(ERROR) << "Merge failed for ordered ops";
return false;
@@ -465,7 +474,7 @@
return true;
}
-bool Worker::Merge() {
+bool MergeWorker::Merge() {
cowop_iter_ = reader_->GetOpIter(true);
bool retry = false;
@@ -511,7 +520,7 @@
return true;
}
-bool Worker::InitializeIouring() {
+bool MergeWorker::InitializeIouring() {
if (!snapuserd_->IsIouringSupported()) {
return false;
}
@@ -530,13 +539,13 @@
return true;
}
-void Worker::FinalizeIouring() {
+void MergeWorker::FinalizeIouring() {
if (merge_async_) {
io_uring_queue_exit(ring_.get());
}
}
-bool Worker::RunMergeThread() {
+bool MergeWorker::Run() {
SNAP_LOG(DEBUG) << "Waiting for merge begin...";
if (!snapuserd_->WaitForMergeBegin()) {
SNAP_LOG(ERROR) << "Merge terminated early...";
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h
new file mode 100644
index 0000000..f35147f
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h
@@ -0,0 +1,58 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include "worker.h"
+
+#include <liburing.h>
+
+namespace android {
+namespace snapshot {
+
+class MergeWorker : public Worker {
+ public:
+ MergeWorker(const std::string& cow_device, const std::string& misc_name,
+ const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+ bool Run();
+
+ private:
+ int PrepareMerge(uint64_t* source_offset, int* pending_ops,
+ std::vector<const CowOperation*>* replace_zero_vec = nullptr);
+ bool MergeReplaceZeroOps();
+ bool MergeOrderedOps();
+ bool MergeOrderedOpsAsync();
+ bool Merge();
+ bool AsyncMerge();
+ bool SyncMerge();
+ bool InitializeIouring();
+ void FinalizeIouring();
+
+ private:
+ std::unique_ptr<ICowOpIter> cowop_iter_;
+ std::unique_ptr<struct io_uring> ring_;
+ size_t ra_block_index_ = 0;
+ uint64_t blocks_merged_in_group_ = 0;
+ bool merge_async_ = false;
+ // Queue depth of 8 seems optimal. We don't want
+ // to have a huge depth as it may put more memory pressure
+ // on the kernel worker threads given that we use
+ // IOSQE_ASYNC flag - ASYNC flags can potentially
+ // result in EINTR; Since we don't restart
+ // syscalls and fallback to synchronous I/O, we
+ // don't want huge queue depth
+ int queue_depth_ = 8;
+};
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp
new file mode 100644
index 0000000..aa15630
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp
@@ -0,0 +1,80 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "worker.h"
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+Worker::Worker(const std::string& cow_device, const std::string& misc_name,
+ const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
+ cow_device_ = cow_device;
+ misc_name_ = misc_name;
+ base_path_merge_ = base_path_merge;
+ snapuserd_ = snapuserd;
+}
+
+void Worker::InitializeBufsink() {
+ // Allocate the buffer which is used to communicate between
+ // daemon and dm-user. The buffer comprises of header and a fixed payload.
+ // If the dm-user requests a big IO, the IO will be broken into chunks
+ // of PAYLOAD_BUFFER_SZ.
+ size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
+ bufsink_.Initialize(buf_size);
+}
+
+bool Worker::Init() {
+ InitializeBufsink();
+
+ if (!InitializeFds()) {
+ return false;
+ }
+
+ if (!InitReader()) {
+ return false;
+ }
+
+ return true;
+}
+
+bool Worker::InitReader() {
+ reader_ = snapuserd_->CloneReaderForWorker();
+
+ if (!reader_->InitForMerge(std::move(cow_fd_))) {
+ return false;
+ }
+ return true;
+}
+
+bool Worker::InitializeFds() {
+ cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
+ if (cow_fd_ < 0) {
+ SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+ return false;
+ }
+
+ // Base device used by merge thread
+ base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
+ if (base_path_merge_fd_ < 0) {
+ SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h
new file mode 100644
index 0000000..813b159
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h
@@ -0,0 +1,65 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+
+#include <android-base/unique_fd.h>
+#include <libsnapshot/cow_reader.h>
+#include <snapuserd/snapuserd_buffer.h>
+#include <snapuserd/snapuserd_kernel.h>
+
+namespace android {
+namespace snapshot {
+
+using android::base::unique_fd;
+
+class SnapshotHandler;
+
+class Worker {
+ public:
+ Worker(const std::string& cow_device, const std::string& misc_name,
+ const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+ virtual ~Worker() = default;
+
+ virtual bool Init();
+
+ protected:
+ // Initialization
+ void InitializeBufsink();
+ bool InitializeFds();
+ bool InitReader();
+ virtual void CloseFds() { base_path_merge_fd_ = {}; }
+
+ std::unique_ptr<CowReader> reader_;
+ BufferSink bufsink_;
+
+ std::string misc_name_; // Needed for SNAP_LOG.
+
+ unique_fd base_path_merge_fd_;
+
+ std::shared_ptr<SnapshotHandler> snapuserd_;
+
+ private:
+ std::string cow_device_;
+ std::string base_path_merge_;
+ unique_fd cow_fd_;
+};
+
+} // namespace snapshot
+} // namespace android