Add subscription manager.
Add a class to manage VHAL subscription. It supports subscribing
and unsubscribing to properties. For continuous property, it uses
recurrent timer to recurrently calls the registered function. For
on-change property, it would return all subscribed clients for a
given property.
Test: atest DefaultVehicleHalTest
Bug: 200737967
Change-Id: I3e1a0401fd465dc31fe08ea77d5a6651fa7bbfaf
diff --git a/automotive/vehicle/aidl/impl/utils/common/include/VehicleHalTypes.h b/automotive/vehicle/aidl/impl/utils/common/include/VehicleHalTypes.h
index 013d177..a7fcdcf 100644
--- a/automotive/vehicle/aidl/impl/utils/common/include/VehicleHalTypes.h
+++ b/automotive/vehicle/aidl/impl/utils/common/include/VehicleHalTypes.h
@@ -37,6 +37,7 @@
#include <aidl/android/hardware/automotive/vehicle/SetValueResult.h>
#include <aidl/android/hardware/automotive/vehicle/SetValueResults.h>
#include <aidl/android/hardware/automotive/vehicle/StatusCode.h>
+#include <aidl/android/hardware/automotive/vehicle/SubscribeOptions.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleApPowerStateReport.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleApPowerStateReq.h>
#include <aidl/android/hardware/automotive/vehicle/VehicleArea.h>
diff --git a/automotive/vehicle/aidl/impl/vhal/Android.bp b/automotive/vehicle/aidl/impl/vhal/Android.bp
index eaa23dc..0132e6f 100644
--- a/automotive/vehicle/aidl/impl/vhal/Android.bp
+++ b/automotive/vehicle/aidl/impl/vhal/Android.bp
@@ -58,6 +58,7 @@
"src/DefaultVehicleHal.cpp",
"src/PendingRequestPool.cpp",
"src/RecurrentTimer.cpp",
+ "src/SubscriptionManager.cpp",
],
static_libs: [
"VehicleHalUtils",
diff --git a/automotive/vehicle/aidl/impl/vhal/include/SubscriptionManager.h b/automotive/vehicle/aidl/impl/vhal/include/SubscriptionManager.h
new file mode 100644
index 0000000..28809c6
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/vhal/include/SubscriptionManager.h
@@ -0,0 +1,151 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_
+#define android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_
+
+#include "RecurrentTimer.h"
+
+#include <VehicleHalTypes.h>
+
+#include <aidl/android/hardware/automotive/vehicle/IVehicleCallback.h>
+#include <android-base/result.h>
+#include <android-base/thread_annotations.h>
+
+#include <mutex>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+namespace android {
+namespace hardware {
+namespace automotive {
+namespace vehicle {
+
+// A thread-safe subscription manager that manages all VHAL subscriptions.
+class SubscriptionManager final {
+ public:
+ using CallbackType =
+ std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>;
+ using GetValueFunc = std::function<void(
+ const CallbackType& callback,
+ const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& value)>;
+
+ explicit SubscriptionManager(GetValueFunc&& action);
+ ~SubscriptionManager();
+
+ // Subscribes to properties according to {@code SubscribeOptions}. Note that all option must
+ // contain non-empty areaIds field, which contains all area IDs to subscribe. As a result,
+ // the options here is different from the options passed from VHAL client.
+ // Returns error if any of the subscribe options is not valid. If error is returned, no
+ // properties would be subscribed.
+ // Returns ok if all the options are parsed correctly and all the properties are subscribed.
+ ::android::base::Result<void> subscribe(
+ const CallbackType& callback,
+ const std::vector<::aidl::android::hardware::automotive::vehicle::SubscribeOptions>&
+ options,
+ bool isContinuousProperty);
+
+ // Unsubscribes from the properties for the callback.
+ // Returns error if the callback was not subscribed before or one of the given property was not
+ // subscribed. If error is returned, no property would be unsubscribed.
+ // Returns ok if all the requested properties for the callback are unsubscribed.
+ ::android::base::Result<void> unsubscribe(const CallbackType& callback,
+ const std::vector<int32_t>& propIds);
+
+ // Unsubscribes to all the properties for the callback.
+ // Returns error if the callback was not subscribed before. If error is returned, no property
+ // would be unsubscribed.
+ // Returns ok if all the properties for the callback are unsubscribed.
+ ::android::base::Result<void> unsubscribe(const CallbackType& callback);
+
+ // For a list of updated properties, returns a map that maps clients subscribing to
+ // the updated properties to a list of updated values. This would only return on-change property
+ // clients that should be informed for the given updated values.
+ std::unordered_map<
+ std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>,
+ std::vector<const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue*>>
+ getSubscribedClients(
+ const std::vector<::aidl::android::hardware::automotive::vehicle::VehiclePropValue>&
+ updatedValues);
+
+ // Checks whether the sample rate is valid.
+ static bool checkSampleRate(float sampleRate);
+
+ private:
+ struct PropIdAreaId {
+ int32_t propId;
+ int32_t areaId;
+
+ bool operator==(const PropIdAreaId& other) const;
+ };
+
+ struct PropIdAreaIdHash {
+ size_t operator()(const PropIdAreaId& propIdAreaId) const;
+ };
+
+ // A class to represent a registered subscription.
+ class Subscription {
+ public:
+ Subscription() = default;
+
+ Subscription(const Subscription&) = delete;
+
+ virtual ~Subscription() = default;
+
+ virtual bool isOnChange();
+ };
+
+ // A subscription for OnContinuous property. The registered action would be called recurrently
+ // until this class is destructed.
+ class RecurrentSubscription final : public Subscription {
+ public:
+ explicit RecurrentSubscription(std::shared_ptr<RecurrentTimer> timer,
+ std::function<void()>&& action, int64_t interval);
+ ~RecurrentSubscription();
+
+ bool isOnChange() override;
+
+ private:
+ std::shared_ptr<std::function<void()>> mAction;
+ std::shared_ptr<RecurrentTimer> mTimer;
+ };
+
+ // A subscription for OnChange property.
+ class OnChangeSubscription final : public Subscription {
+ public:
+ bool isOnChange() override;
+ };
+
+ mutable std::mutex mLock;
+ std::unordered_map<PropIdAreaId, std::unordered_set<CallbackType>, PropIdAreaIdHash>
+ mClientsByPropIdArea GUARDED_BY(mLock);
+ std::unordered_map<CallbackType, std::unordered_map<PropIdAreaId, std::unique_ptr<Subscription>,
+ PropIdAreaIdHash>>
+ mSubscriptionsByClient GUARDED_BY(mLock);
+ // RecurrentTimer is thread-safe.
+ std::shared_ptr<RecurrentTimer> mTimer;
+ const GetValueFunc mGetValue;
+
+ static ::android::base::Result<int64_t> getInterval(float sampleRate);
+};
+
+} // namespace vehicle
+} // namespace automotive
+} // namespace hardware
+} // namespace android
+
+#endif // android_hardware_automotive_vehicle_aidl_impl_vhal_include_SubscriptionManager_H_
diff --git a/automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp b/automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp
new file mode 100644
index 0000000..dc9a6ce
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/vhal/src/SubscriptionManager.cpp
@@ -0,0 +1,240 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SubscriptionManager.h"
+
+#include <math/HashCombine.h>
+#include <utils/Log.h>
+
+namespace android {
+namespace hardware {
+namespace automotive {
+namespace vehicle {
+
+namespace {
+
+constexpr float ONE_SECOND_IN_NANO = 1000000000.;
+
+} // namespace
+
+using ::aidl::android::hardware::automotive::vehicle::IVehicleCallback;
+using ::aidl::android::hardware::automotive::vehicle::SubscribeOptions;
+using ::aidl::android::hardware::automotive::vehicle::VehiclePropValue;
+using ::android::base::Error;
+using ::android::base::Result;
+using ::ndk::ScopedAStatus;
+
+bool SubscriptionManager::PropIdAreaId::operator==(const PropIdAreaId& other) const {
+ return areaId == other.areaId && propId == other.propId;
+}
+
+size_t SubscriptionManager::PropIdAreaIdHash::operator()(PropIdAreaId const& propIdAreaId) const {
+ size_t res = 0;
+ hashCombine(res, propIdAreaId.propId);
+ hashCombine(res, propIdAreaId.areaId);
+ return res;
+}
+
+SubscriptionManager::SubscriptionManager(GetValueFunc&& action)
+ : mTimer(std::make_shared<RecurrentTimer>()), mGetValue(std::move(action)) {}
+
+SubscriptionManager::~SubscriptionManager() {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+
+ mClientsByPropIdArea.clear();
+ mSubscriptionsByClient.clear();
+}
+
+bool SubscriptionManager::checkSampleRate(float sampleRate) {
+ return getInterval(sampleRate).ok();
+}
+
+Result<int64_t> SubscriptionManager::getInterval(float sampleRate) {
+ int64_t interval = 0;
+ if (sampleRate <= 0) {
+ return Error() << "invalid sample rate, must be a positive number";
+ }
+ if (sampleRate <= (ONE_SECOND_IN_NANO / static_cast<float>(INT64_MAX))) {
+ return Error() << "invalid sample rate: " << sampleRate << ", too small";
+ }
+ interval = static_cast<int64_t>(ONE_SECOND_IN_NANO / sampleRate);
+ return interval;
+}
+
+Result<void> SubscriptionManager::subscribe(const std::shared_ptr<IVehicleCallback>& callback,
+ const std::vector<SubscribeOptions>& options,
+ bool isContinuousProperty) {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+
+ std::vector<int64_t> intervals;
+
+ for (const auto& option : options) {
+ float sampleRate = option.sampleRate;
+
+ if (isContinuousProperty) {
+ auto intervalResult = getInterval(sampleRate);
+ if (!intervalResult.ok()) {
+ return intervalResult.error();
+ }
+ intervals.push_back(intervalResult.value());
+ }
+
+ if (option.areaIds.empty()) {
+ ALOGE("area IDs to subscribe must not be empty");
+ return Error() << "area IDs to subscribe must not be empty";
+ }
+ }
+
+ size_t intervalIndex = 0;
+ for (const auto& option : options) {
+ int32_t propId = option.propId;
+ const std::vector<int32_t>& areaIds = option.areaIds;
+ int64_t interval = 0;
+ if (isContinuousProperty) {
+ interval = intervals[intervalIndex];
+ intervalIndex++;
+ }
+ for (int32_t areaId : areaIds) {
+ PropIdAreaId propIdAreaId = {
+ .propId = propId,
+ .areaId = areaId,
+ };
+ if (isContinuousProperty) {
+ VehiclePropValue propValueRequest{
+ .prop = propId,
+ .areaId = areaId,
+ };
+ mSubscriptionsByClient[callback][propIdAreaId] =
+ std::make_unique<RecurrentSubscription>(
+ mTimer,
+ [this, callback, propValueRequest] {
+ mGetValue(callback, propValueRequest);
+ },
+ interval);
+ } else {
+ mSubscriptionsByClient[callback][propIdAreaId] =
+ std::make_unique<OnChangeSubscription>();
+ }
+ mClientsByPropIdArea[propIdAreaId].insert(callback);
+ }
+ }
+ return {};
+}
+
+Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback,
+ const std::vector<int32_t>& propIds) {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+
+ if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) {
+ return Error() << "No property was subscribed for the callback";
+ }
+ std::unordered_set<int32_t> subscribedPropIds;
+ for (auto const& [propIdAreaId, _] : mSubscriptionsByClient[callback]) {
+ subscribedPropIds.insert(propIdAreaId.propId);
+ }
+
+ for (int32_t propId : propIds) {
+ if (subscribedPropIds.find(propId) == subscribedPropIds.end()) {
+ return Error() << "property ID: " << propId << " is not subscribed";
+ }
+ }
+
+ auto& subscriptions = mSubscriptionsByClient[callback];
+ auto it = subscriptions.begin();
+ while (it != subscriptions.end()) {
+ int32_t propId = it->first.propId;
+ if (std::find(propIds.begin(), propIds.end(), propId) != propIds.end()) {
+ auto& clients = mClientsByPropIdArea[it->first];
+ clients.erase(callback);
+ if (clients.empty()) {
+ mClientsByPropIdArea.erase(it->first);
+ }
+ it = subscriptions.erase(it);
+ } else {
+ it++;
+ }
+ }
+ if (subscriptions.empty()) {
+ mSubscriptionsByClient.erase(callback);
+ }
+ return {};
+}
+
+Result<void> SubscriptionManager::unsubscribe(const std::shared_ptr<IVehicleCallback>& callback) {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+
+ if (mSubscriptionsByClient.find(callback) == mSubscriptionsByClient.end()) {
+ return Error() << "No property was subscribed for the callback";
+ }
+
+ auto& subscriptions = mSubscriptionsByClient[callback];
+ for (auto const& [propIdAreaId, _] : subscriptions) {
+ auto& clients = mClientsByPropIdArea[propIdAreaId];
+ clients.erase(callback);
+ if (clients.empty()) {
+ mClientsByPropIdArea.erase(propIdAreaId);
+ }
+ }
+ mSubscriptionsByClient.erase(callback);
+ return {};
+}
+
+std::unordered_map<std::shared_ptr<IVehicleCallback>, std::vector<const VehiclePropValue*>>
+SubscriptionManager::getSubscribedClients(const std::vector<VehiclePropValue>& updatedValues) {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+ std::unordered_map<std::shared_ptr<IVehicleCallback>, std::vector<const VehiclePropValue*>>
+ clients;
+
+ for (const auto& value : updatedValues) {
+ PropIdAreaId propIdAreaId{
+ .propId = value.prop,
+ .areaId = value.areaId,
+ };
+ if (mClientsByPropIdArea.find(propIdAreaId) == mClientsByPropIdArea.end()) {
+ continue;
+ }
+ for (const auto& client : mClientsByPropIdArea[propIdAreaId]) {
+ if (!mSubscriptionsByClient[client][propIdAreaId]->isOnChange()) {
+ continue;
+ }
+ clients[client].push_back(&value);
+ }
+ }
+ return clients;
+}
+
+SubscriptionManager::RecurrentSubscription::RecurrentSubscription(
+ std::shared_ptr<RecurrentTimer> timer, std::function<void()>&& action, int64_t interval)
+ : mAction(std::make_shared<std::function<void()>>(action)), mTimer(timer) {
+ mTimer->registerTimerCallback(interval, mAction);
+}
+
+SubscriptionManager::RecurrentSubscription::~RecurrentSubscription() {
+ mTimer->unregisterTimerCallback(mAction);
+}
+
+bool SubscriptionManager::RecurrentSubscription::isOnChange() {
+ return false;
+}
+
+bool SubscriptionManager::OnChangeSubscription::isOnChange() {
+ return true;
+}
+
+} // namespace vehicle
+} // namespace automotive
+} // namespace hardware
+} // namespace android
diff --git a/automotive/vehicle/aidl/impl/vhal/test/SubscriptionManagerTest.cpp b/automotive/vehicle/aidl/impl/vhal/test/SubscriptionManagerTest.cpp
new file mode 100644
index 0000000..fa08d6c
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/vhal/test/SubscriptionManagerTest.cpp
@@ -0,0 +1,483 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SubscriptionManager.h"
+
+#include <VehicleHalTypes.h>
+
+#include <aidl/android/hardware/automotive/vehicle/BnVehicleCallback.h>
+#include <android-base/thread_annotations.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <float.h>
+#include <chrono>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+namespace android {
+namespace hardware {
+namespace automotive {
+namespace vehicle {
+
+using ::aidl::android::hardware::automotive::vehicle::BnVehicleCallback;
+using ::aidl::android::hardware::automotive::vehicle::GetValueResults;
+using ::aidl::android::hardware::automotive::vehicle::IVehicleCallback;
+using ::aidl::android::hardware::automotive::vehicle::SetValueResults;
+using ::aidl::android::hardware::automotive::vehicle::SubscribeOptions;
+using ::aidl::android::hardware::automotive::vehicle::VehiclePropErrors;
+using ::aidl::android::hardware::automotive::vehicle::VehiclePropValue;
+using ::aidl::android::hardware::automotive::vehicle::VehiclePropValues;
+using ::ndk::ScopedAStatus;
+using ::testing::ElementsAre;
+using ::testing::WhenSorted;
+
+class PropertyCallback final : public BnVehicleCallback {
+ public:
+ ScopedAStatus onGetValues(const GetValueResults&) override { return ScopedAStatus::ok(); }
+
+ ScopedAStatus onSetValues(const SetValueResults&) override { return ScopedAStatus::ok(); }
+
+ ScopedAStatus onPropertyEvent(const VehiclePropValues& values, int32_t) override {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+ for (const auto& value : values.payloads) {
+ mEvents.push_back(value);
+ }
+ return ScopedAStatus::ok();
+ }
+
+ ScopedAStatus onPropertySetError(const VehiclePropErrors&) override {
+ return ScopedAStatus::ok();
+ }
+
+ // Test functions.
+ std::list<VehiclePropValue> getEvents() {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+ return mEvents;
+ }
+
+ void clearEvents() {
+ std::scoped_lock<std::mutex> lockGuard(mLock);
+ mEvents.clear();
+ }
+
+ private:
+ std::mutex mLock;
+ std::list<VehiclePropValue> mEvents GUARDED_BY(mLock);
+};
+
+class SubscriptionManagerTest : public ::testing::Test {
+ public:
+ void SetUp() override {
+ mManager = std::make_unique<SubscriptionManager>(
+ [](const std::shared_ptr<IVehicleCallback>& callback,
+ const VehiclePropValue& value) {
+ callback->onPropertyEvent(
+ VehiclePropValues{
+ .payloads = {value},
+ },
+ 0);
+ });
+ mCallback = ::ndk::SharedRefBase::make<PropertyCallback>();
+ mCallbackClient = IVehicleCallback::fromBinder(mCallback->asBinder());
+ }
+
+ SubscriptionManager* getManager() { return mManager.get(); }
+
+ std::shared_ptr<IVehicleCallback> getCallbackClient() { return mCallbackClient; }
+
+ PropertyCallback* getCallback() { return mCallback.get(); }
+
+ std::list<VehiclePropValue> getEvents() { return getCallback()->getEvents(); }
+
+ void clearEvents() { return getCallback()->clearEvents(); }
+
+ private:
+ std::unique_ptr<SubscriptionManager> mManager;
+ std::shared_ptr<PropertyCallback> mCallback;
+ std::shared_ptr<IVehicleCallback> mCallbackClient;
+};
+
+TEST_F(SubscriptionManagerTest, testSubscribeGlobalContinuous) {
+ std::vector<SubscribeOptions> options = {{
+ .propId = 0,
+ .areaIds = {0},
+ .sampleRate = 10.0,
+ }};
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // Theoretically trigger 10 times, but check for at least 9 times to be stable.
+ ASSERT_GE(getEvents().size(), static_cast<size_t>(9));
+ EXPECT_EQ(getEvents().back().prop, 0);
+ EXPECT_EQ(getEvents().back().areaId, 0);
+}
+
+TEST_F(SubscriptionManagerTest, testSubscribeMultiplePropsGlobalContinuous) {
+ std::vector<SubscribeOptions> options = {{
+ .propId = 0,
+ .areaIds = {0},
+ .sampleRate = 10.0,
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ .sampleRate = 20.0,
+ }};
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ size_t event0Count = 0;
+ size_t event1Count = 0;
+
+ for (const auto& event : getEvents()) {
+ if (event.prop == 0) {
+ event0Count++;
+ } else {
+ event1Count++;
+ }
+ }
+
+ // Theoretically trigger 10 times, but check for at least 9 times to be stable.
+ EXPECT_GE(event0Count, static_cast<size_t>(9));
+ // Theoretically trigger 20 times, but check for at least 15 times to be stable.
+ EXPECT_GE(event1Count, static_cast<size_t>(15));
+}
+
+TEST_F(SubscriptionManagerTest, testOverrideSubscriptionContinuous) {
+ std::vector<SubscribeOptions> options = {{
+ .propId = 0,
+ .areaIds = {0},
+ .sampleRate = 20.0,
+ }};
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ // Override sample rate to be 10.0.
+ options[0].sampleRate = 10.0;
+ result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // Theoretically trigger 10 times, but check for at least 9 times to be stable.
+ EXPECT_GE(getEvents().size(), static_cast<size_t>(9));
+ EXPECT_LE(getEvents().size(), static_cast<size_t>(11));
+}
+
+TEST_F(SubscriptionManagerTest, testSubscribeMultipleAreasContinuous) {
+ std::vector<SubscribeOptions> options = {
+ {
+ .propId = 0,
+ .areaIds = {0, 1},
+ .sampleRate = 10.0,
+ },
+ };
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ size_t area0Count = 0;
+ size_t area1Count = 0;
+
+ for (const auto& event : getEvents()) {
+ if (event.areaId == 0) {
+ area0Count++;
+ } else {
+ area1Count++;
+ }
+ }
+
+ // Theoretically trigger 10 times, but check for at least 9 times to be stable.
+ EXPECT_GE(area0Count, static_cast<size_t>(9));
+ // Theoretically trigger 10 times, but check for at least 9 times to be stable.
+ EXPECT_GE(area1Count, static_cast<size_t>(9));
+}
+
+TEST_F(SubscriptionManagerTest, testUnsubscribeGlobalContinuous) {
+ std::vector<SubscribeOptions> options = {{
+ .propId = 0,
+ .areaIds = {0},
+ .sampleRate = 10.0,
+ }};
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ result = getManager()->unsubscribe(getCallbackClient());
+ ASSERT_TRUE(result.ok()) << "failed to unsubscribe: " << result.error().message();
+
+ clearEvents();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+ // Theoretically trigger 10 times, but check for at least 9 times to be stable.
+ ASSERT_TRUE(getEvents().empty());
+}
+
+TEST_F(SubscriptionManagerTest, testUnsubscribeMultipleAreas) {
+ std::vector<SubscribeOptions> options = {
+ {
+ .propId = 0,
+ .areaIds = {0, 1, 2, 3, 4},
+ .sampleRate = 10.0,
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ .sampleRate = 10.0,
+ },
+ };
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ result = getManager()->unsubscribe(getCallbackClient(), std::vector<int32_t>({0}));
+ ASSERT_TRUE(result.ok()) << "failed to unsubscribe: " << result.error().message();
+
+ clearEvents();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // Theoretically trigger 10 times, but check for at least 9 times to be stable.
+ EXPECT_GE(getEvents().size(), static_cast<size_t>(9));
+
+ for (const auto& event : getEvents()) {
+ EXPECT_EQ(event.prop, 1);
+ }
+}
+
+TEST_F(SubscriptionManagerTest, testUnsubscribeByCallback) {
+ std::vector<SubscribeOptions> options = {
+ {
+ .propId = 0,
+ .areaIds = {0, 1, 2, 3, 4},
+ .sampleRate = 10.0,
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ .sampleRate = 10.0,
+ },
+ };
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ result = getManager()->unsubscribe(getCallbackClient());
+ ASSERT_TRUE(result.ok()) << "failed to unsubscribe: " << result.error().message();
+
+ clearEvents();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ EXPECT_TRUE(getEvents().empty());
+}
+
+TEST_F(SubscriptionManagerTest, testUnsubscribeFailure) {
+ std::vector<SubscribeOptions> options = {
+ {
+ .propId = 0,
+ .areaIds = {0, 1, 2, 3, 4},
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ },
+ };
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, false);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ // Property ID: 2 was not subscribed.
+ result = getManager()->unsubscribe(getCallbackClient(), std::vector<int32_t>({0, 1, 2}));
+ ASSERT_FALSE(result.ok()) << "unsubscribe an unsubscribed property must fail";
+
+ // Since property 0 and property 1 was not unsubscribed successfully, we should be able to
+ // unsubscribe them again.
+ result = getManager()->unsubscribe(getCallbackClient(), std::vector<int32_t>({0, 1}));
+ ASSERT_TRUE(result.ok()) << "a failed unsubscription must not unsubscribe any properties"
+ << result.error().message();
+}
+
+TEST_F(SubscriptionManagerTest, testSubscribeOnchange) {
+ std::vector<SubscribeOptions> options1 = {
+ {
+ .propId = 0,
+ .areaIds = {0, 1},
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ },
+ };
+ std::vector<SubscribeOptions> options2 = {
+ {
+ .propId = 0,
+ .areaIds = {0},
+ },
+ };
+
+ std::shared_ptr<IVehicleCallback> client1 = IVehicleCallback::fromBinder(
+ ::ndk::SharedRefBase::make<PropertyCallback>()->asBinder());
+ std::shared_ptr<IVehicleCallback> client2 = IVehicleCallback::fromBinder(
+ ::ndk::SharedRefBase::make<PropertyCallback>()->asBinder());
+ auto result = getManager()->subscribe(client1, options1, false);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+ result = getManager()->subscribe(client2, options2, false);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ std::vector<VehiclePropValue> updatedValues = {
+ {
+ .prop = 0,
+ .areaId = 0,
+ },
+ {
+ .prop = 0,
+ .areaId = 1,
+ },
+ {
+ .prop = 1,
+ .areaId = 0,
+ },
+ {
+ .prop = 1,
+ .areaId = 1,
+ },
+ };
+ auto clients = getManager()->getSubscribedClients(updatedValues);
+
+ ASSERT_THAT(clients[client1],
+ WhenSorted(ElementsAre(&updatedValues[0], &updatedValues[1], &updatedValues[2])));
+ ASSERT_THAT(clients[client2], ElementsAre(&updatedValues[0]));
+}
+
+TEST_F(SubscriptionManagerTest, testSubscribeInvalidOption) {
+ std::vector<SubscribeOptions> options = {
+ {
+ .propId = 0,
+ .areaIds = {0, 1, 2, 3, 4},
+ // invalid sample rate.
+ .sampleRate = 0.0,
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ .sampleRate = 10.0,
+ },
+ };
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_FALSE(result.ok()) << "subscribe with invalid sample rate must fail";
+ ASSERT_TRUE(getManager()
+ ->getSubscribedClients({{
+ .prop = 0,
+ .areaId = 0,
+ },
+ {
+ .prop = 1,
+ .areaId = 0,
+ }})
+ .empty())
+ << "no property should be subscribed if error is returned";
+}
+
+TEST_F(SubscriptionManagerTest, testSubscribeNoAreaIds) {
+ std::vector<SubscribeOptions> options = {
+ {
+ .propId = 0,
+ .areaIds = {},
+ .sampleRate = 1.0,
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ .sampleRate = 10.0,
+ },
+ };
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, true);
+ ASSERT_FALSE(result.ok()) << "subscribe with invalid sample rate must fail";
+ ASSERT_TRUE(getManager()
+ ->getSubscribedClients({{
+ .prop = 1,
+ .areaId = 0,
+ }})
+ .empty())
+ << "no property should be subscribed if error is returned";
+}
+
+TEST_F(SubscriptionManagerTest, testUnsubscribeOnchange) {
+ std::vector<SubscribeOptions> options = {
+ {
+ .propId = 0,
+ .areaIds = {0, 1},
+ },
+ {
+ .propId = 1,
+ .areaIds = {0},
+ },
+ };
+
+ auto result = getManager()->subscribe(getCallbackClient(), options, false);
+ ASSERT_TRUE(result.ok()) << "failed to subscribe: " << result.error().message();
+
+ result = getManager()->unsubscribe(getCallbackClient(), std::vector<int32_t>({0}));
+ ASSERT_TRUE(result.ok()) << "failed to unsubscribe: " << result.error().message();
+
+ std::vector<VehiclePropValue> updatedValues = {
+ {
+ .prop = 0,
+ .areaId = 0,
+ },
+ {
+ .prop = 1,
+ .areaId = 0,
+ },
+ };
+ auto clients = getManager()->getSubscribedClients(updatedValues);
+
+ ASSERT_THAT(clients[getCallbackClient()], ElementsAre(&updatedValues[1]));
+}
+
+TEST_F(SubscriptionManagerTest, testCheckSampleRateValid) {
+ ASSERT_TRUE(SubscriptionManager::checkSampleRate(1.0));
+}
+
+TEST_F(SubscriptionManagerTest, testCheckSampleRateInvalidTooSmall) {
+ ASSERT_FALSE(SubscriptionManager::checkSampleRate(FLT_MIN));
+}
+
+TEST_F(SubscriptionManagerTest, testCheckSampleRateInvalidZero) {
+ ASSERT_FALSE(SubscriptionManager::checkSampleRate(0));
+}
+
+} // namespace vehicle
+} // namespace automotive
+} // namespace hardware
+} // namespace android