Transcoder: Refactor sample writer to not block clients.
This commit fixes an issue with hangs in the transcoder
by not letting samples from all tracks go directly to the
backing muxer. This relies on tracks being synchronized by
the sample reader and on the muxer buffering and interleaving
samples internally.
Test: Transcoder unit tests.
Fixes: 165374867
Change-Id: I99d2dbfa4eb094b7364848a1a8aa3d3d8742140d
diff --git a/media/libmediatranscoding/transcoder/tests/VideoTrackTranscoderTests.cpp b/media/libmediatranscoding/transcoder/tests/VideoTrackTranscoderTests.cpp
index e809cbd..1b5bd13 100644
--- a/media/libmediatranscoding/transcoder/tests/VideoTrackTranscoderTests.cpp
+++ b/media/libmediatranscoding/transcoder/tests/VideoTrackTranscoderTests.cpp
@@ -102,46 +102,40 @@
AMEDIA_OK);
ASSERT_TRUE(transcoder->start());
- std::shared_ptr<MediaSampleQueue> outputQueue = transcoder->getOutputQueue();
- std::thread sampleConsumerThread{[&outputQueue] {
- uint64_t sampleCount = 0;
- std::shared_ptr<MediaSample> sample;
- while (!outputQueue->dequeue(&sample)) {
- ASSERT_NE(sample, nullptr);
- const uint32_t flags = sample->info.flags;
+ bool eos = false;
+ uint64_t sampleCount = 0;
+ transcoder->setSampleConsumer([&sampleCount, &eos](const std::shared_ptr<MediaSample>& sample) {
+ ASSERT_NE(sample, nullptr);
+ const uint32_t flags = sample->info.flags;
- if (sampleCount == 0) {
- // Expect first sample to be a codec config.
- EXPECT_TRUE((flags & SAMPLE_FLAG_CODEC_CONFIG) != 0);
- EXPECT_TRUE((flags & SAMPLE_FLAG_SYNC_SAMPLE) == 0);
- EXPECT_TRUE((flags & SAMPLE_FLAG_END_OF_STREAM) == 0);
- EXPECT_TRUE((flags & SAMPLE_FLAG_PARTIAL_FRAME) == 0);
- } else if (sampleCount == 1) {
- // Expect second sample to be a sync sample.
- EXPECT_TRUE((flags & SAMPLE_FLAG_CODEC_CONFIG) == 0);
- EXPECT_TRUE((flags & SAMPLE_FLAG_SYNC_SAMPLE) != 0);
- EXPECT_TRUE((flags & SAMPLE_FLAG_END_OF_STREAM) == 0);
- }
-
- if (!(flags & SAMPLE_FLAG_END_OF_STREAM)) {
- // Expect a valid buffer unless it is EOS.
- EXPECT_NE(sample->buffer, nullptr);
- EXPECT_NE(sample->bufferId, 0xBAADF00D);
- EXPECT_GT(sample->info.size, 0);
- }
-
- ++sampleCount;
- if (sample->info.flags & SAMPLE_FLAG_END_OF_STREAM) {
- break;
- }
- sample.reset();
+ if (sampleCount == 0) {
+ // Expect first sample to be a codec config.
+ EXPECT_TRUE((flags & SAMPLE_FLAG_CODEC_CONFIG) != 0);
+ EXPECT_TRUE((flags & SAMPLE_FLAG_SYNC_SAMPLE) == 0);
+ EXPECT_TRUE((flags & SAMPLE_FLAG_END_OF_STREAM) == 0);
+ EXPECT_TRUE((flags & SAMPLE_FLAG_PARTIAL_FRAME) == 0);
+ } else if (sampleCount == 1) {
+ // Expect second sample to be a sync sample.
+ EXPECT_TRUE((flags & SAMPLE_FLAG_CODEC_CONFIG) == 0);
+ EXPECT_TRUE((flags & SAMPLE_FLAG_SYNC_SAMPLE) != 0);
+ EXPECT_TRUE((flags & SAMPLE_FLAG_END_OF_STREAM) == 0);
}
- }};
+
+ if (!(flags & SAMPLE_FLAG_END_OF_STREAM)) {
+ // Expect a valid buffer unless it is EOS.
+ EXPECT_NE(sample->buffer, nullptr);
+ EXPECT_NE(sample->bufferId, 0xBAADF00D);
+ EXPECT_GT(sample->info.size, 0);
+ } else {
+ EXPECT_FALSE(eos);
+ eos = true;
+ }
+
+ ++sampleCount;
+ });
EXPECT_EQ(callback->waitUntilFinished(), AMEDIA_OK);
EXPECT_TRUE(transcoder->stop());
-
- sampleConsumerThread.join();
}
TEST_F(VideoTrackTranscoderTests, PreserveBitrate) {
@@ -167,7 +161,6 @@
ASSERT_NE(outputFormat, nullptr);
ASSERT_TRUE(transcoder->stop());
- transcoder->getOutputQueue()->abort();
int32_t outBitrate;
EXPECT_TRUE(AMediaFormat_getInt32(outputFormat.get(), AMEDIAFORMAT_KEY_BIT_RATE, &outBitrate));
@@ -187,25 +180,7 @@
}
TEST_F(VideoTrackTranscoderTests, LingeringEncoder) {
- struct {
- void wait() {
- std::unique_lock<std::mutex> lock(mMutex);
- while (!mSignaled) {
- mCondition.wait(lock);
- }
- }
-
- void signal() {
- std::unique_lock<std::mutex> lock(mMutex);
- mSignaled = true;
- mCondition.notify_all();
- }
-
- std::mutex mMutex;
- std::condition_variable mCondition;
- bool mSignaled = false;
- } semaphore;
-
+ OneShotSemaphore semaphore;
auto callback = std::make_shared<TestCallback>();
auto transcoder = VideoTrackTranscoder::create(callback);
@@ -214,29 +189,24 @@
AMEDIA_OK);
ASSERT_TRUE(transcoder->start());
- std::shared_ptr<MediaSampleQueue> outputQueue = transcoder->getOutputQueue();
std::vector<std::shared_ptr<MediaSample>> samples;
- std::thread sampleConsumerThread([&outputQueue, &samples, &semaphore] {
- std::shared_ptr<MediaSample> sample;
- while (samples.size() < 4 && !outputQueue->dequeue(&sample)) {
- ASSERT_NE(sample, nullptr);
- samples.push_back(sample);
+ transcoder->setSampleConsumer(
+ [&samples, &semaphore](const std::shared_ptr<MediaSample>& sample) {
+ if (samples.size() >= 4) return;
- if (sample->info.flags & SAMPLE_FLAG_END_OF_STREAM) {
- break;
- }
- sample.reset();
- }
+ ASSERT_NE(sample, nullptr);
+ samples.push_back(sample);
- semaphore.signal();
- });
+ if (samples.size() == 4 || sample->info.flags & SAMPLE_FLAG_END_OF_STREAM) {
+ semaphore.signal();
+ }
+ });
// Wait for the encoder to output samples before stopping and releasing the transcoder.
semaphore.wait();
EXPECT_TRUE(transcoder->stop());
transcoder.reset();
- sampleConsumerThread.join();
// Return buffers to the codec so that it can resume processing, but keep one buffer to avoid
// the codec being released.