libbinder: reverse connections

When connecting to an RPC server as a client, you can request to serve
a threadpool so that you can receive callbacks from the server.

Future considerations:
- starting threads dynamically (likely very, very soon after this CL)
- combining threadpools (as needed)

Bug: 185167543
Test: binderRpcTest
Change-Id: I992959e963ebc1b3da2f89fdb6c1ec625cb51af4
diff --git a/libs/binder/tests/Android.bp b/libs/binder/tests/Android.bp
index 9cf433d..c7c899f 100644
--- a/libs/binder/tests/Android.bp
+++ b/libs/binder/tests/Android.bp
@@ -118,6 +118,7 @@
     host_supported: true,
     unstable: true,
     srcs: [
+        "IBinderRpcCallback.aidl",
         "IBinderRpcSession.aidl",
         "IBinderRpcTest.aidl",
     ],
diff --git a/libs/binder/tests/IBinderRpcCallback.aidl b/libs/binder/tests/IBinderRpcCallback.aidl
new file mode 100644
index 0000000..0336961
--- /dev/null
+++ b/libs/binder/tests/IBinderRpcCallback.aidl
@@ -0,0 +1,20 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+interface IBinderRpcCallback {  // callback object passed to IBinderRpcTest.doCallback
+    void sendCallback(@utf8InCpp String str);  // regular (non-oneway) call
+    oneway void sendOnewayCallback(@utf8InCpp String str);  // oneway variant of sendCallback
+}
diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl
index 646bcc6..b0c8b2d 100644
--- a/libs/binder/tests/IBinderRpcTest.aidl
+++ b/libs/binder/tests/IBinderRpcTest.aidl
@@ -54,6 +54,8 @@
     void sleepMs(int ms);
     oneway void sleepMsAsync(int ms);
 
+    // Passes |value| to |callback| (oneway method if isOneway; from another thread if delayed).
+    void doCallback(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
     void die(boolean cleanup);
     void scheduleShutdown();
 
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index efc70e6..80708df 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+#include <BnBinderRpcCallback.h>
 #include <BnBinderRpcSession.h>
 #include <BnBinderRpcTest.h>
 #include <aidl/IBinderRpcTest.h>
@@ -34,6 +35,7 @@
 #include <cstdlib>
 #include <iostream>
 #include <thread>
+#include <type_traits>
 
 #include <sys/prctl.h>
 #include <unistd.h>
@@ -89,6 +91,22 @@
 };
 std::atomic<int32_t> MyBinderRpcSession::gNum;
 
+class MyBinderRpcCallback : public BnBinderRpcCallback {  // records received values
+    Status sendCallback(const std::string& value) override {
+        std::unique_lock _l(mMutex);
+        mValues.push_back(value);
+        _l.unlock();  // release before notifying so the waiter can take the lock right away
+        mCv.notify_one();
+        return Status::ok();
+    }
+    Status sendOnewayCallback(const std::string& value) override { return sendCallback(value); }
+
+public:
+    std::mutex mMutex;                 // guards mValues
+    std::condition_variable mCv;       // notified after each value is appended
+    std::vector<std::string> mValues;  // received values, in arrival order
+};
+
 class MyBinderRpcTest : public BnBinderRpcTest {
 public:
     wp<RpcServer> server;
@@ -187,6 +205,27 @@
         return sleepMs(ms);
     }
 
+    // Invokes |callback| with |value|, via its oneway method when |oneway| is set. When
+    // |delayed| is set, returns immediately and makes the call from a detached thread.
+    Status doCallback(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed,
+                      const std::string& value) override {
+        if (callback == nullptr) {
+            return Status::fromExceptionCode(Status::EX_NULL_POINTER);
+        }
+
+        // direct path: call back on the current thread and report its status
+        if (!delayed) {
+            return oneway ? callback->sendOnewayCallback(value) : callback->sendCallback(value);
+        }
+
+        // the nested call passes delayed=false, so it takes the direct path above
+        std::thread([=]() {
+            ALOGE("Executing delayed callback: '%s'", value.c_str());
+            (void)doCallback(callback, oneway, false, value);
+        }).detach();
+        return Status::ok();
+    }
+
     Status die(bool cleanup) override {
         if (cleanup) {
             exit(1);
@@ -308,6 +347,9 @@
 
     BinderRpcTestProcessSession(BinderRpcTestProcessSession&&) = default;
     ~BinderRpcTestProcessSession() {
+        EXPECT_NE(nullptr, rootIface);
+        if (rootIface == nullptr) return;
+
         if (!expectAlreadyShutdown) {
             std::vector<int32_t> remoteCounts;
             // calling over any sessions counts across all sessions
@@ -348,7 +390,7 @@
     // This creates a new process serving an interface on a certain number of
     // threads.
     ProcessSession createRpcTestSocketServerProcess(
-            size_t numThreads, size_t numSessions,
+            size_t numThreads, size_t numSessions, size_t numReverseConnections,
             const std::function<void(const sp<RpcServer>&)>& configure) {
         CHECK_GE(numSessions, 1) << "Must have at least one session to a server";
 
@@ -404,6 +446,8 @@
 
         for (size_t i = 0; i < numSessions; i++) {
             sp<RpcSession> session = RpcSession::make();
+            session->setMaxReverseConnections(numReverseConnections);
+
             switch (socketType) {
                 case SocketType::UNIX:
                     if (session->setupUnixDomainClient(addr.c_str())) goto success;
@@ -425,9 +469,11 @@
     }
 
     BinderRpcTestProcessSession createRpcTestSocketServerProcess(size_t numThreads,
-                                                                 size_t numSessions = 1) {
+                                                                 size_t numSessions = 1,
+                                                                 size_t numReverseConnections = 0) {
         BinderRpcTestProcessSession ret{
                 .proc = createRpcTestSocketServerProcess(numThreads, numSessions,
+                                                         numReverseConnections,
                                                          [&](const sp<RpcServer>& server) {
                                                              sp<MyBinderRpcTest> service =
                                                                      new MyBinderRpcTest;
@@ -895,6 +941,38 @@
     for (auto& t : threads) t.join();
 }
 
+TEST_P(BinderRpc, Callbacks) {
+    const static std::string kTestString = "good afternoon!";
+    // run each oneway/delayed combination against its own server process
+    for (bool oneway : {true, false}) {
+        for (bool delayed : {true, false}) {
+            auto proc = createRpcTestSocketServerProcess(1, 1, 1);
+            auto cb = sp<MyBinderRpcCallback>::make();
+
+            EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString));
+            // wait up to one second for the callback to record its value
+            using std::literals::chrono_literals::operator""s;
+            std::unique_lock<std::mutex> _l(cb->mMutex);
+            cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
+
+            EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << " delayed: " << delayed;
+            if (cb->mValues.empty()) continue;
+            EXPECT_EQ(cb->mValues.at(0), kTestString)
+                    << "oneway: " << oneway << " delayed: " << delayed;
+
+            // since we are severing the connection, we need to go ahead and
+            // tell the server to shutdown and exit so that waitpid won't hang
+            EXPECT_OK(proc.rootIface->scheduleShutdown());
+
+            // since this session has a reverse connection w/ a threadpool, we
+            // need to manually shut it down
+            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+
+            proc.expectAlreadyShutdown = true;
+        }
+    }
+}
+
 TEST_P(BinderRpc, Die) {
     for (bool doDeathCleanup : {true, false}) {
         auto proc = createRpcTestSocketServerProcess(1);