Do not create worker thread if threading isn't enabled

Currently, we would create worker threads even if number of compression
thread is set to 1. This works, but having context switches and data
exchange between two threads is complete overhead if main thread is just
blocking on the worker thread.

Test: th
Change-Id: I02f98ee1e0c4889dc1ae602eb06667b05796d3f0
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 798bc73..c7b83a8 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -120,6 +120,12 @@
     void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
     bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
     void Finalize();
+    static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression,
+                                               const void* data, size_t length);
+
+    static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
+                               const void* buffer, size_t num_blocks,
+                               std::vector<std::basic_string<uint8_t>>* compressed_data);
 
   private:
     struct CompressWork {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 4d9b748..9b50986 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -32,9 +32,13 @@
 
 namespace android {
 namespace snapshot {
-
 std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
-    switch (compression_) {
+    return Compress(compression_, data, length);
+}
+
+std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression,
+                                                    const void* data, size_t length) {
+    switch (compression) {
         case kCowCompressGz: {
             const auto bound = compressBound(length);
             std::basic_string<uint8_t> buffer(bound, '\0');
@@ -94,17 +98,22 @@
             return buffer;
         }
         default:
-            LOG(ERROR) << "unhandled compression type: " << compression_;
+            LOG(ERROR) << "unhandled compression type: " << compression;
             break;
     }
     return {};
 }
-
 bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
                                     std::vector<std::basic_string<uint8_t>>* compressed_data) {
+    return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data);
+}
+
+bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
+                                    const void* buffer, size_t num_blocks,
+                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
     const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
     while (num_blocks) {
-        auto data = Compress(iter, block_size_);
+        auto data = Compress(compression, iter, block_size);
         if (data.empty()) {
             PLOG(ERROR) << "CompressBlocks: Compression failed";
             return false;
@@ -116,7 +125,7 @@
 
         compressed_data->emplace_back(std::move(data));
         num_blocks -= 1;
-        iter += block_size_;
+        iter += block_size;
     }
     return true;
 }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 2d5e4bc..3932fad 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -275,6 +275,10 @@
 }
 
 void CowWriter::InitWorkers() {
+    if (num_compress_threads_ <= 1) {
+        LOG(INFO) << "Not creating new threads for compression.";
+        return;
+    }
     for (int i = 0; i < num_compress_threads_; i++) {
         auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
         threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
@@ -447,6 +451,10 @@
     size_t num_blocks_per_thread = num_blocks / num_threads;
     const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
     compressed_buf_.clear();
+    if (num_threads <= 1) {
+        return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks,
+                                              &compressed_buf_);
+    }
 
     // Submit the blocks per thread. The retrieval of
     // compressed buffers has to be done in the same order.
@@ -490,13 +498,12 @@
     while (num_blocks) {
         size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
 
-        if (compression_) {
+        if (compression_ && num_compress_threads_ > 1) {
             if (!CompressBlocks(pending_blocks, iter)) {
                 return false;
             }
             buf_iter_ = compressed_buf_.begin();
             CHECK(pending_blocks == compressed_buf_.size());
-            iter += (pending_blocks * header_.block_size);
         }
 
         num_blocks -= pending_blocks;
@@ -512,7 +519,17 @@
             }
 
             if (compression_) {
-                auto data = std::move(*buf_iter_);
+                auto data = [&, this]() {
+                    if (num_compress_threads_ > 1) {
+                        auto data = std::move(*buf_iter_);
+                        buf_iter_++;
+                        return data;
+                    } else {
+                        auto data =
+                                CompressWorker::Compress(compression_, iter, header_.block_size);
+                        return data;
+                    }
+                }();
                 op.compression = compression_;
                 op.data_length = static_cast<uint16_t>(data.size());
 
@@ -520,15 +537,14 @@
                     PLOG(ERROR) << "AddRawBlocks: write failed";
                     return false;
                 }
-                buf_iter_++;
             } else {
                 op.data_length = static_cast<uint16_t>(header_.block_size);
                 if (!WriteOperation(op, iter, header_.block_size)) {
                     PLOG(ERROR) << "AddRawBlocks: write failed";
                     return false;
                 }
-                iter += header_.block_size;
             }
+            iter += header_.block_size;
 
             i += 1;
             pending_blocks -= 1;