[adb] Optimize adbd's usb reading

Try to not allocate as many blocks on the heap, and reuse
memory instead of copying it

Get rid of unique_ptr and shared_ptr where possible, move
the Block objects themselves

Overall this reduces the time spent in memcpy() from 30% to
15% of the whole 'adb push' command, and gets rid of about 5%
of the time spent in the malloc/free calls

Test: builds
Change-Id: I8995115274b6f08a4df13c58183c928ef384a767
diff --git a/adb/daemon/usb.cpp b/adb/daemon/usb.cpp
index 1abae87..a9ad805 100644
--- a/adb/daemon/usb.cpp
+++ b/adb/daemon/usb.cpp
@@ -117,14 +117,18 @@
     }
 };
 
+template <class Payload>
 struct IoBlock {
     bool pending = false;
     struct iocb control = {};
-    std::shared_ptr<Block> payload;
+    Payload payload;
 
     TransferId id() const { return TransferId::from_value(control.aio_data); }
 };
 
+using IoReadBlock = IoBlock<Block>;
+using IoWriteBlock = IoBlock<std::shared_ptr<Block>>;
+
 struct ScopedAioContext {
     ScopedAioContext() = default;
     ~ScopedAioContext() { reset(); }
@@ -208,16 +212,17 @@
 
     virtual bool Write(std::unique_ptr<apacket> packet) override final {
         LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
-        Block header(sizeof(packet->msg));
-        memcpy(header.data(), &packet->msg, sizeof(packet->msg));
+        auto header = std::make_shared<Block>(sizeof(packet->msg));
+        memcpy(header->data(), &packet->msg, sizeof(packet->msg));
 
         std::lock_guard<std::mutex> lock(write_mutex_);
-        write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++));
+        write_requests_.push_back(
+                CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++));
         if (!packet->payload.empty()) {
             // The kernel attempts to allocate a contiguous block of memory for each write,
             // which can fail if the write is large and the kernel heap is fragmented.
             // Split large writes into smaller chunks to avoid this.
-            std::shared_ptr<Block> payload = std::make_shared<Block>(std::move(packet->payload));
+            auto payload = std::make_shared<Block>(std::move(packet->payload));
             size_t offset = 0;
             size_t len = payload->size();
 
@@ -464,16 +469,20 @@
         worker_thread_.join();
     }
 
-    void PrepareReadBlock(IoBlock* block, uint64_t id) {
+    void PrepareReadBlock(IoReadBlock* block, uint64_t id) {
         block->pending = false;
-        block->payload = std::make_shared<Block>(kUsbReadSize);
+        if (block->payload.capacity() >= kUsbReadSize) {
+            block->payload.resize(kUsbReadSize);
+        } else {
+            block->payload = Block(kUsbReadSize);
+        }
         block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
-        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data());
-        block->control.aio_nbytes = block->payload->size();
+        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
+        block->control.aio_nbytes = block->payload.size();
     }
 
-    IoBlock CreateReadBlock(uint64_t id) {
-        IoBlock block;
+    IoReadBlock CreateReadBlock(uint64_t id) {
+        IoReadBlock block;
         PrepareReadBlock(&block, id);
         block.control.aio_rw_flags = 0;
         block.control.aio_lio_opcode = IOCB_CMD_PREAD;
@@ -518,9 +527,9 @@
 
     void HandleRead(TransferId id, int64_t size) {
         uint64_t read_idx = id.id % kUsbReadQueueDepth;
-        IoBlock* block = &read_requests_[read_idx];
+        IoReadBlock* block = &read_requests_[read_idx];
         block->pending = false;
-        block->payload->resize(size);
+        block->payload.resize(size);
 
         // Notification for completed reads can be received out of order.
         if (block->id().id != needed_read_id_) {
@@ -531,7 +540,7 @@
 
         for (uint64_t id = needed_read_id_;; ++id) {
             size_t read_idx = id % kUsbReadQueueDepth;
-            IoBlock* current_block = &read_requests_[read_idx];
+            IoReadBlock* current_block = &read_requests_[read_idx];
             if (current_block->pending) {
                 break;
             }
@@ -540,19 +549,19 @@
         }
     }
 
-    void ProcessRead(IoBlock* block) {
-        if (!block->payload->empty()) {
+    void ProcessRead(IoReadBlock* block) {
+        if (!block->payload.empty()) {
             if (!incoming_header_.has_value()) {
-                CHECK_EQ(sizeof(amessage), block->payload->size());
-                amessage msg;
-                memcpy(&msg, block->payload->data(), sizeof(amessage));
+                CHECK_EQ(sizeof(amessage), block->payload.size());
+                amessage& msg = incoming_header_.emplace();
+                memcpy(&msg, block->payload.data(), sizeof(msg));
                 LOG(DEBUG) << "USB read:" << dump_header(&msg);
                 incoming_header_ = msg;
             } else {
                 size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
-                Block payload = std::move(*block->payload);
+                Block payload = std::move(block->payload);
                 CHECK_LE(payload.size(), bytes_left);
-                incoming_payload_.append(std::make_unique<Block>(std::move(payload)));
+                incoming_payload_.append(std::move(payload));
             }
 
             if (incoming_header_->data_length == incoming_payload_.size()) {
@@ -560,11 +569,15 @@
                 packet->msg = *incoming_header_;
 
                 // TODO: Make apacket contain an IOVector so we don't have to coalesce.
-                packet->payload = incoming_payload_.coalesce();
+                packet->payload = std::move(incoming_payload_).coalesce();
                 read_callback_(this, std::move(packet));
 
                 incoming_header_.reset();
-                incoming_payload_.clear();
+                // reuse the capacity of the incoming payload while we can.
+                auto free_block = incoming_payload_.clear();
+                if (block->payload.capacity() == 0) {
+                    block->payload = std::move(free_block);
+                }
             }
         }
 
@@ -572,7 +585,7 @@
         SubmitRead(block);
     }
 
-    bool SubmitRead(IoBlock* block) {
+    bool SubmitRead(IoReadBlock* block) {
         block->pending = true;
         struct iocb* iocb = &block->control;
         if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
@@ -594,7 +607,7 @@
         std::lock_guard<std::mutex> lock(write_mutex_);
         auto it =
                 std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
-                    return static_cast<uint64_t>(req->id()) == static_cast<uint64_t>(id);
+                    return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id);
                 });
         CHECK(it != write_requests_.end());
 
@@ -605,27 +618,26 @@
         SubmitWrites();
     }
 
-    std::unique_ptr<IoBlock> CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset,
-                                              size_t len, uint64_t id) {
-        auto block = std::make_unique<IoBlock>();
-        block->payload = std::move(payload);
-        block->control.aio_data = static_cast<uint64_t>(TransferId::write(id));
-        block->control.aio_rw_flags = 0;
-        block->control.aio_lio_opcode = IOCB_CMD_PWRITE;
-        block->control.aio_reqprio = 0;
-        block->control.aio_fildes = write_fd_.get();
-        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data() + offset);
-        block->control.aio_nbytes = len;
-        block->control.aio_offset = 0;
-        block->control.aio_flags = IOCB_FLAG_RESFD;
-        block->control.aio_resfd = worker_event_fd_.get();
+    IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len,
+                                  uint64_t id) {
+        auto block = IoWriteBlock();
+        block.payload = std::move(payload);
+        block.control.aio_data = static_cast<uint64_t>(TransferId::write(id));
+        block.control.aio_rw_flags = 0;
+        block.control.aio_lio_opcode = IOCB_CMD_PWRITE;
+        block.control.aio_reqprio = 0;
+        block.control.aio_fildes = write_fd_.get();
+        block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset);
+        block.control.aio_nbytes = len;
+        block.control.aio_offset = 0;
+        block.control.aio_flags = IOCB_FLAG_RESFD;
+        block.control.aio_resfd = worker_event_fd_.get();
         return block;
     }
 
-    std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) {
-        std::shared_ptr<Block> block = std::make_shared<Block>(std::move(payload));
-        size_t len = block->size();
-        return CreateWriteBlock(std::move(block), 0, len, id);
+    IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) {
+        size_t len = payload.size();
+        return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id);
     }
 
     void SubmitWrites() REQUIRES(write_mutex_) {
@@ -642,9 +654,9 @@
 
         struct iocb* iocbs[kUsbWriteQueueDepth];
         for (int i = 0; i < writes_to_submit; ++i) {
-            CHECK(!write_requests_[writes_submitted_ + i]->pending);
-            write_requests_[writes_submitted_ + i]->pending = true;
-            iocbs[i] = &write_requests_[writes_submitted_ + i]->control;
+            CHECK(!write_requests_[writes_submitted_ + i].pending);
+            write_requests_[writes_submitted_ + i].pending = true;
+            iocbs[i] = &write_requests_[writes_submitted_ + i].control;
             LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
         }
 
@@ -689,7 +701,7 @@
     std::optional<amessage> incoming_header_;
     IOVector incoming_payload_;
 
-    std::array<IoBlock, kUsbReadQueueDepth> read_requests_;
+    std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_;
     IOVector read_data_;
 
     // ID of the next request that we're going to send out.
@@ -699,7 +711,7 @@
     size_t needed_read_id_ = 0;
 
     std::mutex write_mutex_;
-    std::deque<std::unique_ptr<IoBlock>> write_requests_ GUARDED_BY(write_mutex_);
+    std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_);
     size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
     size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;