A ThreadedSource wraps around an existing MediaSource and reads output buffers on a separate thread. It's now used for the vpx decoder to decode frames ahead of time to improve playback performance.

Change-Id: I57a798b00adeb2c8056e85aab29a2b57aef00b63
diff --git a/media/libstagefright/Android.mk b/media/libstagefright/Android.mk
index 86fa668..e0321a5 100644
--- a/media/libstagefright/Android.mk
+++ b/media/libstagefright/Android.mk
@@ -34,6 +34,7 @@
         ShoutcastSource.cpp               \
         StagefrightMediaScanner.cpp       \
         StagefrightMetadataRetriever.cpp  \
+        ThreadedSource.cpp                \
         ThrottledSource.cpp               \
         TimeSource.cpp                    \
         TimedEventQueue.cpp               \
diff --git a/media/libstagefright/OMXCodec.cpp b/media/libstagefright/OMXCodec.cpp
index fd23b61..3d25a4b 100644
--- a/media/libstagefright/OMXCodec.cpp
+++ b/media/libstagefright/OMXCodec.cpp
@@ -52,6 +52,8 @@
 #include <OMX_Audio.h>
 #include <OMX_Component.h>
 
+#include "include/ThreadedSource.h"
+
 namespace android {
 
 static const int OMX_QCOM_COLOR_FormatYVU420SemiPlanar = 0x7FA30C00;
@@ -134,6 +136,10 @@
     for (size_t i = 0;
          i < sizeof(kFactoryInfo) / sizeof(kFactoryInfo[0]); ++i) {
         if (!strcmp(name, kFactoryInfo[i].name)) {
+            if (!strcmp(name, "VPXDecoder")) {
+                return new ThreadedSource(
+                        (*kFactoryInfo[i].CreateFunc)(source));
+            }
             return (*kFactoryInfo[i].CreateFunc)(source);
         }
     }
diff --git a/media/libstagefright/ThreadedSource.cpp b/media/libstagefright/ThreadedSource.cpp
new file mode 100644
index 0000000..5add2a5
--- /dev/null
+++ b/media/libstagefright/ThreadedSource.cpp
@@ -0,0 +1,209 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "include/ThreadedSource.h"
+
+#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/foundation/AMessage.h>
+#include <media/stagefright/MediaBuffer.h>
+#include <media/stagefright/MetaData.h>
+
+namespace android {
+
+static const size_t kMaxQueueSize = 2;
+
+ThreadedSource::ThreadedSource(const sp<MediaSource> &source)
+    : mSource(source),
+      mReflector(new AHandlerReflector<ThreadedSource>(this)),
+      mLooper(new ALooper),
+      mStarted(false) {
+    mLooper->registerHandler(mReflector);
+}
+
+ThreadedSource::~ThreadedSource() {
+    if (mStarted) {
+        stop();
+    }
+}
+
+status_t ThreadedSource::start(MetaData *params) {
+    CHECK(!mStarted);
+
+    status_t err = mSource->start(params);
+
+    if (err != OK) {
+        return err;
+    }
+
+    mFinalResult = OK;
+    mSeekTimeUs = -1;
+    mDecodePending = false;
+
+    Mutex::Autolock autoLock(mLock);
+    postDecodeMore_l();
+
+    CHECK_EQ(mLooper->start(), (status_t)OK);
+
+    mStarted = true;
+
+    return OK;
+}
+
+status_t ThreadedSource::stop() {
+    CHECK(mStarted);
+
+    CHECK_EQ(mLooper->stop(), (status_t)OK);
+
+    Mutex::Autolock autoLock(mLock);
+    clearQueue_l();
+
+    status_t err = mSource->stop();
+
+    mStarted = false;
+
+    return err;
+}
+
+sp<MetaData> ThreadedSource::getFormat() {
+    return mSource->getFormat();
+}
+
+status_t ThreadedSource::read(
+        MediaBuffer **buffer, const ReadOptions *options) {
+    *buffer = NULL;
+
+    Mutex::Autolock autoLock(mLock);
+
+    int64_t seekTimeUs;
+    ReadOptions::SeekMode seekMode;
+    if (options && options->getSeekTo(&seekTimeUs, &seekMode)) {
+        int32_t seekComplete = 0;
+
+        sp<AMessage> msg = new AMessage(kWhatSeek, mReflector->id());
+        msg->setInt64("timeUs", seekTimeUs);
+        msg->setInt32("mode", seekMode);
+        msg->setPointer("complete", &seekComplete);
+        msg->post();
+
+        while (!seekComplete) {
+            mCondition.wait(mLock);
+        }
+    }
+
+    while (mQueue.empty() && mFinalResult == OK) {
+        mCondition.wait(mLock);
+    }
+
+    if (!mQueue.empty()) {
+        *buffer = *mQueue.begin();
+        mQueue.erase(mQueue.begin());
+
+        if (mFinalResult == OK) {
+            postDecodeMore_l();
+        }
+
+        return OK;
+    }
+
+    return mFinalResult;
+}
+
+void ThreadedSource::onMessageReceived(const sp<AMessage> &msg) {
+    switch (msg->what()) {
+        case kWhatSeek:
+        {
+            CHECK(msg->findInt64("timeUs", &mSeekTimeUs));
+            CHECK_GE(mSeekTimeUs, 0ll);
+
+            int32_t x;
+            CHECK(msg->findInt32("mode", &x));
+            mSeekMode = (ReadOptions::SeekMode)x;
+
+            int32_t *seekComplete;
+            CHECK(msg->findPointer("complete", (void **)&seekComplete));
+
+            Mutex::Autolock autoLock(mLock);
+            clearQueue_l();
+            mFinalResult = OK;
+
+            *seekComplete = 1;
+            mCondition.signal();
+
+            postDecodeMore_l();
+            break;
+        }
+
+        case kWhatDecodeMore:
+        {
+            {
+                Mutex::Autolock autoLock(mLock);
+                mDecodePending = false;
+
+                if (mQueue.size() == kMaxQueueSize) {
+                    break;
+                }
+            }
+
+            MediaBuffer *buffer;
+            ReadOptions options;
+            if (mSeekTimeUs >= 0) {
+                options.setSeekTo(mSeekTimeUs, mSeekMode);
+                mSeekTimeUs = -1ll;
+            }
+            status_t err = mSource->read(&buffer, &options);
+
+            Mutex::Autolock autoLock(mLock);
+
+            if (err != OK) {
+                mFinalResult = err;
+            } else {
+                mQueue.push_back(buffer);
+
+                if (mQueue.size() < kMaxQueueSize) {
+                    postDecodeMore_l();
+                }
+            }
+
+            mCondition.signal();
+            break;
+        }
+
+        default:
+            TRESPASS();
+            break;
+    }
+}
+
+void ThreadedSource::postDecodeMore_l() {
+    if (mDecodePending) {
+        return;
+    }
+
+    mDecodePending = true;
+    (new AMessage(kWhatDecodeMore, mReflector->id()))->post();
+}
+
+void ThreadedSource::clearQueue_l() {
+    while (!mQueue.empty()) {
+        MediaBuffer *buffer = *mQueue.begin();
+        mQueue.erase(mQueue.begin());
+
+        buffer->release();
+        buffer = NULL;
+    }
+}
+
+}  // namespace android
diff --git a/media/libstagefright/include/ThreadedSource.h b/media/libstagefright/include/ThreadedSource.h
new file mode 100644
index 0000000..c67295c
--- /dev/null
+++ b/media/libstagefright/include/ThreadedSource.h
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef THREADED_SOURCE_H_
+
+#define THREADED_SOURCE_H_
+
+#include <media/stagefright/foundation/ABase.h>
+#include <media/stagefright/foundation/AHandlerReflector.h>
+#include <media/stagefright/foundation/ALooper.h>
+#include <media/stagefright/MediaSource.h>
+#include <utils/threads.h>
+
+namespace android {
+
+struct ThreadedSource : public MediaSource {
+    ThreadedSource(const sp<MediaSource> &source);
+
+    virtual status_t start(MetaData *params);
+    virtual status_t stop();
+
+    virtual sp<MetaData> getFormat();
+
+    virtual status_t read(
+            MediaBuffer **buffer, const ReadOptions *options);
+
+    virtual void onMessageReceived(const sp<AMessage> &msg);
+
+protected:
+    virtual ~ThreadedSource();
+
+private:
+    enum {
+        kWhatDecodeMore = 'deco',
+        kWhatSeek       = 'seek',
+    };
+
+    sp<MediaSource> mSource;
+    sp<AHandlerReflector<ThreadedSource> > mReflector;
+    sp<ALooper> mLooper;
+
+    Mutex mLock;
+    Condition mCondition;
+    List<MediaBuffer *> mQueue;
+    status_t mFinalResult;
+    bool mDecodePending;
+    bool mStarted;
+
+    int64_t mSeekTimeUs;
+    ReadOptions::SeekMode mSeekMode;
+
+    void postDecodeMore_l();
+    void clearQueue_l();
+
+    DISALLOW_EVIL_CONSTRUCTORS(ThreadedSource);
+};
+
+}  // namespace android
+
+#endif  // THREADED_SOURCE_H_