Support subscribe/unsubscribe in VHAL.

Test: atest DefaultVehicleHalTest
Bug: 200737967

Change-Id: I4e7b31af7fa2af445f9bac6ec71dad3bf6c0b8b3
diff --git a/automotive/vehicle/aidl/impl/utils/common/include/VehicleUtils.h b/automotive/vehicle/aidl/impl/utils/common/include/VehicleUtils.h
index 49b33d5..0f0ccf1 100644
--- a/automotive/vehicle/aidl/impl/utils/common/include/VehicleUtils.h
+++ b/automotive/vehicle/aidl/impl/utils/common/include/VehicleUtils.h
@@ -67,24 +67,30 @@
 }
 
 inline const ::aidl::android::hardware::automotive::vehicle::VehicleAreaConfig* getAreaConfig(
-        const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& propValue,
+        int32_t propId, int32_t areaId,
         const ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig& config) {
     if (config.areaConfigs.size() == 0) {
         return nullptr;
     }
 
-    if (isGlobalProp(propValue.prop)) {
+    if (isGlobalProp(propId)) {
         return &(config.areaConfigs[0]);
     }
 
     for (const auto& c : config.areaConfigs) {
-        if (c.areaId == propValue.areaId) {
+        if (c.areaId == areaId) {
             return &c;
         }
     }
     return nullptr;
 }
 
+inline const ::aidl::android::hardware::automotive::vehicle::VehicleAreaConfig* getAreaConfig(
+        const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& propValue,
+        const ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig& config) {
+    return getAreaConfig(propValue.prop, propValue.areaId, config);
+}
+
 inline std::unique_ptr<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>
 createVehiclePropValueVec(::aidl::android::hardware::automotive::vehicle::VehiclePropertyType type,
                           size_t vecSize) {
diff --git a/automotive/vehicle/aidl/impl/utils/common/src/VehiclePropertyStore.cpp b/automotive/vehicle/aidl/impl/utils/common/src/VehiclePropertyStore.cpp
index 1a79230..c1fa896 100644
--- a/automotive/vehicle/aidl/impl/utils/common/src/VehiclePropertyStore.cpp
+++ b/automotive/vehicle/aidl/impl/utils/common/src/VehiclePropertyStore.cpp
@@ -21,9 +21,11 @@
 
 #include <VehicleHalTypes.h>
 #include <VehicleUtils.h>
-#include <android-base/format.h>
+#include <android-base/stringprintf.h>
 #include <math/HashCombine.h>
 
+#include <inttypes.h>
+
 namespace android {
 namespace hardware {
 namespace automotive {
@@ -36,13 +38,14 @@
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropValue;
 using ::android::base::Error;
 using ::android::base::Result;
+using ::android::base::StringPrintf;
 
 bool VehiclePropertyStore::RecordId::operator==(const VehiclePropertyStore::RecordId& other) const {
     return area == other.area && token == other.token;
 }
 
 std::string VehiclePropertyStore::RecordId::toString() const {
-    return ::fmt::format("RecordID{{.areaId={:d}, .token={:d}}}", area, token);
+    return StringPrintf("RecordID{{.areaId=% " PRId32 ", .token=%" PRId64 "}", area, token);
 }
 
 size_t VehiclePropertyStore::RecordIdHash::operator()(RecordId const& recordId) const {
diff --git a/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h b/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h
index d8516b1..4f0b74a 100644
--- a/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h
+++ b/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h
@@ -19,6 +19,7 @@
 
 #include "PendingRequestPool.h"
 
+#include <IVehicleHardware.h>
 #include <VehicleHalTypes.h>
 
 #include <aidl/android/hardware/automotive/vehicle/IVehicleCallback.h>
@@ -51,14 +52,14 @@
     // Gets the unique ID for this client.
     const void* id();
 
-    // Add client requests. The requests would be registered as pending requests until
+    // Adds client requests. The requests would be registered as pending requests until
     // {@code tryFinishRequests} is called for them.
     // Returns {@code INVALID_ARG} error if any of the requestIds are duplicate with one of the
     // pending request IDs or {@code TRY_AGAIN} error if the pending request pool is full and could
     // no longer add requests.
     ::android::base::Result<void> addRequests(const std::unordered_set<int64_t>& requestIds);
 
-    // Mark the requests as finished. Returns a list of request IDs that was pending and has been
+    // Marks the requests as finished. Returns a list of request IDs that was pending and has been
     // finished. It must be a set of the requested request IDs.
     std::unordered_set<int64_t> tryFinishRequests(const std::unordered_set<int64_t>& requestIds);
 
@@ -110,9 +111,15 @@
                     callback);
 
     // Gets the callback to be called when the request for this client has finished.
-    std::shared_ptr<const std::function<
-            void(std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult>)>>
-    getResultCallback();
+    std::shared_ptr<const IVehicleHardware::GetValuesCallback> getResultCallback();
+
+    // Marshals the updated values into largeParcelable and sents it through {@code onPropertyEvent}
+    // callback.
+    static void sendUpdatedValues(
+            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>
+                    callback,
+            std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>&&
+                    updatedValues);
 
   protected:
     // Gets the callback to be called when the request for this client has timeout.
@@ -121,9 +128,8 @@
   private:
     // The following members are only initialized during construction.
     std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc> mTimeoutCallback;
-    std::shared_ptr<const std::function<void(
-            std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult>)>>
-            mResultCallback;
+    std::shared_ptr<const IVehicleHardware::GetValuesCallback> mResultCallback;
+    std::shared_ptr<const IVehicleHardware::PropertyChangeCallback> mPropertyChangeCallback;
 
     static void onGetValueResults(
             const void* clientId,
diff --git a/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h b/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h
index b0423a3..6becff8 100644
--- a/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h
+++ b/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h
@@ -53,7 +53,7 @@
 
     explicit DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware);
 
-    ~DefaultVehicleHal();
+    ~DefaultVehicleHal() = default;
 
     ::ndk::ScopedAStatus getAllPropConfigs(
             ::aidl::android::hardware::automotive::vehicle::VehiclePropConfigs* returnConfigs)
@@ -93,11 +93,40 @@
             GetSetValuesClient<::aidl::android::hardware::automotive::vehicle::SetValueResult,
                                ::aidl::android::hardware::automotive::vehicle::SetValueResults>;
 
+    // A thread safe class to maintain an increasing request ID for each subscribe client. This
+    // class is safe to pass to async callbacks.
+    class SubscribeIdByClient {
+      public:
+        int64_t getId(const CallbackType& callback);
+
+      private:
+        std::mutex mLock;
+        std::unordered_map<CallbackType, int64_t> mIds GUARDED_BY(mLock);
+    };
+
+    // A thread safe class to store all subscribe clients. This class is safe to pass to async
+    // callbacks.
+    class SubscriptionClients {
+      public:
+        SubscriptionClients(std::shared_ptr<PendingRequestPool> pool) : mPendingRequestPool(pool) {}
+
+        std::shared_ptr<SubscriptionClient> getClient(const CallbackType& callback);
+
+        size_t countClients();
+
+      private:
+        std::mutex mLock;
+        std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>> mClients
+                GUARDED_BY(mLock);
+        // PendingRequestPool is thread-safe.
+        std::shared_ptr<PendingRequestPool> mPendingRequestPool;
+    };
+
     // The default timeout of get or set value requests is 30s.
     // TODO(b/214605968): define TIMEOUT_IN_NANO in IVehicle and allow getValues/setValues/subscribe
     // to specify custom timeouts.
     static constexpr int64_t TIMEOUT_IN_NANO = 30'000'000'000;
-    const std::unique_ptr<IVehicleHardware> mVehicleHardware;
+    const std::shared_ptr<IVehicleHardware> mVehicleHardware;
 
     // mConfigsByPropId and mConfigFile are only modified during initialization, so no need to
     // lock guard them.
@@ -108,22 +137,15 @@
     // PendingRequestPool is thread-safe.
     std::shared_ptr<PendingRequestPool> mPendingRequestPool;
     // SubscriptionManager is thread-safe.
-    std::unique_ptr<SubscriptionManager> mSubscriptionManager;
+    std::shared_ptr<SubscriptionManager> mSubscriptionManager;
 
     std::mutex mLock;
     std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>> mGetValuesClients
             GUARDED_BY(mLock);
     std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>> mSetValuesClients
             GUARDED_BY(mLock);
-    std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>> mSubscriptionClients
-            GUARDED_BY(mLock);
-    // An increasing request ID we keep for subscribe clients.
-    std::unordered_map<CallbackType, int64_t> mSubscribeIdByClient GUARDED_BY(mLock);
-
-    template <class T>
-    std::shared_ptr<T> getOrCreateClient(
-            std::unordered_map<CallbackType, std::shared_ptr<T>>* clients,
-            const CallbackType& callback) REQUIRES(mLock);
+    // SubscriptionClients is thread-safe.
+    std::shared_ptr<SubscriptionClients> mSubscriptionClients;
 
     ::android::base::Result<void> checkProperty(
             const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& propValue);
@@ -136,10 +158,26 @@
             const std::vector<::aidl::android::hardware::automotive::vehicle::SetValueRequest>&
                     requests);
 
-    void getValueFromHardwareCallCallback(
-            const CallbackType& callback,
+    ::android::base::Result<void> checkSubscribeOptions(
+            const std::vector<::aidl::android::hardware::automotive::vehicle::SubscribeOptions>&
+                    options);
+
+    template <class T>
+    static std::shared_ptr<T> getOrCreateClient(
+            std::unordered_map<CallbackType, std::shared_ptr<T>>* clients,
+            const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool);
+
+    static void getValueFromHardwareCallCallback(
+            std::weak_ptr<IVehicleHardware> vehicleHardware,
+            std::shared_ptr<SubscribeIdByClient> subscribeIdByClient,
+            std::shared_ptr<SubscriptionClients> subscriptionClients, const CallbackType& callback,
             const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& value);
 
+    static void onPropertyChangeEvent(
+            std::weak_ptr<SubscriptionManager> subscriptionManager,
+            const std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>&
+                    updatedValues);
+
     // Test-only
     // Set the default timeout for pending requests.
     void setTimeout(int64_t timeoutInNano);
diff --git a/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp b/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp
index 7d02a05..5ccef55 100644
--- a/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp
@@ -258,7 +258,7 @@
             });
     auto requestPoolCopy = mRequestPool;
     const void* clientId = reinterpret_cast<const void*>(this);
-    mResultCallback = std::make_shared<const std::function<void(std::vector<GetValueResult>)>>(
+    mResultCallback = std::make_shared<const IVehicleHardware::GetValuesCallback>(
             [clientId, callback, requestPoolCopy](std::vector<GetValueResult> results) {
                 onGetValueResults(clientId, callback, requestPoolCopy, results);
             });
@@ -274,6 +274,32 @@
     return mTimeoutCallback;
 }
 
+void SubscriptionClient::sendUpdatedValues(std::shared_ptr<IVehicleCallback> callback,
+                                           std::vector<VehiclePropValue>&& updatedValues) {
+    if (updatedValues.empty()) {
+        return;
+    }
+
+    // TODO(b/205189110): Use memory pool here and fill in sharedMemoryId.
+    VehiclePropValues vehiclePropValues;
+    int32_t sharedMemoryFileCount = 0;
+    ScopedAStatus status = vectorToStableLargeParcelable(updatedValues, &vehiclePropValues);
+    if (!status.isOk()) {
+        int statusCode = status.getServiceSpecificError();
+        ALOGE("subscribe: failed to marshal result into large parcelable, error: "
+              "%s, code: %d",
+              status.getMessage(), statusCode);
+        return;
+    }
+
+    if (ScopedAStatus callbackStatus =
+                callback->onPropertyEvent(vehiclePropValues, sharedMemoryFileCount);
+        !callbackStatus.isOk()) {
+        ALOGE("subscribe: failed to call callback, error: %s, code: %d", status.getMessage(),
+              status.getServiceSpecificError());
+    }
+}
+
 void SubscriptionClient::onGetValueResults(const void* clientId,
                                            std::shared_ptr<IVehicleCallback> callback,
                                            std::shared_ptr<PendingRequestPool> requestPool,
@@ -308,27 +334,7 @@
         propValues.push_back(std::move(result.prop.value()));
     }
 
-    if (propValues.empty()) {
-        return;
-    }
-    // TODO(b/205189110): Use memory pool here and fill in sharedMemoryId.
-    VehiclePropValues vehiclePropValues;
-    int32_t sharedMemoryFileCount = 0;
-    ScopedAStatus status = vectorToStableLargeParcelable(propValues, &vehiclePropValues);
-    if (!status.isOk()) {
-        int statusCode = status.getServiceSpecificError();
-        ALOGE("failed to marshal result into large parcelable, error: "
-              "%s, code: %d",
-              status.getMessage(), statusCode);
-        return;
-    }
-
-    if (ScopedAStatus callbackStatus =
-                callback->onPropertyEvent(vehiclePropValues, sharedMemoryFileCount);
-        !callbackStatus.isOk()) {
-        ALOGE("failed to call callback, error: %s, code: %d", status.getMessage(),
-              status.getServiceSpecificError());
-    }
+    sendUpdatedValues(callback, std::move(propValues));
 }
 
 }  // namespace vehicle
diff --git a/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp b/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp
index 1e76eb7..7549635 100644
--- a/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp
@@ -23,6 +23,7 @@
 #include <VehicleUtils.h>
 
 #include <android-base/result.h>
+#include <android-base/stringprintf.h>
 #include <utils/Log.h>
 
 #include <inttypes.h>
@@ -50,11 +51,13 @@
 using ::aidl::android::hardware::automotive::vehicle::VehicleAreaConfig;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropConfigs;
+using ::aidl::android::hardware::automotive::vehicle::VehiclePropertyChangeMode;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropValue;
 using ::android::automotive::car_binder_lib::LargeParcelableBase;
 using ::android::base::Error;
 using ::android::base::expected;
 using ::android::base::Result;
+using ::android::base::StringPrintf;
 using ::ndk::ScopedAStatus;
 
 std::string toString(const std::unordered_set<int64_t>& values) {
@@ -70,6 +73,24 @@
 
 }  // namespace
 
+std::shared_ptr<SubscriptionClient> DefaultVehicleHal::SubscriptionClients::getClient(
+        const CallbackType& callback) {
+    std::scoped_lock<std::mutex> lockGuard(mLock);
+    return getOrCreateClient(&mClients, callback, mPendingRequestPool);
+}
+
+int64_t DefaultVehicleHal::SubscribeIdByClient::getId(const CallbackType& callback) {
+    std::scoped_lock<std::mutex> lockGuard(mLock);
+    // This would be initialized to 0 if callback does not exist in the map.
+    int64_t subscribeId = (mIds[callback])++;
+    return subscribeId;
+}
+
+size_t DefaultVehicleHal::SubscriptionClients::countClients() {
+    std::scoped_lock<std::mutex> lockGuard(mLock);
+    return mClients.size();
+}
+
 DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware)
     : mVehicleHardware(std::move(hardware)),
       mPendingRequestPool(std::make_shared<PendingRequestPool>(TIMEOUT_IN_NANO)) {
@@ -90,24 +111,50 @@
         mConfigFile = std::move(result.value());
     }
 
-    mSubscriptionManager = std::make_unique<SubscriptionManager>(
-            [this](const CallbackType& callback, const VehiclePropValue& value) {
-                getValueFromHardwareCallCallback(callback, value);
-            });
+    mSubscriptionClients = std::make_shared<SubscriptionClients>(mPendingRequestPool);
+
+    auto subscribeIdByClient = std::make_shared<SubscribeIdByClient>();
+    // Make a weak copy of IVehicleHardware because subscriptionManager uses IVehicleHardware and
+    // IVehicleHardware uses subscriptionManager. We want to avoid cyclic reference.
+    std::weak_ptr<IVehicleHardware> hardwareCopy = mVehicleHardware;
+    SubscriptionManager::GetValueFunc getValueFunc = std::bind(
+            &DefaultVehicleHal::getValueFromHardwareCallCallback, hardwareCopy, subscribeIdByClient,
+            mSubscriptionClients, std::placeholders::_1, std::placeholders::_2);
+    mSubscriptionManager = std::make_shared<SubscriptionManager>(std::move(getValueFunc));
+
+    std::weak_ptr<SubscriptionManager> subscriptionManagerCopy = mSubscriptionManager;
+    mVehicleHardware->registerOnPropertyChangeEvent(
+            std::make_unique<IVehicleHardware::PropertyChangeCallback>(
+                    [subscriptionManagerCopy](std::vector<VehiclePropValue> updatedValues) {
+                        onPropertyChangeEvent(subscriptionManagerCopy, updatedValues);
+                    }));
 }
 
-DefaultVehicleHal::~DefaultVehicleHal() {
-    // mSubscriptionManager has reference to this, so must be destroyed before other members.
-    mSubscriptionManager.reset();
+void DefaultVehicleHal::onPropertyChangeEvent(
+        std::weak_ptr<SubscriptionManager> subscriptionManager,
+        const std::vector<VehiclePropValue>& updatedValues) {
+    auto manager = subscriptionManager.lock();
+    if (manager == nullptr) {
+        ALOGW("the SubscriptionManager is destroyed, DefaultVehicleHal is ending");
+        return;
+    }
+    auto updatedValuesByClients = manager->getSubscribedClients(updatedValues);
+    for (const auto& [callback, valuePtrs] : updatedValuesByClients) {
+        std::vector<VehiclePropValue> values;
+        for (const VehiclePropValue* valuePtr : valuePtrs) {
+            values.push_back(*valuePtr);
+        }
+        SubscriptionClient::sendUpdatedValues(callback, std::move(values));
+    }
 }
 
 template <class T>
 std::shared_ptr<T> DefaultVehicleHal::getOrCreateClient(
-        std::unordered_map<CallbackType, std::shared_ptr<T>>* clients,
-        const CallbackType& callback) {
+        std::unordered_map<CallbackType, std::shared_ptr<T>>* clients, const CallbackType& callback,
+        std::shared_ptr<PendingRequestPool> pendingRequestPool) {
     if (clients->find(callback) == clients->end()) {
         // TODO(b/204943359): Remove client from clients when linkToDeath is implemented.
-        (*clients)[callback] = std::make_shared<T>(mPendingRequestPool, callback);
+        (*clients)[callback] = std::make_shared<T>(pendingRequestPool, callback);
     }
     return (*clients)[callback];
 }
@@ -115,26 +162,23 @@
 template std::shared_ptr<DefaultVehicleHal::GetValuesClient>
 DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::GetValuesClient>(
         std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>>* clients,
-        const CallbackType& callback);
+        const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool);
 template std::shared_ptr<DefaultVehicleHal::SetValuesClient>
 DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::SetValuesClient>(
         std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>>* clients,
-        const CallbackType& callback);
+        const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool);
 template std::shared_ptr<SubscriptionClient>
 DefaultVehicleHal::getOrCreateClient<SubscriptionClient>(
         std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>>* clients,
-        const CallbackType& callback);
+        const CallbackType& callback, std::shared_ptr<PendingRequestPool> pendingRequestPool);
 
-void DefaultVehicleHal::getValueFromHardwareCallCallback(const CallbackType& callback,
-                                                         const VehiclePropValue& value) {
-    int64_t subscribeId;
-    std::shared_ptr<SubscriptionClient> client;
-    {
-        std::scoped_lock<std::mutex> lockGuard(mLock);
-        // This is initialized to 0 if callback does not exist in the map.
-        subscribeId = (mSubscribeIdByClient[callback])++;
-        client = getOrCreateClient(&mSubscriptionClients, callback);
-    }
+void DefaultVehicleHal::getValueFromHardwareCallCallback(
+        std::weak_ptr<IVehicleHardware> vehicleHardware,
+        std::shared_ptr<SubscribeIdByClient> subscribeIdByClient,
+        std::shared_ptr<SubscriptionClients> subscriptionClients, const CallbackType& callback,
+        const VehiclePropValue& value) {
+    int64_t subscribeId = subscribeIdByClient->getId(callback);
+    auto client = subscriptionClients->getClient(callback);
     if (auto addRequestResult = client->addRequests({subscribeId}); !addRequestResult.ok()) {
         ALOGE("subscribe[%" PRId64 "]: too many pending requests, ignore the getValue request",
               subscribeId);
@@ -146,8 +190,12 @@
             .prop = value,
     }};
 
-    if (StatusCode status =
-                mVehicleHardware->getValues(client->getResultCallback(), hardwareRequests);
+    std::shared_ptr<IVehicleHardware> hardware = vehicleHardware.lock();
+    if (hardware == nullptr) {
+        ALOGW("the IVehicleHardware is destroyed, DefaultVehicleHal is ending");
+        return;
+    }
+    if (StatusCode status = hardware->getValues(client->getResultCallback(), hardwareRequests);
         status != StatusCode::OK) {
         // If the hardware returns error, finish all the pending requests for this request because
         // we never expect hardware to call callback for these requests.
@@ -222,7 +270,7 @@
     std::shared_ptr<GetValuesClient> client;
     {
         std::scoped_lock<std::mutex> lockGuard(mLock);
-        client = getOrCreateClient(&mGetValuesClients, callback);
+        client = getOrCreateClient(&mGetValuesClients, callback, mPendingRequestPool);
     }
     // Register the pending hardware requests and also check for duplicate request Ids.
     if (auto addRequestResult = client->addRequests(hardwareRequestIds); !addRequestResult.ok()) {
@@ -291,7 +339,7 @@
     std::shared_ptr<SetValuesClient> client;
     {
         std::scoped_lock<std::mutex> lockGuard(mLock);
-        client = getOrCreateClient(&mSetValuesClients, callback);
+        client = getOrCreateClient(&mSetValuesClients, callback, mPendingRequestPool);
     }
 
     // Register the pending hardware requests and also check for duplicate request Ids.
@@ -360,13 +408,100 @@
     return vectorToStableLargeParcelable(std::move(configs), output);
 }
 
-ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType&,
-                                           const std::vector<SubscribeOptions>&, int32_t) {
+Result<void> DefaultVehicleHal::checkSubscribeOptions(
+        const std::vector<SubscribeOptions>& options) {
+    for (const auto& option : options) {
+        int32_t propId = option.propId;
+        if (mConfigsByPropId.find(propId) == mConfigsByPropId.end()) {
+            return Error() << StringPrintf("no config for property, ID: %" PRId32, propId);
+        }
+        const VehiclePropConfig& config = mConfigsByPropId[propId];
+
+        if (config.changeMode != VehiclePropertyChangeMode::ON_CHANGE &&
+            config.changeMode != VehiclePropertyChangeMode::CONTINUOUS) {
+            return Error() << "only support subscribing to ON_CHANGE or CONTINUOUS property";
+        }
+
+        if (config.changeMode == VehiclePropertyChangeMode::CONTINUOUS) {
+            float sampleRate = option.sampleRate;
+            float minSampleRate = config.minSampleRate;
+            float maxSampleRate = config.maxSampleRate;
+            if (sampleRate < minSampleRate || sampleRate > maxSampleRate) {
+                return Error() << StringPrintf(
+                               "sample rate: %f out of range, must be within %f and %f", sampleRate,
+                               minSampleRate, maxSampleRate);
+            }
+            if (!SubscriptionManager::checkSampleRate(sampleRate)) {
+                return Error() << "invalid sample rate: " << sampleRate;
+            }
+        }
+
+        if (isGlobalProp(propId)) {
+            continue;
+        }
+
+        // Non-global property.
+        for (int32_t areaId : option.areaIds) {
+            if (auto areaConfig = getAreaConfig(propId, areaId, config); areaConfig == nullptr) {
+                return Error() << StringPrintf("invalid area ID: %" PRId32 " for prop ID: %" PRId32
+                                               ", not listed in config",
+                                               areaId, propId);
+            }
+        }
+    }
+    return {};
+}
+
+ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType& callback,
+                                           const std::vector<SubscribeOptions>& options,
+                                           [[maybe_unused]] int32_t maxSharedMemoryFileCount) {
+    // TODO(b/205189110): Use shared memory file count.
+    if (auto result = checkSubscribeOptions(options); !result.ok()) {
+        ALOGE("subscribe: invalid subscribe options: %s", result.error().message().c_str());
+        return toScopedAStatus(result, StatusCode::INVALID_ARG);
+    }
+
+    std::vector<SubscribeOptions> onChangeSubscriptions;
+    std::vector<SubscribeOptions> continuousSubscriptions;
+    for (const auto& option : options) {
+        int32_t propId = option.propId;
+        // We have already validate config exists.
+        const VehiclePropConfig& config = mConfigsByPropId[propId];
+
+        SubscribeOptions optionCopy = option;
+        // If areaIds is empty, subscribe to all areas.
+        if (optionCopy.areaIds.empty() && !isGlobalProp(propId)) {
+            for (const auto& areaConfig : config.areaConfigs) {
+                optionCopy.areaIds.push_back(areaConfig.areaId);
+            }
+        }
+
+        if (isGlobalProp(propId)) {
+            optionCopy.areaIds = {0};
+        }
+
+        if (config.changeMode == VehiclePropertyChangeMode::CONTINUOUS) {
+            continuousSubscriptions.push_back(std::move(optionCopy));
+        } else {
+            onChangeSubscriptions.push_back(std::move(optionCopy));
+        }
+    }
+    // Since we have already check the sample rates, the following functions must succeed.
+    if (!onChangeSubscriptions.empty()) {
+        mSubscriptionManager->subscribe(callback, onChangeSubscriptions,
+                                        /*isContinuousProperty=*/false);
+    }
+    if (!continuousSubscriptions.empty()) {
+        mSubscriptionManager->subscribe(callback, continuousSubscriptions,
+                                        /*isContinuousProperty=*/true);
+    }
     return ScopedAStatus::ok();
 }
 
-ScopedAStatus DefaultVehicleHal::unsubscribe(const CallbackType&, const std::vector<int32_t>&) {
-    return ScopedAStatus::ok();
+ScopedAStatus DefaultVehicleHal::unsubscribe(const CallbackType& callback,
+                                             const std::vector<int32_t>& propIds) {
+    return toScopedAStatus(mSubscriptionManager->unsubscribe(callback, propIds),
+                           StatusCode::INVALID_ARG);
 }
 
 ScopedAStatus DefaultVehicleHal::returnSharedMemory(const CallbackType&, int64_t) {
diff --git a/automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp b/automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp
index dc9a6ce..ff996fe 100644
--- a/automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp
@@ -55,6 +55,8 @@
     std::scoped_lock<std::mutex> lockGuard(mLock);
 
     mClientsByPropIdArea.clear();
+    // RecurrentSubscription has reference to mGetValue, so it must be destroyed before mGetValue is
+    // destroyed.
     mSubscriptionsByClient.clear();
 }
 
@@ -80,7 +82,6 @@
     std::scoped_lock<std::mutex> lockGuard(mLock);
 
     std::vector<int64_t> intervals;
-
     for (const auto& option : options) {
         float sampleRate = option.sampleRate;
 
diff --git a/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp b/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp
index d3186fd..d8c9fa2 100644
--- a/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp
@@ -56,10 +56,12 @@
 using ::aidl::android::hardware::automotive::vehicle::SetValueResult;
 using ::aidl::android::hardware::automotive::vehicle::SetValueResults;
 using ::aidl::android::hardware::automotive::vehicle::StatusCode;
+using ::aidl::android::hardware::automotive::vehicle::SubscribeOptions;
 using ::aidl::android::hardware::automotive::vehicle::VehicleAreaWindow;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropConfigs;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropErrors;
+using ::aidl::android::hardware::automotive::vehicle::VehiclePropertyChangeMode;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropValue;
 using ::aidl::android::hardware::automotive::vehicle::VehiclePropValues;
 
@@ -70,12 +72,21 @@
 using ::ndk::ScopedFileDescriptor;
 
 using ::testing::Eq;
+using ::testing::UnorderedElementsAre;
 using ::testing::UnorderedElementsAreArray;
 using ::testing::WhenSortedBy;
 
 constexpr int32_t INVALID_PROP_ID = 0;
 // VehiclePropertyGroup:SYSTEM,VehicleArea:WINDOW,VehiclePropertyType:INT32
 constexpr int32_t INT32_WINDOW_PROP = 10001 + 0x10000000 + 0x03000000 + 0x00400000;
+// VehiclePropertyGroup:SYSTEM,VehicleArea:GLOBAL,VehiclePropertyType:INT32
+constexpr int32_t GLOBAL_ON_CHANGE_PROP = 10002 + 0x10000000 + 0x01000000 + 0x00400000;
+// VehiclePropertyGroup:SYSTEM,VehicleArea:GLOBAL,VehiclePropertyType:INT32
+constexpr int32_t GLOBAL_CONTINUOUS_PROP = 10003 + 0x10000000 + 0x01000000 + 0x00400000;
+// VehiclePropertyGroup:SYSTEM,VehicleArea:WINDOW,VehiclePropertyType:INT32
+constexpr int32_t AREA_ON_CHANGE_PROP = 10004 + 0x10000000 + 0x03000000 + 0x00400000;
+// VehiclePropertyGroup:SYSTEM,VehicleArea:WINDOW,VehiclePropertyType:INT32
+constexpr int32_t AREA_CONTINUOUS_PROP = 10005 + 0x10000000 + 0x03000000 + 0x00400000;
 
 int32_t testInt32VecProp(size_t i) {
     // VehiclePropertyGroup:SYSTEM,VehicleArea:GLOBAL,VehiclePropertyType:INT32_VEC
@@ -137,6 +148,53 @@
             }};
 }
 
+struct SubscribeInvalidOptionsTestCase {
+    std::string name;
+    SubscribeOptions option;
+};
+
+std::vector<SubscribeInvalidOptionsTestCase> getSubscribeInvalidOptionsTestCases() {
+    return {{
+                    .name = "invalid_prop",
+                    .option =
+                            {
+                                    .propId = INVALID_PROP_ID,
+                            },
+            },
+            {
+                    .name = "invalid_area_ID",
+                    .option =
+                            {
+                                    .propId = AREA_ON_CHANGE_PROP,
+                                    .areaIds = {0},
+                            },
+            },
+            {
+                    .name = "invalid_sample_rate",
+                    .option =
+                            {
+                                    .propId = GLOBAL_CONTINUOUS_PROP,
+                                    .sampleRate = 0.0,
+                            },
+            },
+            {
+                    .name = "sample_rate_out_of_range",
+                    .option =
+                            {
+                                    .propId = GLOBAL_CONTINUOUS_PROP,
+                                    .sampleRate = 1000.0,
+                            },
+            },
+            {
+                    .name = "static_property",
+                    .option =
+                            {
+                                    // Default change mode is static.
+                                    .propId = testInt32VecProp(0),
+                            },
+            }};
+}
+
 }  // namespace
 
 class DefaultVehicleHalTest : public ::testing::Test {
@@ -157,6 +215,7 @@
                             },
             });
         }
+        // A property with area config.
         testConfigs.push_back(
                 VehiclePropConfig{.prop = INT32_WINDOW_PROP,
                                   .areaConfigs = {{
@@ -164,6 +223,58 @@
                                           .minInt32Value = 0,
                                           .maxInt32Value = 100,
                                   }}});
+        // A global on-change property.
+        testConfigs.push_back(VehiclePropConfig{
+                .prop = GLOBAL_ON_CHANGE_PROP,
+                .changeMode = VehiclePropertyChangeMode::ON_CHANGE,
+        });
+        // A global continuous property.
+        testConfigs.push_back(VehiclePropConfig{
+                .prop = GLOBAL_CONTINUOUS_PROP,
+                .changeMode = VehiclePropertyChangeMode::CONTINUOUS,
+                .minSampleRate = 0.0,
+                .maxSampleRate = 100.0,
+        });
+        // A per-area on-change property.
+        testConfigs.push_back(VehiclePropConfig{
+                .prop = AREA_ON_CHANGE_PROP,
+                .changeMode = VehiclePropertyChangeMode::ON_CHANGE,
+                .areaConfigs =
+                        {
+                                {
+
+                                        .areaId = toInt(VehicleAreaWindow::ROW_1_LEFT),
+                                        .minInt32Value = 0,
+                                        .maxInt32Value = 100,
+                                },
+                                {
+                                        .areaId = toInt(VehicleAreaWindow::ROW_1_RIGHT),
+                                        .minInt32Value = 0,
+                                        .maxInt32Value = 100,
+                                },
+                        },
+        });
+        // A per-area continuous property.
+        testConfigs.push_back(VehiclePropConfig{
+                .prop = AREA_CONTINUOUS_PROP,
+                .changeMode = VehiclePropertyChangeMode::CONTINUOUS,
+                .minSampleRate = 0.0,
+                .maxSampleRate = 1000.0,
+                .areaConfigs =
+                        {
+                                {
+
+                                        .areaId = toInt(VehicleAreaWindow::ROW_1_LEFT),
+                                        .minInt32Value = 0,
+                                        .maxInt32Value = 100,
+                                },
+                                {
+                                        .areaId = toInt(VehicleAreaWindow::ROW_1_RIGHT),
+                                        .minInt32Value = 0,
+                                        .maxInt32Value = 100,
+                                },
+                        },
+        });
         hardware->setPropertyConfigs(testConfigs);
         mHardwarePtr = hardware.get();
         mVhal = ndk::SharedRefBase::make<DefaultVehicleHal>(std::move(hardware));
@@ -263,7 +374,8 @@
 
     size_t countClients() {
         std::scoped_lock<std::mutex> lockGuard(mVhal->mLock);
-        return mVhal->mGetValuesClients.size() + mVhal->mSetValuesClients.size();
+        return mVhal->mGetValuesClients.size() + mVhal->mSetValuesClients.size() +
+               mVhal->mSubscriptionClients->countClients();
     }
 
   private:
@@ -783,6 +895,446 @@
     ASSERT_FALSE(status.isOk()) << "duplicate request properties in one request must fail";
 }
 
+TEST_F(DefaultVehicleHalTest, testSubscribeUnsubscribe) {
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = GLOBAL_ON_CHANGE_PROP,
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    status = getClient()->unsubscribe(getCallbackClient(),
+                                      std::vector<int32_t>({GLOBAL_ON_CHANGE_PROP}));
+
+    ASSERT_TRUE(status.isOk()) << "unsubscribe failed: " << status.getMessage();
+}
+
+TEST_F(DefaultVehicleHalTest, testSubscribeGlobalOnChangeNormal) {
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = GLOBAL_ON_CHANGE_PROP,
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    VehiclePropValue testValue{
+            .prop = GLOBAL_ON_CHANGE_PROP,
+            .value.int32Values = {0},
+    };
+    SetValueRequests setValueRequests = {
+            .payloads =
+                    {
+                            SetValueRequest{
+                                    .requestId = 0,
+                                    .value = testValue,
+                            },
+                    },
+    };
+    std::vector<SetValueResult> setValueResults = {{
+            .requestId = 0,
+            .status = StatusCode::OK,
+    }};
+
+    // Set the value to trigger a property change event.
+    getHardware()->addSetValueResponses(setValueResults);
+    status = getClient()->setValues(getCallbackClient(), setValueRequests);
+
+    ASSERT_TRUE(status.isOk()) << "setValues failed: " << status.getMessage();
+
+    auto maybeResults = getCallback()->nextOnPropertyEventResults();
+    ASSERT_TRUE(maybeResults.has_value()) << "no results in callback";
+    ASSERT_THAT(maybeResults.value().payloads, UnorderedElementsAre(testValue))
+            << "results mismatch, expect on change event for the updated value";
+    ASSERT_FALSE(getCallback()->nextOnPropertyEventResults().has_value())
+            << "more results than expected";
+    EXPECT_EQ(countClients(), static_cast<size_t>(1));
+}
+
+TEST_F(DefaultVehicleHalTest, testSubscribeGlobalOnchangeUnrelatedEventIgnored) {
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = GLOBAL_ON_CHANGE_PROP,
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    VehiclePropValue testValue{
+            .prop = GLOBAL_CONTINUOUS_PROP,
+            .value.int32Values = {0},
+    };
+
+    // Set the value to trigger a property change event. This event should be ignored because we
+    // have not subscribed to it.
+    getHardware()->addSetValueResponses({{
+            .requestId = 0,
+            .status = StatusCode::OK,
+    }});
+    status = getClient()->setValues(getCallbackClient(),
+                                    {
+                                            .payloads =
+                                                    {
+                                                            SetValueRequest{
+                                                                    .requestId = 0,
+                                                                    .value = testValue,
+                                                            },
+                                                    },
+                                    });
+
+    ASSERT_TRUE(status.isOk()) << "setValues failed: " << status.getMessage();
+
+    ASSERT_FALSE(getCallback()->nextOnPropertyEventResults().has_value())
+            << "must receive no property update event if the property is not subscribed";
+}
+
+TEST_F(DefaultVehicleHalTest, testSubscribeAreaOnChange) {
+    int testAreaId = toInt(VehicleAreaWindow::ROW_1_LEFT);
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = AREA_ON_CHANGE_PROP,
+                    .areaIds = {testAreaId},
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    VehiclePropValue testValue{
+            .prop = AREA_ON_CHANGE_PROP,
+            .areaId = testAreaId,
+            .value.int32Values = {0},
+    };
+
+    // Set the value to trigger a property change event.
+    getHardware()->addSetValueResponses({{
+            .requestId = 0,
+            .status = StatusCode::OK,
+    }});
+    status = getClient()->setValues(getCallbackClient(),
+                                    {
+                                            .payloads =
+                                                    {
+                                                            SetValueRequest{
+                                                                    .requestId = 0,
+                                                                    .value = testValue,
+                                                            },
+                                                    },
+                                    });
+
+    ASSERT_TRUE(status.isOk()) << "setValues failed: " << status.getMessage();
+
+    auto maybeResults = getCallback()->nextOnPropertyEventResults();
+    ASSERT_TRUE(maybeResults.has_value()) << "no results in callback";
+    ASSERT_THAT(maybeResults.value().payloads, UnorderedElementsAre(testValue))
+            << "results mismatch, expect on change event for the updated value";
+    ASSERT_FALSE(getCallback()->nextOnPropertyEventResults().has_value())
+            << "more results than expected";
+}
+
+TEST_F(DefaultVehicleHalTest, testSubscribeAreaOnChangeAllAreas) {
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = AREA_ON_CHANGE_PROP,
+                    // No areaIds means subscribing to all area IDs.
+                    .areaIds = {},
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    VehiclePropValue testValue1{
+            .prop = AREA_ON_CHANGE_PROP,
+            .areaId = toInt(VehicleAreaWindow::ROW_1_LEFT),
+            .value.int32Values = {0},
+    };
+    VehiclePropValue testValue2{
+            .prop = AREA_ON_CHANGE_PROP,
+            .areaId = toInt(VehicleAreaWindow::ROW_1_RIGHT),
+            .value.int32Values = {0},
+    };
+
+    // Set the values to trigger property change events for two areas.
+    getHardware()->addSetValueResponses({{
+                                                 .requestId = 0,
+                                                 .status = StatusCode::OK,
+                                         },
+                                         {
+                                                 .requestId = 1,
+                                                 .status = StatusCode::OK,
+                                         }});
+    status = getClient()->setValues(getCallbackClient(),
+                                    {
+                                            .payloads =
+                                                    {
+                                                            SetValueRequest{
+                                                                    .requestId = 0,
+                                                                    .value = testValue1,
+                                                            },
+                                                            SetValueRequest{
+                                                                    .requestId = 1,
+                                                                    .value = testValue2,
+                                                            },
+                                                    },
+                                    });
+
+    ASSERT_TRUE(status.isOk()) << "setValues failed: " << status.getMessage();
+
+    auto maybeResults = getCallback()->nextOnPropertyEventResults();
+    ASSERT_TRUE(maybeResults.has_value()) << "no results in callback";
+    ASSERT_THAT(maybeResults.value().payloads, UnorderedElementsAre(testValue1, testValue2))
+            << "results mismatch, expect two on-change events for all updated areas";
+    ASSERT_FALSE(getCallback()->nextOnPropertyEventResults().has_value())
+            << "more results than expected";
+}
+
+TEST_F(DefaultVehicleHalTest, testSubscribeGlobalContinuous) {
+    VehiclePropValue testValue{
+            .prop = GLOBAL_CONTINUOUS_PROP,
+            .value.int32Values = {0},
+    };
+    // Set responses for all the hardware getValues requests.
+    getHardware()->setGetValueResponder(
+            [](std::shared_ptr<const IVehicleHardware::GetValuesCallback> callback,
+               const std::vector<GetValueRequest>& requests) {
+                std::vector<GetValueResult> results;
+                for (auto& request : requests) {
+                    VehiclePropValue prop = request.prop;
+                    prop.value.int32Values = {0};
+                    results.push_back({
+                            .requestId = request.requestId,
+                            .status = StatusCode::OK,
+                            .prop = prop,
+                    });
+                }
+                (*callback)(results);
+                return StatusCode::OK;
+            });
+
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = GLOBAL_CONTINUOUS_PROP,
+                    .sampleRate = 20.0,
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    // Sleep for 1s, which should generate ~20 events.
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    // Should trigger about 20 times, check for at least 15 events to be safe.
+    for (size_t i = 0; i < 15; i++) {
+        auto maybeResults = getCallback()->nextOnPropertyEventResults();
+        ASSERT_TRUE(maybeResults.has_value()) << "no results in callback";
+        ASSERT_THAT(maybeResults.value().payloads, UnorderedElementsAre(testValue))
+                << "results mismatch, expect to get the updated value";
+    }
+    EXPECT_EQ(countClients(), static_cast<size_t>(1));
+}
+
+TEST_F(DefaultVehicleHalTest, testSubscribeAreaContinuous) {
+    // Set responses for all the hardware getValues requests.
+    getHardware()->setGetValueResponder(
+            [](std::shared_ptr<const IVehicleHardware::GetValuesCallback> callback,
+               const std::vector<GetValueRequest>& requests) {
+                std::vector<GetValueResult> results;
+                for (auto& request : requests) {
+                    VehiclePropValue prop = request.prop;
+                    prop.value.int32Values = {0};
+                    results.push_back({
+                            .requestId = request.requestId,
+                            .status = StatusCode::OK,
+                            .prop = prop,
+                    });
+                }
+                (*callback)(results);
+                return StatusCode::OK;
+            });
+
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = AREA_CONTINUOUS_PROP,
+                    .sampleRate = 20.0,
+                    .areaIds = {toInt(VehicleAreaWindow::ROW_1_LEFT)},
+            },
+            {
+                    .propId = AREA_CONTINUOUS_PROP,
+                    .sampleRate = 10.0,
+                    .areaIds = {toInt(VehicleAreaWindow::ROW_1_RIGHT)},
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    // Sleep for 1s, which should generate ~20 events.
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    std::vector<VehiclePropValue> events;
+    while (true) {
+        auto maybeResults = getCallback()->nextOnPropertyEventResults();
+        if (!maybeResults.has_value()) {
+            break;
+        }
+        for (const auto& value : maybeResults.value().payloads) {
+            events.push_back(value);
+        }
+    }
+
+    size_t leftCount = 0;
+    size_t rightCount = 0;
+
+    for (const auto& event : events) {
+        ASSERT_EQ(event.prop, AREA_CONTINUOUS_PROP);
+        if (event.areaId == toInt(VehicleAreaWindow::ROW_1_LEFT)) {
+            leftCount++;
+            continue;
+        }
+        rightCount++;
+    }
+
+    // Should trigger about 20 times, check for at least 15 events to be safe.
+    ASSERT_GE(leftCount, static_cast<size_t>(15));
+    // Should trigger about 10 times, check for at least 5 events to be safe.
+    ASSERT_GE(rightCount, static_cast<size_t>(5));
+}
+
+TEST_F(DefaultVehicleHalTest, testUnsubscribeOnChange) {
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = GLOBAL_ON_CHANGE_PROP,
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    status = getClient()->unsubscribe(getCallbackClient(),
+                                      std::vector<int32_t>({GLOBAL_ON_CHANGE_PROP}));
+
+    ASSERT_TRUE(status.isOk()) << "unsubscribe failed: " << status.getMessage();
+
+    VehiclePropValue testValue{
+            .prop = GLOBAL_ON_CHANGE_PROP,
+            .value.int32Values = {0},
+    };
+
+    // Set the value to trigger a property change event.
+    getHardware()->addSetValueResponses({{
+            .requestId = 0,
+            .status = StatusCode::OK,
+    }});
+    status = getClient()->setValues(getCallbackClient(),
+                                    {
+                                            .payloads =
+                                                    {
+                                                            SetValueRequest{
+                                                                    .requestId = 0,
+                                                                    .value = testValue,
+                                                            },
+                                                    },
+                                    });
+
+    ASSERT_TRUE(status.isOk()) << "setValues failed: " << status.getMessage();
+
+    ASSERT_FALSE(getCallback()->nextOnPropertyEventResults().has_value())
+            << "No property event should be generated after unsubscription";
+}
+
+TEST_F(DefaultVehicleHalTest, testUnsubscribeContinuous) {
+    VehiclePropValue testValue{
+            .prop = GLOBAL_CONTINUOUS_PROP,
+            .value.int32Values = {0},
+    };
+    // Set responses for all the hardware getValues requests.
+    getHardware()->setGetValueResponder(
+            [](std::shared_ptr<const IVehicleHardware::GetValuesCallback> callback,
+               const std::vector<GetValueRequest>& requests) {
+                std::vector<GetValueResult> results;
+                for (auto& request : requests) {
+                    VehiclePropValue prop = request.prop;
+                    prop.value.int32Values = {0};
+                    results.push_back({
+                            .requestId = request.requestId,
+                            .status = StatusCode::OK,
+                            .prop = prop,
+                    });
+                }
+                (*callback)(results);
+                return StatusCode::OK;
+            });
+
+    std::vector<SubscribeOptions> options = {
+            {
+                    .propId = GLOBAL_CONTINUOUS_PROP,
+                    .sampleRate = 20.0,
+            },
+    };
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_TRUE(status.isOk()) << "subscribe failed: " << status.getMessage();
+
+    status = getClient()->unsubscribe(getCallbackClient(),
+                                      std::vector<int32_t>({GLOBAL_CONTINUOUS_PROP}));
+
+    ASSERT_TRUE(status.isOk()) << "unsubscribe failed: " << status.getMessage();
+
+    // Clear existing events.
+    while (getCallback()->nextOnPropertyEventResults().has_value()) {
+        // Do nothing.
+    }
+
+    // Wait for a while, make sure no new events are generated.
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    ASSERT_FALSE(getCallback()->nextOnPropertyEventResults().has_value())
+            << "No property event should be generated after unsubscription";
+}
+
+class SubscribeInvalidOptionsTest
+    : public DefaultVehicleHalTest,
+      public testing::WithParamInterface<SubscribeInvalidOptionsTestCase> {};
+
+INSTANTIATE_TEST_SUITE_P(
+        SubscribeInvalidOptionsTests, SubscribeInvalidOptionsTest,
+        ::testing::ValuesIn(getSubscribeInvalidOptionsTestCases()),
+        [](const testing::TestParamInfo<SubscribeInvalidOptionsTest::ParamType>& info) {
+            return info.param.name;
+        });
+
+TEST_P(SubscribeInvalidOptionsTest, testSubscribeInvalidRequest) {
+    std::vector<SubscribeOptions> options = {GetParam().option};
+
+    auto status = getClient()->subscribe(getCallbackClient(), options, 0);
+
+    ASSERT_FALSE(status.isOk()) << "invalid subscribe options must fail";
+    ASSERT_EQ(status.getServiceSpecificError(), toInt(StatusCode::INVALID_ARG));
+}
+
+TEST_F(DefaultVehicleHalTest, testUnsubscribeFailure) {
+    auto status = getClient()->unsubscribe(getCallbackClient(),
+                                           std::vector<int32_t>({GLOBAL_ON_CHANGE_PROP}));
+
+    ASSERT_FALSE(status.isOk()) << "unsubscribe to a not-subscribed property must fail";
+    ASSERT_EQ(status.getServiceSpecificError(), toInt(StatusCode::INVALID_ARG));
+}
+
 }  // namespace vehicle
 }  // namespace automotive
 }  // namespace hardware
diff --git a/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.cpp b/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.cpp
index 7d992af..eec32dd 100644
--- a/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.cpp
@@ -64,6 +64,9 @@
 StatusCode MockVehicleHardware::getValues(std::shared_ptr<const GetValuesCallback> callback,
                                           const std::vector<GetValueRequest>& requests) const {
     std::scoped_lock<std::mutex> lockGuard(mLock);
+    if (mGetValueResponder != nullptr) {
+        return mGetValueResponder(callback, requests);
+    }
     return handleRequestsLocked(__func__, callback, requests, &mGetValueRequests,
                                 &mGetValueResponses);
 }
@@ -104,6 +107,13 @@
     mSetValueResponses.push_back(responses);
 }
 
+void MockVehicleHardware::setGetValueResponder(
+        std::function<StatusCode(std::shared_ptr<const GetValuesCallback>,
+                                 const std::vector<GetValueRequest>&)>&& responder) {
+    std::scoped_lock<std::mutex> lockGuard(mLock);
+    mGetValueResponder = responder;
+}
+
 std::vector<GetValueRequest> MockVehicleHardware::nextGetValueRequests() {
     std::scoped_lock<std::mutex> lockGuard(mLock);
     std::optional<std::vector<GetValueRequest>> request = pop(mGetValueRequests);
diff --git a/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.h b/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.h
index 283d1f9..0844de1 100644
--- a/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.h
+++ b/automotive/vehicle/aidl/impl/vhal/test/MockVehicleHardware.h
@@ -66,6 +66,12 @@
     void addSetValueResponses(
             const std::vector<::aidl::android::hardware::automotive::vehicle::SetValueResult>&
                     responses);
+    void setGetValueResponder(
+            std::function<::aidl::android::hardware::automotive::vehicle::StatusCode(
+                    std::shared_ptr<const GetValuesCallback>,
+                    const std::vector<
+                            ::aidl::android::hardware::automotive::vehicle::GetValueRequest>&)>&&
+                    responder);
     std::vector<::aidl::android::hardware::automotive::vehicle::GetValueRequest>
     nextGetValueRequests();
     std::vector<::aidl::android::hardware::automotive::vehicle::SetValueRequest>
@@ -92,6 +98,10 @@
             mStatusByFunctions GUARDED_BY(mLock);
     int64_t mSleepTime GUARDED_BY(mLock) = 0;
     std::unique_ptr<const PropertyChangeCallback> mPropertyChangeCallback GUARDED_BY(mLock);
+    std::function<::aidl::android::hardware::automotive::vehicle::StatusCode(
+            std::shared_ptr<const GetValuesCallback>,
+            const std::vector<::aidl::android::hardware::automotive::vehicle::GetValueRequest>&)>
+            mGetValueResponder GUARDED_BY(mLock);
 
     template <class ResultType>
     ::aidl::android::hardware::automotive::vehicle::StatusCode returnResponse(