Migrate ConcurrentQueue.

Test: atest VehicleHalVehicleUtilsTest
Bug: 201830716
Change-Id: I78671befd8351f17bf3da43372ec302e106690de
diff --git a/automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h b/automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h
new file mode 100644
index 0000000..68bd559
--- /dev/null
+++ b/automotive/vehicle/aidl/impl/utils/common/include/ConcurrentQueue.h
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2016 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
+#define android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
+
+#include <android-base/thread_annotations.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <queue>
+#include <thread>
+
+namespace android {
+namespace hardware {
+namespace automotive {
+namespace vehicle {
+
+template <typename T>
+class ConcurrentQueue {
+  public:
+    void waitForItems() {
+        std::unique_lock<std::mutex> lockGuard(mLock);
+        ::android::base::ScopedLockAssertion lockAssertion(mLock);
+        while (mQueue.empty() && mIsActive) {
+            mCond.wait(lockGuard);
+        }
+    }
+
+    std::vector<T> flush() {
+        std::vector<T> items;
+
+        std::lock_guard<std::mutex> lockGuard(mLock);
+        if (mQueue.empty()) {
+            return items;
+        }
+        while (!mQueue.empty()) {
+            // Even if the queue is deactivated, we should still flush all the remaining values
+            // in the queue.
+            items.push_back(std::move(mQueue.front()));
+            mQueue.pop();
+        }
+        return items;
+    }
+
+    void push(T&& item) {
+        {
+            std::lock_guard<std::mutex> lockGuard(mLock);
+            if (!mIsActive) {
+                return;
+            }
+            mQueue.push(std::move(item));
+        }
+        mCond.notify_one();
+    }
+
+    // Deactivates the queue, thus no one can push items to it, also notifies all waiting thread.
+    // The items already in the queue could still be flushed even after the queue is deactivated.
+    void deactivate() {
+        {
+            std::lock_guard<std::mutex> lockGuard(mLock);
+            mIsActive = false;
+        }
+        // To unblock all waiting consumers.
+        mCond.notify_all();
+    }
+
+    ConcurrentQueue() = default;
+
+    ConcurrentQueue(const ConcurrentQueue&) = delete;
+    ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
+
+  private:
+    mutable std::mutex mLock;
+    bool mIsActive GUARDED_BY(mLock) = true;
+    std::condition_variable mCond;
+    std::queue<T> mQueue GUARDED_BY(mLock);
+};
+
+}  // namespace vehicle
+}  // namespace automotive
+}  // namespace hardware
+}  // namespace android
+
+#endif  // android_hardware_automotive_vehicle_aidl_impl_utils_common_include_ConcurrentQueue_H_
diff --git a/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp b/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp
index 7ad3d31..131eb3b 100644
--- a/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp
+++ b/automotive/vehicle/aidl/impl/utils/common/test/VehicleUtilsTest.cpp
@@ -14,10 +14,14 @@
  * limitations under the License.
  */
 
+#include <ConcurrentQueue.h>
 #include <PropertyUtils.h>
 #include <VehicleUtils.h>
 
 #include <gtest/gtest.h>
+
+#include <atomic>
+#include <thread>
 #include <vector>
 
 namespace android {
@@ -231,7 +235,7 @@
             << "vector size should always be 1 for single value type";
 }
 
-TEST(VehicleUtilsTest, testCreateVehiclePropValueFloVecatVec) {
+TEST(VehicleUtilsTest, testCreateVehiclePropValueFloatVecMultiValues) {
     std::unique_ptr<VehiclePropValue> value =
             createVehiclePropValueVec(VehiclePropertyType::FLOAT_VEC, /*vecSize=*/2);
 
@@ -247,6 +251,90 @@
     ASSERT_EQ(2u, value->value.byteValues.size());
 }
 
+TEST(VehicleUtilsTest, testConcurrentQueueOneThread) {
+    ConcurrentQueue<int> queue;
+
+    queue.push(1);
+    queue.push(2);
+    auto result = queue.flush();
+
+    ASSERT_EQ(result, std::vector<int>({1, 2}));
+}
+
+TEST(VehicleUtilsTest, testConcurrentQueueMultipleThreads) {
+    ConcurrentQueue<int> queue;
+    std::vector<int> results;
+    std::atomic<bool> stop = false;
+
+    std::thread t1([&queue]() {
+        for (int i = 0; i < 100; i++) {
+            queue.push(0);
+        }
+    });
+    std::thread t2([&queue]() {
+        for (int i = 0; i < 100; i++) {
+            queue.push(1);
+        }
+    });
+    std::thread t3([&queue, &results, &stop]() {
+        while (!stop) {
+            queue.waitForItems();
+            for (int i : queue.flush()) {
+                results.push_back(i);
+            }
+        }
+
+        // After we stop, get all the remaining values in the queue.
+        for (int i : queue.flush()) {
+            results.push_back(i);
+        }
+    });
+
+    t1.join();
+    t2.join();
+
+    stop = true;
+    queue.deactivate();
+    t3.join();
+
+    size_t zeroCount = 0;
+    size_t oneCount = 0;
+    for (int i : results) {
+        if (i == 0) {
+            zeroCount++;
+        }
+        if (i == 1) {
+            oneCount++;
+        }
+    }
+
+    EXPECT_EQ(results.size(), static_cast<size_t>(200));
+    EXPECT_EQ(zeroCount, static_cast<size_t>(100));
+    EXPECT_EQ(oneCount, static_cast<size_t>(100));
+}
+
+TEST(VehicleUtilsTest, testConcurrentQueuePushAfterDeactivate) {
+    ConcurrentQueue<int> queue;
+
+    queue.deactivate();
+    queue.push(1);
+
+    ASSERT_TRUE(queue.flush().empty());
+}
+
+TEST(VehicleUtilsTest, testConcurrentQueueDeactivateNotifyWaitingThread) {
+    ConcurrentQueue<int> queue;
+
+    std::thread t([&queue]() {
+        // This would block until queue is deactivated.
+        queue.waitForItems();
+    });
+
+    queue.deactivate();
+
+    t.join();
+}
+
 }  // namespace vehicle
 }  // namespace automotive
 }  // namespace hardware