blob: b998468d26d91fcc31f427659caa4c3518ec5c7e [file] [log] [blame]
Amyfd4243a2019-08-16 16:01:27 -07001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "android.hardware.tv.tuner@1.0-Demux"
18
19#include "Demux.h"
20#include <utils/Log.h>
21
22namespace android {
23namespace hardware {
24namespace tv {
25namespace tuner {
26namespace V1_0 {
27namespace implementation {
28
Amya609d5a2019-08-23 14:38:31 -070029#define WAIT_TIMEOUT 3000000000
30
31const std::vector<uint8_t> fakeDataInputBuffer{
32 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0xc0, 0x1e, 0xdb,
33 0x01, 0x40, 0x16, 0xec, 0x04, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40, 0x00, 0x00, 0x0f, 0x03,
34 0xc5, 0x8b, 0xb8, 0x00, 0x00, 0x00, 0x01, 0x68, 0xca, 0x8c, 0xb2, 0x00, 0x00, 0x01, 0x06,
35 0x05, 0xff, 0xff, 0x70, 0xdc, 0x45, 0xe9, 0xbd, 0xe6, 0xd9, 0x48, 0xb7, 0x96, 0x2c, 0xd8,
36 0x20, 0xd9, 0x23, 0xee, 0xef, 0x78, 0x32, 0x36, 0x34, 0x20, 0x2d, 0x20, 0x63, 0x6f, 0x72,
37 0x65, 0x20, 0x31, 0x34, 0x32, 0x20, 0x2d, 0x20, 0x48, 0x2e, 0x32, 0x36, 0x34, 0x2f, 0x4d,
38 0x50, 0x45, 0x47, 0x2d, 0x34, 0x20, 0x41, 0x56, 0x43, 0x20, 0x63, 0x6f, 0x64, 0x65, 0x63,
39 0x20, 0x2d, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x6c, 0x65, 0x66, 0x74, 0x20, 0x32, 0x30, 0x30,
40 0x33, 0x2d, 0x32, 0x30, 0x31, 0x34, 0x20, 0x2d, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3a, 0x2f,
41 0x2f, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6c, 0x61, 0x6e, 0x2e, 0x6f,
42 0x72, 0x67, 0x2f, 0x78, 0x32, 0x36, 0x34, 0x2e, 0x68, 0x74, 0x6d, 0x6c, 0x20, 0x2d, 0x20,
43 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x3a, 0x20, 0x63, 0x61, 0x62, 0x61, 0x63, 0x3d,
44 0x30, 0x20, 0x72, 0x65, 0x66, 0x3d, 0x32, 0x20, 0x64, 0x65, 0x62, 0x6c, 0x6f, 0x63, 0x6b,
45 0x3d, 0x31, 0x3a, 0x30, 0x3a, 0x30, 0x20, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x73, 0x65, 0x3d,
46 0x30, 0x78, 0x31, 0x3a, 0x30, 0x78, 0x31, 0x31, 0x31, 0x20, 0x6d, 0x65, 0x3d, 0x68, 0x65,
47 0x78, 0x20, 0x73, 0x75, 0x62, 0x6d, 0x65, 0x3d, 0x37, 0x20, 0x70, 0x73, 0x79, 0x3d, 0x31,
48 0x20, 0x70, 0x73, 0x79, 0x5f, 0x72, 0x64, 0x3d, 0x31, 0x2e, 0x30, 0x30, 0x3a, 0x30, 0x2e,
49 0x30, 0x30, 0x20, 0x6d, 0x69, 0x78, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x66, 0x3d, 0x31, 0x20,
50 0x6d, 0x65, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x3d, 0x31, 0x36, 0x20, 0x63, 0x68, 0x72,
51 0x6f, 0x6d, 0x61, 0x5f, 0x6d, 0x65, 0x3d, 0x31, 0x20, 0x74, 0x72, 0x65, 0x6c, 0x6c, 0x69,
52 0x73, 0x3d, 0x31, 0x20, 0x38, 0x78, 0x38, 0x64, 0x63, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x71,
53 0x6d, 0x3d, 0x30, 0x20, 0x64, 0x65, 0x61, 0x64, 0x7a, 0x6f, 0x6e, 0x65, 0x3d, 0x32, 0x31,
54 0x2c, 0x31, 0x31, 0x20, 0x66, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x73, 0x6b, 0x69, 0x70, 0x3d,
55 0x31, 0x20, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x5f, 0x71, 0x70, 0x5f, 0x6f, 0x66, 0x66,
56 0x73, 0x65, 0x74, 0x3d, 0x2d, 0x32, 0x20, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d,
57 0x36, 0x30, 0x20, 0x6c, 0x6f, 0x6f, 0x6b, 0x61, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x74, 0x68,
58 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x35, 0x20, 0x73, 0x6c, 0x69, 0x63, 0x65, 0x64, 0x5f,
59 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x30, 0x20, 0x6e, 0x72, 0x3d, 0x30, 0x20,
60 0x64, 0x65, 0x63, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x3d, 0x31, 0x20, 0x69, 0x6e, 0x74, 0x65,
61 0x72, 0x6c, 0x61, 0x63, 0x65, 0x64, 0x3d, 0x30, 0x20, 0x62, 0x6c, 0x75, 0x72, 0x61, 0x79,
62 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x6f, 0x6e, 0x73, 0x74,
63 0x72, 0x61, 0x69, 0x6e, 0x65, 0x64, 0x5f, 0x69, 0x6e, 0x74, 0x72, 0x61, 0x3d, 0x30, 0x20,
64 0x62, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x3d, 0x30, 0x20, 0x77, 0x65, 0x69, 0x67, 0x68,
65 0x74, 0x70, 0x3d, 0x30, 0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x3d, 0x32, 0x35, 0x30,
66 0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x5f, 0x6d, 0x69, 0x6e, 0x3d, 0x32, 0x35, 0x20,
67 0x73, 0x63, 0x65, 0x6e, 0x65,
68};
69
Amy5094ae12019-10-04 18:43:21 -070070Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
Amyfd4243a2019-08-16 16:01:27 -070071 mDemuxId = demuxId;
Amy5094ae12019-10-04 18:43:21 -070072 mTunerService = tuner;
Amyfd4243a2019-08-16 16:01:27 -070073}
74
75Demux::~Demux() {}
76
77Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
78 ALOGV("%s", __FUNCTION__);
79
Amy5094ae12019-10-04 18:43:21 -070080 if (mTunerService == nullptr) {
81 return Result::NOT_INITIALIZED;
82 }
Amyfd4243a2019-08-16 16:01:27 -070083
Amy5094ae12019-10-04 18:43:21 -070084 mFrontend = mTunerService->getFrontendById(frontendId);
85
86 if (mFrontend == nullptr) {
87 return Result::INVALID_STATE;
88 }
89
90 mFrontendSourceFile = mFrontend->getSourceFile();
91
92 mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
93 return startBroadcastInputLoop();
Amyfd4243a2019-08-16 16:01:27 -070094}
95
Amya609d5a2019-08-23 14:38:31 -070096Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
97 const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
98 ALOGV("%s", __FUNCTION__);
99
Amya4885292019-09-06 10:30:53 -0700100 uint32_t filterId;
101
102 if (!mUnusedFilterIds.empty()) {
103 filterId = *mUnusedFilterIds.begin();
104
105 mUnusedFilterIds.erase(filterId);
106 } else {
107 filterId = ++mLastUsedFilterId;
108
Amy79125022019-10-10 15:30:17 -0700109 mFilterCallbacks.resize(filterId + 1);
Amya4885292019-09-06 10:30:53 -0700110 mFilterMQs.resize(filterId + 1);
111 mFilterEvents.resize(filterId + 1);
112 mFilterEventFlags.resize(filterId + 1);
113 mFilterThreadRunning.resize(filterId + 1);
114 mFilterThreads.resize(filterId + 1);
Amy42a5b4b2019-10-03 16:49:48 -0700115 mFilterPids.resize(filterId + 1);
116 mFilterOutputs.resize(filterId + 1);
Amy79125022019-10-10 15:30:17 -0700117 mFilterStatus.resize(filterId + 1);
Amya4885292019-09-06 10:30:53 -0700118 }
119
120 mUsedFilterIds.insert(filterId);
Amya609d5a2019-08-23 14:38:31 -0700121
122 if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
123 ALOGW("callback can't be null");
124 _hidl_cb(Result::INVALID_ARGUMENT, filterId);
125 return Void();
126 }
Amya4885292019-09-06 10:30:53 -0700127
Amya609d5a2019-08-23 14:38:31 -0700128 // Add callback
Amy79125022019-10-10 15:30:17 -0700129 mFilterCallbacks[filterId] = cb;
Amya609d5a2019-08-23 14:38:31 -0700130
Amya4885292019-09-06 10:30:53 -0700131 // Mapping from the filter ID to the filter event
132 DemuxFilterEvent event{
133 .filterId = filterId,
134 .filterType = type,
135 };
136 mFilterEvents[filterId] = event;
Amya609d5a2019-08-23 14:38:31 -0700137
Amya4885292019-09-06 10:30:53 -0700138 if (!createFilterMQ(bufferSize, filterId)) {
Amya609d5a2019-08-23 14:38:31 -0700139 _hidl_cb(Result::UNKNOWN_ERROR, -1);
140 return Void();
141 }
142
143 _hidl_cb(Result::SUCCESS, filterId);
144 return Void();
145}
146
147Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
148 ALOGV("%s", __FUNCTION__);
149
Amya4885292019-09-06 10:30:53 -0700150 if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
151 ALOGW("No filter with id: %d exists to get desc", filterId);
Amya609d5a2019-08-23 14:38:31 -0700152 _hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
153 return Void();
154 }
155
156 _hidl_cb(Result::SUCCESS, *mFilterMQs[filterId]->getDesc());
157 return Void();
158}
159
Amy42a5b4b2019-10-03 16:49:48 -0700160Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
Amya609d5a2019-08-23 14:38:31 -0700161 ALOGV("%s", __FUNCTION__);
162
Amy42a5b4b2019-10-03 16:49:48 -0700163 switch (mFilterEvents[filterId].filterType) {
164 case DemuxFilterType::SECTION:
165 mFilterPids[filterId] = settings.section().tpid;
166 break;
167 case DemuxFilterType::PES:
168 mFilterPids[filterId] = settings.pesData().tpid;
169 break;
170 case DemuxFilterType::TS:
171 mFilterPids[filterId] = settings.ts().tpid;
172 break;
173 case DemuxFilterType::AUDIO:
174 mFilterPids[filterId] = settings.audio().tpid;
175 break;
176 case DemuxFilterType::VIDEO:
177 mFilterPids[filterId] = settings.video().tpid;
178 break;
179 case DemuxFilterType::RECORD:
180 mFilterPids[filterId] = settings.record().tpid;
181 break;
182 case DemuxFilterType::PCR:
183 mFilterPids[filterId] = settings.pcr().tpid;
184 break;
185 default:
186 return Result::UNKNOWN_ERROR;
187 }
Amya609d5a2019-08-23 14:38:31 -0700188 return Result::SUCCESS;
189}
190
191Return<Result> Demux::startFilter(uint32_t filterId) {
192 ALOGV("%s", __FUNCTION__);
Amya4885292019-09-06 10:30:53 -0700193 Result result;
Amya609d5a2019-08-23 14:38:31 -0700194
Amya4885292019-09-06 10:30:53 -0700195 if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
196 ALOGW("No filter with id: %d exists to start filter", filterId);
Amya609d5a2019-08-23 14:38:31 -0700197 return Result::INVALID_ARGUMENT;
198 }
199
Amy42a5b4b2019-10-03 16:49:48 -0700200 result = startFilterLoop(filterId);
Amya609d5a2019-08-23 14:38:31 -0700201
202 return result;
203}
204
Amy42a5b4b2019-10-03 16:49:48 -0700205Return<Result> Demux::stopFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700206 ALOGV("%s", __FUNCTION__);
207
Amy42a5b4b2019-10-03 16:49:48 -0700208 mFilterThreadRunning[filterId] = false;
209
Amy5094ae12019-10-04 18:43:21 -0700210 std::lock_guard<std::mutex> lock(mFilterThreadLock);
211
Amya609d5a2019-08-23 14:38:31 -0700212 return Result::SUCCESS;
213}
214
Amy79125022019-10-10 15:30:17 -0700215Return<Result> Demux::flushFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700216 ALOGV("%s", __FUNCTION__);
217
Amy79125022019-10-10 15:30:17 -0700218 // temp implementation to flush the FMQ
219 int size = mFilterMQs[filterId]->availableToRead();
220 char* buffer = new char[size];
221 mOutputMQ->read((unsigned char*)&buffer[0], size);
222 delete[] buffer;
223 mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
224
Amya609d5a2019-08-23 14:38:31 -0700225 return Result::SUCCESS;
226}
227
Amya4885292019-09-06 10:30:53 -0700228Return<Result> Demux::removeFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700229 ALOGV("%s", __FUNCTION__);
230
Amya4885292019-09-06 10:30:53 -0700231 // resetFilterRecords(filterId);
232 mUsedFilterIds.erase(filterId);
233 mUnusedFilterIds.insert(filterId);
234
Amya609d5a2019-08-23 14:38:31 -0700235 return Result::SUCCESS;
236}
237
238Return<void> Demux::getAvSyncHwId(uint32_t /* filterId */, getAvSyncHwId_cb _hidl_cb) {
239 ALOGV("%s", __FUNCTION__);
240
241 AvSyncHwId avSyncHwId = 0;
242
243 _hidl_cb(Result::SUCCESS, avSyncHwId);
244 return Void();
245}
246
247Return<void> Demux::getAvSyncTime(AvSyncHwId /* avSyncHwId */, getAvSyncTime_cb _hidl_cb) {
248 ALOGV("%s", __FUNCTION__);
249
250 uint64_t avSyncTime = 0;
251
252 _hidl_cb(Result::SUCCESS, avSyncTime);
253 return Void();
254}
255
Amyfd4243a2019-08-16 16:01:27 -0700256Return<Result> Demux::close() {
257 ALOGV("%s", __FUNCTION__);
258
Amya4885292019-09-06 10:30:53 -0700259 set<uint32_t>::iterator it;
260 mInputThread = 0;
261 mOutputThread = 0;
262 mFilterThreads.clear();
263 mUnusedFilterIds.clear();
264 mUsedFilterIds.clear();
Amy79125022019-10-10 15:30:17 -0700265 mFilterCallbacks.clear();
Amya4885292019-09-06 10:30:53 -0700266 mFilterMQs.clear();
267 mFilterEvents.clear();
268 mFilterEventFlags.clear();
Amy42a5b4b2019-10-03 16:49:48 -0700269 mFilterOutputs.clear();
270 mFilterPids.clear();
Amya4885292019-09-06 10:30:53 -0700271 mLastUsedFilterId = -1;
272
Amyfd4243a2019-08-16 16:01:27 -0700273 return Result::SUCCESS;
Amya609d5a2019-08-23 14:38:31 -0700274}
275
Amya4885292019-09-06 10:30:53 -0700276Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
277 ALOGV("%s", __FUNCTION__);
278
279 // Create a synchronized FMQ that supports blocking read/write
280 std::unique_ptr<FilterMQ> tmpFilterMQ =
281 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
282 if (!tmpFilterMQ->isValid()) {
283 ALOGW("Failed to create output FMQ");
284 return Result::UNKNOWN_ERROR;
Amya609d5a2019-08-23 14:38:31 -0700285 }
Amya4885292019-09-06 10:30:53 -0700286
287 mOutputMQ = std::move(tmpFilterMQ);
288
289 if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
290 return Result::UNKNOWN_ERROR;
291 }
292
293 mOutputCallback = cb;
294
295 return Result::SUCCESS;
296}
297
298Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
299 ALOGV("%s", __FUNCTION__);
300
301 if (!mOutputMQ) {
302 _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
303 return Void();
304 }
305
306 _hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
307 return Void();
308}
309
Amy42a5b4b2019-10-03 16:49:48 -0700310Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
311 ALOGV("%s", __FUNCTION__);
312
313 mOutputConfigured = true;
314 mOutputSettings = settings;
315 return Result::SUCCESS;
316}
317
318Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {
Amya4885292019-09-06 10:30:53 -0700319 ALOGV("%s", __FUNCTION__);
320
321 return Result::SUCCESS;
322}
323
Amy42a5b4b2019-10-03 16:49:48 -0700324Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
Amya4885292019-09-06 10:30:53 -0700325 ALOGV("%s", __FUNCTION__);
326
327 return Result::SUCCESS;
328}
329
330Return<Result> Demux::startOutput() {
331 ALOGV("%s", __FUNCTION__);
332
333 return Result::SUCCESS;
334}
335
336Return<Result> Demux::stopOutput() {
337 ALOGV("%s", __FUNCTION__);
338
339 return Result::SUCCESS;
340}
341
342Return<Result> Demux::flushOutput() {
343 ALOGV("%s", __FUNCTION__);
344
345 return Result::SUCCESS;
346}
347
348Return<Result> Demux::removeOutput() {
349 ALOGV("%s", __FUNCTION__);
350
351 return Result::SUCCESS;
352}
353
354Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
355 ALOGV("%s", __FUNCTION__);
356
357 // Create a synchronized FMQ that supports blocking read/write
358 std::unique_ptr<FilterMQ> tmpInputMQ =
359 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
360 if (!tmpInputMQ->isValid()) {
361 ALOGW("Failed to create input FMQ");
362 return Result::UNKNOWN_ERROR;
363 }
364
365 mInputMQ = std::move(tmpInputMQ);
366
367 if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
368 return Result::UNKNOWN_ERROR;
369 }
370
371 mInputCallback = cb;
372
373 return Result::SUCCESS;
374}
375
376Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
377 ALOGV("%s", __FUNCTION__);
378
379 if (!mInputMQ) {
380 _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
381 return Void();
382 }
383
384 _hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
385 return Void();
386}
387
Amy42a5b4b2019-10-03 16:49:48 -0700388Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
Amya4885292019-09-06 10:30:53 -0700389 ALOGV("%s", __FUNCTION__);
390
Amy42a5b4b2019-10-03 16:49:48 -0700391 mInputConfigured = true;
392 mInputSettings = settings;
393
Amya4885292019-09-06 10:30:53 -0700394 return Result::SUCCESS;
395}
396
397Return<Result> Demux::startInput() {
398 ALOGV("%s", __FUNCTION__);
399
Amy42a5b4b2019-10-03 16:49:48 -0700400 if (!mInputCallback) {
401 return Result::NOT_INITIALIZED;
402 }
403
404 if (!mInputConfigured) {
405 return Result::INVALID_STATE;
406 }
407
Amya4885292019-09-06 10:30:53 -0700408 pthread_create(&mInputThread, NULL, __threadLoopInput, this);
409 pthread_setname_np(mInputThread, "demux_input_waiting_loop");
410
411 // TODO start another thread to send filter status callback to the framework
412
413 return Result::SUCCESS;
414}
415
416Return<Result> Demux::stopInput() {
417 ALOGV("%s", __FUNCTION__);
418
Amy42a5b4b2019-10-03 16:49:48 -0700419 mInputThreadRunning = false;
420
Amy5094ae12019-10-04 18:43:21 -0700421 std::lock_guard<std::mutex> lock(mInputThreadLock);
422
Amya4885292019-09-06 10:30:53 -0700423 return Result::SUCCESS;
424}
425
426Return<Result> Demux::flushInput() {
427 ALOGV("%s", __FUNCTION__);
428
429 return Result::SUCCESS;
430}
431
432Return<Result> Demux::removeInput() {
433 ALOGV("%s", __FUNCTION__);
434
435 mInputMQ = nullptr;
436
437 return Result::SUCCESS;
438}
439
440Result Demux::startFilterLoop(uint32_t filterId) {
441 struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
442 threadArgs->user = this;
443 threadArgs->filterId = filterId;
444
445 pthread_t mFilterThread;
446 pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
447 mFilterThreads[filterId] = mFilterThread;
448 pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");
449
450 return Result::SUCCESS;
451}
452
Amy42a5b4b2019-10-03 16:49:48 -0700453Result Demux::startSectionFilterHandler(uint32_t filterId) {
454 if (mFilterOutputs[filterId].empty()) {
455 return Result::SUCCESS;
456 }
457 if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {
Amya4885292019-09-06 10:30:53 -0700458 ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
459 return Result::UNKNOWN_ERROR;
460 }
461
Amy42a5b4b2019-10-03 16:49:48 -0700462 mFilterOutputs[filterId].clear();
463
Amya4885292019-09-06 10:30:53 -0700464 return Result::SUCCESS;
465}
466
467Result Demux::startPesFilterHandler(uint32_t filterId) {
Amy42a5b4b2019-10-03 16:49:48 -0700468 std::lock_guard<std::mutex> lock(mFilterEventLock);
Amya4885292019-09-06 10:30:53 -0700469 DemuxFilterPesEvent pesEvent;
Amy42a5b4b2019-10-03 16:49:48 -0700470 if (mFilterOutputs[filterId].empty()) {
471 return Result::SUCCESS;
472 }
473
Amy5094ae12019-10-04 18:43:21 -0700474 for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
475 uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40;
476 uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4;
477 ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl);
478 if (pusi && (adaptFieldControl == 0x01)) {
479 vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
480 vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187;
481 vector<uint8_t> filterOutData(first, last);
482 if (!writeDataToFilterMQ(filterOutData, filterId)) {
483 mFilterOutputs[filterId].clear();
484 return Result::INVALID_STATE;
485 }
Amy79125022019-10-10 15:30:17 -0700486 maySendFilterStatusCallback(filterId);
Amy5094ae12019-10-04 18:43:21 -0700487 pesEvent = {
488 // temp dump meta data
489 .streamId = filterOutData[3],
490 .dataLength = static_cast<uint16_t>(filterOutData.size()),
491 };
492 int size = mFilterEvents[filterId].events.size();
493 mFilterEvents[filterId].events.resize(size + 1);
494 mFilterEvents[filterId].events[size].pes(pesEvent);
495 }
Amy42a5b4b2019-10-03 16:49:48 -0700496 }
Amya4885292019-09-06 10:30:53 -0700497
Amy42a5b4b2019-10-03 16:49:48 -0700498 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700499
Amya4885292019-09-06 10:30:53 -0700500 return Result::SUCCESS;
501}
502
503Result Demux::startTsFilterHandler() {
504 // TODO handle starting TS filter
505 return Result::SUCCESS;
506}
507
508Result Demux::startMediaFilterHandler(uint32_t filterId) {
509 DemuxFilterMediaEvent mediaEvent;
510 mediaEvent = {
511 // temp dump meta data
512 .pts = 0,
513 .dataLength = 530,
514 .secureMemory = nullptr,
515 };
516 mFilterEvents[filterId].events.resize(1);
517 mFilterEvents[filterId].events[0].media() = mediaEvent;
Amy5094ae12019-10-04 18:43:21 -0700518
519 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700520 // TODO handle write FQM for media stream
521 return Result::SUCCESS;
522}
523
524Result Demux::startRecordFilterHandler(uint32_t filterId) {
525 DemuxFilterRecordEvent recordEvent;
526 recordEvent = {
527 // temp dump meta data
528 .tpid = 0,
529 .packetNum = 0,
530 };
531 recordEvent.indexMask.tsIndexMask() = 0x01;
532 mFilterEvents[filterId].events.resize(1);
533 mFilterEvents[filterId].events[0].ts() = recordEvent;
Amy5094ae12019-10-04 18:43:21 -0700534
535 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700536 return Result::SUCCESS;
537}
538
539Result Demux::startPcrFilterHandler() {
540 // TODO handle starting PCR filter
541 return Result::SUCCESS;
542}
543
544bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
545 ALOGV("%s", __FUNCTION__);
546
547 // Create a synchronized FMQ that supports blocking read/write
548 std::unique_ptr<FilterMQ> tmpFilterMQ =
549 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
550 if (!tmpFilterMQ->isValid()) {
551 ALOGW("Failed to create FMQ of filter with id: %d", filterId);
552 return false;
553 }
554
555 mFilterMQs[filterId] = std::move(tmpFilterMQ);
556
557 EventFlag* filterEventFlag;
558 if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
559 OK) {
560 return false;
561 }
562 mFilterEventFlags[filterId] = filterEventFlag;
563
564 return true;
565}
566
567bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
568 // TODO check how many sections has been read
569 std::lock_guard<std::mutex> lock(mFilterEventLock);
Amya4885292019-09-06 10:30:53 -0700570 if (!writeDataToFilterMQ(data, filterId)) {
571 return false;
572 }
Amy42a5b4b2019-10-03 16:49:48 -0700573 int size = mFilterEvents[filterId].events.size();
574 mFilterEvents[filterId].events.resize(size + 1);
Amya4885292019-09-06 10:30:53 -0700575 DemuxFilterSectionEvent secEvent;
576 secEvent = {
577 // temp dump meta data
578 .tableId = 0,
579 .version = 1,
580 .sectionNum = 1,
Amy42a5b4b2019-10-03 16:49:48 -0700581 .dataLength = static_cast<uint16_t>(data.size()),
Amya4885292019-09-06 10:30:53 -0700582 };
583 mFilterEvents[filterId].events[size].section(secEvent);
Amya609d5a2019-08-23 14:38:31 -0700584 return true;
585}
586
587bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId) {
588 std::lock_guard<std::mutex> lock(mWriteLock);
589 if (mFilterMQs[filterId]->write(data.data(), data.size())) {
590 return true;
591 }
592 return false;
593}
594
Amy5094ae12019-10-04 18:43:21 -0700595bool Demux::readInputFMQ() {
Amya4885292019-09-06 10:30:53 -0700596 // Read input data from the input FMQ
597 int size = mInputMQ->availableToRead();
Amy42a5b4b2019-10-03 16:49:48 -0700598 int inputPacketSize = mInputSettings.packetSize;
Amya4885292019-09-06 10:30:53 -0700599 vector<uint8_t> dataOutputBuffer;
Amy42a5b4b2019-10-03 16:49:48 -0700600 dataOutputBuffer.resize(inputPacketSize);
Amya609d5a2019-08-23 14:38:31 -0700601
Amy42a5b4b2019-10-03 16:49:48 -0700602 // Dispatch the packet to the PID matching filter output buffer
603 for (int i = 0; i < size / inputPacketSize; i++) {
Amy5094ae12019-10-04 18:43:21 -0700604 if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
605 return false;
606 }
607 startTsFilter(dataOutputBuffer);
608 }
609
610 return true;
611}
612
613void Demux::startTsFilter(vector<uint8_t> data) {
614 set<uint32_t>::iterator it;
615 for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
616 uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
617 ALOGW("start ts filter pid: %d", pid);
618 if (pid == mFilterPids[*it]) {
619 mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end());
Amy42a5b4b2019-10-03 16:49:48 -0700620 }
621 }
Amy5094ae12019-10-04 18:43:21 -0700622}
623
624bool Demux::startFilterDispatcher() {
625 Result result;
626 set<uint32_t>::iterator it;
Amy42a5b4b2019-10-03 16:49:48 -0700627
628 // Handle the output data per filter type
Amya4885292019-09-06 10:30:53 -0700629 for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
630 switch (mFilterEvents[*it].filterType) {
631 case DemuxFilterType::SECTION:
Amy42a5b4b2019-10-03 16:49:48 -0700632 result = startSectionFilterHandler(*it);
Amya4885292019-09-06 10:30:53 -0700633 break;
634 case DemuxFilterType::PES:
635 result = startPesFilterHandler(*it);
636 break;
637 case DemuxFilterType::TS:
638 result = startTsFilterHandler();
639 break;
640 case DemuxFilterType::AUDIO:
641 case DemuxFilterType::VIDEO:
642 result = startMediaFilterHandler(*it);
643 break;
644 case DemuxFilterType::RECORD:
645 result = startRecordFilterHandler(*it);
646 break;
647 case DemuxFilterType::PCR:
648 result = startPcrFilterHandler();
649 break;
650 default:
651 return false;
652 }
Amya609d5a2019-08-23 14:38:31 -0700653 }
654
Amya4885292019-09-06 10:30:53 -0700655 return result == Result::SUCCESS;
Amya609d5a2019-08-23 14:38:31 -0700656}
657
Amya4885292019-09-06 10:30:53 -0700658void* Demux::__threadLoopFilter(void* threadArg) {
Amya609d5a2019-08-23 14:38:31 -0700659 Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
Amya4885292019-09-06 10:30:53 -0700660 self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
Amya609d5a2019-08-23 14:38:31 -0700661 return 0;
662}
663
Amya4885292019-09-06 10:30:53 -0700664void* Demux::__threadLoopInput(void* user) {
665 Demux* const self = static_cast<Demux*>(user);
666 self->inputThreadLoop();
667 return 0;
668}
Amya609d5a2019-08-23 14:38:31 -0700669
Amya4885292019-09-06 10:30:53 -0700670void Demux::filterThreadLoop(uint32_t filterId) {
671 ALOGD("[Demux] filter %d threadLoop start.", filterId);
Amy5094ae12019-10-04 18:43:21 -0700672 std::lock_guard<std::mutex> lock(mFilterThreadLock);
Amya4885292019-09-06 10:30:53 -0700673 mFilterThreadRunning[filterId] = true;
674
675 // For the first time of filter output, implementation needs to send the filter
676 // Event Callback without waiting for the DATA_CONSUMED to init the process.
677 while (mFilterThreadRunning[filterId]) {
678 if (mFilterEvents[filterId].events.size() == 0) {
679 ALOGD("[Demux] wait for filter data output.");
680 usleep(1000 * 1000);
681 continue;
682 }
683 // After successfully write, send a callback and wait for the read to be done
Amy79125022019-10-10 15:30:17 -0700684 mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
Amya4885292019-09-06 10:30:53 -0700685 mFilterEvents[filterId].events.resize(0);
Amy79125022019-10-10 15:30:17 -0700686 mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
687 mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
Amya4885292019-09-06 10:30:53 -0700688 break;
689 }
690
691 while (mFilterThreadRunning[filterId]) {
Amya609d5a2019-08-23 14:38:31 -0700692 uint32_t efState = 0;
693 // We do not wait for the last round of writen data to be read to finish the thread
694 // because the VTS can verify the reading itself.
695 for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
Amya4885292019-09-06 10:30:53 -0700696 while (mFilterThreadRunning[filterId]) {
Amya609d5a2019-08-23 14:38:31 -0700697 status_t status = mFilterEventFlags[filterId]->wait(
698 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
699 WAIT_TIMEOUT, true /* retry on spurious wake */);
700 if (status != OK) {
701 ALOGD("[Demux] wait for data consumed");
702 continue;
703 }
704 break;
705 }
Amya609d5a2019-08-23 14:38:31 -0700706
Amy79125022019-10-10 15:30:17 -0700707 if (mFilterCallbacks[filterId] == nullptr) {
Amya4885292019-09-06 10:30:53 -0700708 ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
709 break;
710 }
711
Amy79125022019-10-10 15:30:17 -0700712 maySendFilterStatusCallback(filterId);
713
Amya4885292019-09-06 10:30:53 -0700714 while (mFilterThreadRunning[filterId]) {
715 std::lock_guard<std::mutex> lock(mFilterEventLock);
716 if (mFilterEvents[filterId].events.size() == 0) {
717 continue;
718 }
719 // After successfully write, send a callback and wait for the read to be done
Amy79125022019-10-10 15:30:17 -0700720 mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
Amya4885292019-09-06 10:30:53 -0700721 mFilterEvents[filterId].events.resize(0);
722 break;
723 }
724 // We do not wait for the last read to be done
725 // VTS can verify the read result itself.
726 if (i == SECTION_WRITE_COUNT - 1) {
727 ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
728 break;
729 }
730 }
731 mFilterThreadRunning[filterId] = false;
Amya609d5a2019-08-23 14:38:31 -0700732 }
733
734 ALOGD("[Demux] filter thread ended.");
Amyfd4243a2019-08-16 16:01:27 -0700735}
736
Amya4885292019-09-06 10:30:53 -0700737void Demux::inputThreadLoop() {
738 ALOGD("[Demux] input threadLoop start.");
Amy5094ae12019-10-04 18:43:21 -0700739 std::lock_guard<std::mutex> lock(mInputThreadLock);
Amya4885292019-09-06 10:30:53 -0700740 mInputThreadRunning = true;
741
742 while (mInputThreadRunning) {
743 uint32_t efState = 0;
744 status_t status =
745 mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
746 &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
747 if (status != OK) {
748 ALOGD("[Demux] wait for data ready on the input FMQ");
749 continue;
750 }
Nick Chalkoe36b09b2019-10-04 17:06:40 -0700751 // Our current implementation filter the data and write it into the filter FMQ immediately
Amya4885292019-09-06 10:30:53 -0700752 // after the DATA_READY from the VTS/framework
Amy5094ae12019-10-04 18:43:21 -0700753 if (!readInputFMQ() || !startFilterDispatcher()) {
Amya4885292019-09-06 10:30:53 -0700754 ALOGD("[Demux] input data failed to be filtered. Ending thread");
755 break;
756 }
Amy42a5b4b2019-10-03 16:49:48 -0700757
758 maySendInputStatusCallback();
Amya4885292019-09-06 10:30:53 -0700759 }
760
761 mInputThreadRunning = false;
762 ALOGD("[Demux] input thread ended.");
763}
764
Amy42a5b4b2019-10-03 16:49:48 -0700765void Demux::maySendInputStatusCallback() {
766 std::lock_guard<std::mutex> lock(mInputStatusLock);
767 int availableToRead = mInputMQ->availableToRead();
768 int availableToWrite = mInputMQ->availableToWrite();
769
770 DemuxInputStatus newStatus =
Amy79125022019-10-10 15:30:17 -0700771 checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
772 mInputSettings.lowThreshold);
Amy42a5b4b2019-10-03 16:49:48 -0700773 if (mIntputStatus != newStatus) {
774 mInputCallback->onInputStatus(newStatus);
775 mIntputStatus = newStatus;
776 }
777}
778
Amy79125022019-10-10 15:30:17 -0700779void Demux::maySendFilterStatusCallback(uint32_t filterId) {
780 std::lock_guard<std::mutex> lock(mFilterStatusLock);
781 int availableToRead = mFilterMQs[filterId]->availableToRead();
782 int availableToWrite = mInputMQ->availableToWrite();
783 int fmqSize = mFilterMQs[filterId]->getQuantumCount();
784
785 DemuxFilterStatus newStatus =
786 checkFilterStatusChange(filterId, availableToWrite, availableToRead,
787 ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
788 if (mFilterStatus[filterId] != newStatus) {
789 mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
790 mFilterStatus[filterId] = newStatus;
791 }
792}
793
794DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
795 uint32_t highThreshold, uint32_t lowThreshold) {
Amy42a5b4b2019-10-03 16:49:48 -0700796 if (availableToWrite == 0) {
797 return DemuxInputStatus::SPACE_FULL;
798 } else if (availableToRead > highThreshold) {
799 return DemuxInputStatus::SPACE_ALMOST_FULL;
800 } else if (availableToRead < lowThreshold) {
801 return DemuxInputStatus::SPACE_ALMOST_EMPTY;
802 } else if (availableToRead == 0) {
803 return DemuxInputStatus::SPACE_EMPTY;
804 }
805 return mIntputStatus;
806}
807
Amy79125022019-10-10 15:30:17 -0700808DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
809 uint32_t availableToRead, uint32_t highThreshold,
810 uint32_t lowThreshold) {
811 if (availableToWrite == 0) {
812 return DemuxFilterStatus::OVERFLOW;
813 } else if (availableToRead > highThreshold) {
814 return DemuxFilterStatus::HIGH_WATER;
815 } else if (availableToRead < lowThreshold) {
816 return DemuxFilterStatus::LOW_WATER;
817 }
818 return mFilterStatus[filterId];
819}
820
Amy5094ae12019-10-04 18:43:21 -0700821Result Demux::startBroadcastInputLoop() {
822 pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
823 pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
824
825 return Result::SUCCESS;
826}
827
828void* Demux::__threadLoopBroadcast(void* user) {
829 Demux* const self = static_cast<Demux*>(user);
830 self->broadcastInputThreadLoop();
831 return 0;
832}
833
834void Demux::broadcastInputThreadLoop() {
835 std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
836 mBroadcastInputThreadRunning = true;
837 mKeepFetchingDataFromFrontend = true;
838
839 // open the stream and get its length
840 std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary);
841 // TODO take the packet size from the frontend setting
842 int packetSize = 188;
843 int writePacketAmount = 6;
844 char* buffer = new char[packetSize];
845 ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
846 if (!inputData.is_open()) {
847 mBroadcastInputThreadRunning = false;
848 ALOGW("[Demux] Error %s", strerror(errno));
849 }
850
851 while (mBroadcastInputThreadRunning) {
852 // move the stream pointer for packet size * 6 every read until the end
853 while (mKeepFetchingDataFromFrontend) {
854 for (int i = 0; i < writePacketAmount; i++) {
855 inputData.read(buffer, packetSize);
856 if (!inputData) {
857 mBroadcastInputThreadRunning = false;
858 break;
859 }
860 // filter and dispatch filter output
861 vector<uint8_t> byteBuffer;
Amy79125022019-10-10 15:30:17 -0700862 byteBuffer.resize(packetSize);
Amy5094ae12019-10-04 18:43:21 -0700863 for (int index = 0; index < byteBuffer.size(); index++) {
864 byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
865 }
866 startTsFilter(byteBuffer);
867 inputData.seekg(packetSize, inputData.cur);
868 }
869 startFilterDispatcher();
870 sleep(1);
871 }
872 }
873
874 ALOGW("[Demux] Broadcast Input thread end.");
875 delete[] buffer;
876 inputData.close();
877}
878
879void Demux::stopBroadcastInput() {
880 mKeepFetchingDataFromFrontend = false;
881 mBroadcastInputThreadRunning = false;
882 std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
883}
884
Amyfd4243a2019-08-16 16:01:27 -0700885} // namespace implementation
886} // namespace V1_0
887} // namespace tuner
888} // namespace tv
889} // namespace hardware
890} // namespace android