libsnapshot: Use two threads to run compression
Compression is a hot path during OTA installation. Use two
threads to parallelize compression.

By default, the number of threads is set to 1. If the property
"ro.virtual_ab.compression.threads" is true, the number of
threads is increased to 2.
OTA install time (without post-install) on Pixel 6 Pro with 2 threads:

                Without this patch    With this patch
    Full OTA:   23 minutes            17 minutes
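
As an illustration of the new option (a sketch, not part of this
change): a caller that wants to force two compression threads without
setting the system property can do so through CowOptions directly:

    #include <libsnapshot/cow_writer.h>

    android::snapshot::CowOptions options;
    // A non-zero value bypasses the property-driven default in
    // SetupWriteOptions().
    options.num_compress_threads = 2;

    android::snapshot::CowWriter writer(options);
    // writer.Initialize(std::move(fd)) spawns the worker threads.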
Bug: 254188450
Test: Full/Incremental OTA on Pixel
Change-Id: I4a11dca3a5ebfe11dcc7f0d882332d491f2d7933
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index a9682a1..291bebb 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -16,10 +16,17 @@
#include <stdint.h>
+#include <condition_variable>
#include <cstdint>
+#include <future>
#include <memory>
+#include <mutex>
#include <optional>
+#include <queue>
#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
#include <android-base/unique_fd.h>
#include <libsnapshot/cow_format.h>
@@ -42,6 +49,9 @@
// Preset the number of merged ops. Only useful for testing.
uint64_t num_merge_ops = 0;
+
+ // Number of threads to use for compression. Zero selects the
+ // default (see CowWriter::SetupWriteOptions()).
+ int num_compress_threads = 0;
};
// Interface for writing to a snapuserd COW. All operations are ordered; merges
@@ -100,9 +110,40 @@
CowOptions options_;
};
+class CompressWorker {
+ public:
+ CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size);
+ // Worker loop; runs until Finalize() is called. Returns false on failure.
+ bool RunThread();
+ // Queue num_blocks blocks starting at buffer for compression.
+ void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
+ // Block until compressed output is ready and append it to compressed_buf.
+ bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
+ // Signal the worker loop to stop and exit.
+ void Finalize();
+
+ private:
+ struct CompressWork {
+ const void* buffer;
+ size_t num_blocks;
+ bool compression_status = false;
+ std::vector<std::basic_string<uint8_t>> compressed_data;
+ };
+
+ CowCompressionAlgorithm compression_;
+ uint32_t block_size_;
+
+ std::queue<CompressWork> work_queue_;
+ std::queue<CompressWork> compressed_queue_;
+ std::mutex lock_;
+ std::condition_variable cv_;
+ bool stopped_ = false;
+
+ std::basic_string<uint8_t> Compress(const void* data, size_t length);
+ bool CompressBlocks(const void* buffer, size_t num_blocks,
+ std::vector<std::basic_string<uint8_t>>* compressed_data);
+};
+
class CowWriter : public ICowWriter {
public:
explicit CowWriter(const CowOptions& options);
+ ~CowWriter();
// Set up the writer.
// The file starts from the beginning.
@@ -138,6 +179,7 @@
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
uint16_t offset, uint8_t type);
void SetupHeaders();
+ void SetupWriteOptions();
bool ParseOptions();
bool OpenForWrite();
bool OpenForAppend(uint64_t label);
@@ -145,9 +187,10 @@
bool WriteRawData(const void* data, size_t size);
bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
void AddOperation(const CowOperation& op);
- std::basic_string<uint8_t> Compress(const void* data, size_t length);
void InitPos();
+ void InitWorkers();
+ bool CompressBlocks(size_t num_blocks, const void* data);
bool SetFd(android::base::borrowed_fd fd);
bool Sync();
bool Truncate(off_t length);
@@ -168,6 +211,12 @@
bool merge_in_progress_ = false;
bool is_block_device_ = false;
uint64_t cow_image_size_ = INT64_MAX;
+
+ int num_compress_threads_ = 1;
+ std::vector<std::unique_ptr<CompressWorker>> compress_threads_;
+ std::vector<std::future<bool>> threads_;
+ std::vector<std::basic_string<uint8_t>> compressed_buf_;
+ std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;
};
} // namespace snapshot
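
The CompressWorker declared above is a self-contained producer/consumer
unit. A minimal usage sketch (hypothetical standalone code, assuming a
4KB block size and linking against this library; CowWriter::InitWorkers()
below does effectively the same setup):

    #include <cstdint>
    #include <future>
    #include <vector>

    #include <libsnapshot/cow_writer.h>

    using namespace android::snapshot;

    // data must hold num_blocks * 4096 bytes.
    bool CompressChunk(const void* data, size_t num_blocks) {
        CompressWorker worker(kCowCompressGz, 4096);
        // Run the worker loop on its own thread, as InitWorkers() does.
        auto thread = std::async(std::launch::async, &CompressWorker::RunThread, &worker);

        worker.EnqueueCompressBlocks(data, num_blocks);

        std::vector<std::basic_string<uint8_t>> compressed;
        bool ok = worker.GetCompressedBuffers(&compressed);

        worker.Finalize();          // wake the loop and let it exit
        return thread.get() && ok;  // RunThread() returns true on a clean stop
    }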
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 0eb231b..4d9b748 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -33,7 +33,7 @@
namespace android {
namespace snapshot {
-std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length) {
+std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
switch (compression_) {
case kCowCompressGz: {
const auto bound = compressBound(length);
@@ -100,5 +100,119 @@
return {};
}
+bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
+ std::vector<std::basic_string<uint8_t>>* compressed_data) {
+ const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
+ while (num_blocks) {
+ auto data = Compress(iter, block_size_);
+ if (data.empty()) {
+ PLOG(ERROR) << "CompressBlocks: Compression failed";
+ return false;
+ }
+ if (data.size() > std::numeric_limits<uint16_t>::max()) {
+ LOG(ERROR) << "Compressed block is too large: " << data.size();
+ return false;
+ }
+
+ compressed_data->emplace_back(std::move(data));
+ num_blocks -= 1;
+ iter += block_size_;
+ }
+ return true;
+}
+
+bool CompressWorker::RunThread() {
+ while (true) {
+ // Wait for work
+ CompressWork blocks;
+ {
+ std::unique_lock<std::mutex> lock(lock_);
+ while (work_queue_.empty() && !stopped_) {
+ cv_.wait(lock);
+ }
+
+ if (stopped_) {
+ return true;
+ }
+
+ blocks = std::move(work_queue_.front());
+ work_queue_.pop();
+ }
+
+ // Compress blocks
+ bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
+ blocks.compression_status = ret;
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ compressed_queue_.push(std::move(blocks));
+ }
+
+ // Notify completion
+ cv_.notify_all();
+
+ if (!ret) {
+ LOG(ERROR) << "CompressBlocks failed";
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+
+ CompressWork blocks = {};
+ blocks.buffer = buffer;
+ blocks.num_blocks = num_blocks;
+ work_queue_.push(std::move(blocks));
+ }
+ cv_.notify_all();
+}
+
+bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
+ {
+ std::unique_lock<std::mutex> lock(lock_);
+ while (compressed_queue_.empty() && !stopped_) {
+ cv_.wait(lock);
+ }
+
+ if (stopped_) {
+ return true;
+ }
+ }
+
+ {
+ std::lock_guard<std::mutex> lock(lock_);
+ while (compressed_queue_.size() > 0) {
+ CompressWork blocks = std::move(compressed_queue_.front());
+ compressed_queue_.pop();
+
+ if (blocks.compression_status) {
+ compressed_buf->insert(compressed_buf->end(),
+ std::make_move_iterator(blocks.compressed_data.begin()),
+ std::make_move_iterator(blocks.compressed_data.end()));
+ } else {
+ LOG(ERROR) << "Block compression failed";
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+void CompressWorker::Finalize() {
+ {
+ std::unique_lock<std::mutex> lock(lock_);
+ stopped_ = true;
+ }
+ cv_.notify_all();
+}
+
+CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size)
+ : compression_(compression), block_size_(block_size) {}
+
} // namespace snapshot
} // namespace android
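
The handoff between EnqueueCompressBlocks(), RunThread(), and
GetCompressedBuffers() above is the standard condition-variable pattern.
Reduced to its essentials (a generic sketch, not libsnapshot code):

    #include <condition_variable>
    #include <mutex>
    #include <queue>

    std::mutex lock_;
    std::condition_variable cv_;
    std::queue<int> queue_;
    bool stopped_ = false;

    // Producer: push under the lock, notify after releasing it.
    void Enqueue(int work) {
        {
            std::lock_guard<std::mutex> guard(lock_);
            queue_.push(work);
        }
        cv_.notify_all();
    }

    // Consumer: wait in a loop, since wakeups can be spurious; both the
    // queue and the stop flag must be re-checked under the lock.
    bool Dequeue(int* out) {
        std::unique_lock<std::mutex> lock(lock_);
        cv_.wait(lock, [&] { return !queue_.empty() || stopped_; });
        if (queue_.empty()) {
            return false;  // stopped, nothing left to consume
        }
        *out = queue_.front();
        queue_.pop();
        return true;
    }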
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 43c17a6..ba6e7a0 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -22,6 +22,7 @@
#include <android-base/file.h>
#include <android-base/logging.h>
+#include <android-base/properties.h>
#include <android-base/unique_fd.h>
#include <brotli/encode.h>
#include <libsnapshot/cow_format.h>
@@ -132,6 +133,39 @@
CowWriter::CowWriter(const CowOptions& options) : ICowWriter(options), fd_(-1) {
SetupHeaders();
+ SetupWriteOptions();
+}
+
+CowWriter::~CowWriter() {
+ for (size_t i = 0; i < compress_threads_.size(); i++) {
+ CompressWorker* worker = compress_threads_[i].get();
+ if (worker) {
+ worker->Finalize();
+ }
+ }
+
+ bool ret = true;
+ for (auto& t : threads_) {
+ ret = t.get() && ret;
+ }
+
+ if (!ret) {
+ LOG(ERROR) << "Compression failed";
+ }
+ compress_threads_.clear();
+}
+
+void CowWriter::SetupWriteOptions() {
+ num_compress_threads_ = options_.num_compress_threads;
+
+ if (!num_compress_threads_) {
+ num_compress_threads_ = 1;
+ // We prefer not to use more than two threads, as the overhead of
+ // additional threads outweighs the reduction in compression time.
+ if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
+ num_compress_threads_ = 2;
+ }
+ }
}
void CowWriter::SetupHeaders() {
@@ -206,6 +240,14 @@
return true;
}
+void CowWriter::InitWorkers() {
+ for (int i = 0; i < num_compress_threads_; i++) {
+ auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
+ threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
+ compress_threads_.push_back(std::move(wt));
+ }
+}
+
bool CowWriter::Initialize(unique_fd&& fd) {
owned_fd_ = std::move(fd);
return Initialize(borrowed_fd{owned_fd_});
@@ -216,7 +258,13 @@
return false;
}
- return OpenForWrite();
+ bool ret = OpenForWrite();
+
+ if (ret) {
+ InitWorkers();
+ }
+
+ return ret;
}
bool CowWriter::InitializeAppend(android::base::unique_fd&& fd, uint64_t label) {
@@ -229,7 +277,13 @@
return false;
}
- return OpenForAppend(label);
+ bool ret = OpenForAppend(label);
+
+ if (ret && !compress_threads_.size()) {
+ InitWorkers();
+ }
+
+ return ret;
}
void CowWriter::InitPos() {
@@ -348,47 +402,99 @@
return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
}
+bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) {
+ size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
+ size_t num_blocks_per_thread = num_blocks / num_threads;
+ const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+ compressed_buf_.clear();
+
+ // Submit the blocks to each worker thread. Compressed buffers must be
+ // retrieved in the same order the work was submitted; polling for
+ // completed buffers in a different order would break the tight coupling
+ // between buffer order and block order.
+ for (size_t i = 0; i < num_threads; i++) {
+ CompressWorker* worker = compress_threads_[i].get();
+ if (i == num_threads - 1) {
+ // The last worker takes all remaining blocks, including any
+ // remainder from the division.
+ num_blocks_per_thread = num_blocks;
+ }
+ worker->EnqueueCompressBlocks(iter, num_blocks_per_thread);
+ iter += (num_blocks_per_thread * header_.block_size);
+ num_blocks -= num_blocks_per_thread;
+ }
+
+ for (size_t i = 0; i < num_threads; i++) {
+ CompressWorker* worker = compress_threads_[i].get();
+ if (!worker->GetCompressedBuffers(&compressed_buf_)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
uint64_t old_block, uint16_t offset, uint8_t type) {
- const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
CHECK(!merge_in_progress_);
- for (size_t i = 0; i < size / header_.block_size; i++) {
- CowOperation op = {};
- op.new_block = new_block_start + i;
- op.type = type;
- if (type == kCowXorOp) {
- op.source = (old_block + i) * header_.block_size + offset;
- } else {
- op.source = next_data_pos_;
- }
+ const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+
+ // The update engine can potentially send ~100MB of blocks at a time.
+ // We don't want to process all of those blocks in one shot, as doing
+ // so would stress memory. Hence, process the blocks in chunks.
+ //
+ // 1024 blocks is reasonable: with 4KB blocks, each chunk uses at most
+ // ~4MB of memory.
+ const size_t kProcessingBlocks = 1024;
+ size_t num_blocks = (size / header_.block_size);
+ size_t i = 0;
+
+ while (num_blocks) {
+ size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
if (compression_) {
- auto data = Compress(iter, header_.block_size);
- if (data.empty()) {
- PLOG(ERROR) << "AddRawBlocks: compression failed";
+ if (!CompressBlocks(pending_blocks, iter)) {
return false;
}
- if (data.size() > std::numeric_limits<uint16_t>::max()) {
- LOG(ERROR) << "Compressed block is too large: " << data.size() << " bytes";
- return false;
- }
- op.compression = compression_;
- op.data_length = static_cast<uint16_t>(data.size());
-
- if (!WriteOperation(op, data.data(), data.size())) {
- PLOG(ERROR) << "AddRawBlocks: write failed, bytes requested: " << size
- << ", bytes written: " << i * header_.block_size;
- return false;
- }
- } else {
- op.data_length = static_cast<uint16_t>(header_.block_size);
- if (!WriteOperation(op, iter, header_.block_size)) {
- PLOG(ERROR) << "AddRawBlocks: write failed";
- return false;
- }
+ buf_iter_ = compressed_buf_.begin();
+ CHECK(pending_blocks == compressed_buf_.size());
+ iter += (pending_blocks * header_.block_size);
}
- iter += header_.block_size;
+ num_blocks -= pending_blocks;
+
+ while (i < size / header_.block_size && pending_blocks) {
+ CowOperation op = {};
+ op.new_block = new_block_start + i;
+ op.type = type;
+ if (type == kCowXorOp) {
+ op.source = (old_block + i) * header_.block_size + offset;
+ } else {
+ op.source = next_data_pos_;
+ }
+
+ if (compression_) {
+ auto data = std::move(*buf_iter_);
+ op.compression = compression_;
+ op.data_length = static_cast<uint16_t>(data.size());
+
+ if (!WriteOperation(op, data.data(), data.size())) {
+ PLOG(ERROR) << "AddRawBlocks: write failed";
+ return false;
+ }
+ buf_iter_++;
+ } else {
+ op.data_length = static_cast<uint16_t>(header_.block_size);
+ if (!WriteOperation(op, iter, header_.block_size)) {
+ PLOG(ERROR) << "AddRawBlocks: write failed";
+ return false;
+ }
+ iter += header_.block_size;
+ }
+
+ i += 1;
+ pending_blocks -= 1;
+ }
+
+ CHECK(pending_blocks == 0);
}
return true;
}
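
For intuition on the numbers in CompressBlocks() and EmitBlocks(): each
chunk is capped at kProcessingBlocks = 1024 blocks, so with 4KB blocks at
most 1024 * 4096 = 4MB is in flight, and within a chunk the division
remainder goes to the last worker. A standalone sketch of that split
(assumed block counts, purely illustrative):

    #include <cstddef>
    #include <cstdio>

    int main() {
        size_t num_blocks = 1027;  // e.g. the tail chunk of a large write
        size_t num_threads = 2;
        size_t per_thread = num_blocks / num_threads;  // 513

        for (size_t i = 0; i < num_threads; i++) {
            if (i == num_threads - 1) {
                per_thread = num_blocks;  // last worker takes the remainder: 514
            }
            printf("worker %zu: %zu blocks\n", i, per_thread);
            num_blocks -= per_thread;
        }
        return 0;
    }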