Merge changes Ifc1da577,Ide630e36,Ia215f1a0 am: c3229abcc7 am: 23ff5e4cf7 am: 5e81851253

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1698087

Change-Id: I1e0e3dc2147d713c4a82230cc10390513610f967
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 17c8efd..3c63789 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -22,6 +22,7 @@
 #include <thread>
 #include <vector>
 
+#include <android-base/scopeguard.h>
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
 #include <log/log.h>
@@ -32,6 +33,7 @@
 
 namespace android {
 
+using base::ScopeGuard;
 using base::unique_fd;
 
 RpcServer::RpcServer() {}
@@ -125,30 +127,33 @@
 }
 
 void RpcServer::join() {
+    while (true) {
+        (void)acceptOne();  // failures are logged by acceptOne; keep serving regardless
+    }
+}
+
+bool RpcServer::acceptOne() {  // accepts one client and hands its fd to a new thread
     LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+    LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join.");
+
+    unique_fd clientFd(
+            TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
+
+    if (clientFd < 0) {
+        ALOGE("Could not accept4 socket: %s", strerror(errno));
+        return false;  // no connection was accepted and no thread was started
+    }
+    LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
+
     {
         std::lock_guard<std::mutex> _l(mLock);
-        LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join.");
+        std::thread thread =
+                std::thread(&RpcServer::establishConnection, this,
+                            std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd));
+        mConnectingThreads[thread.get_id()] = std::move(thread);  // removed in establishConnection
     }
 
-    while (true) {
-        unique_fd clientFd(TEMP_FAILURE_RETRY(
-                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
-
-        if (clientFd < 0) {
-            ALOGE("Could not accept4 socket: %s", strerror(errno));
-            continue;
-        }
-        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
-
-        {
-            std::lock_guard<std::mutex> _l(mLock);
-            std::thread thread =
-                    std::thread(&RpcServer::establishConnection, this,
-                                std::move(sp<RpcServer>::fromExisting(this)), std::move(clientFd));
-            mConnectingThreads[thread.get_id()] = std::move(thread);
-        }
-    }
+    return true;  // a connection thread was started; setup continues on that thread
 }
 
 std::vector<sp<RpcSession>> RpcServer::listSessions() {
@@ -161,15 +166,21 @@
     return sessions;
 }
 
+size_t RpcServer::numUninitializedSessions() {  // debug aid: connections still being set up
+    std::lock_guard<std::mutex> _l(mLock);  // mConnectingThreads is modified under mLock
+    return mConnectingThreads.size();  // entries are removed in establishConnection
+}
+
 void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
     LOG_ALWAYS_FATAL_IF(this != server.get(), "Must pass same ownership object");
 
     // TODO(b/183988761): cannot trust this simple ID
     LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+    bool idValid = true;
     int32_t id;
     if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
         ALOGE("Could not read ID from fd %d", clientFd.get());
-        return;
+        idValid = false;
     }
 
     std::thread thisThread;
@@ -181,8 +192,13 @@
         LOG_ALWAYS_FATAL_IF(threadId == mConnectingThreads.end(),
                             "Must establish connection on owned thread");
         thisThread = std::move(threadId->second);
+        ScopeGuard detachGuard = [&]() { thisThread.detach(); };
         mConnectingThreads.erase(threadId);
 
+        if (!idValid) {
+            return;
+        }
+
         if (id == RPC_SESSION_ID_NEW) {
             LOG_ALWAYS_FATAL_IF(mSessionIdCounter >= INT32_MAX, "Out of session IDs");
             mSessionIdCounter++;
@@ -199,6 +215,9 @@
             }
             session = it->second;
         }
+
+        detachGuard.Disable();
+        session->preJoin(std::move(thisThread));
     }
 
     // avoid strong cycle
@@ -208,7 +227,7 @@
     // DO NOT ACCESS MEMBER VARIABLES BELOW
     //
 
-    session->join(std::move(thisThread), std::move(clientFd));
+    session->join(std::move(clientFd));
 }
 
 bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index f32aa7a..05fa49e 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -131,14 +131,16 @@
     return OK;
 }
 
-void RpcSession::join(std::thread thread, unique_fd client) {
+void RpcSession::preJoin(std::thread thread) {
     LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
 
     {
         std::lock_guard<std::mutex> _l(mMutex);
         mThreads[thread.get_id()] = std::move(thread);
     }
+}
 
+void RpcSession::join(unique_fd client) {
     // must be registered to allow arbitrary client code executing commands to
     // be able to do nested calls (we can't only read from it)
     sp<RpcConnection> connection = assignServerToThisThread(std::move(client));
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 20fdbfe..2ba9fa2 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -182,6 +182,27 @@
     }
 }
 
+RpcState::CommandData::CommandData(size_t size) : mSize(size) {
+    // The maximum size for regular binder is 1MB for all concurrent
+    // transactions. A very small proportion of transactions are even
+    // larger than a page, but we need to avoid allocating too much
+    // data on behalf of an arbitrary client, or we could risk being in
+    // a position where a single additional allocation could run out of
+    // memory.
+    //
+    // Note, this limit may not reflect the total amount of data allocated for a
+    // transaction (in some cases, additional fixed size amounts are added),
+    // though for rough consistency, we should avoid cases where this data type
+    // is used for multiple dynamic allocations for a single transaction.
+    constexpr size_t kMaxTransactionAllocation = 100 * 1000;
+    if (size == 0) return;  // nothing to allocate; valid() accepts mSize == 0 with null mData
+    if (size > kMaxTransactionAllocation) {
+        ALOGW("Transaction requested too much data allocation %zu", size);
+        return;  // mData is left unset while mSize != 0, so valid() reports failure
+    }
+    mData.reset(new (std::nothrow) uint8_t[size]);  // nothrow new yields null on failure
+}
+
 bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data, size_t size) {
     LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
 
@@ -326,7 +347,7 @@
             .asyncNumber = asyncNumber,
     };
 
-    ByteVec transactionData(sizeof(RpcWireTransaction) + data.dataSize());
+    CommandData transactionData(sizeof(RpcWireTransaction) + data.dataSize());
     if (!transactionData.valid()) {
         return NO_MEMORY;
     }
@@ -383,7 +404,7 @@
         if (status != OK) return status;
     }
 
-    ByteVec data(command.bodySize);
+    CommandData data(command.bodySize);
     if (!data.valid()) {
         return NO_MEMORY;
     }
@@ -469,7 +490,7 @@
                                    const RpcWireHeader& command) {
     LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command);
 
-    ByteVec transactionData(command.bodySize);
+    CommandData transactionData(command.bodySize);
     if (!transactionData.valid()) {
         return NO_MEMORY;
     }
@@ -490,7 +511,7 @@
 }
 
 status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                           ByteVec transactionData) {
+                                           CommandData transactionData) {
     if (transactionData.size() < sizeof(RpcWireTransaction)) {
         ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
               sizeof(RpcWireTransaction), transactionData.size());
@@ -640,7 +661,7 @@
                 // justification for const_cast (consider avoiding priority_queue):
                 // - AsyncTodo operator< doesn't depend on 'data' object
                 // - gotta go fast
-                ByteVec data = std::move(
+                CommandData data = std::move(
                         const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()).data);
                 it->second.asyncTodo.pop();
                 _l.unlock();
@@ -654,7 +675,7 @@
             .status = replyStatus,
     };
 
-    ByteVec replyData(sizeof(RpcWireReply) + reply.dataSize());
+    CommandData replyData(sizeof(RpcWireReply) + reply.dataSize());
     if (!replyData.valid()) {
         return NO_MEMORY;
     }
@@ -684,7 +705,7 @@
 status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) {
     LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command);
 
-    ByteVec commandData(command.bodySize);
+    CommandData commandData(command.bodySize);
     if (!commandData.valid()) {
         return NO_MEMORY;
     }
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 83d0344..31f8a22 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -101,10 +101,10 @@
      */
     void terminate();
 
-    // alternative to std::vector<uint8_t> that doesn't abort on too big of allocations
-    struct ByteVec {
-        explicit ByteVec(size_t size)
-              : mData(size > 0 ? new (std::nothrow) uint8_t[size] : nullptr), mSize(size) {}
+    // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and rejects
+    // oversized requests, so a remote peer cannot make us allocate too much data.
+    struct CommandData {
+        explicit CommandData(size_t size);
         bool valid() { return mSize == 0 || mData != nullptr; }
         size_t size() { return mSize; }
         uint8_t* data() { return mData.get(); }
@@ -128,7 +128,7 @@
                                            const RpcWireHeader& command);
     [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
                                                    const sp<RpcSession>& session,
-                                                   ByteVec transactionData);
+                                                   CommandData transactionData);
     [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
                                             const RpcWireHeader& command);
 
@@ -163,7 +163,7 @@
 
         // async transaction queue, _only_ for local binder
         struct AsyncTodo {
-            ByteVec data;
+            CommandData data;
             uint64_t asyncNumber = 0;
 
             bool operator<(const AsyncTodo& o) const {
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 7d0c198..6e27540 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -108,9 +108,16 @@
     void join();
 
     /**
+     * Accept one connection on this server. You must have at least one client
+     * session before calling this.
+     */
+    [[nodiscard]] bool acceptOne();
+
+    /**
      * For debugging!
      */
     std::vector<sp<RpcSession>> listSessions();
+    size_t numUninitializedSessions();
 
     ~RpcServer();
 
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 92ee100..bcc213c 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -114,7 +114,10 @@
 
     status_t readId();
 
-    void join(std::thread thread, base::unique_fd client);
+    // transfer ownership of thread
+    void preJoin(std::thread thread);
+    // join on thread passed to preJoin
+    void join(base::unique_fd client);
     void terminateLocked();
 
     struct RpcConnection : public RefBase {
diff --git a/libs/binder/tests/rpc_fuzzer/Android.bp b/libs/binder/tests/rpc_fuzzer/Android.bp
new file mode 100644
index 0000000..1c75306
--- /dev/null
+++ b/libs/binder/tests/rpc_fuzzer/Android.bp
@@ -0,0 +1,40 @@
+package {
+    // See: http://go/android-license-faq
+    // A large-scale-change added 'default_applicable_licenses' to import
+    // all of the 'license_kinds' from "frameworks_native_license"
+    // to get the below license kinds:
+    //   SPDX-license-identifier-Apache-2.0
+    default_applicable_licenses: ["frameworks_native_license"],
+}
+
+cc_fuzz {
+    name: "binder_rpc_fuzzer",
+    host_supported: true,
+
+    fuzz_config: {
+        cc: ["smoreland@google.com"],
+    },
+
+    srcs: [
+        "main.cpp",
+    ],
+    static_libs: [
+        "libbase",
+        "libcutils",
+        "liblog",
+        "libutils",
+    ],
+
+    target: {
+        android: {
+            shared_libs: [
+                "libbinder",
+            ],
+        },
+        host: {
+            static_libs: [
+                "libbinder",
+            ],
+        },
+    },
+}
diff --git a/libs/binder/tests/rpc_fuzzer/main.cpp b/libs/binder/tests/rpc_fuzzer/main.cpp
new file mode 100644
index 0000000..3603ebe
--- /dev/null
+++ b/libs/binder/tests/rpc_fuzzer/main.cpp
@@ -0,0 +1,121 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <android-base/file.h>
+#include <android-base/logging.h>
+#include <android-base/unique_fd.h>
+#include <binder/Binder.h>
+#include <binder/Parcel.h>
+#include <binder/RpcServer.h>
+#include <binder/RpcSession.h>
+
+#include <sys/resource.h>
+#include <sys/un.h>
+
+namespace android {
+
+static const std::string kSock = std::string(getenv("TMPDIR") ?: "/tmp") +
+        "/binderRpcFuzzerSocket_" + std::to_string(getpid());
+
+size_t getHardMemoryLimit() {  // returns the hard RLIMIT_AS limit of this process
+    struct rlimit limit;
+    CHECK(0 == getrlimit(RLIMIT_AS, &limit)) << errno;  // aborts the fuzzer on failure
+    return limit.rlim_max;
+}
+
+void setMemoryLimit(size_t cur, size_t max) {  // sets soft (cur) and hard (max) RLIMIT_AS
+    const struct rlimit kLimit = {
+            .rlim_cur = cur,
+            .rlim_max = max,
+    };
+    CHECK(0 == setrlimit(RLIMIT_AS, &kLimit)) << errno;  // aborts the fuzzer on failure
+}
+
+class SomeBinder : public BBinder {  // root object; each set bit of `code` skips one step below
+    status_t onTransact(uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags) override {
+        (void)flags;
+
+        if ((code & 1) == 0) {
+            sp<IBinder> binder;
+            (void)data.readStrongBinder(&binder);
+            if (binder != nullptr) {
+                (void)binder->pingBinder();
+            }
+        }
+        if ((code & 2) == 0) {
+            (void)data.readInt32();
+        }
+        if ((code & 4) == 0) {
+            (void)reply->writeStrongBinder(sp<BBinder>::make());
+        }
+
+        return OK;
+    }
+};
+
+extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
+    if (size > 50000) return 0;
+
+    unlink(kSock.c_str());
+
+    sp<RpcServer> server = RpcServer::make();
+    server->setRootObject(sp<SomeBinder>::make());
+    server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
+    CHECK(server->setupUnixDomainServer(kSock.c_str()));
+
+    static constexpr size_t kMemLimit = 1llu * 1024 * 1024 * 1024;
+    size_t hardLimit = getHardMemoryLimit();
+    setMemoryLimit(std::min(kMemLimit, hardLimit), hardLimit);
+
+    std::thread serverThread([=] { (void)server->acceptOne(); });
+
+    sockaddr_un addr{
+            .sun_family = AF_UNIX,
+    };
+    CHECK_LT(kSock.size(), sizeof(addr.sun_path));
+    memcpy(&addr.sun_path, kSock.c_str(), kSock.size());
+
+    base::unique_fd clientFd(TEMP_FAILURE_RETRY(socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+    CHECK_NE(clientFd.get(), -1);
+    CHECK_EQ(0,
+             TEMP_FAILURE_RETRY(
+                     connect(clientFd.get(), reinterpret_cast<sockaddr*>(&addr), sizeof(addr))))
+            << strerror(errno);
+
+    serverThread.join();
+
+    // TODO(b/182938024): fuzz multiple sessions, instead of just one
+
+#if 0
+    // make fuzzer more productive locally by forcing it to create a new session
+    int32_t id = -1;
+    CHECK(base::WriteFully(clientFd, &id, sizeof(id)));
+#endif
+
+    CHECK(base::WriteFully(clientFd, data, size));
+
+    clientFd.reset();
+
+    // TODO(b/185167543): better way to force a server to shutdown; poll until it is idle
+    while (!server->listSessions().empty() || server->numUninitializedSessions()) {
+        usleep(1);
+    }
+
+    setMemoryLimit(hardLimit, hardLimit);
+
+    return 0;
+}
+
+} // namespace android