/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#include "GRPCVehicleProxyServer.h"
18
19#include "ProtoMessageConverter.h"
20
21#include <grpc++/grpc++.h>
22
23#include <android-base/logging.h>
24
25#include <algorithm>
26#include <condition_variable>
27#include <mutex>
28#include <unordered_set>
29#include <utility>
30#include <vector>
31
32namespace android::hardware::automotive::vehicle::virtualization {
33
34std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
35
36static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
37 // TODO(chenhaosjtuacm): get secured credentials here
38 return ::grpc::InsecureServerCredentials();
39}
40
41GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
42 std::unique_ptr<IVehicleHardware>&& hardware)
Yu Shanb02b7722024-06-18 17:38:11 -070043 : GrpcVehicleProxyServer(std::vector<std::string>({serverAddr}), std::move(hardware)){};
44
45GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,
46 std::unique_ptr<IVehicleHardware>&& hardware)
47 : mServiceAddrs(std::move(serverAddrs)), mHardware(std::move(hardware)) {
Hao Chen32d46702023-04-10 15:59:50 -070048 mHardware->registerOnPropertyChangeEvent(
49 std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
50 [this](std::vector<aidlvhal::VehiclePropValue> values) {
51 OnVehiclePropChange(values);
52 }));
53}
54
55::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
56 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
57 ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
58 for (const auto& config : mHardware->getAllPropertyConfigs()) {
59 proto::VehiclePropConfig protoConfig;
60 proto_msg_converter::aidlToProto(config, &protoConfig);
61 if (!stream->Write(protoConfig)) {
62 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
63 }
64 }
65 return ::grpc::Status::OK;
66}
67
68::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
69 const proto::VehiclePropValueRequests* requests,
70 proto::SetValueResults* results) {
71 std::vector<aidlvhal::SetValueRequest> aidlRequests;
72 for (const auto& protoRequest : requests->requests()) {
73 auto& aidlRequest = aidlRequests.emplace_back();
74 aidlRequest.requestId = protoRequest.request_id();
75 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
76 }
77 auto waitMtx = std::make_shared<std::mutex>();
78 auto waitCV = std::make_shared<std::condition_variable>();
79 auto complete = std::make_shared<bool>(false);
80 auto tmpResults = std::make_shared<proto::SetValueResults>();
81 auto aidlStatus = mHardware->setValues(
82 std::make_shared<const IVehicleHardware::SetValuesCallback>(
83 [waitMtx, waitCV, complete,
84 tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) {
85 for (const auto& aidlResult : setValueResults) {
86 auto& protoResult = *tmpResults->add_results();
87 protoResult.set_request_id(aidlResult.requestId);
88 protoResult.set_status(
89 static_cast<proto::StatusCode>(aidlResult.status));
90 }
91 {
92 std::lock_guard lck(*waitMtx);
93 *complete = true;
94 }
95 waitCV->notify_all();
96 }),
97 aidlRequests);
98 if (aidlStatus != aidlvhal::StatusCode::OK) {
99 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
100 "The underlying hardware fails to set values, VHAL status: " +
101 toString(aidlStatus));
102 }
103 std::unique_lock lck(*waitMtx);
104 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
105 if (!success) {
106 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
107 "The underlying hardware set values timeout.");
108 }
109 *results = std::move(*tmpResults);
110 return ::grpc::Status::OK;
111}
112
113::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
114 const proto::VehiclePropValueRequests* requests,
115 proto::GetValueResults* results) {
116 std::vector<aidlvhal::GetValueRequest> aidlRequests;
117 for (const auto& protoRequest : requests->requests()) {
118 auto& aidlRequest = aidlRequests.emplace_back();
119 aidlRequest.requestId = protoRequest.request_id();
120 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
121 }
122 auto waitMtx = std::make_shared<std::mutex>();
123 auto waitCV = std::make_shared<std::condition_variable>();
124 auto complete = std::make_shared<bool>(false);
125 auto tmpResults = std::make_shared<proto::GetValueResults>();
126 auto aidlStatus = mHardware->getValues(
127 std::make_shared<const IVehicleHardware::GetValuesCallback>(
128 [waitMtx, waitCV, complete,
129 tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) {
130 for (const auto& aidlResult : getValueResults) {
131 auto& protoResult = *tmpResults->add_results();
132 protoResult.set_request_id(aidlResult.requestId);
133 protoResult.set_status(
134 static_cast<proto::StatusCode>(aidlResult.status));
135 if (aidlResult.prop) {
136 auto* valuePtr = protoResult.mutable_value();
137 proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
138 }
139 }
140 {
141 std::lock_guard lck(*waitMtx);
142 *complete = true;
143 }
144 waitCV->notify_all();
145 }),
146 aidlRequests);
147 if (aidlStatus != aidlvhal::StatusCode::OK) {
148 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
149 "The underlying hardware fails to get values, VHAL status: " +
150 toString(aidlStatus));
151 }
152 std::unique_lock lck(*waitMtx);
153 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
154 if (!success) {
155 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
156 "The underlying hardware get values timeout.");
157 }
158 *results = std::move(*tmpResults);
159 return ::grpc::Status::OK;
160}
161
Hao Chena810fb22023-04-11 15:27:44 -0700162::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
163 ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
164 proto::VehicleHalCallStatus* status) {
165 const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
166 request->sample_rate());
167 status->set_status_code(static_cast<proto::StatusCode>(status_code));
168 return ::grpc::Status::OK;
169}
170
Yu Shan5c846f72024-05-16 15:39:51 -0700171::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
172 const proto::SubscribeRequest* request,
173 proto::VehicleHalCallStatus* status) {
174 const auto& protoSubscribeOptions = request->options();
175 aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
176 proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
177 const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
178 status->set_status_code(static_cast<proto::StatusCode>(status_code));
179 return ::grpc::Status::OK;
180}
181
Yu Shan2c37c112024-05-20 17:27:19 -0700182::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
183 const proto::UnsubscribeRequest* request,
184 proto::VehicleHalCallStatus* status) {
185 int32_t propId = request->prop_id();
186 int32_t areaId = request->area_id();
187 const auto status_code = mHardware->unsubscribe(propId, areaId);
188 status->set_status_code(static_cast<proto::StatusCode>(status_code));
189 return ::grpc::Status::OK;
190}
191
Hao Chena810fb22023-04-11 15:27:44 -0700192::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
193 const ::google::protobuf::Empty*,
194 proto::VehicleHalCallStatus* status) {
195 status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
196 return ::grpc::Status::OK;
197}
198
199::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
200 const proto::DumpOptions* options,
201 proto::DumpResult* result) {
202 std::vector<std::string> dumpOptionStrings(options->options().begin(),
203 options->options().end());
204 auto dumpResult = mHardware->dump(dumpOptionStrings);
205 result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
206 result->set_buffer(dumpResult.buffer);
207 return ::grpc::Status::OK;
208}
209
Hao Chen32d46702023-04-10 15:59:50 -0700210::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
211 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
212 ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
213 auto conn = std::make_shared<ConnectionDescriptor>(stream);
214 {
215 std::lock_guard lck(mConnectionMutex);
216 mValueStreamingConnections.push_back(conn);
217 }
218 conn->Wait();
219 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
220 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
221}
222
223void GrpcVehicleProxyServer::OnVehiclePropChange(
224 const std::vector<aidlvhal::VehiclePropValue>& values) {
225 std::unordered_set<uint64_t> brokenConn;
226 proto::VehiclePropValues protoValues;
227 for (const auto& value : values) {
228 auto* protoValuePtr = protoValues.add_values();
229 proto_msg_converter::aidlToProto(value, protoValuePtr);
230 }
231 {
232 std::shared_lock read_lock(mConnectionMutex);
233 for (auto& connection : mValueStreamingConnections) {
234 auto writeOK = connection->Write(protoValues);
235 if (!writeOK) {
236 LOG(ERROR) << __func__
237 << ": Server Write failed, connection lost. ID: " << connection->ID();
238 brokenConn.insert(connection->ID());
239 }
240 }
241 }
242 if (brokenConn.empty()) {
243 return;
244 }
245 std::unique_lock write_lock(mConnectionMutex);
246 mValueStreamingConnections.erase(
247 std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
248 [&brokenConn](const auto& conn) {
249 return brokenConn.find(conn->ID()) != brokenConn.end();
250 }),
251 mValueStreamingConnections.end());
252}
253
254GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
255 if (mServer) {
256 LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
257 return *this;
258 }
259 ::grpc::ServerBuilder builder;
260 builder.RegisterService(this);
Yu Shanb02b7722024-06-18 17:38:11 -0700261 for (const std::string& serviceAddr : mServiceAddrs) {
262 builder.AddListeningPort(serviceAddr, getServerCredentials());
263 }
Hao Chen32d46702023-04-10 15:59:50 -0700264 mServer = builder.BuildAndStart();
265 CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
266 << "please make sure the configuration and permissions are correct";
267 return *this;
268}
269
270GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
271 std::shared_lock read_lock(mConnectionMutex);
272 for (auto& conn : mValueStreamingConnections) {
273 conn->Shutdown();
274 }
275 if (mServer) {
276 mServer->Shutdown();
277 }
278 return *this;
279}
280
281void GrpcVehicleProxyServer::Wait() {
282 if (mServer) {
283 mServer->Wait();
284 }
285 mServer.reset();
286}
287
288GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
289 Shutdown();
290}
291
292bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
293 if (!mStream) {
294 LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
295 Shutdown();
296 return false;
297 }
298 {
299 std::lock_guard lck(*mMtx);
300 if (!mShutdownFlag && mStream->Write(values)) {
301 return true;
302 } else {
303 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
304 }
305 }
306 Shutdown();
307 return false;
308}
309
310void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
311 std::unique_lock lck(*mMtx);
312 mCV->wait(lck, [this] { return mShutdownFlag; });
313}
314
315void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
316 {
317 std::lock_guard lck(*mMtx);
318 mShutdownFlag = true;
319 }
320 mCV->notify_all();
321}
322
323} // namespace android::hardware::automotive::vehicle::virtualization