Support multi-threaded compression in COW v3
Performance of COW v3 is now on par with v2 in both multi-threaded and
single threaded configurations. Note, v2 cow writer can cache up to 1024
blocks in memory if multi-threaded compression is enabled(even though
batch size is configured as 200). For a fair comparison, benchmarks are
ran with batch size of 256. For batch size of 256 or greater, v2 and v3
have similar multi-threaded performance.
Test: th
Bug: 313962438
Change-Id: I377c8291689a7a038bb00b09d7371a155e6972e9
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index e736847..be6b6da 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -34,6 +34,7 @@
#include <zlib.h>
#include <fcntl.h>
+#include <libsnapshot/cow_compress.h>
#include <libsnapshot_cow/parser_v3.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
@@ -55,11 +56,35 @@
using android::base::unique_fd;
+// Divide |x| by |y| and round up to the nearest integer.
+constexpr uint64_t DivRoundUp(uint64_t x, uint64_t y) {
+ return (x + y - 1) / y;
+}
+
CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
: CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
SetupHeaders();
}
+void CowWriterV3::InitWorkers() {
+ if (num_compress_threads_ <= 1) {
+ LOG_INFO << "Not creating new threads for compression.";
+ return;
+ }
+ compress_threads_.reserve(num_compress_threads_);
+ compress_threads_.clear();
+ threads_.reserve(num_compress_threads_);
+ threads_.clear();
+ for (size_t i = 0; i < num_compress_threads_; i++) {
+ std::unique_ptr<ICompressor> compressor =
+ ICompressor::Create(compression_, header_.block_size);
+ auto&& wt = compress_threads_.emplace_back(
+ std::make_unique<CompressWorker>(std::move(compressor), header_.block_size));
+ threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
+ }
+ LOG(INFO) << num_compress_threads_ << " thread used for compression";
+}
+
void CowWriterV3::SetupHeaders() {
header_ = {};
header_.prefix.magic = kCowMagicNumber;
@@ -135,10 +160,24 @@
} else {
LOG(INFO) << "Batch writes: disabled";
}
+ if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false) &&
+ options_.num_compress_threads) {
+ num_compress_threads_ = options_.num_compress_threads;
+ }
+ InitWorkers();
return true;
}
-CowWriterV3::~CowWriterV3() {}
+CowWriterV3::~CowWriterV3() {
+ for (const auto& t : compress_threads_) {
+ t->Finalize();
+ }
+ for (auto& t : threads_) {
+ if (t.joinable()) {
+ t.join();
+ }
+ }
+}
bool CowWriterV3::Initialize(std::optional<uint64_t> label) {
if (!InitFd() || !ParseOptions()) {
@@ -289,19 +328,24 @@
<< " but compressor is uninitialized.";
return false;
}
+ const auto bytes = reinterpret_cast<const uint8_t*>(data);
const size_t num_blocks = (size / header_.block_size);
for (size_t i = 0; i < num_blocks;) {
const auto blocks_to_write =
std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
size_t compressed_bytes = 0;
+ auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i);
+ if (blocks.size() != blocks_to_write) {
+ LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", "
+ << blocks_to_write << ", actual number of blocks received from compressor "
+ << blocks.size();
+ return false;
+ }
for (size_t j = 0; j < blocks_to_write; j++) {
- const uint8_t* const iter =
- reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));
-
CowOperation& op = cached_ops_.emplace_back();
auto& vec = data_vec_.emplace_back();
- auto& compressed_data = cached_data_.emplace_back();
+ auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j]));
op.new_block = new_block_start + i + j;
op.set_type(type);
@@ -310,20 +354,6 @@
} else {
op.set_source(next_data_pos_ + compressed_bytes);
}
- if (compression_.algorithm == kCowCompressNone) {
- compressed_data.resize(header_.block_size);
- } else {
- compressed_data = compressor_->Compress(iter, header_.block_size);
- if (compressed_data.empty()) {
- LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
- << num_blocks << ");";
- return false;
- }
- }
- if (compressed_data.size() >= header_.block_size) {
- compressed_data.resize(header_.block_size);
- std::memcpy(compressed_data.data(), iter, header_.block_size);
- }
vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
op.data_length = vec.iov_len;
compressed_bytes += op.data_length;
@@ -443,6 +473,57 @@
return true;
}
+std::vector<std::basic_string<uint8_t>> CowWriterV3::CompressBlocks(const size_t num_blocks,
+ const void* data) {
+ const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
+ const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
+ std::vector<std::basic_string<uint8_t>> compressed_buf;
+ compressed_buf.clear();
+ const uint8_t* const iter = reinterpret_cast<const uint8_t*>(data);
+ if (compression_.algorithm == kCowCompressNone) {
+ for (size_t i = 0; i < num_blocks; i++) {
+ auto& buf = compressed_buf.emplace_back();
+ buf.resize(header_.block_size);
+ std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size);
+ }
+ return compressed_buf;
+ }
+ if (num_threads <= 1) {
+ if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks,
+ &compressed_buf)) {
+ return {};
+ }
+ } else {
+ // Submit the blocks per thread. The retrieval of
+ // compressed buffers has to be done in the same order.
+ // We should not poll for completed buffers in a different order as the
+ // buffers are tightly coupled with block ordering.
+ for (size_t i = 0; i < num_threads; i++) {
+ CompressWorker* worker = compress_threads_[i].get();
+ const auto blocks_in_batch =
+ std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
+ worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size,
+ blocks_in_batch);
+ }
+
+ for (size_t i = 0; i < num_threads; i++) {
+ CompressWorker* worker = compress_threads_[i].get();
+ if (!worker->GetCompressedBuffers(&compressed_buf)) {
+ return {};
+ }
+ }
+ }
+ for (size_t i = 0; i < num_blocks; i++) {
+ auto& block = compressed_buf[i];
+ if (block.size() >= header_.block_size) {
+ block.resize(header_.block_size);
+ std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size);
+ }
+ }
+
+ return compressed_buf;
+}
+
bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
std::basic_string_view<struct iovec> data) {
const auto total_data_size =
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index 73ac520..b19af60 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -16,6 +16,7 @@
#include <android-base/logging.h>
#include <string_view>
+#include <thread>
#include <vector>
#include "writer_base.h"
@@ -51,12 +52,14 @@
std::basic_string_view<struct iovec> data);
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
uint16_t offset, CowOperationType type);
- bool CompressBlocks(size_t num_blocks, const void* data);
bool CheckOpCount(size_t op_count);
private:
+ std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks,
+ const void* data);
bool ReadBackVerification();
bool FlushCacheOps();
+ void InitWorkers();
CowHeaderV3 header_{};
CowCompression compression_;
// in the case that we are using one thread for compression, we can store and re-use the same
@@ -75,6 +78,8 @@
std::vector<CowOperationV3> cached_ops_;
std::vector<std::basic_string<uint8_t>> cached_data_;
std::vector<struct iovec> data_vec_;
+
+ std::vector<std::thread> threads_;
};
} // namespace snapshot