RESTRICT AUTOMERGE: snapuserd: Split the server into two classes.
This splits server into two classes: one that handles the IPC requests,
and one that managers snapshot handlers. This will allow embedding of
the snapshot handling code, without booting up a server. It'll also make
testing much easier.
The handler code has been further placed behind a virtual interface,
though this is likely unnecessary unless we start testing the server
logic itself (or attempt to unify the S and T+ versions of snapuserd).
Bug: 269361087
Test: ota
Change-Id: I3ca3ad9c8c1fb910a1c7bf9f44165e2f44c930c9
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 9261482..7fcaac5 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -62,6 +62,7 @@
"dm-snapshot-merge/snapuserd_worker.cpp",
"dm-snapshot-merge/snapuserd_readahead.cpp",
"snapuserd_buffer.cpp",
+ "user-space-merge/handler_manager.cpp",
"user-space-merge/snapuserd_core.cpp",
"user-space-merge/snapuserd_dm_user.cpp",
"user-space-merge/snapuserd_merge.cpp",
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index bfe93eb..0557214 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -114,7 +114,7 @@
return false;
}
auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3]);
- if (!handler || !user_server_.StartHandler(handler)) {
+ if (!handler || !user_server_.StartHandler(parts[0])) {
return false;
}
}
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
new file mode 100644
index 0000000..c5150c4
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -0,0 +1,371 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "handler_manager.h"
+
+#include <sys/eventfd.h>
+
+#include <android-base/logging.h>
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+static constexpr uint8_t kMaxMergeThreads = 2;
+
+void HandlerThread::FreeResources() {
+ // Each worker thread holds a reference to snapuserd.
+ // Clear them so that all the resources
+ // held by snapuserd is released
+ if (snapuserd_) {
+ snapuserd_->FreeResources();
+ snapuserd_ = nullptr;
+ }
+}
+
+SnapshotHandlerManager::SnapshotHandlerManager() {
+ monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
+ if (monitor_merge_event_fd_ == -1) {
+ PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
+ }
+}
+
+std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
+ const std::string& misc_name, const std::string& cow_device_path,
+ const std::string& backing_device, const std::string& base_path_merge,
+ int num_worker_threads, bool use_iouring, bool perform_verification) {
+ auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
+ base_path_merge, num_worker_threads,
+ use_iouring, perform_verification);
+ if (!snapuserd->InitCowDevice()) {
+ LOG(ERROR) << "Failed to initialize Snapuserd";
+ return nullptr;
+ }
+
+ if (!snapuserd->InitializeWorkers()) {
+ LOG(ERROR) << "Failed to initialize workers";
+ return nullptr;
+ }
+
+ auto handler = std::make_shared<HandlerThread>(snapuserd);
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (FindHandler(&lock, misc_name) != dm_users_.end()) {
+ LOG(ERROR) << "Handler already exists: " << misc_name;
+ return nullptr;
+ }
+ dm_users_.push_back(handler);
+ }
+ return handler;
+}
+
+bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
+ std::lock_guard<std::mutex> lock(lock_);
+ auto iter = FindHandler(&lock, misc_name);
+ if (iter == dm_users_.end()) {
+ LOG(ERROR) << "Could not find handler: " << misc_name;
+ return false;
+ }
+ if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
+ LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
+ return false;
+ }
+ if (!StartHandler(*iter)) {
+ return false;
+ }
+ return true;
+}
+
+bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
+ if (handler->snapuserd()->IsAttached()) {
+ LOG(ERROR) << "Handler already attached";
+ return false;
+ }
+
+ handler->snapuserd()->AttachControlDevice();
+
+ handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
+ return true;
+}
+
+bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ auto iter = FindHandler(&lock, misc_name);
+ if (iter == dm_users_.end()) {
+ // After merge is completed, we swap dm-user table with
+ // the underlying dm-linear base device. Hence, worker
+ // threads would have terminted and was removed from
+ // the list.
+ LOG(DEBUG) << "Could not find handler: " << misc_name;
+ return true;
+ }
+
+ if (!(*iter)->ThreadTerminated()) {
+ (*iter)->snapuserd()->NotifyIOTerminated();
+ }
+ }
+ if (!RemoveAndJoinHandler(misc_name)) {
+ return false;
+ }
+ return true;
+}
+
+void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
+ LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
+
+ if (!handler->snapuserd()->Start()) {
+ LOG(ERROR) << " Failed to launch all worker threads";
+ }
+
+ handler->snapuserd()->CloseFds();
+ bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
+ handler->snapuserd()->UnmapBufferRegion();
+
+ auto misc_name = handler->misc_name();
+ LOG(INFO) << "Handler thread about to exit: " << misc_name;
+
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (merge_completed) {
+ num_partitions_merge_complete_ += 1;
+ active_merge_threads_ -= 1;
+ WakeupMonitorMergeThread();
+ }
+ handler->SetThreadTerminated();
+ auto iter = FindHandler(&lock, handler->misc_name());
+ if (iter == dm_users_.end()) {
+ // RemoveAndJoinHandler() already removed us from the list, and is
+ // now waiting on a join(), so just return. Additionally, release
+ // all the resources held by snapuserd object which are shared
+ // by worker threads. This should be done when the last reference
+ // of "handler" is released; but we will explicitly release here
+ // to make sure snapuserd object is freed as it is the biggest
+ // consumer of memory in the daemon.
+ handler->FreeResources();
+ LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
+ return;
+ }
+
+ LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
+
+ if (handler->snapuserd()->IsAttached()) {
+ handler->thread().detach();
+ }
+
+ // Important: free resources within the lock. This ensures that if
+ // WaitForDelete() is called, the handler is either in the list, or
+ // it's not and its resources are guaranteed to be freed.
+ handler->FreeResources();
+ dm_users_.erase(iter);
+ }
+}
+
+bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
+ std::lock_guard<std::mutex> lock(lock_);
+ auto iter = FindHandler(&lock, misc_name);
+ if (iter == dm_users_.end()) {
+ LOG(ERROR) << "Could not find handler: " << misc_name;
+ return false;
+ }
+
+ return StartMerge(&lock, *iter);
+}
+
+bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+ const std::shared_ptr<HandlerThread>& handler) {
+ CHECK(proof_of_lock);
+
+ if (!handler->snapuserd()->IsAttached()) {
+ LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
+ return false;
+ }
+
+ handler->snapuserd()->MonitorMerge();
+
+ if (!is_merge_monitor_started_) {
+ std::thread(&SnapshotHandlerManager::MonitorMerge, this).detach();
+ is_merge_monitor_started_ = true;
+ }
+
+ merge_handlers_.push(handler);
+ WakeupMonitorMergeThread();
+ return true;
+}
+
+void SnapshotHandlerManager::WakeupMonitorMergeThread() {
+ uint64_t notify = 1;
+ ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), ¬ify, sizeof(notify)));
+ if (rc < 0) {
+ PLOG(FATAL) << "failed to notify monitor merge thread";
+ }
+}
+
+void SnapshotHandlerManager::MonitorMerge() {
+ while (!stop_monitor_merge_thread_) {
+ uint64_t testVal;
+ ssize_t ret =
+ TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
+ if (ret == -1) {
+ PLOG(FATAL) << "Failed to read from eventfd";
+ } else if (ret == 0) {
+ LOG(FATAL) << "Hit EOF on eventfd";
+ }
+
+ LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
+ auto handler = merge_handlers_.front();
+ merge_handlers_.pop();
+
+ if (!handler->snapuserd()) {
+ LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
+ continue;
+ }
+
+ LOG(INFO) << "Starting merge for partition: "
+ << handler->snapuserd()->GetMiscName();
+ handler->snapuserd()->InitiateMerge();
+ active_merge_threads_ += 1;
+ }
+ }
+ }
+
+ LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
+}
+
+std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
+ std::lock_guard<std::mutex> lock(lock_);
+ auto iter = FindHandler(&lock, misc_name);
+ if (iter == dm_users_.end()) {
+ LOG(ERROR) << "Could not find handler: " << misc_name;
+ return {};
+ }
+
+ return (*iter)->snapuserd()->GetMergeStatus();
+}
+
+double SnapshotHandlerManager::GetMergePercentage() {
+ std::lock_guard<std::mutex> lock(lock_);
+
+ double percentage = 0.0;
+ int n = 0;
+
+ for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+ auto& th = (*iter)->thread();
+ if (th.joinable()) {
+ // Merge percentage by individual partitions wherein merge is still
+ // in-progress
+ percentage += (*iter)->snapuserd()->GetMergePercentage();
+ n += 1;
+ }
+ }
+
+ // Calculate final merge including those partitions where merge was already
+ // completed - num_partitions_merge_complete_ will track them when each
+ // thread exists in RunThread.
+ int total_partitions = n + num_partitions_merge_complete_;
+
+ if (total_partitions) {
+ percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
+ }
+
+ LOG(DEBUG) << "Merge %: " << percentage
+ << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
+ << " total_partitions: " << total_partitions << " n: " << n;
+ return percentage;
+}
+
+bool SnapshotHandlerManager::GetVerificationStatus() {
+ std::lock_guard<std::mutex> lock(lock_);
+
+ bool status = true;
+ for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+ auto& th = (*iter)->thread();
+ if (th.joinable() && status) {
+ status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
+ } else {
+ // return immediately if there is a failure
+ return false;
+ }
+ }
+
+ return status;
+}
+
+bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
+ std::shared_ptr<HandlerThread> handler;
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+
+ auto iter = FindHandler(&lock, misc_name);
+ if (iter == dm_users_.end()) {
+ // Client already deleted.
+ return true;
+ }
+ handler = std::move(*iter);
+ dm_users_.erase(iter);
+ }
+
+ auto& th = handler->thread();
+ if (th.joinable()) {
+ th.join();
+ }
+ return true;
+}
+
+void SnapshotHandlerManager::TerminateMergeThreads() {
+ std::lock_guard<std::mutex> guard(lock_);
+
+ for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+ if (!(*iter)->ThreadTerminated()) {
+ (*iter)->snapuserd()->NotifyIOTerminated();
+ }
+ }
+}
+
+void SnapshotHandlerManager::JoinAllThreads() {
+ // Acquire the thread list within the lock.
+ std::vector<std::shared_ptr<HandlerThread>> dm_users;
+ {
+ std::lock_guard<std::mutex> guard(lock_);
+ dm_users = std::move(dm_users_);
+ }
+
+ for (auto& client : dm_users) {
+ auto& th = client->thread();
+
+ if (th.joinable()) th.join();
+ }
+
+ stop_monitor_merge_thread_ = true;
+ WakeupMonitorMergeThread();
+}
+
+auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+ const std::string& misc_name) -> HandlerList::iterator {
+ CHECK(proof_of_lock);
+
+ for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+ if ((*iter)->misc_name() == misc_name) {
+ return iter;
+ }
+ }
+ return dm_users_.end();
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
new file mode 100644
index 0000000..b7ddac1
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
@@ -0,0 +1,131 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <queue>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <android-base/unique_fd.h>
+
+namespace android {
+namespace snapshot {
+
+class SnapshotHandler;
+
+class HandlerThread {
+ public:
+ explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
+
+ void FreeResources();
+ const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
+ std::thread& thread() { return thread_; }
+
+ const std::string& misc_name() const { return misc_name_; }
+ bool ThreadTerminated() { return thread_terminated_; }
+ void SetThreadTerminated() { thread_terminated_ = true; }
+
+ private:
+ std::thread thread_;
+ std::shared_ptr<SnapshotHandler> snapuserd_;
+ std::string misc_name_;
+ bool thread_terminated_ = false;
+};
+
+class ISnapshotHandlerManager {
+ public:
+ virtual ~ISnapshotHandlerManager() {}
+
+ // Add a new snapshot handler but do not start serving requests yet.
+ virtual std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
+ const std::string& cow_device_path,
+ const std::string& backing_device,
+ const std::string& base_path_merge,
+ int num_worker_threads, bool use_iouring,
+ bool perform_verification) = 0;
+
+ // Start serving requests on a snapshot handler.
+ virtual bool StartHandler(const std::string& misc_name) = 0;
+
+ // Stop serving requests on a snapshot handler and remove it.
+ virtual bool DeleteHandler(const std::string& misc_name) = 0;
+
+ // Begin merging blocks on the given snapshot handler.
+ virtual bool InitiateMerge(const std::string& misc_name) = 0;
+
+ // Return a string containing a status code indicating the merge status
+ // on the handler. Returns empty on error.
+ virtual std::string GetMergeStatus(const std::string& misc_name) = 0;
+
+ // Wait until all handlers have terminated.
+ virtual void JoinAllThreads() = 0;
+
+ // Stop any in-progress merge threads.
+ virtual void TerminateMergeThreads() = 0;
+
+ // Returns the merge progress across all merging snapshot handlers.
+ virtual double GetMergePercentage() = 0;
+
+ // Returns whether all snapshots have verified.
+ virtual bool GetVerificationStatus() = 0;
+};
+
+class SnapshotHandlerManager final : public ISnapshotHandlerManager {
+ public:
+ SnapshotHandlerManager();
+ std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
+ const std::string& cow_device_path,
+ const std::string& backing_device,
+ const std::string& base_path_merge,
+ int num_worker_threads, bool use_iouring,
+ bool perform_verification) override;
+ bool StartHandler(const std::string& misc_name) override;
+ bool DeleteHandler(const std::string& misc_name) override;
+ bool InitiateMerge(const std::string& misc_name) override;
+ std::string GetMergeStatus(const std::string& misc_name) override;
+ void JoinAllThreads() override;
+ void TerminateMergeThreads() override;
+ double GetMergePercentage() override;
+ bool GetVerificationStatus() override;
+
+ private:
+ bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
+ void RunThread(std::shared_ptr<HandlerThread> handler);
+ bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+ const std::shared_ptr<HandlerThread>& handler);
+ void MonitorMerge();
+ void WakeupMonitorMergeThread();
+ bool RemoveAndJoinHandler(const std::string& misc_name);
+
+ // Find a HandlerThread within a lock.
+ using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
+ HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+ const std::string& misc_name);
+
+ std::mutex lock_;
+ HandlerList dm_users_;
+
+ bool is_merge_monitor_started_ = false;
+ bool stop_monitor_merge_thread_ = false;
+ int active_merge_threads_ = 0;
+ int num_partitions_merge_complete_ = 0;
+ std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
+ android::base::unique_fd monitor_merge_event_fd_;
+};
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index b7ce210..cc8115f 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -62,11 +62,8 @@
}
UserSnapshotServer::UserSnapshotServer() {
- monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
- if (monitor_merge_event_fd_ == -1) {
- PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
- }
terminating_ = false;
+ handlers_ = std::make_unique<SnapshotHandlerManager>();
}
UserSnapshotServer::~UserSnapshotServer() {
@@ -99,7 +96,7 @@
void UserSnapshotServer::ShutdownThreads() {
terminating_ = true;
- JoinAllThreads();
+ handlers_->JoinAllThreads();
}
HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
@@ -166,17 +163,7 @@
return Sendmsg(fd, "fail");
}
- std::lock_guard<std::mutex> lock(lock_);
- auto iter = FindHandler(&lock, out[1]);
- if (iter == dm_users_.end()) {
- LOG(ERROR) << "Could not find handler: " << out[1];
- return Sendmsg(fd, "fail");
- }
- if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
- LOG(ERROR) << "Tried to re-attach control device: " << out[1];
- return Sendmsg(fd, "fail");
- }
- if (!StartHandler(*iter)) {
+ if (!handlers_->StartHandler(out[1])) {
return Sendmsg(fd, "fail");
}
return Sendmsg(fd, "success");
@@ -209,30 +196,13 @@
LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
- {
- std::lock_guard<std::mutex> lock(lock_);
- auto iter = FindHandler(&lock, out[1]);
- if (iter == dm_users_.end()) {
- // After merge is completed, we swap dm-user table with
- // the underlying dm-linear base device. Hence, worker
- // threads would have terminted and was removed from
- // the list.
- LOG(DEBUG) << "Could not find handler: " << out[1];
- return Sendmsg(fd, "success");
- }
-
- if (!(*iter)->ThreadTerminated()) {
- (*iter)->snapuserd()->NotifyIOTerminated();
- }
- }
- if (!RemoveAndJoinHandler(out[1])) {
+ if (!handlers_->DeleteHandler(out[1])) {
return Sendmsg(fd, "fail");
}
return Sendmsg(fd, "success");
}
case DaemonOps::DETACH: {
- std::lock_guard<std::mutex> lock(lock_);
- TerminateMergeThreads(&lock);
+ handlers_->TerminateMergeThreads();
terminating_ = true;
return true;
}
@@ -252,24 +222,15 @@
return Sendmsg(fd, "fail");
}
if (out[0] == "initiate_merge") {
- std::lock_guard<std::mutex> lock(lock_);
- auto iter = FindHandler(&lock, out[1]);
- if (iter == dm_users_.end()) {
- LOG(ERROR) << "Could not find handler: " << out[1];
+ if (!handlers_->InitiateMerge(out[1])) {
return Sendmsg(fd, "fail");
}
-
- if (!StartMerge(&lock, *iter)) {
- return Sendmsg(fd, "fail");
- }
-
return Sendmsg(fd, "success");
}
return Sendmsg(fd, "fail");
}
case DaemonOps::PERCENTAGE: {
- std::lock_guard<std::mutex> lock(lock_);
- double percentage = GetMergePercentage(&lock);
+ double percentage = handlers_->GetMergePercentage();
return Sendmsg(fd, std::to_string(percentage));
}
@@ -280,24 +241,16 @@
LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
return Sendmsg(fd, "snapshot-merge-failed");
}
- {
- std::lock_guard<std::mutex> lock(lock_);
- auto iter = FindHandler(&lock, out[1]);
- if (iter == dm_users_.end()) {
- LOG(ERROR) << "Could not find handler: " << out[1];
- return Sendmsg(fd, "snapshot-merge-failed");
- }
-
- std::string merge_status = GetMergeStatus(*iter);
- return Sendmsg(fd, merge_status);
+ auto status = handlers_->GetMergeStatus(out[1]);
+ if (status.empty()) {
+ return Sendmsg(fd, "snapshot-merge-failed");
}
+ return Sendmsg(fd, status);
}
case DaemonOps::UPDATE_VERIFY: {
- std::lock_guard<std::mutex> lock(lock_);
- if (!UpdateVerification(&lock)) {
+ if (!handlers_->GetVerificationStatus()) {
return Sendmsg(fd, "fail");
}
-
return Sendmsg(fd, "success");
}
default: {
@@ -308,56 +261,6 @@
}
}
-void UserSnapshotServer::RunThread(std::shared_ptr<HandlerThread> handler) {
- LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
-
- if (!handler->snapuserd()->Start()) {
- LOG(ERROR) << " Failed to launch all worker threads";
- }
-
- handler->snapuserd()->CloseFds();
- bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
- handler->snapuserd()->UnmapBufferRegion();
-
- auto misc_name = handler->misc_name();
- LOG(INFO) << "Handler thread about to exit: " << misc_name;
-
- {
- std::lock_guard<std::mutex> lock(lock_);
- if (merge_completed) {
- num_partitions_merge_complete_ += 1;
- active_merge_threads_ -= 1;
- WakeupMonitorMergeThread();
- }
- handler->SetThreadTerminated();
- auto iter = FindHandler(&lock, handler->misc_name());
- if (iter == dm_users_.end()) {
- // RemoveAndJoinHandler() already removed us from the list, and is
- // now waiting on a join(), so just return. Additionally, release
- // all the resources held by snapuserd object which are shared
- // by worker threads. This should be done when the last reference
- // of "handler" is released; but we will explicitly release here
- // to make sure snapuserd object is freed as it is the biggest
- // consumer of memory in the daemon.
- handler->FreeResources();
- LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
- return;
- }
-
- LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
-
- if (handler->snapuserd()->IsAttached()) {
- handler->thread().detach();
- }
-
- // Important: free resources within the lock. This ensures that if
- // WaitForDelete() is called, the handler is either in the list, or
- // it's not and its resources are guaranteed to be freed.
- handler->FreeResources();
- dm_users_.erase(iter);
- }
-}
-
bool UserSnapshotServer::Start(const std::string& socketname) {
bool start_listening = true;
@@ -423,28 +326,10 @@
}
}
- JoinAllThreads();
+ handlers_->JoinAllThreads();
return true;
}
-void UserSnapshotServer::JoinAllThreads() {
- // Acquire the thread list within the lock.
- std::vector<std::shared_ptr<HandlerThread>> dm_users;
- {
- std::lock_guard<std::mutex> guard(lock_);
- dm_users = std::move(dm_users_);
- }
-
- for (auto& client : dm_users) {
- auto& th = client->thread();
-
- if (th.joinable()) th.join();
- }
-
- stop_monitor_merge_thread_ = true;
- WakeupMonitorMergeThread();
-}
-
void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
struct pollfd p = {};
p.fd = fd.get();
@@ -506,185 +391,13 @@
perform_verification = false;
}
- auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
- base_path_merge, num_worker_threads,
- io_uring_enabled_, perform_verification);
- if (!snapuserd->InitCowDevice()) {
- LOG(ERROR) << "Failed to initialize Snapuserd";
- return nullptr;
- }
-
- if (!snapuserd->InitializeWorkers()) {
- LOG(ERROR) << "Failed to initialize workers";
- return nullptr;
- }
-
- auto handler = std::make_shared<HandlerThread>(snapuserd);
- {
- std::lock_guard<std::mutex> lock(lock_);
- if (FindHandler(&lock, misc_name) != dm_users_.end()) {
- LOG(ERROR) << "Handler already exists: " << misc_name;
- return nullptr;
- }
- dm_users_.push_back(handler);
- }
- return handler;
-}
-
-bool UserSnapshotServer::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
- if (handler->snapuserd()->IsAttached()) {
- LOG(ERROR) << "Handler already attached";
- return false;
- }
-
- handler->snapuserd()->AttachControlDevice();
-
- handler->thread() = std::thread(std::bind(&UserSnapshotServer::RunThread, this, handler));
- return true;
-}
-
-bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
- const std::shared_ptr<HandlerThread>& handler) {
- CHECK(proof_of_lock);
-
- if (!handler->snapuserd()->IsAttached()) {
- LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
- return false;
- }
-
- handler->snapuserd()->MonitorMerge();
-
- if (!is_merge_monitor_started_.has_value()) {
- std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
- is_merge_monitor_started_ = true;
- }
-
- merge_handlers_.push(handler);
- WakeupMonitorMergeThread();
- return true;
-}
-
-auto UserSnapshotServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
- const std::string& misc_name) -> HandlerList::iterator {
- CHECK(proof_of_lock);
-
- for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
- if ((*iter)->misc_name() == misc_name) {
- return iter;
- }
- }
- return dm_users_.end();
-}
-
-void UserSnapshotServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
- CHECK(proof_of_lock);
-
- for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
- if (!(*iter)->ThreadTerminated()) {
- (*iter)->snapuserd()->NotifyIOTerminated();
- }
- }
-}
-
-std::string UserSnapshotServer::GetMergeStatus(const std::shared_ptr<HandlerThread>& handler) {
- return handler->snapuserd()->GetMergeStatus();
-}
-
-double UserSnapshotServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
- CHECK(proof_of_lock);
- double percentage = 0.0;
- int n = 0;
-
- for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
- auto& th = (*iter)->thread();
- if (th.joinable()) {
- // Merge percentage by individual partitions wherein merge is still
- // in-progress
- percentage += (*iter)->snapuserd()->GetMergePercentage();
- n += 1;
- }
- }
-
- // Calculate final merge including those partitions where merge was already
- // completed - num_partitions_merge_complete_ will track them when each
- // thread exists in RunThread.
- int total_partitions = n + num_partitions_merge_complete_;
-
- if (total_partitions) {
- percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
- }
-
- LOG(DEBUG) << "Merge %: " << percentage
- << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
- << " total_partitions: " << total_partitions << " n: " << n;
- return percentage;
-}
-
-bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) {
- std::shared_ptr<HandlerThread> handler;
- {
- std::lock_guard<std::mutex> lock(lock_);
-
- auto iter = FindHandler(&lock, misc_name);
- if (iter == dm_users_.end()) {
- // Client already deleted.
- return true;
- }
- handler = std::move(*iter);
- dm_users_.erase(iter);
- }
-
- auto& th = handler->thread();
- if (th.joinable()) {
- th.join();
- }
- return true;
-}
-
-void UserSnapshotServer::WakeupMonitorMergeThread() {
- uint64_t notify = 1;
- ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), ¬ify, sizeof(notify)));
- if (rc < 0) {
- PLOG(FATAL) << "failed to notify monitor merge thread";
- }
-}
-
-void UserSnapshotServer::MonitorMerge() {
- while (!stop_monitor_merge_thread_) {
- uint64_t testVal;
- ssize_t ret =
- TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
- if (ret == -1) {
- PLOG(FATAL) << "Failed to read from eventfd";
- } else if (ret == 0) {
- LOG(FATAL) << "Hit EOF on eventfd";
- }
-
- LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
- {
- std::lock_guard<std::mutex> lock(lock_);
- while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
- auto handler = merge_handlers_.front();
- merge_handlers_.pop();
-
- if (!handler->snapuserd()) {
- LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
- continue;
- }
-
- LOG(INFO) << "Starting merge for partition: "
- << handler->snapuserd()->GetMiscName();
- handler->snapuserd()->InitiateMerge();
- active_merge_threads_ += 1;
- }
- }
- }
-
- LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
+ return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge,
+ num_worker_threads, io_uring_enabled_, perform_verification);
}
bool UserSnapshotServer::WaitForSocket() {
- auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
+ auto scope_guard =
+ android::base::make_scope_guard([this]() -> void { handlers_->JoinAllThreads(); });
auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
@@ -781,21 +494,8 @@
return true;
}
-bool UserSnapshotServer::UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock) {
- CHECK(proof_of_lock);
-
- bool status = true;
- for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
- auto& th = (*iter)->thread();
- if (th.joinable() && status) {
- status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
- } else {
- // return immediately if there is a failure
- return false;
- }
- }
-
- return status;
+bool UserSnapshotServer::StartHandler(const std::string& misc_name) {
+ return handlers_->StartHandler(misc_name);
}
} // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 12c3903..ae51903 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -31,6 +31,7 @@
#include <vector>
#include <android-base/unique_fd.h>
+#include "handler_manager.h"
#include "snapuserd_core.h"
namespace android {
@@ -54,33 +55,6 @@
INVALID,
};
-class HandlerThread {
- public:
- explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
-
- void FreeResources() {
- // Each worker thread holds a reference to snapuserd.
- // Clear them so that all the resources
- // held by snapuserd is released
- if (snapuserd_) {
- snapuserd_->FreeResources();
- snapuserd_ = nullptr;
- }
- }
- const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
- std::thread& thread() { return thread_; }
-
- const std::string& misc_name() const { return misc_name_; }
- bool ThreadTerminated() { return thread_terminated_; }
- void SetThreadTerminated() { thread_terminated_ = true; }
-
- private:
- std::thread thread_;
- std::shared_ptr<SnapshotHandler> snapuserd_;
- std::string misc_name_;
- bool thread_terminated_ = false;
-};
-
class UserSnapshotServer {
private:
android::base::unique_fd sockfd_;
@@ -88,21 +62,12 @@
volatile bool received_socket_signal_ = false;
std::vector<struct pollfd> watched_fds_;
bool is_socket_present_ = false;
- int num_partitions_merge_complete_ = 0;
- int active_merge_threads_ = 0;
- bool stop_monitor_merge_thread_ = false;
bool is_server_running_ = false;
bool io_uring_enabled_ = false;
- std::optional<bool> is_merge_monitor_started_;
-
- android::base::unique_fd monitor_merge_event_fd_;
+ std::unique_ptr<ISnapshotHandlerManager> handlers_;
std::mutex lock_;
- using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
- HandlerList dm_users_;
- std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
-
void AddWatchedFd(android::base::borrowed_fd fd, int events);
void AcceptClient();
bool HandleClient(android::base::borrowed_fd fd, int revents);
@@ -111,28 +76,15 @@
bool Receivemsg(android::base::borrowed_fd fd, const std::string& str);
void ShutdownThreads();
- bool RemoveAndJoinHandler(const std::string& control_device);
DaemonOps Resolveop(std::string& input);
std::string GetDaemonStatus();
void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
bool IsTerminating() { return terminating_; }
- void RunThread(std::shared_ptr<HandlerThread> handler);
- void MonitorMerge();
-
void JoinAllThreads();
bool StartWithSocket(bool start_listening);
- // Find a HandlerThread within a lock.
- HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
- const std::string& misc_name);
-
- double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
- void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
-
- bool UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock);
-
public:
UserSnapshotServer();
~UserSnapshotServer();
@@ -147,12 +99,8 @@
const std::string& cow_device_path,
const std::string& backing_device,
const std::string& base_path_merge);
- bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
- bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
- const std::shared_ptr<HandlerThread>& handler);
- std::string GetMergeStatus(const std::shared_ptr<HandlerThread>& handler);
+ bool StartHandler(const std::string& misc_name);
- void WakeupMonitorMergeThread();
void SetTerminating() { terminating_ = true; }
void ReceivedSocketSignal() { received_socket_signal_ = true; }
void SetServerRunning() { is_server_running_ = true; }