Merge "Do not create worker thread if threading isn't enabled"
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 798bc73..c7b83a8 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -120,6 +120,12 @@
void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
void Finalize();
+ static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression,
+ const void* data, size_t length);
+
+ static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
+ const void* buffer, size_t num_blocks,
+ std::vector<std::basic_string<uint8_t>>* compressed_data);
private:
struct CompressWork {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 4d9b748..9b50986 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -32,9 +32,13 @@
namespace android {
namespace snapshot {
-
std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
- switch (compression_) {
+ return Compress(compression_, data, length);
+}
+
+std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression,
+ const void* data, size_t length) {
+ switch (compression) {
case kCowCompressGz: {
const auto bound = compressBound(length);
std::basic_string<uint8_t> buffer(bound, '\0');
@@ -94,17 +98,22 @@
return buffer;
}
default:
- LOG(ERROR) << "unhandled compression type: " << compression_;
+ LOG(ERROR) << "unhandled compression type: " << compression;
break;
}
return {};
}
-
bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
std::vector<std::basic_string<uint8_t>>* compressed_data) {
+ return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data);
+}
+
+bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
+ const void* buffer, size_t num_blocks,
+ std::vector<std::basic_string<uint8_t>>* compressed_data) {
const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
while (num_blocks) {
- auto data = Compress(iter, block_size_);
+ auto data = Compress(compression, iter, block_size);
if (data.empty()) {
PLOG(ERROR) << "CompressBlocks: Compression failed";
return false;
@@ -116,7 +125,7 @@
compressed_data->emplace_back(std::move(data));
num_blocks -= 1;
- iter += block_size_;
+ iter += block_size;
}
return true;
}
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 2d5e4bc..3932fad 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -275,6 +275,10 @@
}
void CowWriter::InitWorkers() {
+ if (num_compress_threads_ <= 1) {
+ LOG(INFO) << "Not creating new threads for compression.";
+ return;
+ }
for (int i = 0; i < num_compress_threads_; i++) {
auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
@@ -447,6 +451,10 @@
size_t num_blocks_per_thread = num_blocks / num_threads;
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
compressed_buf_.clear();
+ if (num_threads <= 1) {
+ return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks,
+ &compressed_buf_);
+ }
// Submit the blocks per thread. The retrieval of
// compressed buffers has to be done in the same order.
@@ -490,13 +498,12 @@
while (num_blocks) {
size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
- if (compression_) {
+ if (compression_ && num_compress_threads_ > 1) {
if (!CompressBlocks(pending_blocks, iter)) {
return false;
}
buf_iter_ = compressed_buf_.begin();
CHECK(pending_blocks == compressed_buf_.size());
- iter += (pending_blocks * header_.block_size);
}
num_blocks -= pending_blocks;
@@ -512,7 +519,17 @@
}
if (compression_) {
- auto data = std::move(*buf_iter_);
+ auto data = [&, this]() {
+ if (num_compress_threads_ > 1) {
+ auto data = std::move(*buf_iter_);
+ buf_iter_++;
+ return data;
+ } else {
+ auto data =
+ CompressWorker::Compress(compression_, iter, header_.block_size);
+ return data;
+ }
+ }();
op.compression = compression_;
op.data_length = static_cast<uint16_t>(data.size());
@@ -520,15 +537,14 @@
PLOG(ERROR) << "AddRawBlocks: write failed";
return false;
}
- buf_iter_++;
} else {
op.data_length = static_cast<uint16_t>(header_.block_size);
if (!WriteOperation(op, iter, header_.block_size)) {
PLOG(ERROR) << "AddRawBlocks: write failed";
return false;
}
- iter += header_.block_size;
}
+ iter += header_.block_size;
i += 1;
pending_blocks -= 1;