blob: b18d4df9e6086c28944e85a533824144a8d60cf7 [file] [log] [blame]
Amyfd4243a2019-08-16 16:01:27 -07001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "android.hardware.tv.tuner@1.0-Demux"
18
19#include "Demux.h"
20#include <utils/Log.h>
21
22namespace android {
23namespace hardware {
24namespace tv {
25namespace tuner {
26namespace V1_0 {
27namespace implementation {
28
// Timeout handed to EventFlag::wait() by the filter and input thread loops below.
// NOTE(review): presumably expressed in nanoseconds (i.e. 3 seconds) -- confirm against the
// EventFlag API.
#define WAIT_TIMEOUT 3000000000

// Canned 530-byte sample payload (the same length startMediaFilterHandler() reports as
// dataLength). Most of the bytes are the ASCII banner
// "x264 - core 142 - H.264/MPEG-4 AVC codec - Copyleft 2003-2014 - ..."; the leading bytes
// look like H.264 start codes (00 00 00 01) -- TODO confirm the intended stream format.
const std::vector<uint8_t> fakeDataInputBuffer = {
        0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0xc0, 0x1e, 0xdb,
        0x01, 0x40, 0x16, 0xec, 0x04, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40, 0x00, 0x00, 0x0f, 0x03,
        0xc5, 0x8b, 0xb8, 0x00, 0x00, 0x00, 0x01, 0x68, 0xca, 0x8c, 0xb2, 0x00, 0x00, 0x01, 0x06,
        0x05, 0xff, 0xff, 0x70, 0xdc, 0x45, 0xe9, 0xbd, 0xe6, 0xd9, 0x48, 0xb7, 0x96, 0x2c, 0xd8,
        0x20, 0xd9, 0x23, 0xee, 0xef, 0x78, 0x32, 0x36, 0x34, 0x20, 0x2d, 0x20, 0x63, 0x6f, 0x72,
        0x65, 0x20, 0x31, 0x34, 0x32, 0x20, 0x2d, 0x20, 0x48, 0x2e, 0x32, 0x36, 0x34, 0x2f, 0x4d,
        0x50, 0x45, 0x47, 0x2d, 0x34, 0x20, 0x41, 0x56, 0x43, 0x20, 0x63, 0x6f, 0x64, 0x65, 0x63,
        0x20, 0x2d, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x6c, 0x65, 0x66, 0x74, 0x20, 0x32, 0x30, 0x30,
        0x33, 0x2d, 0x32, 0x30, 0x31, 0x34, 0x20, 0x2d, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3a, 0x2f,
        0x2f, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6c, 0x61, 0x6e, 0x2e, 0x6f,
        0x72, 0x67, 0x2f, 0x78, 0x32, 0x36, 0x34, 0x2e, 0x68, 0x74, 0x6d, 0x6c, 0x20, 0x2d, 0x20,
        0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x3a, 0x20, 0x63, 0x61, 0x62, 0x61, 0x63, 0x3d,
        0x30, 0x20, 0x72, 0x65, 0x66, 0x3d, 0x32, 0x20, 0x64, 0x65, 0x62, 0x6c, 0x6f, 0x63, 0x6b,
        0x3d, 0x31, 0x3a, 0x30, 0x3a, 0x30, 0x20, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x73, 0x65, 0x3d,
        0x30, 0x78, 0x31, 0x3a, 0x30, 0x78, 0x31, 0x31, 0x31, 0x20, 0x6d, 0x65, 0x3d, 0x68, 0x65,
        0x78, 0x20, 0x73, 0x75, 0x62, 0x6d, 0x65, 0x3d, 0x37, 0x20, 0x70, 0x73, 0x79, 0x3d, 0x31,
        0x20, 0x70, 0x73, 0x79, 0x5f, 0x72, 0x64, 0x3d, 0x31, 0x2e, 0x30, 0x30, 0x3a, 0x30, 0x2e,
        0x30, 0x30, 0x20, 0x6d, 0x69, 0x78, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x66, 0x3d, 0x31, 0x20,
        0x6d, 0x65, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x3d, 0x31, 0x36, 0x20, 0x63, 0x68, 0x72,
        0x6f, 0x6d, 0x61, 0x5f, 0x6d, 0x65, 0x3d, 0x31, 0x20, 0x74, 0x72, 0x65, 0x6c, 0x6c, 0x69,
        0x73, 0x3d, 0x31, 0x20, 0x38, 0x78, 0x38, 0x64, 0x63, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x71,
        0x6d, 0x3d, 0x30, 0x20, 0x64, 0x65, 0x61, 0x64, 0x7a, 0x6f, 0x6e, 0x65, 0x3d, 0x32, 0x31,
        0x2c, 0x31, 0x31, 0x20, 0x66, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x73, 0x6b, 0x69, 0x70, 0x3d,
        0x31, 0x20, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x5f, 0x71, 0x70, 0x5f, 0x6f, 0x66, 0x66,
        0x73, 0x65, 0x74, 0x3d, 0x2d, 0x32, 0x20, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d,
        0x36, 0x30, 0x20, 0x6c, 0x6f, 0x6f, 0x6b, 0x61, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x74, 0x68,
        0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x35, 0x20, 0x73, 0x6c, 0x69, 0x63, 0x65, 0x64, 0x5f,
        0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x30, 0x20, 0x6e, 0x72, 0x3d, 0x30, 0x20,
        0x64, 0x65, 0x63, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x3d, 0x31, 0x20, 0x69, 0x6e, 0x74, 0x65,
        0x72, 0x6c, 0x61, 0x63, 0x65, 0x64, 0x3d, 0x30, 0x20, 0x62, 0x6c, 0x75, 0x72, 0x61, 0x79,
        0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x6f, 0x6e, 0x73, 0x74,
        0x72, 0x61, 0x69, 0x6e, 0x65, 0x64, 0x5f, 0x69, 0x6e, 0x74, 0x72, 0x61, 0x3d, 0x30, 0x20,
        0x62, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x3d, 0x30, 0x20, 0x77, 0x65, 0x69, 0x67, 0x68,
        0x74, 0x70, 0x3d, 0x30, 0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x3d, 0x32, 0x35, 0x30,
        0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x5f, 0x6d, 0x69, 0x6e, 0x3d, 0x32, 0x35, 0x20,
        0x73, 0x63, 0x65, 0x6e, 0x65,
};
69
Amy5094ae12019-10-04 18:43:21 -070070Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
Amyfd4243a2019-08-16 16:01:27 -070071 mDemuxId = demuxId;
Amy5094ae12019-10-04 18:43:21 -070072 mTunerService = tuner;
Amyfd4243a2019-08-16 16:01:27 -070073}
74
75Demux::~Demux() {}
76
77Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
78 ALOGV("%s", __FUNCTION__);
79
Amy5094ae12019-10-04 18:43:21 -070080 if (mTunerService == nullptr) {
81 return Result::NOT_INITIALIZED;
82 }
Amyfd4243a2019-08-16 16:01:27 -070083
Amy5094ae12019-10-04 18:43:21 -070084 mFrontend = mTunerService->getFrontendById(frontendId);
85
86 if (mFrontend == nullptr) {
87 return Result::INVALID_STATE;
88 }
89
90 mFrontendSourceFile = mFrontend->getSourceFile();
91
92 mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
93 return startBroadcastInputLoop();
Amyfd4243a2019-08-16 16:01:27 -070094}
95
Amya609d5a2019-08-23 14:38:31 -070096Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
97 const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
98 ALOGV("%s", __FUNCTION__);
99
Amya4885292019-09-06 10:30:53 -0700100 uint32_t filterId;
101
102 if (!mUnusedFilterIds.empty()) {
103 filterId = *mUnusedFilterIds.begin();
104
105 mUnusedFilterIds.erase(filterId);
106 } else {
107 filterId = ++mLastUsedFilterId;
108
Amy79125022019-10-10 15:30:17 -0700109 mFilterCallbacks.resize(filterId + 1);
Amya4885292019-09-06 10:30:53 -0700110 mFilterMQs.resize(filterId + 1);
111 mFilterEvents.resize(filterId + 1);
112 mFilterEventFlags.resize(filterId + 1);
113 mFilterThreadRunning.resize(filterId + 1);
114 mFilterThreads.resize(filterId + 1);
Amy42a5b4b2019-10-03 16:49:48 -0700115 mFilterPids.resize(filterId + 1);
116 mFilterOutputs.resize(filterId + 1);
Amy79125022019-10-10 15:30:17 -0700117 mFilterStatus.resize(filterId + 1);
Amya4885292019-09-06 10:30:53 -0700118 }
119
120 mUsedFilterIds.insert(filterId);
Amya609d5a2019-08-23 14:38:31 -0700121
122 if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
123 ALOGW("callback can't be null");
124 _hidl_cb(Result::INVALID_ARGUMENT, filterId);
125 return Void();
126 }
Amya4885292019-09-06 10:30:53 -0700127
Amya609d5a2019-08-23 14:38:31 -0700128 // Add callback
Amy79125022019-10-10 15:30:17 -0700129 mFilterCallbacks[filterId] = cb;
Amya609d5a2019-08-23 14:38:31 -0700130
Amya4885292019-09-06 10:30:53 -0700131 // Mapping from the filter ID to the filter event
132 DemuxFilterEvent event{
133 .filterId = filterId,
134 .filterType = type,
135 };
136 mFilterEvents[filterId] = event;
Amya609d5a2019-08-23 14:38:31 -0700137
Amya4885292019-09-06 10:30:53 -0700138 if (!createFilterMQ(bufferSize, filterId)) {
Amya609d5a2019-08-23 14:38:31 -0700139 _hidl_cb(Result::UNKNOWN_ERROR, -1);
140 return Void();
141 }
142
143 _hidl_cb(Result::SUCCESS, filterId);
144 return Void();
145}
146
147Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
148 ALOGV("%s", __FUNCTION__);
149
Amya4885292019-09-06 10:30:53 -0700150 if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
151 ALOGW("No filter with id: %d exists to get desc", filterId);
Amya609d5a2019-08-23 14:38:31 -0700152 _hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
153 return Void();
154 }
155
156 _hidl_cb(Result::SUCCESS, *mFilterMQs[filterId]->getDesc());
157 return Void();
158}
159
Amy42a5b4b2019-10-03 16:49:48 -0700160Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
Amya609d5a2019-08-23 14:38:31 -0700161 ALOGV("%s", __FUNCTION__);
162
Amy42a5b4b2019-10-03 16:49:48 -0700163 switch (mFilterEvents[filterId].filterType) {
164 case DemuxFilterType::SECTION:
165 mFilterPids[filterId] = settings.section().tpid;
166 break;
167 case DemuxFilterType::PES:
168 mFilterPids[filterId] = settings.pesData().tpid;
169 break;
170 case DemuxFilterType::TS:
171 mFilterPids[filterId] = settings.ts().tpid;
172 break;
173 case DemuxFilterType::AUDIO:
174 mFilterPids[filterId] = settings.audio().tpid;
175 break;
176 case DemuxFilterType::VIDEO:
177 mFilterPids[filterId] = settings.video().tpid;
178 break;
179 case DemuxFilterType::RECORD:
180 mFilterPids[filterId] = settings.record().tpid;
181 break;
182 case DemuxFilterType::PCR:
183 mFilterPids[filterId] = settings.pcr().tpid;
184 break;
185 default:
186 return Result::UNKNOWN_ERROR;
187 }
Amya609d5a2019-08-23 14:38:31 -0700188 return Result::SUCCESS;
189}
190
191Return<Result> Demux::startFilter(uint32_t filterId) {
192 ALOGV("%s", __FUNCTION__);
Amya4885292019-09-06 10:30:53 -0700193 Result result;
Amya609d5a2019-08-23 14:38:31 -0700194
Amya4885292019-09-06 10:30:53 -0700195 if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
196 ALOGW("No filter with id: %d exists to start filter", filterId);
Amya609d5a2019-08-23 14:38:31 -0700197 return Result::INVALID_ARGUMENT;
198 }
199
Amy42a5b4b2019-10-03 16:49:48 -0700200 result = startFilterLoop(filterId);
Amya609d5a2019-08-23 14:38:31 -0700201
202 return result;
203}
204
Amy42a5b4b2019-10-03 16:49:48 -0700205Return<Result> Demux::stopFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700206 ALOGV("%s", __FUNCTION__);
207
Amy42a5b4b2019-10-03 16:49:48 -0700208 mFilterThreadRunning[filterId] = false;
209
Amy5094ae12019-10-04 18:43:21 -0700210 std::lock_guard<std::mutex> lock(mFilterThreadLock);
211
Amya609d5a2019-08-23 14:38:31 -0700212 return Result::SUCCESS;
213}
214
Amy79125022019-10-10 15:30:17 -0700215Return<Result> Demux::flushFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700216 ALOGV("%s", __FUNCTION__);
217
Amy79125022019-10-10 15:30:17 -0700218 // temp implementation to flush the FMQ
219 int size = mFilterMQs[filterId]->availableToRead();
220 char* buffer = new char[size];
221 mOutputMQ->read((unsigned char*)&buffer[0], size);
222 delete[] buffer;
223 mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
224
Amya609d5a2019-08-23 14:38:31 -0700225 return Result::SUCCESS;
226}
227
Amya4885292019-09-06 10:30:53 -0700228Return<Result> Demux::removeFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700229 ALOGV("%s", __FUNCTION__);
230
Amya4885292019-09-06 10:30:53 -0700231 // resetFilterRecords(filterId);
232 mUsedFilterIds.erase(filterId);
233 mUnusedFilterIds.insert(filterId);
234
Amya609d5a2019-08-23 14:38:31 -0700235 return Result::SUCCESS;
236}
237
238Return<void> Demux::getAvSyncHwId(uint32_t /* filterId */, getAvSyncHwId_cb _hidl_cb) {
239 ALOGV("%s", __FUNCTION__);
240
241 AvSyncHwId avSyncHwId = 0;
242
243 _hidl_cb(Result::SUCCESS, avSyncHwId);
244 return Void();
245}
246
247Return<void> Demux::getAvSyncTime(AvSyncHwId /* avSyncHwId */, getAvSyncTime_cb _hidl_cb) {
248 ALOGV("%s", __FUNCTION__);
249
250 uint64_t avSyncTime = 0;
251
252 _hidl_cb(Result::SUCCESS, avSyncTime);
253 return Void();
254}
255
Amyfd4243a2019-08-16 16:01:27 -0700256Return<Result> Demux::close() {
257 ALOGV("%s", __FUNCTION__);
258
Amya4885292019-09-06 10:30:53 -0700259 set<uint32_t>::iterator it;
260 mInputThread = 0;
261 mOutputThread = 0;
262 mFilterThreads.clear();
263 mUnusedFilterIds.clear();
264 mUsedFilterIds.clear();
Amy79125022019-10-10 15:30:17 -0700265 mFilterCallbacks.clear();
Amya4885292019-09-06 10:30:53 -0700266 mFilterMQs.clear();
267 mFilterEvents.clear();
268 mFilterEventFlags.clear();
Amy42a5b4b2019-10-03 16:49:48 -0700269 mFilterOutputs.clear();
270 mFilterPids.clear();
Amya4885292019-09-06 10:30:53 -0700271 mLastUsedFilterId = -1;
272
Amyfd4243a2019-08-16 16:01:27 -0700273 return Result::SUCCESS;
Amya609d5a2019-08-23 14:38:31 -0700274}
275
Amya4885292019-09-06 10:30:53 -0700276Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
277 ALOGV("%s", __FUNCTION__);
278
279 // Create a synchronized FMQ that supports blocking read/write
280 std::unique_ptr<FilterMQ> tmpFilterMQ =
281 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
282 if (!tmpFilterMQ->isValid()) {
283 ALOGW("Failed to create output FMQ");
284 return Result::UNKNOWN_ERROR;
Amya609d5a2019-08-23 14:38:31 -0700285 }
Amya4885292019-09-06 10:30:53 -0700286
287 mOutputMQ = std::move(tmpFilterMQ);
288
289 if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
290 return Result::UNKNOWN_ERROR;
291 }
292
293 mOutputCallback = cb;
294
295 return Result::SUCCESS;
296}
297
298Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
299 ALOGV("%s", __FUNCTION__);
300
301 if (!mOutputMQ) {
302 _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
303 return Void();
304 }
305
306 _hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
307 return Void();
308}
309
Amy42a5b4b2019-10-03 16:49:48 -0700310Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
311 ALOGV("%s", __FUNCTION__);
312
313 mOutputConfigured = true;
314 mOutputSettings = settings;
315 return Result::SUCCESS;
316}
317
318Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {
Amya4885292019-09-06 10:30:53 -0700319 ALOGV("%s", __FUNCTION__);
320
321 return Result::SUCCESS;
322}
323
Amy42a5b4b2019-10-03 16:49:48 -0700324Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
Amya4885292019-09-06 10:30:53 -0700325 ALOGV("%s", __FUNCTION__);
326
327 return Result::SUCCESS;
328}
329
330Return<Result> Demux::startOutput() {
331 ALOGV("%s", __FUNCTION__);
332
333 return Result::SUCCESS;
334}
335
336Return<Result> Demux::stopOutput() {
337 ALOGV("%s", __FUNCTION__);
338
339 return Result::SUCCESS;
340}
341
342Return<Result> Demux::flushOutput() {
343 ALOGV("%s", __FUNCTION__);
344
345 return Result::SUCCESS;
346}
347
348Return<Result> Demux::removeOutput() {
349 ALOGV("%s", __FUNCTION__);
350
351 return Result::SUCCESS;
352}
353
354Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
355 ALOGV("%s", __FUNCTION__);
356
357 // Create a synchronized FMQ that supports blocking read/write
358 std::unique_ptr<FilterMQ> tmpInputMQ =
359 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
360 if (!tmpInputMQ->isValid()) {
361 ALOGW("Failed to create input FMQ");
362 return Result::UNKNOWN_ERROR;
363 }
364
365 mInputMQ = std::move(tmpInputMQ);
366
367 if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
368 return Result::UNKNOWN_ERROR;
369 }
370
371 mInputCallback = cb;
372
373 return Result::SUCCESS;
374}
375
376Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
377 ALOGV("%s", __FUNCTION__);
378
379 if (!mInputMQ) {
380 _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
381 return Void();
382 }
383
384 _hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
385 return Void();
386}
387
Amy42a5b4b2019-10-03 16:49:48 -0700388Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
Amya4885292019-09-06 10:30:53 -0700389 ALOGV("%s", __FUNCTION__);
390
Amy42a5b4b2019-10-03 16:49:48 -0700391 mInputConfigured = true;
392 mInputSettings = settings;
393
Amya4885292019-09-06 10:30:53 -0700394 return Result::SUCCESS;
395}
396
397Return<Result> Demux::startInput() {
398 ALOGV("%s", __FUNCTION__);
399
Amy42a5b4b2019-10-03 16:49:48 -0700400 if (!mInputCallback) {
401 return Result::NOT_INITIALIZED;
402 }
403
404 if (!mInputConfigured) {
405 return Result::INVALID_STATE;
406 }
407
Amya4885292019-09-06 10:30:53 -0700408 pthread_create(&mInputThread, NULL, __threadLoopInput, this);
409 pthread_setname_np(mInputThread, "demux_input_waiting_loop");
410
411 // TODO start another thread to send filter status callback to the framework
412
413 return Result::SUCCESS;
414}
415
416Return<Result> Demux::stopInput() {
417 ALOGV("%s", __FUNCTION__);
418
Amy42a5b4b2019-10-03 16:49:48 -0700419 mInputThreadRunning = false;
420
Amy5094ae12019-10-04 18:43:21 -0700421 std::lock_guard<std::mutex> lock(mInputThreadLock);
422
Amya4885292019-09-06 10:30:53 -0700423 return Result::SUCCESS;
424}
425
426Return<Result> Demux::flushInput() {
427 ALOGV("%s", __FUNCTION__);
428
429 return Result::SUCCESS;
430}
431
432Return<Result> Demux::removeInput() {
433 ALOGV("%s", __FUNCTION__);
434
435 mInputMQ = nullptr;
436
437 return Result::SUCCESS;
438}
439
440Result Demux::startFilterLoop(uint32_t filterId) {
441 struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
442 threadArgs->user = this;
443 threadArgs->filterId = filterId;
444
445 pthread_t mFilterThread;
446 pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
447 mFilterThreads[filterId] = mFilterThread;
448 pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");
449
450 return Result::SUCCESS;
451}
452
Amy42a5b4b2019-10-03 16:49:48 -0700453Result Demux::startSectionFilterHandler(uint32_t filterId) {
454 if (mFilterOutputs[filterId].empty()) {
455 return Result::SUCCESS;
456 }
457 if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {
Amya4885292019-09-06 10:30:53 -0700458 ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
459 return Result::UNKNOWN_ERROR;
460 }
461
Amy42a5b4b2019-10-03 16:49:48 -0700462 mFilterOutputs[filterId].clear();
463
Amya4885292019-09-06 10:30:53 -0700464 return Result::SUCCESS;
465}
466
467Result Demux::startPesFilterHandler(uint32_t filterId) {
Amy42a5b4b2019-10-03 16:49:48 -0700468 std::lock_guard<std::mutex> lock(mFilterEventLock);
Amy42a5b4b2019-10-03 16:49:48 -0700469 if (mFilterOutputs[filterId].empty()) {
470 return Result::SUCCESS;
471 }
472
Amy5094ae12019-10-04 18:43:21 -0700473 for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
Amy1109e9f2019-10-10 18:30:28 -0700474 if (mPesSizeLeft == 0) {
475 uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) |
476 (mFilterOutputs[filterId][i + 5] << 8) |
477 mFilterOutputs[filterId][i + 6];
478 ALOGD("[Demux] prefix %d", prefix);
479 if (prefix == 0x000001) {
480 // TODO handle mulptiple Pes filters
481 mPesSizeLeft =
482 (mFilterOutputs[filterId][i + 7] << 8) | mFilterOutputs[filterId][i + 8];
483 ALOGD("[Demux] pes data length %d", mPesSizeLeft);
484 } else {
485 continue;
Amy5094ae12019-10-04 18:43:21 -0700486 }
Amy5094ae12019-10-04 18:43:21 -0700487 }
Amy1109e9f2019-10-10 18:30:28 -0700488
489 int endPoint = min(184, mPesSizeLeft);
490 // append data and check size
491 vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
492 vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 3 + endPoint;
493 mPesOutput.insert(mPesOutput.end(), first, last);
494 // size does not match then continue
495 mPesSizeLeft -= endPoint;
496 if (mPesSizeLeft > 0) {
497 continue;
498 }
499 // size match then create event
500 if (!writeDataToFilterMQ(mPesOutput, filterId)) {
501 mFilterOutputs[filterId].clear();
502 return Result::INVALID_STATE;
503 }
504 maySendFilterStatusCallback(filterId);
505 DemuxFilterPesEvent pesEvent;
506 pesEvent = {
507 // temp dump meta data
508 .streamId = mPesOutput[3],
509 .dataLength = static_cast<uint16_t>(mPesOutput.size()),
510 };
511 ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength);
512
513 int size = mFilterEvents[filterId].events.size();
514 mFilterEvents[filterId].events.resize(size + 1);
515 mFilterEvents[filterId].events[size].pes(pesEvent);
516 mPesOutput.clear();
Amy42a5b4b2019-10-03 16:49:48 -0700517 }
Amya4885292019-09-06 10:30:53 -0700518
Amy42a5b4b2019-10-03 16:49:48 -0700519 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700520
Amya4885292019-09-06 10:30:53 -0700521 return Result::SUCCESS;
522}
523
524Result Demux::startTsFilterHandler() {
525 // TODO handle starting TS filter
526 return Result::SUCCESS;
527}
528
529Result Demux::startMediaFilterHandler(uint32_t filterId) {
530 DemuxFilterMediaEvent mediaEvent;
531 mediaEvent = {
532 // temp dump meta data
533 .pts = 0,
534 .dataLength = 530,
535 .secureMemory = nullptr,
536 };
537 mFilterEvents[filterId].events.resize(1);
538 mFilterEvents[filterId].events[0].media() = mediaEvent;
Amy5094ae12019-10-04 18:43:21 -0700539
540 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700541 // TODO handle write FQM for media stream
542 return Result::SUCCESS;
543}
544
545Result Demux::startRecordFilterHandler(uint32_t filterId) {
546 DemuxFilterRecordEvent recordEvent;
547 recordEvent = {
548 // temp dump meta data
549 .tpid = 0,
550 .packetNum = 0,
551 };
552 recordEvent.indexMask.tsIndexMask() = 0x01;
553 mFilterEvents[filterId].events.resize(1);
554 mFilterEvents[filterId].events[0].ts() = recordEvent;
Amy5094ae12019-10-04 18:43:21 -0700555
556 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700557 return Result::SUCCESS;
558}
559
560Result Demux::startPcrFilterHandler() {
561 // TODO handle starting PCR filter
562 return Result::SUCCESS;
563}
564
565bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
566 ALOGV("%s", __FUNCTION__);
567
568 // Create a synchronized FMQ that supports blocking read/write
569 std::unique_ptr<FilterMQ> tmpFilterMQ =
570 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
571 if (!tmpFilterMQ->isValid()) {
572 ALOGW("Failed to create FMQ of filter with id: %d", filterId);
573 return false;
574 }
575
576 mFilterMQs[filterId] = std::move(tmpFilterMQ);
577
578 EventFlag* filterEventFlag;
579 if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
580 OK) {
581 return false;
582 }
583 mFilterEventFlags[filterId] = filterEventFlag;
584
585 return true;
586}
587
588bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
589 // TODO check how many sections has been read
590 std::lock_guard<std::mutex> lock(mFilterEventLock);
Amya4885292019-09-06 10:30:53 -0700591 if (!writeDataToFilterMQ(data, filterId)) {
592 return false;
593 }
Amy42a5b4b2019-10-03 16:49:48 -0700594 int size = mFilterEvents[filterId].events.size();
595 mFilterEvents[filterId].events.resize(size + 1);
Amya4885292019-09-06 10:30:53 -0700596 DemuxFilterSectionEvent secEvent;
597 secEvent = {
598 // temp dump meta data
599 .tableId = 0,
600 .version = 1,
601 .sectionNum = 1,
Amy42a5b4b2019-10-03 16:49:48 -0700602 .dataLength = static_cast<uint16_t>(data.size()),
Amya4885292019-09-06 10:30:53 -0700603 };
604 mFilterEvents[filterId].events[size].section(secEvent);
Amya609d5a2019-08-23 14:38:31 -0700605 return true;
606}
607
608bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId) {
609 std::lock_guard<std::mutex> lock(mWriteLock);
610 if (mFilterMQs[filterId]->write(data.data(), data.size())) {
611 return true;
612 }
613 return false;
614}
615
Amy5094ae12019-10-04 18:43:21 -0700616bool Demux::readInputFMQ() {
Amya4885292019-09-06 10:30:53 -0700617 // Read input data from the input FMQ
618 int size = mInputMQ->availableToRead();
Amy42a5b4b2019-10-03 16:49:48 -0700619 int inputPacketSize = mInputSettings.packetSize;
Amya4885292019-09-06 10:30:53 -0700620 vector<uint8_t> dataOutputBuffer;
Amy42a5b4b2019-10-03 16:49:48 -0700621 dataOutputBuffer.resize(inputPacketSize);
Amya609d5a2019-08-23 14:38:31 -0700622
Amy42a5b4b2019-10-03 16:49:48 -0700623 // Dispatch the packet to the PID matching filter output buffer
624 for (int i = 0; i < size / inputPacketSize; i++) {
Amy5094ae12019-10-04 18:43:21 -0700625 if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
626 return false;
627 }
628 startTsFilter(dataOutputBuffer);
629 }
630
631 return true;
632}
633
634void Demux::startTsFilter(vector<uint8_t> data) {
635 set<uint32_t>::iterator it;
636 for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
637 uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
638 ALOGW("start ts filter pid: %d", pid);
639 if (pid == mFilterPids[*it]) {
640 mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end());
Amy42a5b4b2019-10-03 16:49:48 -0700641 }
642 }
Amy5094ae12019-10-04 18:43:21 -0700643}
644
645bool Demux::startFilterDispatcher() {
646 Result result;
647 set<uint32_t>::iterator it;
Amy42a5b4b2019-10-03 16:49:48 -0700648
649 // Handle the output data per filter type
Amya4885292019-09-06 10:30:53 -0700650 for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
651 switch (mFilterEvents[*it].filterType) {
652 case DemuxFilterType::SECTION:
Amy42a5b4b2019-10-03 16:49:48 -0700653 result = startSectionFilterHandler(*it);
Amya4885292019-09-06 10:30:53 -0700654 break;
655 case DemuxFilterType::PES:
656 result = startPesFilterHandler(*it);
657 break;
658 case DemuxFilterType::TS:
659 result = startTsFilterHandler();
660 break;
661 case DemuxFilterType::AUDIO:
662 case DemuxFilterType::VIDEO:
663 result = startMediaFilterHandler(*it);
664 break;
665 case DemuxFilterType::RECORD:
666 result = startRecordFilterHandler(*it);
667 break;
668 case DemuxFilterType::PCR:
669 result = startPcrFilterHandler();
670 break;
671 default:
672 return false;
673 }
Amya609d5a2019-08-23 14:38:31 -0700674 }
675
Amya4885292019-09-06 10:30:53 -0700676 return result == Result::SUCCESS;
Amya609d5a2019-08-23 14:38:31 -0700677}
678
Amya4885292019-09-06 10:30:53 -0700679void* Demux::__threadLoopFilter(void* threadArg) {
Amya609d5a2019-08-23 14:38:31 -0700680 Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
Amya4885292019-09-06 10:30:53 -0700681 self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
Amya609d5a2019-08-23 14:38:31 -0700682 return 0;
683}
684
Amya4885292019-09-06 10:30:53 -0700685void* Demux::__threadLoopInput(void* user) {
686 Demux* const self = static_cast<Demux*>(user);
687 self->inputThreadLoop();
688 return 0;
689}
Amya609d5a2019-08-23 14:38:31 -0700690
Amya4885292019-09-06 10:30:53 -0700691void Demux::filterThreadLoop(uint32_t filterId) {
692 ALOGD("[Demux] filter %d threadLoop start.", filterId);
Amy5094ae12019-10-04 18:43:21 -0700693 std::lock_guard<std::mutex> lock(mFilterThreadLock);
Amya4885292019-09-06 10:30:53 -0700694 mFilterThreadRunning[filterId] = true;
695
696 // For the first time of filter output, implementation needs to send the filter
697 // Event Callback without waiting for the DATA_CONSUMED to init the process.
698 while (mFilterThreadRunning[filterId]) {
699 if (mFilterEvents[filterId].events.size() == 0) {
700 ALOGD("[Demux] wait for filter data output.");
701 usleep(1000 * 1000);
702 continue;
703 }
704 // After successfully write, send a callback and wait for the read to be done
Amy79125022019-10-10 15:30:17 -0700705 mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
Amya4885292019-09-06 10:30:53 -0700706 mFilterEvents[filterId].events.resize(0);
Amy79125022019-10-10 15:30:17 -0700707 mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
708 mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
Amya4885292019-09-06 10:30:53 -0700709 break;
710 }
711
712 while (mFilterThreadRunning[filterId]) {
Amya609d5a2019-08-23 14:38:31 -0700713 uint32_t efState = 0;
714 // We do not wait for the last round of writen data to be read to finish the thread
715 // because the VTS can verify the reading itself.
716 for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
Amya4885292019-09-06 10:30:53 -0700717 while (mFilterThreadRunning[filterId]) {
Amya609d5a2019-08-23 14:38:31 -0700718 status_t status = mFilterEventFlags[filterId]->wait(
719 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
720 WAIT_TIMEOUT, true /* retry on spurious wake */);
721 if (status != OK) {
722 ALOGD("[Demux] wait for data consumed");
723 continue;
724 }
725 break;
726 }
Amya609d5a2019-08-23 14:38:31 -0700727
Amy79125022019-10-10 15:30:17 -0700728 if (mFilterCallbacks[filterId] == nullptr) {
Amya4885292019-09-06 10:30:53 -0700729 ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
730 break;
731 }
732
Amy79125022019-10-10 15:30:17 -0700733 maySendFilterStatusCallback(filterId);
734
Amya4885292019-09-06 10:30:53 -0700735 while (mFilterThreadRunning[filterId]) {
736 std::lock_guard<std::mutex> lock(mFilterEventLock);
737 if (mFilterEvents[filterId].events.size() == 0) {
738 continue;
739 }
740 // After successfully write, send a callback and wait for the read to be done
Amy79125022019-10-10 15:30:17 -0700741 mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
Amya4885292019-09-06 10:30:53 -0700742 mFilterEvents[filterId].events.resize(0);
743 break;
744 }
745 // We do not wait for the last read to be done
746 // VTS can verify the read result itself.
747 if (i == SECTION_WRITE_COUNT - 1) {
748 ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
749 break;
750 }
751 }
752 mFilterThreadRunning[filterId] = false;
Amya609d5a2019-08-23 14:38:31 -0700753 }
754
755 ALOGD("[Demux] filter thread ended.");
Amyfd4243a2019-08-16 16:01:27 -0700756}
757
Amya4885292019-09-06 10:30:53 -0700758void Demux::inputThreadLoop() {
759 ALOGD("[Demux] input threadLoop start.");
Amy5094ae12019-10-04 18:43:21 -0700760 std::lock_guard<std::mutex> lock(mInputThreadLock);
Amya4885292019-09-06 10:30:53 -0700761 mInputThreadRunning = true;
762
763 while (mInputThreadRunning) {
764 uint32_t efState = 0;
765 status_t status =
766 mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
767 &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
768 if (status != OK) {
769 ALOGD("[Demux] wait for data ready on the input FMQ");
770 continue;
771 }
Nick Chalkoe36b09b2019-10-04 17:06:40 -0700772 // Our current implementation filter the data and write it into the filter FMQ immediately
Amya4885292019-09-06 10:30:53 -0700773 // after the DATA_READY from the VTS/framework
Amy5094ae12019-10-04 18:43:21 -0700774 if (!readInputFMQ() || !startFilterDispatcher()) {
Amya4885292019-09-06 10:30:53 -0700775 ALOGD("[Demux] input data failed to be filtered. Ending thread");
776 break;
777 }
Amy42a5b4b2019-10-03 16:49:48 -0700778
779 maySendInputStatusCallback();
Amya4885292019-09-06 10:30:53 -0700780 }
781
782 mInputThreadRunning = false;
783 ALOGD("[Demux] input thread ended.");
784}
785
Amy42a5b4b2019-10-03 16:49:48 -0700786void Demux::maySendInputStatusCallback() {
787 std::lock_guard<std::mutex> lock(mInputStatusLock);
788 int availableToRead = mInputMQ->availableToRead();
789 int availableToWrite = mInputMQ->availableToWrite();
790
791 DemuxInputStatus newStatus =
Amy79125022019-10-10 15:30:17 -0700792 checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
793 mInputSettings.lowThreshold);
Amy42a5b4b2019-10-03 16:49:48 -0700794 if (mIntputStatus != newStatus) {
795 mInputCallback->onInputStatus(newStatus);
796 mIntputStatus = newStatus;
797 }
798}
799
Amy79125022019-10-10 15:30:17 -0700800void Demux::maySendFilterStatusCallback(uint32_t filterId) {
801 std::lock_guard<std::mutex> lock(mFilterStatusLock);
802 int availableToRead = mFilterMQs[filterId]->availableToRead();
803 int availableToWrite = mInputMQ->availableToWrite();
804 int fmqSize = mFilterMQs[filterId]->getQuantumCount();
805
806 DemuxFilterStatus newStatus =
807 checkFilterStatusChange(filterId, availableToWrite, availableToRead,
808 ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
809 if (mFilterStatus[filterId] != newStatus) {
810 mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
811 mFilterStatus[filterId] = newStatus;
812 }
813}
814
815DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
816 uint32_t highThreshold, uint32_t lowThreshold) {
Amy42a5b4b2019-10-03 16:49:48 -0700817 if (availableToWrite == 0) {
818 return DemuxInputStatus::SPACE_FULL;
819 } else if (availableToRead > highThreshold) {
820 return DemuxInputStatus::SPACE_ALMOST_FULL;
821 } else if (availableToRead < lowThreshold) {
822 return DemuxInputStatus::SPACE_ALMOST_EMPTY;
823 } else if (availableToRead == 0) {
824 return DemuxInputStatus::SPACE_EMPTY;
825 }
826 return mIntputStatus;
827}
828
Amy79125022019-10-10 15:30:17 -0700829DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
830 uint32_t availableToRead, uint32_t highThreshold,
831 uint32_t lowThreshold) {
832 if (availableToWrite == 0) {
833 return DemuxFilterStatus::OVERFLOW;
834 } else if (availableToRead > highThreshold) {
835 return DemuxFilterStatus::HIGH_WATER;
836 } else if (availableToRead < lowThreshold) {
837 return DemuxFilterStatus::LOW_WATER;
838 }
839 return mFilterStatus[filterId];
840}
841
Amy5094ae12019-10-04 18:43:21 -0700842Result Demux::startBroadcastInputLoop() {
843 pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
844 pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
845
846 return Result::SUCCESS;
847}
848
849void* Demux::__threadLoopBroadcast(void* user) {
850 Demux* const self = static_cast<Demux*>(user);
851 self->broadcastInputThreadLoop();
852 return 0;
853}
854
855void Demux::broadcastInputThreadLoop() {
856 std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
857 mBroadcastInputThreadRunning = true;
858 mKeepFetchingDataFromFrontend = true;
859
860 // open the stream and get its length
861 std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary);
862 // TODO take the packet size from the frontend setting
863 int packetSize = 188;
864 int writePacketAmount = 6;
865 char* buffer = new char[packetSize];
866 ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
867 if (!inputData.is_open()) {
868 mBroadcastInputThreadRunning = false;
869 ALOGW("[Demux] Error %s", strerror(errno));
870 }
871
872 while (mBroadcastInputThreadRunning) {
873 // move the stream pointer for packet size * 6 every read until the end
874 while (mKeepFetchingDataFromFrontend) {
875 for (int i = 0; i < writePacketAmount; i++) {
876 inputData.read(buffer, packetSize);
877 if (!inputData) {
878 mBroadcastInputThreadRunning = false;
879 break;
880 }
881 // filter and dispatch filter output
882 vector<uint8_t> byteBuffer;
Amy79125022019-10-10 15:30:17 -0700883 byteBuffer.resize(packetSize);
Amy5094ae12019-10-04 18:43:21 -0700884 for (int index = 0; index < byteBuffer.size(); index++) {
885 byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
886 }
887 startTsFilter(byteBuffer);
888 inputData.seekg(packetSize, inputData.cur);
889 }
890 startFilterDispatcher();
891 sleep(1);
892 }
893 }
894
895 ALOGW("[Demux] Broadcast Input thread end.");
896 delete[] buffer;
897 inputData.close();
898}
899
900void Demux::stopBroadcastInput() {
901 mKeepFetchingDataFromFrontend = false;
902 mBroadcastInputThreadRunning = false;
903 std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
904}
905
Amyfd4243a2019-08-16 16:01:27 -0700906} // namespace implementation
907} // namespace V1_0
908} // namespace tuner
909} // namespace tv
910} // namespace hardware
911} // namespace android