blob: 83a1618e9f78e1c1a9529820b602570bddd2087a [file] [log] [blame]
Steven Moreland5553ac42020-11-11 02:14:45 +00001/*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "RpcConnection"
18
19#include <binder/RpcConnection.h>
20
21#include <binder/Parcel.h>
22#include <binder/Stability.h>
23
24#include "RpcState.h"
25#include "RpcWireFormat.h"
26
27#include <sys/socket.h>
28#include <sys/types.h>
29#include <sys/un.h>
30#include <unistd.h>
31
32#if defined(__GLIBC__)
33extern "C" pid_t gettid();
34#endif
35
36namespace android {
37
38using base::unique_fd;
39
40RpcConnection::RpcConnection() {
41 LOG_RPC_DETAIL("RpcConnection created %p", this);
42
43 mState = std::make_unique<RpcState>();
44}
45RpcConnection::~RpcConnection() {
46 LOG_RPC_DETAIL("RpcConnection destroyed %p", this);
47}
48
49sp<RpcConnection> RpcConnection::make() {
50 return new RpcConnection;
51}
52
53bool RpcConnection::setupUnixDomainServer(const char* path) {
54 LOG_ALWAYS_FATAL_IF(mServer.get() != -1, "Only supports one server now");
55
56 unique_fd serverFd(TEMP_FAILURE_RETRY(socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)));
57 if (serverFd == -1) {
58 ALOGE("Could not create socket at %s: %s", path, strerror(errno));
59 return false;
60 }
61
62 struct sockaddr_un addr = {
63 .sun_family = AF_UNIX,
64 };
65
66 unsigned int pathLen = strlen(path) + 1;
67 LOG_ALWAYS_FATAL_IF(pathLen > sizeof(addr.sun_path), "%u", pathLen);
68 memcpy(addr.sun_path, path, pathLen);
69
70 if (0 != TEMP_FAILURE_RETRY(bind(serverFd.get(), (struct sockaddr*)&addr, sizeof(addr)))) {
71 ALOGE("Could not bind socket at %s: %s", path, strerror(errno));
72 return false;
73 }
74
75 if (0 != TEMP_FAILURE_RETRY(listen(serverFd.get(), 1 /*backlog*/))) {
76 ALOGE("Could not listen socket at %s: %s", path, strerror(errno));
77 return false;
78 }
79
80 mServer = std::move(serverFd);
81 return true;
82}
83
84bool RpcConnection::addUnixDomainClient(const char* path) {
85 LOG_RPC_DETAIL("Connecting on path: %s", path);
86
87 unique_fd serverFd(TEMP_FAILURE_RETRY(socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)));
88 if (serverFd == -1) {
89 ALOGE("Could not create socket at %s: %s", path, strerror(errno));
90 return false;
91 }
92
93 struct sockaddr_un addr = {
94 .sun_family = AF_UNIX,
95 };
96
97 unsigned int pathLen = strlen(path) + 1;
98 LOG_ALWAYS_FATAL_IF(pathLen > sizeof(addr.sun_path), "%u", pathLen);
99 memcpy(addr.sun_path, path, pathLen);
100
101 if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), (struct sockaddr*)&addr, sizeof(addr)))) {
102 ALOGE("Could not connect socket at %s: %s", path, strerror(errno));
103 return false;
104 }
105
106 LOG_RPC_DETAIL("Unix domain client with fd %d", serverFd.get());
107
108 addClient(std::move(serverFd));
109 return true;
110}
111
112sp<IBinder> RpcConnection::getRootObject() {
113 ExclusiveSocket socket(this, SocketUse::CLIENT);
114 return state()->getRootObject(socket.fd(), this);
115}
116
117status_t RpcConnection::transact(const RpcAddress& address, uint32_t code, const Parcel& data,
118 Parcel* reply, uint32_t flags) {
119 ExclusiveSocket socket(this,
120 (flags & IBinder::FLAG_ONEWAY) ? SocketUse::CLIENT_ASYNC
121 : SocketUse::CLIENT);
122 return state()->transact(socket.fd(), address, code, data, this, reply, flags);
123}
124
125status_t RpcConnection::sendDecStrong(const RpcAddress& address) {
126 ExclusiveSocket socket(this, SocketUse::CLIENT_REFCOUNT);
127 return state()->sendDecStrong(socket.fd(), address);
128}
129
130void RpcConnection::join() {
131 // establish a connection
132 {
133 struct sockaddr_un clientSa;
134 socklen_t clientSaLen = sizeof(clientSa);
135
136 unique_fd clientFd(TEMP_FAILURE_RETRY(
137 accept4(mServer.get(), (struct sockaddr*)&clientSa, &clientSaLen, SOCK_CLOEXEC)));
138 if (clientFd < 0) {
139 // If this log becomes confusing, should save more state from setupUnixDomainServer
140 // in order to output here.
141 ALOGE("Could not accept4 socket: %s", strerror(errno));
142 return;
143 }
144
145 LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
146
147 addServer(std::move(clientFd));
148 }
149
150 // We may not use the connection we just established (two threads might
151 // establish connections for each other), but for now, just use one
152 // server/socket connection.
153 ExclusiveSocket socket(this, SocketUse::SERVER);
154
155 while (true) {
156 status_t error = state()->getAndExecuteCommand(socket.fd(), this);
157
158 if (error != OK) {
159 ALOGI("Binder socket thread closing w/ status %s", statusToString(error).c_str());
160 return;
161 }
162 }
163}
164
165void RpcConnection::setForServer(const wp<RpcServer>& server) {
166 mForServer = server;
167}
168
169wp<RpcServer> RpcConnection::server() {
170 return mForServer;
171}
172
173void RpcConnection::addClient(base::unique_fd&& fd) {
174 std::lock_guard<std::mutex> _l(mSocketMutex);
175 sp<ConnectionSocket> connection = new ConnectionSocket();
176 connection->fd = std::move(fd);
177 mClients.push_back(connection);
178}
179
180void RpcConnection::addServer(base::unique_fd&& fd) {
181 std::lock_guard<std::mutex> _l(mSocketMutex);
182 sp<ConnectionSocket> connection = new ConnectionSocket();
183 connection->fd = std::move(fd);
184 mServers.push_back(connection);
185}
186
187RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connection, SocketUse use)
188 : mConnection(connection) {
189 pid_t tid = gettid();
190 std::unique_lock<std::mutex> _l(mConnection->mSocketMutex);
191
192 mConnection->mWaitingThreads++;
193 while (true) {
194 sp<ConnectionSocket> exclusive;
195 sp<ConnectionSocket> available;
196
197 // CHECK FOR DEDICATED CLIENT SOCKET
198 //
199 // A server/looper should always use a dedicated connection.
200 if (use != SocketUse::SERVER) {
201 findSocket(tid, &exclusive, &available, mConnection->mClients,
202 mConnection->mClientsOffset);
203
204 // WARNING: this assumes a server cannot request its client to send
205 // a transaction, as mServers is excluded below.
206 //
207 // Imagine we have more than one thread in play, and a single thread
208 // sends a synchronous, then an asynchronous command. Imagine the
209 // asynchronous command is sent on the first client socket. Then, if
210 // we naively send a synchronous command to that same socket, the
211 // thread on the far side might be busy processing the asynchronous
212 // command. So, we move to considering the second available thread
213 // for subsequent calls.
214 if (use == SocketUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
215 mConnection->mClientsOffset =
216 (mConnection->mClientsOffset + 1) % mConnection->mClients.size();
217 }
218 }
219
220 // USE SERVING SOCKET (to start serving or for nested transaction)
221 //
222 // asynchronous calls cannot be nested
223 if (use != SocketUse::CLIENT_ASYNC) {
224 // servers should start serving on an available thread only
225 // otherwise, this should only be a nested call
226 bool useAvailable = use == SocketUse::SERVER;
227
228 findSocket(tid, &exclusive, (useAvailable ? &available : nullptr),
229 mConnection->mServers, 0 /* index hint */);
230 }
231
232 // if our thread is already using a connection, prioritize using that
233 if (exclusive != nullptr) {
234 mSocket = exclusive;
235 mReentrant = true;
236 break;
237 } else if (available != nullptr) {
238 mSocket = available;
239 mSocket->exclusiveTid = tid;
240 break;
241 }
242
243 LOG_ALWAYS_FATAL_IF(use == SocketUse::SERVER, "Must create connection to join one.");
244
245 // in regular binder, this would usually be a deadlock :)
246 LOG_ALWAYS_FATAL_IF(mConnection->mClients.size() == 0,
247 "Not a client of any connection. You must create a connection to an "
248 "RPC server to make any non-nested (e.g. oneway or on another thread) "
249 "calls.");
250
251 LOG_RPC_DETAIL("No available connection (have %zu clients and %zu servers). Waiting...",
252 mConnection->mClients.size(), mConnection->mServers.size());
253 mConnection->mSocketCv.wait(_l);
254 }
255 mConnection->mWaitingThreads--;
256}
257
258void RpcConnection::ExclusiveSocket::findSocket(pid_t tid, sp<ConnectionSocket>* exclusive,
259 sp<ConnectionSocket>* available,
260 std::vector<sp<ConnectionSocket>>& sockets,
261 size_t socketsIndexHint) {
262 LOG_ALWAYS_FATAL_IF(sockets.size() > 0 && socketsIndexHint >= sockets.size(),
263 "Bad index %zu >= %zu", socketsIndexHint, sockets.size());
264
265 if (*exclusive != nullptr) return; // consistent with break below
266
267 for (size_t i = 0; i < sockets.size(); i++) {
268 sp<ConnectionSocket>& socket = sockets[(i + socketsIndexHint) % sockets.size()];
269
270 // take first available connection (intuition = caching)
271 if (available && *available == nullptr && socket->exclusiveTid == std::nullopt) {
272 *available = socket;
273 continue;
274 }
275
276 // though, prefer to take connection which is already inuse by this thread
277 // (nested transactions)
278 if (exclusive && socket->exclusiveTid == tid) {
279 *exclusive = socket;
280 break; // consistent with return above
281 }
282 }
283}
284
285RpcConnection::ExclusiveSocket::~ExclusiveSocket() {
286 // reentrant use of a connection means something less deep in the call stack
287 // is using this fd, and it retains the right to it. So, we don't give up
288 // exclusive ownership, and no thread is freed.
289 if (!mReentrant) {
290 std::unique_lock<std::mutex> _l(mConnection->mSocketMutex);
291 mSocket->exclusiveTid = std::nullopt;
292 if (mConnection->mWaitingThreads > 0) {
293 _l.unlock();
294 mConnection->mSocketCv.notify_one();
295 }
296 }
297}
298
299} // namespace android