blob: 56938480f043d36361b8ada84a6330b09d827638 [file] [log] [blame]
Siarhei Vishniakou473174e2017-12-27 16:44:42 -08001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Prabir Pradhan48108662022-09-09 21:22:04 +000017#pragma once
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080018
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <utility>

#include "android-base/thread_annotations.h"
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080024
25namespace android {
26
27/**
Prabir Pradhand5678112023-05-18 01:57:10 +000028 * A thread-safe FIFO queue. This list-backed queue stores up to <i>capacity</i> objects if
29 * a capacity is provided at construction, and is otherwise unbounded.
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080030 * Objects can always be added. Objects are added immediately.
31 * If the queue is full, new objects cannot be added.
32 *
33 * The action of retrieving an object will block until an element is available.
34 */
35template <class T>
36class BlockingQueue {
37public:
Prabir Pradhand5678112023-05-18 01:57:10 +000038 explicit BlockingQueue() = default;
39
40 explicit BlockingQueue(size_t capacity) : mCapacity(capacity){};
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080041
42 /**
43 * Retrieve and remove the oldest object.
Prabir Pradhand5678112023-05-18 01:57:10 +000044 * Blocks execution indefinitely while queue is empty.
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080045 */
46 T pop() {
Siarhei Vishniakou443ad902019-03-06 17:25:41 -080047 std::unique_lock lock(mLock);
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080048 android::base::ScopedLockAssertion assumeLock(mLock);
Siarhei Vishniakoue3021d72020-02-28 15:25:41 -080049 mHasElements.wait(lock, [this]() REQUIRES(mLock) { return !this->mQueue.empty(); });
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080050 T t = std::move(mQueue.front());
51 mQueue.erase(mQueue.begin());
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080052 return t;
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080053 };
54
55 /**
Prabir Pradhand5678112023-05-18 01:57:10 +000056 * Retrieve and remove the oldest object.
57 * Blocks execution for the given duration while queue is empty, and returns std::nullopt
58 * if the queue was empty for the entire duration.
59 */
60 std::optional<T> popWithTimeout(std::chrono::nanoseconds duration) {
61 std::unique_lock lock(mLock);
62 android::base::ScopedLockAssertion assumeLock(mLock);
63 if (!mHasElements.wait_for(lock, duration,
64 [this]() REQUIRES(mLock) { return !this->mQueue.empty(); })) {
65 return {};
66 }
67 T t = std::move(mQueue.front());
68 mQueue.erase(mQueue.begin());
69 return t;
70 };
71
72 /**
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080073 * Add a new object to the queue.
74 * Does not block.
75 * Return true if an element was successfully added.
76 * Return false if the queue is full.
77 */
78 bool push(T&& t) {
Prabir Pradhand5678112023-05-18 01:57:10 +000079 { // acquire lock
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080080 std::scoped_lock lock(mLock);
Prabir Pradhand5678112023-05-18 01:57:10 +000081 if (mCapacity && mQueue.size() == mCapacity) {
Siarhei Vishniakou61291d42019-02-11 18:13:20 -080082 return false;
83 }
84 mQueue.push_back(std::move(t));
Prabir Pradhand5678112023-05-18 01:57:10 +000085 } // release lock
Siarhei Vishniakou473174e2017-12-27 16:44:42 -080086 mHasElements.notify_one();
87 return true;
88 };
89
Prabir Pradhand5678112023-05-18 01:57:10 +000090 /**
91 * Construct a new object into the queue.
92 * Does not block.
93 * Return true if an element was successfully added.
94 * Return false if the queue is full.
95 */
96 template <class... Args>
97 bool emplace(Args&&... args) {
98 { // acquire lock
99 std::scoped_lock lock(mLock);
100 if (mCapacity && mQueue.size() == mCapacity) {
101 return false;
102 }
103 mQueue.emplace_back(args...);
104 } // release lock
105 mHasElements.notify_one();
106 return true;
107 };
108
109 void erase_if(const std::function<bool(const T&)>& pred) {
Siarhei Vishniakou61291d42019-02-11 18:13:20 -0800110 std::scoped_lock lock(mLock);
Prabir Pradhand5678112023-05-18 01:57:10 +0000111 std::erase_if(mQueue, pred);
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800112 }
113
114 /**
115 * Remove all elements.
116 * Does not block.
117 */
118 void clear() {
119 std::scoped_lock lock(mLock);
120 mQueue.clear();
121 };
122
Siarhei Vishniakoua028c442019-02-04 14:33:23 -0800123 /**
124 * How many elements are currently stored in the queue.
125 * Primary used for debugging.
126 * Does not block.
127 */
128 size_t size() {
129 std::scoped_lock lock(mLock);
130 return mQueue.size();
131 }
132
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800133private:
Prabir Pradhand5678112023-05-18 01:57:10 +0000134 const std::optional<size_t> mCapacity;
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800135 /**
136 * Used to signal that mQueue is non-empty.
137 */
138 std::condition_variable mHasElements;
139 /**
140 * Lock for accessing and waiting on elements.
141 */
142 std::mutex mLock;
Prabir Pradhand5678112023-05-18 01:57:10 +0000143 std::list<T> mQueue GUARDED_BY(mLock);
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800144};
145
Siarhei Vishniakou473174e2017-12-27 16:44:42 -0800146} // namespace android