adb: extract atransport's connection interface.

As step one of refactoring atransport to separate protocol handling
from its underlying connection, extract atransport's existing
hand-rolled connection vtable into its own abstract interface.

This should not change behavior except in one case: emulators are
now treated as TCP devices for the purposes of `adb disconnect`.

Test: python test_device.py, with walleye over USB + TCP
Test: manually connecting and disconnecting devices/emulators
Change-Id: I877b8027e567cc6a7461749432b49f6cb2c2f0d7
diff --git a/adb/transport.cpp b/adb/transport.cpp
index 4a9d91a..5acaaec 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -41,6 +41,7 @@
 
 #include "adb.h"
 #include "adb_auth.h"
+#include "adb_io.h"
 #include "adb_trace.h"
 #include "adb_utils.h"
 #include "diagnose_usb.h"
@@ -65,6 +66,36 @@
     return next++;
 }
 
+bool FdConnection::Read(apacket* packet) {
+    if (!ReadFdExactly(fd_.get(), &packet->msg, sizeof(amessage))) {
+        D("remote local: read terminated (message)");
+        return false;
+    }
+
+    if (!ReadFdExactly(fd_.get(), &packet->data, packet->msg.data_length)) {
+        D("remote local: terminated (data)");
+        return false;
+    }
+
+    return true;
+}
+
+bool FdConnection::Write(apacket* packet) {
+    uint32_t length = packet->msg.data_length;
+
+    if (!WriteFdExactly(fd_.get(), &packet->msg, sizeof(amessage) + length)) {
+        D("remote local: write terminated");
+        return false;
+    }
+
+    return true;
+}
+
+void FdConnection::Close() {
+    adb_shutdown(fd_.get());
+    fd_.reset();
+}
+
 static std::string dump_packet(const char* name, const char* func, apacket* p) {
     unsigned command = p->msg.command;
     int len = p->msg.data_length;
@@ -220,11 +251,18 @@
 
         {
             ATRACE_NAME("read_transport read_remote");
-            if (t->read_from_remote(p, t) != 0) {
+            if (!t->connection->Read(p)) {
                 D("%s: remote read failed for transport", t->serial);
                 put_apacket(p);
                 break;
             }
+
+            if (!check_header(p, t)) {
+                D("%s: remote read: bad header", t->serial);
+                put_apacket(p);
+                break;
+            }
+
 #if ADB_HOST
             if (p->msg.command == 0) {
                 put_apacket(p);
@@ -626,7 +664,7 @@
     t->ref_count--;
     if (t->ref_count == 0) {
         D("transport: %s unref (kicking and closing)", t->serial);
-        t->close(t);
+        t->connection->Close();
         remove_transport(t);
     } else {
         D("transport: %s unref (count=%zu)", t->serial, t->ref_count);
@@ -754,14 +792,14 @@
 }
 
 int atransport::Write(apacket* p) {
-    return write_func_(p, this);
+    return this->connection->Write(p) ? 0 : -1;
 }
 
 void atransport::Kick() {
     if (!kicked_) {
+        D("kicking transport %s", this->serial);
         kicked_ = true;
-        CHECK(kick_func_ != nullptr);
-        kick_func_(this);
+        this->connection->Close();
     }
 }
 
@@ -1083,8 +1121,12 @@
 // This should only be used for transports with connection_state == kCsNoPerm.
 void unregister_usb_transport(usb_handle* usb) {
     std::lock_guard<std::recursive_mutex> lock(transport_lock);
-    transport_list.remove_if(
-        [usb](atransport* t) { return t->usb == usb && t->GetConnectionState() == kCsNoPerm; });
+    transport_list.remove_if([usb](atransport* t) {
+        if (auto connection = dynamic_cast<UsbConnection*>(t->connection.get())) {
+            return connection->handle_ == usb && t->GetConnectionState() == kCsNoPerm;
+        }
+        return false;
+    });
 }
 
 bool check_header(apacket* p, atransport* t) {