| /* |
| * Copyright (C) 2023 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <GRPCVehicleHardware.h> |
| |
| #include "ProtoMessageConverter.h" |
| |
| #include <android-base/logging.h> |
| #include <grpc++/grpc++.h> |
| #include <utils/SystemClock.h> |
| |
| #include <cstdlib> |
| #include <mutex> |
| #include <shared_mutex> |
| #include <utility> |
| |
| namespace android::hardware::automotive::vehicle::virtualization { |
| |
| namespace { |
| |
| constexpr size_t MAX_RETRY_COUNT = 5; |
| |
| std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { |
| return ::grpc::InsecureChannelCredentials(); |
| } |
| |
| } // namespace |
| |
| GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr) |
| : mServiceAddr(std::move(service_addr)), |
| mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), |
| mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)), |
| mValuePollingThread([this] { ValuePollingLoop(); }) {} |
| |
| // Only used for unit testing. |
| GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub, |
| bool startValuePollingLoop) |
| : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) { |
| if (startValuePollingLoop) { |
| mValuePollingThread = std::thread([this] { ValuePollingLoop(); }); |
| } |
| } |
| |
| GRPCVehicleHardware::~GRPCVehicleHardware() { |
| { |
| std::lock_guard lck(mShutdownMutex); |
| mShuttingDownFlag.store(true); |
| } |
| mShutdownCV.notify_all(); |
| if (mValuePollingThread.joinable()) { |
| mValuePollingThread.join(); |
| } |
| } |
| |
| std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const { |
| std::vector<aidlvhal::VehiclePropConfig> configs; |
| ::grpc::ClientContext context; |
| auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty()); |
| proto::VehiclePropConfig protoConfig; |
| while (config_stream->Read(&protoConfig)) { |
| aidlvhal::VehiclePropConfig config; |
| proto_msg_converter::protoToAidl(protoConfig, &config); |
| configs.push_back(std::move(config)); |
| } |
| auto grpc_status = config_stream->Finish(); |
| if (!grpc_status.ok()) { |
| LOG(ERROR) << __func__ |
| << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message(); |
| } |
| return configs; |
| } |
| |
| aidlvhal::StatusCode GRPCVehicleHardware::setValues( |
| std::shared_ptr<const SetValuesCallback> callback, |
| const std::vector<aidlvhal::SetValueRequest>& requests) { |
| ::grpc::ClientContext context; |
| proto::VehiclePropValueRequests protoRequests; |
| proto::SetValueResults protoResults; |
| for (const auto& request : requests) { |
| auto& protoRequest = *protoRequests.add_requests(); |
| protoRequest.set_request_id(request.requestId); |
| proto_msg_converter::aidlToProto(request.value, protoRequest.mutable_value()); |
| } |
| // TODO(chenhaosjtuacm): Make it Async. |
| auto grpc_status = mGrpcStub->SetValues(&context, protoRequests, &protoResults); |
| if (!grpc_status.ok()) { |
| LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << grpc_status.error_message(); |
| { |
| std::shared_lock lck(mCallbackMutex); |
| // TODO(chenhaosjtuacm): call on-set-error callback. |
| } |
| return aidlvhal::StatusCode::INTERNAL_ERROR; |
| } |
| std::vector<aidlvhal::SetValueResult> results; |
| for (const auto& protoResult : protoResults.results()) { |
| auto& result = results.emplace_back(); |
| result.requestId = protoResult.request_id(); |
| result.status = static_cast<aidlvhal::StatusCode>(protoResult.status()); |
| // TODO(chenhaosjtuacm): call on-set-error callback. |
| } |
| (*callback)(std::move(results)); |
| |
| return aidlvhal::StatusCode::OK; |
| } |
| |
| aidlvhal::StatusCode GRPCVehicleHardware::getValues( |
| std::shared_ptr<const GetValuesCallback> callback, |
| const std::vector<aidlvhal::GetValueRequest>& requests) const { |
| std::vector<aidlvhal::GetValueResult> results; |
| auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0); |
| if (status != aidlvhal::StatusCode::OK) { |
| return status; |
| } |
| if (!results.empty()) { |
| (*callback)(std::move(results)); |
| } |
| return status; |
| } |
| |
| aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry( |
| const std::vector<aidlvhal::GetValueRequest>& requests, |
| std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const { |
| if (retryCount == MAX_RETRY_COUNT) { |
| LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after " |
| << retryCount << " retries"; |
| return aidlvhal::StatusCode::TRY_AGAIN; |
| } |
| |
| proto::VehiclePropValueRequests protoRequests; |
| std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById; |
| for (const auto& request : requests) { |
| auto& protoRequest = *protoRequests.add_requests(); |
| protoRequest.set_request_id(request.requestId); |
| proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value()); |
| requestById[request.requestId] = &request; |
| } |
| |
| // TODO(chenhaosjtuacm): Make it Async. |
| ::grpc::ClientContext context; |
| proto::GetValueResults protoResults; |
| auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults); |
| if (!grpc_status.ok()) { |
| LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message(); |
| return aidlvhal::StatusCode::INTERNAL_ERROR; |
| } |
| |
| std::vector<aidlvhal::GetValueRequest> retryRequests; |
| for (const auto& protoResult : protoResults.results()) { |
| int64_t requestId = protoResult.request_id(); |
| auto it = requestById.find(requestId); |
| if (it == requestById.end()) { |
| LOG(ERROR) << __func__ |
| << "Invalid getValue request with unknown request ID: " << requestId |
| << ", ignore"; |
| continue; |
| } |
| |
| if (!protoResult.has_value()) { |
| auto& result = results->emplace_back(); |
| result.requestId = requestId; |
| result.status = static_cast<aidlvhal::StatusCode>(protoResult.status()); |
| continue; |
| } |
| |
| aidlvhal::VehiclePropValue value; |
| proto_msg_converter::protoToAidl(protoResult.value(), &value); |
| |
| // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset |
| // the timestamp. |
| // TODO(b/350822044): Remove this once we use timestamp from proxy server. |
| if (!setAndroidTimestamp(&value)) { |
| // This is a rare case when we receive a property update event reflecting a new value |
| // for the property before we receive the get value result. This means that the result |
| // is already outdated, hence we should retry getting the latest value again. |
| LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop |
| << " areaId: " << value.areaId << " is oudated, retry"; |
| retryRequests.push_back(*(it->second)); |
| continue; |
| } |
| |
| auto& result = results->emplace_back(); |
| result.requestId = requestId; |
| result.status = static_cast<aidlvhal::StatusCode>(protoResult.status()); |
| result.prop = std::move(value); |
| } |
| |
| if (retryRequests.size() != 0) { |
| return getValuesWithRetry(retryRequests, results, retryCount++); |
| } |
| |
| return aidlvhal::StatusCode::OK; |
| } |
| |
| bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const { |
| PropIdAreaId propIdAreaId = { |
| .propId = propValue->prop, |
| .areaId = propValue->areaId, |
| }; |
| int64_t now = elapsedRealtimeNano(); |
| int64_t externalTimestamp = propValue->timestamp; |
| |
| { |
| std::lock_guard lck(mLatestUpdateTimestampsMutex); |
| auto it = mLatestUpdateTimestamps.find(propIdAreaId); |
| if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) { |
| mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp; |
| mLatestUpdateTimestamps[propIdAreaId].second = now; |
| propValue->timestamp = now; |
| return true; |
| } |
| if (externalTimestamp == (it->second).first) { |
| propValue->timestamp = (it->second).second; |
| return true; |
| } |
| } |
| // externalTimestamp < (it->second).first, the value is outdated. |
| return false; |
| } |
| |
| void GRPCVehicleHardware::registerOnPropertyChangeEvent( |
| std::unique_ptr<const PropertyChangeCallback> callback) { |
| std::lock_guard lck(mCallbackMutex); |
| if (mOnPropChange) { |
| LOG(ERROR) << __func__ << " must only be called once."; |
| return; |
| } |
| mOnPropChange = std::move(callback); |
| } |
| |
| void GRPCVehicleHardware::registerOnPropertySetErrorEvent( |
| std::unique_ptr<const PropertySetErrorCallback> callback) { |
| std::lock_guard lck(mCallbackMutex); |
| if (mOnSetErr) { |
| LOG(ERROR) << __func__ << " must only be called once."; |
| return; |
| } |
| mOnSetErr = std::move(callback); |
| } |
| |
| DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) { |
| ::grpc::ClientContext context; |
| proto::DumpOptions protoDumpOptions; |
| proto::DumpResult protoDumpResult; |
| for (const auto& option : options) { |
| protoDumpOptions.add_options(option); |
| } |
| auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult); |
| if (!grpc_status.ok()) { |
| LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message(); |
| return {}; |
| } |
| return { |
| .callerShouldDumpState = protoDumpResult.caller_should_dump_state(), |
| .buffer = protoDumpResult.buffer(), |
| }; |
| } |
| |
| aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() { |
| ::grpc::ClientContext context; |
| proto::VehicleHalCallStatus protoStatus; |
| auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus); |
| if (!grpc_status.ok()) { |
| LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message(); |
| return aidlvhal::StatusCode::INTERNAL_ERROR; |
| } |
| return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); |
| } |
| |
| aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) { |
| proto::SubscribeRequest request; |
| ::grpc::ClientContext context; |
| proto::VehicleHalCallStatus protoStatus; |
| proto_msg_converter::aidlToProto(options, request.mutable_options()); |
| auto grpc_status = mGrpcStub->Subscribe(&context, request, &protoStatus); |
| if (!grpc_status.ok()) { |
| if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { |
| // This is a legacy sever. It should handle updateSampleRate. |
| LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server"; |
| return aidlvhal::StatusCode::OK; |
| } |
| LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << grpc_status.error_message(); |
| return aidlvhal::StatusCode::INTERNAL_ERROR; |
| } |
| return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); |
| } |
| |
| aidlvhal::StatusCode GRPCVehicleHardware::unsubscribe(int32_t propId, int32_t areaId) { |
| proto::UnsubscribeRequest request; |
| ::grpc::ClientContext context; |
| proto::VehicleHalCallStatus protoStatus; |
| request.set_prop_id(propId); |
| request.set_area_id(areaId); |
| auto grpc_status = mGrpcStub->Unsubscribe(&context, request, &protoStatus); |
| if (!grpc_status.ok()) { |
| if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { |
| // This is a legacy sever. Ignore unsubscribe request. |
| LOG(INFO) << __func__ << ": GRPC Unsubscribe is not supported by the server"; |
| return aidlvhal::StatusCode::OK; |
| } |
| LOG(ERROR) << __func__ << ": GRPC Unsubscribe Failed: " << grpc_status.error_message(); |
| return aidlvhal::StatusCode::INTERNAL_ERROR; |
| } |
| return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); |
| } |
| |
| aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId, |
| float sampleRate) { |
| ::grpc::ClientContext context; |
| proto::UpdateSampleRateRequest request; |
| proto::VehicleHalCallStatus protoStatus; |
| request.set_prop(propId); |
| request.set_area_id(areaId); |
| request.set_sample_rate(sampleRate); |
| auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus); |
| if (!grpc_status.ok()) { |
| LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message(); |
| return aidlvhal::StatusCode::INTERNAL_ERROR; |
| } |
| return static_cast<aidlvhal::StatusCode>(protoStatus.status_code()); |
| } |
| |
| bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) { |
| return mGrpcChannel->WaitForConnected(gpr_time_add( |
| gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN))); |
| } |
| |
| void GRPCVehicleHardware::ValuePollingLoop() { |
| while (!mShuttingDownFlag.load()) { |
| pollValue(); |
| // try to reconnect |
| } |
| } |
| |
| void GRPCVehicleHardware::pollValue() { |
| ::grpc::ClientContext context; |
| |
| bool rpc_stopped{false}; |
| std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() { |
| std::unique_lock<std::mutex> lck(mShutdownMutex); |
| mShutdownCV.wait( |
| lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); }); |
| context.TryCancel(); |
| }); |
| |
| auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); |
| LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; |
| proto::VehiclePropValues protoValues; |
| while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) { |
| std::vector<aidlvhal::VehiclePropValue> values; |
| for (const auto protoValue : protoValues.values()) { |
| aidlvhal::VehiclePropValue aidlValue = {}; |
| proto_msg_converter::protoToAidl(protoValue, &aidlValue); |
| |
| // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to |
| // reset the timestamp. |
| // TODO(b/350822044): Remove this once we use timestamp from proxy server. |
| if (!setAndroidTimestamp(&aidlValue)) { |
| LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop |
| << " areaId: " << aidlValue.areaId << " is outdated, ignore"; |
| continue; |
| } |
| |
| values.push_back(std::move(aidlValue)); |
| } |
| if (values.empty()) { |
| continue; |
| } |
| std::shared_lock lck(mCallbackMutex); |
| if (mOnPropChange) { |
| (*mOnPropChange)(values); |
| } |
| } |
| |
| { |
| std::lock_guard lck(mShutdownMutex); |
| rpc_stopped = true; |
| } |
| mShutdownCV.notify_all(); |
| shuttingdown_watcher.join(); |
| |
| auto grpc_status = value_stream->Finish(); |
| // never reach here until connection lost |
| LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); |
| } |
| |
| } // namespace android::hardware::automotive::vehicle::virtualization |