Merge changes from topic "cgroup.events2" into main

* changes:
  libprocessgroup: Use cgroup.kill
  Reapply "libprocessgroup: Poll on cgroup.events"
diff --git a/fastboot/fastboot.cpp b/fastboot/fastboot.cpp
index 1bc7b75..235d723 100644
--- a/fastboot/fastboot.cpp
+++ b/fastboot/fastboot.cpp
@@ -1675,7 +1675,7 @@
     }
     for (size_t i = 0; i < tasks->size(); i++) {
         if (auto flash_task = tasks->at(i)->AsFlashTask()) {
-            if (FlashTask::IsDynamicParitition(fp->source.get(), flash_task)) {
+            if (FlashTask::IsDynamicPartition(fp->source.get(), flash_task)) {
                 if (!loc) {
                     loc = i;
                 }
diff --git a/fastboot/task.cpp b/fastboot/task.cpp
index 0947ff9..ea78a01 100644
--- a/fastboot/task.cpp
+++ b/fastboot/task.cpp
@@ -30,7 +30,7 @@
                      const bool apply_vbmeta, const FlashingPlan* fp)
     : pname_(pname), fname_(fname), slot_(slot), apply_vbmeta_(apply_vbmeta), fp_(fp) {}
 
-bool FlashTask::IsDynamicParitition(const ImageSource* source, const FlashTask* task) {
+bool FlashTask::IsDynamicPartition(const ImageSource* source, const FlashTask* task) {
     std::vector<char> contents;
     if (!source->ReadFile("super_empty.img", &contents)) {
         return false;
@@ -152,7 +152,7 @@
             continue;
         }
         auto flash_task = tasks[i + 2]->AsFlashTask();
-        if (!FlashTask::IsDynamicParitition(source, flash_task)) {
+        if (!FlashTask::IsDynamicPartition(source, flash_task)) {
             continue;
         }
         return true;
diff --git a/fastboot/task.h b/fastboot/task.h
index a98c874..7a713cf 100644
--- a/fastboot/task.h
+++ b/fastboot/task.h
@@ -52,7 +52,7 @@
               const bool apply_vbmeta, const FlashingPlan* fp);
     virtual FlashTask* AsFlashTask() override { return this; }
 
-    static bool IsDynamicParitition(const ImageSource* source, const FlashTask* task);
+    static bool IsDynamicPartition(const ImageSource* source, const FlashTask* task);
     void Run() override;
     std::string ToString() const override;
     std::string GetPartition() const { return pname_; }
diff --git a/fastboot/task_test.cpp b/fastboot/task_test.cpp
index 81154c6..519d4ed 100644
--- a/fastboot/task_test.cpp
+++ b/fastboot/task_test.cpp
@@ -233,7 +233,7 @@
             << "size of fastboot-info task list: " << fastboot_info_tasks.size()
             << " size of hardcoded task list: " << hardcoded_tasks.size();
 }
-TEST_F(ParseTest, IsDynamicParitiontest) {
+TEST_F(ParseTest, IsDynamicPartitiontest) {
     if (!get_android_product_out()) {
         GTEST_SKIP();
     }
@@ -258,7 +258,7 @@
                 ParseFastbootInfoLine(fp.get(), android::base::Tokenize(test.first, " "));
         auto flash_task = task->AsFlashTask();
         ASSERT_FALSE(flash_task == nullptr);
-        ASSERT_EQ(FlashTask::IsDynamicParitition(fp->source.get(), flash_task), test.second);
+        ASSERT_EQ(FlashTask::IsDynamicPartition(fp->source.get(), flash_task), test.second);
     }
 }
 
@@ -358,7 +358,7 @@
                     contains_optimized_task = true;
                 }
                 if (auto flash_task = task->AsFlashTask()) {
-                    if (FlashTask::IsDynamicParitition(fp->source.get(), flash_task)) {
+                    if (FlashTask::IsDynamicPartition(fp->source.get(), flash_task)) {
                         return false;
                     }
                 }
diff --git a/fs_mgr/fs_mgr_overlayfs_control.cpp b/fs_mgr/fs_mgr_overlayfs_control.cpp
index 06214ef..08ad80c 100644
--- a/fs_mgr/fs_mgr_overlayfs_control.cpp
+++ b/fs_mgr/fs_mgr_overlayfs_control.cpp
@@ -219,6 +219,35 @@
     return OverlayfsTeardownResult::Ok;
 }
 
+bool GetOverlaysActiveFlag() {
+    auto slot_number = fs_mgr_overlayfs_slot_number();
+    const auto super_device = kPhysicalDevice + fs_mgr_get_super_partition_name();
+
+    auto metadata = ReadMetadata(super_device, slot_number);
+    if (!metadata) {
+        return false;
+    }
+    return !!(metadata->header.flags & LP_HEADER_FLAG_OVERLAYS_ACTIVE);
+}
+
+bool SetOverlaysActiveFlag(bool flag) {
+    // Mark overlays as active in the partition table, to detect re-flash.
+    auto slot_number = fs_mgr_overlayfs_slot_number();
+    const auto super_device = kPhysicalDevice + fs_mgr_get_super_partition_name();
+    auto builder = MetadataBuilder::New(super_device, slot_number);
+    if (!builder) {
+        LERROR << "open " << super_device << " metadata";
+        return false;
+    }
+    builder->SetOverlaysActiveFlag(flag);
+    auto metadata = builder->Export();
+    if (!metadata || !UpdatePartitionTable(super_device, *metadata.get(), slot_number)) {
+        LERROR << "update super metadata";
+        return false;
+    }
+    return true;
+}
+
 OverlayfsTeardownResult fs_mgr_overlayfs_teardown_scratch(const std::string& overlay,
                                                           bool* change) {
     // umount and delete kScratchMountPoint storage if we have logical partitions
@@ -232,6 +261,10 @@
         return OverlayfsTeardownResult::Error;
     }
 
+    // Note: we don't care if SetOverlaysActiveFlag fails, since
+    // the overlays are removed no matter what.
+    SetOverlaysActiveFlag(false);
+
     bool was_mounted = fs_mgr_overlayfs_already_mounted(kScratchMountPoint, false);
     if (was_mounted) {
         fs_mgr_overlayfs_umount_scratch();
@@ -448,6 +481,7 @@
             }
         }
     }
+
     // land the update back on to the partition
     if (changed) {
         auto metadata = builder->Export();
@@ -592,6 +626,12 @@
         return false;
     }
 
+    if (!SetOverlaysActiveFlag(true)) {
+        LOG(ERROR) << "Failed to update dynamic partition data";
+        fs_mgr_overlayfs_teardown_scratch(kScratchMountPoint, nullptr);
+        return false;
+    }
+
     // If the partition exists, assume first that it can be mounted.
     if (partition_exists) {
         if (MountScratch(scratch_device)) {
@@ -856,6 +896,9 @@
         return;
     }
 
+    if (!GetOverlaysActiveFlag()) {
+        return;
+    }
     if (ScratchIsOnData()) {
         if (auto images = IImageManager::Open("remount", 0ms)) {
             images->MapAllImages(init);
@@ -879,6 +922,9 @@
     }
     if (auto images = IImageManager::Open("remount", 0ms)) {
         images->RemoveDisabledImages();
+        if (!GetOverlaysActiveFlag()) {
+            fs_mgr_overlayfs_teardown_scratch(kScratchMountPoint, nullptr);
+        }
     }
 }
 
diff --git a/fs_mgr/liblp/builder.cpp b/fs_mgr/liblp/builder.cpp
index 6cb2c51..4e6e97b 100644
--- a/fs_mgr/liblp/builder.cpp
+++ b/fs_mgr/liblp/builder.cpp
@@ -1211,6 +1211,15 @@
     header_.flags |= LP_HEADER_FLAG_VIRTUAL_AB_DEVICE;
 }
 
+void MetadataBuilder::SetOverlaysActiveFlag(bool flag) {
+    RequireExpandedMetadataHeader();
+    if (flag) {
+        header_.flags |= LP_HEADER_FLAG_OVERLAYS_ACTIVE;
+    } else {
+        header_.flags &= ~LP_HEADER_FLAG_OVERLAYS_ACTIVE;
+    }
+}
+
 bool MetadataBuilder::IsABDevice() {
     return !IPropertyFetcher::GetInstance()->GetProperty("ro.boot.slot_suffix", "").empty();
 }
diff --git a/fs_mgr/liblp/include/liblp/builder.h b/fs_mgr/liblp/include/liblp/builder.h
index 54f31bc..957b96b 100644
--- a/fs_mgr/liblp/include/liblp/builder.h
+++ b/fs_mgr/liblp/include/liblp/builder.h
@@ -346,6 +346,8 @@
     void SetAutoSlotSuffixing();
     // Set the LP_HEADER_FLAG_VIRTUAL_AB_DEVICE flag.
     void SetVirtualABDeviceFlag();
+    // Set or unset the LP_HEADER_FLAG_OVERLAYS_ACTIVE flag.
+    void SetOverlaysActiveFlag(bool flag);
 
     bool GetBlockDeviceInfo(const std::string& partition_name, BlockDeviceInfo* info) const;
     bool UpdateBlockDeviceInfo(const std::string& partition_name, const BlockDeviceInfo& info);
diff --git a/fs_mgr/liblp/include/liblp/metadata_format.h b/fs_mgr/liblp/include/liblp/metadata_format.h
index 41d8b0c..8d77097 100644
--- a/fs_mgr/liblp/include/liblp/metadata_format.h
+++ b/fs_mgr/liblp/include/liblp/metadata_format.h
@@ -240,6 +240,9 @@
  */
 #define LP_HEADER_FLAG_VIRTUAL_AB_DEVICE 0x1
 
+/* This device has overlays activated via "adb remount". */
+#define LP_HEADER_FLAG_OVERLAYS_ACTIVE 0x2
+
 /* This struct defines a logical partition entry, similar to what would be
  * present in a GUID Partition Table.
  */
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 4df0e76..de097f5 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -38,6 +38,7 @@
 #include <linux/fs.h>
 #include <sys/ioctl.h>
 #include <unistd.h>
+#include <numeric>
 
 // The info messages here are spammy, but as useful for update_engine. Disable
 // them when running on the host.
@@ -55,7 +56,7 @@
 using android::base::unique_fd;
 
 CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
-    : CowWriterBase(options, std::move(fd)) {
+    : CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
     SetupHeaders();
 }
 
@@ -70,6 +71,9 @@
     header_.block_size = options_.block_size;
     header_.num_merge_ops = options_.num_merge_ops;
     header_.cluster_ops = 0;
+    if (batch_size_ > 1) {
+        LOG(INFO) << "Batch writes enabled with batch size of " << batch_size_;
+    }
     if (options_.scratch_space) {
         header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
     }
@@ -258,40 +262,49 @@
             }
             op.data_length = header_.block_size;
         }
-        return WriteOperation({ops.data(), ops.size()},
-                              {reinterpret_cast<const uint8_t*>(data), size});
+        return WriteOperation({ops.data(), ops.size()}, data, size);
     }
 
-    const auto saved_op_count = header_.op_count;
-    const auto saved_data_pos = next_data_pos_;
-    for (size_t i = 0; i < num_blocks; i++) {
-        const uint8_t* const iter =
-                reinterpret_cast<const uint8_t*>(data) + (header_.block_size * i);
+    for (size_t i = 0; i < num_blocks; i += batch_size_) {
+        const auto blocks_to_write = std::min<size_t>(batch_size_, num_blocks - i);
+        std::vector<std::basic_string<uint8_t>> compressed_blocks(blocks_to_write);
+        std::vector<CowOperationV3> ops(blocks_to_write);
+        std::vector<struct iovec> vec(blocks_to_write);
+        size_t compressed_bytes = 0;
+        for (size_t j = 0; j < blocks_to_write; j++) {
+            const uint8_t* const iter =
+                    reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));
 
-        CowOperation op{};
-        op.new_block = new_block_start + i;
+            CowOperation& op = ops[j];
+            op.new_block = new_block_start + i + j;
 
-        op.set_type(type);
-        if (type == kCowXorOp) {
-            op.set_source((old_block + i) * header_.block_size + offset);
-        } else {
-            op.set_source(next_data_pos_);
+            op.set_type(type);
+            if (type == kCowXorOp) {
+                op.set_source((old_block + i + j) * header_.block_size + offset);
+            } else {
+                op.set_source(next_data_pos_ + compressed_bytes);
+            }
+
+            std::basic_string<uint8_t> compressed_data =
+                    compressor_->Compress(iter, header_.block_size);
+            if (compressed_data.empty()) {
+                LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
+                           << num_blocks << ");";
+                return false;
+            }
+            if (compressed_data.size() >= header_.block_size) {
+                compressed_data.resize(header_.block_size);
+                std::memcpy(compressed_data.data(), iter, header_.block_size);
+            }
+            compressed_blocks[j] = std::move(compressed_data);
+            vec[j] = {.iov_base = compressed_blocks[j].data(),
+                      .iov_len = compressed_blocks[j].size()};
+            op.data_length = vec[j].iov_len;
+            compressed_bytes += op.data_length;
         }
-        const void* out_data = iter;
-
-        op.data_length = header_.block_size;
-
-        const std::basic_string<uint8_t> compressed_data =
-                compressor_->Compress(out_data, header_.block_size);
-        if (compressed_data.size() < op.data_length) {
-            out_data = compressed_data.data();
-            op.data_length = compressed_data.size();
-        }
-        if (!WriteOperation(op, out_data, op.data_length)) {
+        if (!WriteOperation({ops.data(), ops.size()}, {vec.data(), vec.size()})) {
             PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: "
                         << new_block_start << " compression: " << compression_.algorithm;
-            header_.op_count = saved_op_count;
-            next_data_pos_ = saved_data_pos;
             return false;
         }
     }
@@ -299,14 +312,14 @@
     return true;
 }
 
-bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
+bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, const uint64_t num_blocks) {
     std::vector<CowOperationV3> ops(num_blocks);
-    for (uint64_t i = 0; i < num_blocks; i++) {
-        CowOperationV3& op = ops[i];
+    for (uint64_t i = 0; i < ops.size(); i++) {
+        auto& op = ops[i];
         op.set_type(kCowZeroOp);
         op.new_block = new_block_start + i;
     }
-    if (!WriteOperation({ops.data(), ops.size()}, {})) {
+    if (!WriteOperation({ops.data(), ops.size()})) {
         return false;
     }
     return true;
@@ -353,8 +366,23 @@
     return true;
 }
 
+bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data,
+                                 size_t size) {
+    struct iovec vec {
+        .iov_len = size
+    };
+    // Dear C++ god, this is effectively a const_cast. I had to do this because
+    // pwritev()'s struct iovec requires a non-const pointer. The input data
+    // will not be modified, as the iovec is only used for a write operation.
+    std::memcpy(&vec.iov_base, &data, sizeof(data));
+    return WriteOperation(op, {&vec, 1});
+}
+
 bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
-                                 std::basic_string_view<uint8_t> data) {
+                                 std::basic_string_view<struct iovec> data) {
+    const auto total_data_size =
+            std::transform_reduce(data.begin(), data.end(), 0, std::plus<size_t>{},
+                                  [](const struct iovec& a) { return a.iov_len; });
     if (IsEstimating()) {
         header_.op_count += ops.size();
         if (header_.op_count > header_.op_count_max) {
@@ -363,7 +391,7 @@
             next_data_pos_ += (header_.op_count - header_.op_count_max) * sizeof(CowOperationV3);
             header_.op_count_max = header_.op_count;
         }
-        next_data_pos_ += data.size();
+        next_data_pos_ += total_data_size;
         return true;
     }
 
@@ -379,20 +407,21 @@
         return false;
     }
     if (!data.empty()) {
-        if (!android::base::WriteFullyAtOffset(fd_, data.data(), data.size(), next_data_pos_)) {
+        const auto ret = pwritev(fd_, data.data(), data.size(), next_data_pos_);
+        if (ret != total_data_size) {
             PLOG(ERROR) << "write failed for data of size: " << data.size()
-                        << " at offset: " << next_data_pos_;
+                        << " at offset: " << next_data_pos_ << " " << ret;
             return false;
         }
     }
     header_.op_count += ops.size();
-    next_data_pos_ += data.size();
+    next_data_pos_ += total_data_size;
 
     return true;
 }
 
 bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
-    return WriteOperation({&op, 1}, {reinterpret_cast<const uint8_t*>(data), size});
+    return WriteOperation({&op, 1}, data, size);
 }
 
 bool CowWriterV3::Finalize() {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index 02b4e61..93f1d24 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -46,7 +46,9 @@
     bool OpenForWrite();
     bool OpenForAppend(uint64_t label);
     bool WriteOperation(std::basic_string_view<CowOperationV3> op,
-                        std::basic_string_view<uint8_t> data);
+                        std::basic_string_view<struct iovec> data);
+    bool WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data = nullptr,
+                        size_t size = 0);
     bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, CowOperationType type);
@@ -68,6 +70,7 @@
     // in the case that we are using one thread for compression, we can store and re-use the same
     // compressor
     int num_compress_threads_ = 1;
+    size_t batch_size_ = 0;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp
index f6a35a8..e33bdff 100644
--- a/fs_mgr/libsnapshot/snapshot.cpp
+++ b/fs_mgr/libsnapshot/snapshot.cpp
@@ -3330,7 +3330,7 @@
         // Terminate stale daemon if any
         std::unique_ptr<SnapuserdClient> snapuserd_client = std::move(snapuserd_client_);
         if (!snapuserd_client) {
-            snapuserd_client = SnapuserdClient::Connect(kSnapuserdSocket, 5s);
+            snapuserd_client = SnapuserdClient::TryConnect(kSnapuserdSocket, 5s);
         }
         if (snapuserd_client) {
             snapuserd_client->DetachSnapuserd();
@@ -3661,7 +3661,7 @@
     cow_options.compression = status.compression_algorithm();
     cow_options.max_blocks = {status.device_size() / cow_options.block_size};
     cow_options.batch_write = status.batched_writes();
-    cow_options.num_compress_threads = status.enable_threading() ? 2 : 0;
+    cow_options.num_compress_threads = status.enable_threading() ? 2 : 1;
     // TODO(b/313962438) Improve op_count estimate. For now, use number of
     // blocks as an upper bound.
     cow_options.op_count_max = status.device_size() / cow_options.block_size;
diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
index 010beb3..ede92dd 100644
--- a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
+++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
@@ -17,11 +17,7 @@
 #include <unistd.h>
 
 #include <chrono>
-#include <cstring>
-#include <iostream>
 #include <string>
-#include <thread>
-#include <vector>
 
 #include <android-base/unique_fd.h>
 
@@ -53,9 +49,14 @@
     explicit SnapuserdClient(android::base::unique_fd&& sockfd);
     SnapuserdClient(){};
 
+    // Attempt to connect to snapsuerd, wait for the daemon to start if
+    // connection failed.
     static std::unique_ptr<SnapuserdClient> Connect(const std::string& socket_name,
                                                     std::chrono::milliseconds timeout_ms);
-
+    // Attempt to connect to snapsuerd, but does not wait for the daemon to
+    // start.
+    static std::unique_ptr<SnapuserdClient> TryConnect(const std::string& socket_name,
+                                                       std::chrono::milliseconds timeout_ms);
     bool StopSnapuserd();
 
     // Initializing a snapuserd handler is a three-step process:
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
index 3bed3a4..789c980 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
@@ -27,7 +27,7 @@
 #include <unistd.h>
 
 #include <chrono>
-#include <sstream>
+#include <thread>
 
 #include <android-base/file.h>
 #include <android-base/logging.h>
@@ -64,6 +64,40 @@
     return errno == ECONNREFUSED || errno == EINTR || errno == ENOENT;
 }
 
+std::unique_ptr<SnapuserdClient> SnapuserdClient::TryConnect(const std::string& socket_name,
+                                                             std::chrono::milliseconds timeout_ms) {
+    unique_fd fd;
+    const auto start = std::chrono::steady_clock::now();
+    while (true) {
+        fd.reset(TEMP_FAILURE_RETRY(socket_local_client(
+                socket_name.c_str(), ANDROID_SOCKET_NAMESPACE_RESERVED, SOCK_STREAM)));
+        if (fd >= 0) {
+            auto client = std::make_unique<SnapuserdClient>(std::move(fd));
+            if (!client->ValidateConnection()) {
+                return nullptr;
+            }
+            return client;
+        }
+        if (errno == ENOENT) {
+            LOG(INFO) << "Daemon socket " << socket_name
+                      << " does not exist, return without waiting.";
+            return nullptr;
+        }
+        if (errno == ECONNREFUSED) {
+            const auto now = std::chrono::steady_clock::now();
+            const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
+            if (elapsed >= timeout_ms) {
+                LOG(ERROR) << "Timed out connecting to snapuserd socket: " << socket_name;
+                return nullptr;
+            }
+            std::this_thread::sleep_for(10ms);
+        } else {
+            PLOG(ERROR) << "connect failed: " << socket_name;
+            return nullptr;
+        }
+    }
+}
+
 std::unique_ptr<SnapuserdClient> SnapuserdClient::Connect(const std::string& socket_name,
                                                           std::chrono::milliseconds timeout_ms) {
     unique_fd fd;
diff --git a/init/first_stage_init.cpp b/init/first_stage_init.cpp
index e48fa15..c4d0f75 100644
--- a/init/first_stage_init.cpp
+++ b/init/first_stage_init.cpp
@@ -30,6 +30,7 @@
 #include <chrono>
 #include <filesystem>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include <android-base/chrono_utils.h>
diff --git a/init/snapuserd_transition.cpp b/init/snapuserd_transition.cpp
index 3a9ff5b..3a78343 100644
--- a/init/snapuserd_transition.cpp
+++ b/init/snapuserd_transition.cpp
@@ -25,6 +25,7 @@
 #include <filesystem>
 #include <string>
 #include <string_view>
+#include <thread>
 
 #include <android-base/file.h>
 #include <android-base/logging.h>