blob: a042dc3590ea288b4f069582a79fda07e7354762 [file] [log] [blame]
Hongguang4092f2f2021-07-08 18:49:12 -07001/*
2 * Copyright 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "android.hardware.tv.tuner-service.example-Dvr"
19
20#include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
21
22#include <utils/Log.h>
23#include "Dvr.h"
24
25namespace aidl {
26namespace android {
27namespace hardware {
28namespace tv {
29namespace tuner {
30
31#define WAIT_TIMEOUT 3000000000
32
33Dvr::Dvr(DvrType type, uint32_t bufferSize, const std::shared_ptr<IDvrCallback>& cb,
34 std::shared_ptr<Demux> demux) {
35 mType = type;
36 mBufferSize = bufferSize;
37 mCallback = cb;
38 mDemux = demux;
39}
40
41Dvr::~Dvr() {
42 mDvrThreadRunning = false;
43 lock_guard<mutex> lock(mDvrThreadLock);
44}
45
46::ndk::ScopedAStatus Dvr::getQueueDesc(MQDescriptor<int8_t, SynchronizedReadWrite>* out_queue) {
47 ALOGV("%s", __FUNCTION__);
48
49 *out_queue = mDvrMQ->dupeDesc();
50
51 return ::ndk::ScopedAStatus::ok();
52}
53
54::ndk::ScopedAStatus Dvr::configure(const DvrSettings& in_settings) {
55 ALOGV("%s", __FUNCTION__);
56
57 mDvrSettings = in_settings;
58 mDvrConfigured = true;
59
60 return ::ndk::ScopedAStatus::ok();
61}
62
63::ndk::ScopedAStatus Dvr::attachFilter(const std::shared_ptr<IFilter>& in_filter) {
64 ALOGV("%s", __FUNCTION__);
65
66 int64_t filterId;
67 ::ndk::ScopedAStatus status = in_filter->getId64Bit(&filterId);
68 if (!status.isOk()) {
69 return status;
70 }
71
72 if (!mDemux->attachRecordFilter(filterId)) {
73 return ::ndk::ScopedAStatus::fromExceptionCode(STATUS_INVALID_OPERATION);
74 }
75
76 return ::ndk::ScopedAStatus::ok();
77}
78
79::ndk::ScopedAStatus Dvr::detachFilter(const std::shared_ptr<IFilter>& in_filter) {
80 ALOGV("%s", __FUNCTION__);
81
82 int64_t filterId;
83 ::ndk::ScopedAStatus status = in_filter->getId64Bit(&filterId);
84 if (!status.isOk()) {
85 return status;
86 }
87
88 if (!mDemux->detachRecordFilter(filterId)) {
89 return ::ndk::ScopedAStatus::fromExceptionCode(STATUS_INVALID_OPERATION);
90 }
91
92 return ::ndk::ScopedAStatus::ok();
93}
94
95::ndk::ScopedAStatus Dvr::start() {
96 ALOGV("%s", __FUNCTION__);
97 if (mDvrThreadRunning) {
98 return ::ndk::ScopedAStatus::ok();
99 }
100
101 if (!mCallback) {
102 return ::ndk::ScopedAStatus::fromExceptionCode(STATUS_NO_INIT);
103 }
104
105 if (!mDvrConfigured) {
106 return ::ndk::ScopedAStatus::fromExceptionCode(STATUS_INVALID_OPERATION);
107 }
108
109 if (mType == DvrType::PLAYBACK) {
110 mDvrThreadRunning = true;
111 pthread_create(&mDvrThread, NULL, __threadLoopPlayback, this);
112 pthread_setname_np(mDvrThread, "playback_waiting_loop");
113 } else if (mType == DvrType::RECORD) {
114 mRecordStatus = RecordStatus::DATA_READY;
115 mDemux->setIsRecording(mType == DvrType::RECORD);
116 }
117
118 // TODO start another thread to send filter status callback to the framework
119
120 return ::ndk::ScopedAStatus::ok();
121}
122
123::ndk::ScopedAStatus Dvr::stop() {
124 ALOGV("%s", __FUNCTION__);
125
126 mDvrThreadRunning = false;
127 lock_guard<mutex> lock(mDvrThreadLock);
128
129 mIsRecordStarted = false;
130 mDemux->setIsRecording(false);
131
132 return ::ndk::ScopedAStatus::ok();
133}
134
135::ndk::ScopedAStatus Dvr::flush() {
136 ALOGV("%s", __FUNCTION__);
137
138 mRecordStatus = RecordStatus::DATA_READY;
139
140 return ::ndk::ScopedAStatus::ok();
141}
142
143::ndk::ScopedAStatus Dvr::close() {
144 ALOGV("%s", __FUNCTION__);
145
146 mDvrThreadRunning = false;
147 lock_guard<mutex> lock(mDvrThreadLock);
148 return ::ndk::ScopedAStatus::ok();
149}
150
151bool Dvr::createDvrMQ() {
152 ALOGV("%s", __FUNCTION__);
153
154 // Create a synchronized FMQ that supports blocking read/write
155 unique_ptr<DvrMQ> tmpDvrMQ = unique_ptr<DvrMQ>(new (nothrow) DvrMQ(mBufferSize, true));
156 if (!tmpDvrMQ->isValid()) {
157 ALOGW("[Dvr] Failed to create FMQ of DVR");
158 return false;
159 }
160
161 mDvrMQ = move(tmpDvrMQ);
162
163 if (EventFlag::createEventFlag(mDvrMQ->getEventFlagWord(), &mDvrEventFlag) != ::android::OK) {
164 return false;
165 }
166
167 return true;
168}
169
170EventFlag* Dvr::getDvrEventFlag() {
171 return mDvrEventFlag;
172}
173
174void* Dvr::__threadLoopPlayback(void* user) {
175 Dvr* const self = static_cast<Dvr*>(user);
176 self->playbackThreadLoop();
177 return 0;
178}
179
180void Dvr::playbackThreadLoop() {
181 ALOGD("[Dvr] playback threadLoop start.");
182 lock_guard<mutex> lock(mDvrThreadLock);
183
184 while (mDvrThreadRunning) {
185 uint32_t efState = 0;
186 ::android::status_t status =
187 mDvrEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
188 &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
189 if (status != ::android::OK) {
190 ALOGD("[Dvr] wait for data ready on the playback FMQ");
191 continue;
192 }
193
194 // If the both dvr playback and dvr record are created, the playback will be treated as
195 // the source of the record. isVirtualFrontend set to true would direct the dvr playback
196 // input to the demux record filters or live broadcast filters.
197 bool isRecording = mDemux->isRecording();
198 bool isVirtualFrontend = isRecording;
199
200 if (mDvrSettings.get<DvrSettings::Tag::playback>().dataFormat == DataFormat::ES) {
201 if (!processEsDataOnPlayback(isVirtualFrontend, isRecording)) {
202 ALOGE("[Dvr] playback es data failed to be filtered. Ending thread");
203 break;
204 }
205 maySendPlaybackStatusCallback();
206 continue;
207 }
208
209 // Our current implementation filter the data and write it into the filter FMQ immediately
210 // after the DATA_READY from the VTS/framework
211 // This is for the non-ES data source, real playback use case handling.
212 if (!readPlaybackFMQ(isVirtualFrontend, isRecording) ||
213 !startFilterDispatcher(isVirtualFrontend, isRecording)) {
214 ALOGE("[Dvr] playback data failed to be filtered. Ending thread");
215 break;
216 }
217
218 maySendPlaybackStatusCallback();
219 }
220
221 mDvrThreadRunning = false;
222 ALOGD("[Dvr] playback thread ended.");
223}
224
225void Dvr::maySendPlaybackStatusCallback() {
226 lock_guard<mutex> lock(mPlaybackStatusLock);
227 int availableToRead = mDvrMQ->availableToRead();
228 int availableToWrite = mDvrMQ->availableToWrite();
229
230 PlaybackStatus newStatus =
231 checkPlaybackStatusChange(availableToWrite, availableToRead,
232 mDvrSettings.get<DvrSettings::Tag::playback>().highThreshold,
233 mDvrSettings.get<DvrSettings::Tag::playback>().lowThreshold);
234 if (mPlaybackStatus != newStatus) {
235 mCallback->onPlaybackStatus(newStatus);
236 mPlaybackStatus = newStatus;
237 }
238}
239
240PlaybackStatus Dvr::checkPlaybackStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
241 uint32_t highThreshold, uint32_t lowThreshold) {
242 if (availableToWrite == 0) {
243 return PlaybackStatus::SPACE_FULL;
244 } else if (availableToRead > highThreshold) {
245 return PlaybackStatus::SPACE_ALMOST_FULL;
246 } else if (availableToRead < lowThreshold) {
247 return PlaybackStatus::SPACE_ALMOST_EMPTY;
248 } else if (availableToRead == 0) {
249 return PlaybackStatus::SPACE_EMPTY;
250 }
251 return mPlaybackStatus;
252}
253
254bool Dvr::readPlaybackFMQ(bool isVirtualFrontend, bool isRecording) {
255 // Read playback data from the input FMQ
256 int size = mDvrMQ->availableToRead();
257 int playbackPacketSize = mDvrSettings.get<DvrSettings::Tag::playback>().packetSize;
258 vector<int8_t> dataOutputBuffer;
259 dataOutputBuffer.resize(playbackPacketSize);
260 // Dispatch the packet to the PID matching filter output buffer
261 for (int i = 0; i < size / playbackPacketSize; i++) {
262 if (!mDvrMQ->read(dataOutputBuffer.data(), playbackPacketSize)) {
263 return false;
264 }
265 if (isVirtualFrontend) {
266 if (isRecording) {
267 mDemux->sendFrontendInputToRecord(dataOutputBuffer);
268 } else {
269 mDemux->startBroadcastTsFilter(dataOutputBuffer);
270 }
271 } else {
272 startTpidFilter(dataOutputBuffer);
273 }
274 }
275
276 return true;
277}
278
279bool Dvr::processEsDataOnPlayback(bool isVirtualFrontend, bool isRecording) {
280 // Read ES from the DVR FMQ
281 // Note that currently we only provides ES with metaData in a specific format to be parsed.
282 // The ES size should be smaller than the Playback FMQ size to avoid reading truncated data.
283 int size = mDvrMQ->availableToRead();
284 vector<int8_t> dataOutputBuffer;
285 dataOutputBuffer.resize(size);
286 if (!mDvrMQ->read(dataOutputBuffer.data(), size)) {
287 return false;
288 }
289
290 int metaDataSize = size;
291 int totalFrames = 0;
292 int videoEsDataSize = 0;
293 int audioEsDataSize = 0;
294 int audioPid = 0;
295 int videoPid = 0;
296
297 vector<MediaEsMetaData> esMeta;
298 int videoReadPointer = 0;
299 int audioReadPointer = 0;
300 int frameCount = 0;
301 // Get meta data from the es
302 for (int i = 0; i < metaDataSize; i++) {
303 switch (dataOutputBuffer[i]) {
304 case 'm':
305 metaDataSize = 0;
306 getMetaDataValue(i, dataOutputBuffer.data(), metaDataSize);
307 videoReadPointer = metaDataSize;
308 continue;
309 case 'l':
310 getMetaDataValue(i, dataOutputBuffer.data(), totalFrames);
311 esMeta.resize(totalFrames);
312 continue;
313 case 'V':
314 getMetaDataValue(i, dataOutputBuffer.data(), videoEsDataSize);
315 audioReadPointer = metaDataSize + videoEsDataSize;
316 continue;
317 case 'A':
318 getMetaDataValue(i, dataOutputBuffer.data(), audioEsDataSize);
319 continue;
320 case 'p':
321 if (dataOutputBuffer[++i] == 'a') {
322 getMetaDataValue(i, dataOutputBuffer.data(), audioPid);
323 } else if (dataOutputBuffer[i] == 'v') {
324 getMetaDataValue(i, dataOutputBuffer.data(), videoPid);
325 }
326 continue;
327 case 'v':
328 case 'a':
329 if (dataOutputBuffer[i + 1] != ',') {
330 ALOGE("[Dvr] Invalid format meta data.");
331 return false;
332 }
333 esMeta[frameCount] = {
334 .isAudio = dataOutputBuffer[i] == 'a' ? true : false,
335 };
336 i += 5; // Move to Len
337 getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].len);
338 if (esMeta[frameCount].isAudio) {
339 esMeta[frameCount].startIndex = audioReadPointer;
340 audioReadPointer += esMeta[frameCount].len;
341 } else {
342 esMeta[frameCount].startIndex = videoReadPointer;
343 videoReadPointer += esMeta[frameCount].len;
344 }
345 i += 4; // move to PTS
346 getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].pts);
347 frameCount++;
348 continue;
349 default:
350 continue;
351 }
352 }
353
354 if (frameCount != totalFrames) {
355 ALOGE("[Dvr] Invalid meta data, frameCount=%d, totalFrames reported=%d", frameCount,
356 totalFrames);
357 return false;
358 }
359
360 if (metaDataSize + audioEsDataSize + videoEsDataSize != size) {
361 ALOGE("[Dvr] Invalid meta data, metaSize=%d, videoSize=%d, audioSize=%d, totolSize=%d",
362 metaDataSize, videoEsDataSize, audioEsDataSize, size);
363 return false;
364 }
365
366 // Read es raw data from the FMQ per meta data built previously
367 vector<int8_t> frameData;
368 map<int64_t, std::shared_ptr<IFilter>>::iterator it;
369 int pid = 0;
370 for (int i = 0; i < totalFrames; i++) {
371 frameData.resize(esMeta[i].len);
372 pid = esMeta[i].isAudio ? audioPid : videoPid;
373 memcpy(frameData.data(), dataOutputBuffer.data() + esMeta[i].startIndex, esMeta[i].len);
374 // Send to the media filters or record filters
375 if (!isRecording) {
376 for (it = mFilters.begin(); it != mFilters.end(); it++) {
377 if (pid == mDemux->getFilterTpid(it->first)) {
378 mDemux->updateMediaFilterOutput(it->first, frameData,
379 static_cast<uint64_t>(esMeta[i].pts));
380 }
381 }
382 } else {
383 mDemux->sendFrontendInputToRecord(frameData, pid, static_cast<uint64_t>(esMeta[i].pts));
384 }
385 startFilterDispatcher(isVirtualFrontend, isRecording);
386 frameData.clear();
387 }
388
389 return true;
390}
391
392void Dvr::getMetaDataValue(int& index, int8_t* dataOutputBuffer, int& value) {
393 index += 2; // Move the pointer across the ":" to the value
394 while (dataOutputBuffer[index] != ',' && dataOutputBuffer[index] != '\n') {
395 value = ((dataOutputBuffer[index++] - 48) + value * 10);
396 }
397}
398
399void Dvr::startTpidFilter(vector<int8_t> data) {
400 map<int64_t, std::shared_ptr<IFilter>>::iterator it;
401 for (it = mFilters.begin(); it != mFilters.end(); it++) {
402 uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
403 if (DEBUG_DVR) {
404 ALOGW("[Dvr] start ts filter pid: %d", pid);
405 }
406 if (pid == mDemux->getFilterTpid(it->first)) {
407 mDemux->updateFilterOutput(it->first, data);
408 }
409 }
410}
411
412bool Dvr::startFilterDispatcher(bool isVirtualFrontend, bool isRecording) {
413 if (isVirtualFrontend) {
414 if (isRecording) {
415 return mDemux->startRecordFilterDispatcher();
416 } else {
417 return mDemux->startBroadcastFilterDispatcher();
418 }
419 }
420
421 map<int64_t, std::shared_ptr<IFilter>>::iterator it;
422 // Handle the output data per filter type
423 for (it = mFilters.begin(); it != mFilters.end(); it++) {
424 if (mDemux->startFilterHandler(it->first).isOk()) {
425 return false;
426 }
427 }
428
429 return true;
430}
431
432bool Dvr::writeRecordFMQ(const vector<int8_t>& data) {
433 lock_guard<mutex> lock(mWriteLock);
434 if (mRecordStatus == RecordStatus::OVERFLOW) {
435 ALOGW("[Dvr] stops writing and wait for the client side flushing.");
436 return true;
437 }
438 if (mDvrMQ->write(data.data(), data.size())) {
439 mDvrEventFlag->wake(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY));
440 maySendRecordStatusCallback();
441 return true;
442 }
443
444 maySendRecordStatusCallback();
445 return false;
446}
447
448void Dvr::maySendRecordStatusCallback() {
449 lock_guard<mutex> lock(mRecordStatusLock);
450 int availableToRead = mDvrMQ->availableToRead();
451 int availableToWrite = mDvrMQ->availableToWrite();
452
453 RecordStatus newStatus =
454 checkRecordStatusChange(availableToWrite, availableToRead,
455 mDvrSettings.get<DvrSettings::Tag::record>().highThreshold,
456 mDvrSettings.get<DvrSettings::Tag::record>().lowThreshold);
457 if (mRecordStatus != newStatus) {
458 mCallback->onRecordStatus(newStatus);
459 mRecordStatus = newStatus;
460 }
461}
462
463RecordStatus Dvr::checkRecordStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
464 uint32_t highThreshold, uint32_t lowThreshold) {
465 if (availableToWrite == 0) {
466 return RecordStatus::OVERFLOW;
467 } else if (availableToRead > highThreshold) {
468 return RecordStatus::HIGH_WATER;
469 } else if (availableToRead < lowThreshold) {
470 return RecordStatus::LOW_WATER;
471 }
472 return mRecordStatus;
473}
474
475bool Dvr::addPlaybackFilter(int64_t filterId, std::shared_ptr<IFilter> filter) {
476 mFilters[filterId] = filter;
477 return true;
478}
479
480bool Dvr::removePlaybackFilter(int64_t filterId) {
481 mFilters.erase(filterId);
482 return true;
483}
484
485} // namespace tuner
486} // namespace tv
487} // namespace hardware
488} // namespace android
489} // namespace aidl