Merge "Revert "Delete KM1""
diff --git a/fs_mgr/fs_mgr.cpp b/fs_mgr/fs_mgr.cpp
index 002b302..07e1e6b 100644
--- a/fs_mgr/fs_mgr.cpp
+++ b/fs_mgr/fs_mgr.cpp
@@ -2119,6 +2119,9 @@
PERROR << "Cannot open " << loop_device;
return false;
}
+ if (!LoopControl::SetAutoClearStatus(loop_fd.get())) {
+ PERROR << "Failed set LO_FLAGS_AUTOCLEAR for " << loop_device;
+ }
if (!LoopControl::EnableDirectIo(loop_fd.get())) {
return false;
}
diff --git a/fs_mgr/libdm/include/libdm/loop_control.h b/fs_mgr/libdm/include/libdm/loop_control.h
index ad53c11..f519054 100644
--- a/fs_mgr/libdm/include/libdm/loop_control.h
+++ b/fs_mgr/libdm/include/libdm/loop_control.h
@@ -46,6 +46,9 @@
// Enable Direct I/O on a loop device. This requires kernel 4.9+.
static bool EnableDirectIo(int fd);
+ // Set LO_FLAGS_AUTOCLEAR on a loop device.
+ // |fd| is an open file descriptor for the loop device. Returns true on
+ // success and false if the status ioctl on |fd| fails.
+ static bool SetAutoClearStatus(int fd);
+
LoopControl(const LoopControl&) = delete;
LoopControl& operator=(const LoopControl&) = delete;
LoopControl& operator=(LoopControl&&) = default;
diff --git a/fs_mgr/libdm/loop_control.cpp b/fs_mgr/libdm/loop_control.cpp
index 2e40a18..32d5f38 100644
--- a/fs_mgr/libdm/loop_control.cpp
+++ b/fs_mgr/libdm/loop_control.cpp
@@ -133,6 +133,16 @@
return true;
}
+// Sets LO_FLAGS_AUTOCLEAR on the loop device referred to by |fd|.
+//
+// The current status is fetched first and only the flag is OR-ed in, so that
+// LOOP_SET_STATUS64 is not handed a zero-filled loop_info64 that would
+// overwrite the device's other fields (offset, size limit, file name, ...).
+//
+// Returns true on success, false if either ioctl fails.
+bool LoopControl::SetAutoClearStatus(int fd) {
+    struct loop_info64 info = {};
+
+    if (ioctl(fd, LOOP_GET_STATUS64, &info) != 0) {
+        return false;
+    }
+
+    info.lo_flags |= LO_FLAGS_AUTOCLEAR;
+    return ioctl(fd, LOOP_SET_STATUS64, &info) == 0;
+}
+
LoopDevice::LoopDevice(android::base::borrowed_fd fd, const std::chrono::milliseconds& timeout_ms,
bool auto_close)
: fd_(fd), owned_fd_(-1) {
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index 143f73c..63a9e68 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -145,6 +145,8 @@
// Creates a clone of the current CowReader without the file handlers
std::unique_ptr<CowReader> CloneCowReader();
+    // Adds |num_merge_ops| to the merged-op counter of the header held by this reader.
+    void UpdateMergeOpsCompleted(int num_merge_ops) {
+        header_.num_merge_ops += num_merge_ops;
+    }
+
private:
bool ParseOps(std::optional<uint64_t> label);
bool PrepMergeOps();
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
index 55f4ed7..a49b026 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
@@ -408,6 +408,7 @@
FRIEND_TEST(SnapshotUpdateTest, FullUpdateFlow);
FRIEND_TEST(SnapshotUpdateTest, MergeCannotRemoveCow);
FRIEND_TEST(SnapshotUpdateTest, MergeInRecovery);
+ FRIEND_TEST(SnapshotUpdateTest, QueryStatusError);
FRIEND_TEST(SnapshotUpdateTest, SnapshotStatusFileWithoutCow);
FRIEND_TEST(SnapshotUpdateTest, SpaceSwapUpdate);
friend class SnapshotTest;
diff --git a/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h b/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h
index 1f57bbc..c3b40dc 100644
--- a/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h
+++ b/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h
@@ -100,6 +100,9 @@
return IDeviceInfo::OpenImageManager("ota/test");
}
android::dm::IDeviceMapper& GetDeviceMapper() override {
+ if (dm_) {
+ return *dm_;
+ }
return android::dm::DeviceMapper::Instance();
}
@@ -111,6 +114,8 @@
}
void set_recovery(bool value) { recovery_ = value; }
void set_first_stage_init(bool value) { first_stage_init_ = value; }
+ void set_dm(android::dm::IDeviceMapper* dm) { dm_ = dm; }
+
MergeStatus merge_status() const { return merge_status_; }
private:
@@ -120,6 +125,45 @@
bool recovery_ = false;
bool first_stage_init_ = false;
std::unordered_set<uint32_t> unbootable_slots_;
+ android::dm::IDeviceMapper* dm_ = nullptr;
+};
+
+// Pass-through IDeviceMapper that forwards every call to another
+// IDeviceMapper. Meant to be subclassed so that individual methods can be
+// overridden (e.g. to inject failures) while the rest keep working.
+class DeviceMapperWrapper : public android::dm::IDeviceMapper {
+    using DmDeviceState = android::dm::DmDeviceState;
+    using DmTable = android::dm::DmTable;
+
+  public:
+    // Forwards to the object returned by DeviceMapper::Instance().
+    DeviceMapperWrapper() : impl_(android::dm::DeviceMapper::Instance()) {}
+    // Forwards to |impl|. Only a reference is stored, so |impl| must outlive
+    // this wrapper.
+    explicit DeviceMapperWrapper(android::dm::IDeviceMapper& impl) : impl_(impl) {}
+
+    // Every method below is marked `override` so that a signature drift in
+    // IDeviceMapper becomes a compile error instead of a silent non-override.
+    bool CreateDevice(const std::string& name, const DmTable& table, std::string* path,
+                      const std::chrono::milliseconds& timeout_ms) override {
+        return impl_.CreateDevice(name, table, path, timeout_ms);
+    }
+    DmDeviceState GetState(const std::string& name) const override {
+        return impl_.GetState(name);
+    }
+    bool LoadTableAndActivate(const std::string& name, const DmTable& table) override {
+        return impl_.LoadTableAndActivate(name, table);
+    }
+    bool GetTableInfo(const std::string& name, std::vector<TargetInfo>* table) override {
+        return impl_.GetTableInfo(name, table);
+    }
+    bool GetTableStatus(const std::string& name, std::vector<TargetInfo>* table) override {
+        return impl_.GetTableStatus(name, table);
+    }
+    bool GetDmDevicePathByName(const std::string& name, std::string* path) override {
+        return impl_.GetDmDevicePathByName(name, path);
+    }
+    bool GetDeviceString(const std::string& name, std::string* dev) override {
+        return impl_.GetDeviceString(name, dev);
+    }
+    bool DeleteDeviceIfExists(const std::string& name) override {
+        return impl_.DeleteDeviceIfExists(name);
+    }
+
+  private:
+    // Target of all forwarded calls; not owned.
+    android::dm::IDeviceMapper& impl_;
};
class SnapshotTestPropertyFetcher : public android::fs_mgr::testing::MockPropertyFetcher {
diff --git a/fs_mgr/libsnapshot/snapshot_test.cpp b/fs_mgr/libsnapshot/snapshot_test.cpp
index 43c7fe2..d78ba0a 100644
--- a/fs_mgr/libsnapshot/snapshot_test.cpp
+++ b/fs_mgr/libsnapshot/snapshot_test.cpp
@@ -56,6 +56,7 @@
using android::base::unique_fd;
using android::dm::DeviceMapper;
using android::dm::DmDeviceState;
+using android::dm::IDeviceMapper;
using android::fiemap::FiemapStatus;
using android::fiemap::IImageManager;
using android::fs_mgr::BlockDeviceInfo;
@@ -911,6 +912,11 @@
ASSERT_TRUE(hash.has_value());
hashes_[name] = *hash;
}
+
+ // OTA client blindly unmaps all partitions that are possibly mapped.
+ for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
+ ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
+ }
}
void TearDown() override {
RETURN_IF_NON_VIRTUAL_AB();
@@ -925,6 +931,14 @@
MountMetadata();
for (const auto& suffix : {"_a", "_b"}) {
test_device->set_slot_suffix(suffix);
+
+ // Cheat our way out of merge failed states.
+ if (sm->ProcessUpdateState() == UpdateState::MergeFailed) {
+ ASSERT_TRUE(AcquireLock());
+ ASSERT_TRUE(sm->WriteUpdateState(lock_.get(), UpdateState::None));
+ lock_ = {};
+ }
+
EXPECT_TRUE(sm->CancelUpdate()) << suffix;
}
EXPECT_TRUE(UnmapAll());
@@ -1097,11 +1111,6 @@
// Also test UnmapUpdateSnapshot unmaps everything.
// Also test first stage mount and merge after this.
TEST_F(SnapshotUpdateTest, FullUpdateFlow) {
- // OTA client blindly unmaps all partitions that are possibly mapped.
- for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
- ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
- }
-
// Grow all partitions. Set |prd| large enough that |sys| and |vnd|'s COWs
// fit in super, but not |prd|.
constexpr uint64_t partition_size = 3788_KiB;
@@ -1189,11 +1198,6 @@
GTEST_SKIP() << "Compression-only test";
}
- // OTA client blindly unmaps all partitions that are possibly mapped.
- for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
- ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
- }
-
// Execute the update.
ASSERT_TRUE(sm->BeginUpdate());
ASSERT_TRUE(sm->CreateUpdateSnapshots(manifest_));
@@ -1239,11 +1243,6 @@
GTEST_SKIP() << "Skipping Virtual A/B Compression test";
}
- // OTA client blindly unmaps all partitions that are possibly mapped.
- for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
- ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
- }
-
auto old_sys_size = GetSize(sys_);
auto old_prd_size = GetSize(prd_);
@@ -1630,11 +1629,6 @@
ASSERT_NE(nullptr, metadata);
ASSERT_TRUE(UpdatePartitionTable(*opener_, "super", *metadata.get(), 0));
- // OTA client blindly unmaps all partitions that are possibly mapped.
- for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
- ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
- }
-
// Add operations for sys. The whole device is written.
AddOperation(sys_);
@@ -2074,11 +2068,6 @@
}
TEST_F(SnapshotUpdateTest, AddPartition) {
- // OTA client blindly unmaps all partitions that are possibly mapped.
- for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
- ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
- }
-
group_->add_partition_names("dlkm");
auto dlkm = manifest_.add_partitions();
@@ -2249,6 +2238,60 @@
ASSERT_TRUE(sm->BeginUpdate());
}
+// Checks that a "Merge failed" status reported for the sys_b snapshot-merge
+// target makes ProcessUpdateState() return MergeFailed, and that a later
+// attempt without the failure-injecting device-mapper reports MergeCompleted.
+TEST_F(SnapshotUpdateTest, QueryStatusError) {
+ // Resize |sys| and add update operations for it. Only |sys| is touched in
+ // this test.
+ constexpr uint64_t partition_size = 3788_KiB;
+ SetSize(sys_, partition_size);
+
+ AddOperationForPartitions({sys_});
+
+ // Execute the update.
+ ASSERT_TRUE(sm->BeginUpdate());
+ ASSERT_TRUE(sm->CreateUpdateSnapshots(manifest_));
+ ASSERT_TRUE(WriteSnapshotAndHash("sys_b"));
+ ASSERT_TRUE(sm->FinishedSnapshotWrites(false));
+ ASSERT_TRUE(UnmapAll());
+
+ // Device-mapper wrapper that forwards GetTableStatus() and then, for a
+ // "sys_b" table whose first target is of type "snapshot-merge", replaces
+ // the reported status data with "Merge failed".
+ class DmStatusFailure final : public DeviceMapperWrapper {
+ public:
+ bool GetTableStatus(const std::string& name, std::vector<TargetInfo>* table) override {
+ if (!DeviceMapperWrapper::GetTableStatus(name, table)) {
+ return false;
+ }
+ if (name == "sys_b" && !table->empty()) {
+ auto& info = table->at(0);
+ if (DeviceMapper::GetTargetType(info.spec) == "snapshot-merge") {
+ info.data = "Merge failed";
+ }
+ }
+ return true;
+ }
+ };
+ DmStatusFailure wrapper;
+
+ // After reboot, init does first stage mount.
+ // NOTE(review): |info| is allocated with a raw `new`; presumably
+ // NewManagerForFirstStageMount() takes ownership of it - confirm.
+ auto info = new TestDeviceInfo(fake_super, "_b");
+ info->set_dm(&wrapper);
+
+ auto init = NewManagerForFirstStageMount(info);
+ ASSERT_NE(init, nullptr);
+
+ ASSERT_TRUE(init->NeedSnapshotsInFirstStageMount());
+ ASSERT_TRUE(init->CreateLogicalAndSnapshotPartitions("super", snapshot_timeout_));
+
+ // Initiate the merge. Because |wrapper| rewrites the merge status, the
+ // update state is expected to come back as MergeFailed.
+ ASSERT_TRUE(init->InitiateMerge());
+ ASSERT_EQ(UpdateState::MergeFailed, init->ProcessUpdateState());
+
+ // Simulate a reboot that tries the merge again, with the non-failing dm.
+ ASSERT_TRUE(UnmapAll());
+ init = NewManagerForFirstStageMount("_b");
+ ASSERT_NE(init, nullptr);
+ ASSERT_TRUE(init->CreateLogicalAndSnapshotPartitions("super", snapshot_timeout_));
+ ASSERT_EQ(UpdateState::MergeCompleted, init->ProcessUpdateState());
+}
+
class FlashAfterUpdateTest : public SnapshotUpdateTest,
public WithParamInterface<std::tuple<uint32_t, bool>> {
public:
@@ -2265,11 +2308,6 @@
};
TEST_P(FlashAfterUpdateTest, FlashSlotAfterUpdate) {
- // OTA client blindly unmaps all partitions that are possibly mapped.
- for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
- ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
- }
-
// Execute the update.
ASSERT_TRUE(sm->BeginUpdate());
ASSERT_TRUE(sm->CreateUpdateSnapshots(manifest_));
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 6bd5323..837f33a 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -57,10 +57,16 @@
],
srcs: [
"snapuserd_server.cpp",
- "snapuserd.cpp",
+ "dm-snapshot-merge/snapuserd.cpp",
+ "dm-snapshot-merge/snapuserd_worker.cpp",
+ "dm-snapshot-merge/snapuserd_readahead.cpp",
"snapuserd_daemon.cpp",
- "snapuserd_worker.cpp",
- "snapuserd_readahead.cpp",
+ "snapuserd_buffer.cpp",
+ "user-space-merge/snapuserd_core.cpp",
+ "user-space-merge/snapuserd_dm_user.cpp",
+ "user-space-merge/snapuserd_merge.cpp",
+ "user-space-merge/snapuserd_readahead.cpp",
+ "user-space-merge/snapuserd_transitions.cpp",
],
cflags: [
@@ -101,9 +107,10 @@
"fs_mgr_defaults",
],
srcs: [
- "cow_snapuserd_test.cpp",
- "snapuserd.cpp",
- "snapuserd_worker.cpp",
+ "dm-snapshot-merge/cow_snapuserd_test.cpp",
+ "dm-snapshot-merge/snapuserd.cpp",
+ "dm-snapshot-merge/snapuserd_worker.cpp",
+ "snapuserd_buffer.cpp",
],
cflags: [
"-Wall",
diff --git a/fs_mgr/libsnapshot/snapuserd/cow_snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/cow_snapuserd_test.cpp
similarity index 99%
rename from fs_mgr/libsnapshot/snapuserd/cow_snapuserd_test.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/cow_snapuserd_test.cpp
index bff0a50..b86a802 100644
--- a/fs_mgr/libsnapshot/snapuserd/cow_snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/cow_snapuserd_test.cpp
@@ -33,6 +33,7 @@
#include <libdm/dm.h>
#include <libdm/loop_control.h>
#include <libsnapshot/cow_writer.h>
+#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_client.h>
#include <storage_literals/storage_literals.h>
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.cpp
similarity index 100%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.cpp
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd.h b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.h
similarity index 91%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd.h
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.h
index 6388a83..47b9b22 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd.h
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.h
@@ -42,6 +42,7 @@
#include <libdm/dm.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
+#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_kernel.h>
namespace android {
@@ -89,39 +90,6 @@
READ_AHEAD_FAILURE,
};
-class BufferSink : public IByteSink {
- public:
- void Initialize(size_t size);
- void* GetBufPtr() { return buffer_.get(); }
- void Clear() { memset(GetBufPtr(), 0, buffer_size_); }
- void* GetPayloadBuffer(size_t size);
- void* GetBuffer(size_t requested, size_t* actual) override;
- void UpdateBufferOffset(size_t size) { buffer_offset_ += size; }
- struct dm_user_header* GetHeaderPtr();
- bool ReturnData(void*, size_t) override { return true; }
- void ResetBufferOffset() { buffer_offset_ = 0; }
- void* GetPayloadBufPtr();
-
- private:
- std::unique_ptr<uint8_t[]> buffer_;
- loff_t buffer_offset_;
- size_t buffer_size_;
-};
-
-class XorSink : public IByteSink {
- public:
- void Initialize(BufferSink* sink, size_t size);
- void Reset();
- void* GetBuffer(size_t requested, size_t* actual) override;
- bool ReturnData(void* buffer, size_t len) override;
-
- private:
- BufferSink* bufsink_;
- std::unique_ptr<uint8_t[]> buffer_;
- size_t buffer_size_;
- size_t returned_;
-};
-
class Snapuserd;
class ReadAheadThread {
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_readahead.cpp
similarity index 100%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_readahead.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_readahead.cpp
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_worker.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_worker.cpp
similarity index 93%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_worker.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_worker.cpp
index 5d184ad..0e9f0f1 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_worker.cpp
@@ -32,78 +32,6 @@
#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
-void BufferSink::Initialize(size_t size) {
- buffer_size_ = size;
- buffer_offset_ = 0;
- buffer_ = std::make_unique<uint8_t[]>(size);
-}
-
-void* BufferSink::GetPayloadBuffer(size_t size) {
- if ((buffer_size_ - buffer_offset_) < size) return nullptr;
-
- char* buffer = reinterpret_cast<char*>(GetBufPtr());
- struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
- return (char*)msg->payload.buf + buffer_offset_;
-}
-
-void* BufferSink::GetBuffer(size_t requested, size_t* actual) {
- void* buf = GetPayloadBuffer(requested);
- if (!buf) {
- *actual = 0;
- return nullptr;
- }
- *actual = requested;
- return buf;
-}
-
-struct dm_user_header* BufferSink::GetHeaderPtr() {
- if (!(sizeof(struct dm_user_header) <= buffer_size_)) {
- return nullptr;
- }
- char* buf = reinterpret_cast<char*>(GetBufPtr());
- struct dm_user_header* header = (struct dm_user_header*)(&(buf[0]));
- return header;
-}
-
-void* BufferSink::GetPayloadBufPtr() {
- char* buffer = reinterpret_cast<char*>(GetBufPtr());
- struct dm_user_message* msg = reinterpret_cast<struct dm_user_message*>(&(buffer[0]));
- return msg->payload.buf;
-}
-
-void XorSink::Initialize(BufferSink* sink, size_t size) {
- bufsink_ = sink;
- buffer_size_ = size;
- returned_ = 0;
- buffer_ = std::make_unique<uint8_t[]>(size);
-}
-
-void XorSink::Reset() {
- returned_ = 0;
-}
-
-void* XorSink::GetBuffer(size_t requested, size_t* actual) {
- if (requested > buffer_size_) {
- *actual = buffer_size_;
- } else {
- *actual = requested;
- }
- return buffer_.get();
-}
-
-bool XorSink::ReturnData(void* buffer, size_t len) {
- uint8_t* xor_data = reinterpret_cast<uint8_t*>(buffer);
- uint8_t* buff = reinterpret_cast<uint8_t*>(bufsink_->GetPayloadBuffer(len + returned_));
- if (buff == nullptr) {
- return false;
- }
- for (size_t i = 0; i < len; i++) {
- buff[returned_ + i] ^= xor_data[i];
- }
- returned_ += len;
- return true;
-}
-
WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
std::shared_ptr<Snapuserd> snapuserd) {
diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_buffer.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_buffer.h
new file mode 100644
index 0000000..2e4cac6
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_buffer.h
@@ -0,0 +1,62 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <linux/types.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <iostream>
+#include <memory>
+
+#include <libsnapshot/cow_reader.h>
+
+namespace android {
+namespace snapshot {
+
+// IByteSink backed by one heap buffer that is interpreted as a dm-user
+// message. A running offset tracks how much of the payload area has been
+// handed out so far.
+class BufferSink : public IByteSink {
+  public:
+    // Allocates a buffer of |size| bytes and resets the payload offset to 0.
+    void Initialize(size_t size);
+    // Start of the whole buffer, or nullptr before Initialize().
+    void* GetBufPtr() { return buffer_.get(); }
+    // Zero-fills the whole buffer. Does nothing if no buffer has been
+    // allocated yet.
+    void Clear() {
+        if (buffer_) {
+            memset(buffer_.get(), 0, buffer_size_);
+        }
+    }
+    // Pointer into the payload area at the current offset, or nullptr if
+    // |size| bytes do not fit.
+    void* GetPayloadBuffer(size_t size);
+    // IByteSink: like GetPayloadBuffer(); |*actual| is set to |requested| on
+    // success and 0 on failure.
+    void* GetBuffer(size_t requested, size_t* actual) override;
+    // Advances the payload offset by |size| bytes.
+    void UpdateBufferOffset(size_t size) { buffer_offset_ += size; }
+    // The buffer viewed as a dm_user_header, or nullptr if it is too small.
+    struct dm_user_header* GetHeaderPtr();
+    // IByteSink: data is written in place through GetBuffer(), so there is
+    // nothing left to do here.
+    bool ReturnData(void*, size_t) override { return true; }
+    // Rewinds the payload offset to the start of the payload area.
+    void ResetBufferOffset() { buffer_offset_ = 0; }
+    // Start of the payload area, regardless of the current offset.
+    void* GetPayloadBufPtr();
+
+  private:
+    std::unique_ptr<uint8_t[]> buffer_;
+    // Default-initialized so that use before Initialize() sees an empty
+    // buffer rather than indeterminate values.
+    loff_t buffer_offset_ = 0;
+    size_t buffer_size_ = 0;
+};
+
+// IByteSink that stages incoming data in a private buffer and XORs it into
+// the payload area of an associated BufferSink when the data is returned.
+class XorSink : public IByteSink {
+  public:
+    // Binds this sink to |sink| and allocates a staging buffer of |size|
+    // bytes. |sink| is not owned and must outlive this object.
+    void Initialize(BufferSink* sink, size_t size);
+    // Restarts XOR-ing from the beginning of the BufferSink payload position.
+    void Reset();
+    // IByteSink: hands out the staging buffer; |*actual| is capped at the
+    // staging buffer size.
+    void* GetBuffer(size_t requested, size_t* actual) override;
+    // IByteSink: XORs |len| bytes from |buffer| into the BufferSink payload.
+    // Returns false if the BufferSink cannot hold them.
+    bool ReturnData(void* buffer, size_t len) override;
+
+  private:
+    // Default-initialized so that the object is in a defined state before
+    // Initialize() is called.
+    BufferSink* bufsink_ = nullptr;
+    std::unique_ptr<uint8_t[]> buffer_;
+    size_t buffer_size_ = 0;
+    size_t returned_ = 0;
+};
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_buffer.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_buffer.cpp
new file mode 100644
index 0000000..ab763ab
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_buffer.cpp
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <snapuserd/snapuserd_buffer.h>
+#include <snapuserd/snapuserd_kernel.h>
+
+namespace android {
+namespace snapshot {
+
+// Allocates a fresh buffer of |size| bytes and rewinds the payload offset.
+void BufferSink::Initialize(size_t size) {
+    buffer_offset_ = 0;
+    buffer_size_ = size;
+    // make_unique value-initializes the array, so the buffer starts zeroed.
+    buffer_ = std::make_unique<uint8_t[]>(size);
+}
+
+// Returns a pointer into the dm-user message payload at the current offset,
+// or nullptr when |size| bytes do not fit in what is left of the buffer.
+//
+// NOTE(review): the space check is made against the whole buffer size and
+// does not subtract the message header that precedes the payload - confirm
+// that callers size the buffer accordingly.
+void* BufferSink::GetPayloadBuffer(size_t size) {
+    // Reject a negative or past-the-end offset first. |buffer_offset_| is
+    // signed while |buffer_size_| is unsigned, so on 64-bit targets the
+    // subtraction below would otherwise wrap around to a huge value and let
+    // the bounds check pass.
+    if (buffer_offset_ < 0 || static_cast<uint64_t>(buffer_offset_) > buffer_size_) {
+        return nullptr;
+    }
+    if ((buffer_size_ - static_cast<size_t>(buffer_offset_)) < size) return nullptr;
+
+    auto* msg = reinterpret_cast<struct dm_user_message*>(GetBufPtr());
+    return reinterpret_cast<char*>(msg->payload.buf) + buffer_offset_;
+}
+
+// IByteSink entry point: either the full |requested| size is granted or
+// nothing is (nullptr with |*actual| == 0).
+void* BufferSink::GetBuffer(size_t requested, size_t* actual) {
+    void* const payload = GetPayloadBuffer(requested);
+    *actual = (payload != nullptr) ? requested : 0;
+    return payload;
+}
+
+// Views the start of the buffer as a dm_user_header. Returns nullptr when
+// the buffer is too small to contain one.
+struct dm_user_header* BufferSink::GetHeaderPtr() {
+    if (buffer_size_ < sizeof(struct dm_user_header)) {
+        return nullptr;
+    }
+    return static_cast<struct dm_user_header*>(GetBufPtr());
+}
+
+// Returns the start of the payload area of the dm-user message held in the
+// buffer; the current payload offset is not applied.
+void* BufferSink::GetPayloadBufPtr() {
+    auto* message = static_cast<struct dm_user_message*>(GetBufPtr());
+    return message->payload.buf;
+}
+
+// Attaches this sink to |sink| and allocates a staging buffer of |size| bytes.
+void XorSink::Initialize(BufferSink* sink, size_t size) {
+    returned_ = 0;
+    buffer_size_ = size;
+    bufsink_ = sink;
+    // make_unique value-initializes the array, so the staging buffer starts zeroed.
+    buffer_ = std::make_unique<uint8_t[]>(size);
+}
+
+// Forgets how many bytes have been XOR-ed so far, so that the next
+// ReturnData() starts again at the BufferSink's current payload position.
+void XorSink::Reset() {
+    returned_ = size_t{0};
+}
+
+// Hands out the staging buffer. |*actual| is |requested| capped at the
+// staging buffer size.
+void* XorSink::GetBuffer(size_t requested, size_t* actual) {
+    *actual = (requested > buffer_size_) ? buffer_size_ : requested;
+    return buffer_.get();
+}
+
+// XORs |len| bytes from |buffer| into the BufferSink payload, continuing
+// after the bytes already consumed by earlier calls. Returns false, without
+// touching anything, if the BufferSink cannot provide the required space.
+bool XorSink::ReturnData(void* buffer, size_t len) {
+    auto* payload = static_cast<uint8_t*>(bufsink_->GetPayloadBuffer(len + returned_));
+    if (payload == nullptr) {
+        return false;
+    }
+
+    const auto* src = static_cast<const uint8_t*>(buffer);
+    uint8_t* dst = payload + returned_;
+    for (size_t idx = 0; idx < len; idx++) {
+        dst[idx] ^= src[idx];
+    }
+
+    returned_ += len;
+    return true;
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
index 2f87557..91b4190 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
@@ -31,7 +31,6 @@
#include <android-base/scopeguard.h>
#include <fs_mgr/file_wait.h>
#include <snapuserd/snapuserd_client.h>
-#include "snapuserd.h"
#include "snapuserd_server.h"
#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
index 3b6ff15..14e5de6 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
@@ -28,7 +28,7 @@
#include <vector>
#include <android-base/unique_fd.h>
-#include "snapuserd.h"
+#include "dm-snapshot-merge/snapuserd.h"
namespace android {
namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
new file mode 100644
index 0000000..a2538d2
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -0,0 +1,464 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+#include <android-base/strings.h>
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+// Stores the device names for this snapshot and derives the dm-user control
+// device path from |misc_name|.
+SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
+                                 std::string backing_device, std::string base_path_merge) {
+    // The arguments are taken by value, so they can be moved into place.
+    base_path_merge_ = std::move(base_path_merge);
+    backing_store_device_ = std::move(backing_device);
+    cow_device_ = std::move(cow_device);
+    misc_name_ = std::move(misc_name);
+
+    // Must come after misc_name_ has been assigned.
+    control_device_ = "/dev/dm-user/" + misc_name_;
+}
+
+// Creates and initializes NUM_THREADS_PER_PARTITION workers, then creates
+// the merge worker and the read-ahead object. Returns false as soon as one
+// worker fails to initialize.
+bool SnapshotHandler::InitializeWorkers() {
+    int created = 0;
+    while (created < NUM_THREADS_PER_PARTITION) {
+        auto worker = std::make_unique<Worker>(cow_device_, backing_store_device_,
+                                               control_device_, misc_name_, base_path_merge_,
+                                               GetSharedPtr());
+        if (!worker->Init()) {
+            SNAP_LOG(ERROR) << "Thread initialization failed";
+            return false;
+        }
+
+        worker_threads_.push_back(std::move(worker));
+        ++created;
+    }
+
+    // Neither of these is initialized here; they are only constructed.
+    merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
+                                             misc_name_, base_path_merge_, GetSharedPtr());
+    read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_,
+                                                     misc_name_, GetSharedPtr());
+    return true;
+}
+
+// Returns a clone of this handler's CowReader for use by a worker.
+std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
+    std::unique_ptr<CowReader> clone = reader_->CloneCowReader();
+    return clone;
+}
+
+// Records that |num_merge_ops| more operations have been merged and persists
+// the updated COW header. Returns false if syncing or writing the header
+// fails.
+bool SnapshotHandler::CommitMerge(int num_merge_ops) {
+ // Update the counter in the mapped copy of the header first.
+ struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+ ch->num_merge_ops += num_merge_ops;
+
+ if (scratch_space_) {
+ // With scratch space the header is persisted by syncing the first
+ // BLOCK_SZ bytes of the mapping.
+ if (ra_thread_) {
+ // The read-ahead state is switched to "in progress" before the sync,
+ // so both changes are covered by the same msync.
+ struct BufferState* ra_state = GetBufferState();
+ ra_state->read_ahead_state = kCowReadAheadInProgress;
+ }
+
+ int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
+ if (ret < 0) {
+ SNAP_PLOG(ERROR) << "msync header failed: " << ret;
+ return false;
+ }
+ } else {
+ // Without scratch space the reader's header copy is updated and then
+ // written explicitly to the start of the COW file, followed by fsync.
+ reader_->UpdateMergeOpsCompleted(num_merge_ops);
+ CowHeader header;
+ reader_->GetHeader(&header);
+
+ if (lseek(cow_fd_.get(), 0, SEEK_SET) < 0) {
+ SNAP_PLOG(ERROR) << "lseek failed";
+ return false;
+ }
+
+ if (!android::base::WriteFully(cow_fd_, &header, sizeof(CowHeader))) {
+ SNAP_PLOG(ERROR) << "Write to header failed";
+ return false;
+ }
+
+ if (fsync(cow_fd_.get()) < 0) {
+ SNAP_PLOG(ERROR) << "fsync failed";
+ return false;
+ }
+ }
+
+ return true;
+}
+
+// Decides from the recorded read-ahead state whether data has to be
+// re-constructed from the COW device, then signals that read-ahead may
+// proceed for the merge.
+void SnapshotHandler::PrepareReadAhead() {
+    const struct BufferState* state = GetBufferState();
+
+    // Only a completed read-ahead requires re-populating data from the COW device.
+    populate_data_from_cow_ = (state->read_ahead_state == kCowReadAheadDone);
+
+    NotifyRAForMergeReady();
+}
+
+// Logs how far the merge got: the merged-op count from the mapped header
+// against the total number of data ops, or a note that no merge was started.
+void SnapshotHandler::CheckMergeCompletionStatus() {
+    if (merge_initiated_) {
+        auto* mapped_header = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+
+        SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << mapped_header->num_merge_ops
+                       << " Total-data-ops: " << reader_->get_num_total_data_ops();
+        return;
+    }
+
+    SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
+                   << reader_->get_num_total_data_ops();
+}
+
+// Parses the COW file, maps its header/buffer region and builds the sorted
+// (sector, operation) table from the merge-op iterator.
+//
+// Returns false if the COW cannot be parsed, its header cannot be read, its
+// block size is not BLOCK_SZ, or the metadata mapping fails.
+bool SnapshotHandler::ReadMetadata() {
+    reader_ = std::make_unique<CowReader>();
+    CowHeader header;
+
+    SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
+
+    if (!reader_->Parse(cow_fd_)) {
+        SNAP_LOG(ERROR) << "Failed to parse";
+        return false;
+    }
+
+    if (!reader_->GetHeader(&header)) {
+        SNAP_LOG(ERROR) << "Failed to get header";
+        return false;
+    }
+
+    if (header.block_size != BLOCK_SZ) {
+        SNAP_LOG(ERROR) << "Invalid header block size found: " << header.block_size;
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "Merge-ops: " << header.num_merge_ops;
+
+    if (!MmapMetadata()) {
+        SNAP_LOG(ERROR) << "mmap failed";
+        return false;
+    }
+
+    // Initialize the iterator for reading metadata
+    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+
+    while (!cowop_iter->Done()) {
+        const CowOperation* cow_op = &cowop_iter->Get();
+
+        chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
+
+        // A single ordered op is enough to require the read-ahead thread.
+        if (!ra_thread_ && IsOrderedOp(*cow_op)) {
+            ra_thread_ = true;
+        }
+        cowop_iter->Next();
+    }
+
+    chunk_vec_.shrink_to_fit();
+
+    // Sort the vector based on sectors as we need this during un-aligned access
+    std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
+
+    PrepareReadAhead();
+
+    return true;
+}
+
+// Maps the COW header plus the buffer region.
+//
+// When the COW has scratch space (major version >= 2 and a non-zero buffer
+// size) the mapping is backed by the COW file itself. Otherwise an anonymous
+// mapping of the same length is created and seeded with the merged-op count
+// from the COW header.
+//
+// Returns false if mmap fails.
+bool SnapshotHandler::MmapMetadata() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    total_mapped_addr_length_ = header.header_size + BUFFER_REGION_DEFAULT_SIZE;
+
+    if (header.major_version >= 2 && header.buffer_size > 0) {
+        scratch_space_ = true;
+    }
+
+    if (scratch_space_) {
+        mapped_addr_ = mmap(nullptr, total_mapped_addr_length_, PROT_READ | PROT_WRITE,
+                            MAP_SHARED, cow_fd_.get(), 0);
+    } else {
+        mapped_addr_ = mmap(nullptr, total_mapped_addr_length_, PROT_READ | PROT_WRITE,
+                            MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+    }
+
+    // Check for failure before the mapping is touched: MAP_FAILED is not a
+    // dereferenceable address.
+    if (mapped_addr_ == MAP_FAILED) {
+        SNAP_LOG(ERROR) << "mmap metadata failed";
+        return false;
+    }
+
+    if (!scratch_space_) {
+        // The anonymous mapping starts out zeroed, so carry over the count of
+        // operations that were already merged.
+        struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+        ch->num_merge_ops = header.num_merge_ops;
+    }
+
+    return true;
+}
+
+// Releases the metadata mapping. A failure is logged and otherwise ignored.
+void SnapshotHandler::UnmapBufferRegion() {
+    if (munmap(mapped_addr_, total_mapped_addr_length_) < 0) {
+        SNAP_PLOG(ERROR) << "munmap failed";
+    }
+}
+
+// Opens the COW device, records the size of the merge base device in
+// sectors and then reads the COW metadata. Returns false if either device
+// cannot be opened, the base device size is reported as 0, or the metadata
+// cannot be read.
+bool SnapshotHandler::InitCowDevice() {
+    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
+    if (cow_fd_.get() < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+        return false;
+    }
+
+    // The base device is only opened to query its size.
+    unique_fd base_fd(TEMP_FAILURE_RETRY(open(base_path_merge_.c_str(), O_RDONLY | O_CLOEXEC)));
+    if (base_fd.get() < 0) {
+        SNAP_LOG(ERROR) << "Cannot open block device";
+        return false;
+    }
+
+    const uint64_t base_size = get_block_device_size(base_fd.get());
+    if (base_size == 0) {
+        SNAP_LOG(ERROR) << "Failed to find block device size: " << base_path_merge_;
+        return false;
+    }
+
+    num_sectors_ = base_size >> SECTOR_SHIFT;
+
+    return ReadMetadata();
+}
+
+// Reads |size| bytes of |dm_block_device| starting at |offset| in fixed-size
+// chunks and discards the data. Stops at the first failed read; failures
+// are only logged.
+void SnapshotHandler::ReadBlocksToCache(const std::string& dm_block_device,
+                                        const std::string& partition_name, off_t offset,
+                                        size_t size) {
+    android::base::unique_fd block_fd(
+            TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY)));
+    if (block_fd.get() == -1) {
+        SNAP_PLOG(ERROR) << "Error reading " << dm_block_device
+                         << " partition-name: " << partition_name;
+        return;
+    }
+
+    // We pick 4M I/O size based on the fact that the current
+    // update_verifier has a similar I/O size.
+    const size_t read_sz = 1024 * BLOCK_SZ;
+    std::vector<uint8_t> buf(read_sz);
+
+    off_t file_offset = offset;
+    for (size_t remain = size; remain > 0;) {
+        const size_t to_read = std::min(remain, read_sz);
+
+        if (!android::base::ReadFullyAtOffset(block_fd.get(), buf.data(), to_read,
+                                              file_offset)) {
+            SNAP_PLOG(ERROR) << "Failed to read block from block device: " << dm_block_device
+                             << " at offset: " << file_offset
+                             << " partition-name: " << partition_name << " total-size: " << size
+                             << " remain_size: " << remain;
+            return;
+        }
+
+        file_offset += to_read;
+        remain -= to_read;
+    }
+
+    SNAP_LOG(INFO) << "Finished reading block-device: " << dm_block_device
+                   << " partition: " << partition_name << " size: " << size
+                   << " offset: " << offset;
+}
+
+// Reads |dm_block_device| end to end using two concurrent readers, each
+// covering one contiguous half of the device. Returns once both readers
+// have finished. Failures are only logged.
+//
+// NOTE(review): the per-reader size is rounded down to whole blocks, so a
+// trailing remainder (odd block count, or a partial last block) is not read.
+void SnapshotHandler::ReadBlocks(const std::string partition_name,
+                                 const std::string& dm_block_device) {
+    SNAP_LOG(DEBUG) << "Reading partition: " << partition_name
+                    << " Block-Device: " << dm_block_device;
+
+    uint64_t dev_sz = 0;
+
+    unique_fd fd(TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY | O_CLOEXEC)));
+    if (fd < 0) {
+        SNAP_LOG(ERROR) << "Cannot open block device";
+        return;
+    }
+
+    dev_sz = get_block_device_size(fd.get());
+    if (!dev_sz) {
+        SNAP_PLOG(ERROR) << "Could not determine block device size: " << dm_block_device;
+        return;
+    }
+
+    int num_threads = 2;
+    size_t num_blocks = dev_sz >> BLOCK_SHIFT;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
+    off_t offset = 0;
+
+    // The futures have to be kept alive until every reader is launched: a
+    // future returned by std::async blocks in its destructor, so discarding
+    // it inside the loop would run the readers one after the other.
+    std::vector<std::future<void>> readers;
+    readers.reserve(num_threads);
+
+    for (int i = 0; i < num_threads; i++) {
+        readers.emplace_back(std::async(std::launch::async,
+                                        &SnapshotHandler::ReadBlocksToCache, this,
+                                        dm_block_device, partition_name, offset,
+                                        read_sz_per_thread));
+
+        offset += read_sz_per_thread;
+    }
+
+    for (auto& reader : readers) {
+        reader.wait();
+    }
+}
+
+/*
+ * Entry point to launch threads.
+ *
+ * Starts the read-ahead thread (when enabled), one thread per worker and the
+ * merge thread, optionally reads the partition's dm block device into cache,
+ * and then blocks until all of them have finished.
+ *
+ * Returns true only if every worker thread returned true. The merge and
+ * read-ahead results are logged but do not affect the return value.
+ */
+bool SnapshotHandler::Start() {
+    std::vector<std::future<bool>> threads;
+    std::future<bool> ra_thread_status;
+
+    if (ra_thread_) {
+        ra_thread_status =
+                std::async(std::launch::async, &ReadAhead::RunThread, read_ahead_thread_.get());
+
+        SNAP_LOG(INFO) << "Read-ahead thread started...";
+    }
+
+    // Launch worker threads
+    threads.reserve(worker_threads_.size());
+    for (const auto& worker : worker_threads_) {
+        threads.emplace_back(std::async(std::launch::async, &Worker::RunThread, worker.get()));
+    }
+
+    bool second_stage_init = true;
+
+    // We don't want to read the blocks during first stage init.
+    if (android::base::EndsWith(misc_name_, "-init") || is_socket_present_) {
+        second_stage_init = false;
+    }
+
+    if (second_stage_init) {
+        SNAP_LOG(INFO) << "Reading blocks to cache....";
+        auto& dm = DeviceMapper::Instance();
+        auto dm_block_devices = dm.FindDmPartitions();
+        if (dm_block_devices.empty()) {
+            SNAP_LOG(ERROR) << "No dm-enabled block device is found.";
+        } else {
+            auto parts = android::base::Split(misc_name_, "-");
+            std::string partition_name = parts[0];
+
+            // Strip the slot suffix. The suffix has to be matched as a whole
+            // string: find_last_not_of() treats its argument as a set of
+            // characters, which would also eat trailing 'a', 'b' and '_'
+            // characters that belong to the name (e.g. "data_a" -> "dat").
+            auto strip_suffix = [&partition_name](const std::string& suffix) {
+                if (android::base::EndsWith(partition_name, suffix)) {
+                    partition_name.erase(partition_name.size() - suffix.size());
+                }
+            };
+            strip_suffix("_b");
+            strip_suffix("_a");
+
+            const auto device = dm_block_devices.find(partition_name);
+            if (device == dm_block_devices.end()) {
+                SNAP_LOG(ERROR) << "Failed to find dm block device for " << partition_name;
+            } else {
+                ReadBlocks(partition_name, device->second);
+            }
+        }
+    } else {
+        SNAP_LOG(INFO) << "Not reading block device into cache";
+    }
+
+    std::future<bool> merge_thread =
+            std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
+
+    bool ret = true;
+    for (auto& t : threads) {
+        ret = t.get() && ret;
+    }
+
+    // Worker threads are terminated by this point - this can only happen:
+    //
+    // 1: If dm-user device is destroyed
+    // 2: We had an I/O failure when reading root partitions
+    //
+    // In case (1), this would be a graceful shutdown. In this case, merge
+    // thread and RA thread should have already terminated by this point. We will be
+    // destroying the dm-user device only _after_ merge is completed.
+    //
+    // In case (2), if merge thread had started, then it will be
+    // continuing to merge; however, since we had an I/O failure and the
+    // I/O on root partitions are no longer served, we will terminate the
+    // merge
+
+    NotifyIOTerminated();
+
+    bool read_ahead_retval = false;
+
+    SNAP_LOG(INFO) << "Snapshot I/O terminated. Waiting for merge thread....";
+    bool merge_thread_status = merge_thread.get();
+
+    if (ra_thread_) {
+        read_ahead_retval = ra_thread_status.get();
+    }
+
+    SNAP_LOG(INFO) << "Worker threads terminated with ret: " << ret
+                   << " Merge-thread with ret: " << merge_thread_status
+                   << " RA-thread with ret: " << read_ahead_retval;
+    return ret;
+}
+
+// Returns the COW header size plus sizeof(BufferState).
+uint64_t SnapshotHandler::GetBufferMetadataOffset() {
+    CowHeader cow_header;
+    reader_->GetHeader(&cow_header);
+
+    const uint64_t metadata_offset = cow_header.header_size + sizeof(BufferState);
+    return metadata_offset;
+}
+
+/*
+ * Size in bytes of the metadata part of the read-ahead buffer region.
+ *
+ * Metadata for read-ahead is 16 bytes. For a 2 MB region this comes to
+ * 8k (2 pages) of metadata, so a 2MB buffer region is laid out as:
+ *
+ * 1: 8k metadata
+ * 2: Scratch space
+ *
+ */
+size_t SnapshotHandler::GetBufferMetadataSize() {
+    CowHeader cow_header;
+    reader_->GetHeader(&cow_header);
+
+    // A zero buffer size in the header means there is no scratch space;
+    // the default (anonymous memory) region size is used instead.
+    const size_t header_buffer_size = cow_header.buffer_size;
+    const size_t region_size =
+            header_buffer_size ? header_buffer_size : BUFFER_REGION_DEFAULT_SIZE;
+
+    const size_t metadata_size = (region_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ;
+    return metadata_size;
+}
+
+// Returns the COW header size plus the size of the read-ahead metadata.
+size_t SnapshotHandler::GetBufferDataOffset() {
+    CowHeader cow_header;
+    reader_->GetHeader(&cow_header);
+
+    const size_t metadata_size = GetBufferMetadataSize();
+    return (cow_header.header_size + metadata_size);
+}
+
+/*
+ * Size in bytes of the data part of the buffer region, i.e. the region size
+ * minus the metadata: (2MB - 8K = 2088960 bytes) for the default layout.
+ */
+size_t SnapshotHandler::GetBufferDataSize() {
+    CowHeader cow_header;
+    reader_->GetHeader(&cow_header);
+
+    // A zero buffer size in the header means there is no scratch space;
+    // the default (anonymous memory) region size is used instead.
+    const size_t header_buffer_size = cow_header.buffer_size;
+    const size_t region_size =
+            header_buffer_size ? header_buffer_size : BUFFER_REGION_DEFAULT_SIZE;
+
+    return (region_size - GetBufferMetadataSize());
+}
+
+// Returns a pointer to the BufferState located header_size bytes into the
+// mapped region (mapped_addr_).
+struct BufferState* SnapshotHandler::GetBufferState() {
+    CowHeader cow_header;
+    reader_->GetHeader(&cow_header);
+
+    char* const state_addr = static_cast<char*>(mapped_addr_) + cow_header.header_size;
+    return reinterpret_cast<struct BufferState*>(state_addr);
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
new file mode 100644
index 0000000..c171eda
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -0,0 +1,296 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <linux/types.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+
+#include <condition_variable>
+#include <cstring>
+#include <future>
+#include <iostream>
+#include <limits>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+#include <android-base/stringprintf.h>
+#include <android-base/unique_fd.h>
+#include <ext4_utils/ext4_utils.h>
+#include <libdm/dm.h>
+#include <libsnapshot/cow_reader.h>
+#include <libsnapshot/cow_writer.h>
+#include <snapuserd/snapuserd_buffer.h>
+#include <snapuserd/snapuserd_kernel.h>
+
+namespace android {
+namespace snapshot {
+
+using android::base::unique_fd;
+using namespace std::chrono_literals;
+
+// Size of the payload part of the buffer exchanged with dm-user; bigger I/O
+// requests are split into chunks of this size (see Worker::InitializeBufsink).
+static constexpr size_t PAYLOAD_SIZE = (1UL << 20);
+// The payload must be able to hold at least one block.
+static_assert(PAYLOAD_SIZE >= BLOCK_SZ);
+
+// NOTE(review): presumably the number of worker threads created per
+// partition; no use is visible in this part of the change - confirm.
+static constexpr int NUM_THREADS_PER_PARTITION = 1;
+
+// Logging helpers that prefix every message with misc_name_. They can only be
+// used where a misc_name_ is in scope. SNAP_PLOG is the PLOG-based variant.
+#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
+#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
+
+// States stored in SnapshotHandler::io_state_.
+// NOTE(review): the transitions themselves are implemented outside this part
+// of the change; the meanings below are read off the names and the
+// SnapshotHandler "State transitions for merge" methods - confirm.
+enum class MERGE_IO_TRANSITION {
+ MERGE_READY,
+ MERGE_BEGIN,
+ MERGE_FAILED,
+ MERGE_COMPLETE,
+ // Presumably set by NotifyIOTerminated() once the worker threads exit.
+ IO_TERMINATED,
+ READ_AHEAD_FAILURE,
+};
+
+class SnapshotHandler;
+
+// State and entry point of the read-ahead thread. RunThread() is what
+// SnapshotHandler::Start() launches asynchronously.
+class ReadAhead {
+  public:
+    ReadAhead(const std::string& cow_device, const std::string& backing_device,
+              const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd);
+    // Thread body; the bool result is collected by SnapshotHandler::Start().
+    bool RunThread();
+
+  private:
+    // Iteration over the COW operations handled by read-ahead.
+    void InitializeRAIter();
+    bool RAIterDone();
+    void RAIterNext();
+    const CowOperation* GetRAOpIter();
+
+    void InitializeBuffer();
+    bool InitReader();
+    bool InitializeFds();
+
+    // Only the backing-store descriptor is released here.
+    void CloseFds() { backing_store_fd_ = {}; }
+
+    bool ReadAheadIOStart();
+    int PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
+                             std::vector<uint64_t>& blocks,
+                             std::vector<const CowOperation*>& xor_op_vec);
+    bool ReconstructDataFromCow();
+    // Sets overlap_ when the operation's blocks collide with blocks recorded
+    // from earlier operations.
+    void CheckOverlap(const CowOperation* cow_op);
+
+    // The constructor does not set the raw pointers or the flag below, so
+    // they get defaults here to avoid reading indeterminate values.
+    void* read_ahead_buffer_ = nullptr;
+    void* metadata_buffer_ = nullptr;
+
+    std::unique_ptr<ICowOpIter> cowop_iter_;
+
+    std::string cow_device_;
+    std::string backing_store_device_;
+    std::string misc_name_;
+
+    unique_fd cow_fd_;
+    unique_fd backing_store_fd_;
+
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+    std::unique_ptr<CowReader> reader_;
+
+    // Block numbers recorded by CheckOverlap().
+    std::unordered_set<uint64_t> dest_blocks_;
+    std::unordered_set<uint64_t> source_blocks_;
+    bool overlap_ = false;
+    BufferSink bufsink_;
+};
+
+// Per-thread context used both for serving snapshot I/O (RunThread) and for
+// merging (RunMergeThread). Each instance has its own CowReader clone, sinks
+// and file descriptors; shared state is reached through snapuserd_.
+class Worker {
+ public:
+ Worker(const std::string& cow_device, const std::string& backing_device,
+ const std::string& control_device, const std::string& misc_name,
+ const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+ // Thread body for serving I/O requests.
+ bool RunThread();
+ // Thread body for the merge; waits for the merge to begin, then runs it.
+ bool RunMergeThread();
+ // Sets up the sinks, opens all devices and initializes the reader.
+ bool Init();
+
+ private:
+ // Initialization
+ void InitializeBufsink();
+ bool InitializeFds();
+ bool InitReader();
+ // Releases the control, backing-store and merge-base descriptors. cow_fd_
+ // is not touched here; callers use reader_->CloseCowFd() for that.
+ void CloseFds() {
+ ctrl_fd_ = {};
+ backing_store_fd_ = {};
+ base_path_merge_fd_ = {};
+ }
+
+ // IO Path
+ bool ProcessIORequest();
+
+ // Processing COW operations
+ bool ProcessReplaceOp(const CowOperation* cow_op);
+ bool ProcessZeroOp();
+
+ // Handles Copy and Xor
+ bool ProcessCopyOp(const CowOperation* cow_op);
+ bool ProcessXorOp(const CowOperation* cow_op);
+
+ // Merge related ops
+ bool Merge();
+ bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+ bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+ // Collects a run of operations with consecutive destination blocks and
+ // returns its length. replace_zero_vec == nullptr selects ordered ops only.
+ int PrepareMerge(uint64_t* source_offset, int* pending_ops,
+ const std::unique_ptr<ICowOpIter>& cowop_iter,
+ std::vector<const CowOperation*>* replace_zero_vec = nullptr);
+
+ std::unique_ptr<CowReader> reader_;
+ BufferSink bufsink_;
+ XorSink xorsink_;
+
+ std::string cow_device_;
+ std::string backing_store_device_;
+ std::string control_device_;
+ std::string misc_name_;
+ std::string base_path_merge_;
+
+ unique_fd cow_fd_;
+ unique_fd backing_store_fd_;
+ // Device the merge writes to.
+ unique_fd base_path_merge_fd_;
+ unique_fd ctrl_fd_;
+
+ std::shared_ptr<SnapshotHandler> snapuserd_;
+};
+
+// Owns the state of one snapshot: the COW reader, the mapped buffer region,
+// the worker / read-ahead / merge thread objects and the merge state shared
+// between them. Start() runs all threads and blocks until they finish.
+class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
+  public:
+    SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
+                    std::string base_path_merge);
+    bool InitCowDevice();
+    bool Start();
+
+    const std::string& GetControlDevicePath() { return control_device_; }
+    const std::string& GetMiscName() { return misc_name_; }
+    const uint64_t& GetNumSectors() { return num_sectors_; }
+    const bool& IsAttached() const { return attached_; }
+    void AttachControlDevice() { attached_ = true; }
+
+    void CheckMergeCompletionStatus();
+    bool CommitMerge(int num_merge_ops);
+
+    void CloseFds() { cow_fd_ = {}; }
+    // Destroys the worker, read-ahead and merge thread objects.
+    void FreeResources() {
+        worker_threads_.clear();
+        read_ahead_thread_ = nullptr;
+        merge_thread_ = nullptr;
+    }
+
+    bool InitializeWorkers();
+    std::unique_ptr<CowReader> CloneReaderForWorker();
+    std::shared_ptr<SnapshotHandler> GetSharedPtr() { return shared_from_this(); }
+
+    std::vector<std::pair<sector_t, const CowOperation*>>& GetChunkVec() { return chunk_vec_; }
+
+    // Orders chunk_vec_ entries by sector.
+    static bool compare(std::pair<sector_t, const CowOperation*> p1,
+                        std::pair<sector_t, const CowOperation*> p2) {
+        return p1.first < p2.first;
+    }
+
+    void UnmapBufferRegion();
+    bool MmapMetadata();
+
+    // Read-ahead related functions
+    void* GetMappedAddr() { return mapped_addr_; }
+    void PrepareReadAhead();
+
+    // State transitions for merge
+    void InitiateMerge();
+    void WaitForMergeComplete();
+    bool WaitForMergeBegin();
+    void NotifyRAForMergeReady();
+    bool WaitForMergeReady();
+    void MergeFailed();
+    bool IsIOTerminated();
+    void MergeCompleted();
+    void NotifyIOTerminated();
+    bool ReadAheadIOCompleted(bool sync);
+    void ReadAheadIOFailed();
+
+    bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
+    void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
+
+    // RA related functions
+    uint64_t GetBufferMetadataOffset();
+    size_t GetBufferMetadataSize();
+    size_t GetBufferDataOffset();
+    size_t GetBufferDataSize();
+
+    // Total number of blocks to be merged in a given read-ahead buffer region
+    void SetMergedBlockCountForNextCommit(int x) { total_ra_blocks_merged_ = x; }
+    int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
+    void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
+    bool MergeInitiated() { return merge_initiated_; }
+
+  private:
+    bool ReadMetadata();
+    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
+    struct BufferState* GetBufferState();
+
+    void ReadBlocks(const std::string partition_name, const std::string& dm_block_device);
+    void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
+                           off_t offset, size_t size);
+
+    // COW device
+    std::string cow_device_;
+    // Source device
+    std::string backing_store_device_;
+    // dm-user control device
+    std::string control_device_;
+    std::string misc_name_;
+    // Base device for merging
+    std::string base_path_merge_;
+
+    unique_fd cow_fd_;
+
+    // Number of sectors required when initializing dm-user
+    uint64_t num_sectors_ = 0;
+
+    std::unique_ptr<CowReader> reader_;
+
+    // chunk_vec stores the pseudo mapping of sector
+    // to COW operations.
+    std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;
+
+    std::mutex lock_;
+    std::condition_variable cv;
+
+    // Mapped buffer region; defaulted so the accessors never hand out an
+    // indeterminate pointer or length before the mapping is set up.
+    void* mapped_addr_ = nullptr;
+    size_t total_mapped_addr_length_ = 0;
+
+    std::vector<std::unique_ptr<Worker>> worker_threads_;
+    // Read-ahead related
+    bool populate_data_from_cow_ = false;
+    bool ra_thread_ = false;
+    int total_ra_blocks_merged_ = 0;
+    MERGE_IO_TRANSITION io_state_;
+    std::unique_ptr<ReadAhead> read_ahead_thread_;
+
+    std::unique_ptr<Worker> merge_thread_;
+
+    bool merge_initiated_ = false;
+    bool attached_ = false;
+    // Read by Start(); defaulted like the neighbouring flags so the read is
+    // well defined even if SetSocketPresent() was never called.
+    bool is_socket_present_ = false;
+    bool scratch_space_ = false;
+};
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
new file mode 100644
index 0000000..18c7f2c
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
@@ -0,0 +1,152 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+// Stores copies of the device paths, the misc name and the handler pointer.
+// No device is opened here; that happens in Init().
+Worker::Worker(const std::string& cow_device, const std::string& backing_device,
+               const std::string& control_device, const std::string& misc_name,
+               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd)
+    : cow_device_(cow_device),
+      backing_store_device_(backing_device),
+      control_device_(control_device),
+      misc_name_(misc_name),
+      base_path_merge_(base_path_merge),
+      snapuserd_(snapuserd) {}
+
+// Opens the backing store (read-only) and the COW, dm-user control and merge
+// base devices (read-write). Returns false, after logging errno, on the first
+// open that fails.
+//
+// All descriptors are opened with O_CLOEXEC, like the other opens in this
+// change, so they are not inherited across exec().
+bool Worker::InitializeFds() {
+    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY | O_CLOEXEC));
+    if (backing_store_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
+        return false;
+    }
+
+    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR | O_CLOEXEC));
+    if (cow_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+        return false;
+    }
+
+    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR | O_CLOEXEC));
+    if (ctrl_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
+        return false;
+    }
+
+    // Base device used by merge thread
+    base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR | O_CLOEXEC));
+    if (base_path_merge_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
+        return false;
+    }
+
+    return true;
+}
+
+// Gets a reader clone from the handler and initializes it for merge, handing
+// ownership of cow_fd_ to the reader.
+bool Worker::InitReader() {
+    reader_ = snapuserd_->CloneReaderForWorker();
+
+    const bool initialized = reader_->InitForMerge(std::move(cow_fd_));
+    return initialized;
+}
+
+// Handles a replace operation: the data is read from the internal COW
+// format into bufsink_, being de-compressed if the block is compressed.
+// Returns false (after logging the block) if the read fails.
+bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
+    if (reader_->ReadData(*cow_op, &bufsink_)) {
+        return true;
+    }
+
+    SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
+    return false;
+}
+
+// Handles a zero operation by filling one block of the payload buffer with
+// zeroes. Returns false if no payload buffer is available.
+bool Worker::ProcessZeroOp() {
+    void* const payload = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+    if (payload != nullptr) {
+        // Zero out the entire block
+        memset(payload, 0, BLOCK_SZ);
+        return true;
+    }
+
+    SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
+    return false;
+}
+
+// Placeholder: ignores the operation and reports success.
+bool Worker::ProcessCopyOp(const CowOperation*) {
+ return true;
+}
+
+// Placeholder: ignores the operation and reports success.
+bool Worker::ProcessXorOp(const CowOperation*) {
+ return true;
+}
+
+// Sizes bufsink_ for one dm-user header followed by a fixed payload.
+void Worker::InitializeBufsink() {
+    // This buffer is the channel between the daemon and dm-user. It holds a
+    // header plus a payload of fixed size; when dm-user asks for a big IO,
+    // the IO is broken into chunks of PAYLOAD_SIZE.
+    bufsink_.Initialize(sizeof(struct dm_user_header) + PAYLOAD_SIZE);
+}
+
+// Prepares the worker: sets up both sinks, then opens the devices and
+// initializes the reader. Returns false as soon as one of the last two fails.
+bool Worker::Init() {
+    InitializeBufsink();
+    xorsink_.Initialize(&bufsink_, BLOCK_SZ);
+
+    // InitReader() is only attempted when the descriptors were opened.
+    return InitializeFds() && InitReader();
+}
+
+// Worker thread body: keeps serving I/O requests until ProcessIORequest()
+// returns false, then releases the descriptors. Always returns true.
+bool Worker::RunThread() {
+    SNAP_LOG(DEBUG) << "Processing snapshot I/O requests...";
+
+    // Start serving IO
+    while (ProcessIORequest()) {
+    }
+
+    CloseFds();
+    reader_->CloseCowFd();
+
+    return true;
+}
+
+// Placeholder that does no I/O and always returns true.
+// NOTE(review): RunThread() loops for as long as this returns true, so with
+// this stub the worker loop never exits - confirm this is intended for now.
+bool Worker::ProcessIORequest() {
+ // No communication with dm-user yet
+ return true;
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
new file mode 100644
index 0000000..696ede7
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -0,0 +1,300 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+// Consumes from cowop_iter a run of operations whose destination blocks are
+// consecutive, using at most *pending_ops operations.
+//
+// source_offset:    out; byte offset (new_block * BLOCK_SZ) of the first
+//                   operation in the run. Only written when the run is not empty.
+// pending_ops:      in; maximum number of operations to consume. It is read
+//                   but never written back.
+// cowop_iter:       iterator to consume from; left at the first operation
+//                   that is not part of the run.
+// replace_zero_vec: when nullptr, only ordered ops are accepted and the run
+//                   stops at the first op that is not ordered; otherwise every
+//                   consumed operation is appended to this vector.
+//
+// Returns the number of operations in the run (0 if nothing was consumed).
+int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
+                         const std::unique_ptr<ICowOpIter>& cowop_iter,
+                         std::vector<const CowOperation*>* replace_zero_vec) {
+    const bool ordered_only = (replace_zero_vec == nullptr);
+    int budget = *pending_ops;
+
+    if (cowop_iter->Done() || !budget) {
+        return 0;
+    }
+
+    const CowOperation* first_op = &cowop_iter->Get();
+    if (ordered_only && !IsOrderedOp(*first_op)) {
+        return 0;
+    }
+
+    // The first operation fixes where the run starts.
+    *source_offset = first_op->new_block * BLOCK_SZ;
+    if (!ordered_only) {
+        replace_zero_vec->push_back(first_op);
+    }
+
+    cowop_iter->Next();
+    budget -= 1;
+    int run_length = 1;
+
+    for (; !cowop_iter->Done() && budget; cowop_iter->Next()) {
+        const CowOperation* next_op = &cowop_iter->Get();
+        if (ordered_only && !IsOrderedOp(*next_op)) {
+            break;
+        }
+
+        // Check for consecutive blocks
+        uint64_t next_offset = next_op->new_block * BLOCK_SZ;
+        if (next_offset != (*source_offset + run_length * BLOCK_SZ)) {
+            break;
+        }
+
+        if (!ordered_only) {
+            replace_zero_vec->push_back(next_op);
+        }
+
+        run_length += 1;
+        budget -= 1;
+    }
+
+    return run_length;
+}
+
+// Merges the replace and zero operations remaining in cowop_iter into the
+// base device. Runs of consecutive blocks are assembled in bufsink_ and
+// written with a single pwrite(); progress is flushed and committed
+// periodically and once more at the end.
+//
+// Returns false on any processing, write, fsync or commit failure, or when
+// the worker threads have terminated.
+bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+    // Flush every 2048 ops. Since all ops are independent and there is no
+    // dependency between COW ops, we will flush the data and the number
+    // of ops merged in COW file for every 2048 ops. If there is a crash,
+    // we will end up replaying some of the COW ops which were already merged.
+    // That is ok.
+    //
+    // Why 2048 ops ? We can probably increase this to bigger value but just
+    // need to ensure that merge makes forward progress if there are
+    // crashes repeatedly which is highly unlikely.
+    const int total_ops_merged_per_commit = (PAYLOAD_SIZE / BLOCK_SZ) * 8;
+    int num_ops_merged = 0;
+
+    // Flushes the merged data and then tracks the merge completion of `ops`
+    // operations through the handler.
+    auto flush_and_commit = [this](int ops) -> bool {
+        if (fsync(base_path_merge_fd_.get()) < 0) {
+            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
+            return false;
+        }
+
+        if (!snapuserd_->CommitMerge(ops)) {
+            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+            return false;
+        }
+
+        return true;
+    };
+
+    while (!cowop_iter->Done()) {
+        int num_ops = PAYLOAD_SIZE / BLOCK_SZ;
+        std::vector<const CowOperation*> replace_zero_vec;
+        uint64_t source_offset = 0;
+
+        int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec);
+        if (linear_blocks == 0) {
+            // Merge complete
+            CHECK(cowop_iter->Done());
+            break;
+        }
+
+        // Build the run in the payload buffer, one block per operation.
+        for (const CowOperation* cow_op : replace_zero_vec) {
+            if (cow_op->type == kCowReplaceOp) {
+                if (!ProcessReplaceOp(cow_op)) {
+                    SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
+                    return false;
+                }
+            } else {
+                CHECK(cow_op->type == kCowZeroOp);
+                if (!ProcessZeroOp()) {
+                    SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
+                    return false;
+                }
+            }
+
+            bufsink_.UpdateBufferOffset(BLOCK_SZ);
+        }
+
+        size_t io_size = linear_blocks * BLOCK_SZ;
+
+        // Merge - Write the contents back to base device. pwrite() returns
+        // ssize_t; keep that type so the value is neither narrowed nor
+        // compared across signedness. A short write counts as a failure.
+        const ssize_t ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
+                                   io_size, source_offset);
+        if (ret < 0 || static_cast<size_t>(ret) != io_size) {
+            SNAP_LOG(ERROR)
+                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
+                    << " at offset: " << source_offset << " io_size: " << io_size;
+            return false;
+        }
+
+        num_ops_merged += linear_blocks;
+
+        // The counter advances by the length of each run, so it can step over
+        // the threshold without ever being equal to it; hence >= and not ==.
+        if (num_ops_merged >= total_ops_merged_per_commit) {
+            if (!flush_and_commit(num_ops_merged)) {
+                return false;
+            }
+
+            num_ops_merged = 0;
+        }
+
+        bufsink_.ResetBufferOffset();
+
+        if (snapuserd_->IsIOTerminated()) {
+            SNAP_LOG(ERROR)
+                    << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
+            return false;
+        }
+    }
+
+    // Any left over ops not flushed yet.
+    if (num_ops_merged && !flush_and_commit(num_ops_merged)) {
+        return false;
+    }
+
+    return true;
+}
+
+// Merges the ordered operations at the front of cowop_iter. The data for each
+// merge window is taken from the read-ahead buffer in the mapped region, so
+// this only writes it to the base device, flushes, commits the window and
+// signals the read-ahead thread. Stops at the first op that is not ordered.
+//
+// Returns false if waiting for the merge window, the write, the fsync or the
+// commit fails.
+bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+    void* mapped_addr = snapuserd_->GetMappedAddr();
+    char* const read_ahead_buffer =
+            static_cast<char*>(mapped_addr) + snapuserd_->GetBufferDataOffset();
+
+    SNAP_LOG(INFO) << "MergeOrderedOps started....";
+
+    while (!cowop_iter->Done()) {
+        const CowOperation* cow_op = &cowop_iter->Get();
+        if (!IsOrderedOp(*cow_op)) {
+            break;
+        }
+
+        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
+        // Wait for RA thread to notify that the merge window
+        // is ready for merging.
+        if (!snapuserd_->WaitForMergeBegin()) {
+            return false;
+        }
+
+        loff_t offset = 0;
+        int num_ops = snapuserd_->GetTotalBlocksToMerge();
+        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
+        while (num_ops) {
+            uint64_t source_offset = 0;
+
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            if (linear_blocks == 0) {
+                break;
+            }
+
+            size_t io_size = (linear_blocks * BLOCK_SZ);
+            // Write to the base device. Data is already in the RA buffer. Note
+            // that XOR ops is already handled by the RA thread. We just write
+            // the contents out.
+            //
+            // pwrite() returns ssize_t; keep that type so the value is neither
+            // narrowed nor compared across signedness. A short write counts
+            // as a failure.
+            const ssize_t ret = pwrite(base_path_merge_fd_.get(), read_ahead_buffer + offset,
+                                       io_size, source_offset);
+            if (ret < 0 || static_cast<size_t>(ret) != io_size) {
+                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
+                                << " at offset: " << source_offset << " io_size: " << io_size;
+                return false;
+            }
+
+            offset += io_size;
+            // PrepareMerge() does not update num_ops, so account for the run here.
+            num_ops -= linear_blocks;
+        }
+
+        // Verify all ops are merged
+        CHECK(num_ops == 0);
+
+        // Flush the data
+        if (fsync(base_path_merge_fd_.get()) < 0) {
+            SNAP_LOG(ERROR) << " Failed to fsync merged data";
+            return false;
+        }
+
+        // Merge is done and data is on disk. Update the COW Header about
+        // the merge completion
+        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
+            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+            return false;
+        }
+
+        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+
+        // Notify RA thread that the merge thread is ready to merge the next
+        // window
+        snapuserd_->NotifyRAForMergeReady();
+    }
+
+    return true;
+}
+
+// Runs the complete merge over a single iterator: the ordered ops first, then
+// the replace and zero ops. The handler is told about the outcome through
+// MergeCompleted() or MergeFailed().
+bool Worker::Merge() {
+    std::unique_ptr<ICowOpIter> merge_iter = reader_->GetMergeOpIter();
+
+    // Start with Copy and Xor ops
+    if (MergeOrderedOps(merge_iter)) {
+        SNAP_LOG(INFO) << "MergeOrderedOps completed...";
+
+        // Replace and Zero ops
+        if (MergeReplaceZeroOps(merge_iter)) {
+            snapuserd_->MergeCompleted();
+            return true;
+        }
+
+        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
+    } else {
+        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+    }
+
+    snapuserd_->MergeFailed();
+    return false;
+}
+
+// Merge thread body. Waits until the merge may begin, initializes the worker
+// and runs the merge. Returns true if the wait ends without a merge or the
+// merge succeeds; false if initialization or the merge fails.
+bool Worker::RunMergeThread() {
+    SNAP_LOG(DEBUG) << "Waiting for merge begin...";
+
+    if (snapuserd_->WaitForMergeBegin()) {
+        SNAP_LOG(INFO) << "Merge starting..";
+
+        if (!Init()) {
+            SNAP_LOG(ERROR) << "Merge thread initialization failed...";
+            return false;
+        }
+
+        if (Merge()) {
+            // Descriptors are released here only after a successful merge.
+            CloseFds();
+            reader_->CloseCowFd();
+
+            SNAP_LOG(INFO) << "Merge finish";
+            return true;
+        }
+
+        return false;
+    }
+
+    SNAP_LOG(ERROR) << "Merge terminated early...";
+    return true;
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
new file mode 100644
index 0000000..319755b
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -0,0 +1,424 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
+                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd) {
+    cow_device_ = cow_device;                // opened O_RDWR in InitializeFds()
+    backing_store_device_ = backing_device;  // opened O_RDONLY in InitializeFds()
+    misc_name_ = misc_name;
+    snapuserd_ = std::move(snapuserd);  // by-value sink: move it instead of copying the shared_ptr again
+}
+
+void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
+    // For XOR ops `source` is divided by BLOCK_SZ to get the first block read; a
+    // non-zero remainder means the following block is taken into account as well.
+    const bool is_xor = (cow_op->type == kCowXorOp);
+    const uint64_t first_block = is_xor ? cow_op->source / BLOCK_SZ : cow_op->source;
+    const bool spills_over = is_xor && (cow_op->source % BLOCK_SZ) > 0;
+
+    // Latch overlap_ if this op collides with blocks recorded by earlier ops.
+    const bool hits_dest = dest_blocks_.count(cow_op->new_block) != 0;
+    const bool hits_source = source_blocks_.count(first_block) != 0 ||
+                             (spills_over && source_blocks_.count(first_block + 1) != 0);
+    if (hits_dest || hits_source) overlap_ = true;
+
+    // Record this op's blocks for the checks made on later ops.
+    source_blocks_.insert(cow_op->new_block);
+    dest_blocks_.insert(first_block);
+    if (spills_over) dest_blocks_.insert(first_block + 1);
+}
+
+int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
+                                    std::vector<uint64_t>& blocks,
+                                    std::vector<const CowOperation*>& xor_op_vec) {
+    // Gathers the next run of ordered ops whose sources are contiguous so the caller
+    // can fetch them in one read. Returns the run length in blocks (0 if none) and
+    // sets *source_offset to the run's starting byte offset. *pending_ops is only read.
+    int num_ops = *pending_ops;
+    int nr_consecutive = 0;
+
+    if (RAIterDone() || !num_ops) {
+        return nr_consecutive;
+    }
+
+    // Get the first block with offset
+    const CowOperation* cow_op = GetRAOpIter();
+    *source_offset = cow_op->source;
+    // Copy-op sources are scaled by BLOCK_SZ to get a byte offset; XOR-op sources are used as-is.
+    if (cow_op->type == kCowCopyOp) {
+        *source_offset *= BLOCK_SZ;
+    } else if (cow_op->type == kCowXorOp) {
+        xor_op_vec.push_back(cow_op);
+    }
+
+    RAIterNext();
+    num_ops -= 1;
+    nr_consecutive = 1;
+    blocks.push_back(cow_op->new_block);
+
+    if (!overlap_) {
+        CheckOverlap(cow_op);
+    }
+
+    // Extend the run while the following ops read from consecutive byte offsets.
+    while (!RAIterDone() && num_ops) {
+        const CowOperation* op = GetRAOpIter();
+        uint64_t next_offset = op->source;
+        // Scale by the type of *this* op, not of the op that started the run: the
+        // candidate may be a copy op following an XOR op, or the other way round.
+        if (op->type == kCowCopyOp) {
+            next_offset *= BLOCK_SZ;
+        }
+
+        // Check for consecutive blocks
+        if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
+            break;
+        }
+
+        if (op->type == kCowXorOp) {
+            xor_op_vec.push_back(op);
+        }
+
+        nr_consecutive += 1;
+        num_ops -= 1;
+        blocks.push_back(op->new_block);
+        RAIterNext();
+
+        if (!overlap_) {
+            CheckOverlap(op);
+        }
+    }
+
+    return nr_consecutive;
+}
+
+bool ReadAhead::ReconstructDataFromCow() {  // Rebuilds the new_block -> buffer mapping from the scratch space
+ std::unordered_map<uint64_t, void*> read_ahead_buffer_map;
+ loff_t metadata_offset = 0;
+ loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
+ int num_ops = 0;
+ int total_blocks_merged = 0;
+ // Walk the ScratchMetadata entries stored back to back in metadata_buffer_
+ while (true) {  // NOTE(review): ends only on the zero entry; no size bound is checked here
+ struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
+ (char*)metadata_buffer_ + metadata_offset);
+
+ // An entry with both fields zero marks the end of the metadata
+ if (bm->new_block == 0 && bm->file_offset == 0) {
+ break;
+ }
+
+ loff_t buffer_offset = bm->file_offset - start_data_offset;  // file offset -> offset inside read_ahead_buffer_
+ void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
+ read_ahead_buffer_map[bm->new_block] = bufptr;
+ num_ops += 1;
+ total_blocks_merged += 1;
+
+ metadata_offset += sizeof(struct ScratchMetadata);
+ }
+
+ // We are done re-constructing the mapping; however, we need to make sure
+ // all the COW operations to-be merged are present in the re-constructed
+ // mapping. The iterator is advanced past every op found in the mapping.
+ while (!RAIterDone()) {
+ const CowOperation* op = GetRAOpIter();
+ if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
+ num_ops -= 1;
+ RAIterNext();
+ continue;
+ }
+
+ // First op that is not in the mapping: by now every entry that was
+ // re-constructed from the COW device must have been matched, i.e.
+ // num_ops must be back at zero. Otherwise recovery fails.
+ if (!(num_ops == 0)) {
+ SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recoverd "
+ << " Pending ops: " << num_ops;
+ snapuserd_->ReadAheadIOFailed();
+ return false;
+ }
+
+ break;
+ }
+ // Publish the number of recovered blocks for the next commit
+ snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
+
+ snapuserd_->FinishReconstructDataFromCow();
+ // Always requests a sync (argument is true) before the merge thread is notified
+ if (!snapuserd_->ReadAheadIOCompleted(true)) {
+ SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
+ snapuserd_->ReadAheadIOFailed();
+ return false;
+ }
+
+ SNAP_LOG(INFO) << "ReconstructDataFromCow success";
+ return true;
+}
+
+bool ReadAhead::ReadAheadIOStart() {
+ // Prepares one read-ahead window. If the data has to be constructed from
+ // the COW file (true only once during boot up after a crash during merge),
+ // that path is taken instead of reading from the backing device.
+ if (snapuserd_->ShouldReconstructDataFromCow()) {
+ return ReconstructDataFromCow();
+ }
+
+ std::vector<uint64_t> blocks;
+
+ int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
+ loff_t buffer_offset = 0;
+ int total_blocks_merged = 0;
+ overlap_ = false;
+ dest_blocks_.clear();
+ source_blocks_.clear();
+ std::vector<const CowOperation*> xor_op_vec;
+ // The window is staged in heap buffers; scratch space is written only after WaitForMergeReady()
+ auto ra_temp_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
+
+ // Number of ops to be merged in this window. This is a fixed size
+ // except for the last window wherein the number of ops can be less
+ // than the size of the RA window.
+ while (num_ops) {
+ uint64_t source_offset;
+
+ int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks, xor_op_vec);
+ if (linear_blocks == 0) {
+ // No more blocks to read
+ SNAP_LOG(DEBUG) << " Read-ahead completed....";
+ break;
+ }
+
+ size_t io_size = (linear_blocks * BLOCK_SZ);
+
+ // Read from the base device consecutive set of blocks in one shot
+ if (!android::base::ReadFullyAtOffset(backing_store_fd_,
+ (char*)ra_temp_buffer.get() + buffer_offset, io_size,
+ source_offset)) {
+ SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
+ << backing_store_device_ << "at block :" << source_offset / BLOCK_SZ
+ << " offset :" << source_offset % BLOCK_SZ
+ << " buffer_offset : " << buffer_offset << " io_size : " << io_size
+ << " buf-addr : " << read_ahead_buffer_;  // NOTE(review): logs read_ahead_buffer_, but the read targets ra_temp_buffer
+
+ snapuserd_->ReadAheadIOFailed();
+ return false;
+ }
+
+ buffer_offset += io_size;
+ total_blocks_merged += linear_blocks;
+ num_ops -= linear_blocks;  // PrepareNextReadAhead() only reads num_ops; it is updated here
+ }
+
+ // Done with merging ordered ops
+ if (RAIterDone() && total_blocks_merged == 0) {
+ return true;
+ }
+
+ loff_t metadata_offset = 0;
+
+ auto ra_temp_meta_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
+
+ struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
+ (char*)ra_temp_meta_buffer.get() + metadata_offset);
+
+ bm->new_block = 0;
+ bm->file_offset = 0;
+
+ loff_t file_offset = snapuserd_->GetBufferDataOffset();
+
+ loff_t offset = 0;
+ CHECK(blocks.size() == total_blocks_merged);
+ // One pass over the blocks: undo the XOR where needed and emit one metadata entry per block
+ size_t xor_index = 0;
+ for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
+ void* bufptr = static_cast<void*>((char*)ra_temp_buffer.get() + offset);
+ uint64_t new_block = blocks[block_index];
+
+ if (xor_index < xor_op_vec.size()) {
+ const CowOperation* xor_op = xor_op_vec[xor_index];
+
+ // Check if this block is an XOR op
+ if (xor_op->new_block == new_block) {
+ // Read the xor'ed data from COW
+ if (!reader_->ReadData(*xor_op, &bufsink_)) {
+ SNAP_LOG(ERROR)
+ << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
+ snapuserd_->ReadAheadIOFailed();
+ return false;
+ }
+
+ // Pointer to the data read from base device
+ uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
+ // Get the xor'ed data read from COW device
+ uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink_.GetPayloadBufPtr());
+
+ // Retrieve the original data
+ for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
+ buffer[byte_offset] ^= xor_data[byte_offset];
+ }
+
+ // Move to next XOR op
+ xor_index += 1;
+ }
+ }
+
+ offset += BLOCK_SZ;
+ // Track the metadata blocks which are stored in scratch space
+ bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+ metadata_offset);
+
+ bm->new_block = new_block;
+ bm->file_offset = file_offset;
+
+ metadata_offset += sizeof(struct ScratchMetadata);
+ file_offset += BLOCK_SZ;
+ }
+
+ // Verify if all the xor blocks were scanned to retrieve the original data
+ CHECK(xor_index == xor_op_vec.size());
+
+ // This is important - explicitly set the contents to zero. This is used
+ // when re-constructing the data after crash. This indicates end of
+ // reading metadata contents when re-constructing the data
+ bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+ metadata_offset);
+ bm->new_block = 0;
+ bm->file_offset = 0;
+
+ // Wait for the merge to finish for the previous RA window. We shouldn't
+ // be touching the scratch space until merge is complete of previous RA
+ // window. If there is a crash during this time frame, merge should resume
+ // based on the contents of the scratch space.
+ if (!snapuserd_->WaitForMergeReady()) {
+ return false;  // no ReadAheadIOFailed() on this path
+ }
+
+ // Copy the data to scratch space
+ memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize());
+ memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ);
+
+ snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
+
+ // Flush the data only if we have a overlapping blocks in the region
+ // Notify the Merge thread to resume merging this window
+ if (!snapuserd_->ReadAheadIOCompleted(overlap_)) {
+ SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
+ snapuserd_->ReadAheadIOFailed();
+ return false;
+ }
+
+ return true;
+}
+
+bool ReadAhead::RunThread() {
+    // Set-up order: fds, then the scratch-buffer pointers, then the COW reader
+    // and its merge-op iterator. A set-up failure returns false immediately.
+    const bool fds_ready = InitializeFds();
+    if (!fds_ready) return false;
+
+    InitializeBuffer();
+
+    const bool reader_ready = InitReader();
+    if (!reader_ready) return false;
+
+    InitializeRAIter();
+
+    // One read-ahead window per iteration, until RAIterDone() holds or
+    // ReadAheadIOStart() returns false. The loop body is intentionally empty.
+    while (!RAIterDone() && ReadAheadIOStart()) {
+    }
+
+    // Reached however the loop ended; the thread still reports true.
+    CloseFds();
+    reader_->CloseCowFd();
+    SNAP_LOG(INFO) << " ReadAhead thread terminating....";
+    return true;
+}
+
+// Initialization
+bool ReadAhead::InitializeFds() {
+    // Opens |path| with |flags| into |fd|; logs and returns false on failure.
+    auto open_into = [&](auto& fd, const std::string& path, int flags) {
+        fd.reset(open(path.c_str(), flags));
+        if (fd < 0) {
+            SNAP_PLOG(ERROR) << "Open Failed: " << path;
+            return false;
+        }
+        return true;
+    };
+    // Backing device is opened read-only, the COW device read-write. && short-circuits,
+    // so the COW device is not opened when the backing device fails.
+    return open_into(backing_store_fd_, backing_store_device_, O_RDONLY) &&
+           open_into(cow_fd_, cow_device_, O_RDWR);
+}
+
+bool ReadAhead::InitReader() {
+    // Obtain this thread's reader from the handler.
+    reader_ = snapuserd_->CloneReaderForWorker();
+    // cow_fd_ is handed to the reader via std::move, so it should not be used
+    // after this call. The result of InitForMerge() is returned as-is.
+    const bool merge_ready = reader_->InitForMerge(std::move(cow_fd_));
+    return merge_ready;
+}
+
+void ReadAhead::InitializeRAIter() {  // Stores the reader's merge-op iterator in cowop_iter_
+ cowop_iter_ = reader_->GetMergeOpIter();
+}
+
+bool ReadAhead::RAIterDone() {
+    // Finished case 1: the underlying iterator has no more ops.
+    const bool exhausted = cowop_iter_->Done();
+    if (exhausted) return true;
+
+    // Finished case 2: the current op is not an ordered op. Read-ahead stops
+    // at the first such op even though the iterator itself is not done.
+    const CowOperation& current = *GetRAOpIter();
+    const bool ordered = IsOrderedOp(current);
+
+    // Only an ordered op that is still available counts as "not done".
+    return !ordered;
+}
+
+void ReadAhead::RAIterNext() {  // Advances cowop_iter_ by one op; callers check RAIterDone() beforehand
+ cowop_iter_->Next();
+}
+
+const CowOperation* ReadAhead::GetRAOpIter() {
+    // Hands out the address of the op that cowop_iter_->Get() refers to.
+    return &cowop_iter_->Get();
+}
+
+void ReadAhead::InitializeBuffer() {
+    // Both scratch-space pointers are byte offsets from the handler's mapped address.
+    char* const base = static_cast<char*>(snapuserd_->GetMappedAddr());
+    metadata_buffer_ = base + snapuserd_->GetBufferMetadataOffset();
+    read_ahead_buffer_ = base + snapuserd_->GetBufferDataOffset();
+
+    // Sink that receives the XOR-op data read from the COW device.
+    bufsink_.Initialize(PAYLOAD_SIZE);
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
new file mode 100644
index 0000000..97418bd
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -0,0 +1,363 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+/*
+ * Readahead is used to optimize the merge of COPY and XOR Ops.
+ *
+ * We create a scratch space of 2MB to store the read-ahead data in the COW
+ * device.
+ *
+ * +-----------------------+
+ * | Header (fixed) |
+ * +-----------------------+
+ * | Scratch space | <-- 2MB
+ * +-----------------------+
+ *
+ * Scratch space is as follows:
+ *
+ * +-----------------------+
+ * | Metadata | <- 4k page
+ * +-----------------------+
+ * | Metadata | <- 4k page
+ * +-----------------------+
+ * | |
+ * | Read-ahead data |
+ * | |
+ * +-----------------------+
+ *
+ *
+ * * ===================================================================
+ *
+ * Example:
+ *
+ * We have 6 copy operations to be executed in OTA. Update-engine
+ * will write to COW file as follows:
+ *
+ * Op-1: 20 -> 23
+ * Op-2: 19 -> 22
+ * Op-3: 18 -> 21
+ * Op-4: 17 -> 20
+ * Op-5: 16 -> 19
+ * Op-6: 15 -> 18
+ *
+ * Read-ahead thread will read all the 6 source blocks and store the data in the
+ * scratch space. Metadata will contain the destination block numbers. Thus,
+ * scratch space will look something like this:
+ *
+ * +--------------+
+ * | Block 23 |
+ * | offset - 1 |
+ * +--------------+
+ * | Block 22 |
+ * | offset - 2 |
+ * +--------------+
+ * | Block 21 |
+ * | offset - 3 |
+ * +--------------+
+ * ...
+ * ...
+ * +--------------+
+ * | Data-Block 20| <-- offset - 1
+ * +--------------+
+ * | Data-Block 19| <-- offset - 2
+ * +--------------+
+ * | Data-Block 18| <-- offset - 3
+ * +--------------+
+ * ...
+ * ...
+ *
+ * ====================================================================
+ *
+ *
+ * Read-ahead thread will process the COW Ops in fixed set. Consider
+ * the following example:
+ *
+ * +--------------------------+
+ * |op-1|op-2|op-3|....|op-510|
+ * +--------------------------+
+ *
+ * <------ One RA Block ------>
+ *
+ * RA thread will read 510 ordered COW ops at a time and will store
+ * the data in the scratch space.
+ *
+ * RA thread and Merge thread will work in lock-step, wherein the RA thread
+ * makes sure that the data for 510 COW operations is read upfront
+ * and is in memory. Thus, the merge thread can pick up the data
+ * directly from memory and write it back to the base device.
+ *
+ *
+ * +--------------------------+------------------------------------+
+ * |op-1|op-2|op-3|....|op-510|op-511|op-512|op-513........|op-1020|
+ * +--------------------------+------------------------------------+
+ *
+ * <------Merge 510 Blocks----><-Prepare 510 blocks for merge by RA->
+ * ^ ^
+ * | |
+ * Merge thread RA thread
+ *
+ * Both Merge and RA thread will strive to work in parallel.
+ *
+ * ===========================================================================
+ *
+ * State transitions and communication between RA thread and Merge thread:
+ *
+ * Merge Thread RA Thread
+ * ----------------------------------------------------------------------------
+ *
+ * | |
+ * WAIT for RA Block N READ one RA Block (N)
+ * for merge |
+ * | |
+ * | |
+ * <--------------MERGE BEGIN--------READ Block N done(copy to scratch)
+ * | |
+ * | |
+ * Merge Begin Block N READ one RA BLock (N+1)
+ * | |
+ * | |
+ * | READ done. Wait for merge complete
+ * | |
+ * | WAIT
+ * | |
+ * Merge done Block N |
+ * ----------------MERGE READY-------------->|
+ * WAIT for RA Block N+1 Copy RA Block (N+1)
+ * for merge to scratch space
+ * | |
+ * <---------------MERGE BEGIN---------BLOCK N+1 Done
+ * | |
+ * | |
+ * Merge Begin Block N+1 READ one RA BLock (N+2)
+ * | |
+ * | |
+ * | READ done. Wait for merge complete
+ * | |
+ * | WAIT
+ * | |
+ * Merge done Block N+1 |
+ * ----------------MERGE READY-------------->|
+ * WAIT for RA Block N+2 Copy RA Block (N+2)
+ * for merge to scratch space
+ * | |
+ * <---------------MERGE BEGIN---------BLOCK N+2 Done
+ */
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+// Invoked once, primarily by update-engine, to initiate the merge: sets
+// merge_initiated_ under lock_ and then wakes every thread waiting on cv.
+void SnapshotHandler::InitiateMerge() {
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ merge_initiated_ = true;
+
+ // If there are only REPLACE ops to be merged, then we need
+ // to explicitly set the state to MERGE_BEGIN as there
+ // is no read-ahead thread
+ if (!ra_thread_) {
+ io_state_ = MERGE_IO_TRANSITION::MERGE_BEGIN;
+ }
+ }
+ cv.notify_all();  // issued after the lock scope above has ended
+}
+
+// Invoked by the merge thread. Blocks until MergeInitiated() holds and io_state_
+// becomes MERGE_BEGIN (set by the RA thread or by InitiateMerge()). Returns false
+// if read-ahead failed or I/O was terminated instead.
+bool SnapshotHandler::WaitForMergeBegin() {
+    std::unique_lock<std::mutex> guard(lock_);
+
+    // The two states that make the merge thread give up.
+    const auto aborted = [this] {
+        return io_state_ == MERGE_IO_TRANSITION::READ_AHEAD_FAILURE ||
+               io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED;
+    };
+
+    // Stage 1: wait for the merge to be initiated. The abort states are only
+    // examined after a wake-up, not before the first wait.
+    while (!MergeInitiated()) {
+        cv.wait(guard);
+        if (aborted()) return false;
+    }
+
+    // Stage 2: wait until io_state_ is MERGE_BEGIN or one of the abort states.
+    cv.wait(guard, [&] {
+        return io_state_ == MERGE_IO_TRANSITION::MERGE_BEGIN || aborted();
+    });
+
+    // The lock is still held, so io_state_ is one of those three states here;
+    // only MERGE_BEGIN means the merge may proceed.
+    return io_state_ == MERGE_IO_TRANSITION::MERGE_BEGIN;
+}
+
+// Invoked by RA thread. With |sync| set, first flushes the mapped region and then the read-ahead
+// state; in every case it then notifies the merge thread. Returns false only if an msync fails.
+bool SnapshotHandler::ReadAheadIOCompleted(bool sync) {
+ if (sync) {
+ // Flush the entire buffer region
+ int ret = msync(mapped_addr_, total_mapped_addr_length_, MS_SYNC);
+ if (ret < 0) {
+ PLOG(ERROR) << "msync failed after ReadAheadIOCompleted: " << ret;
+ return false;
+ }
+
+ // Metadata and data are synced. Now, update the state.
+ // We need to update the state after flushing data; if there is a crash
+ // when read-ahead IO is in progress, the state of data in the COW file
+ // is unknown. kCowReadAheadDone acts as a checkpoint wherein the data
+ // in the scratch space is good and during next reboot, read-ahead thread
+ // can safely re-construct the data.
+ struct BufferState* ra_state = GetBufferState();
+ ra_state->read_ahead_state = kCowReadAheadDone;
+
+ ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);  // second flush covers only the first BLOCK_SZ bytes of the mapping
+ if (ret < 0) {
+ PLOG(ERROR) << "msync failed to flush Readahead completion state...";
+ return false;
+ }
+ }
+
+ // Notify the merge thread to resume merging; IO_TERMINATED and MERGE_FAILED are left untouched
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ if (io_state_ != MERGE_IO_TRANSITION::IO_TERMINATED &&
+ io_state_ != MERGE_IO_TRANSITION::MERGE_FAILED) {
+ io_state_ = MERGE_IO_TRANSITION::MERGE_BEGIN;
+ }
+ }
+
+ cv.notify_all();
+ return true;
+}
+
+// Invoked by the RA thread, which may already hold the data for block N+1.
+// Blocks until the merge thread has finished block N (MERGE_READY) so that
+// block N+1 can be flushed to the scratch space. Returns false if instead
+// the merge failed, the merge completed, or I/O was terminated.
+bool SnapshotHandler::WaitForMergeReady() {
+    std::unique_lock<std::mutex> guard(lock_);
+
+    // Sleep until io_state_ is one of the four states that end the wait.
+    cv.wait(guard, [this] {
+        switch (io_state_) {
+            case MERGE_IO_TRANSITION::MERGE_READY:
+            case MERGE_IO_TRANSITION::MERGE_FAILED:
+            case MERGE_IO_TRANSITION::MERGE_COMPLETE:
+            case MERGE_IO_TRANSITION::IO_TERMINATED:
+                return true;
+            default:
+                return false;
+        }
+    });
+
+    // The lock is still held, so io_state_ is one of those four states here;
+    // MERGE_READY is the only one that lets the RA thread proceed.
+    return io_state_ == MERGE_IO_TRANSITION::MERGE_READY;
+}
+
+// Invoked by the merge thread once block N is merged: moves io_state_ to
+// MERGE_READY and wakes the waiters so the RA thread can continue.
+void SnapshotHandler::NotifyRAForMergeReady() {
+    {
+        std::lock_guard<std::mutex> guard(lock_);
+        // IO_TERMINATED and READ_AHEAD_FAILURE are left untouched here.
+        const bool keep_state = io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED ||
+                                io_state_ == MERGE_IO_TRANSITION::READ_AHEAD_FAILURE;
+        if (!keep_state) io_state_ = MERGE_IO_TRANSITION::MERGE_READY;
+    }
+    // The lock scope has ended before the waiters are notified.
+    cv.notify_all();
+}
+
+// The following transitions are mostly in the failure paths
+void SnapshotHandler::MergeFailed() {  // Publishes MERGE_FAILED and wakes every thread waiting on cv
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ io_state_ = MERGE_IO_TRANSITION::MERGE_FAILED;  // unconditional: replaces whatever state was set
+ }
+
+ cv.notify_all();
+}
+
+void SnapshotHandler::MergeCompleted() {  // Publishes MERGE_COMPLETE and wakes every thread waiting on cv
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ io_state_ = MERGE_IO_TRANSITION::MERGE_COMPLETE;  // unconditional: replaces whatever state was set
+ }
+
+ cv.notify_all();
+}
+
+// This is invoked by worker threads.
+//
+// Worker threads are terminated in one of two scenarios:
+//
+// 1: If dm-user device is destroyed
+// 2: We had an I/O failure when reading root partitions
+//
+// In case (1), this would be a graceful shutdown. In this case, merge
+// thread and RA thread should have _already_ terminated by this point. We will be
+// destroying the dm-user device only _after_ merge is completed.
+//
+// In case (2), if merge thread had started, then it will be
+// continuing to merge; however, since we had an I/O failure and the
+// I/O on root partitions are no longer served, we will terminate the
+// merge.
+//
+// This function handles case (2): it sets IO_TERMINATED and wakes all waiters on cv.
+void SnapshotHandler::NotifyIOTerminated() {
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ io_state_ = MERGE_IO_TRANSITION::IO_TERMINATED;
+ }
+
+ cv.notify_all();
+}
+
+bool SnapshotHandler::IsIOTerminated() {  // Reads io_state_ under lock_; true only for IO_TERMINATED
+ std::lock_guard<std::mutex> lock(lock_);
+ return (io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED);
+}
+
+// Invoked by RA thread: publishes READ_AHEAD_FAILURE and wakes every thread waiting on cv
+void SnapshotHandler::ReadAheadIOFailed() {
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ io_state_ = MERGE_IO_TRANSITION::READ_AHEAD_FAILURE;  // unconditional: replaces whatever state was set
+ }
+
+ cv.notify_all();
+}
+
+void SnapshotHandler::WaitForMergeComplete() {  // Blocks until the merge completed, failed, or I/O was terminated
+    std::unique_lock<std::mutex> guard(lock_);
+    cv.wait(guard, [this] {
+        return io_state_ == MERGE_IO_TRANSITION::MERGE_COMPLETE ||
+               io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED ||
+               io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED;
+    });
+}
+
+} // namespace snapshot
+} // namespace android
diff --git a/init/Android.bp b/init/Android.bp
index 9b02c38..66427dc 100644
--- a/init/Android.bp
+++ b/init/Android.bp
@@ -448,6 +448,7 @@
srcs: [
"devices_test.cpp",
+ "epoll_test.cpp",
"firmware_handler_test.cpp",
"init_test.cpp",
"keychords_test.cpp",
diff --git a/init/epoll.cpp b/init/epoll.cpp
index 17d63fa..74d8aac 100644
--- a/init/epoll.cpp
+++ b/init/epoll.cpp
@@ -38,11 +38,12 @@
return {};
}
-Result<void> Epoll::RegisterHandler(int fd, std::function<void()> handler, uint32_t events) {
+Result<void> Epoll::RegisterHandler(int fd, Handler handler, uint32_t events) {
if (!events) {
return Error() << "Must specify events";
}
- auto [it, inserted] = epoll_handlers_.emplace(fd, std::move(handler));
+ auto sp = std::make_shared<decltype(handler)>(std::move(handler));
+ auto [it, inserted] = epoll_handlers_.emplace(fd, std::move(sp));
if (!inserted) {
return Error() << "Cannot specify two epoll handlers for a given FD";
}
@@ -69,7 +70,7 @@
return {};
}
-Result<std::vector<std::function<void()>*>> Epoll::Wait(
+Result<std::vector<std::shared_ptr<Epoll::Handler>>> Epoll::Wait(
std::optional<std::chrono::milliseconds> timeout) {
int timeout_ms = -1;
if (timeout && timeout->count() < INT_MAX) {
@@ -81,9 +82,10 @@
if (num_events == -1) {
return ErrnoError() << "epoll_wait failed";
}
- std::vector<std::function<void()>*> pending_functions;
+ std::vector<std::shared_ptr<Handler>> pending_functions;
for (int i = 0; i < num_events; ++i) {
- pending_functions.emplace_back(reinterpret_cast<std::function<void()>*>(ev[i].data.ptr));
+ auto sp = *reinterpret_cast<std::shared_ptr<Handler>*>(ev[i].data.ptr);
+ pending_functions.emplace_back(std::move(sp));
}
return pending_functions;
diff --git a/init/epoll.h b/init/epoll.h
index c32a661..0df5289 100644
--- a/init/epoll.h
+++ b/init/epoll.h
@@ -22,6 +22,7 @@
#include <chrono>
#include <functional>
#include <map>
+#include <memory>
#include <optional>
#include <vector>
@@ -36,15 +37,17 @@
public:
Epoll();
+ typedef std::function<void()> Handler;
+
Result<void> Open();
- Result<void> RegisterHandler(int fd, std::function<void()> handler, uint32_t events = EPOLLIN);
+ Result<void> RegisterHandler(int fd, Handler handler, uint32_t events = EPOLLIN);
Result<void> UnregisterHandler(int fd);
- Result<std::vector<std::function<void()>*>> Wait(
+ Result<std::vector<std::shared_ptr<Handler>>> Wait(
std::optional<std::chrono::milliseconds> timeout);
private:
android::base::unique_fd epoll_fd_;
- std::map<int, std::function<void()>> epoll_handlers_;
+ std::map<int, std::shared_ptr<Handler>> epoll_handlers_;
};
} // namespace init
diff --git a/init/epoll_test.cpp b/init/epoll_test.cpp
new file mode 100644
index 0000000..9236cd5
--- /dev/null
+++ b/init/epoll_test.cpp
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "epoll.h"
+
+#include <sys/unistd.h>
+
+#include <unordered_set>
+
+#include <android-base/file.h>
+#include <gtest/gtest.h>
+
+namespace android {
+namespace init {
+
+// Addresses of all CatchDtor instances that are currently alive.
+std::unordered_set<void*> sValidObjects;
+
+// Tracks its own lifetime: every default- or copy-constructed instance adds
+// its address to sValidObjects and the destructor removes it again, so a
+// lookup in the set tells whether a given instance has been destroyed.
+class CatchDtor final {
+  public:
+    CatchDtor() { sValidObjects.insert(this); }
+    CatchDtor(const CatchDtor&) { sValidObjects.insert(this); }
+    // Erasing by key does nothing if the address is not in the set.
+    ~CatchDtor() { sValidObjects.erase(static_cast<void*>(this)); }
+};
+
+TEST(epoll, UnregisterHandler) {
+    Epoll epoll;
+    ASSERT_RESULT_OK(epoll.Open());
+    int fds[2];
+    ASSERT_EQ(pipe(fds), 0);
+    android::base::unique_fd read_end(fds[0]), write_end(fds[1]);  // closed on every exit path
+
+    CatchDtor catch_dtor;
+    bool handler_invoked = false;  // read by the handler before it is first written
+    auto handler = [&, catch_dtor]() -> void {
+        // Only the first call can unregister the fd; the repeated call must fail.
+        auto result = epoll.UnregisterHandler(fds[0]);
+        ASSERT_EQ(result.ok(), !handler_invoked);
+        handler_invoked = true;
+        ASSERT_NE(sValidObjects.find((void*)&catch_dtor), sValidObjects.end());
+    };
+    ASSERT_RESULT_OK(epoll.RegisterHandler(fds[0], std::move(handler)));
+
+    uint8_t byte = 0xee;  // any byte: it only has to make the read end readable
+    ASSERT_TRUE(android::base::WriteFully(fds[1], &byte, sizeof(byte)));
+
+    auto results = epoll.Wait({});
+    ASSERT_RESULT_OK(results);
+    ASSERT_EQ(results->size(), size_t(1));
+    // The handler unregisters itself, yet must stay callable (captures intact) via |results|.
+    for (const auto& function : *results) {
+        (*function)();
+        (*function)();
+    }
+    ASSERT_TRUE(handler_invoked);
+}
+
+} // namespace init
+} // namespace android
diff --git a/libprocessgroup/profiles/task_profiles_28.json b/libprocessgroup/profiles/task_profiles_28.json
index 9f83785..56053e0 100644
--- a/libprocessgroup/profiles/task_profiles_28.json
+++ b/libprocessgroup/profiles/task_profiles_28.json
@@ -40,6 +40,19 @@
]
},
{
+ "Name": "ServicePerformance",
+ "Actions": [
+ {
+ "Name": "JoinCgroup",
+ "Params":
+ {
+ "Controller": "schedtune",
+ "Path": "background"
+ }
+ }
+ ]
+ },
+ {
"Name": "HighPerformance",
"Actions": [
{
diff --git a/libprocessgroup/profiles/task_profiles_29.json b/libprocessgroup/profiles/task_profiles_29.json
index 9f83785..52279b8 100644
--- a/libprocessgroup/profiles/task_profiles_29.json
+++ b/libprocessgroup/profiles/task_profiles_29.json
@@ -53,6 +53,19 @@
]
},
{
+ "Name": "ServicePerformance",
+ "Actions": [
+ {
+ "Name": "JoinCgroup",
+ "Params":
+ {
+ "Controller": "schedtune",
+ "Path": "background"
+ }
+ }
+ ]
+ },
+ {
"Name": "MaxPerformance",
"Actions": [
{
diff --git a/libprocessgroup/profiles/task_profiles_30.json b/libprocessgroup/profiles/task_profiles_30.json
index 9f83785..56053e0 100644
--- a/libprocessgroup/profiles/task_profiles_30.json
+++ b/libprocessgroup/profiles/task_profiles_30.json
@@ -40,6 +40,19 @@
]
},
{
+ "Name": "ServicePerformance",
+ "Actions": [
+ {
+ "Name": "JoinCgroup",
+ "Params":
+ {
+ "Controller": "schedtune",
+ "Path": "background"
+ }
+ }
+ ]
+ },
+ {
"Name": "HighPerformance",
"Actions": [
{
diff --git a/libutils/String16.cpp b/libutils/String16.cpp
index c42cada..68642d8 100644
--- a/libutils/String16.cpp
+++ b/libutils/String16.cpp
@@ -199,99 +199,59 @@
return NO_MEMORY;
}
-status_t String16::append(const String16& other)
-{
- const size_t myLen = size();
- const size_t otherLen = other.size();
- if (myLen == 0) {
- setTo(other);
- return OK;
- } else if (otherLen == 0) {
- return OK;
- }
-
- if (myLen >= SIZE_MAX / sizeof(char16_t) - otherLen) {
- android_errorWriteLog(0x534e4554, "73826242");
- abort();
- }
-
- SharedBuffer* buf =
- static_cast<SharedBuffer*>(editResize((myLen + otherLen + 1) * sizeof(char16_t)));
- if (buf) {
- char16_t* str = (char16_t*)buf->data();
- memcpy(str+myLen, other, (otherLen+1)*sizeof(char16_t));
- mString = str;
- return OK;
- }
- return NO_MEMORY;
+status_t String16::append(const String16& other) {
+ return append(other.string(), other.size());
}
-status_t String16::append(const char16_t* chrs, size_t otherLen)
-{
+status_t String16::append(const char16_t* chrs, size_t otherLen) {
const size_t myLen = size();
- if (myLen == 0) {
- setTo(chrs, otherLen);
- return OK;
- } else if (otherLen == 0) {
- return OK;
- }
- if (myLen >= SIZE_MAX / sizeof(char16_t) - otherLen) {
- android_errorWriteLog(0x534e4554, "73826242");
- abort();
- }
+ if (myLen == 0) return setTo(chrs, otherLen);
- SharedBuffer* buf =
- static_cast<SharedBuffer*>(editResize((myLen + otherLen + 1) * sizeof(char16_t)));
- if (buf) {
- char16_t* str = (char16_t*)buf->data();
- memcpy(str+myLen, chrs, otherLen*sizeof(char16_t));
- str[myLen+otherLen] = 0;
- mString = str;
- return OK;
- }
- return NO_MEMORY;
+ if (otherLen == 0) return OK;
+
+ size_t size = myLen;
+ if (__builtin_add_overflow(size, otherLen, &size) ||
+ __builtin_add_overflow(size, 1, &size) ||
+ __builtin_mul_overflow(size, sizeof(char16_t), &size)) return NO_MEMORY;
+
+ SharedBuffer* buf = static_cast<SharedBuffer*>(editResize(size));
+ if (!buf) return NO_MEMORY;
+
+ char16_t* str = static_cast<char16_t*>(buf->data());
+ memcpy(str + myLen, chrs, otherLen * sizeof(char16_t));
+ str[myLen + otherLen] = 0;
+ mString = str;
+ return OK;
}
-status_t String16::insert(size_t pos, const char16_t* chrs)
-{
+status_t String16::insert(size_t pos, const char16_t* chrs) {
return insert(pos, chrs, strlen16(chrs));
}
-status_t String16::insert(size_t pos, const char16_t* chrs, size_t len)
-{
+status_t String16::insert(size_t pos, const char16_t* chrs, size_t otherLen) {
const size_t myLen = size();
- if (myLen == 0) {
- return setTo(chrs, len);
- return OK;
- } else if (len == 0) {
- return OK;
- }
+
+ if (myLen == 0) return setTo(chrs, otherLen);
+
+ if (otherLen == 0) return OK;
if (pos > myLen) pos = myLen;
- #if 0
- printf("Insert in to %s: pos=%d, len=%d, myLen=%d, chrs=%s\n",
- String8(*this).string(), pos,
- len, myLen, String8(chrs, len).string());
- #endif
+ size_t size = myLen;
+ if (__builtin_add_overflow(size, otherLen, &size) ||
+ __builtin_add_overflow(size, 1, &size) ||
+ __builtin_mul_overflow(size, sizeof(char16_t), &size)) return NO_MEMORY;
- SharedBuffer* buf =
- static_cast<SharedBuffer*>(editResize((myLen + len + 1) * sizeof(char16_t)));
- if (buf) {
- char16_t* str = (char16_t*)buf->data();
- if (pos < myLen) {
- memmove(str+pos+len, str+pos, (myLen-pos)*sizeof(char16_t));
- }
- memcpy(str+pos, chrs, len*sizeof(char16_t));
- str[myLen+len] = 0;
- mString = str;
- #if 0
- printf("Result (%d chrs): %s\n", size(), String8(*this).string());
- #endif
- return OK;
- }
- return NO_MEMORY;
+ SharedBuffer* buf = static_cast<SharedBuffer*>(editResize(size));
+ if (!buf) return NO_MEMORY;
+
+ char16_t* str = static_cast<char16_t*>(buf->data());
+ if (pos < myLen) memmove(str + pos + otherLen, str + pos, (myLen - pos) * sizeof(char16_t));
+ memcpy(str + pos, chrs, otherLen * sizeof(char16_t));
+ str[myLen + otherLen] = 0;
+ mString = str;
+ return OK;
}
ssize_t String16::findFirst(char16_t c) const
diff --git a/libutils/String16_test.cpp b/libutils/String16_test.cpp
index 7d7230e..c6e6f74 100644
--- a/libutils/String16_test.cpp
+++ b/libutils/String16_test.cpp
@@ -19,7 +19,7 @@
#include <gtest/gtest.h>
-namespace android {
+using namespace android;
::testing::AssertionResult Char16_tStringEquals(const char16_t* a, const char16_t* b) {
if (strcmp16(a, b) != 0) {
@@ -224,4 +224,36 @@
EXPECT_STR16EQ(another, u"abcdef");
}
-} // namespace android
+TEST(String16Test, append) {
+ String16 s;
+ EXPECT_EQ(OK, s.append(String16(u"foo")));
+ EXPECT_STR16EQ(u"foo", s);
+ EXPECT_EQ(OK, s.append(String16(u"bar")));
+ EXPECT_STR16EQ(u"foobar", s);
+ EXPECT_EQ(OK, s.append(u"baz", 0));
+ EXPECT_STR16EQ(u"foobar", s);
+ EXPECT_EQ(NO_MEMORY, s.append(u"baz", SIZE_MAX));
+ EXPECT_STR16EQ(u"foobar", s);
+}
+
+TEST(String16Test, insert) {
+ String16 s;
+
+ // Inserting into the empty string inserts at the start.
+ EXPECT_EQ(OK, s.insert(123, u"foo"));
+ EXPECT_STR16EQ(u"foo", s);
+
+ // Inserting zero characters at any position is okay, but won't expand the string.
+ EXPECT_EQ(OK, s.insert(123, u"foo", 0));
+ EXPECT_STR16EQ(u"foo", s);
+
+ // Inserting past the end of a non-empty string appends.
+ EXPECT_EQ(OK, s.insert(123, u"bar"));
+ EXPECT_STR16EQ(u"foobar", s);
+
+ EXPECT_EQ(OK, s.insert(3, u"!"));
+ EXPECT_STR16EQ(u"foo!bar", s);
+
+ EXPECT_EQ(NO_MEMORY, s.insert(3, u"", SIZE_MAX));
+ EXPECT_STR16EQ(u"foo!bar", s);
+}
diff --git a/libutils/String8.cpp b/libutils/String8.cpp
index 8511da9..419b2de 100644
--- a/libutils/String8.cpp
+++ b/libutils/String8.cpp
@@ -313,8 +313,8 @@
if (n > 0) {
size_t oldLength = length();
- if ((size_t)n > SIZE_MAX - 1 ||
- oldLength > SIZE_MAX - (size_t)n - 1) {
+ if (n > std::numeric_limits<size_t>::max() - 1 ||
+ oldLength > std::numeric_limits<size_t>::max() - n - 1) {
return NO_MEMORY;
}
char* buf = lockBuffer(oldLength + n);
@@ -327,21 +327,23 @@
return result;
}
-status_t String8::real_append(const char* other, size_t otherLen)
-{
+status_t String8::real_append(const char* other, size_t otherLen) {
const size_t myLen = bytes();
- SharedBuffer* buf = SharedBuffer::bufferFromData(mString)
- ->editResize(myLen+otherLen+1);
- if (buf) {
- char* str = (char*)buf->data();
- mString = str;
- str += myLen;
- memcpy(str, other, otherLen);
- str[otherLen] = '\0';
- return OK;
+ SharedBuffer* buf;
+ size_t newLen;
+ if (__builtin_add_overflow(myLen, otherLen, &newLen) ||
+ __builtin_add_overflow(newLen, 1, &newLen) ||
+ (buf = SharedBuffer::bufferFromData(mString)->editResize(newLen)) == nullptr) {
+ return NO_MEMORY;
}
- return NO_MEMORY;
+
+ char* str = (char*)buf->data();
+ mString = str;
+ str += myLen;
+ memcpy(str, other, otherLen);
+ str[otherLen] = '\0';
+ return OK;
}
char* String8::lockBuffer(size_t size)
diff --git a/libutils/String8_test.cpp b/libutils/String8_test.cpp
index 9efcc6f..1356cd0 100644
--- a/libutils/String8_test.cpp
+++ b/libutils/String8_test.cpp
@@ -15,13 +15,14 @@
*/
#define LOG_TAG "String8_test"
+
#include <utils/Log.h>
#include <utils/String8.h>
#include <utils/String16.h>
#include <gtest/gtest.h>
-namespace android {
+using namespace android;
class String8Test : public testing::Test {
protected:
@@ -101,4 +102,15 @@
String8 valid = String8(String16(tmp));
EXPECT_STREQ(valid, "abcdef");
}
+
+TEST_F(String8Test, append) {
+ String8 s;
+ EXPECT_EQ(OK, s.append("foo"));
+ EXPECT_STREQ("foo", s);
+ EXPECT_EQ(OK, s.append("bar"));
+ EXPECT_STREQ("foobar", s);
+ EXPECT_EQ(OK, s.append("baz", 0));
+ EXPECT_STREQ("foobar", s);
+ EXPECT_EQ(NO_MEMORY, s.append("baz", SIZE_MAX));
+ EXPECT_STREQ("foobar", s);
}
diff --git a/libutils/Threads.cpp b/libutils/Threads.cpp
index 540dcf4..6e293c7 100644
--- a/libutils/Threads.cpp
+++ b/libutils/Threads.cpp
@@ -86,8 +86,10 @@
// A new thread will be in its parent's sched group by default,
// so we just need to handle the background case.
+ // This is currently set to the system_background group, which is
+ // different from the background group used for apps.
if (prio >= ANDROID_PRIORITY_BACKGROUND) {
- SetTaskProfiles(0, {"SCHED_SP_BACKGROUND"}, true);
+ SetTaskProfiles(0, {"SCHED_SP_SYSTEM"}, true);
}
if (name) {
@@ -313,7 +315,7 @@
}
if (pri >= ANDROID_PRIORITY_BACKGROUND) {
- rc = SetTaskProfiles(tid, {"SCHED_SP_BACKGROUND"}, true) ? 0 : -1;
+ rc = SetTaskProfiles(tid, {"SCHED_SP_SYSTEM"}, true) ? 0 : -1;
} else if (curr_pri >= ANDROID_PRIORITY_BACKGROUND) {
SchedPolicy policy = SP_FOREGROUND;
// Change to the sched policy group of the process.