blob: d7cbe1b0172096f4910f5e41dd5791ff8429935b [file] [log] [blame]
Hao Chen32d46702023-04-10 15:59:50 -07001/*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "GRPCVehicleProxyServer.h"
18
19#include "ProtoMessageConverter.h"
20
21#include <grpc++/grpc++.h>
22
23#include <android-base/logging.h>
24
25#include <algorithm>
26#include <condition_variable>
27#include <mutex>
28#include <unordered_set>
29#include <utility>
30#include <vector>
31
32namespace android::hardware::automotive::vehicle::virtualization {
33
34std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
35
36static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
37 // TODO(chenhaosjtuacm): get secured credentials here
38 return ::grpc::InsecureServerCredentials();
39}
40
41GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
42 std::unique_ptr<IVehicleHardware>&& hardware)
Yu Shanb02b7722024-06-18 17:38:11 -070043 : GrpcVehicleProxyServer(std::vector<std::string>({serverAddr}), std::move(hardware)){};
44
45GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,
46 std::unique_ptr<IVehicleHardware>&& hardware)
47 : mServiceAddrs(std::move(serverAddrs)), mHardware(std::move(hardware)) {
Hao Chen32d46702023-04-10 15:59:50 -070048 mHardware->registerOnPropertyChangeEvent(
49 std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
50 [this](std::vector<aidlvhal::VehiclePropValue> values) {
51 OnVehiclePropChange(values);
52 }));
53}
54
55::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
56 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
57 ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
58 for (const auto& config : mHardware->getAllPropertyConfigs()) {
59 proto::VehiclePropConfig protoConfig;
60 proto_msg_converter::aidlToProto(config, &protoConfig);
61 if (!stream->Write(protoConfig)) {
62 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
63 }
64 }
65 return ::grpc::Status::OK;
66}
67
68::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
69 const proto::VehiclePropValueRequests* requests,
70 proto::SetValueResults* results) {
71 std::vector<aidlvhal::SetValueRequest> aidlRequests;
Yu Shanb561e442024-07-11 15:11:29 -070072 std::unordered_set<int64_t> requestIds;
Hao Chen32d46702023-04-10 15:59:50 -070073 for (const auto& protoRequest : requests->requests()) {
74 auto& aidlRequest = aidlRequests.emplace_back();
Yu Shanb561e442024-07-11 15:11:29 -070075 int64_t requestId = protoRequest.request_id();
76 aidlRequest.requestId = requestId;
Hao Chen32d46702023-04-10 15:59:50 -070077 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
Yu Shanb561e442024-07-11 15:11:29 -070078 requestIds.insert(requestId);
Hao Chen32d46702023-04-10 15:59:50 -070079 }
80 auto waitMtx = std::make_shared<std::mutex>();
81 auto waitCV = std::make_shared<std::condition_variable>();
82 auto complete = std::make_shared<bool>(false);
83 auto tmpResults = std::make_shared<proto::SetValueResults>();
84 auto aidlStatus = mHardware->setValues(
85 std::make_shared<const IVehicleHardware::SetValuesCallback>(
Yu Shanb561e442024-07-11 15:11:29 -070086 [waitMtx, waitCV, complete, tmpResults,
87 &requestIds](std::vector<aidlvhal::SetValueResult> setValueResults) {
88 bool receivedAllResults = false;
Hao Chen32d46702023-04-10 15:59:50 -070089 {
90 std::lock_guard lck(*waitMtx);
Yu Shanb561e442024-07-11 15:11:29 -070091 for (const auto& aidlResult : setValueResults) {
92 auto& protoResult = *tmpResults->add_results();
93 int64_t requestIdForResult = aidlResult.requestId;
94 protoResult.set_request_id(requestIdForResult);
95 protoResult.set_status(
96 static_cast<proto::StatusCode>(aidlResult.status));
97 requestIds.erase(requestIdForResult);
98 }
99 if (requestIds.empty()) {
100 receivedAllResults = true;
101 *complete = true;
102 }
Hao Chen32d46702023-04-10 15:59:50 -0700103 }
Yu Shanb561e442024-07-11 15:11:29 -0700104 if (receivedAllResults) {
105 waitCV->notify_all();
106 }
Hao Chen32d46702023-04-10 15:59:50 -0700107 }),
108 aidlRequests);
109 if (aidlStatus != aidlvhal::StatusCode::OK) {
110 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
111 "The underlying hardware fails to set values, VHAL status: " +
112 toString(aidlStatus));
113 }
114 std::unique_lock lck(*waitMtx);
115 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
116 if (!success) {
117 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
118 "The underlying hardware set values timeout.");
119 }
120 *results = std::move(*tmpResults);
121 return ::grpc::Status::OK;
122}
123
124::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
125 const proto::VehiclePropValueRequests* requests,
126 proto::GetValueResults* results) {
127 std::vector<aidlvhal::GetValueRequest> aidlRequests;
Yu Shanb561e442024-07-11 15:11:29 -0700128 std::unordered_set<int64_t> requestIds;
Hao Chen32d46702023-04-10 15:59:50 -0700129 for (const auto& protoRequest : requests->requests()) {
130 auto& aidlRequest = aidlRequests.emplace_back();
Yu Shanb561e442024-07-11 15:11:29 -0700131 int64_t requestId = protoRequest.request_id();
132 aidlRequest.requestId = requestId;
Hao Chen32d46702023-04-10 15:59:50 -0700133 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
Yu Shanb561e442024-07-11 15:11:29 -0700134 requestIds.insert(requestId);
Hao Chen32d46702023-04-10 15:59:50 -0700135 }
136 auto waitMtx = std::make_shared<std::mutex>();
137 auto waitCV = std::make_shared<std::condition_variable>();
138 auto complete = std::make_shared<bool>(false);
139 auto tmpResults = std::make_shared<proto::GetValueResults>();
140 auto aidlStatus = mHardware->getValues(
141 std::make_shared<const IVehicleHardware::GetValuesCallback>(
Yu Shanb561e442024-07-11 15:11:29 -0700142 [waitMtx, waitCV, complete, tmpResults,
143 &requestIds](std::vector<aidlvhal::GetValueResult> getValueResults) {
144 bool receivedAllResults = false;
Hao Chen32d46702023-04-10 15:59:50 -0700145 {
146 std::lock_guard lck(*waitMtx);
Yu Shanb561e442024-07-11 15:11:29 -0700147 for (const auto& aidlResult : getValueResults) {
148 auto& protoResult = *tmpResults->add_results();
149 int64_t requestIdForResult = aidlResult.requestId;
150 protoResult.set_request_id(requestIdForResult);
151 protoResult.set_status(
152 static_cast<proto::StatusCode>(aidlResult.status));
153 if (aidlResult.prop) {
154 auto* valuePtr = protoResult.mutable_value();
155 proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
156 }
157 requestIds.erase(requestIdForResult);
158 }
159 if (requestIds.empty()) {
160 receivedAllResults = true;
161 *complete = true;
162 }
Hao Chen32d46702023-04-10 15:59:50 -0700163 }
Yu Shanb561e442024-07-11 15:11:29 -0700164 if (receivedAllResults) {
165 waitCV->notify_all();
166 }
Hao Chen32d46702023-04-10 15:59:50 -0700167 }),
168 aidlRequests);
169 if (aidlStatus != aidlvhal::StatusCode::OK) {
170 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
171 "The underlying hardware fails to get values, VHAL status: " +
172 toString(aidlStatus));
173 }
174 std::unique_lock lck(*waitMtx);
175 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
176 if (!success) {
177 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
178 "The underlying hardware get values timeout.");
179 }
180 *results = std::move(*tmpResults);
181 return ::grpc::Status::OK;
182}
183
Hao Chena810fb22023-04-11 15:27:44 -0700184::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
185 ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
186 proto::VehicleHalCallStatus* status) {
187 const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
188 request->sample_rate());
189 status->set_status_code(static_cast<proto::StatusCode>(status_code));
190 return ::grpc::Status::OK;
191}
192
Yu Shan5c846f72024-05-16 15:39:51 -0700193::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
194 const proto::SubscribeRequest* request,
195 proto::VehicleHalCallStatus* status) {
196 const auto& protoSubscribeOptions = request->options();
197 aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
198 proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
199 const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
200 status->set_status_code(static_cast<proto::StatusCode>(status_code));
201 return ::grpc::Status::OK;
202}
203
Yu Shan2c37c112024-05-20 17:27:19 -0700204::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
205 const proto::UnsubscribeRequest* request,
206 proto::VehicleHalCallStatus* status) {
207 int32_t propId = request->prop_id();
208 int32_t areaId = request->area_id();
209 const auto status_code = mHardware->unsubscribe(propId, areaId);
210 status->set_status_code(static_cast<proto::StatusCode>(status_code));
211 return ::grpc::Status::OK;
212}
213
Hao Chena810fb22023-04-11 15:27:44 -0700214::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
215 const ::google::protobuf::Empty*,
216 proto::VehicleHalCallStatus* status) {
217 status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
218 return ::grpc::Status::OK;
219}
220
221::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
222 const proto::DumpOptions* options,
223 proto::DumpResult* result) {
224 std::vector<std::string> dumpOptionStrings(options->options().begin(),
225 options->options().end());
226 auto dumpResult = mHardware->dump(dumpOptionStrings);
227 result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
228 result->set_buffer(dumpResult.buffer);
229 return ::grpc::Status::OK;
230}
231
Hao Chen32d46702023-04-10 15:59:50 -0700232::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
233 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
234 ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
235 auto conn = std::make_shared<ConnectionDescriptor>(stream);
236 {
237 std::lock_guard lck(mConnectionMutex);
238 mValueStreamingConnections.push_back(conn);
239 }
240 conn->Wait();
241 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
242 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
243}
244
245void GrpcVehicleProxyServer::OnVehiclePropChange(
246 const std::vector<aidlvhal::VehiclePropValue>& values) {
247 std::unordered_set<uint64_t> brokenConn;
248 proto::VehiclePropValues protoValues;
249 for (const auto& value : values) {
250 auto* protoValuePtr = protoValues.add_values();
251 proto_msg_converter::aidlToProto(value, protoValuePtr);
252 }
253 {
254 std::shared_lock read_lock(mConnectionMutex);
255 for (auto& connection : mValueStreamingConnections) {
256 auto writeOK = connection->Write(protoValues);
257 if (!writeOK) {
258 LOG(ERROR) << __func__
259 << ": Server Write failed, connection lost. ID: " << connection->ID();
260 brokenConn.insert(connection->ID());
261 }
262 }
263 }
264 if (brokenConn.empty()) {
265 return;
266 }
267 std::unique_lock write_lock(mConnectionMutex);
268 mValueStreamingConnections.erase(
269 std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
270 [&brokenConn](const auto& conn) {
271 return brokenConn.find(conn->ID()) != brokenConn.end();
272 }),
273 mValueStreamingConnections.end());
274}
275
276GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
277 if (mServer) {
278 LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
279 return *this;
280 }
281 ::grpc::ServerBuilder builder;
282 builder.RegisterService(this);
Yu Shanb02b7722024-06-18 17:38:11 -0700283 for (const std::string& serviceAddr : mServiceAddrs) {
284 builder.AddListeningPort(serviceAddr, getServerCredentials());
285 }
Hao Chen32d46702023-04-10 15:59:50 -0700286 mServer = builder.BuildAndStart();
287 CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
288 << "please make sure the configuration and permissions are correct";
289 return *this;
290}
291
292GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
293 std::shared_lock read_lock(mConnectionMutex);
294 for (auto& conn : mValueStreamingConnections) {
295 conn->Shutdown();
296 }
297 if (mServer) {
298 mServer->Shutdown();
299 }
300 return *this;
301}
302
303void GrpcVehicleProxyServer::Wait() {
304 if (mServer) {
305 mServer->Wait();
306 }
307 mServer.reset();
308}
309
310GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
311 Shutdown();
312}
313
314bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
315 if (!mStream) {
316 LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
317 Shutdown();
318 return false;
319 }
320 {
321 std::lock_guard lck(*mMtx);
322 if (!mShutdownFlag && mStream->Write(values)) {
323 return true;
324 } else {
325 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
326 }
327 }
328 Shutdown();
329 return false;
330}
331
332void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
333 std::unique_lock lck(*mMtx);
334 mCV->wait(lck, [this] { return mShutdownFlag; });
335}
336
337void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
338 {
339 std::lock_guard lck(*mMtx);
340 mShutdownFlag = true;
341 }
342 mCV->notify_all();
343}
344
345} // namespace android::hardware::automotive::vehicle::virtualization