Merge "Change permissions on ref profile so that ART can read it"
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp
index 6e7f74f..b9c8d20 100644
--- a/libs/binder/Android.bp
+++ b/libs/binder/Android.bp
@@ -71,6 +71,8 @@
cc_library {
name: "libbinder",
+ version_script: "libbinder.map",
+
// for vndbinder
vendor_available: true,
vndk: {
@@ -307,3 +309,17 @@
export_aidl_headers: true,
},
}
+
+cc_binary {
+ name: "servicedispatcher",
+ host_supported: false,
+ srcs: [
+ "servicedispatcher.cpp",
+ ],
+ shared_libs: [
+ "libbase",
+ "libbinder",
+ "liblog",
+ "libutils",
+ ],
+}
diff --git a/libs/binder/Parcel.cpp b/libs/binder/Parcel.cpp
index 232a70c..ebba375 100644
--- a/libs/binder/Parcel.cpp
+++ b/libs/binder/Parcel.cpp
@@ -206,6 +206,7 @@
status_t status = writeInt32(1); // non-null
if (status != OK) return status;
RpcAddress address = RpcAddress::zero();
+ // TODO(b/167966510): need to undo this if the Parcel is not sent
status = mSession->state()->onBinderLeaving(mSession, binder, &address);
if (status != OK) return status;
status = address.writeToParcel(this);
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2f378da..2d2eed2 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -187,6 +187,11 @@
}
mShutdownTrigger->trigger();
+ for (auto& [id, session] : mSessions) {
+ (void)id;
+ session->mShutdownTrigger->trigger();
+ }
+
while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@
};
server->mConnectingThreads.erase(threadId);
- if (!idValid) {
+ if (!idValid || server->mShutdownTrigger->isTriggered()) {
return;
}
@@ -276,10 +281,14 @@
session = RpcSession::make();
session->setMaxThreads(server->mMaxThreads);
- session->setForServer(server,
- sp<RpcServer::EventListener>::fromExisting(
- static_cast<RpcServer::EventListener*>(server.get())),
- server->mSessionIdCounter, server->mShutdownTrigger);
+ if (!session->setForServer(server,
+ sp<RpcServer::EventListener>::fromExisting(
+ static_cast<RpcServer::EventListener*>(
+ server.get())),
+ server->mSessionIdCounter)) {
+ ALOGE("Failed to attach server to session");
+ return;
+ }
server->mSessions[server->mSessionIdCounter] = session;
} else {
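
The shutdown path above now triggers the server's own FdTrigger plus every live session's trigger, then waits on mShutdownCv, logging once per second while no progress is made. A minimal sketch of that wait-with-progress-logging pattern, with illustrative names standing in for the RpcServer members:

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>

std::mutex gLock;
std::condition_variable gShutdownCv;
// Stands in for "join thread stopped, no connecting threads, no sessions".
bool gShutdownComplete = false;

void waitForShutdown() {
    std::unique_lock<std::mutex> lock(gLock);
    while (!gShutdownComplete) {
        if (std::cv_status::timeout == gShutdownCv.wait_for(lock, std::chrono::seconds(1))) {
            fprintf(stderr, "Waiting for shutdown (1s w/o progress)\n");
        }
    }
}
```
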
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c563377..de9aa22 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -113,17 +113,21 @@
return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}
-bool RpcSession::shutdown() {
+bool RpcSession::shutdownAndWait(bool wait) {
std::unique_lock<std::mutex> _l(mMutex);
- LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
- LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
mShutdownTrigger->trigger();
- mShutdownListener->waitForShutdown(_l);
- mState->terminate();
- LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+ if (wait) {
+ LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+ mShutdownListener->waitForShutdown(_l);
+ LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+ }
+
+ _l.unlock();
+ mState->clear();
+
return true;
}
@@ -139,12 +143,15 @@
status_t RpcSession::sendDecStrong(const RpcAddress& address) {
ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
ConnectionUse::CLIENT_REFCOUNT);
- return state()->sendDecStrong(connection.fd(), address);
+ return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
}
std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
auto ret = std::make_unique<RpcSession::FdTrigger>();
- if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+ if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+ ALOGE("Could not create pipe %s", strerror(errno));
+ return nullptr;
+ }
return ret;
}
@@ -152,6 +159,10 @@
mWrite.reset();
}
+bool RpcSession::FdTrigger::isTriggered() {
+ return mWrite == -1;
+}
+
status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
while (true) {
pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -285,7 +296,7 @@
if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false;
- // TODO(b/185167543): we should add additional sessions dynamically
+ // TODO(b/189955605): we should add additional sessions dynamically
// instead of all at once.
// TODO(b/186470974): first risk of blocking
size_t numThreadsAvailable;
@@ -303,11 +314,11 @@
// we've already setup one client
for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
- // TODO(b/185167543): shutdown existing connections?
+ // TODO(b/189955605): shutdown existing connections?
if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
}
- // TODO(b/185167543): we should add additional sessions dynamically
+ // TODO(b/189955605): we should add additional sessions dynamically
// instead of all at once - the other side should be responsible for setting
// up additional connections. We need to create at least one (unless 0 are
// requested to be set) in order to allow the other side to reliably make
@@ -408,20 +419,21 @@
return true;
}
-void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
- int32_t sessionId,
- const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+ int32_t sessionId) {
LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
LOG_ALWAYS_FATAL_IF(server == nullptr);
LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
- LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
+ mShutdownTrigger = FdTrigger::make();
+ if (mShutdownTrigger == nullptr) return false;
mId = sessionId;
mForServer = server;
mEventListener = eventListener;
- mShutdownTrigger = shutdownTrigger;
+ return true;
}
sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
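
FdTrigger, as the changes above show (trigger() closes mWrite, isTriggered() checks it, and triggerablePollRead() polls the read end), is the classic self-pipe shutdown trick: closing the write end of a pipe delivers POLLHUP to any thread polling the read end. A standalone sketch using plain POSIX calls rather than libbinder types:

```cpp
#include <poll.h>
#include <unistd.h>

#include <cstdio>

int main() {
    int fds[2];
    if (pipe(fds) != 0) return 1;

    close(fds[1]); // "trigger": closing the write end hangs up the read end

    pollfd pfd{.fd = fds[0], .events = POLLIN | POLLHUP, .revents = 0};
    if (poll(&pfd, 1, 1000 /*ms*/) > 0 && (pfd.revents & POLLHUP)) {
        printf("triggered\n"); // a blocked reader thread would wake up here
    }
    close(fds[0]);
    return 0;
}
```
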
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 93f1529..6899981 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -137,9 +137,39 @@
dumpLocked();
}
-void RpcState::terminate() {
+void RpcState::clear() {
std::unique_lock<std::mutex> _l(mNodeMutex);
- terminate(_l);
+
+ if (mTerminated) {
+ LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
+ "New state should be impossible after terminating!");
+ return;
+ }
+
+ if (SHOULD_LOG_RPC_DETAIL) {
+ ALOGE("RpcState::clear()");
+ dumpLocked();
+ }
+
+ // if the destructor of a binder object makes another RPC call, then calling
+ // decStrong could deadlock. So, we must hold onto these binders until
+ // mNodeMutex is no longer taken.
+ std::vector<sp<IBinder>> tempHoldBinder;
+
+ mTerminated = true;
+ for (auto& [address, node] : mNodeForAddress) {
+ sp<IBinder> binder = node.binder.promote();
+ LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
+
+ if (node.sentRef != nullptr) {
+ tempHoldBinder.push_back(node.sentRef);
+ }
+ }
+
+ mNodeForAddress.clear();
+
+ _l.unlock();
+ tempHoldBinder.clear(); // explicit
}
void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@
ALOGE("END DUMP OF RpcState");
}
-void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
- if (SHOULD_LOG_RPC_DETAIL) {
- ALOGE("RpcState::terminate()");
- dumpLocked();
- }
-
- // if the destructor of a binder object makes another RPC call, then calling
- // decStrong could deadlock. So, we must hold onto these binders until
- // mNodeMutex is no longer taken.
- std::vector<sp<IBinder>> tempHoldBinder;
-
- mTerminated = true;
- for (auto& [address, node] : mNodeForAddress) {
- sp<IBinder> binder = node.binder.promote();
- LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
-
- if (node.sentRef != nullptr) {
- tempHoldBinder.push_back(node.sentRef);
- }
- }
-
- mNodeForAddress.clear();
-
- lock.unlock();
- tempHoldBinder.clear(); // explicit
-}
RpcState::CommandData::CommandData(size_t size) : mSize(size) {
// The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@
mData.reset(new (std::nothrow) uint8_t[size]);
}
-status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
- size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, const void* data, size_t size) {
LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot send %s at size %zu (too big)", what, size);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -235,7 +239,7 @@
LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
size, fd.get(), strerror(savedErrno));
- terminate();
+ (void)session->shutdownAndWait(false);
return -savedErrno;
}
@@ -246,7 +250,7 @@
const char* what, void* data, size_t size) {
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot rec %s at size %zu (too big)", what, size);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -358,7 +362,11 @@
if (flags & IBinder::FLAG_ONEWAY) {
asyncNumber = it->second.asyncNumber;
- if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+ if (!nodeProgressAsyncNumber(&it->second)) {
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return DEAD_OBJECT;
+ }
}
}
@@ -390,8 +398,10 @@
data.dataSize());
if (status_t status =
- rpcSend(fd, "transaction", transactionData.data(), transactionData.size());
+ rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
status != OK)
+ // TODO(b/167966510): need to undo onBinderLeaving - we know the
+ // refcount isn't successfully transferred.
return status;
if (flags & IBinder::FLAG_ONEWAY) {
@@ -442,7 +452,7 @@
if (command.bodySize < sizeof(RpcWireReply)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
sizeof(RpcWireReply), command.bodySize);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
@@ -457,7 +467,8 @@
return OK;
}
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const RpcAddress& addr) {
{
std::lock_guard<std::mutex> _l(mNodeMutex);
if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -476,10 +487,10 @@
.command = RPC_COMMAND_DEC_STRONG,
.bodySize = sizeof(RpcWireAddress),
};
- if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+ if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
return status;
- if (status_t status =
- rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
+ if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+ sizeof(RpcWireAddress));
status != OK)
return status;
return OK;
@@ -538,7 +549,7 @@
// also can't consider it a fatal error because this would allow any client
// to kill us, so ending the session for misbehaving client.
ALOGE("Unknown RPC command %d - terminating session", command.command);
- terminate();
+ (void)session->shutdownAndWait(false);
return DEAD_OBJECT;
}
status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -571,7 +582,7 @@
if (transactionData.size() < sizeof(RpcWireTransaction)) {
ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
sizeof(RpcWireTransaction), transactionData.size());
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
@@ -600,15 +611,15 @@
// session.
ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ (void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
} else if (target->localBinder() == nullptr) {
ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ (void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
} else if (transaction->flags & IBinder::FLAG_ONEWAY) {
- std::lock_guard<std::mutex> _l(mNodeMutex);
+ std::unique_lock<std::mutex> _l(mNodeMutex);
auto it = mNodeForAddress.find(addr);
if (it->second.binder.promote() != target) {
ALOGE("Binder became invalid during transaction. Bad client? %s",
@@ -617,16 +628,33 @@
} else if (transaction->asyncNumber != it->second.asyncNumber) {
// we need to process some other asynchronous transaction
// first
- // TODO(b/183140903): limit enqueues/detect overfill for bad client
- // TODO(b/183140903): detect when an object is deleted when it still has
- // pending async transactions
it->second.asyncTodo.push(BinderNode::AsyncTodo{
.ref = target,
.data = std::move(transactionData),
.asyncNumber = transaction->asyncNumber,
});
- LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,
- addr.toString().c_str());
+
+ size_t numPending = it->second.asyncTodo.size();
+ LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)",
+ transaction->asyncNumber, addr.toString().c_str(), numPending);
+
+ constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000;
+ constexpr size_t kArbitraryOnewayCallWarnLevel = 1000;
+ constexpr size_t kArbitraryOnewayCallWarnPer = 1000;
+
+ if (numPending >= kArbitraryOnewayCallWarnLevel) {
+ if (numPending >= kArbitraryOnewayCallTerminateLevel) {
+ ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending);
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return FAILED_TRANSACTION;
+ }
+
+ if (numPending % kArbitraryOnewayCallWarnPer == 0) {
+ ALOGW("Warning: many oneway transactions built up on %p (%zu)",
+ target.get(), numPending);
+ }
+ }
return OK;
}
}
@@ -707,7 +735,11 @@
// last refcount dropped after this transaction happened
if (it == mNodeForAddress.end()) return OK;
- if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+ if (!nodeProgressAsyncNumber(&it->second)) {
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return DEAD_OBJECT;
+ }
if (it->second.asyncTodo.size() == 0) return OK;
if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
@@ -753,7 +785,7 @@
memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
reply.dataSize());
- return rpcSend(fd, "reply", replyData.data(), replyData.size());
+ return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
}
status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -772,7 +804,7 @@
if (command.bodySize < sizeof(RpcWireAddress)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
sizeof(RpcWireAddress), command.bodySize);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
@@ -790,7 +822,8 @@
if (target == nullptr) {
ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -826,12 +859,11 @@
return ref;
}
-bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) {
+bool RpcState::nodeProgressAsyncNumber(BinderNode* node) {
// 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
// a single binder
if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
ALOGE("Out of async transaction IDs. Terminating");
- terminate(lock);
return false;
}
node->asyncNumber++;
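
The tempHoldBinder comment in clear() above is worth dwelling on: strong references are collected while mNodeMutex is held but destroyed only after it is released, so a binder destructor that re-enters RPC code (e.g. via decStrong) never runs under the lock. A generic sketch of the pattern, with std::shared_ptr standing in for sp<IBinder>:

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <vector>

struct Node {
    std::shared_ptr<int> sentRef; // stands in for the node's sp<IBinder>
};

std::mutex gNodeMutex;
std::map<int, Node> gNodeForAddress;

void clearNodes() {
    std::vector<std::shared_ptr<int>> tempHold; // outlives the critical section
    {
        std::unique_lock<std::mutex> lock(gNodeMutex);
        for (auto& [address, node] : gNodeForAddress) {
            if (node.sentRef != nullptr) tempHold.push_back(node.sentRef);
        }
        gNodeForAddress.clear();
    }                 // gNodeMutex released here
    tempHold.clear(); // destructors run lock-free, so re-entrant calls are safe
}
```
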
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 81ff458..13c3115 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -65,7 +65,8 @@
uint32_t code, const Parcel& data,
const sp<RpcSession>& session, Parcel* reply,
uint32_t flags);
- [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+ [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const RpcAddress& address);
enum class CommandType {
ANY,
@@ -110,11 +111,10 @@
* WARNING: RpcState is responsible for calling this when the session is
* no longer recoverable.
*/
- void terminate();
+ void clear();
private:
void dumpLocked();
- void terminate(std::unique_lock<std::mutex>& lock);
// Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
// large allocations to avoid being requested from allocating too much data.
@@ -130,8 +130,8 @@
size_t mSize;
};
- [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
- size_t size);
+ [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, const void* data, size_t size);
[[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
const char* what, void* data, size_t size);
@@ -204,9 +204,8 @@
// dropped after any locks are removed.
sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
// true - success
- // false - state terminated, lock gone, halt
- [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node,
- std::unique_lock<std::mutex>& lock);
+ // false - session shutdown, halt
+ [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
std::mutex mNodeMutex;
bool mTerminated = false;
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 98db221..4e6934b 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -97,7 +97,7 @@
*
* If this is not specified, this will be a single-threaded server.
*
- * TODO(b/185167543): these are currently created per client, but these
+ * TODO(b/167966510): these are currently created per client, but these
* should be shared.
*/
void setMaxThreads(size_t threads);
@@ -173,7 +173,7 @@
wp<IBinder> mRootObjectWeak;
std::map<int32_t, sp<RpcSession>> mSessions;
int32_t mSessionIdCounter = 0;
- std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+ std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
std::condition_variable mShutdownCv;
};
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index a6bc1a9..4650cf2 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -54,7 +54,7 @@
* If this is called, 'shutdown' on this session must also be called.
* Otherwise, a threadpool will leak.
*
- * TODO(b/185167543): start these dynamically
+ * TODO(b/189955605): start these dynamically
*/
void setMaxThreads(size_t threads);
size_t getMaxThreads();
@@ -97,14 +97,20 @@
status_t getRemoteMaxThreads(size_t* maxThreads);
/**
- * Shuts down the service. Only works for client sessions (server-side
- * sessions currently only support shutting down the entire server).
+ * Shuts down the service.
+ *
+ * For client sessions, wait can be true or false. For server sessions,
+ * waiting is not currently supported (will abort).
*
* Warning: this is currently not active/nice (the server isn't told we're
* shutting down). Being nicer to the server could potentially make it
* reclaim resources faster.
+ *
+ * If this is called w/ 'wait' true, then this will wait for shutdown to
+ * complete before returning. This will hang if it is called from the
+ * session threadpool (when processing received calls).
*/
- [[nodiscard]] bool shutdown();
+ [[nodiscard]] bool shutdownAndWait(bool wait);
[[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags);
@@ -129,16 +135,17 @@
static std::unique_ptr<FdTrigger> make();
/**
- * poll() on this fd for POLLHUP to get notification when trigger is called
- */
- base::borrowed_fd readFd() const { return mRead; }
-
- /**
* Close the write end of the pipe so that the read end receives POLLHUP.
+ * Not threadsafe.
*/
void trigger();
/**
+ * Whether this has been triggered.
+ */
+ bool isTriggered();
+
+ /**
* Poll for a read event.
*
* Return:
@@ -197,9 +204,9 @@
[[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
bool server);
[[nodiscard]] bool addClientConnection(base::unique_fd fd);
- void setForServer(const wp<RpcServer>& server,
- const wp<RpcSession::EventListener>& eventListener, int32_t sessionId,
- const std::shared_ptr<FdTrigger>& shutdownTrigger);
+ [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
+ const wp<RpcSession::EventListener>& eventListener,
+ int32_t sessionId);
sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
[[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
@@ -252,7 +259,7 @@
// TODO(b/183988761): this shouldn't be guessable
std::optional<int32_t> mId;
- std::shared_ptr<FdTrigger> mShutdownTrigger;
+ std::unique_ptr<FdTrigger> mShutdownTrigger;
std::unique_ptr<RpcState> mState;
@@ -266,9 +273,6 @@
size_t mClientConnectionsOffset = 0;
std::vector<sp<RpcConnection>> mClientConnections;
std::vector<sp<RpcConnection>> mServerConnections;
-
- // TODO(b/185167543): allow sharing between different sessions in a
- // process? (or combine with mServerConnections)
std::map<std::thread::id, std::thread> mThreads;
};
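
A caller-side sketch of the renamed API, per the new header comment: with wait=true this blocks until all session threads exit, so it must not be called from the session's own threadpool. The helper name is illustrative, assuming only the declarations in this header:

```cpp
#include <binder/RpcSession.h>

using android::RpcSession;
using android::sp;

// Shut down a client session from outside its threadpool.
bool shutdownClientSession(const sp<RpcSession>& session) {
    return session->shutdownAndWait(/*wait=*/true);
}
```
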
diff --git a/libs/binder/libbinder.map b/libs/binder/libbinder.map
new file mode 100644
index 0000000..9ca14bc
--- /dev/null
+++ b/libs/binder/libbinder.map
@@ -0,0 +1,5 @@
+# b/190148312: Populate with correct list of ABI symbols
+LIBBINDER {
+ global:
+ *;
+};
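
Until b/190148312 lands, the wildcard keeps every symbol global, so the map does not yet restrict the exported ABI. For context, a populated version script would enumerate the public symbols and hide everything else; the pattern below is a hypothetical illustration, not the real libbinder symbol list:

```
LIBBINDER {
  global:
    extern "C++" {
      android::IBinder::*;
    };
  local:
    *;
};
```
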
diff --git a/libs/binder/rust/src/lib.rs b/libs/binder/rust/src/lib.rs
index 2694cba..7c0584b 100644
--- a/libs/binder/rust/src/lib.rs
+++ b/libs/binder/rust/src/lib.rs
@@ -115,14 +115,14 @@
pub use native::add_service;
pub use native::Binder;
pub use parcel::Parcel;
-pub use proxy::{get_interface, get_service};
+pub use proxy::{get_interface, get_service, wait_for_interface, wait_for_service};
pub use proxy::{AssociateClass, DeathRecipient, Proxy, SpIBinder, WpIBinder};
pub use state::{ProcessState, ThreadState};
/// The public API usable outside AIDL-generated interface crates.
pub mod public_api {
pub use super::parcel::ParcelFileDescriptor;
- pub use super::{add_service, get_interface};
+ pub use super::{add_service, get_interface, wait_for_interface};
pub use super::{
BinderFeatures, DeathRecipient, ExceptionCode, IBinder, Interface, ProcessState, SpIBinder,
Status, StatusCode, Strong, ThreadState, Weak, WpIBinder,
diff --git a/libs/binder/rust/src/proxy.rs b/libs/binder/rust/src/proxy.rs
index 52036f5..4a6d118 100644
--- a/libs/binder/rust/src/proxy.rs
+++ b/libs/binder/rust/src/proxy.rs
@@ -653,6 +653,18 @@
}
}
+/// Retrieve an existing service, or start it if it is configured as a dynamic
+/// service and isn't yet started.
+pub fn wait_for_service(name: &str) -> Option<SpIBinder> {
+ let name = CString::new(name).ok()?;
+ unsafe {
+ // Safety: `AServiceManager_waitForService` returns either a null
+ // pointer or a valid pointer to an owned `AIBinder`. Either of these
+ // values is safe to pass to `SpIBinder::from_raw`.
+ SpIBinder::from_raw(sys::AServiceManager_waitForService(name.as_ptr()))
+ }
+}
+
/// Retrieve an existing service for a particular interface, blocking for a few
/// seconds if it doesn't yet exist.
pub fn get_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
@@ -663,6 +675,16 @@
}
}
+/// Retrieve an existing service for a particular interface, or start it if it
+/// is configured as a dynamic service and isn't yet started.
+pub fn wait_for_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
+ let service = wait_for_service(name);
+ match service {
+ Some(service) => FromIBinder::try_from(service),
+ None => Err(StatusCode::NAME_NOT_FOUND),
+ }
+}
+
/// # Safety
///
/// `SpIBinder` guarantees that `binder` always contains a valid pointer to an
diff --git a/libs/binder/rust/tests/integration.rs b/libs/binder/rust/tests/integration.rs
index 0332007..10b77f4 100644
--- a/libs/binder/rust/tests/integration.rs
+++ b/libs/binder/rust/tests/integration.rs
@@ -274,6 +274,20 @@
}
#[test]
+ fn check_wait_for_service() {
+ let mut sm =
+ binder::wait_for_service("manager").expect("Did not get manager binder service");
+ assert!(sm.is_binder_alive());
+ assert!(sm.ping_binder().is_ok());
+
+ // The service manager service isn't an ITest, so this must fail.
+ assert_eq!(
+ binder::wait_for_interface::<dyn ITest>("manager").err(),
+ Some(StatusCode::BAD_TYPE)
+ );
+ }
+
+ #[test]
fn trivial_client() {
let service_name = "trivial_client_test";
let _process = ScopedServiceProcess::new(service_name);
@@ -283,6 +297,15 @@
}
#[test]
+ fn wait_for_trivial_client() {
+ let service_name = "wait_for_trivial_client_test";
+ let _process = ScopedServiceProcess::new(service_name);
+ let test_client: Strong<dyn ITest> =
+ binder::wait_for_interface(service_name).expect("Did not get test binder service");
+ assert_eq!(test_client.test().unwrap(), "wait_for_trivial_client_test");
+ }
+
+ #[test]
fn get_selinux_context() {
let service_name = "get_selinux_context";
let _process = ScopedServiceProcess::new(service_name);
diff --git a/libs/binder/servicedispatcher.cpp b/libs/binder/servicedispatcher.cpp
new file mode 100644
index 0000000..f61df08
--- /dev/null
+++ b/libs/binder/servicedispatcher.cpp
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdint.h>
+#include <sysexits.h>
+#include <unistd.h>
+
+#include <iostream>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+#include <android-base/parseint.h>
+#include <android-base/properties.h>
+#include <android-base/stringprintf.h>
+#include <binder/IServiceManager.h>
+#include <binder/RpcServer.h>
+
+using android::defaultServiceManager;
+using android::OK;
+using android::RpcServer;
+using android::statusToString;
+using android::String16;
+using android::base::Basename;
+using android::base::GetBoolProperty;
+using android::base::InitLogging;
+using android::base::LogdLogger;
+using android::base::LogId;
+using android::base::LogSeverity;
+using android::base::ParseUint;
+using android::base::StdioLogger;
+using android::base::StringPrintf;
+
+namespace {
+int Usage(const char* program) {
+ auto format = R"(dispatch calls to RPC service.
+Usage:
+ %s [-n <num_threads>] <service_name>
+ -n <num_threads>: number of RPC threads added to the service (default 1).
+ <service_name>: the service to connect to.
+)";
+ LOG(ERROR) << StringPrintf(format, Basename(program).c_str());
+ return EX_USAGE;
+}
+
+int Dispatch(const char* name, uint32_t numThreads) {
+ auto sm = defaultServiceManager();
+ if (nullptr == sm) {
+ LOG(ERROR) << "No servicemanager";
+ return EX_SOFTWARE;
+ }
+ auto binder = sm->checkService(String16(name));
+ if (nullptr == binder) {
+ LOG(ERROR) << "No service \"" << name << "\"";
+ return EX_SOFTWARE;
+ }
+ auto rpcServer = RpcServer::make();
+ if (nullptr == rpcServer) {
+ LOG(ERROR) << "Cannot create RpcServer";
+ return EX_SOFTWARE;
+ }
+ rpcServer->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
+ unsigned int port;
+ if (!rpcServer->setupInetServer(0, &port)) {
+ LOG(ERROR) << "setupInetServer failed";
+ return EX_SOFTWARE;
+ }
+ auto socket = rpcServer->releaseServer();
+ auto status = binder->setRpcClientDebug(std::move(socket), numThreads);
+ if (status != OK) {
+ LOG(ERROR) << "setRpcClientDebug failed with " << statusToString(status);
+ return EX_SOFTWARE;
+ }
+ LOG(INFO) << "Finish setting up RPC on service " << name << " with " << numThreads
+ << " threads on port" << port;
+
+ std::cout << port << std::endl;
+ return EX_OK;
+}
+
+// Log to logd. For warning and more severe messages, also log to stderr.
+class ServiceDispatcherLogger {
+public:
+ void operator()(LogId id, LogSeverity severity, const char* tag, const char* file,
+ unsigned int line, const char* message) {
+ mLogdLogger(id, severity, tag, file, line, message);
+ if (severity >= LogSeverity::WARNING) {
+ std::cout << std::flush;
+ std::cerr << Basename(getprogname()) << ": " << message << std::endl;
+ }
+ }
+
+private:
+ LogdLogger mLogdLogger{};
+};
+
+} // namespace
+
+int main(int argc, char* argv[]) {
+ InitLogging(argv, ServiceDispatcherLogger());
+
+ if (!GetBoolProperty("ro.debuggable", false)) {
+ LOG(ERROR) << "servicedispatcher is only allowed on debuggable builds.";
+ return EX_NOPERM;
+ }
+ LOG(WARNING) << "WARNING: servicedispatcher is debug only. Use with caution.";
+
+ uint32_t numThreads = 1;
+ int opt;
+ while (-1 != (opt = getopt(argc, argv, "n:"))) {
+ switch (opt) {
+ case 'n': {
+ if (!ParseUint(optarg, &numThreads)) {
+ return Usage(argv[0]);
+ }
+ } break;
+ default: {
+ return Usage(argv[0]);
+ }
+ }
+ }
+ if (optind + 1 != argc) return Usage(argv[0]);
+ auto name = argv[optind];
+
+ return Dispatch(name, numThreads);
+}
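
As a usage example matching the help text above: on a debuggable build, running servicedispatcher -n 4 some.service.name attaches an RPC server with four threads to the named service and prints the bound TCP port on stdout (the service name here is illustrative).
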
diff --git a/libs/binder/tests/binderLibTest.cpp b/libs/binder/tests/binderLibTest.cpp
index 12b54c2..c4eacfd 100644
--- a/libs/binder/tests/binderLibTest.cpp
+++ b/libs/binder/tests/binderLibTest.cpp
@@ -29,6 +29,7 @@
#include <gtest/gtest.h>
#include <android-base/properties.h>
+#include <android-base/result-gmock.h>
#include <android-base/result.h>
#include <android-base/unique_fd.h>
#include <binder/Binder.h>
@@ -52,6 +53,7 @@
using namespace android;
using namespace std::string_literals;
using namespace std::chrono_literals;
+using android::base::testing::HasValue;
using testing::ExplainMatchResult;
using testing::Not;
using testing::WithParamInterface;
@@ -62,15 +64,6 @@
return expected == arg;
}
-// e.g. Result<int32_t> v = 0; EXPECT_THAT(result, ResultHasValue(0));
-MATCHER_P(ResultHasValue, resultMatcher, "") {
- if (!arg.ok()) {
- *result_listener << "contains error " << arg.error();
- return false;
- }
- return ExplainMatchResult(resultMatcher, arg.value(), result_listener);
-}
-
static ::testing::AssertionResult IsPageAligned(void *buf) {
if (((unsigned long)buf & ((unsigned long)PAGE_SIZE - 1)) == 0)
return ::testing::AssertionSuccess();
@@ -522,7 +515,7 @@
}
TEST_F(BinderLibTest, GetId) {
- EXPECT_THAT(GetId(m_server), ResultHasValue(0));
+ EXPECT_THAT(GetId(m_server), HasValue(0));
}
TEST_F(BinderLibTest, PtrSize) {
@@ -1287,7 +1280,7 @@
sp<IBinder> server = isRemote ? sp<IBinder>(CreateRemoteService(id))
: sp<IBinder>(sp<BinderLibTestService>::make(id, false));
ASSERT_EQ(isRemote, !!server->remoteBinder());
- ASSERT_THAT(GetId(server), ResultHasValue(id));
+ ASSERT_THAT(GetId(server), HasValue(id));
unsigned int port = 0;
// Fake servicedispatcher.
@@ -1317,7 +1310,7 @@
EXPECT_EQ(OK, rpcServerBinder->pingBinder());
// Check that |rpcServerBinder| and |server| points to the same service.
- EXPECT_THAT(GetId(rpcServerBinder), ResultHasValue(id));
+ EXPECT_THAT(GetId(rpcServerBinder), HasValue(id));
// Occupy the server thread. The server should still have enough threads to handle
// other connections.
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 0a970fb..82f8a3e 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -941,6 +941,40 @@
for (auto& t : threads) t.join();
}
+TEST_P(BinderRpc, OnewayCallExhaustion) {
+ constexpr size_t kNumClients = 2;
+ constexpr size_t kTooLongMs = 1000;
+
+ auto proc = createRpcTestSocketServerProcess(kNumClients /*threads*/, 2 /*sessions*/);
+
+ // Build up oneway calls on the second session to make sure it terminates
+ // and shuts down. The first session should be unaffected (proc destructor
+ // checks the first session).
+ auto iface = interface_cast<IBinderRpcTest>(proc.proc.sessions.at(1).root);
+
+ std::vector<std::thread> threads;
+ for (size_t i = 0; i < kNumClients; i++) {
+ // one of these threads will get stuck queueing a transaction once the
+ // socket fills up, the other will be able to fill up transactions on
+ // this object
+ threads.push_back(std::thread([&] {
+ while (iface->sleepMsAsync(kTooLongMs).isOk()) {
+ }
+ }));
+ }
+ for (auto& t : threads) t.join();
+
+ Status status = iface->sleepMsAsync(kTooLongMs);
+ EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+
+ // the second session should be shutdown in the other process by the time we
+ // are able to join above (it'll only be hung up once it finishes processing
+ // any pending commands). We need to erase this session from the record
+ // here, so that the destructor for our session won't check that this
+ // session is valid, but we still want it to test the other session.
+ proc.proc.sessions.erase(proc.proc.sessions.begin() + 1);
+}
+
TEST_P(BinderRpc, Callbacks) {
const static std::string kTestString = "good afternoon!";
@@ -966,7 +1000,7 @@
// since this session has a reverse connection w/ a threadpool, we
// need to manually shut it down
- EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+ EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
proc.expectAlreadyShutdown = true;
}