Transcoder: Refactor sample writer to not block clients.
This commit fixes an issue with hangs in the transcoder
by letting samples from all tracks go directly to the
backing muxer. This relies on the tracks being synchronized
by the sample reader and on the muxer buffering and
interleaving samples internally.
Test: Transcoder unit tests.
Fixes: 165374867
Change-Id: I99d2dbfa4eb094b7364848a1a8aa3d3d8742140d
diff --git a/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp b/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
index 64240d4..46f3e9b 100644
--- a/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
+++ b/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
@@ -274,102 +274,95 @@
void SetUp() override {
LOG(DEBUG) << "MediaSampleWriterTests set up";
mTestMuxer = std::make_shared<TestMuxer>();
- mSampleQueue = std::make_shared<MediaSampleQueue>();
}
void TearDown() override {
LOG(DEBUG) << "MediaSampleWriterTests tear down";
mTestMuxer.reset();
- mSampleQueue.reset();
}
protected:
std::shared_ptr<TestMuxer> mTestMuxer;
- std::shared_ptr<MediaSampleQueue> mSampleQueue;
std::shared_ptr<TestCallbacks> mTestCallbacks = std::make_shared<TestCallbacks>();
};
TEST_F(MediaSampleWriterTests, TestAddTrackWithoutInit) {
const TestMediaSource& mediaSource = getMediaSource();
- MediaSampleWriter writer{};
- EXPECT_FALSE(writer.addTrack(mSampleQueue, mediaSource.mTrackFormats[0]));
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_EQ(writer->addTrack(mediaSource.mTrackFormats[0]), nullptr);
}
TEST_F(MediaSampleWriterTests, TestStartWithoutInit) {
- MediaSampleWriter writer{};
- EXPECT_FALSE(writer.start());
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_FALSE(writer->start());
}
TEST_F(MediaSampleWriterTests, TestStartWithoutTracks) {
- MediaSampleWriter writer{};
- EXPECT_TRUE(writer.init(mTestMuxer, mTestCallbacks));
- EXPECT_FALSE(writer.start());
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_TRUE(writer->init(mTestMuxer, mTestCallbacks));
+ EXPECT_FALSE(writer->start());
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::NoEvent);
}
TEST_F(MediaSampleWriterTests, TestAddInvalidTrack) {
- MediaSampleWriter writer{};
- EXPECT_TRUE(writer.init(mTestMuxer, mTestCallbacks));
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_TRUE(writer->init(mTestMuxer, mTestCallbacks));
- EXPECT_FALSE(writer.addTrack(mSampleQueue, nullptr));
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::NoEvent);
-
- const TestMediaSource& mediaSource = getMediaSource();
- EXPECT_FALSE(writer.addTrack(nullptr, mediaSource.mTrackFormats[0]));
+ EXPECT_EQ(writer->addTrack(nullptr), nullptr);
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::NoEvent);
}
TEST_F(MediaSampleWriterTests, TestDoubleStartStop) {
- MediaSampleWriter writer{};
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
std::shared_ptr<TestCallbacks> callbacks =
std::make_shared<TestCallbacks>(false /* expectSuccess */);
- EXPECT_TRUE(writer.init(mTestMuxer, callbacks));
+ EXPECT_TRUE(writer->init(mTestMuxer, callbacks));
const TestMediaSource& mediaSource = getMediaSource();
- EXPECT_TRUE(writer.addTrack(mSampleQueue, mediaSource.mTrackFormats[0]));
+ EXPECT_NE(writer->addTrack(mediaSource.mTrackFormats[0]), nullptr);
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::AddTrack(mediaSource.mTrackFormats[0].get()));
- ASSERT_TRUE(writer.start());
- EXPECT_FALSE(writer.start());
+ ASSERT_TRUE(writer->start());
+ EXPECT_FALSE(writer->start());
- EXPECT_TRUE(writer.stop());
+ EXPECT_TRUE(writer->stop());
EXPECT_TRUE(callbacks->hasFinished());
- EXPECT_FALSE(writer.stop());
+ EXPECT_FALSE(writer->stop());
}
TEST_F(MediaSampleWriterTests, TestStopWithoutStart) {
- MediaSampleWriter writer{};
- EXPECT_TRUE(writer.init(mTestMuxer, mTestCallbacks));
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_TRUE(writer->init(mTestMuxer, mTestCallbacks));
const TestMediaSource& mediaSource = getMediaSource();
- EXPECT_TRUE(writer.addTrack(mSampleQueue, mediaSource.mTrackFormats[0]));
+ EXPECT_NE(writer->addTrack(mediaSource.mTrackFormats[0]), nullptr);
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::AddTrack(mediaSource.mTrackFormats[0].get()));
- EXPECT_FALSE(writer.stop());
+ EXPECT_FALSE(writer->stop());
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::NoEvent);
}
TEST_F(MediaSampleWriterTests, TestStartWithoutCallback) {
- MediaSampleWriter writer{};
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
std::weak_ptr<MediaSampleWriter::CallbackInterface> unassignedWp;
- EXPECT_FALSE(writer.init(mTestMuxer, unassignedWp));
+ EXPECT_FALSE(writer->init(mTestMuxer, unassignedWp));
std::shared_ptr<MediaSampleWriter::CallbackInterface> unassignedSp;
- EXPECT_FALSE(writer.init(mTestMuxer, unassignedSp));
+ EXPECT_FALSE(writer->init(mTestMuxer, unassignedSp));
const TestMediaSource& mediaSource = getMediaSource();
- EXPECT_FALSE(writer.addTrack(mSampleQueue, mediaSource.mTrackFormats[0]));
- ASSERT_FALSE(writer.start());
+ EXPECT_EQ(writer->addTrack(mediaSource.mTrackFormats[0]), nullptr);
+ ASSERT_FALSE(writer->start());
}
TEST_F(MediaSampleWriterTests, TestProgressUpdate) {
const TestMediaSource& mediaSource = getMediaSource();
- MediaSampleWriter writer{};
- EXPECT_TRUE(writer.init(mTestMuxer, mTestCallbacks));
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_TRUE(writer->init(mTestMuxer, mTestCallbacks));
std::shared_ptr<AMediaFormat> videoFormat =
std::shared_ptr<AMediaFormat>(AMediaFormat_new(), &AMediaFormat_delete);
@@ -377,42 +370,41 @@
mediaSource.mTrackFormats[mediaSource.mVideoTrackIndex].get());
AMediaFormat_setInt64(videoFormat.get(), AMEDIAFORMAT_KEY_DURATION, 100);
- EXPECT_TRUE(writer.addTrack(mSampleQueue, videoFormat));
- ASSERT_TRUE(writer.start());
+ auto sampleConsumer = writer->addTrack(videoFormat);
+ EXPECT_NE(sampleConsumer, nullptr);
+ ASSERT_TRUE(writer->start());
for (int64_t pts = 0; pts < 100; ++pts) {
- mSampleQueue->enqueue(newSampleWithPts(pts));
+ sampleConsumer(newSampleWithPts(pts));
}
- mSampleQueue->enqueue(newSampleEos());
+ sampleConsumer(newSampleEos());
mTestCallbacks->waitForWritingFinished();
EXPECT_EQ(mTestCallbacks->getProgressUpdateCount(), 100);
}
TEST_F(MediaSampleWriterTests, TestInterleaving) {
- MediaSampleWriter writer{};
- EXPECT_TRUE(writer.init(mTestMuxer, mTestCallbacks));
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_TRUE(writer->init(mTestMuxer, mTestCallbacks));
// Use two tracks for this test.
static constexpr int kNumTracks = 2;
- std::shared_ptr<MediaSampleQueue> sampleQueues[kNumTracks];
- std::vector<std::pair<std::shared_ptr<MediaSample>, size_t>> interleavedSamples;
+ MediaSampleWriter::MediaSampleConsumerFunction sampleConsumers[kNumTracks];
+ std::vector<std::pair<std::shared_ptr<MediaSample>, size_t>> addedSamples;
const TestMediaSource& mediaSource = getMediaSource();
for (int trackIdx = 0; trackIdx < kNumTracks; ++trackIdx) {
- sampleQueues[trackIdx] = std::make_shared<MediaSampleQueue>();
-
auto trackFormat = mediaSource.mTrackFormats[trackIdx % mediaSource.mTrackCount];
- EXPECT_TRUE(writer.addTrack(sampleQueues[trackIdx], trackFormat));
+ sampleConsumers[trackIdx] = writer->addTrack(trackFormat);
+ EXPECT_NE(sampleConsumers[trackIdx], nullptr);
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::AddTrack(trackFormat.get()));
}
// Create samples in the expected interleaved order for easy verification.
- auto addSampleToTrackWithPts = [&interleavedSamples, &sampleQueues](int trackIndex,
- int64_t pts) {
+ auto addSampleToTrackWithPts = [&addedSamples, &sampleConsumers](int trackIndex, int64_t pts) {
auto sample = newSampleWithPts(pts);
- sampleQueues[trackIndex]->enqueue(sample);
- interleavedSamples.emplace_back(sample, trackIndex);
+ sampleConsumers[trackIndex](sample);
+ addedSamples.emplace_back(sample, trackIndex);
};
addSampleToTrackWithPts(0, 0);
@@ -431,18 +423,24 @@
addSampleToTrackWithPts(1, 13);
for (int trackIndex = 0; trackIndex < kNumTracks; ++trackIndex) {
- sampleQueues[trackIndex]->enqueue(newSampleEos());
+ sampleConsumers[trackIndex](newSampleEos());
}
// Start the writer.
- ASSERT_TRUE(writer.start());
+ ASSERT_TRUE(writer->start());
// Wait for writer to complete.
mTestCallbacks->waitForWritingFinished();
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::Start());
+ std::sort(addedSamples.begin(), addedSamples.end(),
+ [](const std::pair<std::shared_ptr<MediaSample>, size_t>& left,
+ const std::pair<std::shared_ptr<MediaSample>, size_t>& right) {
+ return left.first->info.presentationTimeUs < right.first->info.presentationTimeUs;
+ });
+
// Verify sample order.
- for (auto entry : interleavedSamples) {
+ for (auto entry : addedSamples) {
auto sample = entry.first;
auto trackIndex = entry.second;
@@ -470,162 +468,10 @@
}
EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::Stop());
- EXPECT_TRUE(writer.stop());
+ EXPECT_TRUE(writer->stop());
EXPECT_TRUE(mTestCallbacks->hasFinished());
}
-TEST_F(MediaSampleWriterTests, TestMaxDivergence) {
- static constexpr uint32_t kMaxDivergenceUs = 10;
-
- MediaSampleWriter writer{kMaxDivergenceUs};
- EXPECT_TRUE(writer.init(mTestMuxer, mTestCallbacks));
-
- // Use two tracks for this test.
- static constexpr int kNumTracks = 2;
- std::shared_ptr<MediaSampleQueue> sampleQueues[kNumTracks];
- const TestMediaSource& mediaSource = getMediaSource();
-
- for (int trackIdx = 0; trackIdx < kNumTracks; ++trackIdx) {
- sampleQueues[trackIdx] = std::make_shared<MediaSampleQueue>();
-
- auto trackFormat = mediaSource.mTrackFormats[trackIdx % mediaSource.mTrackCount];
- EXPECT_TRUE(writer.addTrack(sampleQueues[trackIdx], trackFormat));
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::AddTrack(trackFormat.get()));
- }
-
- ASSERT_TRUE(writer.start());
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::Start());
-
- // The first samples of each track can be written in any order since the writer does not have
- // any previous timestamps to compare.
- sampleQueues[0]->enqueue(newSampleWithPtsOnly(0));
- sampleQueues[1]->enqueue(newSampleWithPtsOnly(1));
- mTestMuxer->popEvent(true);
- mTestMuxer->popEvent(true);
-
- // The writer will now be waiting on track 0 since it has the lowest previous timestamp.
- sampleQueues[0]->enqueue(newSampleWithPtsOnly(kMaxDivergenceUs + 1));
- sampleQueues[0]->enqueue(newSampleWithPtsOnly(kMaxDivergenceUs + 2));
-
- // The writer should dequeue the first sample above but not the second since track 0 now is too
- // far ahead. Instead it should wait for track 1.
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::WriteSampleWithPts(0, kMaxDivergenceUs + 1));
-
- // Enqueue a sample from track 1 that puts it within acceptable divergence range again. The
- // writer should dequeue that sample and then go back to track 0 since track 1 is empty.
- sampleQueues[1]->enqueue(newSampleWithPtsOnly(kMaxDivergenceUs));
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::WriteSampleWithPts(1, kMaxDivergenceUs));
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::WriteSampleWithPts(0, kMaxDivergenceUs + 2));
-
- // Both tracks are now empty so the writer should wait for track 1 which is farthest behind.
- sampleQueues[1]->enqueue(newSampleWithPtsOnly(kMaxDivergenceUs + 3));
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::WriteSampleWithPts(1, kMaxDivergenceUs + 3));
-
- for (int trackIndex = 0; trackIndex < kNumTracks; ++trackIndex) {
- sampleQueues[trackIndex]->enqueue(newSampleEos());
- }
-
- // Wait for writer to complete.
- mTestCallbacks->waitForWritingFinished();
-
- // Verify EOS samples.
- for (int trackIndex = 0; trackIndex < kNumTracks; ++trackIndex) {
- auto trackFormat = mediaSource.mTrackFormats[trackIndex % mediaSource.mTrackCount];
- int64_t duration = 0;
- AMediaFormat_getInt64(trackFormat.get(), AMEDIAFORMAT_KEY_DURATION, &duration);
-
- // EOS timestamp = first sample timestamp + duration.
- const int64_t endTime = duration + (trackIndex == 1 ? 1 : 0);
- const AMediaCodecBufferInfo info = {0, 0, endTime, AMEDIACODEC_BUFFER_FLAG_END_OF_STREAM};
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::WriteSample(trackIndex, nullptr, &info));
- }
-
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::Stop());
- EXPECT_TRUE(writer.stop());
- EXPECT_TRUE(mTestCallbacks->hasFinished());
-}
-
-TEST_F(MediaSampleWriterTests, TestTimestampDivergenceOverflow) {
- auto testCallbacks = std::make_shared<TestCallbacks>(false /* expectSuccess */);
- MediaSampleWriter writer{};
- EXPECT_TRUE(writer.init(mTestMuxer, testCallbacks));
-
- // Use two tracks for this test.
- static constexpr int kNumTracks = 2;
- std::shared_ptr<MediaSampleQueue> sampleQueues[kNumTracks];
- const TestMediaSource& mediaSource = getMediaSource();
-
- for (int trackIdx = 0; trackIdx < kNumTracks; ++trackIdx) {
- sampleQueues[trackIdx] = std::make_shared<MediaSampleQueue>();
-
- auto trackFormat = mediaSource.mTrackFormats[trackIdx % mediaSource.mTrackCount];
- EXPECT_TRUE(writer.addTrack(sampleQueues[trackIdx], trackFormat));
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::AddTrack(trackFormat.get()));
- }
-
- // Prime track 0 with lower end of INT64 range, and track 1 with positive timestamps making the
- // difference larger than INT64_MAX.
- sampleQueues[0]->enqueue(newSampleWithPtsOnly(INT64_MIN + 1));
- sampleQueues[1]->enqueue(newSampleWithPtsOnly(1000));
- sampleQueues[1]->enqueue(newSampleWithPtsOnly(1001));
-
- ASSERT_TRUE(writer.start());
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::Start());
-
- // The first sample of each track can be pulled in any order.
- mTestMuxer->popEvent(true);
- mTestMuxer->popEvent(true);
-
- // Wait to make sure the writer compares track 0 empty against track 1 non-empty. The writer
- // should handle the large timestamp differences and chose to wait for track 0 even though
- // track 1 has a sample ready.
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
-
- sampleQueues[0]->enqueue(newSampleWithPtsOnly(INT64_MIN + 2));
- sampleQueues[0]->enqueue(newSampleWithPtsOnly(1000)); // <-- Close the gap between the tracks.
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::WriteSampleWithPts(0, INT64_MIN + 2));
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::WriteSampleWithPts(0, 1000));
- EXPECT_EQ(mTestMuxer->popEvent(true), TestMuxer::WriteSampleWithPts(1, 1001));
-
- EXPECT_TRUE(writer.stop());
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::Stop());
- EXPECT_TRUE(testCallbacks->hasFinished());
-}
-
-TEST_F(MediaSampleWriterTests, TestAbortInputQueue) {
- MediaSampleWriter writer{};
- std::shared_ptr<TestCallbacks> callbacks =
- std::make_shared<TestCallbacks>(false /* expectSuccess */);
- EXPECT_TRUE(writer.init(mTestMuxer, callbacks));
-
- // Use two tracks for this test.
- static constexpr int kNumTracks = 2;
- std::shared_ptr<MediaSampleQueue> sampleQueues[kNumTracks];
- const TestMediaSource& mediaSource = getMediaSource();
-
- for (int trackIdx = 0; trackIdx < kNumTracks; ++trackIdx) {
- sampleQueues[trackIdx] = std::make_shared<MediaSampleQueue>();
-
- auto trackFormat = mediaSource.mTrackFormats[trackIdx % mediaSource.mTrackCount];
- EXPECT_TRUE(writer.addTrack(sampleQueues[trackIdx], trackFormat));
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::AddTrack(trackFormat.get()));
- }
-
- // Start the writer.
- ASSERT_TRUE(writer.start());
-
- // Abort the input queues and wait for the writer to complete.
- for (int trackIdx = 0; trackIdx < kNumTracks; ++trackIdx) {
- sampleQueues[trackIdx]->abort();
- }
-
- callbacks->waitForWritingFinished();
-
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::Start());
- EXPECT_EQ(mTestMuxer->popEvent(), TestMuxer::Stop());
- EXPECT_TRUE(writer.stop());
-}
-
// Convenience function for reading a sample from an AMediaExtractor represented as a MediaSample.
static std::shared_ptr<MediaSample> readSampleAndAdvance(AMediaExtractor* extractor,
size_t* trackIndexOut) {
@@ -667,36 +513,35 @@
ASSERT_GT(destinationFd, 0);
// Initialize writer.
- MediaSampleWriter writer{};
- EXPECT_TRUE(writer.init(destinationFd, mTestCallbacks));
+ std::shared_ptr<MediaSampleWriter> writer = MediaSampleWriter::Create();
+ EXPECT_TRUE(writer->init(destinationFd, mTestCallbacks));
close(destinationFd);
// Add tracks.
const TestMediaSource& mediaSource = getMediaSource();
- std::vector<std::shared_ptr<MediaSampleQueue>> inputQueues;
+ std::vector<MediaSampleWriter::MediaSampleConsumerFunction> sampleConsumers;
for (size_t trackIndex = 0; trackIndex < mediaSource.mTrackCount; trackIndex++) {
- inputQueues.push_back(std::make_shared<MediaSampleQueue>());
- EXPECT_TRUE(
- writer.addTrack(inputQueues[trackIndex], mediaSource.mTrackFormats[trackIndex]));
+ auto consumer = writer->addTrack(mediaSource.mTrackFormats[trackIndex]);
+ sampleConsumers.push_back(consumer);
}
// Start the writer.
- ASSERT_TRUE(writer.start());
+ ASSERT_TRUE(writer->start());
// Enqueue samples and finally End Of Stream.
std::shared_ptr<MediaSample> sample;
size_t trackIndex;
while ((sample = readSampleAndAdvance(mediaSource.mExtractor, &trackIndex)) != nullptr) {
- inputQueues[trackIndex]->enqueue(sample);
+ sampleConsumers[trackIndex](sample);
}
for (trackIndex = 0; trackIndex < mediaSource.mTrackCount; trackIndex++) {
- inputQueues[trackIndex]->enqueue(newSampleEos());
+ sampleConsumers[trackIndex](newSampleEos());
}
// Wait for writer.
mTestCallbacks->waitForWritingFinished();
- EXPECT_TRUE(writer.stop());
+ EXPECT_TRUE(writer->stop());
// Compare output file with source.
mediaSource.reset();