Use AAOS side timestamp for VehiclePropValue.

Update the timestamp set by the host-side VHAL proxy server with
the Android-side timestamp at AAOS VHAL side. This makes sure that
we always expose VehiclePropValue that is perfectly synced with
Android elapsedRealtimeNano. This CL also adds logic to deal with
cases when a property update event or a get value result is outdated.

This CL updates the unit test to cover more cases and remove the flaky
test case that requires starting a local GRPC server.

Flag: EXEMPT HAL change
Test: atest GRPCVehicleHardwareUnitTest
Bug: 349678711
Change-Id: I5e2c07e77869f7286a438cb2a04d1b6c130c3c36
diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp
index f44573a..73bb521 100644
--- a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp
+++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp
@@ -20,6 +20,7 @@
 
 #include <android-base/logging.h>
 #include <grpc++/grpc++.h>
+#include <utils/SystemClock.h>
 
 #include <cstdlib>
 #include <mutex>
@@ -28,11 +29,16 @@
 
 namespace android::hardware::automotive::vehicle::virtualization {
 
-static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
-    // TODO(chenhaosjtuacm): get secured credentials here
+namespace {
+
+constexpr size_t MAX_RETRY_COUNT = 5;
+
+std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
     return ::grpc::InsecureChannelCredentials();
 }
 
+}  // namespace
+
 GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
     : mServiceAddr(std::move(service_addr)),
       mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
@@ -40,11 +46,13 @@
       mValuePollingThread([this] { ValuePollingLoop(); }) {}
 
 // Only used for unit testing.
-GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)
-    : mServiceAddr(""),
-      mGrpcChannel(nullptr),
-      mGrpcStub(std::move(stub)),
-      mValuePollingThread([] {}) {}
+GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
+                                         bool startValuePollingLoop)
+    : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) {
+    if (startValuePollingLoop) {
+        mValuePollingThread = std::thread([this] { ValuePollingLoop(); });
+    }
+}
 
 GRPCVehicleHardware::~GRPCVehicleHardware() {
     {
@@ -52,7 +60,9 @@
         mShuttingDownFlag.store(true);
     }
     mShutdownCV.notify_all();
-    mValuePollingThread.join();
+    if (mValuePollingThread.joinable()) {
+        mValuePollingThread.join();
+    }
 }
 
 std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
@@ -109,36 +119,117 @@
 aidlvhal::StatusCode GRPCVehicleHardware::getValues(
         std::shared_ptr<const GetValuesCallback> callback,
         const std::vector<aidlvhal::GetValueRequest>& requests) const {
-    ::grpc::ClientContext context;
+    std::vector<aidlvhal::GetValueResult> results;
+    auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0);
+    if (status != aidlvhal::StatusCode::OK) {
+        return status;
+    }
+    if (!results.empty()) {
+        (*callback)(std::move(results));
+    }
+    return status;
+}
+
+aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry(
+        const std::vector<aidlvhal::GetValueRequest>& requests,
+        std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const {
+    if (retryCount == MAX_RETRY_COUNT) {
+        LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after "
+                   << retryCount << " retries";
+        return aidlvhal::StatusCode::TRY_AGAIN;
+    }
+
     proto::VehiclePropValueRequests protoRequests;
-    proto::GetValueResults protoResults;
+    std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById;
     for (const auto& request : requests) {
         auto& protoRequest = *protoRequests.add_requests();
         protoRequest.set_request_id(request.requestId);
         proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
+        requestById[request.requestId] = &request;
     }
+
     // TODO(chenhaosjtuacm): Make it Async.
+    ::grpc::ClientContext context;
+    proto::GetValueResults protoResults;
     auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
     if (!grpc_status.ok()) {
         LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
         return aidlvhal::StatusCode::INTERNAL_ERROR;
     }
-    std::vector<aidlvhal::GetValueResult> results;
+
+    std::vector<aidlvhal::GetValueRequest> retryRequests;
     for (const auto& protoResult : protoResults.results()) {
-        auto& result = results.emplace_back();
-        result.requestId = protoResult.request_id();
-        result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
-        if (protoResult.has_value()) {
-            aidlvhal::VehiclePropValue value;
-            proto_msg_converter::protoToAidl(protoResult.value(), &value);
-            result.prop = std::move(value);
+        int64_t requestId = protoResult.request_id();
+        auto it = requestById.find(requestId);
+        if (it == requestById.end()) {
+            LOG(ERROR) << __func__
+                       << "Invalid getValue request with unknown request ID: " << requestId
+                       << ", ignore";
+            continue;
         }
+
+        if (!protoResult.has_value()) {
+            auto& result = results->emplace_back();
+            result.requestId = requestId;
+            result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
+            continue;
+        }
+
+        aidlvhal::VehiclePropValue value;
+        proto_msg_converter::protoToAidl(protoResult.value(), &value);
+
+        // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset
+        // the timestamp.
+        // TODO(b/350822044): Remove this once we use timestamp from proxy server.
+        if (!setAndroidTimestamp(&value)) {
+            // This is a rare case when we receive a property update event reflecting a new value
+            // for the property before we receive the get value result. This means that the result
+            // is already outdated, hence we should retry getting the latest value again.
+            LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop
+                         << " areaId: " << value.areaId << " is oudated, retry";
+            retryRequests.push_back(*(it->second));
+            continue;
+        }
+
+        auto& result = results->emplace_back();
+        result.requestId = requestId;
+        result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
+        result.prop = std::move(value);
     }
-    (*callback)(std::move(results));
+
+    if (retryRequests.size() != 0) {
+        return getValuesWithRetry(retryRequests, results, retryCount++);
+    }
 
     return aidlvhal::StatusCode::OK;
 }
 
+bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const {
+    PropIdAreaId propIdAreaId = {
+            .propId = propValue->prop,
+            .areaId = propValue->areaId,
+    };
+    int64_t now = elapsedRealtimeNano();
+    int64_t externalTimestamp = propValue->timestamp;
+
+    {
+        std::lock_guard lck(mLatestUpdateTimestampsMutex);
+        auto it = mLatestUpdateTimestamps.find(propIdAreaId);
+        if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) {
+            mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp;
+            mLatestUpdateTimestamps[propIdAreaId].second = now;
+            propValue->timestamp = now;
+            return true;
+        }
+        if (externalTimestamp == (it->second).first) {
+            propValue->timestamp = (it->second).second;
+            return true;
+        }
+    }
+    // externalTimestamp < (it->second).first, the value is outdated.
+    return false;
+}
+
 void GRPCVehicleHardware::registerOnPropertyChangeEvent(
         std::unique_ptr<const PropertyChangeCallback> callback) {
     std::lock_guard lck(mCallbackMutex);
@@ -248,46 +339,61 @@
 
 void GRPCVehicleHardware::ValuePollingLoop() {
     while (!mShuttingDownFlag.load()) {
-        ::grpc::ClientContext context;
-
-        bool rpc_stopped{false};
-        std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
-            std::unique_lock<std::mutex> lck(mShutdownMutex);
-            mShutdownCV.wait(lck, [this, &rpc_stopped]() {
-                return rpc_stopped || mShuttingDownFlag.load();
-            });
-            context.TryCancel();
-        });
-
-        auto value_stream =
-                mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
-        LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
-        proto::VehiclePropValues protoValues;
-        while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
-            std::vector<aidlvhal::VehiclePropValue> values;
-            for (const auto protoValue : protoValues.values()) {
-                values.push_back(aidlvhal::VehiclePropValue());
-                proto_msg_converter::protoToAidl(protoValue, &values.back());
-            }
-            std::shared_lock lck(mCallbackMutex);
-            if (mOnPropChange) {
-                (*mOnPropChange)(values);
-            }
-        }
-
-        {
-            std::lock_guard lck(mShutdownMutex);
-            rpc_stopped = true;
-        }
-        mShutdownCV.notify_all();
-        shuttingdown_watcher.join();
-
-        auto grpc_status = value_stream->Finish();
-        // never reach here until connection lost
-        LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
-
+        pollValue();
         // try to reconnect
     }
 }
 
+void GRPCVehicleHardware::pollValue() {
+    ::grpc::ClientContext context;
+
+    bool rpc_stopped{false};
+    std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
+        std::unique_lock<std::mutex> lck(mShutdownMutex);
+        mShutdownCV.wait(
+                lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); });
+        context.TryCancel();
+    });
+
+    auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
+    LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
+    proto::VehiclePropValues protoValues;
+    while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
+        std::vector<aidlvhal::VehiclePropValue> values;
+        for (const auto protoValue : protoValues.values()) {
+            aidlvhal::VehiclePropValue aidlValue = {};
+            proto_msg_converter::protoToAidl(protoValue, &aidlValue);
+
+            // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to
+            // reset the timestamp.
+            // TODO(b/350822044): Remove this once we use timestamp from proxy server.
+            if (!setAndroidTimestamp(&aidlValue)) {
+                LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop
+                             << " areaId: " << aidlValue.areaId << " is outdated, ignore";
+                continue;
+            }
+
+            values.push_back(std::move(aidlValue));
+        }
+        if (values.empty()) {
+            continue;
+        }
+        std::shared_lock lck(mCallbackMutex);
+        if (mOnPropChange) {
+            (*mOnPropChange)(values);
+        }
+    }
+
+    {
+        std::lock_guard lck(mShutdownMutex);
+        rpc_stopped = true;
+    }
+    mShutdownCV.notify_all();
+    shuttingdown_watcher.join();
+
+    auto grpc_status = value_stream->Finish();
+    // never reach here until connection lost
+    LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
+}
+
 }  // namespace android::hardware::automotive::vehicle::virtualization