GRPC Vehicle Proxy Server

The GRPC proxy server speaks to another IVehicleHardware (no matter if it is real or fake) to serve
other GRPC vehicle hardware(s). It can be used as a real GRPC Vehicle
Hardware backend when it is running on the machine that have a direct
access to the real vehicle bus, or it can also be used as a fake backend
for testing.

Test: `atest GRPCVehicleProxyServerUnitTest`
Bug: 266001013
Change-Id: Ifec6e21223986a68ab089a87a9664c1b52601ea2
diff --git a/automotive/vehicle/aidl/impl/grpc/Android.bp b/automotive/vehicle/aidl/impl/grpc/Android.bp
index e8f0970..06c9600 100644
--- a/automotive/vehicle/aidl/impl/grpc/Android.bp
+++ b/automotive/vehicle/aidl/impl/grpc/Android.bp
@@ -100,3 +100,27 @@
         "-Wno-unused-parameter",
     ],
 }
+
+cc_library_static {
+    name: "android.hardware.automotive.vehicle@default-grpc-server-lib",
+    defaults: ["VehicleHalDefaults"],
+    vendor: true,
+    srcs: [
+        "GRPCVehicleProxyServer.cpp",
+    ],
+    whole_static_libs: [
+        "android.hardware.automotive.vehicle@default-grpc-libgrpc",
+        "VehicleHalProtoMessageConverter",
+    ],
+    header_libs: [
+        "IVehicleHardware",
+    ],
+    shared_libs: [
+        "libgrpc++",
+        "libprotobuf-cpp-full",
+    ],
+    export_include_dirs: ["."],
+    cflags: [
+        "-Wno-unused-parameter",
+    ],
+}
diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.cpp b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.cpp
new file mode 100644
index 0000000..e2fe97b
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.cpp
@@ -0,0 +1,269 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GRPCVehicleProxyServer.h"
+
+#include "ProtoMessageConverter.h"
+
+#include <grpc++/grpc++.h>
+
+#include <android-base/logging.h>
+
+#include <algorithm>
+#include <condition_variable>
+#include <mutex>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+namespace android::hardware::automotive::vehicle::virtualization {
+
+std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
+
+static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
+    // TODO(chenhaosjtuacm): get secured credentials here
+    return ::grpc::InsecureServerCredentials();
+}
+
+GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
+                                               std::unique_ptr<IVehicleHardware>&& hardware)
+    : mServiceAddr(std::move(serverAddr)), mHardware(std::move(hardware)) {
+    mHardware->registerOnPropertyChangeEvent(
+            std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
+                    [this](std::vector<aidlvhal::VehiclePropValue> values) {
+                        OnVehiclePropChange(values);
+                    }));
+}
+
+::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
+        ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
+        ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
+    for (const auto& config : mHardware->getAllPropertyConfigs()) {
+        proto::VehiclePropConfig protoConfig;
+        proto_msg_converter::aidlToProto(config, &protoConfig);
+        if (!stream->Write(protoConfig)) {
+            return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
+        }
+    }
+    return ::grpc::Status::OK;
+}
+
+::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
+                                                 const proto::VehiclePropValueRequests* requests,
+                                                 proto::SetValueResults* results) {
+    std::vector<aidlvhal::SetValueRequest> aidlRequests;
+    for (const auto& protoRequest : requests->requests()) {
+        auto& aidlRequest = aidlRequests.emplace_back();
+        aidlRequest.requestId = protoRequest.request_id();
+        proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
+    }
+    auto waitMtx = std::make_shared<std::mutex>();
+    auto waitCV = std::make_shared<std::condition_variable>();
+    auto complete = std::make_shared<bool>(false);
+    auto tmpResults = std::make_shared<proto::SetValueResults>();
+    auto aidlStatus = mHardware->setValues(
+            std::make_shared<const IVehicleHardware::SetValuesCallback>(
+                    [waitMtx, waitCV, complete,
+                     tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) {
+                        for (const auto& aidlResult : setValueResults) {
+                            auto& protoResult = *tmpResults->add_results();
+                            protoResult.set_request_id(aidlResult.requestId);
+                            protoResult.set_status(
+                                    static_cast<proto::StatusCode>(aidlResult.status));
+                        }
+                        {
+                            std::lock_guard lck(*waitMtx);
+                            *complete = true;
+                        }
+                        waitCV->notify_all();
+                    }),
+            aidlRequests);
+    if (aidlStatus != aidlvhal::StatusCode::OK) {
+        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
+                              "The underlying hardware fails to set values, VHAL status: " +
+                                      toString(aidlStatus));
+    }
+    std::unique_lock lck(*waitMtx);
+    bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
+    if (!success) {
+        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
+                              "The underlying hardware set values timeout.");
+    }
+    *results = std::move(*tmpResults);
+    return ::grpc::Status::OK;
+}
+
+::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
+                                                 const proto::VehiclePropValueRequests* requests,
+                                                 proto::GetValueResults* results) {
+    std::vector<aidlvhal::GetValueRequest> aidlRequests;
+    for (const auto& protoRequest : requests->requests()) {
+        auto& aidlRequest = aidlRequests.emplace_back();
+        aidlRequest.requestId = protoRequest.request_id();
+        proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
+    }
+    auto waitMtx = std::make_shared<std::mutex>();
+    auto waitCV = std::make_shared<std::condition_variable>();
+    auto complete = std::make_shared<bool>(false);
+    auto tmpResults = std::make_shared<proto::GetValueResults>();
+    auto aidlStatus = mHardware->getValues(
+            std::make_shared<const IVehicleHardware::GetValuesCallback>(
+                    [waitMtx, waitCV, complete,
+                     tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) {
+                        for (const auto& aidlResult : getValueResults) {
+                            auto& protoResult = *tmpResults->add_results();
+                            protoResult.set_request_id(aidlResult.requestId);
+                            protoResult.set_status(
+                                    static_cast<proto::StatusCode>(aidlResult.status));
+                            if (aidlResult.prop) {
+                                auto* valuePtr = protoResult.mutable_value();
+                                proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
+                            }
+                        }
+                        {
+                            std::lock_guard lck(*waitMtx);
+                            *complete = true;
+                        }
+                        waitCV->notify_all();
+                    }),
+            aidlRequests);
+    if (aidlStatus != aidlvhal::StatusCode::OK) {
+        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
+                              "The underlying hardware fails to get values, VHAL status: " +
+                                      toString(aidlStatus));
+    }
+    std::unique_lock lck(*waitMtx);
+    bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
+    if (!success) {
+        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
+                              "The underlying hardware get values timeout.");
+    }
+    *results = std::move(*tmpResults);
+    return ::grpc::Status::OK;
+}
+
+::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
+        ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
+        ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
+    auto conn = std::make_shared<ConnectionDescriptor>(stream);
+    {
+        std::lock_guard lck(mConnectionMutex);
+        mValueStreamingConnections.push_back(conn);
+    }
+    conn->Wait();
+    LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
+    return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
+}
+
+void GrpcVehicleProxyServer::OnVehiclePropChange(
+        const std::vector<aidlvhal::VehiclePropValue>& values) {
+    std::unordered_set<uint64_t> brokenConn;
+    proto::VehiclePropValues protoValues;
+    for (const auto& value : values) {
+        auto* protoValuePtr = protoValues.add_values();
+        proto_msg_converter::aidlToProto(value, protoValuePtr);
+    }
+    {
+        std::shared_lock read_lock(mConnectionMutex);
+        for (auto& connection : mValueStreamingConnections) {
+            auto writeOK = connection->Write(protoValues);
+            if (!writeOK) {
+                LOG(ERROR) << __func__
+                           << ": Server Write failed, connection lost. ID: " << connection->ID();
+                brokenConn.insert(connection->ID());
+            }
+        }
+    }
+    if (brokenConn.empty()) {
+        return;
+    }
+    std::unique_lock write_lock(mConnectionMutex);
+    mValueStreamingConnections.erase(
+            std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
+                           [&brokenConn](const auto& conn) {
+                               return brokenConn.find(conn->ID()) != brokenConn.end();
+                           }),
+            mValueStreamingConnections.end());
+}
+
+GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
+    if (mServer) {
+        LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
+        return *this;
+    }
+    ::grpc::ServerBuilder builder;
+    builder.RegisterService(this);
+    builder.AddListeningPort(mServiceAddr, getServerCredentials());
+    mServer = builder.BuildAndStart();
+    CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
+                   << "please make sure the configuration and permissions are correct";
+    return *this;
+}
+
+GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
+    std::shared_lock read_lock(mConnectionMutex);
+    for (auto& conn : mValueStreamingConnections) {
+        conn->Shutdown();
+    }
+    if (mServer) {
+        mServer->Shutdown();
+    }
+    return *this;
+}
+
+void GrpcVehicleProxyServer::Wait() {
+    if (mServer) {
+        mServer->Wait();
+    }
+    mServer.reset();
+}
+
+GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
+    Shutdown();
+}
+
+bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
+    if (!mStream) {
+        LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
+        Shutdown();
+        return false;
+    }
+    {
+        std::lock_guard lck(*mMtx);
+        if (!mShutdownFlag && mStream->Write(values)) {
+            return true;
+        } else {
+            LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
+        }
+    }
+    Shutdown();
+    return false;
+}
+
+void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
+    std::unique_lock lck(*mMtx);
+    mCV->wait(lck, [this] { return mShutdownFlag; });
+}
+
+void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
+    {
+        std::lock_guard lck(*mMtx);
+        mShutdownFlag = true;
+    }
+    mCV->notify_all();
+}
+
+}  // namespace android::hardware::automotive::vehicle::virtualization
diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.h b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.h
new file mode 100644
index 0000000..01beec3
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleProxyServer.h
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "IVehicleHardware.h"
+
+#include "VehicleServer.grpc.pb.h"
+#include "VehicleServer.pb.h"
+
+#include <grpc++/grpc++.h>
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <shared_mutex>
+#include <string>
+#include <utility>
+
+namespace android::hardware::automotive::vehicle::virtualization {
+
+namespace aidlvhal = ::aidl::android::hardware::automotive::vehicle;
+
+// Connect other GRPC vehicle hardware(s) to the underlying vehicle hardware.
+class GrpcVehicleProxyServer : public proto::VehicleServer::Service {
+  public:
+    GrpcVehicleProxyServer(std::string serverAddr, std::unique_ptr<IVehicleHardware>&& hardware);
+
+    ::grpc::Status GetAllPropertyConfig(
+            ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
+            ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) override;
+
+    ::grpc::Status SetValues(::grpc::ServerContext* context,
+                             const proto::VehiclePropValueRequests* requests,
+                             proto::SetValueResults* results) override;
+
+    ::grpc::Status GetValues(::grpc::ServerContext* context,
+                             const proto::VehiclePropValueRequests* requests,
+                             proto::GetValueResults* results) override;
+
+    ::grpc::Status StartPropertyValuesStream(
+            ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
+            ::grpc::ServerWriter<proto::VehiclePropValues>* stream) override;
+
+    GrpcVehicleProxyServer& Start();
+
+    GrpcVehicleProxyServer& Shutdown();
+
+    void Wait();
+
+  private:
+    void OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue>& values);
+
+    // We keep long-lasting connection for streaming the prop values.
+    struct ConnectionDescriptor {
+        explicit ConnectionDescriptor(::grpc::ServerWriter<proto::VehiclePropValues>* stream)
+            : mStream(stream),
+              mConnectionID(connection_id_counter_.fetch_add(1) + 1),
+              mMtx(std::make_unique<std::mutex>()),
+              mCV(std::make_unique<std::condition_variable>()) {}
+
+        ConnectionDescriptor(const ConnectionDescriptor&) = delete;
+        ConnectionDescriptor(ConnectionDescriptor&& cd) = default;
+        ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete;
+        ConnectionDescriptor& operator=(ConnectionDescriptor&& cd) = default;
+
+        ~ConnectionDescriptor();
+
+        uint64_t ID() const { return mConnectionID; }
+
+        bool Write(const proto::VehiclePropValues& values);
+
+        void Wait();
+
+        void Shutdown();
+
+      private:
+        ::grpc::ServerWriter<proto::VehiclePropValues>* mStream;
+        uint64_t mConnectionID{0};
+        std::unique_ptr<std::mutex> mMtx;
+        std::unique_ptr<std::condition_variable> mCV;
+        bool mShutdownFlag{false};
+
+        static std::atomic<uint64_t> connection_id_counter_;
+    };
+
+    std::string mServiceAddr;
+    std::unique_ptr<::grpc::Server> mServer{nullptr};
+    std::unique_ptr<IVehicleHardware> mHardware;
+
+    std::shared_mutex mConnectionMutex;
+    std::vector<std::shared_ptr<ConnectionDescriptor>> mValueStreamingConnections;
+
+    static constexpr auto kHardwareOpTimeout = std::chrono::seconds(1);
+};
+
+}  // namespace android::hardware::automotive::vehicle::virtualization
diff --git a/automotive/vehicle/aidl/impl/grpc/test/Android.bp b/automotive/vehicle/aidl/impl/grpc/test/Android.bp
index efdf4ca..e53826f 100644
--- a/automotive/vehicle/aidl/impl/grpc/test/Android.bp
+++ b/automotive/vehicle/aidl/impl/grpc/test/Android.bp
@@ -44,3 +44,31 @@
     ],
     test_suites: ["device-tests"],
 }
+
+cc_test {
+    name: "GRPCVehicleProxyServerUnitTest",
+    vendor: true,
+    srcs: ["GRPCVehicleProxyServerUnitTest.cpp"],
+    header_libs: [
+        "IVehicleHardware",
+    ],
+    static_libs: [
+        "android.hardware.automotive.vehicle@default-grpc-hardware-lib",
+        "android.hardware.automotive.vehicle@default-grpc-server-lib",
+        "libgtest",
+        "libgmock",
+    ],
+    shared_libs: [
+        "libgrpc++",
+        "libprotobuf-cpp-full",
+    ],
+    // libgrpc++.so is installed as root, require root to access it.
+    require_root: true,
+    defaults: [
+        "VehicleHalDefaults",
+    ],
+    cflags: [
+        "-Wno-unused-parameter",
+    ],
+    test_suites: ["device-tests"],
+}
diff --git a/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleProxyServerUnitTest.cpp b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleProxyServerUnitTest.cpp
new file mode 100644
index 0000000..49e6fc9
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleProxyServerUnitTest.cpp
@@ -0,0 +1,147 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "GRPCVehicleHardware.h"
+#include "GRPCVehicleProxyServer.h"
+#include "IVehicleHardware.h"
+#include "VehicleServer.grpc.pb.h"
+#include "VehicleServer.pb.h"
+
+#include <gmock/gmock.h>
+#include <grpc++/grpc++.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <memory>
+#include <string>
+#include <thread>
+#include <utility>
+
+namespace android::hardware::automotive::vehicle::virtualization {
+
+const std::string kFakeServerAddr = "0.0.0.0:54321";
+
+class VehicleHardwareForTest : public IVehicleHardware {
+  public:
+    void registerOnPropertyChangeEvent(
+            std::unique_ptr<const PropertyChangeCallback> callback) override {
+        mOnProp = std::move(callback);
+    }
+
+    void onPropertyEvent(
+            std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropValue> values) {
+        if (mOnProp) {
+            (*mOnProp)(std::move(values));
+        }
+    }
+
+    // Functions that we do not care.
+    std::vector<aidl::android::hardware::automotive::vehicle::VehiclePropConfig>
+    getAllPropertyConfigs() const override {
+        return {};
+    }
+
+    aidl::android::hardware::automotive::vehicle::StatusCode setValues(
+            std::shared_ptr<const SetValuesCallback> callback,
+            const std::vector<aidl::android::hardware::automotive::vehicle::SetValueRequest>&
+                    requests) override {
+        return aidl::android::hardware::automotive::vehicle::StatusCode::OK;
+    }
+
+    aidl::android::hardware::automotive::vehicle::StatusCode getValues(
+            std::shared_ptr<const GetValuesCallback> callback,
+            const std::vector<aidl::android::hardware::automotive::vehicle::GetValueRequest>&
+                    requests) const override {
+        return aidl::android::hardware::automotive::vehicle::StatusCode::OK;
+    }
+
+    DumpResult dump(const std::vector<std::string>& options) override { return {}; }
+
+    aidl::android::hardware::automotive::vehicle::StatusCode checkHealth() override {
+        return aidl::android::hardware::automotive::vehicle::StatusCode::OK;
+    }
+
+    void registerOnPropertySetErrorEvent(
+            std::unique_ptr<const PropertySetErrorCallback> callback) override {}
+
+  private:
+    std::unique_ptr<const PropertyChangeCallback> mOnProp;
+};
+
+TEST(GRPCVehicleProxyServerUnitTest, ClientConnectDisconnect) {
+    auto testHardware = std::make_unique<VehicleHardwareForTest>();
+    // HACK: manipulate the underlying hardware via raw pointer for testing.
+    auto* testHardwareRaw = testHardware.get();
+    auto vehicleServer =
+            std::make_unique<GrpcVehicleProxyServer>(kFakeServerAddr, std::move(testHardware));
+    vehicleServer->Start();
+
+    constexpr auto kWaitForConnectionMaxTime = std::chrono::seconds(5);
+    constexpr auto kWaitForStreamStartTime = std::chrono::seconds(1);
+    constexpr auto kWaitForUpdateDeliveryTime = std::chrono::milliseconds(100);
+
+    auto updateReceived1 = std::make_shared<bool>(false);
+    auto vehicleHardware1 = std::make_unique<GRPCVehicleHardware>(kFakeServerAddr);
+    vehicleHardware1->registerOnPropertyChangeEvent(
+            std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
+                    [updateReceived1](const auto&) { *updateReceived1 = true; }));
+    EXPECT_TRUE(vehicleHardware1->waitForConnected(kWaitForConnectionMaxTime));
+    std::this_thread::sleep_for(kWaitForStreamStartTime);
+
+    // Client hardware 1 received update from the server.
+    EXPECT_FALSE(*updateReceived1);
+    testHardwareRaw->onPropertyEvent({});
+    // Wait for the update delivery.
+    std::this_thread::sleep_for(kWaitForUpdateDeliveryTime);
+    EXPECT_TRUE(*updateReceived1);
+
+    // Reset.
+    *updateReceived1 = false;
+
+    auto updateReceived2 = std::make_shared<bool>(false);
+    auto vehicleHardware2 = std::make_unique<GRPCVehicleHardware>(kFakeServerAddr);
+    vehicleHardware2->registerOnPropertyChangeEvent(
+            std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
+                    [updateReceived2](const auto&) { *updateReceived2 = true; }));
+    EXPECT_TRUE(vehicleHardware2->waitForConnected(kWaitForConnectionMaxTime));
+    std::this_thread::sleep_for(kWaitForStreamStartTime);
+
+    // Both client hardware 1 and 2 received update from the server.
+    EXPECT_FALSE(*updateReceived1);
+    EXPECT_FALSE(*updateReceived2);
+    testHardwareRaw->onPropertyEvent({});
+    // Wait for the update delivery.
+    std::this_thread::sleep_for(kWaitForUpdateDeliveryTime);
+    EXPECT_TRUE(*updateReceived1);
+    EXPECT_TRUE(*updateReceived2);
+
+    // Reset.
+    *updateReceived1 = false;
+    *updateReceived2 = false;
+
+    vehicleHardware1.reset();
+
+    // Client 1 exited, only client hardware 2 received update from the server.
+    EXPECT_FALSE(*updateReceived1);
+    EXPECT_FALSE(*updateReceived2);
+    testHardwareRaw->onPropertyEvent({});
+    // Wait for the update delivery.
+    std::this_thread::sleep_for(kWaitForUpdateDeliveryTime);
+    EXPECT_FALSE(*updateReceived1);
+    EXPECT_TRUE(*updateReceived2);
+
+    vehicleServer->Shutdown().Wait();
+}
+
+}  // namespace android::hardware::automotive::vehicle::virtualization