Merge changes I042f25f8,I973f42c5,Icb4acea5

* changes:
  adbd: add source/sink services.
  adb: make `adb raw` bidirectional.
  adb: remove incorrect use of RTTI.
diff --git a/adb/Android.bp b/adb/Android.bp
index ae8e386..6234af1 100644
--- a/adb/Android.bp
+++ b/adb/Android.bp
@@ -24,7 +24,7 @@
         "-Wno-missing-field-initializers",
         "-Wvla",
     ],
-    rtti: true,
+    cpp_std: "experimental",
 
     use_version_lib: true,
 
diff --git a/adb/adb.h b/adb/adb.h
index e2911e8..cdd6346 100644
--- a/adb/adb.h
+++ b/adb/adb.h
@@ -149,6 +149,10 @@
 #endif
 
 #if !ADB_HOST
+asocket* daemon_service_to_socket(std::string_view name);
+#endif
+
+#if !ADB_HOST
 int init_jdwp(void);
 asocket* create_jdwp_service_socket();
 asocket* create_jdwp_tracker_service_socket();
diff --git a/adb/client/commandline.cpp b/adb/client/commandline.cpp
index 95e66ae..c8e834e 100644
--- a/adb/client/commandline.cpp
+++ b/adb/client/commandline.cpp
@@ -1275,6 +1275,42 @@
     return 0;
 }
 
+static int adb_connect_command_bidirectional(const std::string& command) {
+    std::string error;
+    int fd = adb_connect(command, &error);
+    if (fd < 0) {
+        fprintf(stderr, "error: %s\n", error.c_str());
+        return 1;
+    }
+
+    static constexpr auto forward = [](int src, int sink, bool exit_on_end) {
+        char buf[4096];
+        while (true) {
+            int rc = adb_read(src, buf, sizeof(buf));
+            if (rc == 0) {
+                if (exit_on_end) {
+                    exit(0);
+                } else {
+                    adb_shutdown(sink, SHUT_WR);
+                }
+                return;
+            } else if (rc < 0) {
+                perror_exit("read failed");
+            }
+            if (!WriteFdExactly(sink, buf, rc)) {
+                perror_exit("write failed");
+            }
+        }
+    };
+
+    std::thread read(forward, fd, STDOUT_FILENO, true);
+    std::thread write(forward, STDIN_FILENO, fd, false);
+    read.join();
+    write.join();
+    adb_close(fd);
+    return 0;
+}
+
 static int adb_query_command(const std::string& command) {
     std::string result;
     std::string error;
@@ -1766,7 +1802,7 @@
         if (argc != 2) {
             error_exit("usage: adb raw SERVICE");
         }
-        return adb_connect_command(argv[1]);
+        return adb_connect_command_bidirectional(argv[1]);
     }
 
     /* "adb /?" is a common idiom under Windows */
diff --git a/adb/daemon/services.cpp b/adb/daemon/services.cpp
index b300fac..3182ddd 100644
--- a/adb/daemon/services.cpp
+++ b/adb/daemon/services.cpp
@@ -33,6 +33,7 @@
 #include <thread>
 
 #include <android-base/file.h>
+#include <android-base/parseint.h>
 #include <android-base/parsenetaddress.h>
 #include <android-base/properties.h>
 #include <android-base/stringprintf.h>
@@ -223,6 +224,105 @@
     WriteFdExactly(fd.get(), "spinning\n");
 }
 
+struct ServiceSocket : public asocket {
+    ServiceSocket() {
+        install_local_socket(this);
+        this->enqueue = [](asocket* self, apacket::payload_type data) {
+            return static_cast<ServiceSocket*>(self)->Enqueue(std::move(data));
+        };
+        this->ready = [](asocket* self) { return static_cast<ServiceSocket*>(self)->Ready(); };
+        this->close = [](asocket* self) { return static_cast<ServiceSocket*>(self)->Close(); };
+    }
+    virtual ~ServiceSocket() = default;
+
+    virtual int Enqueue(apacket::payload_type data) { return -1; }
+    virtual void Ready() {}
+    virtual void Close() {
+        if (peer) {
+            peer->peer = nullptr;
+            if (peer->shutdown) {
+                peer->shutdown(peer);
+            }
+            peer->close(peer);
+        }
+
+        remove_socket(this);
+        delete this;
+    }
+};
+
+struct SinkSocket : public ServiceSocket {
+    explicit SinkSocket(size_t byte_count) {
+        LOG(INFO) << "Creating new SinkSocket with capacity " << byte_count;
+        bytes_left_ = byte_count;
+    }
+
+    virtual ~SinkSocket() { LOG(INFO) << "SinkSocket destroyed"; }
+
+    virtual int Enqueue(apacket::payload_type data) override final {
+        if (bytes_left_ <= data.size()) {
+            // Done reading.
+            Close();
+            return -1;
+        }
+
+        bytes_left_ -= data.size();
+        return 0;
+    }
+
+    size_t bytes_left_;
+};
+
+struct SourceSocket : public ServiceSocket {
+    explicit SourceSocket(size_t byte_count) {
+        LOG(INFO) << "Creating new SourceSocket with capacity " << byte_count;
+        bytes_left_ = byte_count;
+    }
+
+    virtual ~SourceSocket() { LOG(INFO) << "SourceSocket destroyed"; }
+
+    void Ready() {
+        size_t len = std::min(bytes_left_, get_max_payload());
+        if (len == 0) {
+            Close();
+            return;
+        }
+
+        Block block(len);
+        memset(block.data(), 0, block.size());
+        peer->enqueue(peer, std::move(block));
+        bytes_left_ -= len;
+    }
+
+    int Enqueue(apacket::payload_type data) { return -1; }
+
+    size_t bytes_left_;
+};
+
+asocket* daemon_service_to_socket(std::string_view name) {
+    if (name == "jdwp") {
+        return create_jdwp_service_socket();
+    } else if (name == "track-jdwp") {
+        return create_jdwp_tracker_service_socket();
+    } else if (name.starts_with("sink:")) {
+        name.remove_prefix(strlen("sink:"));
+        uint64_t byte_count = 0;
+        if (!android::base::ParseUint(name.data(), &byte_count)) {
+            return nullptr;
+        }
+        return new SinkSocket(byte_count);
+    } else if (name.starts_with("source:")) {
+        name.remove_prefix(strlen("source:"));
+        uint64_t byte_count = 0;
+        if (!android::base::ParseUint(name.data(), &byte_count)) {
+            return nullptr;
+        }
+        return new SourceSocket(byte_count);
+    }
+
+    return nullptr;
+}
+
 unique_fd daemon_service_to_fd(const char* name, atransport* transport) {
     if (!strncmp("dev:", name, 4)) {
         return unique_fd{unix_open(name + 4, O_RDWR | O_CLOEXEC)};
diff --git a/adb/sockets.cpp b/adb/sockets.cpp
index 8b07f74..1bd57c1 100644
--- a/adb/sockets.cpp
+++ b/adb/sockets.cpp
@@ -348,11 +348,8 @@
 
 asocket* create_local_service_socket(const char* name, atransport* transport) {
 #if !ADB_HOST
-    if (!strcmp(name, "jdwp")) {
-        return create_jdwp_service_socket();
-    }
-    if (!strcmp(name, "track-jdwp")) {
-        return create_jdwp_tracker_service_socket();
+    if (asocket* s = daemon_service_to_socket(name); s) {
+        return s;
     }
 #endif
     int fd = service_to_fd(name, transport);
diff --git a/adb/transport.cpp b/adb/transport.cpp
index 8741654..f59a135 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -1305,11 +1305,7 @@
 void unregister_usb_transport(usb_handle* usb) {
     std::lock_guard<std::recursive_mutex> lock(transport_lock);
     transport_list.remove_if([usb](atransport* t) {
-        auto connection = t->connection();
-        if (auto usb_connection = dynamic_cast<UsbConnection*>(connection.get())) {
-            return usb_connection->handle_ == usb && t->GetConnectionState() == kCsNoPerm;
-        }
-        return false;
+        return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm;
     });
 }
 #endif
diff --git a/adb/transport.h b/adb/transport.h
index d593700..790004f 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -37,6 +37,7 @@
 
 #include "adb.h"
 #include "adb_unique_fd.h"
+#include "usb.h"
 
 typedef std::unordered_set<std::string> FeatureSet;
 
@@ -242,6 +243,9 @@
         return connection_;
     }
 
+    void SetUsbHandle(usb_handle* h) { usb_handle_ = h; }
+    usb_handle* GetUsbHandle() { return usb_handle_; }
+
     const TransportId id;
     size_t ref_count = 0;
     bool online = false;
@@ -333,6 +337,9 @@
     // The underlying connection object.
     std::shared_ptr<Connection> connection_ GUARDED_BY(mutex_);
 
+    // USB handle for the connection, if available.
+    usb_handle* usb_handle_ = nullptr;
+
     // A callback that will be invoked when the atransport needs to reconnect.
     ReconnectCallback reconnect_;
 
diff --git a/adb/transport_usb.cpp b/adb/transport_usb.cpp
index c471bf9..2e5918a 100644
--- a/adb/transport_usb.cpp
+++ b/adb/transport_usb.cpp
@@ -180,6 +180,7 @@
     auto connection = std::make_unique<UsbConnection>(h);
     t->SetConnection(std::make_unique<BlockingConnectionAdapter>(std::move(connection)));
     t->type = kTransportUsb;
+    t->SetUsbHandle(h);
 }
 
 int is_adb_interface(int usb_class, int usb_subclass, int usb_protocol) {