Merge "Do not create worker thread if threading isn't enabled"
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 798bc73..c7b83a8 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -120,6 +120,12 @@
     void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
     bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
     void Finalize();
+    static std::basic_string<uint8_t> Compress(CowCompressionAlgorithm compression,
+                                               const void* data, size_t length);
+
+    static bool CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
+                               const void* buffer, size_t num_blocks,
+                               std::vector<std::basic_string<uint8_t>>* compressed_data);
 
   private:
     struct CompressWork {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 4d9b748..9b50986 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -32,9 +32,13 @@
 
 namespace android {
 namespace snapshot {
-
 std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
-    switch (compression_) {
+    return Compress(compression_, data, length);
+}
+
+std::basic_string<uint8_t> CompressWorker::Compress(CowCompressionAlgorithm compression,
+                                                    const void* data, size_t length) {
+    switch (compression) {
         case kCowCompressGz: {
             const auto bound = compressBound(length);
             std::basic_string<uint8_t> buffer(bound, '\0');
@@ -94,17 +98,22 @@
             return buffer;
         }
         default:
-            LOG(ERROR) << "unhandled compression type: " << compression_;
+            LOG(ERROR) << "unhandled compression type: " << compression;
             break;
     }
     return {};
 }
-
 bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
                                     std::vector<std::basic_string<uint8_t>>* compressed_data) {
+    return CompressBlocks(compression_, block_size_, buffer, num_blocks, compressed_data);
+}
+
+bool CompressWorker::CompressBlocks(CowCompressionAlgorithm compression, size_t block_size,
+                                    const void* buffer, size_t num_blocks,
+                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
     const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
     while (num_blocks) {
-        auto data = Compress(iter, block_size_);
+        auto data = Compress(compression, iter, block_size);
         if (data.empty()) {
             PLOG(ERROR) << "CompressBlocks: Compression failed";
             return false;
@@ -116,7 +125,7 @@
 
         compressed_data->emplace_back(std::move(data));
         num_blocks -= 1;
-        iter += block_size_;
+        iter += block_size;
     }
     return true;
 }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 2d5e4bc..3932fad 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -275,6 +275,10 @@
 }
 
 void CowWriter::InitWorkers() {
+    if (num_compress_threads_ <= 1) {
+        LOG(INFO) << "Not creating new threads for compression.";
+        return;
+    }
     for (int i = 0; i < num_compress_threads_; i++) {
         auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
         threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
@@ -447,6 +451,10 @@
     size_t num_blocks_per_thread = num_blocks / num_threads;
     const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
     compressed_buf_.clear();
+    if (num_threads <= 1) {
+        return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks,
+                                              &compressed_buf_);
+    }
 
     // Submit the blocks per thread. The retrieval of
     // compressed buffers has to be done in the same order.
@@ -490,13 +498,12 @@
     while (num_blocks) {
         size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
 
-        if (compression_) {
+        if (compression_ && num_compress_threads_ > 1) {
             if (!CompressBlocks(pending_blocks, iter)) {
                 return false;
             }
             buf_iter_ = compressed_buf_.begin();
             CHECK(pending_blocks == compressed_buf_.size());
-            iter += (pending_blocks * header_.block_size);
         }
 
         num_blocks -= pending_blocks;
@@ -512,7 +519,17 @@
             }
 
             if (compression_) {
-                auto data = std::move(*buf_iter_);
+                auto data = [&, this]() {
+                    if (num_compress_threads_ > 1) {
+                        auto data = std::move(*buf_iter_);
+                        buf_iter_++;
+                        return data;
+                    } else {
+                        auto data =
+                                CompressWorker::Compress(compression_, iter, header_.block_size);
+                        return data;
+                    }
+                }();
                 op.compression = compression_;
                 op.data_length = static_cast<uint16_t>(data.size());
 
@@ -520,15 +537,14 @@
                     PLOG(ERROR) << "AddRawBlocks: write failed";
                     return false;
                 }
-                buf_iter_++;
             } else {
                 op.data_length = static_cast<uint16_t>(header_.block_size);
                 if (!WriteOperation(op, iter, header_.block_size)) {
                     PLOG(ERROR) << "AddRawBlocks: write failed";
                     return false;
                 }
-                iter += header_.block_size;
             }
+            iter += header_.block_size;
 
             i += 1;
             pending_blocks -= 1;