blob: d6c3b0b58fb331cd677231512cabfbccc08f3ca1 [file] [log] [blame]
Pierre Ossman9f273e92015-11-09 16:34:54 +01001/* Copyright 2015 Pierre Ossman for Cendio AB
2 *
3 * This is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This software is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this software; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
16 * USA.
17 */
18
19#include <assert.h>
20#include <string.h>
21
Pierre Ossman86350622015-11-10 13:02:12 +010022#include <rfb/CConnection.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010023#include <rfb/DecodeManager.h>
24#include <rfb/Decoder.h>
Pierre Ossman14127892015-11-12 13:17:42 +010025#include <rfb/Region.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010026
27#include <rfb/LogWriter.h>
28
29#include <rdr/Exception.h>
Pierre Ossman80b42092015-11-10 17:17:34 +010030#include <rdr/MemOutStream.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010031
Pierre Ossman504afa22015-11-12 12:21:58 +010032#include <os/Mutex.h>
33
Pierre Ossman9f273e92015-11-09 16:34:54 +010034using namespace rfb;
35
36static LogWriter vlog("DecodeManager");
37
38DecodeManager::DecodeManager(CConnection *conn) :
Pierre Ossman05604652015-11-17 09:37:57 +010039 conn(conn), threadException(NULL)
Pierre Ossman9f273e92015-11-09 16:34:54 +010040{
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010041 size_t cpuCount;
Pierre Ossman504afa22015-11-12 12:21:58 +010042
Pierre Ossman9f273e92015-11-09 16:34:54 +010043 memset(decoders, 0, sizeof(decoders));
Pierre Ossman504afa22015-11-12 12:21:58 +010044
45 queueMutex = new os::Mutex();
46 producerCond = new os::Condition(queueMutex);
47 consumerCond = new os::Condition(queueMutex);
48
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010049 cpuCount = os::Thread::getSystemCPUCount();
50 if (cpuCount == 0) {
51 vlog.error("Unable to determine the number of CPU cores on this system");
52 cpuCount = 1;
53 } else {
54 vlog.info("Detected %d CPU core(s) available for decoding", (int)cpuCount);
55 }
56
57 while (cpuCount--) {
Pierre Ossman504afa22015-11-12 12:21:58 +010058 // Twice as many possible entries in the queue as there
59 // are worker threads to make sure they don't stall
60 freeBuffers.push_back(new rdr::MemOutStream());
61 freeBuffers.push_back(new rdr::MemOutStream());
62
63 threads.push_back(new DecodeThread(this));
64 }
Pierre Ossman9f273e92015-11-09 16:34:54 +010065}
66
67DecodeManager::~DecodeManager()
68{
Pierre Ossman504afa22015-11-12 12:21:58 +010069 while (!threads.empty()) {
70 delete threads.back();
71 threads.pop_back();
72 }
73
Pierre Ossman05604652015-11-17 09:37:57 +010074 delete threadException;
75
Pierre Ossman504afa22015-11-12 12:21:58 +010076 while (!freeBuffers.empty()) {
77 delete freeBuffers.back();
78 freeBuffers.pop_back();
79 }
80
81 delete consumerCond;
82 delete producerCond;
83 delete queueMutex;
84
Pierre Ossman9f273e92015-11-09 16:34:54 +010085 for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
86 delete decoders[i];
87}
88
89void DecodeManager::decodeRect(const Rect& r, int encoding,
90 ModifiablePixelBuffer* pb)
91{
Pierre Ossman504afa22015-11-12 12:21:58 +010092 Decoder *decoder;
93 rdr::MemOutStream *bufferStream;
94
95 QueueEntry *entry;
96
Pierre Ossman9f273e92015-11-09 16:34:54 +010097 assert(pb != NULL);
98
99 if (!Decoder::supported(encoding)) {
100 vlog.error("Unknown encoding %d", encoding);
101 throw rdr::Exception("Unknown encoding");
102 }
103
104 if (!decoders[encoding]) {
Pierre Ossman86350622015-11-10 13:02:12 +0100105 decoders[encoding] = Decoder::createDecoder(encoding);
Pierre Ossman9f273e92015-11-09 16:34:54 +0100106 if (!decoders[encoding]) {
107 vlog.error("Unknown encoding %d", encoding);
108 throw rdr::Exception("Unknown encoding");
109 }
110 }
Pierre Ossman80b42092015-11-10 17:17:34 +0100111
Pierre Ossman504afa22015-11-12 12:21:58 +0100112 decoder = decoders[encoding];
113
114 // Wait for an available memory buffer
115 queueMutex->lock();
116
117 while (freeBuffers.empty())
118 producerCond->wait();
119
120 // Don't pop the buffer in case we throw an exception
121 // whilst reading
122 bufferStream = freeBuffers.front();
123
124 queueMutex->unlock();
125
Pierre Ossman05604652015-11-17 09:37:57 +0100126 // First check if any thread has encountered a problem
127 throwThreadException();
128
Pierre Ossman504afa22015-11-12 12:21:58 +0100129 // Read the rect
Pierre Ossman80b42092015-11-10 17:17:34 +0100130 bufferStream->clear();
Pierre Ossman504afa22015-11-12 12:21:58 +0100131 decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
132
133 // Then try to put it on the queue
134 entry = new QueueEntry;
135
136 entry->active = false;
137 entry->rect = r;
138 entry->encoding = encoding;
139 entry->decoder = decoder;
140 entry->cp = &conn->cp;
141 entry->pb = pb;
142 entry->bufferStream = bufferStream;
143
Pierre Ossman14127892015-11-12 13:17:42 +0100144 decoder->getAffectedRegion(r, bufferStream->data(),
145 bufferStream->length(), conn->cp,
146 &entry->affectedRegion);
147
Pierre Ossman504afa22015-11-12 12:21:58 +0100148 queueMutex->lock();
149
150 // The workers add buffers to the end so it's safe to assume
151 // the front is still the same buffer
152 freeBuffers.pop_front();
153
154 workQueue.push_back(entry);
155
156 // We only put a single entry on the queue so waking a single
157 // thread is sufficient
158 consumerCond->signal();
159
160 queueMutex->unlock();
161}
162
163void DecodeManager::flush()
164{
165 queueMutex->lock();
166
167 while (!workQueue.empty())
168 producerCond->wait();
169
170 queueMutex->unlock();
Pierre Ossman05604652015-11-17 09:37:57 +0100171
172 throwThreadException();
173}
174
175void DecodeManager::setThreadException(const rdr::Exception& e)
176{
177 os::AutoMutex a(queueMutex);
178
179 if (threadException == NULL)
180 return;
181
182 threadException = new rdr::Exception("Exception on worker thread: %s", e.str());
183}
184
185void DecodeManager::throwThreadException()
186{
187 os::AutoMutex a(queueMutex);
188
189 if (threadException == NULL)
190 return;
191
192 rdr::Exception e(*threadException);
193
194 delete threadException;
195 threadException = NULL;
196
197 throw e;
Pierre Ossman504afa22015-11-12 12:21:58 +0100198}
199
200DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
201{
202 this->manager = manager;
203
204 stopRequested = false;
205
206 start();
207}
208
209DecodeManager::DecodeThread::~DecodeThread()
210{
211 stop();
212 wait();
213}
214
215void DecodeManager::DecodeThread::stop()
216{
217 os::AutoMutex a(manager->queueMutex);
218
219 if (!isRunning())
220 return;
221
222 stopRequested = true;
223
224 // We can't wake just this thread, so wake everyone
225 manager->consumerCond->broadcast();
226}
227
228void DecodeManager::DecodeThread::worker()
229{
230 manager->queueMutex->lock();
231
232 while (!stopRequested) {
233 DecodeManager::QueueEntry *entry;
234
235 // Look for an available entry in the work queue
236 entry = findEntry();
237 if (entry == NULL) {
238 // Wait and try again
239 manager->consumerCond->wait();
240 continue;
241 }
242
243 // This is ours now
244 entry->active = true;
245
246 manager->queueMutex->unlock();
247
248 // Do the actual decoding
249 try {
250 entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
251 entry->bufferStream->length(),
252 *entry->cp, entry->pb);
Pierre Ossman05604652015-11-17 09:37:57 +0100253 } catch (rdr::Exception e) {
254 manager->setThreadException(e);
Pierre Ossman504afa22015-11-12 12:21:58 +0100255 } catch(...) {
Pierre Ossman504afa22015-11-12 12:21:58 +0100256 assert(false);
257 }
258
259 manager->queueMutex->lock();
260
261 // Remove the entry from the queue and give back the memory buffer
262 manager->freeBuffers.push_back(entry->bufferStream);
263 manager->workQueue.remove(entry);
264 delete entry;
265
266 // Wake the main thread in case it is waiting for a memory buffer
267 manager->producerCond->signal();
268 // This rect might have been blocking multiple other rects, so
269 // wake up every worker thread
270 if (manager->workQueue.size() > 1)
271 manager->consumerCond->broadcast();
272 }
273
274 manager->queueMutex->unlock();
275}
276
277DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
278{
279 std::list<DecodeManager::QueueEntry*>::iterator iter;
Pierre Ossman14127892015-11-12 13:17:42 +0100280 Region lockedRegion;
Pierre Ossman504afa22015-11-12 12:21:58 +0100281
282 if (manager->workQueue.empty())
283 return NULL;
284
285 if (!manager->workQueue.front()->active)
286 return manager->workQueue.front();
287
288 for (iter = manager->workQueue.begin();
289 iter != manager->workQueue.end();
290 ++iter) {
Pierre Ossman14127892015-11-12 13:17:42 +0100291 DecodeManager::QueueEntry* entry;
292
Pierre Ossmana862add2015-11-12 13:18:22 +0100293 std::list<DecodeManager::QueueEntry*>::iterator iter2;
294
Pierre Ossman14127892015-11-12 13:17:42 +0100295 entry = *iter;
296
Pierre Ossman504afa22015-11-12 12:21:58 +0100297 // Another thread working on this?
Pierre Ossman14127892015-11-12 13:17:42 +0100298 if (entry->active)
299 goto next;
300
Pierre Ossmana862add2015-11-12 13:18:22 +0100301 // If this is an ordered decoder then make sure this is the first
302 // rectangle in the queue for that decoder
303 if (entry->decoder->flags & DecoderOrdered) {
304 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
305 if (entry->encoding == (*iter2)->encoding)
306 goto next;
307 }
308 }
309
Pierre Ossmane6ad4452015-11-13 10:47:28 +0100310 // For a partially ordered decoder we must ask the decoder for each
311 // pair of rectangles.
312 if (entry->decoder->flags & DecoderPartiallyOrdered) {
313 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
314 if (entry->encoding != (*iter2)->encoding)
315 continue;
316 if (entry->decoder->doRectsConflict(entry->rect,
317 entry->bufferStream->data(),
318 entry->bufferStream->length(),
319 (*iter2)->rect,
320 (*iter2)->bufferStream->data(),
321 (*iter2)->bufferStream->length(),
322 *entry->cp))
323 goto next;
324 }
325 }
326
Pierre Ossman14127892015-11-12 13:17:42 +0100327 // Check overlap with earlier rectangles
328 if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
329 goto next;
Pierre Ossman504afa22015-11-12 12:21:58 +0100330
Pierre Ossman14127892015-11-12 13:17:42 +0100331 return entry;
332
333next:
334 lockedRegion.assign_union(entry->affectedRegion);
Pierre Ossman504afa22015-11-12 12:21:58 +0100335 }
336
337 return NULL;
Pierre Ossman9f273e92015-11-09 16:34:54 +0100338}