// blob: 73bb521ddbe082f778a9844885bcc6017d744fdd [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <GRPCVehicleHardware.h>
#include "ProtoMessageConverter.h"
#include <android-base/logging.h>
#include <grpc++/grpc++.h>
#include <utils/SystemClock.h>
#include <cstdlib>
#include <mutex>
#include <shared_mutex>
#include <utility>
namespace android::hardware::automotive::vehicle::virtualization {
namespace {

// Upper bound for the retry counter that getValuesWithRetry checks before giving up and
// reporting TRY_AGAIN.
constexpr size_t MAX_RETRY_COUNT{5};

// Returns the credentials used when the gRPC channel to the vehicle server is created.
// These are gRPC's insecure channel credentials.
std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
    auto credentials = ::grpc::InsecureChannelCredentials();
    return credentials;
}

}  // namespace
// Creates a client for the vehicle server reachable at |service_addr|.
//
// Opens a gRPC channel to that address using getChannelCredentials(), creates the
// VehicleServer stub on top of the channel and starts the background thread that runs
// ValuePollingLoop().
//
// NOTE(review): members are initialized in the order they are declared in the header, not in
// the order written here. This list assumes mServiceAddr, mGrpcChannel and mGrpcStub are
// declared before mValuePollingThread, so the thread never observes a half-built object -
// confirm against the header.
GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
: mServiceAddr(std::move(service_addr)),
mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)),
mValuePollingThread([this] { ValuePollingLoop(); }) {}
// Only used for unit testing.
//
// Takes ownership of an externally supplied |stub| instead of creating a channel: the service
// address is left empty and mGrpcChannel is null. The value polling thread is only started
// when |startValuePollingLoop| is true; otherwise mValuePollingThread stays non-joinable.
GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
bool startValuePollingLoop)
: mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) {
if (startValuePollingLoop) {
mValuePollingThread = std::thread([this] { ValuePollingLoop(); });
}
}
// Requests shutdown of the polling machinery and waits for the polling thread to exit.
GRPCVehicleHardware::~GRPCVehicleHardware() {
    // The flag is raised while holding mShutdownMutex so that a thread waiting on mShutdownCV
    // (the watcher started by pollValue) cannot miss the change between checking its predicate
    // and going to sleep.
    {
        std::lock_guard guard(mShutdownMutex);
        mShuttingDownFlag.store(true);
    }
    mShutdownCV.notify_all();

    // The polling thread may never have been started (unit-test constructor).
    if (!mValuePollingThread.joinable()) {
        return;
    }
    mValuePollingThread.join();
}
std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
std::vector<aidlvhal::VehiclePropConfig> configs;
::grpc::ClientContext context;
auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
proto::VehiclePropConfig protoConfig;
while (config_stream->Read(&protoConfig)) {
aidlvhal::VehiclePropConfig config;
proto_msg_converter::protoToAidl(protoConfig, &config);
configs.push_back(std::move(config));
}
auto grpc_status = config_stream->Finish();
if (!grpc_status.ok()) {
LOG(ERROR) << __func__
<< ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
}
return configs;
}
// Sends all |requests| to the server in one synchronous SetValues RPC.
//
// On RPC failure returns INTERNAL_ERROR without invoking |callback|. On success the
// per-request results from the server are handed to |callback| and OK is returned.
aidlvhal::StatusCode GRPCVehicleHardware::setValues(
        std::shared_ptr<const SetValuesCallback> callback,
        const std::vector<aidlvhal::SetValueRequest>& requests) {
    ::grpc::ClientContext rpcContext;
    proto::VehiclePropValueRequests outgoing;
    proto::SetValueResults incoming;

    for (const auto& setRequest : requests) {
        auto* entry = outgoing.add_requests();
        entry->set_request_id(setRequest.requestId);
        proto_msg_converter::aidlToProto(setRequest.value, entry->mutable_value());
    }

    // TODO(chenhaosjtuacm): Make it Async.
    const auto rpcStatus = mGrpcStub->SetValues(&rpcContext, outgoing, &incoming);
    if (!rpcStatus.ok()) {
        LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << rpcStatus.error_message();
        {
            std::shared_lock callbackGuard(mCallbackMutex);
            // TODO(chenhaosjtuacm): call on-set-error callback.
        }
        return aidlvhal::StatusCode::INTERNAL_ERROR;
    }

    std::vector<aidlvhal::SetValueResult> converted;
    for (const auto& serverResult : incoming.results()) {
        aidlvhal::SetValueResult item;
        item.requestId = serverResult.request_id();
        item.status = static_cast<aidlvhal::StatusCode>(serverResult.status());
        converted.push_back(std::move(item));
        // TODO(chenhaosjtuacm): call on-set-error callback.
    }
    (*callback)(std::move(converted));
    return aidlvhal::StatusCode::OK;
}
// Fetches the values for |requests| via getValuesWithRetry and, when that succeeds with at
// least one result, delivers the results to |callback|.
//
// Returns the status reported by getValuesWithRetry; |callback| is not invoked on failure or
// when there are no results.
aidlvhal::StatusCode GRPCVehicleHardware::getValues(
        std::shared_ptr<const GetValuesCallback> callback,
        const std::vector<aidlvhal::GetValueRequest>& requests) const {
    std::vector<aidlvhal::GetValueResult> collected;
    const auto outcome = getValuesWithRetry(requests, &collected, /*retryCount=*/0);
    if (outcome == aidlvhal::StatusCode::OK && !collected.empty()) {
        (*callback)(std::move(collected));
    }
    return outcome;
}
// Issues one synchronous GetValues RPC for |requests| and appends the converted results to
// |results|. Results whose timestamp is older than the latest known update for the same
// property/area (see setAndroidTimestamp) are considered outdated and are requested again
// through a recursive call with an incremented |retryCount|.
//
// Returns:
//   - OK when every request produced an up-to-date result (or an error result without value),
//   - TRY_AGAIN when |retryCount| has reached MAX_RETRY_COUNT,
//   - INTERNAL_ERROR when the RPC itself failed.
// |results| may already contain entries from earlier attempts when a non-OK status is returned.
aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry(
        const std::vector<aidlvhal::GetValueRequest>& requests,
        std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const {
    if (retryCount >= MAX_RETRY_COUNT) {
        LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after "
                   << retryCount << " retries";
        return aidlvhal::StatusCode::TRY_AGAIN;
    }

    proto::VehiclePropValueRequests protoRequests;
    // Lets us map a result back to the request it answers, so it can be re-sent if outdated.
    std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById;
    for (const auto& request : requests) {
        auto& protoRequest = *protoRequests.add_requests();
        protoRequest.set_request_id(request.requestId);
        proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
        requestById[request.requestId] = &request;
    }

    // TODO(chenhaosjtuacm): Make it Async.
    ::grpc::ClientContext context;
    proto::GetValueResults protoResults;
    auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
    if (!grpc_status.ok()) {
        LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
        return aidlvhal::StatusCode::INTERNAL_ERROR;
    }

    std::vector<aidlvhal::GetValueRequest> retryRequests;
    for (const auto& protoResult : protoResults.results()) {
        int64_t requestId = protoResult.request_id();
        auto it = requestById.find(requestId);
        if (it == requestById.end()) {
            LOG(ERROR) << __func__
                       << ": Invalid getValue request with unknown request ID: " << requestId
                       << ", ignore";
            continue;
        }

        if (!protoResult.has_value()) {
            // No value attached: forward only the status reported by the server.
            auto& result = results->emplace_back();
            result.requestId = requestId;
            result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
            continue;
        }

        aidlvhal::VehiclePropValue value;
        proto_msg_converter::protoToAidl(protoResult.value(), &value);

        // VHAL proxy server uses a different timestamp than AAOS timestamp, so we have to reset
        // the timestamp.
        // TODO(b/350822044): Remove this once we use timestamp from proxy server.
        if (!setAndroidTimestamp(&value)) {
            // This is a rare case when we receive a property update event reflecting a new value
            // for the property before we receive the get value result. This means that the result
            // is already outdated, hence we should retry getting the latest value again.
            LOG(WARNING) << __func__ << ": getValue result for propId: " << value.prop
                         << " areaId: " << value.areaId << " is outdated, retry";
            retryRequests.push_back(*(it->second));
            continue;
        }

        auto& result = results->emplace_back();
        result.requestId = requestId;
        result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
        result.prop = std::move(value);
    }

    if (!retryRequests.empty()) {
        // Pass the incremented count by value. A post-increment here would hand the callee the
        // old value and the retry limit would never be reached.
        return getValuesWithRetry(retryRequests, results, retryCount + 1);
    }
    return aidlvhal::StatusCode::OK;
}
// Replaces the server-provided timestamp in |propValue| with an Android elapsed-realtime
// timestamp, tracking per (propId, areaId) the newest server timestamp seen and the Android
// timestamp that was assigned to it.
//
// Returns true when |propValue| is at least as new as the latest recorded update (its
// timestamp is rewritten), and false when it is older, in which case |propValue| is left
// untouched.
bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const {
    const PropIdAreaId key = {
            .propId = propValue->prop,
            .areaId = propValue->areaId,
    };
    const int64_t androidNow = elapsedRealtimeNano();
    const int64_t serverTimestamp = propValue->timestamp;

    std::lock_guard guard(mLatestUpdateTimestampsMutex);
    const auto known = mLatestUpdateTimestamps.find(key);
    const bool isNewer =
            known == mLatestUpdateTimestamps.end() || serverTimestamp > known->second.first;
    if (isNewer) {
        // First sighting or a newer update: record the (server, Android) timestamp pair.
        auto& record = mLatestUpdateTimestamps[key];
        record.first = serverTimestamp;
        record.second = androidNow;
        propValue->timestamp = androidNow;
        return true;
    }
    if (serverTimestamp < known->second.first) {
        // Older than the latest recorded update: the value is outdated.
        return false;
    }
    // Same update as the recorded one: reuse the Android timestamp assigned earlier.
    propValue->timestamp = known->second.second;
    return true;
}
// Installs the callback invoked for property change events. Only the first registration takes
// effect; later calls are logged as errors and ignored.
void GRPCVehicleHardware::registerOnPropertyChangeEvent(
        std::unique_ptr<const PropertyChangeCallback> callback) {
    std::lock_guard guard(mCallbackMutex);
    if (!mOnPropChange) {
        mOnPropChange = std::move(callback);
        return;
    }
    LOG(ERROR) << __func__ << " must only be called once.";
}
// Installs the callback for property set error events. Only the first registration takes
// effect; later calls are logged as errors and ignored.
void GRPCVehicleHardware::registerOnPropertySetErrorEvent(
        std::unique_ptr<const PropertySetErrorCallback> callback) {
    std::lock_guard guard(mCallbackMutex);
    if (!mOnSetErr) {
        mOnSetErr = std::move(callback);
        return;
    }
    LOG(ERROR) << __func__ << " must only be called once.";
}
// Forwards a dump request with |options| to the server via the Dump RPC and returns the
// server's answer. If the RPC fails the error is logged and a default-constructed DumpResult
// is returned.
DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) {
    ::grpc::ClientContext rpcContext;
    proto::DumpOptions request;
    proto::DumpResult response;
    for (const std::string& arg : options) {
        request.add_options(arg);
    }

    const auto rpcStatus = mGrpcStub->Dump(&rpcContext, request, &response);
    if (rpcStatus.ok()) {
        return {
                .callerShouldDumpState = response.caller_should_dump_state(),
                .buffer = response.buffer(),
        };
    }
    LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << rpcStatus.error_message();
    return {};
}
// Asks the server for its health through the CheckHealth RPC. Returns the status code
// reported by the server, or INTERNAL_ERROR when the RPC itself fails.
aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() {
    ::grpc::ClientContext rpcContext;
    proto::VehicleHalCallStatus serverStatus;
    const auto rpcStatus =
            mGrpcStub->CheckHealth(&rpcContext, ::google::protobuf::Empty(), &serverStatus);
    if (rpcStatus.ok()) {
        return static_cast<aidlvhal::StatusCode>(serverStatus.status_code());
    }
    LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << rpcStatus.error_message();
    return aidlvhal::StatusCode::INTERNAL_ERROR;
}
// Sends a Subscribe RPC carrying |options|. Returns the status code reported by the server.
// A server answering UNIMPLEMENTED is treated as success; any other RPC failure yields
// INTERNAL_ERROR.
aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) {
    proto::SubscribeRequest subscribeRequest;
    ::grpc::ClientContext rpcContext;
    proto::VehicleHalCallStatus serverStatus;
    proto_msg_converter::aidlToProto(options, subscribeRequest.mutable_options());

    const auto rpcStatus = mGrpcStub->Subscribe(&rpcContext, subscribeRequest, &serverStatus);
    if (rpcStatus.ok()) {
        return static_cast<aidlvhal::StatusCode>(serverStatus.status_code());
    }
    if (rpcStatus.error_code() != ::grpc::StatusCode::UNIMPLEMENTED) {
        LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << rpcStatus.error_message();
        return aidlvhal::StatusCode::INTERNAL_ERROR;
    }
    // This is a legacy server. It should handle updateSampleRate.
    LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server";
    return aidlvhal::StatusCode::OK;
}
// Sends an Unsubscribe RPC for (|propId|, |areaId|). Returns the status code reported by the
// server. A server answering UNIMPLEMENTED is treated as success; any other RPC failure
// yields INTERNAL_ERROR.
aidlvhal::StatusCode GRPCVehicleHardware::unsubscribe(int32_t propId, int32_t areaId) {
    proto::UnsubscribeRequest unsubscribeRequest;
    ::grpc::ClientContext rpcContext;
    proto::VehicleHalCallStatus serverStatus;
    unsubscribeRequest.set_prop_id(propId);
    unsubscribeRequest.set_area_id(areaId);

    const auto rpcStatus =
            mGrpcStub->Unsubscribe(&rpcContext, unsubscribeRequest, &serverStatus);
    if (rpcStatus.ok()) {
        return static_cast<aidlvhal::StatusCode>(serverStatus.status_code());
    }
    if (rpcStatus.error_code() != ::grpc::StatusCode::UNIMPLEMENTED) {
        LOG(ERROR) << __func__ << ": GRPC Unsubscribe Failed: " << rpcStatus.error_message();
        return aidlvhal::StatusCode::INTERNAL_ERROR;
    }
    // This is a legacy server. Ignore unsubscribe request.
    LOG(INFO) << __func__ << ": GRPC Unsubscribe is not supported by the server";
    return aidlvhal::StatusCode::OK;
}
// Sends an UpdateSampleRate RPC for (|propId|, |areaId|) with the new |sampleRate|. Returns
// the status code reported by the server, or INTERNAL_ERROR when the RPC itself fails.
aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId,
                                                           float sampleRate) {
    ::grpc::ClientContext rpcContext;
    proto::UpdateSampleRateRequest rateRequest;
    proto::VehicleHalCallStatus serverStatus;
    rateRequest.set_prop(propId);
    rateRequest.set_area_id(areaId);
    rateRequest.set_sample_rate(sampleRate);

    const auto rpcStatus = mGrpcStub->UpdateSampleRate(&rpcContext, rateRequest, &serverStatus);
    if (rpcStatus.ok()) {
        return static_cast<aidlvhal::StatusCode>(serverStatus.status_code());
    }
    LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << rpcStatus.error_message();
    return aidlvhal::StatusCode::INTERNAL_ERROR;
}
bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {
return mGrpcChannel->WaitForConnected(gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN)));
}
// Body of the value polling thread: keeps (re)starting the property value stream until
// shutdown has been requested.
void GRPCVehicleHardware::ValuePollingLoop() {
    for (;;) {
        if (mShuttingDownFlag.load()) {
            return;
        }
        // pollValue() returns once the stream has ended; calling it again tries to reconnect.
        pollValue();
    }
}
// Opens the server's property value stream and forwards every received batch of property
// values to the registered property change callback until the stream ends or shutdown is
// requested.
//
// A helper thread waits on mShutdownCV and cancels the RPC context once either shutdown is
// requested or the stream has already stopped, so that a blocking Read() can be interrupted
// during shutdown.
void GRPCVehicleHardware::pollValue() {
    ::grpc::ClientContext context;

    // Guarded by mShutdownMutex: written below, read in the watcher's wait predicate.
    bool rpc_stopped{false};
    std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
        std::unique_lock<std::mutex> lck(mShutdownMutex);
        mShutdownCV.wait(
                lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); });
        context.TryCancel();
    });

    auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
    LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
    proto::VehiclePropValues protoValues;
    while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
        std::vector<aidlvhal::VehiclePropValue> values;
        // Iterate by const reference: iterating by value copies every protobuf message.
        for (const auto& protoValue : protoValues.values()) {
            aidlvhal::VehiclePropValue aidlValue = {};
            proto_msg_converter::protoToAidl(protoValue, &aidlValue);

            // VHAL proxy server uses a different timestamp than AAOS timestamp, so we have to
            // reset the timestamp.
            // TODO(b/350822044): Remove this once we use timestamp from proxy server.
            if (!setAndroidTimestamp(&aidlValue)) {
                LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop
                             << " areaId: " << aidlValue.areaId << " is outdated, ignore";
                continue;
            }
            values.push_back(std::move(aidlValue));
        }
        if (values.empty()) {
            continue;
        }
        // Shared lock: the callback pointer is only read here.
        std::shared_lock lck(mCallbackMutex);
        if (mOnPropChange) {
            (*mOnPropChange)(values);
        }
    }

    // Tell the watcher that the stream is done so it can exit, then wait for it before the
    // context it references goes out of scope.
    {
        std::lock_guard lck(mShutdownMutex);
        rpc_stopped = true;
    }
    mShutdownCV.notify_all();
    shuttingdown_watcher.join();

    auto grpc_status = value_stream->Finish();
    // Reached once the stream has ended (e.g. connection lost) or shutdown was requested.
    LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
}
} // namespace android::hardware::automotive::vehicle::virtualization