blob: 98b6e790401d2c6c3263f8e9ace3ce2ad87ef5e3 [file] [log] [blame]
/* Copyright 2015 Pierre Ossman for Cendio AB
 *
 * This is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this software; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307,
 * USA.
 */
18
19#include <assert.h>
20#include <string.h>
21
Pierre Ossman86350622015-11-10 13:02:12 +010022#include <rfb/CConnection.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010023#include <rfb/DecodeManager.h>
24#include <rfb/Decoder.h>
Pierre Ossman14127892015-11-12 13:17:42 +010025#include <rfb/Region.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010026
27#include <rfb/LogWriter.h>
28
29#include <rdr/Exception.h>
Pierre Ossman80b42092015-11-10 17:17:34 +010030#include <rdr/MemOutStream.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010031
Pierre Ossman504afa22015-11-12 12:21:58 +010032#include <os/Mutex.h>
33
Pierre Ossman9f273e92015-11-09 16:34:54 +010034using namespace rfb;
35
36static LogWriter vlog("DecodeManager");
37
38DecodeManager::DecodeManager(CConnection *conn) :
Pierre Ossman05604652015-11-17 09:37:57 +010039 conn(conn), threadException(NULL)
Pierre Ossman9f273e92015-11-09 16:34:54 +010040{
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010041 size_t cpuCount;
Pierre Ossman504afa22015-11-12 12:21:58 +010042
Pierre Ossman9f273e92015-11-09 16:34:54 +010043 memset(decoders, 0, sizeof(decoders));
Pierre Ossman504afa22015-11-12 12:21:58 +010044
45 queueMutex = new os::Mutex();
46 producerCond = new os::Condition(queueMutex);
47 consumerCond = new os::Condition(queueMutex);
48
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010049 cpuCount = os::Thread::getSystemCPUCount();
50 if (cpuCount == 0) {
51 vlog.error("Unable to determine the number of CPU cores on this system");
52 cpuCount = 1;
53 } else {
Pierre Ossmana0eb1e82015-11-20 16:14:48 +010054 vlog.info("Detected %d CPU core(s)", (int)cpuCount);
55 // No point creating more threads than this, they'll just end up
56 // wasting CPU fighting for locks
57 if (cpuCount > 4)
58 cpuCount = 4;
Pierre Ossman2b8aa352015-11-24 17:15:16 +010059 // The overhead of threading is small, but not small enough to
60 // ignore on single CPU systems
61 if (cpuCount == 1)
62 vlog.info("Decoding data on main thread");
63 else
64 vlog.info("Creating %d decoder thread(s)", (int)cpuCount);
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010065 }
66
Peter Åstrand (astrand)67814b62018-05-07 14:59:57 +020067 if (cpuCount == 1) {
68 // Threads are not used on single CPU machines
69 freeBuffers.push_back(new rdr::MemOutStream());
70 return;
71 }
72
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010073 while (cpuCount--) {
Pierre Ossman504afa22015-11-12 12:21:58 +010074 // Twice as many possible entries in the queue as there
75 // are worker threads to make sure they don't stall
76 freeBuffers.push_back(new rdr::MemOutStream());
77 freeBuffers.push_back(new rdr::MemOutStream());
78
79 threads.push_back(new DecodeThread(this));
80 }
Pierre Ossman9f273e92015-11-09 16:34:54 +010081}
82
83DecodeManager::~DecodeManager()
84{
Pierre Ossman504afa22015-11-12 12:21:58 +010085 while (!threads.empty()) {
86 delete threads.back();
87 threads.pop_back();
88 }
89
Pierre Ossman05604652015-11-17 09:37:57 +010090 delete threadException;
91
Pierre Ossman504afa22015-11-12 12:21:58 +010092 while (!freeBuffers.empty()) {
93 delete freeBuffers.back();
94 freeBuffers.pop_back();
95 }
96
97 delete consumerCond;
98 delete producerCond;
99 delete queueMutex;
100
Pierre Ossman9f273e92015-11-09 16:34:54 +0100101 for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
102 delete decoders[i];
103}
104
105void DecodeManager::decodeRect(const Rect& r, int encoding,
106 ModifiablePixelBuffer* pb)
107{
Pierre Ossman504afa22015-11-12 12:21:58 +0100108 Decoder *decoder;
109 rdr::MemOutStream *bufferStream;
110
111 QueueEntry *entry;
112
Pierre Ossman9f273e92015-11-09 16:34:54 +0100113 assert(pb != NULL);
114
115 if (!Decoder::supported(encoding)) {
116 vlog.error("Unknown encoding %d", encoding);
117 throw rdr::Exception("Unknown encoding");
118 }
119
120 if (!decoders[encoding]) {
Pierre Ossman86350622015-11-10 13:02:12 +0100121 decoders[encoding] = Decoder::createDecoder(encoding);
Pierre Ossman9f273e92015-11-09 16:34:54 +0100122 if (!decoders[encoding]) {
123 vlog.error("Unknown encoding %d", encoding);
124 throw rdr::Exception("Unknown encoding");
125 }
126 }
Pierre Ossman80b42092015-11-10 17:17:34 +0100127
Pierre Ossman504afa22015-11-12 12:21:58 +0100128 decoder = decoders[encoding];
129
Pierre Ossman2b8aa352015-11-24 17:15:16 +0100130 // Fast path for single CPU machines to avoid the context
131 // switching overhead
Peter Åstrand (astrand)67814b62018-05-07 14:59:57 +0200132 if (threads.empty()) {
Pierre Ossman2b8aa352015-11-24 17:15:16 +0100133 bufferStream = freeBuffers.front();
134 bufferStream->clear();
Pierre Ossmanb14a6bc2018-06-18 15:44:26 +0200135 decoder->readRect(r, conn->getInStream(), conn->server, bufferStream);
Pierre Ossman2b8aa352015-11-24 17:15:16 +0100136 decoder->decodeRect(r, bufferStream->data(), bufferStream->length(),
Pierre Ossmanb14a6bc2018-06-18 15:44:26 +0200137 conn->server, pb);
Pierre Ossman2b8aa352015-11-24 17:15:16 +0100138 return;
139 }
140
Pierre Ossman504afa22015-11-12 12:21:58 +0100141 // Wait for an available memory buffer
142 queueMutex->lock();
143
144 while (freeBuffers.empty())
145 producerCond->wait();
146
147 // Don't pop the buffer in case we throw an exception
148 // whilst reading
149 bufferStream = freeBuffers.front();
150
151 queueMutex->unlock();
152
Pierre Ossman05604652015-11-17 09:37:57 +0100153 // First check if any thread has encountered a problem
154 throwThreadException();
155
Pierre Ossman504afa22015-11-12 12:21:58 +0100156 // Read the rect
Pierre Ossman80b42092015-11-10 17:17:34 +0100157 bufferStream->clear();
Pierre Ossmanb14a6bc2018-06-18 15:44:26 +0200158 decoder->readRect(r, conn->getInStream(), conn->server, bufferStream);
Pierre Ossman504afa22015-11-12 12:21:58 +0100159
160 // Then try to put it on the queue
161 entry = new QueueEntry;
162
163 entry->active = false;
164 entry->rect = r;
165 entry->encoding = encoding;
166 entry->decoder = decoder;
Pierre Ossmanb14a6bc2018-06-18 15:44:26 +0200167 entry->server = &conn->server;
Pierre Ossman504afa22015-11-12 12:21:58 +0100168 entry->pb = pb;
169 entry->bufferStream = bufferStream;
170
Pierre Ossman14127892015-11-12 13:17:42 +0100171 decoder->getAffectedRegion(r, bufferStream->data(),
Pierre Ossmanb14a6bc2018-06-18 15:44:26 +0200172 bufferStream->length(), conn->server,
Pierre Ossman14127892015-11-12 13:17:42 +0100173 &entry->affectedRegion);
174
Pierre Ossman504afa22015-11-12 12:21:58 +0100175 queueMutex->lock();
176
177 // The workers add buffers to the end so it's safe to assume
178 // the front is still the same buffer
179 freeBuffers.pop_front();
180
181 workQueue.push_back(entry);
182
183 // We only put a single entry on the queue so waking a single
184 // thread is sufficient
185 consumerCond->signal();
186
187 queueMutex->unlock();
188}
189
190void DecodeManager::flush()
191{
192 queueMutex->lock();
193
194 while (!workQueue.empty())
195 producerCond->wait();
196
197 queueMutex->unlock();
Pierre Ossman05604652015-11-17 09:37:57 +0100198
199 throwThreadException();
200}
201
202void DecodeManager::setThreadException(const rdr::Exception& e)
203{
204 os::AutoMutex a(queueMutex);
205
Pierre Ossmane20cf622017-02-19 15:51:45 +0100206 if (threadException != NULL)
Pierre Ossman05604652015-11-17 09:37:57 +0100207 return;
208
209 threadException = new rdr::Exception("Exception on worker thread: %s", e.str());
210}
211
212void DecodeManager::throwThreadException()
213{
214 os::AutoMutex a(queueMutex);
215
216 if (threadException == NULL)
217 return;
218
219 rdr::Exception e(*threadException);
220
221 delete threadException;
222 threadException = NULL;
223
224 throw e;
Pierre Ossman504afa22015-11-12 12:21:58 +0100225}
226
227DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
228{
229 this->manager = manager;
230
231 stopRequested = false;
232
233 start();
234}
235
236DecodeManager::DecodeThread::~DecodeThread()
237{
238 stop();
239 wait();
240}
241
242void DecodeManager::DecodeThread::stop()
243{
244 os::AutoMutex a(manager->queueMutex);
245
246 if (!isRunning())
247 return;
248
249 stopRequested = true;
250
251 // We can't wake just this thread, so wake everyone
252 manager->consumerCond->broadcast();
253}
254
255void DecodeManager::DecodeThread::worker()
256{
257 manager->queueMutex->lock();
258
259 while (!stopRequested) {
260 DecodeManager::QueueEntry *entry;
261
262 // Look for an available entry in the work queue
263 entry = findEntry();
264 if (entry == NULL) {
265 // Wait and try again
266 manager->consumerCond->wait();
267 continue;
268 }
269
270 // This is ours now
271 entry->active = true;
272
273 manager->queueMutex->unlock();
274
275 // Do the actual decoding
276 try {
277 entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
278 entry->bufferStream->length(),
Pierre Ossmanb14a6bc2018-06-18 15:44:26 +0200279 *entry->server, entry->pb);
Pierre Ossman8ee522a2018-05-29 15:50:08 +0200280 } catch (rdr::Exception& e) {
Pierre Ossman05604652015-11-17 09:37:57 +0100281 manager->setThreadException(e);
Pierre Ossman504afa22015-11-12 12:21:58 +0100282 } catch(...) {
Pierre Ossman504afa22015-11-12 12:21:58 +0100283 assert(false);
284 }
285
286 manager->queueMutex->lock();
287
288 // Remove the entry from the queue and give back the memory buffer
289 manager->freeBuffers.push_back(entry->bufferStream);
290 manager->workQueue.remove(entry);
291 delete entry;
292
293 // Wake the main thread in case it is waiting for a memory buffer
294 manager->producerCond->signal();
295 // This rect might have been blocking multiple other rects, so
296 // wake up every worker thread
297 if (manager->workQueue.size() > 1)
298 manager->consumerCond->broadcast();
299 }
300
301 manager->queueMutex->unlock();
302}
303
304DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
305{
306 std::list<DecodeManager::QueueEntry*>::iterator iter;
Pierre Ossman14127892015-11-12 13:17:42 +0100307 Region lockedRegion;
Pierre Ossman504afa22015-11-12 12:21:58 +0100308
309 if (manager->workQueue.empty())
310 return NULL;
311
312 if (!manager->workQueue.front()->active)
313 return manager->workQueue.front();
314
315 for (iter = manager->workQueue.begin();
316 iter != manager->workQueue.end();
317 ++iter) {
Pierre Ossman14127892015-11-12 13:17:42 +0100318 DecodeManager::QueueEntry* entry;
319
Pierre Ossmana862add2015-11-12 13:18:22 +0100320 std::list<DecodeManager::QueueEntry*>::iterator iter2;
321
Pierre Ossman14127892015-11-12 13:17:42 +0100322 entry = *iter;
323
Pierre Ossman504afa22015-11-12 12:21:58 +0100324 // Another thread working on this?
Pierre Ossman14127892015-11-12 13:17:42 +0100325 if (entry->active)
326 goto next;
327
Pierre Ossmana862add2015-11-12 13:18:22 +0100328 // If this is an ordered decoder then make sure this is the first
329 // rectangle in the queue for that decoder
330 if (entry->decoder->flags & DecoderOrdered) {
331 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
332 if (entry->encoding == (*iter2)->encoding)
333 goto next;
334 }
335 }
336
Pierre Ossmane6ad4452015-11-13 10:47:28 +0100337 // For a partially ordered decoder we must ask the decoder for each
338 // pair of rectangles.
339 if (entry->decoder->flags & DecoderPartiallyOrdered) {
340 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
341 if (entry->encoding != (*iter2)->encoding)
342 continue;
343 if (entry->decoder->doRectsConflict(entry->rect,
344 entry->bufferStream->data(),
345 entry->bufferStream->length(),
346 (*iter2)->rect,
347 (*iter2)->bufferStream->data(),
348 (*iter2)->bufferStream->length(),
Pierre Ossmanb14a6bc2018-06-18 15:44:26 +0200349 *entry->server))
Pierre Ossmane6ad4452015-11-13 10:47:28 +0100350 goto next;
351 }
352 }
353
Pierre Ossman14127892015-11-12 13:17:42 +0100354 // Check overlap with earlier rectangles
355 if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
356 goto next;
Pierre Ossman504afa22015-11-12 12:21:58 +0100357
Pierre Ossman14127892015-11-12 13:17:42 +0100358 return entry;
359
360next:
361 lockedRegion.assign_union(entry->affectedRegion);
Pierre Ossman504afa22015-11-12 12:21:58 +0100362 }
363
364 return NULL;
Pierre Ossman9f273e92015-11-09 16:34:54 +0100365}