adb: extract soon-to-be-common code.

As a side effect, delete FDE_ACTIVE, which was always set on every
fdevent, and FDE_PENDING, which was an internal implementation detail.

This patch removes spin detection, which will be reimplemented
separately later.

Test: adb_test on host
Test: adbd_test on blueline
Change-Id: I40be3504ce03c4fae5e071fa018542a051b7511d
diff --git a/adb/fdevent/fdevent.cpp b/adb/fdevent/fdevent.cpp
index 28b8f37..d215217 100644
--- a/adb/fdevent/fdevent.cpp
+++ b/adb/fdevent/fdevent.cpp
@@ -28,14 +28,21 @@
 #include "fdevent.h"
 #include "fdevent_poll.h"
 
+using namespace std::chrono_literals;
+using std::chrono::duration_cast;
+
+void invoke_fde(struct fdevent* fde, unsigned events) {
+    if (auto f = std::get_if<fd_func>(&fde->func)) {
+        (*f)(fde->fd.get(), events, fde->arg);
+    } else if (auto f = std::get_if<fd_func2>(&fde->func)) {
+        (*f)(fde, events, fde->arg);
+    } else {
+        __builtin_unreachable();
+    }
+}
+
 std::string dump_fde(const fdevent* fde) {
     std::string state;
-    if (fde->state & FDE_ACTIVE) {
-        state += "A";
-    }
-    if (fde->state & FDE_PENDING) {
-        state += "P";
-    }
     if (fde->state & FDE_READ) {
         state += "R";
     }
@@ -53,9 +60,11 @@
     CheckMainThread();
     CHECK_GE(fd.get(), 0);
 
+    int fd_num = fd.get();
+
     fdevent* fde = new fdevent();
     fde->id = fdevent_id_++;
-    fde->state = FDE_ACTIVE;
+    fde->state = 0;
     fde->fd = std::move(fd);
     fde->func = func;
     fde->arg = arg;
@@ -66,6 +75,10 @@
         LOG(ERROR) << "failed to set non-blocking mode for fd " << fde->fd.get();
     }
 
+    auto [it, inserted] = this->installed_fdevents_.emplace(fd_num, fde);
+    CHECK(inserted);
+    UNUSED(it);
+
     this->Register(fde);
     return fde;
 }
@@ -78,18 +91,22 @@
 
     this->Unregister(fde);
 
+    auto erased = this->installed_fdevents_.erase(fde->fd.get());
+    CHECK_EQ(1UL, erased);
+
     unique_fd result = std::move(fde->fd);
     delete fde;
     return result;
 }
 
 void fdevent_context::Add(fdevent* fde, unsigned events) {
-    Set(fde, (fde->state & FDE_EVENTMASK) | events);
+    CHECK(!(events & FDE_TIMEOUT));
+    Set(fde, fde->state | events);
 }
 
 void fdevent_context::Del(fdevent* fde, unsigned events) {
     CHECK(!(events & FDE_TIMEOUT));
-    Set(fde, (fde->state & FDE_EVENTMASK) & ~events);
+    Set(fde, fde->state & ~events);
 }
 
 void fdevent_context::SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) {
@@ -98,6 +115,56 @@
     fde->last_active = std::chrono::steady_clock::now();
 }
 
+std::optional<std::chrono::milliseconds> fdevent_context::CalculatePollDuration() {
+    std::optional<std::chrono::milliseconds> result = std::nullopt;
+    auto now = std::chrono::steady_clock::now();
+    CheckMainThread();
+
+    for (const auto& [fd, fde] : this->installed_fdevents_) {
+        UNUSED(fd);
+        auto timeout_opt = fde->timeout;
+        if (timeout_opt) {
+            auto deadline = fde->last_active + *timeout_opt;
+            auto time_left = duration_cast<std::chrono::milliseconds>(deadline - now);
+            if (time_left < 0ms) {
+                time_left = 0ms;
+            }
+
+            if (!result) {
+                result = time_left;
+            } else {
+                result = std::min(*result, time_left);
+            }
+        }
+    }
+
+    return result;
+}
+
+void fdevent_context::HandleEvents(const std::vector<fdevent_event>& events) {
+    for (const auto& event : events) {
+        invoke_fde(event.fde, event.events);
+    }
+    FlushRunQueue();
+}
+
+void fdevent_context::FlushRunQueue() {
+    // We need to be careful around reentrancy here, since a function we call can queue up another
+    // function.
+    while (true) {
+        std::function<void()> fn;
+        {
+            std::lock_guard<std::mutex> lock(this->run_queue_mutex_);
+            if (this->run_queue_.empty()) {
+                break;
+            }
+            fn = std::move(this->run_queue_.front());
+            this->run_queue_.pop_front();
+        }
+        fn();
+    }
+}
+
 void fdevent_context::CheckMainThread() {
     if (main_thread_id_) {
         CHECK_EQ(*main_thread_id_, android::base::GetThreadId());
@@ -118,23 +185,6 @@
     Interrupt();
 }
 
-void fdevent_context::FlushRunQueue() {
-    // We need to be careful around reentrancy here, since a function we call can queue up another
-    // function.
-    while (true) {
-        std::function<void()> fn;
-        {
-            std::lock_guard<std::mutex> lock(this->run_queue_mutex_);
-            if (this->run_queue_.empty()) {
-                break;
-            }
-            fn = this->run_queue_.front();
-            this->run_queue_.pop_front();
-        }
-        fn();
-    }
-}
-
 static auto& g_ambient_fdevent_context =
         *new std::unique_ptr<fdevent_context>(new fdevent_context_poll());