adbd: implement a nonblocking USB Connection.

Implement a Connection that provides a nonblocking interface to
functionfs, to replace the existing implementation that uses two
threads that loop and call read and write respectively. The existing
implementation is vulnerable to a race condition that can occur when a
connection is terminated, where one thread can notice failure and
complete reinitialization of the USB endpoints before the other thread
notices anything went wrong, resulting in either the first packet
coming from the other end disappearing into the void, or the other end
getting a packet of garbage.

As a side benefit, this improves performance on walleye from:

    push 100MiB: 10 runs: median 49.48 MiB/s, mean 50.00 MiB/s, stddev: 2.77 MiB/s
    pull 100MiB: 10 runs: median 75.82 MiB/s, mean 76.18 MiB/s, stddev: 6.60 MiB/s

to:

    push 100MiB: 10 runs: median 73.90 MiB/s, mean 73.51 MiB/s, stddev: 5.26 MiB/s
    pull 100MiB: 10 runs: median 105.90 MiB/s, mean 107.19 MiB/s, stddev: 6.10 MiB/s

Test: python test_device.py
Change-Id: I9b77c1057965edfef739ed9736e5d76613adf60a
diff --git a/adb/transport.cpp b/adb/transport.cpp
index d41f9c8..051ee2f 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -52,7 +52,6 @@
 #include "fdevent.h"
 #include "sysdeps/chrono.h"
 
-static void register_transport(atransport* transport);
 static void remove_transport(atransport* transport);
 static void transport_unref(atransport* transport);
 
@@ -671,7 +670,7 @@
             return true;
         });
         t->connection()->SetErrorCallback([t](Connection*, const std::string& error) {
-            D("%s: connection terminated: %s", t->serial.c_str(), error.c_str());
+            LOG(INFO) << t->serial_name() << ": connection terminated: " << error;
             fdevent_run_on_main_thread([t]() {
                 handle_offline(t);
                 transport_unref(t);
@@ -730,7 +729,7 @@
 }
 
 /* the fdevent select pump is single threaded */
-static void register_transport(atransport* transport) {
+void register_transport(atransport* transport) {
     tmsg m;
     m.transport = transport;
     m.action = 1;
@@ -758,6 +757,7 @@
     CHECK_GT(t->ref_count, 0u);
     t->ref_count--;
     if (t->ref_count == 0) {
+        LOG(INFO) << "destroying transport " << t->serial_name();
         t->connection()->Stop();
 #if ADB_HOST
         if (t->IsTcpDevice() && !t->kicked()) {
@@ -1293,6 +1293,7 @@
     register_transport(t);
 }
 
+#if ADB_HOST
 // This should only be used for transports with connection_state == kCsNoPerm.
 void unregister_usb_transport(usb_handle* usb) {
     std::lock_guard<std::recursive_mutex> lock(transport_lock);
@@ -1304,6 +1305,7 @@
         return false;
     });
 }
+#endif
 
 bool check_header(apacket* p, atransport* t) {
     if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {