Merge "binder: setRpcClientDebug drops numThreads argument."
diff --git a/cmds/installd/dexopt.cpp b/cmds/installd/dexopt.cpp
index 204953c..cc0434d 100644
--- a/cmds/installd/dexopt.cpp
+++ b/cmds/installd/dexopt.cpp
@@ -292,8 +292,8 @@
}
}
-static unique_fd create_profile(uid_t uid, const std::string& profile, int32_t flags) {
- unique_fd fd(TEMP_FAILURE_RETRY(open(profile.c_str(), flags, 0600)));
+static unique_fd create_profile(uid_t uid, const std::string& profile, int32_t flags, mode_t mode) {
+ unique_fd fd(TEMP_FAILURE_RETRY(open(profile.c_str(), flags, mode)));
if (fd.get() < 0) {
if (errno != EEXIST) {
PLOG(ERROR) << "Failed to create profile " << profile;
@@ -310,7 +310,7 @@
return fd;
}
-static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t flags) {
+static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t flags, mode_t mode) {
// Do not follow symlinks when opening a profile:
// - primary profiles should not contain symlinks in their paths
// - secondary dex paths should have been already resolved and validated
@@ -320,7 +320,7 @@
// Reference profiles and snapshots are created on the fly; so they might not exist beforehand.
unique_fd fd;
if ((flags & O_CREAT) != 0) {
- fd = create_profile(uid, profile, flags);
+ fd = create_profile(uid, profile, flags, mode);
} else {
fd.reset(TEMP_FAILURE_RETRY(open(profile.c_str(), flags)));
}
@@ -336,6 +336,16 @@
PLOG(ERROR) << "Failed to open profile " << profile;
}
return invalid_unique_fd();
+ } else {
+ // If we just created the file we need to set its mode because on Android
+ // open has a mask that only allows owner access.
+ if ((flags & O_CREAT) != 0) {
+ if (fchmod(fd.get(), mode) != 0) {
+ PLOG(ERROR) << "Could not set mode " << std::hex << mode << std::dec
+ << " on profile" << profile;
+ // Not a terminal failure.
+ }
+ }
}
return fd;
@@ -345,20 +355,29 @@
const std::string& location, bool is_secondary_dex) {
std::string profile = create_current_profile_path(user, package_name, location,
is_secondary_dex);
- return open_profile(uid, profile, O_RDONLY);
+ return open_profile(uid, profile, O_RDONLY, /*mode=*/ 0);
}
static unique_fd open_reference_profile(uid_t uid, const std::string& package_name,
const std::string& location, bool read_write, bool is_secondary_dex) {
std::string profile = create_reference_profile_path(package_name, location, is_secondary_dex);
- return open_profile(uid, profile, read_write ? (O_CREAT | O_RDWR) : O_RDONLY);
+ return open_profile(
+ uid,
+ profile,
+ read_write ? (O_CREAT | O_RDWR) : O_RDONLY,
+ S_IRUSR | S_IWUSR | S_IRGRP); // so that ART can also read it when apps run.
}
static UniqueFile open_reference_profile_as_unique_file(uid_t uid, const std::string& package_name,
const std::string& location, bool read_write, bool is_secondary_dex) {
std::string profile_path = create_reference_profile_path(package_name, location,
is_secondary_dex);
- unique_fd ufd = open_profile(uid, profile_path, read_write ? (O_CREAT | O_RDWR) : O_RDONLY);
+ unique_fd ufd = open_profile(
+ uid,
+ profile_path,
+ read_write ? (O_CREAT | O_RDWR) : O_RDONLY,
+ S_IRUSR | S_IWUSR | S_IRGRP); // so that ART can also read it when apps run.
+
return UniqueFile(ufd.release(), profile_path, [](const std::string& path) {
clear_profile(path);
});
@@ -367,7 +386,7 @@
static unique_fd open_spnashot_profile(uid_t uid, const std::string& package_name,
const std::string& location) {
std::string profile = create_snapshot_profile_path(package_name, location);
- return open_profile(uid, profile, O_CREAT | O_RDWR | O_TRUNC);
+ return open_profile(uid, profile, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR);
}
static void open_profile_files(uid_t uid, const std::string& package_name,
@@ -2484,7 +2503,7 @@
for (size_t i = 0; i < profiles.size(); ) {
std::vector<unique_fd> profiles_fd;
for (size_t k = 0; k < kAggregationBatchSize && i < profiles.size(); k++, i++) {
- unique_fd fd = open_profile(AID_SYSTEM, profiles[i], O_RDONLY);
+ unique_fd fd = open_profile(AID_SYSTEM, profiles[i], O_RDONLY, /*mode=*/ 0);
if (fd.get() >= 0) {
profiles_fd.push_back(std::move(fd));
}
diff --git a/cmds/installd/tests/installd_dexopt_test.cpp b/cmds/installd/tests/installd_dexopt_test.cpp
index e272025..216347e 100644
--- a/cmds/installd/tests/installd_dexopt_test.cpp
+++ b/cmds/installd/tests/installd_dexopt_test.cpp
@@ -919,7 +919,7 @@
return;
}
- // Check that the snapshot was created witht he expected acess flags.
+ // Check that the snapshot was created with the expected access flags.
CheckFileAccess(snap_profile_, kSystemUid, kSystemGid, 0600 | S_IFREG);
// The snapshot should be equivalent to the merge of profiles.
@@ -962,8 +962,8 @@
return;
}
- // Check that the snapshot was created witht he expected acess flags.
- CheckFileAccess(ref_profile_, kTestAppUid, kTestAppUid, 0600 | S_IFREG);
+ // Check that the reference profile was created with the expected access flags.
+ CheckFileAccess(ref_profile_, kTestAppUid, kTestAppUid, 0640 | S_IFREG);
// The snapshot should be equivalent to the merge of profiles.
std::string ref_profile_content = ref_profile_ + ".expected";
diff --git a/libs/binder/Parcel.cpp b/libs/binder/Parcel.cpp
index 232a70c..ebba375 100644
--- a/libs/binder/Parcel.cpp
+++ b/libs/binder/Parcel.cpp
@@ -206,6 +206,7 @@
status_t status = writeInt32(1); // non-null
if (status != OK) return status;
RpcAddress address = RpcAddress::zero();
+ // TODO(b/167966510): need to undo this if the Parcel is not sent
status = mSession->state()->onBinderLeaving(mSession, binder, &address);
if (status != OK) return status;
status = address.writeToParcel(this);
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2f378da..2d2eed2 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -187,6 +187,11 @@
}
mShutdownTrigger->trigger();
+ for (auto& [id, session] : mSessions) {
+ (void)id;
+ session->mShutdownTrigger->trigger();
+ }
+
while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@
};
server->mConnectingThreads.erase(threadId);
- if (!idValid) {
+ if (!idValid || server->mShutdownTrigger->isTriggered()) {
return;
}
@@ -276,10 +281,14 @@
session = RpcSession::make();
session->setMaxThreads(server->mMaxThreads);
- session->setForServer(server,
- sp<RpcServer::EventListener>::fromExisting(
- static_cast<RpcServer::EventListener*>(server.get())),
- server->mSessionIdCounter, server->mShutdownTrigger);
+ if (!session->setForServer(server,
+ sp<RpcServer::EventListener>::fromExisting(
+ static_cast<RpcServer::EventListener*>(
+ server.get())),
+ server->mSessionIdCounter)) {
+ ALOGE("Failed to attach server to session");
+ return;
+ }
server->mSessions[server->mSessionIdCounter] = session;
} else {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c563377..de9aa22 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -113,17 +113,21 @@
return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}
-bool RpcSession::shutdown() {
+bool RpcSession::shutdownAndWait(bool wait) {
std::unique_lock<std::mutex> _l(mMutex);
- LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
- LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
mShutdownTrigger->trigger();
- mShutdownListener->waitForShutdown(_l);
- mState->terminate();
- LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+ if (wait) {
+ LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+ mShutdownListener->waitForShutdown(_l);
+ LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+ }
+
+ _l.unlock();
+ mState->clear();
+
return true;
}
@@ -139,12 +143,15 @@
status_t RpcSession::sendDecStrong(const RpcAddress& address) {
ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
ConnectionUse::CLIENT_REFCOUNT);
- return state()->sendDecStrong(connection.fd(), address);
+ return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
}
std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
auto ret = std::make_unique<RpcSession::FdTrigger>();
- if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+ if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+ ALOGE("Could not create pipe %s", strerror(errno));
+ return nullptr;
+ }
return ret;
}
@@ -152,6 +159,10 @@
mWrite.reset();
}
+bool RpcSession::FdTrigger::isTriggered() {
+ return mWrite == -1;
+}
+
status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
while (true) {
pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -285,7 +296,7 @@
if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false;
- // TODO(b/185167543): we should add additional sessions dynamically
+ // TODO(b/189955605): we should add additional sessions dynamically
// instead of all at once.
// TODO(b/186470974): first risk of blocking
size_t numThreadsAvailable;
@@ -303,11 +314,11 @@
// we've already setup one client
for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
- // TODO(b/185167543): shutdown existing connections?
+ // TODO(b/189955605): shutdown existing connections?
if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
}
- // TODO(b/185167543): we should add additional sessions dynamically
+ // TODO(b/189955605): we should add additional sessions dynamically
// instead of all at once - the other side should be responsible for setting
// up additional connections. We need to create at least one (unless 0 are
// requested to be set) in order to allow the other side to reliably make
@@ -408,20 +419,21 @@
return true;
}
-void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
- int32_t sessionId,
- const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+ int32_t sessionId) {
LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
LOG_ALWAYS_FATAL_IF(server == nullptr);
LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
- LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
+ mShutdownTrigger = FdTrigger::make();
+ if (mShutdownTrigger == nullptr) return false;
mId = sessionId;
mForServer = server;
mEventListener = eventListener;
- mShutdownTrigger = shutdownTrigger;
+ return true;
}
sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 93f1529..6899981 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -137,9 +137,39 @@
dumpLocked();
}
-void RpcState::terminate() {
+void RpcState::clear() {
std::unique_lock<std::mutex> _l(mNodeMutex);
- terminate(_l);
+
+ if (mTerminated) {
+ LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
+ "New state should be impossible after terminating!");
+ return;
+ }
+
+ if (SHOULD_LOG_RPC_DETAIL) {
+ ALOGE("RpcState::clear()");
+ dumpLocked();
+ }
+
+ // if the destructor of a binder object makes another RPC call, then calling
+ // decStrong could deadlock. So, we must hold onto these binders until
+ // mNodeMutex is no longer taken.
+ std::vector<sp<IBinder>> tempHoldBinder;
+
+ mTerminated = true;
+ for (auto& [address, node] : mNodeForAddress) {
+ sp<IBinder> binder = node.binder.promote();
+ LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
+
+ if (node.sentRef != nullptr) {
+ tempHoldBinder.push_back(node.sentRef);
+ }
+ }
+
+ mNodeForAddress.clear();
+
+ _l.unlock();
+ tempHoldBinder.clear(); // explicit
}
void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@
ALOGE("END DUMP OF RpcState");
}
-void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
- if (SHOULD_LOG_RPC_DETAIL) {
- ALOGE("RpcState::terminate()");
- dumpLocked();
- }
-
- // if the destructor of a binder object makes another RPC call, then calling
- // decStrong could deadlock. So, we must hold onto these binders until
- // mNodeMutex is no longer taken.
- std::vector<sp<IBinder>> tempHoldBinder;
-
- mTerminated = true;
- for (auto& [address, node] : mNodeForAddress) {
- sp<IBinder> binder = node.binder.promote();
- LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
-
- if (node.sentRef != nullptr) {
- tempHoldBinder.push_back(node.sentRef);
- }
- }
-
- mNodeForAddress.clear();
-
- lock.unlock();
- tempHoldBinder.clear(); // explicit
-}
RpcState::CommandData::CommandData(size_t size) : mSize(size) {
// The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@
mData.reset(new (std::nothrow) uint8_t[size]);
}
-status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
- size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, const void* data, size_t size) {
LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot send %s at size %zu (too big)", what, size);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -235,7 +239,7 @@
LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
size, fd.get(), strerror(savedErrno));
- terminate();
+ (void)session->shutdownAndWait(false);
return -savedErrno;
}
@@ -246,7 +250,7 @@
const char* what, void* data, size_t size) {
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot rec %s at size %zu (too big)", what, size);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -358,7 +362,11 @@
if (flags & IBinder::FLAG_ONEWAY) {
asyncNumber = it->second.asyncNumber;
- if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+ if (!nodeProgressAsyncNumber(&it->second)) {
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return DEAD_OBJECT;
+ }
}
}
@@ -390,8 +398,10 @@
data.dataSize());
if (status_t status =
- rpcSend(fd, "transaction", transactionData.data(), transactionData.size());
+ rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
status != OK)
+ // TODO(b/167966510): need to undo onBinderLeaving - we know the
+ // refcount isn't successfully transferred.
return status;
if (flags & IBinder::FLAG_ONEWAY) {
@@ -442,7 +452,7 @@
if (command.bodySize < sizeof(RpcWireReply)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
sizeof(RpcWireReply), command.bodySize);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
@@ -457,7 +467,8 @@
return OK;
}
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const RpcAddress& addr) {
{
std::lock_guard<std::mutex> _l(mNodeMutex);
if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -476,10 +487,10 @@
.command = RPC_COMMAND_DEC_STRONG,
.bodySize = sizeof(RpcWireAddress),
};
- if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+ if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
return status;
- if (status_t status =
- rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
+ if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+ sizeof(RpcWireAddress));
status != OK)
return status;
return OK;
@@ -538,7 +549,7 @@
// also can't consider it a fatal error because this would allow any client
// to kill us, so ending the session for misbehaving client.
ALOGE("Unknown RPC command %d - terminating session", command.command);
- terminate();
+ (void)session->shutdownAndWait(false);
return DEAD_OBJECT;
}
status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -571,7 +582,7 @@
if (transactionData.size() < sizeof(RpcWireTransaction)) {
ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
sizeof(RpcWireTransaction), transactionData.size());
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
@@ -600,15 +611,15 @@
// session.
ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ (void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
} else if (target->localBinder() == nullptr) {
ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ (void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
} else if (transaction->flags & IBinder::FLAG_ONEWAY) {
- std::lock_guard<std::mutex> _l(mNodeMutex);
+ std::unique_lock<std::mutex> _l(mNodeMutex);
auto it = mNodeForAddress.find(addr);
if (it->second.binder.promote() != target) {
ALOGE("Binder became invalid during transaction. Bad client? %s",
@@ -617,16 +628,33 @@
} else if (transaction->asyncNumber != it->second.asyncNumber) {
// we need to process some other asynchronous transaction
// first
- // TODO(b/183140903): limit enqueues/detect overfill for bad client
- // TODO(b/183140903): detect when an object is deleted when it still has
- // pending async transactions
it->second.asyncTodo.push(BinderNode::AsyncTodo{
.ref = target,
.data = std::move(transactionData),
.asyncNumber = transaction->asyncNumber,
});
- LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,
- addr.toString().c_str());
+
+ size_t numPending = it->second.asyncTodo.size();
+ LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)",
+ transaction->asyncNumber, addr.toString().c_str(), numPending);
+
+ constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000;
+ constexpr size_t kArbitraryOnewayCallWarnLevel = 1000;
+ constexpr size_t kArbitraryOnewayCallWarnPer = 1000;
+
+ if (numPending >= kArbitraryOnewayCallWarnLevel) {
+ if (numPending >= kArbitraryOnewayCallTerminateLevel) {
+ ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending);
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return FAILED_TRANSACTION;
+ }
+
+ if (numPending % kArbitraryOnewayCallWarnPer == 0) {
+ ALOGW("Warning: many oneway transactions built up on %p (%zu)",
+ target.get(), numPending);
+ }
+ }
return OK;
}
}
@@ -707,7 +735,11 @@
// last refcount dropped after this transaction happened
if (it == mNodeForAddress.end()) return OK;
- if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+ if (!nodeProgressAsyncNumber(&it->second)) {
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return DEAD_OBJECT;
+ }
if (it->second.asyncTodo.size() == 0) return OK;
if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
@@ -753,7 +785,7 @@
memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
reply.dataSize());
- return rpcSend(fd, "reply", replyData.data(), replyData.size());
+ return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
}
status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -772,7 +804,7 @@
if (command.bodySize < sizeof(RpcWireAddress)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
sizeof(RpcWireAddress), command.bodySize);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
@@ -790,7 +822,8 @@
if (target == nullptr) {
ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -826,12 +859,11 @@
return ref;
}
-bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) {
+bool RpcState::nodeProgressAsyncNumber(BinderNode* node) {
// 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
// a single binder
if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
ALOGE("Out of async transaction IDs. Terminating");
- terminate(lock);
return false;
}
node->asyncNumber++;
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 81ff458..13c3115 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -65,7 +65,8 @@
uint32_t code, const Parcel& data,
const sp<RpcSession>& session, Parcel* reply,
uint32_t flags);
- [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+ [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const RpcAddress& address);
enum class CommandType {
ANY,
@@ -110,11 +111,10 @@
* WARNING: RpcState is responsible for calling this when the session is
* no longer recoverable.
*/
- void terminate();
+ void clear();
private:
void dumpLocked();
- void terminate(std::unique_lock<std::mutex>& lock);
// Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
// large allocations to avoid being requested from allocating too much data.
@@ -130,8 +130,8 @@
size_t mSize;
};
- [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
- size_t size);
+ [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, const void* data, size_t size);
[[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
const char* what, void* data, size_t size);
@@ -204,9 +204,8 @@
// dropped after any locks are removed.
sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
// true - success
- // false - state terminated, lock gone, halt
- [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node,
- std::unique_lock<std::mutex>& lock);
+ // false - out of async transaction IDs; caller must shut down the session and halt
+ [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
std::mutex mNodeMutex;
bool mTerminated = false;
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 98db221..4e6934b 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -97,7 +97,7 @@
*
* If this is not specified, this will be a single-threaded server.
*
- * TODO(b/185167543): these are currently created per client, but these
+ * TODO(b/167966510): these are currently created per client, but these
* should be shared.
*/
void setMaxThreads(size_t threads);
@@ -173,7 +173,7 @@
wp<IBinder> mRootObjectWeak;
std::map<int32_t, sp<RpcSession>> mSessions;
int32_t mSessionIdCounter = 0;
- std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+ std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
std::condition_variable mShutdownCv;
};
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index a6bc1a9..4650cf2 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -54,7 +54,7 @@
* If this is called, 'shutdown' on this session must also be called.
* Otherwise, a threadpool will leak.
*
- * TODO(b/185167543): start these dynamically
+ * TODO(b/189955605): start these dynamically
*/
void setMaxThreads(size_t threads);
size_t getMaxThreads();
@@ -97,14 +97,20 @@
status_t getRemoteMaxThreads(size_t* maxThreads);
/**
- * Shuts down the service. Only works for client sessions (server-side
- * sessions currently only support shutting down the entire server).
+ * Shuts down the service.
+ *
+ * For client sessions, wait can be true or false. For server sessions,
+ * waiting is not currently supported (will abort).
*
* Warning: this is currently not active/nice (the server isn't told we're
* shutting down). Being nicer to the server could potentially make it
* reclaim resources faster.
+ *
+ * If this is called w/ 'wait' true, then this will wait for shutdown to
+ * complete before returning. This will hang if it is called from the
+ * session threadpool (when processing received calls).
*/
- [[nodiscard]] bool shutdown();
+ [[nodiscard]] bool shutdownAndWait(bool wait);
[[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags);
@@ -129,16 +135,17 @@
static std::unique_ptr<FdTrigger> make();
/**
- * poll() on this fd for POLLHUP to get notification when trigger is called
- */
- base::borrowed_fd readFd() const { return mRead; }
-
- /**
* Close the write end of the pipe so that the read end receives POLLHUP.
+ * Not threadsafe.
*/
void trigger();
/**
+ * Whether this has been triggered.
+ */
+ bool isTriggered();
+
+ /**
* Poll for a read event.
*
* Return:
@@ -197,9 +204,9 @@
[[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
bool server);
[[nodiscard]] bool addClientConnection(base::unique_fd fd);
- void setForServer(const wp<RpcServer>& server,
- const wp<RpcSession::EventListener>& eventListener, int32_t sessionId,
- const std::shared_ptr<FdTrigger>& shutdownTrigger);
+ [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
+ const wp<RpcSession::EventListener>& eventListener,
+ int32_t sessionId);
sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
[[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
@@ -252,7 +259,7 @@
// TODO(b/183988761): this shouldn't be guessable
std::optional<int32_t> mId;
- std::shared_ptr<FdTrigger> mShutdownTrigger;
+ std::unique_ptr<FdTrigger> mShutdownTrigger;
std::unique_ptr<RpcState> mState;
@@ -266,9 +273,6 @@
size_t mClientConnectionsOffset = 0;
std::vector<sp<RpcConnection>> mClientConnections;
std::vector<sp<RpcConnection>> mServerConnections;
-
- // TODO(b/185167543): allow sharing between different sessions in a
- // process? (or combine with mServerConnections)
std::map<std::thread::id, std::thread> mThreads;
};
diff --git a/libs/binder/rust/src/lib.rs b/libs/binder/rust/src/lib.rs
index 2694cba..7c0584b 100644
--- a/libs/binder/rust/src/lib.rs
+++ b/libs/binder/rust/src/lib.rs
@@ -115,14 +115,14 @@
pub use native::add_service;
pub use native::Binder;
pub use parcel::Parcel;
-pub use proxy::{get_interface, get_service};
+pub use proxy::{get_interface, get_service, wait_for_interface, wait_for_service};
pub use proxy::{AssociateClass, DeathRecipient, Proxy, SpIBinder, WpIBinder};
pub use state::{ProcessState, ThreadState};
/// The public API usable outside AIDL-generated interface crates.
pub mod public_api {
pub use super::parcel::ParcelFileDescriptor;
- pub use super::{add_service, get_interface};
+ pub use super::{add_service, get_interface, wait_for_interface};
pub use super::{
BinderFeatures, DeathRecipient, ExceptionCode, IBinder, Interface, ProcessState, SpIBinder,
Status, StatusCode, Strong, ThreadState, Weak, WpIBinder,
diff --git a/libs/binder/rust/src/proxy.rs b/libs/binder/rust/src/proxy.rs
index 52036f5..4a6d118 100644
--- a/libs/binder/rust/src/proxy.rs
+++ b/libs/binder/rust/src/proxy.rs
@@ -653,6 +653,18 @@
}
}
+/// Retrieve an existing service, or start it if it is configured as a dynamic
+/// service and isn't yet started.
+pub fn wait_for_service(name: &str) -> Option<SpIBinder> {
+ let name = CString::new(name).ok()?;
+ unsafe {
+ // Safety: `AServiceManager_waitForService` returns either a null
+ // pointer or a valid pointer to an owned `AIBinder`. Either of these
+ // values is safe to pass to `SpIBinder::from_raw`.
+ SpIBinder::from_raw(sys::AServiceManager_waitForService(name.as_ptr()))
+ }
+}
+
/// Retrieve an existing service for a particular interface, blocking for a few
/// seconds if it doesn't yet exist.
pub fn get_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
@@ -663,6 +675,16 @@
}
}
+/// Retrieve an existing service for a particular interface, or start it if it
+/// is configured as a dynamic service and isn't yet started.
+pub fn wait_for_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
+ let service = wait_for_service(name);
+ match service {
+ Some(service) => FromIBinder::try_from(service),
+ None => Err(StatusCode::NAME_NOT_FOUND),
+ }
+}
+
/// # Safety
///
/// `SpIBinder` guarantees that `binder` always contains a valid pointer to an
diff --git a/libs/binder/rust/tests/integration.rs b/libs/binder/rust/tests/integration.rs
index 0332007..10b77f4 100644
--- a/libs/binder/rust/tests/integration.rs
+++ b/libs/binder/rust/tests/integration.rs
@@ -274,6 +274,20 @@
}
#[test]
+ fn check_wait_for_service() {
+ let mut sm =
+ binder::wait_for_service("manager").expect("Did not get manager binder service");
+ assert!(sm.is_binder_alive());
+ assert!(sm.ping_binder().is_ok());
+
+ // The service manager service isn't an ITest, so this must fail.
+ assert_eq!(
+ binder::wait_for_interface::<dyn ITest>("manager").err(),
+ Some(StatusCode::BAD_TYPE)
+ );
+ }
+
+ #[test]
fn trivial_client() {
let service_name = "trivial_client_test";
let _process = ScopedServiceProcess::new(service_name);
@@ -283,6 +297,15 @@
}
#[test]
+ fn wait_for_trivial_client() {
+ let service_name = "wait_for_trivial_client_test";
+ let _process = ScopedServiceProcess::new(service_name);
+ let test_client: Strong<dyn ITest> =
+ binder::wait_for_interface(service_name).expect("Did not get manager binder service");
+ assert_eq!(test_client.test().unwrap(), "wait_for_trivial_client_test");
+ }
+
+ #[test]
fn get_selinux_context() {
let service_name = "get_selinux_context";
let _process = ScopedServiceProcess::new(service_name);
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 0a970fb..82f8a3e 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -941,6 +941,40 @@
for (auto& t : threads) t.join();
}
+TEST_P(BinderRpc, OnewayCallExhaustion) {
+ constexpr size_t kNumClients = 2;
+ constexpr size_t kTooLongMs = 1000;
+
+ auto proc = createRpcTestSocketServerProcess(kNumClients /*threads*/, 2 /*sessions*/);
+
+ // Build up oneway calls on the second session to make sure it terminates
+ // and shuts down. The first session should be unaffected (proc destructor
+ // checks the first session).
+ auto iface = interface_cast<IBinderRpcTest>(proc.proc.sessions.at(1).root);
+
+ std::vector<std::thread> threads;
+ for (size_t i = 0; i < kNumClients; i++) {
+ // one of these threads will get stuck queueing a transaction once the
+ // socket fills up, the other will be able to fill up transactions on
+ // this object
+ threads.push_back(std::thread([&] {
+ while (iface->sleepMsAsync(kTooLongMs).isOk()) {
+ }
+ }));
+ }
+ for (auto& t : threads) t.join();
+
+ Status status = iface->sleepMsAsync(kTooLongMs);
+ EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+
+ // the second session should be shutdown in the other process by the time we
+ // are able to join above (it'll only be hung up once it finishes processing
+ // any pending commands). We need to erase this session from the record
+ // here, so that the destructor for our session won't check that this
+ // session is valid, but we still want it to test the other session.
+ proc.proc.sessions.erase(proc.proc.sessions.begin() + 1);
+}
+
TEST_P(BinderRpc, Callbacks) {
const static std::string kTestString = "good afternoon!";
@@ -966,7 +1000,7 @@
// since this session has a reverse connection w/ a threadpool, we
// need to manually shut it down
- EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+ EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
proc.expectAlreadyShutdown = true;
}