Add a callback thread to ConsumerBase
- Add a message queue and callback thread in ConsumerBase.
- This is used to prevent deadlocks when ConsumerBase calls into
BufferQueueConsumer and that generates a callback.
Bug 27229287
Change-Id: I45c41e5a554555511fcfa5c185a7d60b0d969b7e
diff --git a/libs/gui/ConsumerBase.cpp b/libs/gui/ConsumerBase.cpp
index d01187f..a22b81b 100644
--- a/libs/gui/ConsumerBase.cpp
+++ b/libs/gui/ConsumerBase.cpp
@@ -74,12 +74,26 @@
} else {
mConsumer->setConsumerName(mName);
}
+
+ mMessageThread = new MessageThread(this);
+ mMessageThread->run();
}
ConsumerBase::~ConsumerBase() {
CB_LOGV("~ConsumerBase");
- Mutex::Autolock lock(mMutex);
+ mMessageThread->requestExit();
+ {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(EXIT),
+ std::forward_as_tuple());
+ mMessageAvailable.signal();
+ }
+
+ mMessageThread->join();
+
+ Mutex::Autolock lock(mMutex);
// Verify that abandon() has been called before we get here. This should
// be done by ConsumerBase::onLastStrongRef(), but it's possible for a
// derived class to override that method and not call
@@ -100,6 +114,13 @@
}
void ConsumerBase::onFrameAvailable(const BufferItem& item) {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_FRAME_AVAILABLE),
+ std::forward_as_tuple(item));
+ mMessageAvailable.signal();
+}
+void ConsumerBase::onFrameAvailableHandler(const BufferItem& item) {
CB_LOGV("onFrameAvailable");
sp<FrameAvailableListener> listener;
@@ -115,6 +136,14 @@
}
void ConsumerBase::onFrameReplaced(const BufferItem &item) {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_FRAME_REPLACED),
+ std::forward_as_tuple(item));
+ mMessageAvailable.signal();
+}
+
+void ConsumerBase::onFrameReplacedHandler(const BufferItem &item) {
CB_LOGV("onFrameReplaced");
sp<FrameAvailableListener> listener;
@@ -130,6 +159,14 @@
}
void ConsumerBase::onBuffersReleased() {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_BUFFERS_RELEASED),
+ std::forward_as_tuple());
+ mMessageAvailable.signal();
+}
+
+void ConsumerBase::onBuffersReleasedHandler() {
Mutex::Autolock lock(mMutex);
CB_LOGV("onBuffersReleased");
@@ -149,6 +186,45 @@
}
void ConsumerBase::onSidebandStreamChanged() {
+ Mutex::Autolock lock(mMessageQueueLock);
+ mMessageQueue.emplace(std::piecewise_construct,
+ std::forward_as_tuple(ON_SIDEBAND_STREAM_CHANGED),
+ std::forward_as_tuple());
+ mMessageAvailable.signal();
+}
+
+void ConsumerBase::onSidebandStreamChangedHandler() {
+}
+
+bool ConsumerBase::MessageThread::threadLoop() {
+ Mutex::Autolock lock(mConsumerBase->mMessageQueueLock);
+
+ if (mConsumerBase->mMessageQueue.empty()) {
+ mConsumerBase->mMessageAvailable.wait(mConsumerBase->mMessageQueueLock);
+ }
+
+ while (!mConsumerBase->mMessageQueue.empty()) {
+ auto nextMessage = mConsumerBase->mMessageQueue.front();
+
+ switch (nextMessage.first) {
+ case ON_FRAME_AVAILABLE:
+ mConsumerBase->onFrameAvailableHandler(nextMessage.second);
+ break;
+ case ON_FRAME_REPLACED:
+ mConsumerBase->onFrameReplacedHandler(nextMessage.second);
+ break;
+ case ON_BUFFERS_RELEASED:
+ mConsumerBase->onBuffersReleasedHandler();
+ break;
+ case ON_SIDEBAND_STREAM_CHANGED:
+ mConsumerBase->onSidebandStreamChangedHandler();
+ break;
+ case EXIT:
+ break;
+ }
+ mConsumerBase->mMessageQueue.pop();
+ }
+ return true;
}
void ConsumerBase::abandon() {