Merge "Properly order the data written out to the transport stream by timestamp."
diff --git a/media/libstagefright/MPEG2TSWriter.cpp b/media/libstagefright/MPEG2TSWriter.cpp
index 4d8165e..4e4f289 100644
--- a/media/libstagefright/MPEG2TSWriter.cpp
+++ b/media/libstagefright/MPEG2TSWriter.cpp
@@ -42,12 +42,21 @@
     unsigned streamType() const;
     unsigned incrementContinuityCounter();
 
+    void readMore();
+
     enum {
         kNotifyStartFailed,
         kNotifyBuffer,
         kNotifyReachedEOS,
     };
 
+    sp<ABuffer> lastAccessUnit();
+    int64_t lastAccessUnitTimeUs();
+    void setLastAccessUnit(const sp<ABuffer> &accessUnit);
+
+    void setEOSReceived();
+    bool eosReceived() const;
+
 protected:
     virtual void onMessageReceived(const sp<AMessage> &msg);
 
@@ -67,13 +76,16 @@
 
     sp<ABuffer> mAACBuffer;
 
+    sp<ABuffer> mLastAccessUnit;
+    bool mEOSReceived;
+
     unsigned mStreamType;
     unsigned mContinuityCounter;
 
     void extractCodecSpecificData();
 
-    void appendAACFrames(MediaBuffer *buffer);
-    void flushAACFrames();
+    bool appendAACFrames(MediaBuffer *buffer);
+    bool flushAACFrames();
 
     void postAVCFrame(MediaBuffer *buffer);
 
@@ -83,6 +95,7 @@
 MPEG2TSWriter::SourceInfo::SourceInfo(const sp<MediaSource> &source)
     : mSource(source),
       mLooper(new ALooper),
+      mEOSReceived(false),
       mStreamType(0),
       mContinuityCounter(0) {
     mLooper->setName("MPEG2TSWriter source");
@@ -232,6 +245,7 @@
     sp<AMessage> notify = mNotify->dup();
     notify->setInt32("what", kNotifyBuffer);
     notify->setObject("buffer", out);
+    notify->setInt32("oob", true);
     notify->post();
 }
 
@@ -260,11 +274,13 @@
     notify->post();
 }
 
-void MPEG2TSWriter::SourceInfo::appendAACFrames(MediaBuffer *buffer) {
+bool MPEG2TSWriter::SourceInfo::appendAACFrames(MediaBuffer *buffer) {
+    bool accessUnitPosted = false;
+
     if (mAACBuffer != NULL
             && mAACBuffer->size() + 7 + buffer->range_length()
                     > mAACBuffer->capacity()) {
-        flushAACFrames();
+        accessUnitPosted = flushAACFrames();
     }
 
     if (mAACBuffer == NULL) {
@@ -324,11 +340,13 @@
     ptr += buffer->range_length();
 
     mAACBuffer->setRange(0, ptr - mAACBuffer->data());
+
+    return accessUnitPosted;
 }
 
-void MPEG2TSWriter::SourceInfo::flushAACFrames() {
+bool MPEG2TSWriter::SourceInfo::flushAACFrames() {
     if (mAACBuffer == NULL) {
-        return;
+        return false;
     }
 
     sp<AMessage> notify = mNotify->dup();
@@ -337,6 +355,12 @@
     notify->post();
 
     mAACBuffer.clear();
+
+    return true;
+}
+
+void MPEG2TSWriter::SourceInfo::readMore() {  // Asks this source to perform another read; the writer calls it after writing out this source's access unit.
+    (new AMessage(kWhatRead, id()))->post();  // Posts a fresh kWhatRead message addressed to this object's own id().
 }
 
 void MPEG2TSWriter::SourceInfo::onMessageReceived(const sp<AMessage> &msg) {
@@ -353,7 +377,7 @@
 
             extractCodecSpecificData();
 
-            (new AMessage(kWhatRead, id()))->post();
+            readMore();
             break;
         }
 
@@ -388,7 +412,9 @@
                            buffer->range_length());
                 } else if (buffer->range_length() > 0) {
                     if (mStreamType == 0x0f) {
-                        appendAACFrames(buffer);
+                        if (!appendAACFrames(buffer)) {
+                            msg->post();
+                        }
                     } else {
                         postAVCFrame(buffer);
                     }
@@ -398,7 +424,7 @@
                 buffer = NULL;
             }
 
-            msg->post();
+            // Do not read more data until told to.
             break;
         }
 
@@ -407,6 +433,35 @@
     }
 }
 
+sp<ABuffer> MPEG2TSWriter::SourceInfo::lastAccessUnit() {  // Accessor for the access unit currently stored on this source.
+    return mLastAccessUnit;  // May be NULL when nothing is stored (callers clear it via setLastAccessUnit(NULL)).
+}
+
+void MPEG2TSWriter::SourceInfo::setLastAccessUnit(  // Stores the given access unit on this source; passing NULL clears it.
+        const sp<ABuffer> &accessUnit) {
+    mLastAccessUnit = accessUnit;  // Plain overwrite; this setter does not check whether a unit was already stored.
+}
+
+int64_t MPEG2TSWriter::SourceInfo::lastAccessUnitTimeUs() {  // Returns the "timeUs" meta value of the stored access unit, or -1 if none is stored.
+    if (mLastAccessUnit == NULL) {
+        return -1;  // Sentinel for "no access unit stored"; the writer treats any negative value as "no data yet".
+    }
+
+    int64_t timeUs;
+    CHECK(mLastAccessUnit->meta()->findInt64("timeUs", &timeUs));  // Requires an int64 "timeUs" entry in the buffer's meta; CHECK presumably aborts otherwise - confirm.
+
+    return timeUs;
+}
+
+void MPEG2TSWriter::SourceInfo::setEOSReceived() {  // Flags this source as finished; the writer calls it on kNotifyReachedEOS and on kNotifyStartFailed.
+    CHECK(!mEOSReceived);  // The flag must not already be set, i.e. this is expected to be called at most once per source.
+    mEOSReceived = true;
+}
+
+bool MPEG2TSWriter::SourceInfo::eosReceived() const {  // Reports whether setEOSReceived() has been called on this source.
+    return mEOSReceived;  // Initialised to false in the constructor; the writer skips sources for which this is true when picking the smallest timestamp.
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 
 MPEG2TSWriter::MPEG2TSWriter(int fd)
@@ -527,15 +582,89 @@
 
             if (what == SourceInfo::kNotifyReachedEOS
                     || what == SourceInfo::kNotifyStartFailed) {
+                sp<SourceInfo> source = mSources.editItemAt(sourceIndex);
+                source->setEOSReceived();
+
+                sp<ABuffer> buffer = source->lastAccessUnit();
+                source->setLastAccessUnit(NULL);
+
+                if (buffer != NULL) {
+                    writeTS();
+                    writeAccessUnit(sourceIndex, buffer);
+                }
+
                 ++mNumSourcesDone;
             } else if (what == SourceInfo::kNotifyBuffer) {
                 sp<RefBase> obj;
                 CHECK(msg->findObject("buffer", &obj));
 
-                writeTS();
-
                 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
-                writeAccessUnit(sourceIndex, buffer);
+
+                int32_t oob;
+                if (msg->findInt32("oob", &oob) && oob) {
+                    // This is codec specific data delivered out of band.
+                    // It can be written out immediately.
+                    writeTS();
+                    writeAccessUnit(sourceIndex, buffer);
+                    break;
+                }
+
+                // We don't just write out data as we receive it from
+                // the various sources. That would essentially write them
+                // out in random order (as the thread scheduler determines
+                // how the messages are dispatched).
+                // Instead we gather an access unit for all tracks and
+                // write out the one with the smallest timestamp, then
+                // request more data for the written out track.
+                // Rinse, repeat.
+                // If we don't have data on any track we don't write
+                // anything just yet.
+
+                sp<SourceInfo> source = mSources.editItemAt(sourceIndex);
+
+                CHECK(source->lastAccessUnit() == NULL);
+                source->setLastAccessUnit(buffer);
+
+                LOGV("lastAccessUnitTimeUs[%d] = %.2f secs",
+                     sourceIndex, source->lastAccessUnitTimeUs() / 1E6);
+
+                int64_t minTimeUs = -1;
+                size_t minIndex = 0;
+
+                for (size_t i = 0; i < mSources.size(); ++i) {
+                    const sp<SourceInfo> &source = mSources.editItemAt(i);
+
+                    if (source->eosReceived()) {
+                        continue;
+                    }
+
+                    int64_t timeUs = source->lastAccessUnitTimeUs();
+                    if (timeUs < 0) {
+                        minTimeUs = -1;
+                        break;
+                    } else if (minTimeUs < 0 || timeUs < minTimeUs) {
+                        minTimeUs = timeUs;
+                        minIndex = i;
+                    }
+                }
+
+                if (minTimeUs < 0) {
+                    LOGV("not a all tracks have valid data.");
+                    break;
+                }
+
+                LOGV("writing access unit at time %.2f secs (index %d)",
+                     minTimeUs / 1E6, minIndex);
+
+                source = mSources.editItemAt(minIndex);
+
+                buffer = source->lastAccessUnit();
+                source->setLastAccessUnit(NULL);
+
+                writeTS();
+                writeAccessUnit(minIndex, buffer);
+
+                source->readMore();
             }
             break;
         }