Transcoder: Refactor sample writer to not block clients.
This commit fixes hangs in the transcoder by not letting
samples from all tracks go directly to the backing muxer.
The fix relies on the sample reader keeping the tracks
synchronized and on the muxer buffering and interleaving
samples internally.
Test: Transcoder unit tests.
Fixes: 165374867
Change-Id: I99d2dbfa4eb094b7364848a1a8aa3d3d8742140d
diff --git a/media/libmediatranscoding/transcoder/tests/MediaTrackTranscoderTests.cpp b/media/libmediatranscoding/transcoder/tests/MediaTrackTranscoderTests.cpp
index a46c2bd..83f0a4a 100644
--- a/media/libmediatranscoding/transcoder/tests/MediaTrackTranscoderTests.cpp
+++ b/media/libmediatranscoding/transcoder/tests/MediaTrackTranscoderTests.cpp
@@ -60,7 +60,6 @@
break;
}
ASSERT_NE(mTranscoder, nullptr);
- mTranscoderOutputQueue = mTranscoder->getOutputQueue();
initSampleReader();
}
@@ -115,34 +114,29 @@
}
// Drains the transcoder's output queue in a loop.
- void drainOutputSampleQueue() {
- mSampleQueueDrainThread = std::thread{[this] {
- std::shared_ptr<MediaSample> sample;
- bool aborted = false;
- do {
- aborted = mTranscoderOutputQueue->dequeue(&sample);
- } while (!aborted && !(sample->info.flags & SAMPLE_FLAG_END_OF_STREAM));
- mQueueWasAborted = aborted;
- mGotEndOfStream =
- sample != nullptr && (sample->info.flags & SAMPLE_FLAG_END_OF_STREAM) != 0;
- }};
+ void drainOutputSamples(int numSamplesToSave = 0) {
+ mTranscoder->setSampleConsumer(
+ [this, numSamplesToSave](const std::shared_ptr<MediaSample>& sample) {
+ ASSERT_NE(sample, nullptr);
+
+ mGotEndOfStream = (sample->info.flags & SAMPLE_FLAG_END_OF_STREAM) != 0;
+
+ if (mSavedSamples.size() < numSamplesToSave) {
+ mSavedSamples.push_back(sample);
+ }
+
+ if (mSavedSamples.size() == numSamplesToSave || mGotEndOfStream) {
+ mSamplesSavedSemaphore.signal();
+ }
+ });
}
- void joinDrainThread() {
- if (mSampleQueueDrainThread.joinable()) {
- mSampleQueueDrainThread.join();
- }
- }
- void TearDown() override {
- LOG(DEBUG) << "MediaTrackTranscoderTests tear down";
- joinDrainThread();
- }
+ void TearDown() override { LOG(DEBUG) << "MediaTrackTranscoderTests tear down"; }
~MediaTrackTranscoderTests() { LOG(DEBUG) << "MediaTrackTranscoderTests destroyed"; }
protected:
std::shared_ptr<MediaTrackTranscoder> mTranscoder;
- std::shared_ptr<MediaSampleQueue> mTranscoderOutputQueue;
std::shared_ptr<TestCallback> mCallback;
std::shared_ptr<MediaSampleReader> mMediaSampleReader;
@@ -151,8 +145,8 @@
std::shared_ptr<AMediaFormat> mSourceFormat;
std::shared_ptr<AMediaFormat> mDestinationFormat;
- std::thread mSampleQueueDrainThread;
- bool mQueueWasAborted = false;
+ std::vector<std::shared_ptr<MediaSample>> mSavedSamples;
+ OneShotSemaphore mSamplesSavedSemaphore;
bool mGotEndOfStream = false;
};
@@ -161,11 +155,9 @@
EXPECT_EQ(mTranscoder->configure(mMediaSampleReader, mTrackIndex, mDestinationFormat),
AMEDIA_OK);
ASSERT_TRUE(mTranscoder->start());
- drainOutputSampleQueue();
+ drainOutputSamples();
EXPECT_EQ(mCallback->waitUntilFinished(), AMEDIA_OK);
- joinDrainThread();
EXPECT_TRUE(mTranscoder->stop());
- EXPECT_FALSE(mQueueWasAborted);
EXPECT_TRUE(mGotEndOfStream);
}
@@ -229,49 +221,27 @@
EXPECT_EQ(mTranscoder->configure(mMediaSampleReader, mTrackIndex, mDestinationFormat),
AMEDIA_OK);
ASSERT_TRUE(mTranscoder->start());
- drainOutputSampleQueue();
+ drainOutputSamples();
EXPECT_EQ(mCallback->waitUntilFinished(), AMEDIA_OK);
- joinDrainThread();
EXPECT_TRUE(mTranscoder->stop());
EXPECT_FALSE(mTranscoder->start());
- EXPECT_FALSE(mQueueWasAborted);
EXPECT_TRUE(mGotEndOfStream);
}
-TEST_P(MediaTrackTranscoderTests, AbortOutputQueue) {
- LOG(DEBUG) << "Testing AbortOutputQueue";
- EXPECT_EQ(mTranscoder->configure(mMediaSampleReader, mTrackIndex, mDestinationFormat),
- AMEDIA_OK);
- ASSERT_TRUE(mTranscoder->start());
- mTranscoderOutputQueue->abort();
- drainOutputSampleQueue();
- EXPECT_EQ(mCallback->waitUntilFinished(), AMEDIA_ERROR_IO);
- joinDrainThread();
- EXPECT_TRUE(mTranscoder->stop());
- EXPECT_TRUE(mQueueWasAborted);
- EXPECT_FALSE(mGotEndOfStream);
-}
-
TEST_P(MediaTrackTranscoderTests, HoldSampleAfterTranscoderRelease) {
LOG(DEBUG) << "Testing HoldSampleAfterTranscoderRelease";
EXPECT_EQ(mTranscoder->configure(mMediaSampleReader, mTrackIndex, mDestinationFormat),
AMEDIA_OK);
ASSERT_TRUE(mTranscoder->start());
-
- std::shared_ptr<MediaSample> sample;
- EXPECT_FALSE(mTranscoderOutputQueue->dequeue(&sample));
-
- drainOutputSampleQueue();
+ drainOutputSamples(1 /* numSamplesToSave */);
EXPECT_EQ(mCallback->waitUntilFinished(), AMEDIA_OK);
- joinDrainThread();
EXPECT_TRUE(mTranscoder->stop());
- EXPECT_FALSE(mQueueWasAborted);
EXPECT_TRUE(mGotEndOfStream);
mTranscoder.reset();
- mTranscoderOutputQueue.reset();
+
std::this_thread::sleep_for(std::chrono::milliseconds(20));
- sample.reset();
+ mSavedSamples.clear();
}
TEST_P(MediaTrackTranscoderTests, HoldSampleAfterTranscoderStop) {
@@ -279,13 +249,12 @@
EXPECT_EQ(mTranscoder->configure(mMediaSampleReader, mTrackIndex, mDestinationFormat),
AMEDIA_OK);
ASSERT_TRUE(mTranscoder->start());
-
- std::shared_ptr<MediaSample> sample;
- EXPECT_FALSE(mTranscoderOutputQueue->dequeue(&sample));
+ drainOutputSamples(1 /* numSamplesToSave */);
+ mSamplesSavedSemaphore.wait();
EXPECT_TRUE(mTranscoder->stop());
std::this_thread::sleep_for(std::chrono::milliseconds(20));
- sample.reset();
+ mSavedSamples.clear();
}
TEST_P(MediaTrackTranscoderTests, NullSampleReader) {