Add a callback thread to ConsumerBase

- Add a message queue and callback thread in ConsumerBase.
- This is used to prevent deadlocks when ConsumerBase calls into
  BufferQueueConsumer and that generates a callback.

Bug 27229287

Change-Id: I45c41e5a554555511fcfa5c185a7d60b0d969b7e
diff --git a/libs/gui/ConsumerBase.cpp b/libs/gui/ConsumerBase.cpp
index d01187f..a22b81b 100644
--- a/libs/gui/ConsumerBase.cpp
+++ b/libs/gui/ConsumerBase.cpp
@@ -74,12 +74,26 @@
     } else {
         mConsumer->setConsumerName(mName);
     }
+
+    mMessageThread = new MessageThread(this);
+    mMessageThread->run();
 }
 
 ConsumerBase::~ConsumerBase() {
     CB_LOGV("~ConsumerBase");
-    Mutex::Autolock lock(mMutex);
 
+    mMessageThread->requestExit();
+    {
+        Mutex::Autolock lock(mMessageQueueLock);
+        mMessageQueue.emplace(std::piecewise_construct,
+                std::forward_as_tuple(EXIT),
+                std::forward_as_tuple());
+        mMessageAvailable.signal();
+    }
+
+    mMessageThread->join();
+
+    Mutex::Autolock lock(mMutex);
     // Verify that abandon() has been called before we get here.  This should
     // be done by ConsumerBase::onLastStrongRef(), but it's possible for a
     // derived class to override that method and not call
@@ -100,6 +114,13 @@
 }
 
 void ConsumerBase::onFrameAvailable(const BufferItem& item) {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_FRAME_AVAILABLE),
+            std::forward_as_tuple(item));
+    mMessageAvailable.signal();
+}
+void ConsumerBase::onFrameAvailableHandler(const BufferItem& item) {
     CB_LOGV("onFrameAvailable");
 
     sp<FrameAvailableListener> listener;
@@ -115,6 +136,14 @@
 }
 
 void ConsumerBase::onFrameReplaced(const BufferItem &item) {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_FRAME_REPLACED),
+            std::forward_as_tuple(item));
+    mMessageAvailable.signal();
+}
+
+void ConsumerBase::onFrameReplacedHandler(const BufferItem &item) {
     CB_LOGV("onFrameReplaced");
 
     sp<FrameAvailableListener> listener;
@@ -130,6 +159,14 @@
 }
 
 void ConsumerBase::onBuffersReleased() {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_BUFFERS_RELEASED),
+            std::forward_as_tuple());
+    mMessageAvailable.signal();
+}
+
+void ConsumerBase::onBuffersReleasedHandler() {
     Mutex::Autolock lock(mMutex);
 
     CB_LOGV("onBuffersReleased");
@@ -149,6 +186,45 @@
 }
 
 void ConsumerBase::onSidebandStreamChanged() {
+    Mutex::Autolock lock(mMessageQueueLock);
+    mMessageQueue.emplace(std::piecewise_construct,
+            std::forward_as_tuple(ON_SIDEBAND_STREAM_CHANGED),
+            std::forward_as_tuple());
+    mMessageAvailable.signal();
+}
+
+void ConsumerBase::onSidebandStreamChangedHandler() {
+}
+
+bool ConsumerBase::MessageThread::threadLoop() {
+    Mutex::Autolock lock(mConsumerBase->mMessageQueueLock);
+
+    if (mConsumerBase->mMessageQueue.empty()) {
+        mConsumerBase->mMessageAvailable.wait(mConsumerBase->mMessageQueueLock);
+    }
+
+    while (!mConsumerBase->mMessageQueue.empty()) {
+        auto nextMessage = mConsumerBase->mMessageQueue.front();
+
+        switch (nextMessage.first) {
+            case ON_FRAME_AVAILABLE:
+                mConsumerBase->onFrameAvailableHandler(nextMessage.second);
+                break;
+            case ON_FRAME_REPLACED:
+                mConsumerBase->onFrameReplacedHandler(nextMessage.second);
+                break;
+            case ON_BUFFERS_RELEASED:
+                mConsumerBase->onBuffersReleasedHandler();
+                break;
+            case ON_SIDEBAND_STREAM_CHANGED:
+                mConsumerBase->onSidebandStreamChangedHandler();
+                break;
+            case EXIT:
+                break;
+        }
+        mConsumerBase->mMessageQueue.pop();
+    }
+    return true;
 }
 
 void ConsumerBase::abandon() {