blob: f848c82c423cab6c7b74f0ddd3f9a9a4b9a5e243 [file] [log] [blame]
Siarhei Vishniakou473174e2017-12-27 16:44:42 -08001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Prabir Pradhan48108662022-09-09 21:22:04 +000017#pragma once
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080018
19#include <condition_variable>
Ryan Prichard2aef4642023-09-26 16:49:22 -070020#include <functional>
Prabir Pradhand5678112023-05-18 01:57:10 +000021#include <list>
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080022#include <mutex>
Prabir Pradhand5678112023-05-18 01:57:10 +000023#include <optional>
24#include "android-base/thread_annotations.h"
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080025
26namespace android {
27
28/**
Prabir Pradhand5678112023-05-18 01:57:10 +000029 * A thread-safe FIFO queue. This list-backed queue stores up to <i>capacity</i> objects if
30 * a capacity is provided at construction, and is otherwise unbounded.
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080031 * Objects can always be added. Objects are added immediately.
32 * If the queue is full, new objects cannot be added.
33 *
34 * The action of retrieving an object will block until an element is available.
35 */
36template <class T>
37class BlockingQueue {
38public:
Prabir Pradhand5678112023-05-18 01:57:10 +000039 explicit BlockingQueue() = default;
40
41 explicit BlockingQueue(size_t capacity) : mCapacity(capacity){};
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080042
43 /**
44 * Retrieve and remove the oldest object.
Prabir Pradhand5678112023-05-18 01:57:10 +000045 * Blocks execution indefinitely while queue is empty.
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080046 */
47 T pop() {
Siarhei Vishniakou443ad902019-03-06 17:25:41 -080048 std::unique_lock lock(mLock);
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080049 android::base::ScopedLockAssertion assumeLock(mLock);
Siarhei Vishniakoue3021d72020-02-28 15:25:41 -080050 mHasElements.wait(lock, [this]() REQUIRES(mLock) { return !this->mQueue.empty(); });
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080051 T t = std::move(mQueue.front());
52 mQueue.erase(mQueue.begin());
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080053 return t;
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080054 };
55
56 /**
Prabir Pradhand5678112023-05-18 01:57:10 +000057 * Retrieve and remove the oldest object.
58 * Blocks execution for the given duration while queue is empty, and returns std::nullopt
59 * if the queue was empty for the entire duration.
60 */
61 std::optional<T> popWithTimeout(std::chrono::nanoseconds duration) {
62 std::unique_lock lock(mLock);
63 android::base::ScopedLockAssertion assumeLock(mLock);
64 if (!mHasElements.wait_for(lock, duration,
65 [this]() REQUIRES(mLock) { return !this->mQueue.empty(); })) {
66 return {};
67 }
68 T t = std::move(mQueue.front());
69 mQueue.erase(mQueue.begin());
70 return t;
71 };
72
73 /**
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080074 * Add a new object to the queue.
75 * Does not block.
76 * Return true if an element was successfully added.
77 * Return false if the queue is full.
78 */
79 bool push(T&& t) {
Prabir Pradhand5678112023-05-18 01:57:10 +000080 { // acquire lock
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080081 std::scoped_lock lock(mLock);
Prabir Pradhand5678112023-05-18 01:57:10 +000082 if (mCapacity && mQueue.size() == mCapacity) {
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080083 return false;
84 }
85 mQueue.push_back(std::move(t));
Prabir Pradhand5678112023-05-18 01:57:10 +000086 } // release lock
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080087 mHasElements.notify_one();
88 return true;
89 };
90
Prabir Pradhand5678112023-05-18 01:57:10 +000091 /**
92 * Construct a new object into the queue.
93 * Does not block.
94 * Return true if an element was successfully added.
95 * Return false if the queue is full.
96 */
97 template <class... Args>
98 bool emplace(Args&&... args) {
99 { // acquire lock
100 std::scoped_lock lock(mLock);
101 if (mCapacity && mQueue.size() == mCapacity) {
102 return false;
103 }
104 mQueue.emplace_back(args...);
105 } // release lock
106 mHasElements.notify_one();
107 return true;
108 };
109
110 void erase_if(const std::function<bool(const T&)>& pred) {
Siarhei Vishniakou61291d42019-02-11 18:13:20 -0800111 std::scoped_lock lock(mLock);
Prabir Pradhand5678112023-05-18 01:57:10 +0000112 std::erase_if(mQueue, pred);
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800113 }
114
115 /**
116 * Remove all elements.
117 * Does not block.
118 */
119 void clear() {
120 std::scoped_lock lock(mLock);
121 mQueue.clear();
122 };
123
Siarhei Vishniakoua028c442019-02-04 14:33:23 -0800124 /**
125 * How many elements are currently stored in the queue.
126 * Primary used for debugging.
127 * Does not block.
128 */
129 size_t size() {
130 std::scoped_lock lock(mLock);
131 return mQueue.size();
132 }
133
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800134private:
Prabir Pradhand5678112023-05-18 01:57:10 +0000135 const std::optional<size_t> mCapacity;
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800136 /**
137 * Used to signal that mQueue is non-empty.
138 */
139 std::condition_variable mHasElements;
140 /**
141 * Lock for accessing and waiting on elements.
142 */
143 std::mutex mLock;
Prabir Pradhand5678112023-05-18 01:57:10 +0000144 std::list<T> mQueue GUARDED_BY(mLock);
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800145};
146
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800147} // namespace android