Merge "Use AAOS side timestamp for VehiclePropValue." into main
diff --git a/automotive/vehicle/TEST_MAPPING b/automotive/vehicle/TEST_MAPPING
index 77629a9..d848774 100644
--- a/automotive/vehicle/TEST_MAPPING
+++ b/automotive/vehicle/TEST_MAPPING
@@ -45,6 +45,9 @@
       "name": "FakeVehicleHardwareTest"
     },
     {
+      "name": "GRPCVehicleHardwareUnitTest"
+    },
+    {
       "name": "CarServiceUnitTest",
       "options" : [
         {
diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp
index f44573a..73bb521 100644
--- a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp
+++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp
@@ -20,6 +20,7 @@
 
 #include <android-base/logging.h>
 #include <grpc++/grpc++.h>
+#include <utils/SystemClock.h>
 
 #include <cstdlib>
 #include <mutex>
@@ -28,11 +29,16 @@
 
 namespace android::hardware::automotive::vehicle::virtualization {
 
-static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
-    // TODO(chenhaosjtuacm): get secured credentials here
+namespace {
+
+constexpr size_t MAX_RETRY_COUNT = 5;
+
+std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
     return ::grpc::InsecureChannelCredentials();
 }
 
+}  // namespace
+
 GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
     : mServiceAddr(std::move(service_addr)),
       mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
@@ -40,11 +46,13 @@
       mValuePollingThread([this] { ValuePollingLoop(); }) {}
 
 // Only used for unit testing.
-GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)
-    : mServiceAddr(""),
-      mGrpcChannel(nullptr),
-      mGrpcStub(std::move(stub)),
-      mValuePollingThread([] {}) {}
+GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
+                                         bool startValuePollingLoop)
+    : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) {
+    if (startValuePollingLoop) {
+        mValuePollingThread = std::thread([this] { ValuePollingLoop(); });
+    }
+}
 
 GRPCVehicleHardware::~GRPCVehicleHardware() {
     {
@@ -52,7 +60,9 @@
         mShuttingDownFlag.store(true);
     }
     mShutdownCV.notify_all();
-    mValuePollingThread.join();
+    if (mValuePollingThread.joinable()) {
+        mValuePollingThread.join();
+    }
 }
 
 std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
@@ -109,36 +119,117 @@
 aidlvhal::StatusCode GRPCVehicleHardware::getValues(
         std::shared_ptr<const GetValuesCallback> callback,
         const std::vector<aidlvhal::GetValueRequest>& requests) const {
-    ::grpc::ClientContext context;
+    // Gather all results first (outdated ones are re-fetched by getValuesWithRetry) so that the
+    // callback is invoked at most once, and only when the whole batch succeeded.
+    std::vector<aidlvhal::GetValueResult> collected;
+    const aidlvhal::StatusCode status = getValuesWithRetry(requests, &collected, /*retryCount=*/0);
+    const bool shouldReport = status == aidlvhal::StatusCode::OK && !collected.empty();
+    if (shouldReport) {
+        (*callback)(std::move(collected));
+    }
+    return status;
+}
+
+aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry(
+        const std::vector<aidlvhal::GetValueRequest>& requests,
+        std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const {
+    // retryCount counts the attempts already made; stop once it reaches MAX_RETRY_COUNT.
+    if (retryCount == MAX_RETRY_COUNT) {
+        LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after "
+                   << retryCount << " retries";
+        return aidlvhal::StatusCode::TRY_AGAIN;
+    }
+
     proto::VehiclePropValueRequests protoRequests;
-    proto::GetValueResults protoResults;
+    std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById;
     for (const auto& request : requests) {
         auto& protoRequest = *protoRequests.add_requests();
         protoRequest.set_request_id(request.requestId);
         proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
+        requestById[request.requestId] = &request;
     }
+
     // TODO(chenhaosjtuacm): Make it Async.
+    ::grpc::ClientContext context;
+    proto::GetValueResults protoResults;
     auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
     if (!grpc_status.ok()) {
         LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
         return aidlvhal::StatusCode::INTERNAL_ERROR;
     }
-    std::vector<aidlvhal::GetValueResult> results;
+
+    std::vector<aidlvhal::GetValueRequest> retryRequests;
     for (const auto& protoResult : protoResults.results()) {
-        auto& result = results.emplace_back();
-        result.requestId = protoResult.request_id();
-        result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
-        if (protoResult.has_value()) {
-            aidlvhal::VehiclePropValue value;
-            proto_msg_converter::protoToAidl(protoResult.value(), &value);
-            result.prop = std::move(value);
+        int64_t requestId = protoResult.request_id();
+        auto it = requestById.find(requestId);
+        if (it == requestById.end()) {
+            LOG(ERROR) << __func__ << ": Invalid getValue request with unknown request ID: "
+                       << requestId << ", ignore";
+            continue;
         }
+
+        if (!protoResult.has_value()) {
+            auto& result = results->emplace_back();
+            result.requestId = requestId;
+            result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
+            continue;
+        }
+
+        aidlvhal::VehiclePropValue value;
+        proto_msg_converter::protoToAidl(protoResult.value(), &value);
+
+        // VHAL proxy server uses a different timestamp than AAOS timestamp, so we have to reset
+        // the timestamp.
+        // TODO(b/350822044): Remove this once we use timestamp from proxy server.
+        if (!setAndroidTimestamp(&value)) {
+            // This is a rare case when we receive a property update event reflecting a new value
+            // for the property before we receive the get value result. This means that the result
+            // is already outdated, hence we should retry getting the latest value again.
+            LOG(WARNING) << __func__ << ": getValue result for propId: " << value.prop
+                         << " areaId: " << value.areaId << " is outdated, retry";
+            retryRequests.push_back(*(it->second));
+            continue;
+        }
+
+        auto& result = results->emplace_back();
+        result.requestId = requestId;
+        result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
+        result.prop = std::move(value);
     }
-    (*callback)(std::move(results));
+
+    if (!retryRequests.empty()) {
+        return getValuesWithRetry(retryRequests, results, retryCount + 1);
+    }
 
     return aidlvhal::StatusCode::OK;
 }
 
+// Stamps propValue with the Android elapsedRealtimeNano clock instead of the server timestamp.
+// Returns false, leaving propValue untouched, if a newer server timestamp was already recorded.
+bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const {
+    const PropIdAreaId propIdAreaId = {
+            .propId = propValue->prop,
+            .areaId = propValue->areaId,
+    };
+    const int64_t now = elapsedRealtimeNano();
+    const int64_t externalTimestamp = propValue->timestamp;
+
+    std::lock_guard lck(mLatestUpdateTimestampsMutex);
+    // A single lookup: an unseen property is recorded as [externalTimestamp, now] right away.
+    auto& latest =
+            mLatestUpdateTimestamps.try_emplace(propIdAreaId, externalTimestamp, now).first->second;
+    if (externalTimestamp < latest.first) {
+        // A newer value for this property was already seen, so this one is outdated.
+        return false;
+    }
+    if (externalTimestamp > latest.first) {
+        latest = {externalTimestamp, now};
+    }
+    // For an equal external timestamp, reuse the Android timestamp assigned the first time.
+    propValue->timestamp = latest.second;
+    return true;
+}
+
 void GRPCVehicleHardware::registerOnPropertyChangeEvent(
         std::unique_ptr<const PropertyChangeCallback> callback) {
     std::lock_guard lck(mCallbackMutex);
@@ -248,46 +339,61 @@
 
 void GRPCVehicleHardware::ValuePollingLoop() {
     while (!mShuttingDownFlag.load()) {
-        ::grpc::ClientContext context;
-
-        bool rpc_stopped{false};
-        std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
-            std::unique_lock<std::mutex> lck(mShutdownMutex);
-            mShutdownCV.wait(lck, [this, &rpc_stopped]() {
-                return rpc_stopped || mShuttingDownFlag.load();
-            });
-            context.TryCancel();
-        });
-
-        auto value_stream =
-                mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
-        LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
-        proto::VehiclePropValues protoValues;
-        while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
-            std::vector<aidlvhal::VehiclePropValue> values;
-            for (const auto protoValue : protoValues.values()) {
-                values.push_back(aidlvhal::VehiclePropValue());
-                proto_msg_converter::protoToAidl(protoValue, &values.back());
-            }
-            std::shared_lock lck(mCallbackMutex);
-            if (mOnPropChange) {
-                (*mOnPropChange)(values);
-            }
-        }
-
-        {
-            std::lock_guard lck(mShutdownMutex);
-            rpc_stopped = true;
-        }
-        mShutdownCV.notify_all();
-        shuttingdown_watcher.join();
-
-        auto grpc_status = value_stream->Finish();
-        // never reach here until connection lost
-        LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
-
+        pollValue();
         // try to reconnect
     }
 }
 
+void GRPCVehicleHardware::pollValue() {
+    ::grpc::ClientContext context;
+    // Guarded by mShutdownMutex; set once reading ends so that the watcher thread can exit.
+    bool rpc_stopped{false};
+    std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
+        std::unique_lock<std::mutex> lck(mShutdownMutex);
+        mShutdownCV.wait(
+                lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); });
+        context.TryCancel();
+    });
+
+    auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
+    LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
+    proto::VehiclePropValues protoValues;
+    while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
+        std::vector<aidlvhal::VehiclePropValue> values;
+        for (const auto& protoValue : protoValues.values()) {
+            aidlvhal::VehiclePropValue aidlValue = {};
+            proto_msg_converter::protoToAidl(protoValue, &aidlValue);
+
+            // VHAL proxy server uses a different timestamp than AAOS timestamp, so we have to
+            // reset the timestamp.
+            // TODO(b/350822044): Remove this once we use timestamp from proxy server.
+            if (!setAndroidTimestamp(&aidlValue)) {
+                LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop
+                             << " areaId: " << aidlValue.areaId << " is outdated, ignore";
+                continue;
+            }
+
+            values.push_back(std::move(aidlValue));
+        }
+        if (values.empty()) {
+            continue;
+        }
+        std::shared_lock lck(mCallbackMutex);
+        if (mOnPropChange) {
+            (*mOnPropChange)(values);
+        }
+    }
+
+    {
+        std::lock_guard lck(mShutdownMutex);
+        rpc_stopped = true;
+    }
+    mShutdownCV.notify_all();
+    shuttingdown_watcher.join();
+
+    auto grpc_status = value_stream->Finish();
+    // Only reached once the stream has ended or shutdown was requested.
+    LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
+}
+
 }  // namespace android::hardware::automotive::vehicle::virtualization
diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h
index 9750f62..1edf658 100644
--- a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h
+++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h
@@ -20,6 +20,7 @@
 #include <VehicleHalTypes.h>
 #include <VehicleUtils.h>
 #include <android-base/result.h>
+#include <android-base/thread_annotations.h>
 
 #include "VehicleServer.grpc.pb.h"
 #include "VehicleServer.pb.h"
@@ -33,6 +34,7 @@
 #include <shared_mutex>
 #include <string>
 #include <thread>
+#include <unordered_map>
 #include <vector>
 
 namespace android::hardware::automotive::vehicle::virtualization {
@@ -43,9 +45,6 @@
   public:
     explicit GRPCVehicleHardware(std::string service_addr);
 
-    // Only used for unit testing.
-    explicit GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub);
-
     ~GRPCVehicleHardware();
 
     // Get all the property configs.
@@ -94,7 +93,7 @@
     std::unique_ptr<const PropertyChangeCallback> mOnPropChange;
 
   private:
-    void ValuePollingLoop();
+    friend class GRPCVehicleHardwareUnitTest;
 
     std::string mServiceAddr;
     std::shared_ptr<::grpc::Channel> mGrpcChannel;
@@ -106,6 +105,31 @@
     std::mutex mShutdownMutex;
     std::condition_variable mShutdownCV;
     std::atomic<bool> mShuttingDownFlag{false};
+
+    mutable std::mutex mLatestUpdateTimestampsMutex;
+
+    // A map from [propId, areaId] to the latest timestamps at which this property was updated.
+    // The value is a pair: the first element is the external timestamp (timestamp set by VHAL
+    // server), the second element is the Android timestamp (elapsedRealtimeNano).
+    mutable std::unordered_map<PropIdAreaId, std::pair<int64_t, int64_t>,
+                               PropIdAreaIdHash> mLatestUpdateTimestamps
+            GUARDED_BY(mLatestUpdateTimestampsMutex);
+
+    // Only used for unit testing.
+    GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
+                        bool startValuePollingLoop);
+
+    void ValuePollingLoop();
+    void pollValue();
+
+    aidlvhal::StatusCode getValuesWithRetry(const std::vector<aidlvhal::GetValueRequest>& requests,
+                                            std::vector<aidlvhal::GetValueResult>* results,
+                                            size_t retryCount) const;
+
+    // Check the external timestamp of propValue against the latest updated external timestamp, if
+    // this is an outdated value, return false. Otherwise, update the external timestamp to the
+    // Android timestamp and return true.
+    bool setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const;
 };
 
 }  // namespace android::hardware::automotive::vehicle::virtualization
diff --git a/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp
index 3bd7e0e..20af231 100644
--- a/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp
+++ b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp
@@ -19,8 +19,10 @@
 
 #include <gmock/gmock.h>
 #include <grpc++/grpc++.h>
+#include <grpcpp/test/mock_stream.h>
 #include <gtest/gtest.h>
 
+#include <utils/SystemClock.h>
 #include <chrono>
 #include <memory>
 #include <string>
@@ -31,98 +33,48 @@
 
 using ::testing::_;
 using ::testing::DoAll;
+using ::testing::ElementsAre;
 using ::testing::NiceMock;
 using ::testing::Return;
 using ::testing::SaveArg;
 using ::testing::SetArgPointee;
+using ::testing::SizeIs;
+
+using ::grpc::testing::MockClientReader;
 
 using proto::MockVehicleServerStub;
 
-const std::string kFakeServerAddr = "0.0.0.0:54321";
-
-class FakeVehicleServer : public proto::VehicleServer::Service {
-  public:
-    ::grpc::Status StartPropertyValuesStream(
-            ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
-            ::grpc::ServerWriter<proto::VehiclePropValues>* stream) override {
-        stream->Write(proto::VehiclePropValues());
-        // A fake disconnection.
-        return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
-    }
-
-    // Functions that we do not care.
-    ::grpc::Status GetAllPropertyConfig(
-            ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
-            ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) override {
-        return ::grpc::Status::OK;
-    }
-
-    ::grpc::Status SetValues(::grpc::ServerContext* context,
-                             const proto::VehiclePropValueRequests* requests,
-                             proto::SetValueResults* results) override {
-        return ::grpc::Status::OK;
-    }
-
-    ::grpc::Status GetValues(::grpc::ServerContext* context,
-                             const proto::VehiclePropValueRequests* requests,
-                             proto::GetValueResults* results) override {
-        return ::grpc::Status::OK;
-    }
-};
-
-TEST(GRPCVehicleHardwareUnitTest, Reconnect) {
-    auto receivedUpdate = std::make_shared<std::atomic<int>>(0);
-    auto vehicleHardware = std::make_unique<GRPCVehicleHardware>(kFakeServerAddr);
-    vehicleHardware->registerOnPropertyChangeEvent(
-            std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
-                    [receivedUpdate](const auto&) { receivedUpdate->fetch_add(1); }));
-
-    constexpr size_t kServerRestartTimes = 5;
-    for (size_t serverStart = 0; serverStart < kServerRestartTimes; ++serverStart) {
-        EXPECT_EQ(receivedUpdate->load(), 0);
-        auto fakeServer = std::make_unique<FakeVehicleServer>();
-        ::grpc::ServerBuilder builder;
-        builder.RegisterService(fakeServer.get());
-        builder.AddListeningPort(kFakeServerAddr, ::grpc::InsecureServerCredentials());
-        auto grpcServer = builder.BuildAndStart();
-
-        // Wait until the vehicle hardware received the second update (after one fake
-        // disconnection).
-        constexpr auto kMaxWaitTime = std::chrono::seconds(5);
-        auto startTime = std::chrono::steady_clock::now();
-        while (receivedUpdate->load() <= 1 &&
-               std::chrono::steady_clock::now() - startTime < kMaxWaitTime)
-            ;
-
-        grpcServer->Shutdown();
-        grpcServer->Wait();
-        EXPECT_GT(receivedUpdate->load(), 1);
-
-        // Reset for the next round.
-        receivedUpdate->store(0);
-    }
-}
-
-class GRPCVehicleHardwareMockServerUnitTest : public ::testing::Test {
+class GRPCVehicleHardwareUnitTest : public ::testing::Test {
   protected:
     NiceMock<MockVehicleServerStub>* mGrpcStub;
     std::unique_ptr<GRPCVehicleHardware> mHardware;
 
     void SetUp() override {
         auto stub = std::make_unique<NiceMock<MockVehicleServerStub>>();
-        ;
         mGrpcStub = stub.get();
-        mHardware = std::make_unique<GRPCVehicleHardware>(std::move(stub));
+        // Cannot use make_unique here since the constructor is a private method.
+        mHardware = std::unique_ptr<GRPCVehicleHardware>(
+                new GRPCVehicleHardware(std::move(stub), /*startValuePollingLoop=*/false));
     }
 
     void TearDown() override { mHardware.reset(); }
+
+    // Access GRPCVehicleHardware private method.
+    void pollValue() { mHardware->pollValue(); }
+
+    void startValuePollingLoop(std::unique_ptr<proto::VehicleServer::StubInterface> stub) {
+        mHardware = std::unique_ptr<GRPCVehicleHardware>(
+                new GRPCVehicleHardware(std::move(stub), /*startValuePollingLoop=*/true));
+    }
+
+    void generatePropertyUpdateEvent(int32_t propId, int64_t timestamp);
 };
 
 MATCHER_P(RepeatedInt32Eq, expected_values, "") {
     return std::vector<int32_t>(arg.begin(), arg.end()) == expected_values;
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, Subscribe) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribe) {
     proto::VehicleHalCallStatus protoStatus;
     protoStatus.set_status_code(proto::StatusCode::OK);
     proto::SubscribeRequest actualRequest;
@@ -147,7 +99,7 @@
     EXPECT_EQ(protoOptions.enable_variable_update_rate(), true);
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeLegacyServer) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeLegacyServer) {
     EXPECT_CALL(*mGrpcStub, Subscribe(_, _, _))
             .WillOnce(Return(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")));
 
@@ -157,7 +109,7 @@
     EXPECT_EQ(status, aidlvhal::StatusCode::OK);
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeGrpcFailure) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeGrpcFailure) {
     EXPECT_CALL(*mGrpcStub, Subscribe(_, _, _))
             .WillOnce(Return(::grpc::Status(::grpc::StatusCode::INTERNAL, "GRPC Error")));
 
@@ -167,7 +119,7 @@
     EXPECT_EQ(status, aidlvhal::StatusCode::INTERNAL_ERROR);
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeProtoFailure) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeProtoFailure) {
     proto::VehicleHalCallStatus protoStatus;
     protoStatus.set_status_code(proto::StatusCode::NOT_AVAILABLE_SPEED_LOW);
 
@@ -181,7 +133,7 @@
     EXPECT_EQ(status, aidlvhal::StatusCode::NOT_AVAILABLE_SPEED_LOW);
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, Unsubscribe) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribe) {
     proto::VehicleHalCallStatus protoStatus;
     protoStatus.set_status_code(proto::StatusCode::OK);
     proto::UnsubscribeRequest actualRequest;
@@ -199,7 +151,7 @@
     EXPECT_EQ(actualRequest.area_id(), areaId);
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeLegacyServer) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeLegacyServer) {
     EXPECT_CALL(*mGrpcStub, Unsubscribe(_, _, _))
             .WillOnce(Return(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")));
 
@@ -208,7 +160,7 @@
     EXPECT_EQ(status, aidlvhal::StatusCode::OK);
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeGrpcFailure) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeGrpcFailure) {
     EXPECT_CALL(*mGrpcStub, Unsubscribe(_, _, _))
             .WillOnce(Return(::grpc::Status(::grpc::StatusCode::INTERNAL, "GRPC Error")));
 
@@ -217,7 +169,7 @@
     EXPECT_EQ(status, aidlvhal::StatusCode::INTERNAL_ERROR);
 }
 
-TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeProtoFailure) {
+TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeProtoFailure) {
     proto::VehicleHalCallStatus protoStatus;
     protoStatus.set_status_code(proto::StatusCode::NOT_AVAILABLE_SPEED_LOW);
 
@@ -230,4 +182,264 @@
     EXPECT_EQ(status, aidlvhal::StatusCode::NOT_AVAILABLE_SPEED_LOW);
 }
 
+TEST_F(GRPCVehicleHardwareUnitTest, TestPollValue) {
+    int64_t testTimestamp = 12345;
+    int32_t testPropId = 54321;
+    int64_t startTimestamp = elapsedRealtimeNano();
+
+    // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed
+    // there.
+    auto clientReader = new MockClientReader<proto::VehiclePropValues>();
+    EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader));
+    EXPECT_CALL(*clientReader, Read(_))
+            .WillOnce([testTimestamp, testPropId](proto::VehiclePropValues* values) {
+                values->Clear();
+                auto value = values->add_values();
+                value->set_timestamp(testTimestamp);
+                value->set_prop(testPropId);
+                return true;
+            })
+            .WillOnce(Return(false));
+    EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK));
+
+    std::vector<aidlvhal::VehiclePropValue> propertyEvents;
+
+    mHardware->registerOnPropertyChangeEvent(
+            std::make_unique<GRPCVehicleHardware::PropertyChangeCallback>(
+                    [&propertyEvents](const std::vector<aidlvhal::VehiclePropValue>& events) {
+                        for (const auto& event : events) {
+                            propertyEvents.push_back(event);
+                        }
+                    }));
+
+    pollValue();
+
+    ASSERT_THAT(propertyEvents, SizeIs(1));
+    EXPECT_EQ(propertyEvents[0].prop, testPropId);
+    EXPECT_GT(propertyEvents[0].timestamp, startTimestamp)
+            << "Timestamp must be updated to Android timestamp";
+    EXPECT_LT(propertyEvents[0].timestamp, elapsedRealtimeNano())
+            << "Timestamp must be updated to Android timestamp";
+}
+
+TEST_F(GRPCVehicleHardwareUnitTest, TestPollValueIgnoreOutdatedValue) {
+    int64_t testTimestamp1 = 12345;
+    int32_t value1 = 1324;
+    int64_t testTimestamp2 = 12340;
+    int32_t value2 = 1423;
+    int32_t testPropId = 54321;
+    int64_t startTimestamp = elapsedRealtimeNano();
+
+    // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed
+    // there.
+    auto clientReader = new MockClientReader<proto::VehiclePropValues>();
+    EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader));
+    EXPECT_CALL(*clientReader, Read(_))
+            .WillOnce([testTimestamp1, value1, testPropId](proto::VehiclePropValues* values) {
+                values->Clear();
+                auto value = values->add_values();
+                value->set_timestamp(testTimestamp1);
+                value->set_prop(testPropId);
+                value->add_int32_values(value1);
+                return true;
+            })
+            .WillOnce([testTimestamp2, value2, testPropId](proto::VehiclePropValues* values) {
+                values->Clear();
+                // This event is outdated, must be ignored.
+                auto value = values->add_values();
+                value->set_timestamp(testTimestamp2);
+                value->set_prop(testPropId);
+                value->add_int32_values(value2);
+                return true;
+            })
+            .WillOnce(Return(false));
+    EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK));
+
+    std::vector<aidlvhal::VehiclePropValue> propertyEvents;
+
+    mHardware->registerOnPropertyChangeEvent(
+            std::make_unique<GRPCVehicleHardware::PropertyChangeCallback>(
+                    [&propertyEvents](const std::vector<aidlvhal::VehiclePropValue>& events) {
+                        for (const auto& event : events) {
+                            propertyEvents.push_back(event);
+                        }
+                    }));
+
+    pollValue();
+
+    ASSERT_THAT(propertyEvents, SizeIs(1)) << "Outdated event must be ignored";
+    EXPECT_EQ(propertyEvents[0].prop, testPropId);
+    EXPECT_GT(propertyEvents[0].timestamp, startTimestamp);
+    EXPECT_LT(propertyEvents[0].timestamp, elapsedRealtimeNano());
+    EXPECT_THAT(propertyEvents[0].value.int32Values, ElementsAre(value1));
+}
+
+TEST_F(GRPCVehicleHardwareUnitTest, TestValuePollingLoop) {
+    int64_t testTimestamp = 12345;
+    int32_t testPropId = 54321;
+    auto stub = std::make_unique<NiceMock<MockVehicleServerStub>>();
+
+    // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed
+    // there.
+    auto clientReader = new MockClientReader<proto::VehiclePropValues>();
+    EXPECT_CALL(*stub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader));
+    EXPECT_CALL(*clientReader, Read(_))
+            .WillRepeatedly([testTimestamp, testPropId](proto::VehiclePropValues* values) {
+                // Sleep for 10ms and always return the same property event.
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+                values->Clear();
+                auto value = values->add_values();
+                value->set_timestamp(testTimestamp);
+                value->set_prop(testPropId);
+                return true;
+            });
+    EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK));
+
+    startValuePollingLoop(std::move(stub));
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    // This must stop the loop and wait for the thread to finish.
+    mHardware.reset();
+}
+
+TEST_F(GRPCVehicleHardwareUnitTest, TestGetValues) {
+    int64_t testRequestId = 1234;
+    int32_t testPropId = 4321;
+    int32_t testValue = 123456;
+    proto::VehiclePropValueRequests gotRequests;
+    EXPECT_CALL(*mGrpcStub, GetValues(_, _, _))
+            .WillOnce([&gotRequests, testRequestId, testPropId, testValue](
+                              ::grpc::ClientContext* context,
+                              const proto::VehiclePropValueRequests& request,
+                              proto::GetValueResults* response) {
+                gotRequests = request;
+                response->Clear();
+                auto* resultPtr = response->add_results();
+                resultPtr->set_request_id(testRequestId);
+                resultPtr->set_status(proto::StatusCode::OK);
+                auto* valuePtr = resultPtr->mutable_value();
+                valuePtr->set_prop(testPropId);
+                valuePtr->add_int32_values(testValue);
+                return ::grpc::Status::OK;
+            });
+
+    std::vector<aidlvhal::GetValueRequest> requests;
+    requests.push_back(aidlvhal::GetValueRequest{.requestId = testRequestId,
+                                                 .prop = {
+                                                         .prop = testPropId,
+                                                 }});
+
+    std::vector<aidlvhal::GetValueResult> gotResults;
+
+    auto status = mHardware->getValues(
+            std::make_shared<GRPCVehicleHardware::GetValuesCallback>(
+                    [&gotResults](std::vector<aidlvhal::GetValueResult> results) {
+                        for (const auto& result : results) {
+                            gotResults.push_back(result);
+                        }
+                    }),
+            requests);
+
+    ASSERT_EQ(status, aidlvhal::StatusCode::OK);
+    ASSERT_THAT(gotRequests.requests(), SizeIs(1));
+    EXPECT_THAT(gotRequests.requests(0).request_id(), testRequestId);
+    EXPECT_THAT(gotRequests.requests(0).value().prop(), testPropId);
+
+    ASSERT_THAT(gotResults, SizeIs(1));
+    EXPECT_EQ(gotResults[0].requestId, testRequestId);
+    EXPECT_EQ(gotResults[0].status, aidlvhal::StatusCode::OK);
+    EXPECT_EQ(gotResults[0].prop->prop, testPropId);
+    EXPECT_THAT(gotResults[0].prop->value.int32Values, ElementsAre(testValue));
+}
+
+void GRPCVehicleHardwareUnitTest::generatePropertyUpdateEvent(int32_t propId, int64_t timestamp) {
+    // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed
+    // there.
+    auto clientReader = new MockClientReader<proto::VehiclePropValues>();
+    EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader));
+    EXPECT_CALL(*clientReader, Read(_))
+            .WillOnce([timestamp, propId](proto::VehiclePropValues* values) {
+                values->Clear();
+                auto value = values->add_values();
+                value->set_timestamp(timestamp);
+                value->set_prop(propId);
+                return true;
+            })
+            .WillOnce(Return(false));
+    EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK));
+
+    pollValue();
+}
+
+TEST_F(GRPCVehicleHardwareUnitTest, TestGetValuesOutdatedRetry) {
+    int64_t startTimestamp = elapsedRealtimeNano();
+    int64_t testRequestId = 1234;
+    int32_t testPropId = 4321;
+    int32_t testValue1 = 123456;
+    int32_t testValue2 = 654321;
+    int64_t testTimestamp1 = 1000;
+    int64_t testTimestamp2 = 2000;
+
+    // A property update event for testTimestamp2 happens before getValues returns.
+    generatePropertyUpdateEvent(testPropId, testTimestamp2);
+
+    // GetValues first returns an outdated result, then an up-to-date result.
+    EXPECT_CALL(*mGrpcStub, GetValues(_, _, _))
+            .WillOnce([testRequestId, testPropId, testValue1, testTimestamp1](
+                              ::grpc::ClientContext* context,
+                              const proto::VehiclePropValueRequests& request,
+                              proto::GetValueResults* response) {
+                response->Clear();
+                auto* resultPtr = response->add_results();
+                resultPtr->set_request_id(testRequestId);
+                resultPtr->set_status(proto::StatusCode::OK);
+                auto* valuePtr = resultPtr->mutable_value();
+                valuePtr->set_prop(testPropId);
+                valuePtr->set_timestamp(testTimestamp1);
+                valuePtr->add_int32_values(testValue1);
+                return ::grpc::Status::OK;
+            })
+            .WillOnce([testRequestId, testPropId, testValue2, testTimestamp2](
+                              ::grpc::ClientContext* context,
+                              const proto::VehiclePropValueRequests& request,
+                              proto::GetValueResults* response) {
+                response->Clear();
+                auto* resultPtr = response->add_results();
+                resultPtr->set_request_id(testRequestId);
+                resultPtr->set_status(proto::StatusCode::OK);
+                auto* valuePtr = resultPtr->mutable_value();
+                valuePtr->set_prop(testPropId);
+                valuePtr->set_timestamp(testTimestamp2);
+                valuePtr->add_int32_values(testValue2);
+                return ::grpc::Status::OK;
+            });
+
+    std::vector<aidlvhal::GetValueRequest> requests;
+    requests.push_back(aidlvhal::GetValueRequest{.requestId = testRequestId,
+                                                 .prop = {
+                                                         .prop = testPropId,
+                                                 }});
+
+    std::vector<aidlvhal::GetValueResult> gotResults;
+
+    auto status = mHardware->getValues(
+            std::make_shared<GRPCVehicleHardware::GetValuesCallback>(
+                    [&gotResults](std::vector<aidlvhal::GetValueResult> results) {
+                        for (const auto& result : results) {
+                            gotResults.push_back(result);
+                        }
+                    }),
+            requests);
+
+    ASSERT_EQ(status, aidlvhal::StatusCode::OK);
+    ASSERT_THAT(gotResults, SizeIs(1));
+    EXPECT_EQ(gotResults[0].requestId, testRequestId);
+    EXPECT_EQ(gotResults[0].status, aidlvhal::StatusCode::OK);
+    EXPECT_EQ(gotResults[0].prop->prop, testPropId);
+    EXPECT_THAT(gotResults[0].prop->value.int32Values, ElementsAre(testValue2));
+    EXPECT_GT(gotResults[0].prop->timestamp, startTimestamp);
+    EXPECT_LT(gotResults[0].prop->timestamp, elapsedRealtimeNano());
+}
+
 }  // namespace android::hardware::automotive::vehicle::virtualization