Merge "stagefright: rework MediaCodecSource" into nyc-dev
diff --git a/include/media/stagefright/MediaCodecSource.h b/include/media/stagefright/MediaCodecSource.h
index d147b57..e3f3f5e 100644
--- a/include/media/stagefright/MediaCodecSource.h
+++ b/include/media/stagefright/MediaCodecSource.h
@@ -19,6 +19,7 @@
#include <media/stagefright/foundation/ABase.h>
#include <media/stagefright/foundation/AHandlerReflector.h>
+#include <media/stagefright/foundation/Mutexed.h>
#include <media/stagefright/MediaSource.h>
#include <gui/IGraphicBufferConsumer.h>
@@ -79,6 +80,7 @@
kWhatStop,
kWhatPause,
kWhatSetInputBufferTimeOffset,
+ kWhatStopStalled,
};
MediaCodecSource(
@@ -127,12 +129,16 @@
int64_t mFirstSampleTimeUs;
List<int64_t> mDriftTimeQueue;
- // following variables are protected by mOutputBufferLock
- Mutex mOutputBufferLock;
- Condition mOutputBufferCond;
- List<MediaBuffer*> mOutputBufferQueue;
- bool mEncoderReachedEOS;
- status_t mErrorCode;
+ struct Output {
+ Output();
+ List<MediaBuffer*> mBufferQueue;
+ bool mEncoderReachedEOS;
+ status_t mErrorCode;
+ Condition mCond;
+ };
+ Mutexed<Output> mOutput;
+
+ int32_t mGeneration;
DISALLOW_EVIL_CONSTRUCTORS(MediaCodecSource);
};
diff --git a/media/libstagefright/MediaCodecSource.cpp b/media/libstagefright/MediaCodecSource.cpp
index a148d55..3a3a538 100644
--- a/media/libstagefright/MediaCodecSource.cpp
+++ b/media/libstagefright/MediaCodecSource.cpp
@@ -43,15 +43,19 @@
const int kDefaultSwVideoEncoderFormat = HAL_PIXEL_FORMAT_YCbCr_420_888;
const int kDefaultSwVideoEncoderDataSpace = HAL_DATASPACE_BT709;
+const int kStopTimeoutUs = 300000; // allow 300 ms (300000 us) for shutting down encoder. NOTE(review): comment used to say 1 sec - confirm intended timeout
+
struct MediaCodecSource::Puller : public AHandler {
Puller(const sp<MediaSource> &source);
status_t start(const sp<MetaData> &meta, const sp<AMessage> &notify);
void stop();
-
+ void stopSource();
void pause();
void resume();
+ bool readBuffer(MediaBuffer **buffer);
+
protected:
virtual void onMessageReceived(const sp<AMessage> &msg);
virtual ~Puller();
@@ -61,17 +65,31 @@
kWhatStart = 'msta',
kWhatStop,
kWhatPull,
- kWhatPause,
- kWhatResume,
};
sp<MediaSource> mSource;
sp<AMessage> mNotify;
sp<ALooper> mLooper;
- int32_t mPullGeneration;
bool mIsAudio;
- bool mPaused;
- bool mReachedEOS;
+
+ struct Queue {
+ Queue()
+ : mReadPendingSince(0),
+ mPaused(false),
+ mPulling(false) { }
+ int64_t mReadPendingSince;
+ bool mPaused;
+ bool mPulling;
+ Vector<MediaBuffer *> mReadBuffers;
+
+ void flush();
+ // if queue is empty, return false and set *|buffer| to NULL. Otherwise, pop
+ // buffer from front of the queue, place it into *|buffer| and return true.
+ bool readBuffer(MediaBuffer **buffer);
+ // add a buffer to the back of the queue
+ void pushBuffer(MediaBuffer *mbuf);
+ };
+ Mutexed<Queue> mQueue;
status_t postSynchronouslyAndReturnError(const sp<AMessage> &msg);
void schedulePull();
@@ -83,10 +101,8 @@
MediaCodecSource::Puller::Puller(const sp<MediaSource> &source)
: mSource(source),
mLooper(new ALooper()),
- mPullGeneration(0),
- mIsAudio(false),
- mPaused(false),
- mReachedEOS(false) {
+ mIsAudio(false)
+{
sp<MetaData> meta = source->getFormat();
const char *mime;
CHECK(meta->findCString(kKeyMIMEType, &mime));
@@ -101,6 +117,33 @@
mLooper->stop();
}
+void MediaCodecSource::Puller::Queue::pushBuffer(MediaBuffer *mbuf) {
+ mReadBuffers.push_back(mbuf);
+}
+
+bool MediaCodecSource::Puller::Queue::readBuffer(MediaBuffer **mbuf) {
+ if (mReadBuffers.empty()) {
+ *mbuf = NULL;
+ return false;
+ }
+ *mbuf = *mReadBuffers.begin();
+ mReadBuffers.erase(mReadBuffers.begin());
+ return true;
+}
+
+void MediaCodecSource::Puller::Queue::flush() {
+ MediaBuffer *mbuf;
+ while (readBuffer(&mbuf)) {
+ // there are no null buffers in the queue
+ mbuf->release();
+ }
+}
+
+bool MediaCodecSource::Puller::readBuffer(MediaBuffer **mbuf) {
+ Mutexed<Queue>::Locked queue(mQueue);
+ return queue->readBuffer(mbuf);
+}
+
status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
const sp<AMessage> &msg) {
sp<AMessage> response;
@@ -117,8 +160,7 @@
return err;
}
-status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta,
- const sp<AMessage> &notify) {
+status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta, const sp<AMessage> &notify) {
ALOGV("puller (%s) start", mIsAudio ? "audio" : "video");
mLooper->start(
false /* runOnCallingThread */,
@@ -133,41 +175,46 @@
}
void MediaCodecSource::Puller::stop() {
- // Stop source from caller's thread instead of puller's looper.
- // mSource->stop() is thread-safe, doing it outside the puller's
- // looper allows us to at least stop if source gets stuck.
- // If source gets stuck in read(), the looper would never
- // be able to process the stop(), which could lead to ANR.
+ bool interrupt = false;
+ {
+ // mark stopping before actually reaching kWhatStop on the looper, so the pulling will
+ // stop.
+ Mutexed<Queue>::Locked queue(mQueue);
+ queue->mPulling = false;
+ interrupt = queue->mReadPendingSince && (queue->mReadPendingSince < ALooper::GetNowUs() - 1000000);
+ queue->flush(); // flush any unprocessed pulled buffers
+ }
- ALOGV("source (%s) stopping", mIsAudio ? "audio" : "video");
- mSource->stop();
- ALOGV("source (%s) stopped", mIsAudio ? "audio" : "video");
+ if (interrupt) {
+ // call source->stop if read has been pending for over a second
+ // TODO: we should really call this if kWhatStop has not returned for more than a second.
+ mSource->stop();
+ }
+}
+void MediaCodecSource::Puller::stopSource() {
(new AMessage(kWhatStop, this))->post();
}
void MediaCodecSource::Puller::pause() {
- (new AMessage(kWhatPause, this))->post();
+ Mutexed<Queue>::Locked queue(mQueue);
+ queue->mPaused = true;
}
void MediaCodecSource::Puller::resume() {
- (new AMessage(kWhatResume, this))->post();
+ Mutexed<Queue>::Locked queue(mQueue);
+ queue->mPaused = false;
}
void MediaCodecSource::Puller::schedulePull() {
- sp<AMessage> msg = new AMessage(kWhatPull, this);
- msg->setInt32("generation", mPullGeneration);
- msg->post();
+ (new AMessage(kWhatPull, this))->post();
}
void MediaCodecSource::Puller::handleEOS() {
- if (!mReachedEOS) {
- ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
- mReachedEOS = true;
- sp<AMessage> notify = mNotify->dup();
- notify->setPointer("accessUnit", NULL);
- notify->post();
- }
+ ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
+ sp<AMessage> msg = mNotify->dup();
+ msg->setInt32("eos", 1);
+ msg->post();
}
void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
@@ -177,7 +224,10 @@
sp<RefBase> obj;
CHECK(msg->findObject("meta", &obj));
- mReachedEOS = false;
+ {
+ Mutexed<Queue>::Locked queue(mQueue);
+ queue->mPulling = true;
+ }
status_t err = mSource->start(static_cast<MetaData *>(obj.get()));
@@ -196,61 +246,53 @@
case kWhatStop:
{
- ++mPullGeneration;
-
- handleEOS();
+ mSource->stop();
break;
}
case kWhatPull:
{
- int32_t generation;
- CHECK(msg->findInt32("generation", &generation));
-
- if (generation != mPullGeneration) {
+ Mutexed<Queue>::Locked queue(mQueue);
+ queue->mReadPendingSince = ALooper::GetNowUs();
+ if (!queue->mPulling) {
+ handleEOS();
break;
}
- MediaBuffer *mbuf;
+ queue.unlock();
+ MediaBuffer *mbuf = NULL;
status_t err = mSource->read(&mbuf);
+ queue.lock();
- if (mPaused) {
- if (err == OK) {
+ queue->mReadPendingSince = 0;
+ // if we need to discard buffer
+ if (!queue->mPulling || queue->mPaused || err != OK) {
+ if (mbuf != NULL) {
mbuf->release();
mbuf = NULL;
}
-
- msg->post();
- break;
- }
-
- if (err != OK) {
- if (err == ERROR_END_OF_STREAM) {
+ if (queue->mPulling && err == OK) {
+ msg->post(); // if simply paused, keep pulling source
+ break; // paused is not EOS: skip the handleEOS() below (mbuf is NULL here)
+ } else if (err == ERROR_END_OF_STREAM) {
ALOGV("stream ended, mbuf %p", mbuf);
- } else {
+ } else if (err != OK) {
ALOGE("error %d reading stream.", err);
}
- handleEOS();
- } else {
- sp<AMessage> notify = mNotify->dup();
-
- notify->setPointer("accessUnit", mbuf);
- notify->post();
-
- msg->post();
}
- break;
- }
- case kWhatPause:
- {
- mPaused = true;
- break;
- }
+ if (mbuf != NULL) {
+ queue->pushBuffer(mbuf);
+ }
- case kWhatResume:
- {
- mPaused = false;
+ queue.unlock();
+
+ if (mbuf != NULL) {
+ mNotify->post();
+ msg->post();
+ } else {
+ handleEOS();
+ }
break;
}
@@ -259,6 +300,11 @@
}
}
+MediaCodecSource::Output::Output()
+ : mEncoderReachedEOS(false),
+ mErrorCode(OK) {
+}
+
// static
sp<MediaCodecSource> MediaCodecSource::Create(
const sp<ALooper> &looper,
@@ -289,21 +335,7 @@
status_t MediaCodecSource::stop() {
sp<AMessage> msg = new AMessage(kWhatStop, mReflector);
- status_t err = postSynchronouslyAndReturnError(msg);
-
- // mPuller->stop() needs to be done outside MediaCodecSource's looper,
- // as it contains a synchronous call to stop the underlying MediaSource,
- // which often waits for all outstanding MediaBuffers to return, but
- // MediaBuffers are only returned when MediaCodecSource looper gets
- // to process them.
-
- if (mPuller != NULL) {
- ALOGI("puller (%s) stopping", mIsVideo ? "video" : "audio");
- mPuller->stop();
- ALOGI("puller (%s) stopped", mIsVideo ? "video" : "audio");
- }
-
- return err;
+ return postSynchronouslyAndReturnError(msg);
}
status_t MediaCodecSource::pause() {
@@ -318,18 +350,18 @@
status_t MediaCodecSource::read(
MediaBuffer** buffer, const ReadOptions* /* options */) {
- Mutex::Autolock autolock(mOutputBufferLock);
+ Mutexed<Output>::Locked output(mOutput);
*buffer = NULL;
- while (mOutputBufferQueue.size() == 0 && !mEncoderReachedEOS) {
- mOutputBufferCond.wait(mOutputBufferLock);
+ while (output->mBufferQueue.size() == 0 && !output->mEncoderReachedEOS) {
+ output.waitForCondition(output->mCond);
}
- if (!mEncoderReachedEOS) {
- *buffer = *mOutputBufferQueue.begin();
- mOutputBufferQueue.erase(mOutputBufferQueue.begin());
+ if (!output->mEncoderReachedEOS) {
+ *buffer = *output->mBufferQueue.begin();
+ output->mBufferQueue.erase(output->mBufferQueue.begin());
return OK;
}
- return mErrorCode;
+ return output->mErrorCode;
}
void MediaCodecSource::signalBufferReturned(MediaBuffer *buffer) {
@@ -357,8 +389,7 @@
mGraphicBufferConsumer(consumer),
mInputBufferTimeOffsetUs(0),
mFirstSampleTimeUs(-1ll),
- mEncoderReachedEOS(false),
- mErrorCode(OK) {
+ mGeneration(0) {
CHECK(mLooper != NULL);
AString mime;
@@ -485,8 +516,11 @@
return err;
}
- mEncoderReachedEOS = false;
- mErrorCode = OK;
+ {
+ Mutexed<Output>::Locked output(mOutput);
+ output->mEncoderReachedEOS = false;
+ output->mErrorCode = OK;
+ }
return OK;
}
@@ -498,14 +532,6 @@
mEncoder->release();
mEncoder.clear();
-
- while (!mInputBufferQueue.empty()) {
- MediaBuffer *mbuf = *mInputBufferQueue.begin();
- mInputBufferQueue.erase(mInputBufferQueue.begin());
- if (mbuf != NULL) {
- mbuf->release();
- }
- }
}
status_t MediaCodecSource::postSynchronouslyAndReturnError(
@@ -525,25 +551,34 @@
}
void MediaCodecSource::signalEOS(status_t err) {
- if (!mEncoderReachedEOS) {
- ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
- {
- Mutex::Autolock autoLock(mOutputBufferLock);
+ bool reachedEOS = false;
+ {
+ Mutexed<Output>::Locked output(mOutput);
+ reachedEOS = output->mEncoderReachedEOS;
+ if (!reachedEOS) {
+ ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
// release all unread media buffers
- for (List<MediaBuffer*>::iterator it = mOutputBufferQueue.begin();
- it != mOutputBufferQueue.end(); it++) {
+ for (List<MediaBuffer*>::iterator it = output->mBufferQueue.begin();
+ it != output->mBufferQueue.end(); it++) {
(*it)->release();
}
- mOutputBufferQueue.clear();
- mEncoderReachedEOS = true;
- mErrorCode = err;
- mOutputBufferCond.signal();
- }
+ output->mBufferQueue.clear();
+ output->mEncoderReachedEOS = true;
+ output->mErrorCode = err;
+ output->mCond.signal();
- releaseEncoder();
+ reachedEOS = true;
+ output.unlock();
+ releaseEncoder();
+ }
}
- if (mStopping && mEncoderReachedEOS) {
+
+ if (mStopping && reachedEOS) {
ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");
+ if (mPuller != NULL) { // NOTE(review): puller is presumably absent with surface input - confirm
+ mPuller->stopSource();
+ ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
+ }
// posting reply to everyone that's waiting
List<sp<AReplyToken>>::iterator it;
for (it = mStopReplyIDQueue.begin();
@@ -552,6 +585,7 @@
}
mStopReplyIDQueue.clear();
mStopping = false;
+ ++mGeneration;
}
}
@@ -577,11 +611,8 @@
}
status_t MediaCodecSource::feedEncoderInputBuffers() {
- while (!mInputBufferQueue.empty()
- && !mAvailEncoderInputIndices.empty()) {
- MediaBuffer* mbuf = *mInputBufferQueue.begin();
- mInputBufferQueue.erase(mInputBufferQueue.begin());
-
+ MediaBuffer* mbuf = NULL;
+ while (!mAvailEncoderInputIndices.empty() && mPuller->readBuffer(&mbuf)) {
size_t bufferIndex = *mAvailEncoderInputIndices.begin();
mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin());
@@ -700,30 +731,19 @@
switch (msg->what()) {
case kWhatPullerNotify:
{
- MediaBuffer *mbuf;
- CHECK(msg->findPointer("accessUnit", (void**)&mbuf));
-
- if (mbuf == NULL) {
- ALOGV("puller (%s) reached EOS",
- mIsVideo ? "video" : "audio");
+ int32_t eos = 0;
+ if (msg->findInt32("eos", &eos) && eos) {
+ ALOGV("puller (%s) reached EOS", mIsVideo ? "video" : "audio");
signalEOS();
- }
-
- if (mEncoder == NULL) {
- ALOGV("got msg '%s' after encoder shutdown.",
- msg->debugString().c_str());
-
- if (mbuf != NULL) {
- mbuf->release();
- }
-
break;
}
- mInputBufferQueue.push_back(mbuf);
+ if (mEncoder == NULL) {
+ ALOGV("got msg '%s' after encoder shutdown.", msg->debugString().c_str());
+ break;
+ }
feedEncoderInputBuffers();
-
break;
}
case kWhatEncoderActivity:
@@ -815,9 +835,9 @@
mbuf->add_ref();
{
- Mutex::Autolock autoLock(mOutputBufferLock);
- mOutputBufferQueue.push_back(mbuf);
- mOutputBufferCond.signal();
+ Mutexed<Output>::Locked output(mOutput);
+ output->mBufferQueue.push_back(mbuf);
+ output->mCond.signal();
}
mEncoder->releaseOutputBuffer(index);
@@ -851,7 +871,7 @@
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));
- if (mEncoderReachedEOS) {
+ if (mOutput.lock()->mEncoderReachedEOS) {
// if we already reached EOS, reply and return now
ALOGI("encoder (%s) already stopped",
mIsVideo ? "video" : "audio");
@@ -869,14 +889,39 @@
mStopping = true;
// if using surface, signal source EOS and wait for EOS to come back.
- // otherwise, release encoder and post EOS if haven't done already
+ // otherwise, stop puller (which also clears the input buffer queue)
+ // and wait for the EOS message. We cannot call source->stop() because
+ // the encoder may still be processing input buffers.
if (mFlags & FLAG_USE_SURFACE_INPUT) {
mEncoder->signalEndOfInputStream();
} else {
- signalEOS();
+ mPuller->stop();
}
+
+ // complete stop even if encoder/puller stalled
+ sp<AMessage> timeoutMsg = new AMessage(kWhatStopStalled, mReflector);
+ timeoutMsg->setInt32("generation", mGeneration);
+ timeoutMsg->post(kStopTimeoutUs);
break;
}
+
+ case kWhatStopStalled:
+ {
+ int32_t generation;
+ CHECK(msg->findInt32("generation", &generation));
+ if (generation != mGeneration) {
+ break;
+ }
+
+ if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
+ ALOGV("source (%s) stopping", mIsVideo ? "video" : "audio");
+ mPuller->stopSource();
+ ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
+ }
+ signalEOS();
+ break; // do not fall through into kWhatPause
+ }
+
case kWhatPause:
{
if (mFlags & FLAG_USE_SURFACE_INPUT) {