adb: convert Connection to a nonblocking interface.

Rename the existing Connection to BlockingConnection, add a nonblocking
Connection, and add an adapter between the two, to enable future work
to reduce the impedance mismatch from implementing a blocking interface
on top of nonblocking primitives.

While we're here, delete A_SYNC, and remove one layer of pipes when
sending a packet (replacing it with a condition variable when using
BlockingConnectionAdapter).

Test: python test_device.py, manually plugging/unplugging devices
Change-Id: Ieac2bf937471d9d494075575f07e53b589aba20a
diff --git a/adb/transport.h b/adb/transport.h
index 9700f44..a492f00 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -20,12 +20,14 @@
 #include <sys/types.h>
 
 #include <atomic>
+#include <condition_variable>
 #include <deque>
 #include <functional>
 #include <list>
 #include <memory>
 #include <mutex>
 #include <string>
+#include <thread>
 #include <unordered_set>
 
 #include <openssl/rsa.h>
@@ -57,15 +59,47 @@
 
 TransportId NextTransportId();
 
-// Abstraction for a blocking packet transport.
+// Abstraction for a non-blocking packet transport.
 struct Connection {
     Connection() = default;
-    Connection(const Connection& copy) = delete;
-    Connection(Connection&& move) = delete;
-
-    // Destroy a Connection. Formerly known as 'Close' in atransport.
     virtual ~Connection() = default;
 
+    void SetTransportName(std::string transport_name) {
+        transport_name_ = std::move(transport_name);
+    }
+
+    using ReadCallback = std::function<bool(Connection*, std::unique_ptr<apacket>)>;
+    void SetReadCallback(ReadCallback callback) {
+        CHECK(!read_callback_);  // A read callback must not already be set.
+        read_callback_ = std::move(callback);  // Sink parameter: move rather than copy.
+    }
+
+    // Called after the Connection has terminated, either by an error or because Stop was called.
+    using ErrorCallback = std::function<void(Connection*, const std::string&)>;
+    void SetErrorCallback(ErrorCallback callback) {
+        CHECK(!error_callback_);  // An error callback must not already be set.
+        error_callback_ = std::move(callback);  // Sink parameter: move rather than copy.
+    }
+
+    virtual bool Write(std::unique_ptr<apacket> packet) = 0;  // Takes ownership of the packet.
+
+    virtual void Start() = 0;
+    virtual void Stop() = 0;
+
+    std::string transport_name_;
+    ReadCallback read_callback_;
+    ErrorCallback error_callback_;
+};
+
+// Abstraction for a blocking packet transport.
+struct BlockingConnection {
+    BlockingConnection() = default;
+    BlockingConnection(const BlockingConnection& copy) = delete;
+    BlockingConnection(BlockingConnection&& move) = delete;
+
+    // Destroy a BlockingConnection. Formerly known as 'Close' in atransport.
+    virtual ~BlockingConnection() = default;
+
     // Read/Write a packet. These functions are concurrently called from a transport's reader/writer
     // threads.
     virtual bool Read(apacket* packet) = 0;
@@ -77,7 +111,30 @@
     virtual void Close() = 0;
 };
 
-struct FdConnection : public Connection {
+struct BlockingConnectionAdapter : public Connection {  // Wraps a BlockingConnection.
+    explicit BlockingConnectionAdapter(std::unique_ptr<BlockingConnection> connection);
+
+    ~BlockingConnectionAdapter() override;
+
+    bool Write(std::unique_ptr<apacket> packet) override final;
+
+    void Start() override final;
+    void Stop() override final;
+
+    bool stopped_ = false;  // NOTE(review): presumably guarded by mutex_; confirm in the .cpp.
+
+    std::unique_ptr<BlockingConnection> underlying_;  // Owned BlockingConnection.
+    std::thread read_thread_;
+    std::thread write_thread_;
+
+    std::deque<std::unique_ptr<apacket>> write_queue_;
+    std::mutex mutex_;
+    std::condition_variable cv_;  // Replaces the pipe formerly used when sending a packet.
+
+    std::once_flag error_flag_;
+};
+
+struct FdConnection : public BlockingConnection {
     explicit FdConnection(unique_fd fd) : fd_(std::move(fd)) {}
 
     bool Read(apacket* packet) override final;
@@ -89,7 +146,7 @@
     unique_fd fd_;
 };
 
-struct UsbConnection : public Connection {
+struct UsbConnection : public BlockingConnection {
     explicit UsbConnection(usb_handle* handle) : handle_(handle) {}
     ~UsbConnection();
 
@@ -110,7 +167,6 @@
 
     atransport(ConnectionState state = kCsOffline)
         : id(NextTransportId()), connection_state_(state) {
-        transport_fde = {};
         // Initialize protocol to min version for compatibility with older versions.
         // Version will be updated post-connect.
         protocol_version = A_VERSION_MIN;
@@ -126,11 +182,7 @@
     void SetConnectionState(ConnectionState state);
 
     const TransportId id;
-    int fd = -1;
-    int transport_socket = -1;
-    fdevent transport_fde;
     size_t ref_count = 0;
-    uint32_t sync_token = 0;
     bool online = false;
     TransportType type = kTransportAny;