blob: 8bb79f970257389330a36dc02fb0a8deff0bd511 [file] [log] [blame]
Amyfd4243a2019-08-16 16:01:27 -07001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "android.hardware.tv.tuner@1.0-Demux"
18
19#include "Demux.h"
20#include <utils/Log.h>
21
22namespace android {
23namespace hardware {
24namespace tv {
25namespace tuner {
26namespace V1_0 {
27namespace implementation {
28
Amya609d5a2019-08-23 14:38:31 -070029#define WAIT_TIMEOUT 3000000000
30
31const std::vector<uint8_t> fakeDataInputBuffer{
32 0x00, 0x00, 0x00, 0x01, 0x09, 0xf0, 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0xc0, 0x1e, 0xdb,
33 0x01, 0x40, 0x16, 0xec, 0x04, 0x40, 0x00, 0x00, 0x03, 0x00, 0x40, 0x00, 0x00, 0x0f, 0x03,
34 0xc5, 0x8b, 0xb8, 0x00, 0x00, 0x00, 0x01, 0x68, 0xca, 0x8c, 0xb2, 0x00, 0x00, 0x01, 0x06,
35 0x05, 0xff, 0xff, 0x70, 0xdc, 0x45, 0xe9, 0xbd, 0xe6, 0xd9, 0x48, 0xb7, 0x96, 0x2c, 0xd8,
36 0x20, 0xd9, 0x23, 0xee, 0xef, 0x78, 0x32, 0x36, 0x34, 0x20, 0x2d, 0x20, 0x63, 0x6f, 0x72,
37 0x65, 0x20, 0x31, 0x34, 0x32, 0x20, 0x2d, 0x20, 0x48, 0x2e, 0x32, 0x36, 0x34, 0x2f, 0x4d,
38 0x50, 0x45, 0x47, 0x2d, 0x34, 0x20, 0x41, 0x56, 0x43, 0x20, 0x63, 0x6f, 0x64, 0x65, 0x63,
39 0x20, 0x2d, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x6c, 0x65, 0x66, 0x74, 0x20, 0x32, 0x30, 0x30,
40 0x33, 0x2d, 0x32, 0x30, 0x31, 0x34, 0x20, 0x2d, 0x20, 0x68, 0x74, 0x74, 0x70, 0x3a, 0x2f,
41 0x2f, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6c, 0x61, 0x6e, 0x2e, 0x6f,
42 0x72, 0x67, 0x2f, 0x78, 0x32, 0x36, 0x34, 0x2e, 0x68, 0x74, 0x6d, 0x6c, 0x20, 0x2d, 0x20,
43 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x3a, 0x20, 0x63, 0x61, 0x62, 0x61, 0x63, 0x3d,
44 0x30, 0x20, 0x72, 0x65, 0x66, 0x3d, 0x32, 0x20, 0x64, 0x65, 0x62, 0x6c, 0x6f, 0x63, 0x6b,
45 0x3d, 0x31, 0x3a, 0x30, 0x3a, 0x30, 0x20, 0x61, 0x6e, 0x61, 0x6c, 0x79, 0x73, 0x65, 0x3d,
46 0x30, 0x78, 0x31, 0x3a, 0x30, 0x78, 0x31, 0x31, 0x31, 0x20, 0x6d, 0x65, 0x3d, 0x68, 0x65,
47 0x78, 0x20, 0x73, 0x75, 0x62, 0x6d, 0x65, 0x3d, 0x37, 0x20, 0x70, 0x73, 0x79, 0x3d, 0x31,
48 0x20, 0x70, 0x73, 0x79, 0x5f, 0x72, 0x64, 0x3d, 0x31, 0x2e, 0x30, 0x30, 0x3a, 0x30, 0x2e,
49 0x30, 0x30, 0x20, 0x6d, 0x69, 0x78, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x66, 0x3d, 0x31, 0x20,
50 0x6d, 0x65, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x3d, 0x31, 0x36, 0x20, 0x63, 0x68, 0x72,
51 0x6f, 0x6d, 0x61, 0x5f, 0x6d, 0x65, 0x3d, 0x31, 0x20, 0x74, 0x72, 0x65, 0x6c, 0x6c, 0x69,
52 0x73, 0x3d, 0x31, 0x20, 0x38, 0x78, 0x38, 0x64, 0x63, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x71,
53 0x6d, 0x3d, 0x30, 0x20, 0x64, 0x65, 0x61, 0x64, 0x7a, 0x6f, 0x6e, 0x65, 0x3d, 0x32, 0x31,
54 0x2c, 0x31, 0x31, 0x20, 0x66, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x73, 0x6b, 0x69, 0x70, 0x3d,
55 0x31, 0x20, 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x5f, 0x71, 0x70, 0x5f, 0x6f, 0x66, 0x66,
56 0x73, 0x65, 0x74, 0x3d, 0x2d, 0x32, 0x20, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d,
57 0x36, 0x30, 0x20, 0x6c, 0x6f, 0x6f, 0x6b, 0x61, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x74, 0x68,
58 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x35, 0x20, 0x73, 0x6c, 0x69, 0x63, 0x65, 0x64, 0x5f,
59 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x73, 0x3d, 0x30, 0x20, 0x6e, 0x72, 0x3d, 0x30, 0x20,
60 0x64, 0x65, 0x63, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x3d, 0x31, 0x20, 0x69, 0x6e, 0x74, 0x65,
61 0x72, 0x6c, 0x61, 0x63, 0x65, 0x64, 0x3d, 0x30, 0x20, 0x62, 0x6c, 0x75, 0x72, 0x61, 0x79,
62 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x3d, 0x30, 0x20, 0x63, 0x6f, 0x6e, 0x73, 0x74,
63 0x72, 0x61, 0x69, 0x6e, 0x65, 0x64, 0x5f, 0x69, 0x6e, 0x74, 0x72, 0x61, 0x3d, 0x30, 0x20,
64 0x62, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x73, 0x3d, 0x30, 0x20, 0x77, 0x65, 0x69, 0x67, 0x68,
65 0x74, 0x70, 0x3d, 0x30, 0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x3d, 0x32, 0x35, 0x30,
66 0x20, 0x6b, 0x65, 0x79, 0x69, 0x6e, 0x74, 0x5f, 0x6d, 0x69, 0x6e, 0x3d, 0x32, 0x35, 0x20,
67 0x73, 0x63, 0x65, 0x6e, 0x65,
68};
69
Amy5094ae12019-10-04 18:43:21 -070070Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) {
Amyfd4243a2019-08-16 16:01:27 -070071 mDemuxId = demuxId;
Amy5094ae12019-10-04 18:43:21 -070072 mTunerService = tuner;
Amyfd4243a2019-08-16 16:01:27 -070073}
74
75Demux::~Demux() {}
76
77Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
78 ALOGV("%s", __FUNCTION__);
79
Amy5094ae12019-10-04 18:43:21 -070080 if (mTunerService == nullptr) {
81 return Result::NOT_INITIALIZED;
82 }
Amyfd4243a2019-08-16 16:01:27 -070083
Amy5094ae12019-10-04 18:43:21 -070084 mFrontend = mTunerService->getFrontendById(frontendId);
85
86 if (mFrontend == nullptr) {
87 return Result::INVALID_STATE;
88 }
89
90 mFrontendSourceFile = mFrontend->getSourceFile();
91
92 mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId);
93 return startBroadcastInputLoop();
Amyfd4243a2019-08-16 16:01:27 -070094}
95
Amya609d5a2019-08-23 14:38:31 -070096Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
97 const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
98 ALOGV("%s", __FUNCTION__);
99
Amya4885292019-09-06 10:30:53 -0700100 uint32_t filterId;
101
102 if (!mUnusedFilterIds.empty()) {
103 filterId = *mUnusedFilterIds.begin();
104
105 mUnusedFilterIds.erase(filterId);
106 } else {
107 filterId = ++mLastUsedFilterId;
108
Amy79125022019-10-10 15:30:17 -0700109 mFilterCallbacks.resize(filterId + 1);
Amya4885292019-09-06 10:30:53 -0700110 mFilterMQs.resize(filterId + 1);
111 mFilterEvents.resize(filterId + 1);
112 mFilterEventFlags.resize(filterId + 1);
113 mFilterThreadRunning.resize(filterId + 1);
114 mFilterThreads.resize(filterId + 1);
Amy42a5b4b2019-10-03 16:49:48 -0700115 mFilterPids.resize(filterId + 1);
116 mFilterOutputs.resize(filterId + 1);
Amy79125022019-10-10 15:30:17 -0700117 mFilterStatus.resize(filterId + 1);
Amya4885292019-09-06 10:30:53 -0700118 }
119
120 mUsedFilterIds.insert(filterId);
Amya609d5a2019-08-23 14:38:31 -0700121
122 if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
123 ALOGW("callback can't be null");
124 _hidl_cb(Result::INVALID_ARGUMENT, filterId);
125 return Void();
126 }
Amya4885292019-09-06 10:30:53 -0700127
Amya609d5a2019-08-23 14:38:31 -0700128 // Add callback
Amy79125022019-10-10 15:30:17 -0700129 mFilterCallbacks[filterId] = cb;
Amya609d5a2019-08-23 14:38:31 -0700130
Amya4885292019-09-06 10:30:53 -0700131 // Mapping from the filter ID to the filter event
132 DemuxFilterEvent event{
133 .filterId = filterId,
134 .filterType = type,
135 };
136 mFilterEvents[filterId] = event;
Amya609d5a2019-08-23 14:38:31 -0700137
Amya4885292019-09-06 10:30:53 -0700138 if (!createFilterMQ(bufferSize, filterId)) {
Amya609d5a2019-08-23 14:38:31 -0700139 _hidl_cb(Result::UNKNOWN_ERROR, -1);
140 return Void();
141 }
142
143 _hidl_cb(Result::SUCCESS, filterId);
144 return Void();
145}
146
147Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
148 ALOGV("%s", __FUNCTION__);
149
Amya4885292019-09-06 10:30:53 -0700150 if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
151 ALOGW("No filter with id: %d exists to get desc", filterId);
Amya609d5a2019-08-23 14:38:31 -0700152 _hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
153 return Void();
154 }
155
156 _hidl_cb(Result::SUCCESS, *mFilterMQs[filterId]->getDesc());
157 return Void();
158}
159
Amy42a5b4b2019-10-03 16:49:48 -0700160Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
Amya609d5a2019-08-23 14:38:31 -0700161 ALOGV("%s", __FUNCTION__);
162
Amy42a5b4b2019-10-03 16:49:48 -0700163 switch (mFilterEvents[filterId].filterType) {
164 case DemuxFilterType::SECTION:
165 mFilterPids[filterId] = settings.section().tpid;
166 break;
167 case DemuxFilterType::PES:
168 mFilterPids[filterId] = settings.pesData().tpid;
169 break;
170 case DemuxFilterType::TS:
171 mFilterPids[filterId] = settings.ts().tpid;
172 break;
173 case DemuxFilterType::AUDIO:
174 mFilterPids[filterId] = settings.audio().tpid;
175 break;
176 case DemuxFilterType::VIDEO:
177 mFilterPids[filterId] = settings.video().tpid;
178 break;
179 case DemuxFilterType::RECORD:
180 mFilterPids[filterId] = settings.record().tpid;
181 break;
182 case DemuxFilterType::PCR:
183 mFilterPids[filterId] = settings.pcr().tpid;
184 break;
185 default:
186 return Result::UNKNOWN_ERROR;
187 }
Amya609d5a2019-08-23 14:38:31 -0700188 return Result::SUCCESS;
189}
190
191Return<Result> Demux::startFilter(uint32_t filterId) {
192 ALOGV("%s", __FUNCTION__);
Amya4885292019-09-06 10:30:53 -0700193 Result result;
Amya609d5a2019-08-23 14:38:31 -0700194
Amya4885292019-09-06 10:30:53 -0700195 if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
196 ALOGW("No filter with id: %d exists to start filter", filterId);
Amya609d5a2019-08-23 14:38:31 -0700197 return Result::INVALID_ARGUMENT;
198 }
199
Amy42a5b4b2019-10-03 16:49:48 -0700200 result = startFilterLoop(filterId);
Amya609d5a2019-08-23 14:38:31 -0700201
202 return result;
203}
204
Amy42a5b4b2019-10-03 16:49:48 -0700205Return<Result> Demux::stopFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700206 ALOGV("%s", __FUNCTION__);
207
Amy42a5b4b2019-10-03 16:49:48 -0700208 mFilterThreadRunning[filterId] = false;
209
Amy5094ae12019-10-04 18:43:21 -0700210 std::lock_guard<std::mutex> lock(mFilterThreadLock);
211
Amya609d5a2019-08-23 14:38:31 -0700212 return Result::SUCCESS;
213}
214
Amy79125022019-10-10 15:30:17 -0700215Return<Result> Demux::flushFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700216 ALOGV("%s", __FUNCTION__);
217
Amy79125022019-10-10 15:30:17 -0700218 // temp implementation to flush the FMQ
219 int size = mFilterMQs[filterId]->availableToRead();
220 char* buffer = new char[size];
221 mOutputMQ->read((unsigned char*)&buffer[0], size);
222 delete[] buffer;
223 mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
224
Amya609d5a2019-08-23 14:38:31 -0700225 return Result::SUCCESS;
226}
227
Amya4885292019-09-06 10:30:53 -0700228Return<Result> Demux::removeFilter(uint32_t filterId) {
Amya609d5a2019-08-23 14:38:31 -0700229 ALOGV("%s", __FUNCTION__);
230
Amya4885292019-09-06 10:30:53 -0700231 // resetFilterRecords(filterId);
232 mUsedFilterIds.erase(filterId);
233 mUnusedFilterIds.insert(filterId);
234
Amya609d5a2019-08-23 14:38:31 -0700235 return Result::SUCCESS;
236}
237
238Return<void> Demux::getAvSyncHwId(uint32_t /* filterId */, getAvSyncHwId_cb _hidl_cb) {
239 ALOGV("%s", __FUNCTION__);
240
241 AvSyncHwId avSyncHwId = 0;
242
243 _hidl_cb(Result::SUCCESS, avSyncHwId);
244 return Void();
245}
246
247Return<void> Demux::getAvSyncTime(AvSyncHwId /* avSyncHwId */, getAvSyncTime_cb _hidl_cb) {
248 ALOGV("%s", __FUNCTION__);
249
250 uint64_t avSyncTime = 0;
251
252 _hidl_cb(Result::SUCCESS, avSyncTime);
253 return Void();
254}
255
Amyfd4243a2019-08-16 16:01:27 -0700256Return<Result> Demux::close() {
257 ALOGV("%s", __FUNCTION__);
258
Amya4885292019-09-06 10:30:53 -0700259 set<uint32_t>::iterator it;
260 mInputThread = 0;
261 mOutputThread = 0;
262 mFilterThreads.clear();
263 mUnusedFilterIds.clear();
264 mUsedFilterIds.clear();
Amy79125022019-10-10 15:30:17 -0700265 mFilterCallbacks.clear();
Amya4885292019-09-06 10:30:53 -0700266 mFilterMQs.clear();
267 mFilterEvents.clear();
268 mFilterEventFlags.clear();
Amy42a5b4b2019-10-03 16:49:48 -0700269 mFilterOutputs.clear();
270 mFilterPids.clear();
Amya4885292019-09-06 10:30:53 -0700271 mLastUsedFilterId = -1;
272
Amyfd4243a2019-08-16 16:01:27 -0700273 return Result::SUCCESS;
Amya609d5a2019-08-23 14:38:31 -0700274}
275
Amya4885292019-09-06 10:30:53 -0700276Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
277 ALOGV("%s", __FUNCTION__);
278
279 // Create a synchronized FMQ that supports blocking read/write
280 std::unique_ptr<FilterMQ> tmpFilterMQ =
281 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
282 if (!tmpFilterMQ->isValid()) {
283 ALOGW("Failed to create output FMQ");
284 return Result::UNKNOWN_ERROR;
Amya609d5a2019-08-23 14:38:31 -0700285 }
Amya4885292019-09-06 10:30:53 -0700286
287 mOutputMQ = std::move(tmpFilterMQ);
288
289 if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
290 return Result::UNKNOWN_ERROR;
291 }
292
293 mOutputCallback = cb;
294
295 return Result::SUCCESS;
296}
297
298Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
299 ALOGV("%s", __FUNCTION__);
300
301 if (!mOutputMQ) {
302 _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
303 return Void();
304 }
305
306 _hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
307 return Void();
308}
309
Amy42a5b4b2019-10-03 16:49:48 -0700310Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
311 ALOGV("%s", __FUNCTION__);
312
313 mOutputConfigured = true;
314 mOutputSettings = settings;
315 return Result::SUCCESS;
316}
317
318Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {
Amya4885292019-09-06 10:30:53 -0700319 ALOGV("%s", __FUNCTION__);
320
321 return Result::SUCCESS;
322}
323
Amy42a5b4b2019-10-03 16:49:48 -0700324Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
Amya4885292019-09-06 10:30:53 -0700325 ALOGV("%s", __FUNCTION__);
326
327 return Result::SUCCESS;
328}
329
330Return<Result> Demux::startOutput() {
331 ALOGV("%s", __FUNCTION__);
332
333 return Result::SUCCESS;
334}
335
336Return<Result> Demux::stopOutput() {
337 ALOGV("%s", __FUNCTION__);
338
339 return Result::SUCCESS;
340}
341
342Return<Result> Demux::flushOutput() {
343 ALOGV("%s", __FUNCTION__);
344
345 return Result::SUCCESS;
346}
347
348Return<Result> Demux::removeOutput() {
349 ALOGV("%s", __FUNCTION__);
350
351 return Result::SUCCESS;
352}
353
354Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
355 ALOGV("%s", __FUNCTION__);
356
357 // Create a synchronized FMQ that supports blocking read/write
358 std::unique_ptr<FilterMQ> tmpInputMQ =
359 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
360 if (!tmpInputMQ->isValid()) {
361 ALOGW("Failed to create input FMQ");
362 return Result::UNKNOWN_ERROR;
363 }
364
365 mInputMQ = std::move(tmpInputMQ);
366
367 if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
368 return Result::UNKNOWN_ERROR;
369 }
370
371 mInputCallback = cb;
372
373 return Result::SUCCESS;
374}
375
376Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
377 ALOGV("%s", __FUNCTION__);
378
379 if (!mInputMQ) {
380 _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
381 return Void();
382 }
383
384 _hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
385 return Void();
386}
387
Amy42a5b4b2019-10-03 16:49:48 -0700388Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
Amya4885292019-09-06 10:30:53 -0700389 ALOGV("%s", __FUNCTION__);
390
Amy42a5b4b2019-10-03 16:49:48 -0700391 mInputConfigured = true;
392 mInputSettings = settings;
393
Amya4885292019-09-06 10:30:53 -0700394 return Result::SUCCESS;
395}
396
397Return<Result> Demux::startInput() {
398 ALOGV("%s", __FUNCTION__);
399
Amy42a5b4b2019-10-03 16:49:48 -0700400 if (!mInputCallback) {
401 return Result::NOT_INITIALIZED;
402 }
403
404 if (!mInputConfigured) {
405 return Result::INVALID_STATE;
406 }
407
Amya4885292019-09-06 10:30:53 -0700408 pthread_create(&mInputThread, NULL, __threadLoopInput, this);
409 pthread_setname_np(mInputThread, "demux_input_waiting_loop");
410
411 // TODO start another thread to send filter status callback to the framework
412
413 return Result::SUCCESS;
414}
415
416Return<Result> Demux::stopInput() {
417 ALOGV("%s", __FUNCTION__);
418
Amy42a5b4b2019-10-03 16:49:48 -0700419 mInputThreadRunning = false;
420
Amy5094ae12019-10-04 18:43:21 -0700421 std::lock_guard<std::mutex> lock(mInputThreadLock);
422
Amya4885292019-09-06 10:30:53 -0700423 return Result::SUCCESS;
424}
425
426Return<Result> Demux::flushInput() {
427 ALOGV("%s", __FUNCTION__);
428
429 return Result::SUCCESS;
430}
431
432Return<Result> Demux::removeInput() {
433 ALOGV("%s", __FUNCTION__);
434
435 mInputMQ = nullptr;
436
437 return Result::SUCCESS;
438}
439
440Result Demux::startFilterLoop(uint32_t filterId) {
441 struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
442 threadArgs->user = this;
443 threadArgs->filterId = filterId;
444
445 pthread_t mFilterThread;
446 pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
447 mFilterThreads[filterId] = mFilterThread;
448 pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");
449
450 return Result::SUCCESS;
451}
452
Amy42a5b4b2019-10-03 16:49:48 -0700453Result Demux::startSectionFilterHandler(uint32_t filterId) {
454 if (mFilterOutputs[filterId].empty()) {
455 return Result::SUCCESS;
456 }
457 if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {
Amya4885292019-09-06 10:30:53 -0700458 ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
459 return Result::UNKNOWN_ERROR;
460 }
461
Amy42a5b4b2019-10-03 16:49:48 -0700462 mFilterOutputs[filterId].clear();
463
Amya4885292019-09-06 10:30:53 -0700464 return Result::SUCCESS;
465}
466
467Result Demux::startPesFilterHandler(uint32_t filterId) {
Amy42a5b4b2019-10-03 16:49:48 -0700468 std::lock_guard<std::mutex> lock(mFilterEventLock);
Amy42a5b4b2019-10-03 16:49:48 -0700469 if (mFilterOutputs[filterId].empty()) {
470 return Result::SUCCESS;
471 }
472
Amy5094ae12019-10-04 18:43:21 -0700473 for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
Amy1109e9f2019-10-10 18:30:28 -0700474 if (mPesSizeLeft == 0) {
475 uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) |
476 (mFilterOutputs[filterId][i + 5] << 8) |
477 mFilterOutputs[filterId][i + 6];
478 ALOGD("[Demux] prefix %d", prefix);
479 if (prefix == 0x000001) {
480 // TODO handle mulptiple Pes filters
481 mPesSizeLeft =
shubang9a1e5aa2019-10-15 22:18:11 -0700482 (mFilterOutputs[filterId][i + 8] << 8) | mFilterOutputs[filterId][i + 9];
483 mPesSizeLeft += 6;
Amy1109e9f2019-10-10 18:30:28 -0700484 ALOGD("[Demux] pes data length %d", mPesSizeLeft);
485 } else {
486 continue;
Amy5094ae12019-10-04 18:43:21 -0700487 }
Amy5094ae12019-10-04 18:43:21 -0700488 }
Amy1109e9f2019-10-10 18:30:28 -0700489
490 int endPoint = min(184, mPesSizeLeft);
491 // append data and check size
492 vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
shubang9a1e5aa2019-10-15 22:18:11 -0700493 vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 4 + endPoint;
Amy1109e9f2019-10-10 18:30:28 -0700494 mPesOutput.insert(mPesOutput.end(), first, last);
495 // size does not match then continue
496 mPesSizeLeft -= endPoint;
497 if (mPesSizeLeft > 0) {
498 continue;
499 }
500 // size match then create event
501 if (!writeDataToFilterMQ(mPesOutput, filterId)) {
502 mFilterOutputs[filterId].clear();
503 return Result::INVALID_STATE;
504 }
505 maySendFilterStatusCallback(filterId);
506 DemuxFilterPesEvent pesEvent;
507 pesEvent = {
508 // temp dump meta data
509 .streamId = mPesOutput[3],
510 .dataLength = static_cast<uint16_t>(mPesOutput.size()),
511 };
512 ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength);
513
514 int size = mFilterEvents[filterId].events.size();
515 mFilterEvents[filterId].events.resize(size + 1);
516 mFilterEvents[filterId].events[size].pes(pesEvent);
517 mPesOutput.clear();
Amy42a5b4b2019-10-03 16:49:48 -0700518 }
Amya4885292019-09-06 10:30:53 -0700519
Amy42a5b4b2019-10-03 16:49:48 -0700520 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700521
Amya4885292019-09-06 10:30:53 -0700522 return Result::SUCCESS;
523}
524
525Result Demux::startTsFilterHandler() {
526 // TODO handle starting TS filter
527 return Result::SUCCESS;
528}
529
530Result Demux::startMediaFilterHandler(uint32_t filterId) {
531 DemuxFilterMediaEvent mediaEvent;
532 mediaEvent = {
533 // temp dump meta data
534 .pts = 0,
535 .dataLength = 530,
536 .secureMemory = nullptr,
537 };
538 mFilterEvents[filterId].events.resize(1);
539 mFilterEvents[filterId].events[0].media() = mediaEvent;
Amy5094ae12019-10-04 18:43:21 -0700540
541 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700542 // TODO handle write FQM for media stream
543 return Result::SUCCESS;
544}
545
546Result Demux::startRecordFilterHandler(uint32_t filterId) {
547 DemuxFilterRecordEvent recordEvent;
548 recordEvent = {
549 // temp dump meta data
550 .tpid = 0,
551 .packetNum = 0,
552 };
553 recordEvent.indexMask.tsIndexMask() = 0x01;
554 mFilterEvents[filterId].events.resize(1);
555 mFilterEvents[filterId].events[0].ts() = recordEvent;
Amy5094ae12019-10-04 18:43:21 -0700556
557 mFilterOutputs[filterId].clear();
Amya4885292019-09-06 10:30:53 -0700558 return Result::SUCCESS;
559}
560
561Result Demux::startPcrFilterHandler() {
562 // TODO handle starting PCR filter
563 return Result::SUCCESS;
564}
565
566bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
567 ALOGV("%s", __FUNCTION__);
568
569 // Create a synchronized FMQ that supports blocking read/write
570 std::unique_ptr<FilterMQ> tmpFilterMQ =
571 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
572 if (!tmpFilterMQ->isValid()) {
573 ALOGW("Failed to create FMQ of filter with id: %d", filterId);
574 return false;
575 }
576
577 mFilterMQs[filterId] = std::move(tmpFilterMQ);
578
579 EventFlag* filterEventFlag;
580 if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
581 OK) {
582 return false;
583 }
584 mFilterEventFlags[filterId] = filterEventFlag;
585
586 return true;
587}
588
589bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
590 // TODO check how many sections has been read
591 std::lock_guard<std::mutex> lock(mFilterEventLock);
Amya4885292019-09-06 10:30:53 -0700592 if (!writeDataToFilterMQ(data, filterId)) {
593 return false;
594 }
Amy42a5b4b2019-10-03 16:49:48 -0700595 int size = mFilterEvents[filterId].events.size();
596 mFilterEvents[filterId].events.resize(size + 1);
Amya4885292019-09-06 10:30:53 -0700597 DemuxFilterSectionEvent secEvent;
598 secEvent = {
599 // temp dump meta data
600 .tableId = 0,
601 .version = 1,
602 .sectionNum = 1,
Amy42a5b4b2019-10-03 16:49:48 -0700603 .dataLength = static_cast<uint16_t>(data.size()),
Amya4885292019-09-06 10:30:53 -0700604 };
605 mFilterEvents[filterId].events[size].section(secEvent);
Amya609d5a2019-08-23 14:38:31 -0700606 return true;
607}
608
609bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId) {
610 std::lock_guard<std::mutex> lock(mWriteLock);
611 if (mFilterMQs[filterId]->write(data.data(), data.size())) {
612 return true;
613 }
614 return false;
615}
616
Amy5094ae12019-10-04 18:43:21 -0700617bool Demux::readInputFMQ() {
Amya4885292019-09-06 10:30:53 -0700618 // Read input data from the input FMQ
619 int size = mInputMQ->availableToRead();
Amy42a5b4b2019-10-03 16:49:48 -0700620 int inputPacketSize = mInputSettings.packetSize;
Amya4885292019-09-06 10:30:53 -0700621 vector<uint8_t> dataOutputBuffer;
Amy42a5b4b2019-10-03 16:49:48 -0700622 dataOutputBuffer.resize(inputPacketSize);
Amya609d5a2019-08-23 14:38:31 -0700623
Amy42a5b4b2019-10-03 16:49:48 -0700624 // Dispatch the packet to the PID matching filter output buffer
625 for (int i = 0; i < size / inputPacketSize; i++) {
Amy5094ae12019-10-04 18:43:21 -0700626 if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) {
627 return false;
628 }
629 startTsFilter(dataOutputBuffer);
630 }
631
632 return true;
633}
634
635void Demux::startTsFilter(vector<uint8_t> data) {
636 set<uint32_t>::iterator it;
637 for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
638 uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
639 ALOGW("start ts filter pid: %d", pid);
640 if (pid == mFilterPids[*it]) {
641 mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end());
Amy42a5b4b2019-10-03 16:49:48 -0700642 }
643 }
Amy5094ae12019-10-04 18:43:21 -0700644}
645
646bool Demux::startFilterDispatcher() {
647 Result result;
648 set<uint32_t>::iterator it;
Amy42a5b4b2019-10-03 16:49:48 -0700649
650 // Handle the output data per filter type
Amya4885292019-09-06 10:30:53 -0700651 for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
652 switch (mFilterEvents[*it].filterType) {
653 case DemuxFilterType::SECTION:
Amy42a5b4b2019-10-03 16:49:48 -0700654 result = startSectionFilterHandler(*it);
Amya4885292019-09-06 10:30:53 -0700655 break;
656 case DemuxFilterType::PES:
657 result = startPesFilterHandler(*it);
658 break;
659 case DemuxFilterType::TS:
660 result = startTsFilterHandler();
661 break;
662 case DemuxFilterType::AUDIO:
663 case DemuxFilterType::VIDEO:
664 result = startMediaFilterHandler(*it);
665 break;
666 case DemuxFilterType::RECORD:
667 result = startRecordFilterHandler(*it);
668 break;
669 case DemuxFilterType::PCR:
670 result = startPcrFilterHandler();
671 break;
672 default:
673 return false;
674 }
Amya609d5a2019-08-23 14:38:31 -0700675 }
676
Amya4885292019-09-06 10:30:53 -0700677 return result == Result::SUCCESS;
Amya609d5a2019-08-23 14:38:31 -0700678}
679
Amya4885292019-09-06 10:30:53 -0700680void* Demux::__threadLoopFilter(void* threadArg) {
Amya609d5a2019-08-23 14:38:31 -0700681 Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
Amya4885292019-09-06 10:30:53 -0700682 self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
Amya609d5a2019-08-23 14:38:31 -0700683 return 0;
684}
685
Amya4885292019-09-06 10:30:53 -0700686void* Demux::__threadLoopInput(void* user) {
687 Demux* const self = static_cast<Demux*>(user);
688 self->inputThreadLoop();
689 return 0;
690}
Amya609d5a2019-08-23 14:38:31 -0700691
Amya4885292019-09-06 10:30:53 -0700692void Demux::filterThreadLoop(uint32_t filterId) {
693 ALOGD("[Demux] filter %d threadLoop start.", filterId);
Amy5094ae12019-10-04 18:43:21 -0700694 std::lock_guard<std::mutex> lock(mFilterThreadLock);
Amya4885292019-09-06 10:30:53 -0700695 mFilterThreadRunning[filterId] = true;
696
697 // For the first time of filter output, implementation needs to send the filter
698 // Event Callback without waiting for the DATA_CONSUMED to init the process.
699 while (mFilterThreadRunning[filterId]) {
700 if (mFilterEvents[filterId].events.size() == 0) {
701 ALOGD("[Demux] wait for filter data output.");
702 usleep(1000 * 1000);
703 continue;
704 }
705 // After successfully write, send a callback and wait for the read to be done
Amy79125022019-10-10 15:30:17 -0700706 mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
Amya4885292019-09-06 10:30:53 -0700707 mFilterEvents[filterId].events.resize(0);
Amy79125022019-10-10 15:30:17 -0700708 mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
709 mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
Amya4885292019-09-06 10:30:53 -0700710 break;
711 }
712
713 while (mFilterThreadRunning[filterId]) {
Amya609d5a2019-08-23 14:38:31 -0700714 uint32_t efState = 0;
715 // We do not wait for the last round of writen data to be read to finish the thread
716 // because the VTS can verify the reading itself.
717 for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
Amya4885292019-09-06 10:30:53 -0700718 while (mFilterThreadRunning[filterId]) {
Amya609d5a2019-08-23 14:38:31 -0700719 status_t status = mFilterEventFlags[filterId]->wait(
720 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
721 WAIT_TIMEOUT, true /* retry on spurious wake */);
722 if (status != OK) {
723 ALOGD("[Demux] wait for data consumed");
724 continue;
725 }
726 break;
727 }
Amya609d5a2019-08-23 14:38:31 -0700728
Amy79125022019-10-10 15:30:17 -0700729 if (mFilterCallbacks[filterId] == nullptr) {
Amya4885292019-09-06 10:30:53 -0700730 ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
731 break;
732 }
733
Amy79125022019-10-10 15:30:17 -0700734 maySendFilterStatusCallback(filterId);
735
Amya4885292019-09-06 10:30:53 -0700736 while (mFilterThreadRunning[filterId]) {
737 std::lock_guard<std::mutex> lock(mFilterEventLock);
738 if (mFilterEvents[filterId].events.size() == 0) {
739 continue;
740 }
741 // After successfully write, send a callback and wait for the read to be done
Amy79125022019-10-10 15:30:17 -0700742 mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
Amya4885292019-09-06 10:30:53 -0700743 mFilterEvents[filterId].events.resize(0);
744 break;
745 }
746 // We do not wait for the last read to be done
747 // VTS can verify the read result itself.
748 if (i == SECTION_WRITE_COUNT - 1) {
749 ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
750 break;
751 }
752 }
753 mFilterThreadRunning[filterId] = false;
Amya609d5a2019-08-23 14:38:31 -0700754 }
755
756 ALOGD("[Demux] filter thread ended.");
Amyfd4243a2019-08-16 16:01:27 -0700757}
758
Amya4885292019-09-06 10:30:53 -0700759void Demux::inputThreadLoop() {
760 ALOGD("[Demux] input threadLoop start.");
Amy5094ae12019-10-04 18:43:21 -0700761 std::lock_guard<std::mutex> lock(mInputThreadLock);
Amya4885292019-09-06 10:30:53 -0700762 mInputThreadRunning = true;
763
764 while (mInputThreadRunning) {
765 uint32_t efState = 0;
766 status_t status =
767 mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
768 &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
769 if (status != OK) {
770 ALOGD("[Demux] wait for data ready on the input FMQ");
771 continue;
772 }
Nick Chalkoe36b09b2019-10-04 17:06:40 -0700773 // Our current implementation filter the data and write it into the filter FMQ immediately
Amya4885292019-09-06 10:30:53 -0700774 // after the DATA_READY from the VTS/framework
Amy5094ae12019-10-04 18:43:21 -0700775 if (!readInputFMQ() || !startFilterDispatcher()) {
Amya4885292019-09-06 10:30:53 -0700776 ALOGD("[Demux] input data failed to be filtered. Ending thread");
777 break;
778 }
Amy42a5b4b2019-10-03 16:49:48 -0700779
780 maySendInputStatusCallback();
Amya4885292019-09-06 10:30:53 -0700781 }
782
783 mInputThreadRunning = false;
784 ALOGD("[Demux] input thread ended.");
785}
786
Amy42a5b4b2019-10-03 16:49:48 -0700787void Demux::maySendInputStatusCallback() {
788 std::lock_guard<std::mutex> lock(mInputStatusLock);
789 int availableToRead = mInputMQ->availableToRead();
790 int availableToWrite = mInputMQ->availableToWrite();
791
792 DemuxInputStatus newStatus =
Amy79125022019-10-10 15:30:17 -0700793 checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
794 mInputSettings.lowThreshold);
Amy42a5b4b2019-10-03 16:49:48 -0700795 if (mIntputStatus != newStatus) {
796 mInputCallback->onInputStatus(newStatus);
797 mIntputStatus = newStatus;
798 }
799}
800
Amy79125022019-10-10 15:30:17 -0700801void Demux::maySendFilterStatusCallback(uint32_t filterId) {
802 std::lock_guard<std::mutex> lock(mFilterStatusLock);
803 int availableToRead = mFilterMQs[filterId]->availableToRead();
shubang9a1e5aa2019-10-15 22:18:11 -0700804 int availableToWrite = mFilterMQs[filterId]->availableToWrite();
Amy79125022019-10-10 15:30:17 -0700805 int fmqSize = mFilterMQs[filterId]->getQuantumCount();
806
807 DemuxFilterStatus newStatus =
808 checkFilterStatusChange(filterId, availableToWrite, availableToRead,
809 ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
810 if (mFilterStatus[filterId] != newStatus) {
811 mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
812 mFilterStatus[filterId] = newStatus;
813 }
814}
815
816DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
817 uint32_t highThreshold, uint32_t lowThreshold) {
Amy42a5b4b2019-10-03 16:49:48 -0700818 if (availableToWrite == 0) {
819 return DemuxInputStatus::SPACE_FULL;
820 } else if (availableToRead > highThreshold) {
821 return DemuxInputStatus::SPACE_ALMOST_FULL;
822 } else if (availableToRead < lowThreshold) {
823 return DemuxInputStatus::SPACE_ALMOST_EMPTY;
824 } else if (availableToRead == 0) {
825 return DemuxInputStatus::SPACE_EMPTY;
826 }
827 return mIntputStatus;
828}
829
Amy79125022019-10-10 15:30:17 -0700830DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
831 uint32_t availableToRead, uint32_t highThreshold,
832 uint32_t lowThreshold) {
833 if (availableToWrite == 0) {
834 return DemuxFilterStatus::OVERFLOW;
835 } else if (availableToRead > highThreshold) {
836 return DemuxFilterStatus::HIGH_WATER;
837 } else if (availableToRead < lowThreshold) {
838 return DemuxFilterStatus::LOW_WATER;
839 }
840 return mFilterStatus[filterId];
841}
842
Amy5094ae12019-10-04 18:43:21 -0700843Result Demux::startBroadcastInputLoop() {
844 pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
845 pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
846
847 return Result::SUCCESS;
848}
849
850void* Demux::__threadLoopBroadcast(void* user) {
851 Demux* const self = static_cast<Demux*>(user);
852 self->broadcastInputThreadLoop();
853 return 0;
854}
855
856void Demux::broadcastInputThreadLoop() {
857 std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
858 mBroadcastInputThreadRunning = true;
859 mKeepFetchingDataFromFrontend = true;
860
861 // open the stream and get its length
862 std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary);
863 // TODO take the packet size from the frontend setting
864 int packetSize = 188;
865 int writePacketAmount = 6;
866 char* buffer = new char[packetSize];
867 ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str());
868 if (!inputData.is_open()) {
869 mBroadcastInputThreadRunning = false;
870 ALOGW("[Demux] Error %s", strerror(errno));
871 }
872
873 while (mBroadcastInputThreadRunning) {
874 // move the stream pointer for packet size * 6 every read until the end
875 while (mKeepFetchingDataFromFrontend) {
876 for (int i = 0; i < writePacketAmount; i++) {
877 inputData.read(buffer, packetSize);
878 if (!inputData) {
879 mBroadcastInputThreadRunning = false;
880 break;
881 }
882 // filter and dispatch filter output
883 vector<uint8_t> byteBuffer;
Amy79125022019-10-10 15:30:17 -0700884 byteBuffer.resize(packetSize);
Amy5094ae12019-10-04 18:43:21 -0700885 for (int index = 0; index < byteBuffer.size(); index++) {
886 byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
887 }
888 startTsFilter(byteBuffer);
Amy5094ae12019-10-04 18:43:21 -0700889 }
890 startFilterDispatcher();
891 sleep(1);
892 }
893 }
894
895 ALOGW("[Demux] Broadcast Input thread end.");
896 delete[] buffer;
897 inputData.close();
898}
899
900void Demux::stopBroadcastInput() {
901 mKeepFetchingDataFromFrontend = false;
902 mBroadcastInputThreadRunning = false;
903 std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock);
904}
905
Amyfd4243a2019-08-16 16:01:27 -0700906} // namespace implementation
907} // namespace V1_0
908} // namespace tuner
909} // namespace tv
910} // namespace hardware
911} // namespace android