Migrate ConcurrentQueue.
Test: atest VehicleHalVehicleUtilsTest
Bug: 201830716
Change-Id: I78671befd8351f17bf3da43372ec302e106690de
diff --git a/automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h b/automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h
new file mode 100644
index 0000000..68bd559
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2016 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
+#define android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
+
+#include <android-base/thread_annotations.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <queue>
+#include <thread>
+
+namespace android {
+namespace hardware {
+namespace automotive {
+namespace vehicle {
+
+template <typename T>
+class ConcurrentQueue {
+ public:
+ void waitForItems() {
+ std::unique_lock<std::mutex> lockGuard(mLock);
+ ::android::base::ScopedLockAssertion lockAssertion(mLock);
+ while (mQueue.empty() && mIsActive) {
+ mCond.wait(lockGuard);
+ }
+ }
+
+ std::vector<T> flush() {
+ std::vector<T> items;
+
+ std::lock_guard<std::mutex> lockGuard(mLock);
+ if (mQueue.empty()) {
+ return items;
+ }
+ while (!mQueue.empty()) {
+ // Even if the queue is deactivated, we should still flush all the remaining values
+ // in the queue.
+ items.push_back(std::move(mQueue.front()));
+ mQueue.pop();
+ }
+ return items;
+ }
+
+ void push(T&& item) {
+ {
+ std::lock_guard<std::mutex> lockGuard(mLock);
+ if (!mIsActive) {
+ return;
+ }
+ mQueue.push(std::move(item));
+ }
+ mCond.notify_one();
+ }
+
+ // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread.
+ // The items already in the queue could still be flushed even after the queue is deactivated.
+ void deactivate() {
+ {
+ std::lock_guard<std::mutex> lockGuard(mLock);
+ mIsActive = false;
+ }
+ // To unblock all waiting consumers.
+ mCond.notify_all();
+ }
+
+ ConcurrentQueue() = default;
+
+ ConcurrentQueue(const ConcurrentQueue&) = delete;
+ ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
+
+ private:
+ mutable std::mutex mLock;
+ bool mIsActive GUARDED_BY(mLock) = true;
+ std::condition_variable mCond;
+ std::queue<T> mQueue GUARDED_BY(mLock);
+};
+
+} // namespace vehicle
+} // namespace automotive
+} // namespace hardware
+} // namespace android
+
+#endif // android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
diff --git a/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp b/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp
index 7ad3d31..131eb3b 100644
--- a/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp
+++ b/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp
@@ -14,10 +14,14 @@
* limitations under the License.
*/
+#include <ConcurrentQueue.h>
#include <PropertyUtils.h>
#include <VehicleUtils.h>
#include <gtest/gtest.h>
+
+#include <atomic>
+#include <thread>
#include <vector>
namespace android {
@@ -231,7 +235,7 @@
<< "vector size should always be 1 for single value type";
}
-TEST(VehicleUtilsTest, testCreateVehiclePropValueFloVecatVec) {
+TEST(VehicleUtilsTest, testCreateVehiclePropValueFloatVecMultiValues) {
std::unique_ptr<VehiclePropValue> value =
createVehiclePropValueVec(VehiclePropertyType::FLOAT_VEC, /*vecSize=*/2);
@@ -247,6 +251,90 @@
ASSERT_EQ(2u, value->value.byteValues.size());
}
+TEST(VehicleUtilsTest, testConcurrentQueueOneThread) {
+ ConcurrentQueue<int> queue;
+
+ queue.push(1);
+ queue.push(2);
+ auto result = queue.flush();
+
+ ASSERT_EQ(result, std::vector<int>({1, 2}));
+}
+
+TEST(VehicleUtilsTest, testConcurrentQueueMultipleThreads) {
+ ConcurrentQueue<int> queue;
+ std::vector<int> results;
+ std::atomic<bool> stop = false;
+
+ std::thread t1([&queue]() {
+ for (int i = 0; i < 100; i++) {
+ queue.push(0);
+ }
+ });
+ std::thread t2([&queue]() {
+ for (int i = 0; i < 100; i++) {
+ queue.push(1);
+ }
+ });
+ std::thread t3([&queue, &results, &stop]() {
+ while (!stop) {
+ queue.waitForItems();
+ for (int i : queue.flush()) {
+ results.push_back(i);
+ }
+ }
+
+ // After we stop, get all the remaining values in the queue.
+ for (int i : queue.flush()) {
+ results.push_back(i);
+ }
+ });
+
+ t1.join();
+ t2.join();
+
+ stop = true;
+ queue.deactivate();
+ t3.join();
+
+ size_t zeroCount = 0;
+ size_t oneCount = 0;
+ for (int i : results) {
+ if (i == 0) {
+ zeroCount++;
+ }
+ if (i == 1) {
+ oneCount++;
+ }
+ }
+
+ EXPECT_EQ(results.size(), static_cast<size_t>(200));
+ EXPECT_EQ(zeroCount, static_cast<size_t>(100));
+ EXPECT_EQ(oneCount, static_cast<size_t>(100));
+}
+
+TEST(VehicleUtilsTest, testConcurrentQueuePushAfterDeactivate) {
+ ConcurrentQueue<int> queue;
+
+ queue.deactivate();
+ queue.push(1);
+
+ ASSERT_TRUE(queue.flush().empty());
+}
+
+TEST(VehicleUtilsTest, testConcurrentQueueDeactivateNotifyWaitingThread) {
+ ConcurrentQueue<int> queue;
+
+ std::thread t([&queue]() {
+ // This would block until queue is deactivated.
+ queue.waitForItems();
+ });
+
+ queue.deactivate();
+
+ t.join();
+}
+
} // namespace vehicle
} // namespace automotive
} // namespace hardware