libbinder : Adding new type TransportFd
Adding a new struct TransportFd which will contain unique_fd and
polling state of file descriptor. This will be useful in detecting
if all the descriptors are being polled. unique_fd and borrowed_fd
are replaced in these changes.
Test: m
Test: m libbinder binderRpcTest && atest binderRpcTest
Test: trusty/vendor/google/aosp/scripts/build.py --test
"boot-test:com.android.trusty.binder.test" qemu-generic-arm64-test-debug
Bug: 218518615
Change-Id: Id108806b98184582e5d93186b3b1884017c441ea
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 4c037b7..cfdfdf8 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -1773,7 +1773,7 @@
}
}
mFd = rpcServer->releaseServer();
- if (!mFd.ok()) return AssertionFailure() << "releaseServer returns invalid fd";
+ if (!mFd.fd.ok()) return AssertionFailure() << "releaseServer returns invalid fd";
mCtx = newFactory(rpcSecurity, mCertVerifier, std::move(auth))->newServerCtx();
if (mCtx == nullptr) return AssertionFailure() << "newServerCtx";
mSetup = true;
@@ -1794,7 +1794,7 @@
std::vector<std::thread> threads;
while (OK == mFdTrigger->triggerablePoll(mFd, POLLIN)) {
base::unique_fd acceptedFd(
- TEMP_FAILURE_RETRY(accept4(mFd.get(), nullptr, nullptr /*length*/,
+ TEMP_FAILURE_RETRY(accept4(mFd.fd.get(), nullptr, nullptr /*length*/,
SOCK_CLOEXEC | SOCK_NONBLOCK)));
threads.emplace_back(&Server::handleOne, this, std::move(acceptedFd));
}
@@ -1803,7 +1803,8 @@
}
void handleOne(android::base::unique_fd acceptedFd) {
ASSERT_TRUE(acceptedFd.ok());
- auto serverTransport = mCtx->newTransport(std::move(acceptedFd), mFdTrigger.get());
+ TransportFd transportFd(std::move(acceptedFd));
+ auto serverTransport = mCtx->newTransport(std::move(transportFd), mFdTrigger.get());
if (serverTransport == nullptr) return; // handshake failed
ASSERT_TRUE(mPostConnect(serverTransport.get(), mFdTrigger.get()));
}
@@ -1822,7 +1823,7 @@
std::unique_ptr<std::thread> mThread;
ConnectToServer mConnectToServer;
std::unique_ptr<FdTrigger> mFdTrigger = FdTrigger::make();
- base::unique_fd mFd;
+ TransportFd mFd;
std::unique_ptr<RpcTransportCtx> mCtx;
std::shared_ptr<RpcCertificateVerifierSimple> mCertVerifier =
std::make_shared<RpcCertificateVerifierSimple>();
@@ -1869,7 +1870,7 @@
// connect() and do handshake
bool setUpTransport() {
mFd = mConnectToServer();
- if (!mFd.ok()) return AssertionFailure() << "Cannot connect to server";
+ if (!mFd.fd.ok()) return AssertionFailure() << "Cannot connect to server";
mClientTransport = mCtx->newTransport(std::move(mFd), mFdTrigger.get());
return mClientTransport != nullptr;
}
@@ -1898,9 +1899,11 @@
ASSERT_EQ(readOk, readMessage());
}
+ bool isTransportWaiting() { return mClientTransport->isWaiting(); }
+
private:
ConnectToServer mConnectToServer;
- base::unique_fd mFd;
+ TransportFd mFd;
std::unique_ptr<FdTrigger> mFdTrigger = FdTrigger::make();
std::unique_ptr<RpcTransportCtx> mCtx;
std::shared_ptr<RpcCertificateVerifierSimple> mCertVerifier =
@@ -2147,6 +2150,56 @@
ASSERT_FALSE(client.readMessage(msg2));
}
+TEST_P(RpcTransportTest, CheckWaitingForRead) {
+ std::mutex readMutex;
+ std::condition_variable readCv;
+ bool shouldContinueReading = false;
+ // Server will write data on transport once its started
+ auto serverPostConnect = [&](RpcTransport* serverTransport, FdTrigger* fdTrigger) {
+ std::string message(RpcTransportTestUtils::kMessage);
+ iovec messageIov{message.data(), message.size()};
+ auto status = serverTransport->interruptableWriteFully(fdTrigger, &messageIov, 1,
+ std::nullopt, nullptr);
+ if (status != OK) return AssertionFailure() << statusToString(status);
+
+ {
+ std::unique_lock<std::mutex> lock(readMutex);
+ shouldContinueReading = true;
+ lock.unlock();
+ readCv.notify_all();
+ }
+ return AssertionSuccess();
+ };
+
+ // Setup Server and client
+ auto server = std::make_unique<Server>();
+ ASSERT_TRUE(server->setUp(GetParam()));
+
+ Client client(server->getConnectToServerFn());
+ ASSERT_TRUE(client.setUp(GetParam()));
+
+ ASSERT_EQ(OK, trust(&client, server));
+ ASSERT_EQ(OK, trust(server, &client));
+ server->setPostConnect(serverPostConnect);
+
+ server->start();
+ ASSERT_TRUE(client.setUpTransport());
+ {
+ // Wait till server writes data
+ std::unique_lock<std::mutex> lock(readMutex);
+ ASSERT_TRUE(readCv.wait_for(lock, 3s, [&] { return shouldContinueReading; }));
+ }
+
+ // Since there is no read polling here, we will get polling count 0
+ ASSERT_FALSE(client.isTransportWaiting());
+ ASSERT_TRUE(client.readMessage(RpcTransportTestUtils::kMessage));
+ // Thread should increment polling count, read and decrement polling count
+ // Again, polling count should be zero here
+ ASSERT_FALSE(client.isTransportWaiting());
+
+ server->shutdown();
+}
+
INSTANTIATE_TEST_CASE_P(BinderRpc, RpcTransportTest,
::testing::ValuesIn(RpcTransportTest::getRpcTranportTestParams()),
RpcTransportTest::PrintParamInfo);