blob: 2e5d2e46c315d7833708b23267061f29319a8294 [file] [log] [blame]
/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#include <GRPCVehicleHardware.h>
18
19#include "ProtoMessageConverter.h"
20
21#include <android-base/logging.h>
22#include <grpc++/grpc++.h>
23
24#include <cstdlib>
25#include <mutex>
26#include <shared_mutex>
27#include <utility>
28
29namespace android::hardware::automotive::vehicle::virtualization {
30
31static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
32 // TODO(chenhaosjtuacm): get secured credentials here
33 return ::grpc::InsecureChannelCredentials();
34}
35
36GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
37 : mServiceAddr(std::move(service_addr)),
38 mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
39 mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)),
40 mValuePollingThread([this] { ValuePollingLoop(); }) {}
41
Yu Shan5c846f72024-05-16 15:39:51 -070042// Only used for unit testing.
43GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)
44 : mServiceAddr(""),
45 mGrpcChannel(nullptr),
46 mGrpcStub(std::move(stub)),
47 mValuePollingThread([] {}) {}
48
Hao Chen6cb86892023-04-10 15:21:59 -070049GRPCVehicleHardware::~GRPCVehicleHardware() {
50 {
51 std::lock_guard lck(mShutdownMutex);
52 mShuttingDownFlag.store(true);
53 }
54 mShutdownCV.notify_all();
55 mValuePollingThread.join();
56}
57
58std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
59 std::vector<aidlvhal::VehiclePropConfig> configs;
60 ::grpc::ClientContext context;
61 auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
62 proto::VehiclePropConfig protoConfig;
63 while (config_stream->Read(&protoConfig)) {
64 aidlvhal::VehiclePropConfig config;
65 proto_msg_converter::protoToAidl(protoConfig, &config);
66 configs.push_back(std::move(config));
67 }
68 auto grpc_status = config_stream->Finish();
69 if (!grpc_status.ok()) {
70 LOG(ERROR) << __func__
71 << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
72 }
73 return configs;
74}
75
76aidlvhal::StatusCode GRPCVehicleHardware::setValues(
77 std::shared_ptr<const SetValuesCallback> callback,
78 const std::vector<aidlvhal::SetValueRequest>& requests) {
79 ::grpc::ClientContext context;
80 proto::VehiclePropValueRequests protoRequests;
81 proto::SetValueResults protoResults;
82 for (const auto& request : requests) {
83 auto& protoRequest = *protoRequests.add_requests();
84 protoRequest.set_request_id(request.requestId);
85 proto_msg_converter::aidlToProto(request.value, protoRequest.mutable_value());
86 }
87 // TODO(chenhaosjtuacm): Make it Async.
88 auto grpc_status = mGrpcStub->SetValues(&context, protoRequests, &protoResults);
89 if (!grpc_status.ok()) {
90 LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << grpc_status.error_message();
91 {
92 std::shared_lock lck(mCallbackMutex);
93 // TODO(chenhaosjtuacm): call on-set-error callback.
94 }
95 return aidlvhal::StatusCode::INTERNAL_ERROR;
96 }
97 std::vector<aidlvhal::SetValueResult> results;
98 for (const auto& protoResult : protoResults.results()) {
99 auto& result = results.emplace_back();
100 result.requestId = protoResult.request_id();
101 result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
102 // TODO(chenhaosjtuacm): call on-set-error callback.
103 }
104 (*callback)(std::move(results));
105
106 return aidlvhal::StatusCode::OK;
107}
108
109aidlvhal::StatusCode GRPCVehicleHardware::getValues(
110 std::shared_ptr<const GetValuesCallback> callback,
111 const std::vector<aidlvhal::GetValueRequest>& requests) const {
112 ::grpc::ClientContext context;
113 proto::VehiclePropValueRequests protoRequests;
114 proto::GetValueResults protoResults;
115 for (const auto& request : requests) {
116 auto& protoRequest = *protoRequests.add_requests();
117 protoRequest.set_request_id(request.requestId);
118 proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
119 }
120 // TODO(chenhaosjtuacm): Make it Async.
121 auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
122 if (!grpc_status.ok()) {
123 LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
124 return aidlvhal::StatusCode::INTERNAL_ERROR;
125 }
126 std::vector<aidlvhal::GetValueResult> results;
127 for (const auto& protoResult : protoResults.results()) {
128 auto& result = results.emplace_back();
129 result.requestId = protoResult.request_id();
130 result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
131 if (protoResult.has_value()) {
132 aidlvhal::VehiclePropValue value;
133 proto_msg_converter::protoToAidl(protoResult.value(), &value);
134 result.prop = std::move(value);
135 }
136 }
137 (*callback)(std::move(results));
138
139 return aidlvhal::StatusCode::OK;
140}
141
142void GRPCVehicleHardware::registerOnPropertyChangeEvent(
143 std::unique_ptr<const PropertyChangeCallback> callback) {
144 std::lock_guard lck(mCallbackMutex);
145 if (mOnPropChange) {
146 LOG(ERROR) << __func__ << " must only be called once.";
147 return;
148 }
149 mOnPropChange = std::move(callback);
150}
151
152void GRPCVehicleHardware::registerOnPropertySetErrorEvent(
153 std::unique_ptr<const PropertySetErrorCallback> callback) {
154 std::lock_guard lck(mCallbackMutex);
155 if (mOnSetErr) {
156 LOG(ERROR) << __func__ << " must only be called once.";
157 return;
158 }
159 mOnSetErr = std::move(callback);
160}
161
Hao Chena810fb22023-04-11 15:27:44 -0700162DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) {
163 ::grpc::ClientContext context;
164 proto::DumpOptions protoDumpOptions;
165 proto::DumpResult protoDumpResult;
166 for (const auto& option : options) {
167 protoDumpOptions.add_options(option);
168 }
169 auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult);
170 if (!grpc_status.ok()) {
171 LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message();
172 return {};
173 }
174 return {
175 .callerShouldDumpState = protoDumpResult.caller_should_dump_state(),
176 .buffer = protoDumpResult.buffer(),
177 };
Hao Chen6cb86892023-04-10 15:21:59 -0700178}
179
180aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() {
Hao Chena810fb22023-04-11 15:27:44 -0700181 ::grpc::ClientContext context;
182 proto::VehicleHalCallStatus protoStatus;
183 auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus);
184 if (!grpc_status.ok()) {
185 LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message();
186 return aidlvhal::StatusCode::INTERNAL_ERROR;
187 }
188 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
Hao Chen6cb86892023-04-10 15:21:59 -0700189}
190
Yu Shan5c846f72024-05-16 15:39:51 -0700191aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) {
192 proto::SubscribeRequest request;
193 ::grpc::ClientContext context;
194 proto::VehicleHalCallStatus protoStatus;
195 proto_msg_converter::aidlToProto(options, request.mutable_options());
196 auto grpc_status = mGrpcStub->Subscribe(&context, request, &protoStatus);
197 if (!grpc_status.ok()) {
198 if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
199 // This is a legacy sever. It should handle updateSampleRate.
200 LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server";
201 return aidlvhal::StatusCode::OK;
202 }
203 LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << grpc_status.error_message();
204 return aidlvhal::StatusCode::INTERNAL_ERROR;
205 }
206 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
207}
208
Hao Chena810fb22023-04-11 15:27:44 -0700209aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId,
210 float sampleRate) {
211 ::grpc::ClientContext context;
212 proto::UpdateSampleRateRequest request;
213 proto::VehicleHalCallStatus protoStatus;
214 request.set_prop(propId);
215 request.set_area_id(areaId);
216 request.set_sample_rate(sampleRate);
217 auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus);
218 if (!grpc_status.ok()) {
219 LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message();
220 return aidlvhal::StatusCode::INTERNAL_ERROR;
221 }
222 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
Hao Chen6cb86892023-04-10 15:21:59 -0700223}
224
225bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {
226 return mGrpcChannel->WaitForConnected(gpr_time_add(
227 gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN)));
228}
229
230void GRPCVehicleHardware::ValuePollingLoop() {
231 while (!mShuttingDownFlag.load()) {
232 ::grpc::ClientContext context;
233
234 bool rpc_stopped{false};
235 std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
236 std::unique_lock<std::mutex> lck(mShutdownMutex);
237 mShutdownCV.wait(lck, [this, &rpc_stopped]() {
238 return rpc_stopped || mShuttingDownFlag.load();
239 });
240 context.TryCancel();
241 });
242
243 auto value_stream =
244 mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
245 LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
246 proto::VehiclePropValues protoValues;
247 while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
248 std::vector<aidlvhal::VehiclePropValue> values;
249 for (const auto protoValue : protoValues.values()) {
250 values.push_back(aidlvhal::VehiclePropValue());
251 proto_msg_converter::protoToAidl(protoValue, &values.back());
252 }
253 std::shared_lock lck(mCallbackMutex);
254 if (mOnPropChange) {
255 (*mOnPropChange)(values);
256 }
257 }
258
259 {
260 std::lock_guard lck(mShutdownMutex);
261 rpc_stopped = true;
262 }
263 mShutdownCV.notify_all();
264 shuttingdown_watcher.join();
265
266 auto grpc_status = value_stream->Finish();
267 // never reach here until connection lost
268 LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
269
270 // try to reconnect
271 }
272}
273
274} // namespace android::hardware::automotive::vehicle::virtualization