Merge "binderRpcTest: mitigate hang"
diff --git a/libs/binder/BpBinder.cpp b/libs/binder/BpBinder.cpp
index 1dcb94c..5e44a0f 100644
--- a/libs/binder/BpBinder.cpp
+++ b/libs/binder/BpBinder.cpp
@@ -273,7 +273,8 @@
status_t status;
if (CC_UNLIKELY(isRpcBinder())) {
- status = rpcSession()->transact(rpcAddress(), code, data, reply, flags);
+ status = rpcSession()->transact(sp<IBinder>::fromExisting(this), code, data, reply,
+ flags);
} else {
status = IPCThreadState::self()->transact(binderHandle(), code, data, reply, flags);
}
diff --git a/libs/binder/IPCThreadState.cpp b/libs/binder/IPCThreadState.cpp
index e933f7e..fa9f3a9 100644
--- a/libs/binder/IPCThreadState.cpp
+++ b/libs/binder/IPCThreadState.cpp
@@ -641,12 +641,6 @@
mPostWriteStrongDerefs.clear();
}
-void IPCThreadState::createTransactionReference(RefBase* ref)
-{
- ref->incStrong(mProcess.get());
- mPostWriteStrongDerefs.push(ref);
-}
-
void IPCThreadState::joinThreadPool(bool isMain)
{
LOG_THREADPOOL("**** THREAD %p (PID %d) IS JOINING THE THREAD POOL\n", (void*)pthread_self(), getpid());
diff --git a/libs/binder/IServiceManager.cpp b/libs/binder/IServiceManager.cpp
index f684cf6..47dd32e 100644
--- a/libs/binder/IServiceManager.cpp
+++ b/libs/binder/IServiceManager.cpp
@@ -320,14 +320,18 @@
const std::string name = String8(name16).c_str();
sp<IBinder> out;
- if (!mTheRealServiceManager->getService(name, &out).isOk()) {
+ if (Status status = mTheRealServiceManager->getService(name, &out); !status.isOk()) {
+ ALOGW("Failed to getService in waitForService for %s: %s", name.c_str(),
+ status.toString8().c_str());
return nullptr;
}
if (out != nullptr) return out;
sp<Waiter> waiter = sp<Waiter>::make();
- if (!mTheRealServiceManager->registerForNotifications(
- name, waiter).isOk()) {
+ if (Status status = mTheRealServiceManager->registerForNotifications(name, waiter);
+ !status.isOk()) {
+ ALOGW("Failed to registerForNotifications in waitForService for %s: %s", name.c_str(),
+ status.toString8().c_str());
return nullptr;
}
Defer unregister ([&] {
@@ -360,7 +364,9 @@
// - init gets death signal, but doesn't know it needs to restart
// the service
// - we need to request service again to get it to start
- if (!mTheRealServiceManager->getService(name, &out).isOk()) {
+ if (Status status = mTheRealServiceManager->getService(name, &out); !status.isOk()) {
+ ALOGW("Failed to getService in waitForService on later try for %s: %s", name.c_str(),
+ status.toString8().c_str());
return nullptr;
}
if (out != nullptr) return out;
@@ -369,7 +375,10 @@
bool ServiceManagerShim::isDeclared(const String16& name) {
bool declared;
- if (!mTheRealServiceManager->isDeclared(String8(name).c_str(), &declared).isOk()) {
+ if (Status status = mTheRealServiceManager->isDeclared(String8(name).c_str(), &declared);
+ !status.isOk()) {
+ ALOGW("Failed to get isDeclard for %s: %s", String8(name).c_str(),
+ status.toString8().c_str());
return false;
}
return declared;
@@ -377,7 +386,11 @@
Vector<String16> ServiceManagerShim::getDeclaredInstances(const String16& interface) {
std::vector<std::string> out;
- if (!mTheRealServiceManager->getDeclaredInstances(String8(interface).c_str(), &out).isOk()) {
+ if (Status status =
+ mTheRealServiceManager->getDeclaredInstances(String8(interface).c_str(), &out);
+ !status.isOk()) {
+ ALOGW("Failed to getDeclaredInstances for %s: %s", String8(interface).c_str(),
+ status.toString8().c_str());
return {};
}
@@ -391,7 +404,10 @@
std::optional<String16> ServiceManagerShim::updatableViaApex(const String16& name) {
std::optional<std::string> declared;
- if (!mTheRealServiceManager->updatableViaApex(String8(name).c_str(), &declared).isOk()) {
+ if (Status status = mTheRealServiceManager->updatableViaApex(String8(name).c_str(), &declared);
+ !status.isOk()) {
+ ALOGW("Failed to get updatableViaApex for %s: %s", String8(name).c_str(),
+ status.toString8().c_str());
return std::nullopt;
}
return declared ? std::optional<String16>(String16(declared.value().c_str())) : std::nullopt;
diff --git a/libs/binder/ProcessState.cpp b/libs/binder/ProcessState.cpp
index 3095607..8ef4341 100644
--- a/libs/binder/ProcessState.cpp
+++ b/libs/binder/ProcessState.cpp
@@ -409,7 +409,7 @@
uint32_t enable = DEFAULT_ENABLE_ONEWAY_SPAM_DETECTION;
result = ioctl(fd, BINDER_ENABLE_ONEWAY_SPAM_DETECTION, &enable);
if (result == -1) {
- ALOGI("Binder ioctl to enable oneway spam detection failed: %s", strerror(errno));
+ ALOGD("Binder ioctl to enable oneway spam detection failed: %s", strerror(errno));
}
} else {
ALOGW("Opening '%s' failed: %s\n", driver, strerror(errno));
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index bff5543..d8ba2c6 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -193,10 +193,12 @@
mShutdownTrigger->trigger();
while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
- ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
- "%zu, Sessions: %zu",
- mJoinThreadRunning, mConnectingThreads.size(), mSessions.size());
- mShutdownCv.wait(_l);
+ if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
+ ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
+ "Connecting threads: "
+ "%zu, Sessions: %zu. Is your server deadlocked?",
+ mJoinThreadRunning, mConnectingThreads.size(), mSessions.size());
+ }
}
// At this point, we know join() is about to exit, but the thread that calls
@@ -256,18 +258,12 @@
LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(),
"Must establish connection on owned thread");
thisThread = std::move(threadId->second);
- ScopeGuard detachGuard = [&]() { thisThread.detach(); };
- server->mConnectingThreads.erase(threadId);
-
- // TODO(b/185167543): we currently can't disable this because we don't
- // shutdown sessions as well, only the server itself. So, we need to
- // keep this separate from the detachGuard, since we temporarily want to
- // give a notification even when we pass ownership of the thread to
- // a session.
- ScopeGuard threadLifetimeGuard = [&]() {
+ ScopeGuard detachGuard = [&]() {
+ thisThread.detach();
_l.unlock();
server->mShutdownCv.notify_all();
};
+ server->mConnectingThreads.erase(threadId);
if (!idValid) {
return;
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 7c458c1..d05b848 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -86,8 +86,7 @@
return false;
}
- addClientConnection(std::move(serverFd));
- return true;
+ return addClientConnection(std::move(serverFd));
}
sp<IBinder> RpcSession::getRootObject() {
@@ -100,12 +99,12 @@
return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}
-status_t RpcSession::transact(const RpcAddress& address, uint32_t code, const Parcel& data,
+status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags) {
ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
(flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC
: ConnectionUse::CLIENT);
- return state()->transact(connection.fd(), address, code, data,
+ return state()->transact(connection.fd(), binder, code, data,
sp<RpcSession>::fromExisting(this), reply, flags);
}
@@ -199,7 +198,8 @@
state()->getAndExecuteCommand(connection->fd, sp<RpcSession>::fromExisting(this));
if (error != OK) {
- ALOGI("Binder connection thread closing w/ status %s", statusToString(error).c_str());
+ LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
+ statusToString(error).c_str());
break;
}
}
@@ -311,24 +311,25 @@
LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
- addClientConnection(std::move(serverFd));
- return true;
+ return addClientConnection(std::move(serverFd));
}
ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
return false;
}
-void RpcSession::addClientConnection(unique_fd fd) {
+bool RpcSession::addClientConnection(unique_fd fd) {
std::lock_guard<std::mutex> _l(mMutex);
if (mShutdownTrigger == nullptr) {
mShutdownTrigger = FdTrigger::make();
+ if (mShutdownTrigger == nullptr) return false;
}
sp<RpcConnection> session = sp<RpcConnection>::make();
session->fd = std::move(fd);
mClientConnections.push_back(session);
+ return true;
}
void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId,
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 6483486..a801729 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -207,45 +207,49 @@
mData.reset(new (std::nothrow) uint8_t[size]);
}
-bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data, size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
+ size_t size) {
LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot send %s at size %zu (too big)", what, size);
terminate();
- return false;
+ return BAD_VALUE;
}
ssize_t sent = TEMP_FAILURE_RETRY(send(fd.get(), data, size, MSG_NOSIGNAL));
if (sent < 0 || sent != static_cast<ssize_t>(size)) {
+ int savedErrno = errno;
ALOGE("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent, size,
- fd.get(), strerror(errno));
+ fd.get(), strerror(savedErrno));
terminate();
- return false;
+ return -savedErrno;
}
- return true;
+ return OK;
}
-bool RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, const char* what,
- void* data, size_t size) {
+status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, void* data, size_t size) {
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot rec %s at size %zu (too big)", what, size);
terminate();
- return false;
+ return BAD_VALUE;
}
if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
status != OK) {
- ALOGE("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
- statusToString(status).c_str());
- return false;
+ if (status != -ECANCELED) {
+ ALOGE("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
+ statusToString(status).c_str());
+ }
+ return status;
}
LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
- return true;
+ return OK;
}
sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
@@ -253,8 +257,8 @@
data.markForRpc(session);
Parcel reply;
- status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data, session,
- &reply, 0);
+ status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data,
+ session, &reply, 0);
if (status != OK) {
ALOGE("Error getting root object: %s", statusToString(status).c_str());
return nullptr;
@@ -269,8 +273,8 @@
data.markForRpc(session);
Parcel reply;
- status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, data,
- session, &reply, 0);
+ status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
+ data, session, &reply, 0);
if (status != OK) {
ALOGE("Error getting max threads: %s", statusToString(status).c_str());
return status;
@@ -294,8 +298,8 @@
data.markForRpc(session);
Parcel reply;
- status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID, data,
- session, &reply, 0);
+ status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
+ data, session, &reply, 0);
if (status != OK) {
ALOGE("Error getting session ID: %s", statusToString(status).c_str());
return status;
@@ -309,9 +313,31 @@
return OK;
}
-status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code,
+status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder, uint32_t code,
const Parcel& data, const sp<RpcSession>& session, Parcel* reply,
uint32_t flags) {
+ if (!data.isForRpc()) {
+ ALOGE("Refusing to send RPC with parcel not crafted for RPC");
+ return BAD_TYPE;
+ }
+
+ if (data.objectsCount() != 0) {
+ ALOGE("Parcel at %p has attached objects but is being used in an RPC call", &data);
+ return BAD_TYPE;
+ }
+
+ RpcAddress address = RpcAddress::zero();
+ if (status_t status = onBinderLeaving(session, binder, &address); status != OK) return status;
+
+ return transactAddress(fd, address, code, data, session, reply, flags);
+}
+
+status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& address,
+ uint32_t code, const Parcel& data, const sp<RpcSession>& session,
+ Parcel* reply, uint32_t flags) {
+ LOG_ALWAYS_FATAL_IF(!data.isForRpc());
+ LOG_ALWAYS_FATAL_IF(data.objectsCount() != 0);
+
uint64_t asyncNumber = 0;
if (!address.isZero()) {
@@ -326,16 +352,6 @@
}
}
- if (!data.isForRpc()) {
- ALOGE("Refusing to send RPC with parcel not crafted for RPC");
- return BAD_TYPE;
- }
-
- if (data.objectsCount() != 0) {
- ALOGE("Parcel at %p has attached objects but is being used in an RPC call", &data);
- return BAD_TYPE;
- }
-
RpcWireTransaction transaction{
.address = address.viewRawEmbedded(),
.code = code,
@@ -361,12 +377,12 @@
.bodySize = static_cast<uint32_t>(transactionData.size()),
};
- if (!rpcSend(fd, "transact header", &command, sizeof(command))) {
- return DEAD_OBJECT;
- }
- if (!rpcSend(fd, "command body", transactionData.data(), transactionData.size())) {
- return DEAD_OBJECT;
- }
+ if (status_t status = rpcSend(fd, "transact header", &command, sizeof(command)); status != OK)
+ return status;
+ if (status_t status =
+ rpcSend(fd, "command body", transactionData.data(), transactionData.size());
+ status != OK)
+ return status;
if (flags & IBinder::FLAG_ONEWAY) {
return OK; // do not wait for result
@@ -390,24 +406,22 @@
Parcel* reply) {
RpcWireHeader command;
while (true) {
- if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
- return DEAD_OBJECT;
- }
+ if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
+ status != OK)
+ return status;
if (command.command == RPC_COMMAND_REPLY) break;
- status_t status = processServerCommand(fd, session, command);
- if (status != OK) return status;
+ if (status_t status = processServerCommand(fd, session, command); status != OK)
+ return status;
}
CommandData data(command.bodySize);
- if (!data.valid()) {
- return NO_MEMORY;
- }
+ if (!data.valid()) return NO_MEMORY;
- if (!rpcRec(fd, session, "reply body", data.data(), command.bodySize)) {
- return DEAD_OBJECT;
- }
+ if (status_t status = rpcRec(fd, session, "reply body", data.data(), command.bodySize);
+ status != OK)
+ return status;
if (command.bodySize < sizeof(RpcWireReply)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
@@ -447,9 +461,12 @@
.command = RPC_COMMAND_DEC_STRONG,
.bodySize = sizeof(RpcWireAddress),
};
- if (!rpcSend(fd, "dec ref header", &cmd, sizeof(cmd))) return DEAD_OBJECT;
- if (!rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress)))
- return DEAD_OBJECT;
+ if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+ return status;
+ if (status_t status =
+ rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
+ status != OK)
+ return status;
return OK;
}
@@ -457,9 +474,9 @@
LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());
RpcWireHeader command;
- if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
- return DEAD_OBJECT;
- }
+ if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
+ status != OK)
+ return status;
return processServerCommand(fd, session, command);
}
@@ -505,11 +522,12 @@
if (!transactionData.valid()) {
return NO_MEMORY;
}
- if (!rpcRec(fd, session, "transaction body", transactionData.data(), transactionData.size())) {
- return DEAD_OBJECT;
- }
+ if (status_t status = rpcRec(fd, session, "transaction body", transactionData.data(),
+ transactionData.size());
+ status != OK)
+ return status;
- return processTransactInternal(fd, session, std::move(transactionData));
+ return processTransactInternal(fd, session, std::move(transactionData), nullptr /*targetRef*/);
}
static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -522,7 +540,7 @@
}
status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
- CommandData transactionData) {
+ CommandData transactionData, sp<IBinder>&& targetRef) {
if (transactionData.size() < sizeof(RpcWireTransaction)) {
ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
sizeof(RpcWireTransaction), transactionData.size());
@@ -538,45 +556,49 @@
status_t replyStatus = OK;
sp<IBinder> target;
if (!addr.isZero()) {
- std::lock_guard<std::mutex> _l(mNodeMutex);
-
- auto it = mNodeForAddress.find(addr);
- if (it == mNodeForAddress.end()) {
- ALOGE("Unknown binder address %s.", addr.toString().c_str());
- replyStatus = BAD_VALUE;
+ if (!targetRef) {
+ target = onBinderEntering(session, addr);
} else {
- target = it->second.binder.promote();
- if (target == nullptr) {
- // This can happen if the binder is remote in this process, and
- // another thread has called the last decStrong on this binder.
- // However, for local binders, it indicates a misbehaving client
- // (any binder which is being transacted on should be holding a
- // strong ref count), so in either case, terminating the
- // session.
- ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
+ target = targetRef;
+ }
+
+ if (target == nullptr) {
+ // This can happen if the binder is remote in this process, and
+ // another thread has called the last decStrong on this binder.
+ // However, for local binders, it indicates a misbehaving client
+ // (any binder which is being transacted on should be holding a
+ // strong ref count), so in either case, terminating the
+ // session.
+ ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
+ addr.toString().c_str());
+ terminate();
+ replyStatus = BAD_VALUE;
+ } else if (target->localBinder() == nullptr) {
+ ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
+ addr.toString().c_str());
+ terminate();
+ replyStatus = BAD_VALUE;
+ } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
+ std::lock_guard<std::mutex> _l(mNodeMutex);
+ auto it = mNodeForAddress.find(addr);
+ if (it->second.binder.promote() != target) {
+ ALOGE("Binder became invalid during transaction. Bad client? %s",
addr.toString().c_str());
- terminate();
replyStatus = BAD_VALUE;
- } else if (target->localBinder() == nullptr) {
- ALOGE("Transactions can only go to local binders, not address %s. Terminating!",
- addr.toString().c_str());
- terminate();
- replyStatus = BAD_VALUE;
- } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
- if (transaction->asyncNumber != it->second.asyncNumber) {
- // we need to process some other asynchronous transaction
- // first
- // TODO(b/183140903): limit enqueues/detect overfill for bad client
- // TODO(b/183140903): detect when an object is deleted when it still has
- // pending async transactions
- it->second.asyncTodo.push(BinderNode::AsyncTodo{
- .data = std::move(transactionData),
- .asyncNumber = transaction->asyncNumber,
- });
- LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,
- addr.toString().c_str());
- return OK;
- }
+ } else if (transaction->asyncNumber != it->second.asyncNumber) {
+ // we need to process some other asynchronous transaction
+ // first
+ // TODO(b/183140903): limit enqueues/detect overfill for bad client
+ // TODO(b/183140903): detect when an object is deleted when it still has
+ // pending async transactions
+ it->second.asyncTodo.push(BinderNode::AsyncTodo{
+ .ref = target,
+ .data = std::move(transactionData),
+ .asyncNumber = transaction->asyncNumber,
+ });
+ LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,
+ addr.toString().c_str());
+ return OK;
}
}
}
@@ -670,13 +692,17 @@
it->second.asyncNumber, addr.toString().c_str());
// justification for const_cast (consider avoiding priority_queue):
- // - AsyncTodo operator< doesn't depend on 'data' object
+ // - AsyncTodo operator< doesn't depend on 'data' or 'ref' objects
// - gotta go fast
- CommandData data = std::move(
- const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()).data);
+ auto& todo = const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top());
+
+ CommandData nextData = std::move(todo.data);
+ sp<IBinder> nextRef = std::move(todo.ref);
+
it->second.asyncTodo.pop();
_l.unlock();
- return processTransactInternal(fd, session, std::move(data));
+ return processTransactInternal(fd, session, std::move(nextData),
+ std::move(nextRef));
}
}
return OK;
@@ -704,12 +730,12 @@
.bodySize = static_cast<uint32_t>(replyData.size()),
};
- if (!rpcSend(fd, "reply header", &cmdReply, sizeof(RpcWireHeader))) {
- return DEAD_OBJECT;
- }
- if (!rpcSend(fd, "reply body", replyData.data(), replyData.size())) {
- return DEAD_OBJECT;
- }
+ if (status_t status = rpcSend(fd, "reply header", &cmdReply, sizeof(RpcWireHeader));
+ status != OK)
+ return status;
+ if (status_t status = rpcSend(fd, "reply body", replyData.data(), replyData.size());
+ status != OK)
+ return status;
return OK;
}
@@ -721,9 +747,10 @@
if (!commandData.valid()) {
return NO_MEMORY;
}
- if (!rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size())) {
- return DEAD_OBJECT;
- }
+ if (status_t status =
+ rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size());
+ status != OK)
+ return status;
if (command.bodySize < sizeof(RpcWireAddress)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index f913925..8a0610e 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -58,9 +58,13 @@
status_t getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session,
int32_t* sessionIdOut);
- [[nodiscard]] status_t transact(const base::unique_fd& fd, const RpcAddress& address,
+ [[nodiscard]] status_t transact(const base::unique_fd& fd, const sp<IBinder>& address,
uint32_t code, const Parcel& data,
const sp<RpcSession>& session, Parcel* reply, uint32_t flags);
+ [[nodiscard]] status_t transactAddress(const base::unique_fd& fd, const RpcAddress& address,
+ uint32_t code, const Parcel& data,
+ const sp<RpcSession>& session, Parcel* reply,
+ uint32_t flags);
[[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
[[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd,
const sp<RpcSession>& session);
@@ -115,10 +119,10 @@
size_t mSize;
};
- [[nodiscard]] bool rpcSend(const base::unique_fd& fd, const char* what, const void* data,
- size_t size);
- [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
- const char* what, void* data, size_t size);
+ [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
+ size_t size);
+ [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, void* data, size_t size);
[[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
Parcel* reply);
@@ -129,7 +133,8 @@
const RpcWireHeader& command);
[[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
const sp<RpcSession>& session,
- CommandData transactionData);
+ CommandData transactionData,
+ sp<IBinder>&& targetRef);
[[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
const sp<RpcSession>& session,
const RpcWireHeader& command);
@@ -165,6 +170,7 @@
// async transaction queue, _only_ for local binder
struct AsyncTodo {
+ sp<IBinder> ref;
CommandData data;
uint64_t asyncNumber = 0;
diff --git a/libs/binder/include/binder/IPCThreadState.h b/libs/binder/include/binder/IPCThreadState.h
index 204926d..20a9f36 100644
--- a/libs/binder/include/binder/IPCThreadState.h
+++ b/libs/binder/include/binder/IPCThreadState.h
@@ -192,12 +192,6 @@
// This constant needs to be kept in sync with Binder.UNSET_WORKSOURCE from the Java
// side.
static const int32_t kUnsetWorkSource = -1;
-
- // Create a temp reference until commands in queue flushed to driver
- // Internal only.
- // @internal
- void createTransactionReference(RefBase* ref);
-
private:
IPCThreadState();
~IPCThreadState();
diff --git a/libs/binder/include/binder/ParcelRef.h b/libs/binder/include/binder/ParcelRef.h
deleted file mode 100644
index 497da2d..0000000
--- a/libs/binder/include/binder/ParcelRef.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2020 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-
-#include <binder/Parcel.h>
-#include <utils/RefBase.h>
-
-// ---------------------------------------------------------------------------
-namespace android {
-
-/**
- * internal use only
- * @internal
- */
-class ParcelRef : public Parcel, public RefBase
-{
-public:
- static sp<ParcelRef> create() {
- return new ParcelRef();
- }
-
-private:
- ParcelRef() = default;
-};
-
-} // namespace android
-
-// ---------------------------------------------------------------------------
\ No newline at end of file
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 178459d..a08c401 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -127,6 +127,10 @@
* If a client needs to actively terminate join, call shutdown() in a separate thread.
*
* At any given point, there can only be one thread calling join().
+ *
+ * Warning: if shutdown is called, this will return while the shutdown is
+ * still occurring. To ensure that the service is fully shutdown, you might
+ * want to call shutdown after 'join' returns.
*/
void join();
@@ -135,7 +139,7 @@
* (e.g. no join() is running). Will wait for the server to be fully
* shutdown.
*
- * TODO(b/185167543): wait for sessions to shutdown as well
+ * Warning: this will hang if it is called from its own thread.
*/
[[nodiscard]] bool shutdown();
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index d46f275..4401aaf 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -83,7 +83,7 @@
*/
status_t getRemoteMaxThreads(size_t* maxThreads);
- [[nodiscard]] status_t transact(const RpcAddress& address, uint32_t code, const Parcel& data,
+ [[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags);
[[nodiscard]] status_t sendDecStrong(const RpcAddress& address);
@@ -102,6 +102,7 @@
/** This is not a pipe. */
struct FdTrigger {
+ /** Returns nullptr for error case */
static std::unique_ptr<FdTrigger> make();
/**
@@ -155,7 +156,7 @@
bool setupSocketClient(const RpcSocketAddress& address);
bool setupOneSocketClient(const RpcSocketAddress& address, int32_t sessionId);
- void addClientConnection(base::unique_fd fd);
+ bool addClientConnection(base::unique_fd fd);
void setForServer(const wp<RpcServer>& server, int32_t sessionId,
const std::shared_ptr<FdTrigger>& shutdownTrigger);
sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl
index 41daccc..646bcc6 100644
--- a/libs/binder/tests/IBinderRpcTest.aidl
+++ b/libs/binder/tests/IBinderRpcTest.aidl
@@ -55,6 +55,7 @@
oneway void sleepMsAsync(int ms);
void die(boolean cleanup);
+ void scheduleShutdown();
void useKernelBinderCallingId();
}
diff --git a/libs/binder/tests/binderLibTest.cpp b/libs/binder/tests/binderLibTest.cpp
index 6006fd3..3289b5f 100644
--- a/libs/binder/tests/binderLibTest.cpp
+++ b/libs/binder/tests/binderLibTest.cpp
@@ -35,7 +35,6 @@
#include <binder/IBinder.h>
#include <binder/IPCThreadState.h>
#include <binder/IServiceManager.h>
-#include <binder/ParcelRef.h>
#include <binder/RpcServer.h>
#include <binder/RpcSession.h>
@@ -932,36 +931,6 @@
}
}
-TEST_F(BinderLibTest, ParcelAllocatedOnAnotherThread) {
- sp<IBinder> server = addServer();
- ASSERT_TRUE(server != nullptr);
-
- Parcel data;
- sp<ParcelRef> reply = ParcelRef::create();
-
- // when we have a Parcel which is deleted on another thread, if it gets
- // deleted, it will tell the kernel this, and it will drop strong references
- // to binder, so that we can't BR_ACQUIRE would fail
- IPCThreadState::self()->createTransactionReference(reply.get());
- ASSERT_EQ(NO_ERROR, server->transact(BINDER_LIB_TEST_CREATE_BINDER_TRANSACTION,
- data,
- reply.get()));
-
- // we have sp to binder, but it is not actually acquired by kernel, the
- // transaction is sitting on an out buffer
- sp<IBinder> binder = reply->readStrongBinder();
-
- std::thread([&] {
- // without the transaction reference, this would cause the Parcel to be
- // deallocated before the first thread flushes BR_ACQUIRE
- reply = nullptr;
- IPCThreadState::self()->flushCommands();
- }).join();
-
- ASSERT_NE(nullptr, binder);
- ASSERT_EQ(NO_ERROR, binder->pingBinder());
-}
-
TEST_F(BinderLibTest, CheckNoHeaderMappedInUser) {
Parcel data, reply;
sp<BinderLibTestCallBack> callBack = new BinderLibTestCallBack();
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 1182ecc..efc70e6 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -194,6 +194,18 @@
_exit(1);
}
}
+
+ Status scheduleShutdown() override {
+ sp<RpcServer> strongServer = server.promote();
+ if (strongServer == nullptr) {
+ return Status::fromExceptionCode(Status::EX_NULL_POINTER);
+ }
+ std::thread([=] {
+ LOG_ALWAYS_FATAL_IF(!strongServer->shutdown(), "Could not shutdown");
+ }).detach();
+ return Status::ok();
+ }
+
Status useKernelBinderCallingId() override {
// this is WRONG! It does not make sense when using RPC binder, and
// because it is SO wrong, and so much code calls this, it should abort!
@@ -225,11 +237,13 @@
prctl(PR_SET_PDEATHSIG, SIGHUP);
f(&mPipe);
+
+ exit(0);
}
}
~Process() {
if (mPid != 0) {
- kill(mPid, SIGKILL);
+ waitpid(mPid, nullptr, 0);
}
}
Pipe* getPipe() { return &mPipe; }
@@ -290,11 +304,11 @@
sp<IBinderRpcTest> rootIface;
// whether session should be invalidated by end of run
- bool expectInvalid = false;
+ bool expectAlreadyShutdown = false;
BinderRpcTestProcessSession(BinderRpcTestProcessSession&&) = default;
~BinderRpcTestProcessSession() {
- if (!expectInvalid) {
+ if (!expectAlreadyShutdown) {
std::vector<int32_t> remoteCounts;
// calling over any sessions counts across all sessions
EXPECT_OK(rootIface->countBinders(&remoteCounts));
@@ -302,6 +316,8 @@
for (auto remoteCount : remoteCounts) {
EXPECT_EQ(remoteCount, 1);
}
+
+ EXPECT_OK(rootIface->scheduleShutdown());
}
rootIface = nullptr;
@@ -373,6 +389,9 @@
configure(server);
server->join();
+
+ // Another thread calls shutdown. Wait for it to complete.
+ (void)server->shutdown();
}),
};
@@ -424,15 +443,6 @@
}
};
-TEST_P(BinderRpc, RootObjectIsNull) {
- auto proc = createRpcTestSocketServerProcess(1, 1, [](const sp<RpcServer>& server) {
- // this is the default, but to be explicit
- server->setRootObject(nullptr);
- });
-
- EXPECT_EQ(nullptr, proc.sessions.at(0).root);
-}
-
TEST_P(BinderRpc, Ping) {
auto proc = createRpcTestSocketServerProcess(1);
ASSERT_NE(proc.rootBinder, nullptr);
@@ -843,8 +853,7 @@
constexpr size_t kReallyLongTimeMs = 100;
constexpr size_t kSleepMs = kReallyLongTimeMs * 5;
- // more than one thread, just so this doesn't deadlock
- auto proc = createRpcTestSocketServerProcess(2);
+ auto proc = createRpcTestSocketServerProcess(1);
size_t epochMsBefore = epochMillis();
@@ -876,6 +885,14 @@
size_t epochMsAfter = epochMillis();
EXPECT_GT(epochMsAfter, epochMsBefore + kSleepMs * kNumSleeps);
+
+ // pending oneway transactions hold ref, make sure we read data on all
+ // sockets
+ std::vector<std::thread> threads;
+ for (size_t i = 0; i < 1 + kNumExtraServerThreads; i++) {
+ threads.push_back(std::thread([&] { EXPECT_OK(proc.rootIface->sleepMs(250)); }));
+ }
+ for (auto& t : threads) t.join();
}
TEST_P(BinderRpc, Die) {
@@ -893,7 +910,7 @@
EXPECT_EQ(DEAD_OBJECT, proc.rootIface->die(doDeathCleanup).transactionError())
<< "Do death cleanup: " << doDeathCleanup;
- proc.expectInvalid = true;
+ proc.expectAlreadyShutdown = true;
}
}
@@ -907,7 +924,7 @@
// second time! we catch the error :)
EXPECT_EQ(DEAD_OBJECT, proc.rootIface->useKernelBinderCallingId().transactionError());
- proc.expectInvalid = true;
+ proc.expectAlreadyShutdown = true;
}
TEST_P(BinderRpc, WorksWithLibbinderNdkPing) {
diff --git a/opengl/Android.bp b/opengl/Android.bp
index 16ce15b..b15694b 100644
--- a/opengl/Android.bp
+++ b/opengl/Android.bp
@@ -66,6 +66,7 @@
cc_library_headers {
name: "gl_headers",
+ host_supported: true,
vendor_available: true,
export_include_dirs: ["include"],
llndk: {