Merge "binderRpcBenchmark: test more data sizes"
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp
index 2027b6e..f34672c 100644
--- a/libs/binder/Android.bp
+++ b/libs/binder/Android.bp
@@ -123,6 +123,7 @@
"RpcSession.cpp",
"RpcServer.cpp",
"RpcState.cpp",
+ "RpcTransportRaw.cpp",
"Static.cpp",
"Stability.cpp",
"Status.cpp",
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 68c12f7..879e462 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -26,6 +26,7 @@
#include <android-base/scopeguard.h>
#include <binder/Parcel.h>
#include <binder/RpcServer.h>
+#include <binder/RpcTransportRaw.h>
#include <log/log.h>
#include "RpcSocketAddress.h"
@@ -37,13 +38,17 @@
using base::ScopeGuard;
using base::unique_fd;
-RpcServer::RpcServer() {}
+RpcServer::RpcServer(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory)
+ : mRpcTransportCtxFactory(std::move(rpcTransportCtxFactory)) {}
RpcServer::~RpcServer() {
(void)shutdown();
}
-sp<RpcServer> RpcServer::make() {
- return sp<RpcServer>::make();
+sp<RpcServer> RpcServer::make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory) {
+ // Default is without TLS.
+ if (rpcTransportCtxFactory == nullptr)
+ rpcTransportCtxFactory = RpcTransportCtxFactoryRaw::make();
+ return sp<RpcServer>::make(std::move(rpcTransportCtxFactory));
}
void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() {
@@ -153,6 +158,10 @@
mJoinThreadRunning = true;
mShutdownTrigger = RpcSession::FdTrigger::make();
LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
+
+ mCtx = mRpcTransportCtxFactory->newServerCtx();
+ LOG_ALWAYS_FATAL_IF(mCtx == nullptr, "Unable to create RpcTransportCtx with %s sockets",
+ mRpcTransportCtxFactory->toCString());
}
status_t status;
@@ -219,6 +228,7 @@
LOG_RPC_DETAIL("Finished waiting on shutdown.");
mShutdownTrigger = nullptr;
+ mCtx = nullptr;
return true;
}
@@ -244,14 +254,29 @@
// mShutdownTrigger can only be cleared once connection threads have joined.
// It must be set before this thread is started
LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
+ LOG_ALWAYS_FATAL_IF(server->mCtx == nullptr);
+
+ status_t status = OK;
+
+ int clientFdForLog = clientFd.get();
+ auto client = server->mCtx->newTransport(std::move(clientFd));
+ if (client == nullptr) {
+ ALOGE("Dropping accept4()-ed socket because sslAccept fails");
+ status = DEAD_OBJECT;
+ // still need to cleanup before we can return
+ } else {
+ LOG_RPC_DETAIL("Created RpcTransport %p for client fd %d", client.get(), clientFdForLog);
+ }
RpcConnectionHeader header;
- status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header,
- sizeof(header));
- if (status != OK) {
- ALOGE("Failed to read ID for client connecting to RPC server: %s",
- statusToString(status).c_str());
- // still need to cleanup before we can return
+ if (status == OK) {
+ status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
+ sizeof(header));
+ if (status != OK) {
+ ALOGE("Failed to read ID for client connecting to RPC server: %s",
+ statusToString(status).c_str());
+ // still need to cleanup before we can return
+ }
}
bool incoming = false;
@@ -271,7 +296,7 @@
.version = protocolVersion,
};
- status = server->mShutdownTrigger->interruptableWriteFully(clientFd.get(), &response,
+ status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
sizeof(response));
if (status != OK) {
ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
@@ -341,7 +366,7 @@
}
if (incoming) {
- LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
+ LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(client), true),
"server state must already be initialized");
return;
}
@@ -350,7 +375,7 @@
session->preJoinThreadOwnership(std::move(thisThread));
}
- auto setupResult = session->preJoinSetup(std::move(clientFd));
+ auto setupResult = session->preJoinSetup(std::move(client));
// avoid strong cycle
server = nullptr;
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 66f31b8..7565f1b 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -30,6 +30,7 @@
#include <android_runtime/vm.h>
#include <binder/Parcel.h>
#include <binder/RpcServer.h>
+#include <binder/RpcTransportRaw.h>
#include <binder/Stability.h>
#include <jni.h>
#include <utils/String8.h>
@@ -46,7 +47,8 @@
using base::unique_fd;
-RpcSession::RpcSession() {
+RpcSession::RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory)
+ : mRpcTransportCtxFactory(std::move(rpcTransportCtxFactory)) {
LOG_RPC_DETAIL("RpcSession created %p", this);
mState = std::make_unique<RpcState>();
@@ -59,8 +61,11 @@
"Should not be able to destroy a session with servers in use.");
}
-sp<RpcSession> RpcSession::make() {
- return sp<RpcSession>::make();
+sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory) {
+ // Default is without TLS.
+ if (rpcTransportCtxFactory == nullptr)
+ rpcTransportCtxFactory = RpcTransportCtxFactoryRaw::make();
+ return sp<RpcSession>::make(std::move(rpcTransportCtxFactory));
}
void RpcSession::setMaxThreads(size_t threads) {
@@ -121,7 +126,19 @@
return false;
}
+bool RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) {
+ return setupClient([&](const RpcAddress& sessionId, bool incoming) {
+ // std::move'd from fd becomes -1 (!ok())
+ if (!fd.ok()) {
+ fd = request();
+ if (!fd.ok()) return false;
+ }
+ return initAndAddConnection(std::move(fd), sessionId, incoming);
+ });
+}
+
bool RpcSession::addNullDebuggingClient() {
+ // Note: only works on raw sockets.
unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC)));
if (serverFd == -1) {
@@ -129,7 +146,17 @@
return false;
}
- return addOutgoingConnection(std::move(serverFd), false);
+ auto ctx = mRpcTransportCtxFactory->newClientCtx();
+ if (ctx == nullptr) {
+ ALOGE("Unable to create RpcTransportCtx for null debugging client");
+ return false;
+ }
+ auto server = ctx->newTransport(std::move(serverFd));
+ if (server == nullptr) {
+ ALOGE("Unable to set up RpcTransport");
+ return false;
+ }
+ return addOutgoingConnection(std::move(server), false);
}
sp<IBinder> RpcSession::getRootObject() {
@@ -205,6 +232,10 @@
return mWrite == -1;
}
+status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
+ return triggerablePoll(rpcTransport->pollSocket(), event);
+}
+
status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
while (true) {
pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
@@ -223,28 +254,30 @@
}
}
-status_t RpcSession::FdTrigger::interruptableWriteFully(base::borrowed_fd fd, const void* data,
- size_t size) {
+status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
+ const void* data, size_t size) {
const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
const uint8_t* end = buffer + size;
MAYBE_WAIT_IN_FLAKE_MODE;
status_t status;
- while ((status = triggerablePoll(fd, POLLOUT)) == OK) {
- ssize_t writeSize = TEMP_FAILURE_RETRY(send(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
- if (writeSize == 0) return DEAD_OBJECT;
-
- if (writeSize < 0) {
- return -errno;
+ while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
+ auto writeSize = rpcTransport->send(buffer, end - buffer);
+ if (!writeSize.ok()) {
+ LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
+ return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
}
- buffer += writeSize;
+
+ if (*writeSize == 0) return DEAD_OBJECT;
+
+ buffer += *writeSize;
if (buffer == end) return OK;
}
return status;
}
-status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data,
+status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
size_t size) {
uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
uint8_t* end = buffer + size;
@@ -252,14 +285,16 @@
MAYBE_WAIT_IN_FLAKE_MODE;
status_t status;
- while ((status = triggerablePoll(fd, POLLIN)) == OK) {
- ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
- if (readSize == 0) return DEAD_OBJECT; // EOF
-
- if (readSize < 0) {
- return -errno;
+ while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
+ auto readSize = rpcTransport->recv(buffer, end - buffer);
+ if (!readSize.ok()) {
+ LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
+ return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
}
- buffer += readSize;
+
+ if (*readSize == 0) return DEAD_OBJECT; // EOF
+
+ buffer += *readSize;
if (buffer == end) return OK;
}
return status;
@@ -312,10 +347,11 @@
}
}
-RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
+RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(
+ std::unique_ptr<RpcTransport> rpcTransport) {
// must be registered to allow arbitrary client code executing commands to
// be able to do nested calls (we can't only read from it)
- sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd));
+ sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(rpcTransport));
status_t status;
@@ -439,7 +475,8 @@
return server;
}
-bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
+bool RpcSession::setupClient(
+ const std::function<bool(const RpcAddress& sessionId, bool incoming)>& connectAndInit) {
{
std::lock_guard<std::mutex> _l(mMutex);
LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0,
@@ -447,7 +484,7 @@
mOutgoingConnections.size());
}
- if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false;
+ if (!connectAndInit(RpcAddress::zero(), false /*incoming*/)) return false;
{
ExclusiveConnection connection;
@@ -466,37 +503,42 @@
// TODO(b/186470974): first risk of blocking
size_t numThreadsAvailable;
if (status_t status = getRemoteMaxThreads(&numThreadsAvailable); status != OK) {
- ALOGE("Could not get max threads after initial session to %s: %s", addr.toString().c_str(),
+ ALOGE("Could not get max threads after initial session setup: %s",
statusToString(status).c_str());
return false;
}
if (status_t status = readId(); status != OK) {
- ALOGE("Could not get session id after initial session to %s; %s", addr.toString().c_str(),
+ ALOGE("Could not get session id after initial session setup: %s",
statusToString(status).c_str());
return false;
}
- // we've already setup one client
- for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
- // TODO(b/189955605): shutdown existing connections?
- if (!setupOneSocketConnection(addr, mId.value(), false /*incoming*/)) return false;
- }
-
// TODO(b/189955605): we should add additional sessions dynamically
// instead of all at once - the other side should be responsible for setting
// up additional connections. We need to create at least one (unless 0 are
// requested to be set) in order to allow the other side to reliably make
// any requests at all.
+ // we've already setup one client
+ for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
+ if (!connectAndInit(mId.value(), false /*incoming*/)) return false;
+ }
+
for (size_t i = 0; i < mMaxThreads; i++) {
- if (!setupOneSocketConnection(addr, mId.value(), true /*incoming*/)) return false;
+ if (!connectAndInit(mId.value(), true /*incoming*/)) return false;
}
return true;
}
-bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
+bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
+ return setupClient([&](const RpcAddress& sessionId, bool incoming) {
+ return setupOneSocketConnection(addr, sessionId, incoming);
+ });
+}
+
+bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& sessionId,
bool incoming) {
for (size_t tries = 0; tries < 5; tries++) {
if (tries > 0) usleep(10000);
@@ -520,36 +562,60 @@
strerror(savedErrno));
return false;
}
-
- RpcConnectionHeader header{
- .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION),
- .options = 0,
- };
- memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));
-
- if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
-
- if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
- int savedErrno = errno;
- ALOGE("Could not write connection header to socket at %s: %s", addr.toString().c_str(),
- strerror(savedErrno));
- return false;
- }
-
LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
- if (incoming) {
- return addIncomingConnection(std::move(serverFd));
- } else {
- return addOutgoingConnection(std::move(serverFd), true);
- }
+ return initAndAddConnection(std::move(serverFd), sessionId, incoming);
}
ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
return false;
}
-bool RpcSession::addIncomingConnection(unique_fd fd) {
+bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) {
+ auto ctx = mRpcTransportCtxFactory->newClientCtx();
+ if (ctx == nullptr) {
+ ALOGE("Unable to create client RpcTransportCtx with %s sockets",
+ mRpcTransportCtxFactory->toCString());
+ return false;
+ }
+ auto server = ctx->newTransport(std::move(fd));
+ if (server == nullptr) {
+ ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString());
+ return false;
+ }
+
+ LOG_RPC_DETAIL("Socket at client with RpcTransport %p", server.get());
+
+ RpcConnectionHeader header{
+ .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION),
+ .options = 0,
+ };
+ memcpy(&header.sessionId, &sessionId.viewRawEmbedded(), sizeof(RpcWireAddress));
+
+ if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
+
+ auto sentHeader = server->send(&header, sizeof(header));
+ if (!sentHeader.ok()) {
+ ALOGE("Could not write connection header to socket: %s",
+ sentHeader.error().message().c_str());
+ return false;
+ }
+ if (*sentHeader != sizeof(header)) {
+ ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
+ *sentHeader, sizeof(header));
+ return false;
+ }
+
+ LOG_RPC_DETAIL("Socket at client: header sent");
+
+ if (incoming) {
+ return addIncomingConnection(std::move(server));
+ } else {
+ return addOutgoingConnection(std::move(server), true /*init*/);
+ }
+}
+
+bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) {
std::mutex mutex;
std::condition_variable joinCv;
std::unique_lock<std::mutex> lock(mutex);
@@ -558,13 +624,13 @@
bool ownershipTransferred = false;
thread = std::thread([&]() {
std::unique_lock<std::mutex> threadLock(mutex);
- unique_fd movedFd = std::move(fd);
+ std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport);
// NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
sp<RpcSession> session = thiz;
session->preJoinThreadOwnership(std::move(thread));
// only continue once we have a response or the connection fails
- auto setupResult = session->preJoinSetup(std::move(movedFd));
+ auto setupResult = session->preJoinSetup(std::move(movedRpcTransport));
ownershipTransferred = true;
threadLock.unlock();
@@ -578,7 +644,7 @@
return true;
}
-bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
+bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
sp<RpcConnection> connection = sp<RpcConnection>::make();
{
std::lock_guard<std::mutex> _l(mMutex);
@@ -591,7 +657,7 @@
if (mShutdownTrigger == nullptr) return false;
}
- connection->fd = std::move(fd);
+ connection->rpcTransport = std::move(rpcTransport);
connection->exclusiveTid = gettid();
mOutgoingConnections.push_back(connection);
}
@@ -626,7 +692,8 @@
return true;
}
-sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) {
+sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
+ std::unique_ptr<RpcTransport> rpcTransport) {
std::lock_guard<std::mutex> _l(mMutex);
if (mIncomingConnections.size() >= mMaxThreads) {
@@ -644,7 +711,7 @@
}
sp<RpcConnection> session = sp<RpcConnection>::make();
- session->fd = std::move(fd);
+ session->rpcTransport = std::move(rpcTransport);
session->exclusiveTid = gettid();
mIncomingConnections.push_back(session);
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 36c03c5..23382c3 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -273,7 +273,7 @@
status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
const sp<RpcSession>& session, const char* what, const void* data,
size_t size) {
- LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
+ LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
android::base::HexString(data, size).c_str());
if (size > std::numeric_limits<ssize_t>::max()) {
@@ -282,11 +282,12 @@
return BAD_VALUE;
}
- if (status_t status = session->mShutdownTrigger->interruptableWriteFully(connection->fd.get(),
- data, size);
+ if (status_t status =
+ session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
+ data, size);
status != OK) {
- LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on fd %d, error: %s", what, size,
- connection->fd.get(), statusToString(status).c_str());
+ LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
+ connection->rpcTransport.get(), statusToString(status).c_str());
(void)session->shutdownAndWait(false);
return status;
}
@@ -304,14 +305,15 @@
}
if (status_t status =
- session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size);
+ session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
+ data, size);
status != OK) {
- LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size,
- connection->fd.get(), statusToString(status).c_str());
+ LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
+ connection->rpcTransport.get(), statusToString(status).c_str());
return status;
}
- LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(),
+ LOG_RPC_DETAIL("Received %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
android::base::HexString(data, size).c_str());
return OK;
}
@@ -490,7 +492,8 @@
return status;
if (flags & IBinder::FLAG_ONEWAY) {
- LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get());
+ LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p",
+ connection->rpcTransport.get());
// Do not wait on result.
// However, too many oneway calls may cause refcounts to build up and fill up the socket,
@@ -585,7 +588,7 @@
status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
const sp<RpcSession>& session, CommandType type) {
- LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get());
+ LOG_RPC_DETAIL("getAndExecuteCommand on RpcTransport %p", connection->rpcTransport.get());
RpcWireHeader command;
if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command));
@@ -598,8 +601,7 @@
status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
const sp<RpcSession>& session, CommandType type) {
uint8_t buf;
- while (0 < TEMP_FAILURE_RETRY(
- recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
+ while (connection->rpcTransport->peek(&buf, sizeof(buf)).value_or(0) > 0) {
status_t status = getAndExecuteCommand(connection, session, type);
if (status != OK) return status;
}
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
new file mode 100644
index 0000000..2fc1945
--- /dev/null
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "RpcRawTransport"
+#include <log/log.h>
+
+#include <binder/RpcTransportRaw.h>
+
+#include "RpcState.h"
+
+using android::base::ErrnoError;
+using android::base::Result;
+
+namespace android {
+
+namespace {
+
+// RpcTransport with TLS disabled.
+class RpcTransportRaw : public RpcTransport {
+public:
+ explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
+ Result<size_t> send(const void *buf, size_t size) override {
+ ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
+ if (ret < 0) {
+ return ErrnoError() << "send()";
+ }
+ return ret;
+ }
+ Result<size_t> recv(void *buf, size_t size) override {
+ ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
+ if (ret < 0) {
+ return ErrnoError() << "recv()";
+ }
+ return ret;
+ }
+ Result<size_t> peek(void *buf, size_t size) override {
+ ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK | MSG_DONTWAIT));
+ if (ret < 0) {
+ return ErrnoError() << "recv(MSG_PEEK)";
+ }
+ return ret;
+ }
+ bool pending() override { return false; }
+ android::base::borrowed_fd pollSocket() const override { return mSocket; }
+
+private:
+ android::base::unique_fd mSocket;
+};
+
+// RpcTransportCtx with TLS disabled.
+class RpcTransportCtxRaw : public RpcTransportCtx {
+public:
+ std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd) const {
+ return std::make_unique<RpcTransportRaw>(std::move(fd));
+ }
+};
+} // namespace
+
+std::unique_ptr<RpcTransportCtx> RpcTransportCtxFactoryRaw::newServerCtx() const {
+ return std::make_unique<RpcTransportCtxRaw>();
+}
+
+std::unique_ptr<RpcTransportCtx> RpcTransportCtxFactoryRaw::newClientCtx() const {
+ return std::make_unique<RpcTransportCtxRaw>();
+}
+
+const char *RpcTransportCtxFactoryRaw::toCString() const {
+ return "raw";
+}
+
+std::unique_ptr<RpcTransportCtxFactory> RpcTransportCtxFactoryRaw::make() {
+ return std::unique_ptr<RpcTransportCtxFactoryRaw>(new RpcTransportCtxFactoryRaw());
+}
+
+} // namespace android
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index a1fba8a..4abf3b9 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -19,6 +19,7 @@
#include <binder/IBinder.h>
#include <binder/RpcAddress.h>
#include <binder/RpcSession.h>
+#include <binder/RpcTransport.h>
#include <utils/Errors.h>
#include <utils/RefBase.h>
@@ -47,7 +48,8 @@
*/
class RpcServer final : public virtual RefBase, private RpcSession::EventListener {
public:
- static sp<RpcServer> make();
+ static sp<RpcServer> make(
+ std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory = nullptr);
/**
* This represents a session for responses, e.g.:
@@ -167,7 +169,7 @@
private:
friend sp<RpcServer>;
- RpcServer();
+ explicit RpcServer(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
void onSessionIncomingThreadEnded() override;
@@ -175,6 +177,7 @@
static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd);
bool setupSocketServer(const RpcSocketAddress& address);
+ const std::unique_ptr<RpcTransportCtxFactory> mRpcTransportCtxFactory;
bool mAgreedExperimental = false;
size_t mMaxThreads = 1;
std::optional<uint32_t> mProtocolVersion;
@@ -189,6 +192,7 @@
std::map<RpcAddress, sp<RpcSession>> mSessions;
std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
std::condition_variable mShutdownCv;
+ std::unique_ptr<RpcTransportCtx> mCtx;
};
} // namespace android
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 1f7c029..0b46606 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -18,6 +18,8 @@
#include <android-base/unique_fd.h>
#include <binder/IBinder.h>
#include <binder/RpcAddress.h>
+#include <binder/RpcSession.h>
+#include <binder/RpcTransport.h>
#include <utils/Errors.h>
#include <utils/RefBase.h>
@@ -36,6 +38,7 @@
class RpcServer;
class RpcSocketAddress;
class RpcState;
+class RpcTransport;
constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0;
constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000;
@@ -48,7 +51,8 @@
*/
class RpcSession final : public virtual RefBase {
public:
- static sp<RpcSession> make();
+ static sp<RpcSession> make(
+ std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory = nullptr);
/**
* Set the maximum number of threads allowed to be made (for things like callbacks).
@@ -87,6 +91,19 @@
[[nodiscard]] bool setupInetClient(const char* addr, unsigned int port);
/**
+ * Starts talking to an RPC server which has already been connected to. This
+ * is expected to be used when another process has permission to connect to
+ * a binder RPC service, but this process only has permission to talk to
+ * that service.
+ *
+ * For convenience, if 'fd' is -1, 'request' will be called.
+ *
+ * For future compatibility, 'request' should not reference any stack data.
+ */
+ [[nodiscard]] bool setupPreconnectedClient(base::unique_fd fd,
+ std::function<base::unique_fd()>&& request);
+
+ /**
* For debugging!
*
* Sets up an empty connection. All queries to this connection which require a
@@ -142,7 +159,7 @@
friend sp<RpcSession>;
friend RpcServer;
friend RpcState;
- RpcSession();
+ explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
/** This is not a pipe. */
struct FdTrigger {
@@ -178,10 +195,12 @@
* true - succeeded in completely processing 'size'
* false - interrupted (failure or trigger)
*/
- status_t interruptableReadFully(base::borrowed_fd fd, void* data, size_t size);
- status_t interruptableWriteFully(base::borrowed_fd fd, const void* data, size_t size);
+ status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size);
+ status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size);
private:
+ status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event);
+
base::unique_fd mWrite;
base::unique_fd mRead;
};
@@ -204,7 +223,7 @@
};
struct RpcConnection : public RefBase {
- base::unique_fd fd;
+ std::unique_ptr<RpcTransport> rpcTransport;
// whether this or another thread is currently using this fd to make
// or receive transactions.
@@ -230,19 +249,24 @@
// Status of setup
status_t status;
};
- PreJoinSetupResult preJoinSetup(base::unique_fd fd);
+ PreJoinSetupResult preJoinSetup(std::unique_ptr<RpcTransport> rpcTransport);
// join on thread passed to preJoinThreadOwnership
static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
+ [[nodiscard]] bool setupClient(
+ const std::function<bool(const RpcAddress& sessionId, bool incoming)>& connectAndInit);
[[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
[[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address,
- const RpcAddress& sessionId, bool server);
- [[nodiscard]] bool addIncomingConnection(base::unique_fd fd);
- [[nodiscard]] bool addOutgoingConnection(base::unique_fd fd, bool init);
+ const RpcAddress& sessionId, bool incoming);
+ [[nodiscard]] bool initAndAddConnection(base::unique_fd fd, const RpcAddress& sessionId,
+ bool incoming);
+ [[nodiscard]] bool addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport);
+ [[nodiscard]] bool addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init);
[[nodiscard]] bool setForServer(const wp<RpcServer>& server,
const wp<RpcSession::EventListener>& eventListener,
const RpcAddress& sessionId);
- sp<RpcConnection> assignIncomingConnectionToThisThread(base::unique_fd fd);
+ sp<RpcConnection> assignIncomingConnectionToThisThread(
+ std::unique_ptr<RpcTransport> rpcTransport);
[[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection);
enum class ConnectionUse {
@@ -275,6 +299,8 @@
bool mReentrant = false;
};
+ const std::unique_ptr<RpcTransportCtxFactory> mRpcTransportCtxFactory;
+
// On the other side of a session, for each of mOutgoingConnections here, there should
// be one of mIncomingConnections on the other side (and vice versa).
//
diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h
new file mode 100644
index 0000000..1164600
--- /dev/null
+++ b/libs/binder/include/binder/RpcTransport.h
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Wraps the transport layer of RPC. Implementation may use plain sockets or TLS.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include <android-base/result.h>
+#include <android-base/unique_fd.h>
+
+namespace android {
+
+// Represents a socket connection.
+class RpcTransport {
+public:
+ virtual ~RpcTransport() = default;
+
+ // replacement of ::send(). errno may not be set if TLS is enabled.
+ virtual android::base::Result<size_t> send(const void *buf, size_t size) = 0;
+
+ // replacement of ::recv(). errno may not be set if TLS is enabled.
+ virtual android::base::Result<size_t> recv(void *buf, size_t size) = 0;
+
+ // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled.
+ //
+ // Implementation details:
+ // - For TLS, this may invoke syscalls and read data from the transport
+ // into an internal buffer in userspace. After that, pending() == true.
+ // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer;
+ // pending() is always false.
+ virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0;
+
+ // Returns true if there are data pending in a userspace buffer that RpcTransport holds.
+ //
+ // Implementation details:
+ // - For TLS, this does not invoke any syscalls or read any data from the
+ // transport. This only returns whether there are data pending in the internal buffer in
+ // userspace.
+ // - For raw sockets, this always returns false.
+ virtual bool pending() = 0;
+
+ // Returns fd for polling.
+ //
+ // Do not directly read / write on this raw fd!
+ [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0;
+
+protected:
+ RpcTransport() = default;
+};
+
+// Represents the context that generates the socket connection.
+class RpcTransportCtx {
+public:
+ virtual ~RpcTransportCtx() = default;
+ [[nodiscard]] virtual std::unique_ptr<RpcTransport> newTransport(
+ android::base::unique_fd fd) const = 0;
+
+protected:
+ RpcTransportCtx() = default;
+};
+
+// A factory class that generates RpcTransportCtx.
+class RpcTransportCtxFactory {
+public:
+ virtual ~RpcTransportCtxFactory() = default;
+ // Creates server context.
+ [[nodiscard]] virtual std::unique_ptr<RpcTransportCtx> newServerCtx() const = 0;
+
+ // Creates client context.
+ [[nodiscard]] virtual std::unique_ptr<RpcTransportCtx> newClientCtx() const = 0;
+
+ // Return a short description of this transport (e.g. "raw"). For logging / debugging / testing
+ // only.
+ [[nodiscard]] virtual const char *toCString() const = 0;
+
+protected:
+ RpcTransportCtxFactory() = default;
+};
+
+} // namespace android
diff --git a/libs/binder/include/binder/RpcTransportRaw.h b/libs/binder/include/binder/RpcTransportRaw.h
new file mode 100644
index 0000000..6fb1f92
--- /dev/null
+++ b/libs/binder/include/binder/RpcTransportRaw.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Wraps the transport layer of RPC. Implementation uses plain sockets.
+// Note: don't use directly. You probably want newServerRpcTransportCtx / newClientRpcTransportCtx.
+
+#pragma once
+
+#include <memory>
+
+#include <binder/RpcTransport.h>
+
+namespace android {
+
+// RpcTransportCtxFactory with TLS disabled.
+class RpcTransportCtxFactoryRaw : public RpcTransportCtxFactory {
+public:
+ static std::unique_ptr<RpcTransportCtxFactory> make();
+
+ std::unique_ptr<RpcTransportCtx> newServerCtx() const override;
+ std::unique_ptr<RpcTransportCtx> newClientCtx() const override;
+ const char* toCString() const override;
+
+private:
+ RpcTransportCtxFactoryRaw() = default;
+};
+
+} // namespace android
diff --git a/libs/binder/ndk/include_cpp/android/binder_interface_utils.h b/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
index 70a3906..6c44726 100644
--- a/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
+++ b/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
@@ -43,23 +43,31 @@
namespace ndk {
/**
+ * analog using std::shared_ptr for internally held refcount
+ *
* ref must be called at least one time during the lifetime of this object. The recommended way to
* construct this object is with SharedRefBase::make.
- *
- * Note - this class used to not inherit from enable_shared_from_this, so
- * std::make_shared works, but it won't be portable against old copies of this
- * class.
*/
-class SharedRefBase : public std::enable_shared_from_this<SharedRefBase> {
+class SharedRefBase {
public:
SharedRefBase() {}
- virtual ~SharedRefBase() {}
+ virtual ~SharedRefBase() {
+ std::call_once(mFlagThis, [&]() {
+ __assert(__FILE__, __LINE__, "SharedRefBase: no ref created during lifetime");
+ });
+ }
/**
* A shared_ptr must be held to this object when this is called. This must be called once during
* the lifetime of this object.
*/
- std::shared_ptr<SharedRefBase> ref() { return shared_from_this(); }
+ std::shared_ptr<SharedRefBase> ref() {
+ std::shared_ptr<SharedRefBase> thiz = mThis.lock();
+
+ std::call_once(mFlagThis, [&]() { mThis = thiz = std::shared_ptr<SharedRefBase>(this); });
+
+ return thiz;
+ }
/**
* Convenience method for a ref (see above) which automatically casts to the desired child type.
@@ -78,13 +86,8 @@
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
T* t = new T(std::forward<Args>(args)...);
#pragma clang diagnostic pop
-
- // T may have a private (though necessarily virtual!) destructor, so we
- // can't use refbase. For getting the first ref, we don't use ref()
- // since the internal shared_ptr isn't guaranteed to exist yet.
- SharedRefBase* base = static_cast<SharedRefBase*>(t);
- std::shared_ptr<SharedRefBase> strongBase(base);
- return std::static_pointer_cast<T>(strongBase);
+ // warning: Potential leak of memory pointed to by 't' [clang-analyzer-unix.Malloc]
+ return t->template ref<T>(); // NOLINT(clang-analyzer-unix.Malloc)
}
static void operator delete(void* p) { std::free(p); }
@@ -97,9 +100,13 @@
#if !defined(__ANDROID_API__) || __ANDROID_API__ >= 30 || defined(__ANDROID_APEX__)
private:
#else
- [[deprecated("Prefer SharedRefBase::make<T>(...) or std::make_shared<T>() if possible.")]]
+ [[deprecated("Prefer SharedRefBase::make<T>(...) if possible.")]]
#endif
static void* operator new(size_t s) { return std::malloc(s); }
+
+ private:
+ std::once_flag mFlagThis;
+ std::weak_ptr<SharedRefBase> mThis;
};
/**
diff --git a/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp b/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
index 94cc086..5ad390e 100644
--- a/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
+++ b/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
@@ -54,15 +54,6 @@
constexpr uint64_t kContextTestValue = 0xb4e42fb4d9a1d715;
class MyBinderNdkUnitTest : public aidl::BnBinderNdkUnitTest {
- public:
- MyBinderNdkUnitTest() = default;
- MyBinderNdkUnitTest(bool* deleted) : deleted(deleted) {}
- ~MyBinderNdkUnitTest() {
- if (deleted) {
- *deleted = true;
- }
- }
-
ndk::ScopedAStatus repeatInt(int32_t in, int32_t* out) {
*out = in;
return ndk::ScopedAStatus::ok();
@@ -131,7 +122,6 @@
}
uint64_t contextTestValue = kContextTestValue;
- bool* deleted = nullptr;
};
int generatedService() {
@@ -234,27 +224,6 @@
return true;
}
-TEST(NdkBinder, MakeShared) {
- const char* kInstance = "make_shared_test_instance";
- bool deleted = false;
-
- {
- auto service = std::make_shared<MyBinderNdkUnitTest>(&deleted);
- auto binder = service->asBinder();
- ASSERT_EQ(EX_NONE, AServiceManager_addService(binder.get(), kInstance));
- auto binder2 = ndk::SpAIBinder(AServiceManager_checkService(kInstance));
- ASSERT_EQ(binder.get(), binder2.get());
-
- // overwrite service
- ASSERT_EQ(EX_NONE,
- AServiceManager_addService(
- std::make_shared<MyBinderNdkUnitTest>(&deleted)->asBinder().get(),
- kInstance));
- }
-
- EXPECT_TRUE(deleted);
-}
-
TEST(NdkBinder, GetServiceThatDoesntExist) {
sp<IFoo> foo = IFoo::getService("asdfghkl;");
EXPECT_EQ(nullptr, foo.get());
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 44ed229..36a6804 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -29,6 +29,8 @@
#include <binder/ProcessState.h>
#include <binder/RpcServer.h>
#include <binder/RpcSession.h>
+#include <binder/RpcTransport.h>
+#include <binder/RpcTransportRaw.h>
#include <gtest/gtest.h>
#include <chrono>
@@ -40,6 +42,7 @@
#include <sys/prctl.h>
#include <unistd.h>
+#include "../RpcSocketAddress.h" // for testing preconnected clients
#include "../RpcState.h" // for debugging
#include "../vm_sockets.h" // for VMADDR_*
@@ -51,6 +54,21 @@
RPC_WIRE_PROTOCOL_VERSION == RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL);
const char* kLocalInetAddress = "127.0.0.1";
+enum class RpcSecurity { RAW };
+
+static inline std::vector<RpcSecurity> RpcSecurityValues() {
+ return {RpcSecurity::RAW};
+}
+
+static inline std::unique_ptr<RpcTransportCtxFactory> newFactory(RpcSecurity rpcSecurity) {
+ switch (rpcSecurity) {
+ case RpcSecurity::RAW:
+ return RpcTransportCtxFactoryRaw::make();
+ default:
+ LOG_ALWAYS_FATAL("Unknown RpcSecurity %d", rpcSecurity);
+ }
+}
+
TEST(BinderRpcParcel, EntireParcelFormatted) {
Parcel p;
p.writeInt32(3);
@@ -58,10 +76,17 @@
EXPECT_DEATH(p.markForBinder(sp<BBinder>::make()), "");
}
-TEST(BinderRpc, SetExternalServer) {
+class BinderRpcSimple : public ::testing::TestWithParam<RpcSecurity> {
+public:
+ static std::string PrintTestParam(const ::testing::TestParamInfo<ParamType>& info) {
+ return newFactory(info.param)->toCString();
+ }
+};
+
+TEST_P(BinderRpcSimple, SetExternalServerTest) {
base::unique_fd sink(TEMP_FAILURE_RETRY(open("/dev/null", O_RDWR)));
int sinkFd = sink.get();
- auto server = RpcServer::make();
+ auto server = RpcServer::make(newFactory(GetParam()));
server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
ASSERT_FALSE(server->hasServer());
ASSERT_TRUE(server->setupExternalServer(std::move(sink)));
@@ -385,12 +410,15 @@
};
enum class SocketType {
+ PRECONNECTED,
UNIX,
VSOCK,
INET,
};
-static inline std::string PrintSocketType(const testing::TestParamInfo<SocketType>& info) {
- switch (info.param) {
+static inline std::string PrintToString(SocketType socketType) {
+ switch (socketType) {
+ case SocketType::PRECONNECTED:
+ return "preconnected_uds";
case SocketType::UNIX:
return "unix_domain_socket";
case SocketType::VSOCK:
@@ -403,7 +431,21 @@
}
}
-class BinderRpc : public ::testing::TestWithParam<SocketType> {
+static base::unique_fd connectToUds(const char* addrStr) {
+ UnixSocketAddress addr(addrStr);
+ base::unique_fd serverFd(
+ TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+ int savedErrno = errno;
+ CHECK(serverFd.ok()) << "Could not create socket " << addrStr << ": " << strerror(savedErrno);
+
+ if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) {
+ int savedErrno = errno;
+ LOG(FATAL) << "Could not connect to socket " << addrStr << ": " << strerror(savedErrno);
+ }
+ return serverFd;
+}
+
+class BinderRpc : public ::testing::TestWithParam<std::tuple<SocketType, RpcSecurity>> {
public:
struct Options {
size_t numThreads = 1;
@@ -411,13 +453,19 @@
size_t numIncomingConnections = 0;
};
+ static inline std::string PrintParamInfo(const testing::TestParamInfo<ParamType>& info) {
+ auto [type, security] = info.param;
+ return PrintToString(type) + "_" + newFactory(security)->toCString();
+ }
+
// This creates a new process serving an interface on a certain number of
// threads.
ProcessSession createRpcTestSocketServerProcess(
const Options& options, const std::function<void(const sp<RpcServer>&)>& configure) {
CHECK_GE(options.numSessions, 1) << "Must have at least one session to a server";
- SocketType socketType = GetParam();
+ SocketType socketType = std::get<0>(GetParam());
+ RpcSecurity rpcSecurity = std::get<1>(GetParam());
unsigned int vsockPort = allocateVsockPort();
std::string addr = allocateSocketAddress();
@@ -425,7 +473,7 @@
auto ret = ProcessSession{
.host = Process([&](android::base::borrowed_fd writeEnd) {
- sp<RpcServer> server = RpcServer::make();
+ sp<RpcServer> server = RpcServer::make(newFactory(rpcSecurity));
server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
server->setMaxThreads(options.numThreads);
@@ -433,6 +481,8 @@
unsigned int outPort = 0;
switch (socketType) {
+ case SocketType::PRECONNECTED:
+ [[fallthrough]];
case SocketType::UNIX:
CHECK(server->setupUnixDomainServer(addr.c_str())) << addr;
break;
@@ -467,10 +517,16 @@
}
for (size_t i = 0; i < options.numSessions; i++) {
- sp<RpcSession> session = RpcSession::make();
+ sp<RpcSession> session = RpcSession::make(newFactory(rpcSecurity));
session->setMaxThreads(options.numIncomingConnections);
switch (socketType) {
+ case SocketType::PRECONNECTED:
+ if (session->setupPreconnectedClient({}, [=]() {
+ return connectToUds(addr.c_str());
+ }))
+ goto success;
+ break;
case SocketType::UNIX:
if (session->setupUnixDomainClient(addr.c_str())) goto success;
break;
@@ -1131,13 +1187,14 @@
}
static bool testSupportVsockLoopback() {
+ // We don't need to enable TLS to know if vsock is supported.
unsigned int vsockPort = allocateVsockPort();
- sp<RpcServer> server = RpcServer::make();
+ sp<RpcServer> server = RpcServer::make(RpcTransportCtxFactoryRaw::make());
server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
CHECK(server->setupVsockServer(vsockPort));
server->start();
- sp<RpcSession> session = RpcSession::make();
+ sp<RpcSession> session = RpcSession::make(RpcTransportCtxFactoryRaw::make());
bool okay = session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort);
while (!server->shutdown()) usleep(10000);
ALOGE("Detected vsock loopback supported: %d", okay);
@@ -1145,7 +1202,7 @@
}
static std::vector<SocketType> testSocketTypes() {
- std::vector<SocketType> ret = {SocketType::UNIX, SocketType::INET};
+ std::vector<SocketType> ret = {SocketType::PRECONNECTED, SocketType::UNIX, SocketType::INET};
static bool hasVsockLoopback = testSupportVsockLoopback();
@@ -1156,10 +1213,13 @@
return ret;
}
-INSTANTIATE_TEST_CASE_P(PerSocket, BinderRpc, ::testing::ValuesIn(testSocketTypes()),
- PrintSocketType);
+INSTANTIATE_TEST_CASE_P(PerSocket, BinderRpc,
+ ::testing::Combine(::testing::ValuesIn(testSocketTypes()),
+ ::testing::ValuesIn(RpcSecurityValues())),
+ BinderRpc::PrintParamInfo);
-class BinderRpcServerRootObject : public ::testing::TestWithParam<std::tuple<bool, bool>> {};
+class BinderRpcServerRootObject
+ : public ::testing::TestWithParam<std::tuple<bool, bool, RpcSecurity>> {};
TEST_P(BinderRpcServerRootObject, WeakRootObject) {
using SetFn = std::function<void(RpcServer*, sp<IBinder>)>;
@@ -1167,8 +1227,8 @@
return isStrong ? SetFn(&RpcServer::setRootObject) : SetFn(&RpcServer::setRootObjectWeak);
};
- auto server = RpcServer::make();
- auto [isStrong1, isStrong2] = GetParam();
+ auto [isStrong1, isStrong2, rpcSecurity] = GetParam();
+ auto server = RpcServer::make(newFactory(rpcSecurity));
auto binder1 = sp<BBinder>::make();
IBinder* binderRaw1 = binder1.get();
setRootObject(isStrong1)(server.get(), binder1);
@@ -1185,7 +1245,8 @@
}
INSTANTIATE_TEST_CASE_P(BinderRpc, BinderRpcServerRootObject,
- ::testing::Combine(::testing::Bool(), ::testing::Bool()));
+ ::testing::Combine(::testing::Bool(), ::testing::Bool(),
+ ::testing::ValuesIn(RpcSecurityValues())));
class OneOffSignal {
public:
@@ -1208,10 +1269,10 @@
bool mValue = false;
};
-TEST(BinderRpc, Shutdown) {
+TEST_P(BinderRpcSimple, Shutdown) {
auto addr = allocateSocketAddress();
unlink(addr.c_str());
- auto server = RpcServer::make();
+ auto server = RpcServer::make(newFactory(GetParam()));
server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
ASSERT_TRUE(server->setupUnixDomainServer(addr.c_str()));
auto joinEnds = std::make_shared<OneOffSignal>();
@@ -1272,6 +1333,9 @@
ASSERT_EQ(OK, rpcBinder->pingBinder());
}
+INSTANTIATE_TEST_CASE_P(BinderRpc, BinderRpcSimple, ::testing::ValuesIn(RpcSecurityValues()),
+ BinderRpcSimple::PrintTestParam);
+
} // namespace android
int main(int argc, char** argv) {
diff --git a/libs/binder/tests/parcel_fuzzer/random_parcel.cpp b/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
index 92fdc72..a2472f8 100644
--- a/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
+++ b/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
@@ -19,6 +19,7 @@
#include <android-base/logging.h>
#include <binder/IServiceManager.h>
#include <binder/RpcSession.h>
+#include <binder/RpcTransportRaw.h>
#include <fuzzbinder/random_fd.h>
#include <utils/String16.h>
@@ -35,7 +36,7 @@
void fillRandomParcel(Parcel* p, FuzzedDataProvider&& provider) {
if (provider.ConsumeBool()) {
- auto session = sp<RpcSession>::make();
+ auto session = RpcSession::make(RpcTransportCtxFactoryRaw::make());
CHECK(session->addNullDebuggingClient());
p->markForRpc(session);
fillRandomParcelData(p, std::move(provider));