Transcoder: Add support for pausing transcoding on a sync frame.
- Added support for stopping transcoders on a sync frame.
- Refactored MediaTrackTranscoders and MediaSampleWriter to stop()
asynchronously.
- Fixed callback and error handling logic in MediaTranscoder.
- Added tests for pause and stopping on sync frame.
Bug: 162886306
Test: Unit tests.
Change-Id: If689a10dfee198c674c4c13b865a7c56a901e075
diff --git a/media/libmediatranscoding/transcoder/MediaTranscoder.cpp b/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
index d89b58f..dc49014 100644
--- a/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
+++ b/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
@@ -69,38 +69,67 @@
return format;
}
-void MediaTranscoder::sendCallback(media_status_t status) {
- // If the transcoder is already cancelled explicitly, don't send any error callbacks.
- // Tracks and sample writer will report errors for abort. However, currently we can't
- // tell it apart from real errors. Ideally we still want to report real errors back
- // to client, as there is a small chance that explicit abort and the real error come
- // at around the same time, we should report that if abort has a specific error code.
- // On the other hand, if the transcoder actually finished (status is AMEDIA_OK) at around
- // the same time of the abort, we should still report the finish back to the client.
- if (mCancelled && status != AMEDIA_OK) {
+void MediaTranscoder::onThreadFinished(const void* thread, media_status_t threadStatus,
+ bool threadStopped) {
+ LOG(DEBUG) << "Thread " << thread << " finished with status " << threadStatus << " stopped "
+ << threadStopped;
+
+ // Stop all threads if one reports an error.
+ if (threadStatus != AMEDIA_OK) {
+ requestStop(false /* stopOnSync */);
+ }
+
+ std::scoped_lock lock{mThreadStateMutex};
+
+ // Record the change.
+ mThreadStates[thread] = DONE;
+ if (threadStatus != AMEDIA_OK && mTranscoderStatus == AMEDIA_OK) {
+ mTranscoderStatus = threadStatus;
+ }
+
+ mTranscoderStopped |= threadStopped;
+
+ // Check if all threads are done. Note that if all transcoders have stopped but the sample
+ // writer has not yet started, it never will.
+ bool transcodersDone = true;
+ ThreadState sampleWriterState = PENDING;
+ for (const auto& it : mThreadStates) {
+ LOG(DEBUG) << " Thread " << it.first << " state" << it.second;
+ if (it.first == static_cast<const void*>(mSampleWriter.get())) {
+ sampleWriterState = it.second;
+ } else {
+ transcodersDone &= (it.second == DONE);
+ }
+ }
+ if (!transcodersDone || sampleWriterState == RUNNING) {
return;
}
- bool expected = false;
- if (mCallbackSent.compare_exchange_strong(expected, true)) {
- if (status == AMEDIA_OK) {
- mCallbacks->onFinished(this);
- } else {
- mCallbacks->onError(this, status);
- }
-
- // Transcoding is done and the callback to the client has been sent, so tear down the
- // pipeline but do it asynchronously to avoid deadlocks. If an error occurred, client
- // should clean up the file.
- std::thread asyncCancelThread{[self = shared_from_this()] { self->cancel(); }};
- asyncCancelThread.detach();
+ // All done. Send callback asynchronously and wake up threads waiting in cancel/pause.
+ mThreadsDone = true;
+ if (!mCallbackSent) {
+ std::thread asyncNotificationThread{[this, self = shared_from_this(),
+ status = mTranscoderStatus,
+ stopped = mTranscoderStopped] {
+ // If the transcoder was stopped that means a caller is waiting in stop or pause
+ // in which case we don't send a callback.
+ if (status != AMEDIA_OK) {
+ mCallbacks->onError(this, status);
+ } else if (!stopped) {
+ mCallbacks->onFinished(this);
+ }
+ mThreadsDoneSignal.notify_all();
+ }};
+ asyncNotificationThread.detach();
+ mCallbackSent = true;
}
}
void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transcoder) {
- LOG(INFO) << "TrackTranscoder " << transcoder << " format available.";
+ LOG(DEBUG) << "TrackTranscoder " << transcoder << " format available.";
std::scoped_lock lock{mTracksAddedMutex};
+ const void* sampleWriterPtr = static_cast<const void*>(mSampleWriter.get());
// Ignore duplicate format change.
if (mTracksAdded.count(transcoder) > 0) {
@@ -111,7 +140,7 @@
auto consumer = mSampleWriter->addTrack(transcoder->getOutputFormat());
if (consumer == nullptr) {
LOG(ERROR) << "Unable to add track to sample writer.";
- sendCallback(AMEDIA_ERROR_UNKNOWN);
+ onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */);
return;
}
@@ -119,34 +148,57 @@
mutableTranscoder->setSampleConsumer(consumer);
mTracksAdded.insert(transcoder);
+ bool errorStarting = false;
if (mTracksAdded.size() == mTrackTranscoders.size()) {
// Enable sequential access mode on the sample reader to achieve optimal read performance.
// This has to wait until all tracks have delivered their output formats and the sample
// writer is started. Otherwise the tracks will not get their output sample queues drained
// and the transcoder could hang due to one track running out of buffers and blocking the
// other tracks from reading source samples before they could output their formats.
- mSampleReader->setEnforceSequentialAccess(true);
- LOG(INFO) << "Starting sample writer.";
- bool started = mSampleWriter->start();
- if (!started) {
- LOG(ERROR) << "Unable to start sample writer.";
- sendCallback(AMEDIA_ERROR_UNKNOWN);
+
+ std::scoped_lock lock{mThreadStateMutex};
+ // Don't start the sample writer if a stop already has been requested.
+ if (!mSampleWriterStopped) {
+ if (!mCancelled) {
+ mSampleReader->setEnforceSequentialAccess(true);
+ }
+ LOG(DEBUG) << "Starting sample writer.";
+ errorStarting = !mSampleWriter->start();
+ if (!errorStarting) {
+ mThreadStates[sampleWriterPtr] = RUNNING;
+ }
}
}
+
+ if (errorStarting) {
+ LOG(ERROR) << "Unable to start sample writer.";
+ onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */);
+ }
}
void MediaTranscoder::onTrackFinished(const MediaTrackTranscoder* transcoder) {
LOG(DEBUG) << "TrackTranscoder " << transcoder << " finished";
+ onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, false /* stopped */);
+}
+
+void MediaTranscoder::onTrackStopped(const MediaTrackTranscoder* transcoder) {
+ LOG(DEBUG) << "TrackTranscoder " << transcoder << " stopped";
+ onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, true /* stopped */);
}
void MediaTranscoder::onTrackError(const MediaTrackTranscoder* transcoder, media_status_t status) {
LOG(ERROR) << "TrackTranscoder " << transcoder << " returned error " << status;
- sendCallback(status);
+ onThreadFinished(static_cast<const void*>(transcoder), status, false /* stopped */);
}
-void MediaTranscoder::onFinished(const MediaSampleWriter* writer __unused, media_status_t status) {
- LOG((status != AMEDIA_OK) ? ERROR : DEBUG) << "Sample writer finished with status " << status;
- sendCallback(status);
+void MediaTranscoder::onFinished(const MediaSampleWriter* writer, media_status_t status) {
+ LOG(status == AMEDIA_OK ? DEBUG : ERROR) << "Sample writer finished with status " << status;
+ onThreadFinished(static_cast<const void*>(writer), status, false /* stopped */);
+}
+
+void MediaTranscoder::onStopped(const MediaSampleWriter* writer) {
+ LOG(DEBUG) << "Sample writer " << writer << " stopped";
+ onThreadFinished(static_cast<const void*>(writer), AMEDIA_OK, true /* stopped */);
}
void MediaTranscoder::onProgressUpdate(const MediaSampleWriter* writer __unused, int32_t progress) {
@@ -276,6 +328,9 @@
return status;
}
+ std::scoped_lock lock{mThreadStateMutex};
+ mThreadStates[static_cast<const void*>(transcoder.get())] = PENDING;
+
mTrackTranscoders.emplace_back(std::move(transcoder));
return AMEDIA_OK;
}
@@ -300,6 +355,8 @@
return AMEDIA_ERROR_UNKNOWN;
}
+ std::scoped_lock lock{mThreadStateMutex};
+ mThreadStates[static_cast<const void*>(mSampleWriter.get())] = PENDING;
return AMEDIA_OK;
}
@@ -313,21 +370,75 @@
}
// Start transcoders
- for (auto& transcoder : mTrackTranscoders) {
- bool started = transcoder->start();
- if (!started) {
- LOG(ERROR) << "Unable to start track transcoder.";
- cancel();
- return AMEDIA_ERROR_UNKNOWN;
+ bool started = true;
+ {
+ std::scoped_lock lock{mThreadStateMutex};
+ for (auto& transcoder : mTrackTranscoders) {
+ if (!(started = transcoder->start())) {
+ break;
+ }
+ mThreadStates[static_cast<const void*>(transcoder.get())] = RUNNING;
}
}
+ if (!started) {
+ LOG(ERROR) << "Unable to start track transcoder.";
+ cancel();
+ return AMEDIA_ERROR_UNKNOWN;
+ }
return AMEDIA_OK;
}
+media_status_t MediaTranscoder::requestStop(bool stopOnSync) {
+ std::scoped_lock lock{mThreadStateMutex};
+ if (mCancelled) {
+ LOG(DEBUG) << "MediaTranscoder already cancelled";
+ return AMEDIA_ERROR_UNSUPPORTED;
+ }
+
+ if (!stopOnSync) {
+ mSampleWriterStopped = true;
+ mSampleWriter->stop();
+ }
+
+ mSampleReader->setEnforceSequentialAccess(false);
+ for (auto& transcoder : mTrackTranscoders) {
+ transcoder->stop(stopOnSync);
+ }
+
+ mCancelled = true;
+ return AMEDIA_OK;
+}
+
+void MediaTranscoder::waitForThreads() NO_THREAD_SAFETY_ANALYSIS {
+ std::unique_lock lock{mThreadStateMutex};
+ while (!mThreadsDone) {
+ mThreadsDoneSignal.wait(lock);
+ }
+}
+
media_status_t MediaTranscoder::pause(std::shared_ptr<ndk::ScopedAParcel>* pausedState) {
+ media_status_t status = requestStop(true /* stopOnSync */);
+ if (status != AMEDIA_OK) {
+ return status;
+ }
+
+ waitForThreads();
+
// TODO: write internal states to parcel.
*pausedState = std::shared_ptr<::ndk::ScopedAParcel>(new ::ndk::ScopedAParcel());
- return cancel();
+ return AMEDIA_OK;
+}
+
+media_status_t MediaTranscoder::cancel() {
+ media_status_t status = requestStop(false /* stopOnSync */);
+ if (status != AMEDIA_OK) {
+ return status;
+ }
+
+ waitForThreads();
+
+ // TODO: Release transcoders?
+ return AMEDIA_OK;
}
media_status_t MediaTranscoder::resume() {
@@ -335,20 +446,4 @@
return start();
}
-media_status_t MediaTranscoder::cancel() {
- bool expected = false;
- if (!mCancelled.compare_exchange_strong(expected, true)) {
- // Already cancelled.
- return AMEDIA_OK;
- }
-
- mSampleWriter->stop();
- mSampleReader->setEnforceSequentialAccess(false);
- for (auto& transcoder : mTrackTranscoders) {
- transcoder->stop();
- }
-
- return AMEDIA_OK;
-}
-
} // namespace android