blob: 15e8aaf669e46afe7ef7bd47a296faf14c247cf3 [file] [log] [blame]
/*
* Copyright (C) 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "android.hardware.tv.tuner@1.0-Demux"
#include "Demux.h"
#include <utils/Log.h>
namespace android {
namespace hardware {
namespace tv {
namespace tuner {
namespace V1_0 {
namespace implementation {
#define WAIT_TIMEOUT 3000000000
const std::vector<uint8_t> fakeDataInputBuffer{
0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0xc0, 0x1e, 0xdb,
0x01, 0x40, 0x16, 0xec, 0x04, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40, 0x00, 0x00, 0x0f, 0x03,
0xc5, 0x8b, 0xb8, 0x00, 0x00, 0x00, 0x01, 0x68, 0xca, 0x8c, 0xb2, 0x00, 0x00, 0x01, 0x06,
0x05, 0xff, 0xff, 0x70, 0xdc, 0x45, 0xe9, 0xbd, 0xe6, 0xd9, 0x48, 0xb7, 0x96, 0x2c, 0xd8,
0x20, 0xd9, 0x23, 0xee, 0xef, 0x78, 0x32, 0x36, 0x34, 0x20, 0x2d, 0x20, 0x63, 0x6f, 0x72,
0x65, 0x20, 0x31, 0x34, 0x32, 0x20, 0x2d, 0x20, 0x48, 0x2e, 0x32, 0x36, 0x34, 0x2f, 0x4d,
0x50, 0x45, 0x47, 0x2d, 0x34, 0x20, 0x41, 0x56, 0x43, 0x20, 0x63, 0x6f, 0x64, 0x65, 0x63,
0x20, 0x2d, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x6c, 0x65, 0x66, 0x74, 0x20, 0x32, 0x30, 0x30,
0x33, 0x2d, 0x32, 0x30, 0x31, 0x34, 0x20, 0x2d, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3a, 0x2f,
0x2f, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6c, 0x61, 0x6e, 0x2e, 0x6f,
0x72, 0x67, 0x2f, 0x78, 0x32, 0x36, 0x34, 0x2e, 0x68, 0x74, 0x6d, 0x6c, 0x20, 0x2d, 0x20,
0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x3a, 0x20, 0x63, 0x61, 0x62, 0x61, 0x63, 0x3d,
0x30, 0x20, 0x72, 0x65, 0x66, 0x3d, 0x32, 0x20, 0x64, 0x65, 0x62, 0x6c, 0x6f, 0x63, 0x6b,
0x3d, 0x31, 0x3a, 0x30, 0x3a, 0x30, 0x20, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x73, 0x65, 0x3d,
0x30, 0x78, 0x31, 0x3a, 0x30, 0x78, 0x31, 0x31, 0x31, 0x20, 0x6d, 0x65, 0x3d, 0x68, 0x65,
0x78, 0x20, 0x73, 0x75, 0x62, 0x6d, 0x65, 0x3d, 0x37, 0x20, 0x70, 0x73, 0x79, 0x3d, 0x31,
0x20, 0x70, 0x73, 0x79, 0x5f, 0x72, 0x64, 0x3d, 0x31, 0x2e, 0x30, 0x30, 0x3a, 0x30, 0x2e,
0x30, 0x30, 0x20, 0x6d, 0x69, 0x78, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x66, 0x3d, 0x31, 0x20,
0x6d, 0x65, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x3d, 0x31, 0x36, 0x20, 0x63, 0x68, 0x72,
0x6f, 0x6d, 0x61, 0x5f, 0x6d, 0x65, 0x3d, 0x31, 0x20, 0x74, 0x72, 0x65, 0x6c, 0x6c, 0x69,
0x73, 0x3d, 0x31, 0x20, 0x38, 0x78, 0x38, 0x64, 0x63, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x71,
0x6d, 0x3d, 0x30, 0x20, 0x64, 0x65, 0x61, 0x64, 0x7a, 0x6f, 0x6e, 0x65, 0x3d, 0x32, 0x31,
0x2c, 0x31, 0x31, 0x20, 0x66, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x73, 0x6b, 0x69, 0x70, 0x3d,
0x31, 0x20, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x5f, 0x71, 0x70, 0x5f, 0x6f, 0x66, 0x66,
0x73, 0x65, 0x74, 0x3d, 0x2d, 0x32, 0x20, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d,
0x36, 0x30, 0x20, 0x6c, 0x6f, 0x6f, 0x6b, 0x61, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x74, 0x68,
0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x35, 0x20, 0x73, 0x6c, 0x69, 0x63, 0x65, 0x64, 0x5f,
0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x30, 0x20, 0x6e, 0x72, 0x3d, 0x30, 0x20,
0x64, 0x65, 0x63, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x3d, 0x31, 0x20, 0x69, 0x6e, 0x74, 0x65,
0x72, 0x6c, 0x61, 0x63, 0x65, 0x64, 0x3d, 0x30, 0x20, 0x62, 0x6c, 0x75, 0x72, 0x61, 0x79,
0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x6f, 0x6e, 0x73, 0x74,
0x72, 0x61, 0x69, 0x6e, 0x65, 0x64, 0x5f, 0x69, 0x6e, 0x74, 0x72, 0x61, 0x3d, 0x30, 0x20,
0x62, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x3d, 0x30, 0x20, 0x77, 0x65, 0x69, 0x67, 0x68,
0x74, 0x70, 0x3d, 0x30, 0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x3d, 0x32, 0x35, 0x30,
0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x5f, 0x6d, 0x69, 0x6e, 0x3d, 0x32, 0x35, 0x20,
0x73, 0x63, 0x65, 0x6e, 0x65,
};
Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
mDemuxId = demuxId;
mTunerService = tuner;
}
Demux::~Demux() {}
Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
ALOGV("%s", __FUNCTION__);
if (mTunerService == nullptr) {
return Result::NOT_INITIALIZED;
}
mFrontend = mTunerService->getFrontendById(frontendId);
if (mFrontend == nullptr) {
return Result::INVALID_STATE;
}
mFrontendSourceFile = mFrontend->getSourceFile();
mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
return startBroadcastInputLoop();
}
Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);
uint32_t filterId;
if (!mUnusedFilterIds.empty()) {
filterId = *mUnusedFilterIds.begin();
mUnusedFilterIds.erase(filterId);
} else {
filterId = ++mLastUsedFilterId;
mFilterCallbacks.resize(filterId + 1);
mFilterMQs.resize(filterId + 1);
mFilterEvents.resize(filterId + 1);
mFilterEventFlags.resize(filterId + 1);
mFilterThreadRunning.resize(filterId + 1);
mFilterThreads.resize(filterId + 1);
mFilterPids.resize(filterId + 1);
mFilterOutputs.resize(filterId + 1);
mFilterStatus.resize(filterId + 1);
}
mUsedFilterIds.insert(filterId);
if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
ALOGW("callback can't be null");
_hidl_cb(Result::INVALID_ARGUMENT, filterId);
return Void();
}
// Add callback
mFilterCallbacks[filterId] = cb;
// Mapping from the filter ID to the filter event
DemuxFilterEvent event{
.filterId = filterId,
.filterType = type,
};
mFilterEvents[filterId] = event;
if (!createFilterMQ(bufferSize, filterId)) {
_hidl_cb(Result::UNKNOWN_ERROR, -1);
return Void();
}
_hidl_cb(Result::SUCCESS, filterId);
return Void();
}
Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);
if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
ALOGW("No filter with id: %d exists to get desc", filterId);
_hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
return Void();
}
_hidl_cb(Result::SUCCESS, *mFilterMQs[filterId]->getDesc());
return Void();
}
Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
ALOGV("%s", __FUNCTION__);
switch (mFilterEvents[filterId].filterType) {
case DemuxFilterType::SECTION:
mFilterPids[filterId] = settings.section().tpid;
break;
case DemuxFilterType::PES:
mFilterPids[filterId] = settings.pesData().tpid;
break;
case DemuxFilterType::TS:
mFilterPids[filterId] = settings.ts().tpid;
break;
case DemuxFilterType::AUDIO:
mFilterPids[filterId] = settings.audio().tpid;
break;
case DemuxFilterType::VIDEO:
mFilterPids[filterId] = settings.video().tpid;
break;
case DemuxFilterType::RECORD:
mFilterPids[filterId] = settings.record().tpid;
break;
case DemuxFilterType::PCR:
mFilterPids[filterId] = settings.pcr().tpid;
break;
default:
return Result::UNKNOWN_ERROR;
}
return Result::SUCCESS;
}
Return<Result> Demux::startFilter(uint32_t filterId) {
ALOGV("%s", __FUNCTION__);
Result result;
if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
ALOGW("No filter with id: %d exists to start filter", filterId);
return Result::INVALID_ARGUMENT;
}
result = startFilterLoop(filterId);
return result;
}
Return<Result> Demux::stopFilter(uint32_t filterId) {
ALOGV("%s", __FUNCTION__);
mFilterThreadRunning[filterId] = false;
std::lock_guard<std::mutex> lock(mFilterThreadLock);
return Result::SUCCESS;
}
Return<Result> Demux::flushFilter(uint32_t filterId) {
ALOGV("%s", __FUNCTION__);
// temp implementation to flush the FMQ
int size = mFilterMQs[filterId]->availableToRead();
char* buffer = new char[size];
mOutputMQ->read((unsigned char*)&buffer[0], size);
delete[] buffer;
mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
return Result::SUCCESS;
}
Return<Result> Demux::removeFilter(uint32_t filterId) {
ALOGV("%s", __FUNCTION__);
// resetFilterRecords(filterId);
mUsedFilterIds.erase(filterId);
mUnusedFilterIds.insert(filterId);
return Result::SUCCESS;
}
Return<void> Demux::getAvSyncHwId(uint32_t /* filterId */, getAvSyncHwId_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);
AvSyncHwId avSyncHwId = 0;
_hidl_cb(Result::SUCCESS, avSyncHwId);
return Void();
}
Return<void> Demux::getAvSyncTime(AvSyncHwId /* avSyncHwId */, getAvSyncTime_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);
uint64_t avSyncTime = 0;
_hidl_cb(Result::SUCCESS, avSyncTime);
return Void();
}
Return<Result> Demux::close() {
ALOGV("%s", __FUNCTION__);
set<uint32_t>::iterator it;
mInputThread = 0;
mOutputThread = 0;
mFilterThreads.clear();
mUnusedFilterIds.clear();
mUsedFilterIds.clear();
mFilterCallbacks.clear();
mFilterMQs.clear();
mFilterEvents.clear();
mFilterEventFlags.clear();
mFilterOutputs.clear();
mFilterPids.clear();
mLastUsedFilterId = -1;
return Result::SUCCESS;
}
Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
ALOGV("%s", __FUNCTION__);
// Create a synchronized FMQ that supports blocking read/write
std::unique_ptr<FilterMQ> tmpFilterMQ =
std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
if (!tmpFilterMQ->isValid()) {
ALOGW("Failed to create output FMQ");
return Result::UNKNOWN_ERROR;
}
mOutputMQ = std::move(tmpFilterMQ);
if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
return Result::UNKNOWN_ERROR;
}
mOutputCallback = cb;
return Result::SUCCESS;
}
Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);
if (!mOutputMQ) {
_hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
return Void();
}
_hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
return Void();
}
Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
ALOGV("%s", __FUNCTION__);
mOutputConfigured = true;
mOutputSettings = settings;
return Result::SUCCESS;
}
Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::startOutput() {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::stopOutput() {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::flushOutput() {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::removeOutput() {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
ALOGV("%s", __FUNCTION__);
// Create a synchronized FMQ that supports blocking read/write
std::unique_ptr<FilterMQ> tmpInputMQ =
std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
if (!tmpInputMQ->isValid()) {
ALOGW("Failed to create input FMQ");
return Result::UNKNOWN_ERROR;
}
mInputMQ = std::move(tmpInputMQ);
if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
return Result::UNKNOWN_ERROR;
}
mInputCallback = cb;
return Result::SUCCESS;
}
Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);
if (!mInputMQ) {
_hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
return Void();
}
_hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
return Void();
}
Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
ALOGV("%s", __FUNCTION__);
mInputConfigured = true;
mInputSettings = settings;
return Result::SUCCESS;
}
Return<Result> Demux::startInput() {
ALOGV("%s", __FUNCTION__);
if (!mInputCallback) {
return Result::NOT_INITIALIZED;
}
if (!mInputConfigured) {
return Result::INVALID_STATE;
}
pthread_create(&mInputThread, NULL, __threadLoopInput, this);
pthread_setname_np(mInputThread, "demux_input_waiting_loop");
// TODO start another thread to send filter status callback to the framework
return Result::SUCCESS;
}
Return<Result> Demux::stopInput() {
ALOGV("%s", __FUNCTION__);
mInputThreadRunning = false;
std::lock_guard<std::mutex> lock(mInputThreadLock);
return Result::SUCCESS;
}
Return<Result> Demux::flushInput() {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::removeInput() {
ALOGV("%s", __FUNCTION__);
mInputMQ = nullptr;
return Result::SUCCESS;
}
Result Demux::startFilterLoop(uint32_t filterId) {
struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
threadArgs->user = this;
threadArgs->filterId = filterId;
pthread_t mFilterThread;
pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
mFilterThreads[filterId] = mFilterThread;
pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");
return Result::SUCCESS;
}
Result Demux::startSectionFilterHandler(uint32_t filterId) {
if (mFilterOutputs[filterId].empty()) {
return Result::SUCCESS;
}
if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {
ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
return Result::UNKNOWN_ERROR;
}
mFilterOutputs[filterId].clear();
return Result::SUCCESS;
}
Result Demux::startPesFilterHandler(uint32_t filterId) {
std::lock_guard<std::mutex> lock(mFilterEventLock);
if (mFilterOutputs[filterId].empty()) {
return Result::SUCCESS;
}
for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
if (mPesSizeLeft == 0) {
uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) |
(mFilterOutputs[filterId][i + 5] << 8) |
mFilterOutputs[filterId][i + 6];
ALOGD("[Demux] prefix %d", prefix);
if (prefix == 0x000001) {
// TODO handle mulptiple Pes filters
mPesSizeLeft =
(mFilterOutputs[filterId][i + 7] << 8) | mFilterOutputs[filterId][i + 8];
ALOGD("[Demux] pes data length %d", mPesSizeLeft);
} else {
continue;
}
}
int endPoint = min(184, mPesSizeLeft);
// append data and check size
vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 3 + endPoint;
mPesOutput.insert(mPesOutput.end(), first, last);
// size does not match then continue
mPesSizeLeft -= endPoint;
if (mPesSizeLeft > 0) {
continue;
}
// size match then create event
if (!writeDataToFilterMQ(mPesOutput, filterId)) {
mFilterOutputs[filterId].clear();
return Result::INVALID_STATE;
}
maySendFilterStatusCallback(filterId);
DemuxFilterPesEvent pesEvent;
pesEvent = {
// temp dump meta data
.streamId = mPesOutput[3],
.dataLength = static_cast<uint16_t>(mPesOutput.size()),
};
ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength);
int size = mFilterEvents[filterId].events.size();
mFilterEvents[filterId].events.resize(size + 1);
mFilterEvents[filterId].events[size].pes(pesEvent);
mPesOutput.clear();
}
mFilterOutputs[filterId].clear();
return Result::SUCCESS;
}
Result Demux::startTsFilterHandler() {
// TODO handle starting TS filter
return Result::SUCCESS;
}
Result Demux::startMediaFilterHandler(uint32_t filterId) {
DemuxFilterMediaEvent mediaEvent;
mediaEvent = {
// temp dump meta data
.pts = 0,
.dataLength = 530,
.secureMemory = nullptr,
};
mFilterEvents[filterId].events.resize(1);
mFilterEvents[filterId].events[0].media() = mediaEvent;
mFilterOutputs[filterId].clear();
// TODO handle write FQM for media stream
return Result::SUCCESS;
}
Result Demux::startRecordFilterHandler(uint32_t filterId) {
DemuxFilterRecordEvent recordEvent;
recordEvent = {
// temp dump meta data
.tpid = 0,
.packetNum = 0,
};
recordEvent.indexMask.tsIndexMask() = 0x01;
mFilterEvents[filterId].events.resize(1);
mFilterEvents[filterId].events[0].ts() = recordEvent;
mFilterOutputs[filterId].clear();
return Result::SUCCESS;
}
Result Demux::startPcrFilterHandler() {
// TODO handle starting PCR filter
return Result::SUCCESS;
}
bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
ALOGV("%s", __FUNCTION__);
// Create a synchronized FMQ that supports blocking read/write
std::unique_ptr<FilterMQ> tmpFilterMQ =
std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
if (!tmpFilterMQ->isValid()) {
ALOGW("Failed to create FMQ of filter with id: %d", filterId);
return false;
}
mFilterMQs[filterId] = std::move(tmpFilterMQ);
EventFlag* filterEventFlag;
if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
OK) {
return false;
}
mFilterEventFlags[filterId] = filterEventFlag;
return true;
}
bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
// TODO check how many sections has been read
std::lock_guard<std::mutex> lock(mFilterEventLock);
if (!writeDataToFilterMQ(data, filterId)) {
return false;
}
int size = mFilterEvents[filterId].events.size();
mFilterEvents[filterId].events.resize(size + 1);
DemuxFilterSectionEvent secEvent;
secEvent = {
// temp dump meta data
.tableId = 0,
.version = 1,
.sectionNum = 1,
.dataLength = static_cast<uint16_t>(data.size()),
};
mFilterEvents[filterId].events[size].section(secEvent);
return true;
}
bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId) {
std::lock_guard<std::mutex> lock(mWriteLock);
if (mFilterMQs[filterId]->write(data.data(), data.size())) {
return true;
}
return false;
}
bool Demux::readInputFMQ() {
// Read input data from the input FMQ
int size = mInputMQ->availableToRead();
int inputPacketSize = mInputSettings.packetSize;
vector<uint8_t> dataOutputBuffer;
dataOutputBuffer.resize(inputPacketSize);
// Dispatch the packet to the PID matching filter output buffer
for (int i = 0; i < size / inputPacketSize; i++) {
if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
return false;
}
startTsFilter(dataOutputBuffer);
}
return true;
}
void Demux::startTsFilter(vector<uint8_t> data) {
set<uint32_t>::iterator it;
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
ALOGW("start ts filter pid: %d", pid);
if (pid == mFilterPids[*it]) {
mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end());
}
}
}
bool Demux::startFilterDispatcher() {
Result result;
set<uint32_t>::iterator it;
// Handle the output data per filter type
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
switch (mFilterEvents[*it].filterType) {
case DemuxFilterType::SECTION:
result = startSectionFilterHandler(*it);
break;
case DemuxFilterType::PES:
result = startPesFilterHandler(*it);
break;
case DemuxFilterType::TS:
result = startTsFilterHandler();
break;
case DemuxFilterType::AUDIO:
case DemuxFilterType::VIDEO:
result = startMediaFilterHandler(*it);
break;
case DemuxFilterType::RECORD:
result = startRecordFilterHandler(*it);
break;
case DemuxFilterType::PCR:
result = startPcrFilterHandler();
break;
default:
return false;
}
}
return result == Result::SUCCESS;
}
void* Demux::__threadLoopFilter(void* threadArg) {
Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
return 0;
}
void* Demux::__threadLoopInput(void* user) {
Demux* const self = static_cast<Demux*>(user);
self->inputThreadLoop();
return 0;
}
void Demux::filterThreadLoop(uint32_t filterId) {
ALOGD("[Demux] filter %d threadLoop start.", filterId);
std::lock_guard<std::mutex> lock(mFilterThreadLock);
mFilterThreadRunning[filterId] = true;
// For the first time of filter output, implementation needs to send the filter
// Event Callback without waiting for the DATA_CONSUMED to init the process.
while (mFilterThreadRunning[filterId]) {
if (mFilterEvents[filterId].events.size() == 0) {
ALOGD("[Demux] wait for filter data output.");
usleep(1000 * 1000);
continue;
}
// After successfully write, send a callback and wait for the read to be done
mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
mFilterEvents[filterId].events.resize(0);
mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
break;
}
while (mFilterThreadRunning[filterId]) {
uint32_t efState = 0;
// We do not wait for the last round of writen data to be read to finish the thread
// because the VTS can verify the reading itself.
for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
while (mFilterThreadRunning[filterId]) {
status_t status = mFilterEventFlags[filterId]->wait(
static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
WAIT_TIMEOUT, true /* retry on spurious wake */);
if (status != OK) {
ALOGD("[Demux] wait for data consumed");
continue;
}
break;
}
if (mFilterCallbacks[filterId] == nullptr) {
ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
break;
}
maySendFilterStatusCallback(filterId);
while (mFilterThreadRunning[filterId]) {
std::lock_guard<std::mutex> lock(mFilterEventLock);
if (mFilterEvents[filterId].events.size() == 0) {
continue;
}
// After successfully write, send a callback and wait for the read to be done
mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
mFilterEvents[filterId].events.resize(0);
break;
}
// We do not wait for the last read to be done
// VTS can verify the read result itself.
if (i == SECTION_WRITE_COUNT - 1) {
ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
break;
}
}
mFilterThreadRunning[filterId] = false;
}
ALOGD("[Demux] filter thread ended.");
}
void Demux::inputThreadLoop() {
ALOGD("[Demux] input threadLoop start.");
std::lock_guard<std::mutex> lock(mInputThreadLock);
mInputThreadRunning = true;
while (mInputThreadRunning) {
uint32_t efState = 0;
status_t status =
mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
&efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
if (status != OK) {
ALOGD("[Demux] wait for data ready on the input FMQ");
continue;
}
// Our current implementation filter the data and write it into the filter FMQ immedaitely
// after the DATA_READY from the VTS/framework
if (!readInputFMQ() || !startFilterDispatcher()) {
ALOGD("[Demux] input data failed to be filtered. Ending thread");
break;
}
maySendInputStatusCallback();
}
mInputThreadRunning = false;
ALOGD("[Demux] input thread ended.");
}
void Demux::maySendInputStatusCallback() {
std::lock_guard<std::mutex> lock(mInputStatusLock);
int availableToRead = mInputMQ->availableToRead();
int availableToWrite = mInputMQ->availableToWrite();
DemuxInputStatus newStatus =
checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
mInputSettings.lowThreshold);
if (mIntputStatus != newStatus) {
mInputCallback->onInputStatus(newStatus);
mIntputStatus = newStatus;
}
}
void Demux::maySendFilterStatusCallback(uint32_t filterId) {
std::lock_guard<std::mutex> lock(mFilterStatusLock);
int availableToRead = mFilterMQs[filterId]->availableToRead();
int availableToWrite = mInputMQ->availableToWrite();
int fmqSize = mFilterMQs[filterId]->getQuantumCount();
DemuxFilterStatus newStatus =
checkFilterStatusChange(filterId, availableToWrite, availableToRead,
ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
if (mFilterStatus[filterId] != newStatus) {
mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
mFilterStatus[filterId] = newStatus;
}
}
DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
uint32_t highThreshold, uint32_t lowThreshold) {
if (availableToWrite == 0) {
return DemuxInputStatus::SPACE_FULL;
} else if (availableToRead > highThreshold) {
return DemuxInputStatus::SPACE_ALMOST_FULL;
} else if (availableToRead < lowThreshold) {
return DemuxInputStatus::SPACE_ALMOST_EMPTY;
} else if (availableToRead == 0) {
return DemuxInputStatus::SPACE_EMPTY;
}
return mIntputStatus;
}
DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
uint32_t availableToRead, uint32_t highThreshold,
uint32_t lowThreshold) {
if (availableToWrite == 0) {
return DemuxFilterStatus::OVERFLOW;
} else if (availableToRead > highThreshold) {
return DemuxFilterStatus::HIGH_WATER;
} else if (availableToRead < lowThreshold) {
return DemuxFilterStatus::LOW_WATER;
}
return mFilterStatus[filterId];
}
Result Demux::startBroadcastInputLoop() {
pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
return Result::SUCCESS;
}
void* Demux::__threadLoopBroadcast(void* user) {
Demux* const self = static_cast<Demux*>(user);
self->broadcastInputThreadLoop();
return 0;
}
void Demux::broadcastInputThreadLoop() {
std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
mBroadcastInputThreadRunning = true;
mKeepFetchingDataFromFrontend = true;
// open the stream and get its length
std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary);
// TODO take the packet size from the frontend setting
int packetSize = 188;
int writePacketAmount = 6;
char* buffer = new char[packetSize];
ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
if (!inputData.is_open()) {
mBroadcastInputThreadRunning = false;
ALOGW("[Demux] Error %s", strerror(errno));
}
while (mBroadcastInputThreadRunning) {
// move the stream pointer for packet size * 6 every read until the end
while (mKeepFetchingDataFromFrontend) {
for (int i = 0; i < writePacketAmount; i++) {
inputData.read(buffer, packetSize);
if (!inputData) {
mBroadcastInputThreadRunning = false;
break;
}
// filter and dispatch filter output
vector<uint8_t> byteBuffer;
byteBuffer.resize(packetSize);
for (int index = 0; index < byteBuffer.size(); index++) {
byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
}
startTsFilter(byteBuffer);
inputData.seekg(packetSize, inputData.cur);
}
startFilterDispatcher();
sleep(1);
}
}
ALOGW("[Demux] Broadcast Input thread end.");
delete[] buffer;
inputData.close();
}
void Demux::stopBroadcastInput() {
mKeepFetchingDataFromFrontend = false;
mBroadcastInputThreadRunning = false;
std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
}
} // namespace implementation
} // namespace V1_0
} // namespace tuner
} // namespace tv
} // namespace hardware
} // namespace android