blob: a655c53afb395cfa66536cd47494789744517caa [file] [log] [blame]
Pierre Ossman9f273e92015-11-09 16:34:54 +01001/* Copyright 2015 Pierre Ossman for Cendio AB
2 *
3 * This is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This software is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this software; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
16 * USA.
17 */
18
19#include <assert.h>
20#include <string.h>
21
Pierre Ossman86350622015-11-10 13:02:12 +010022#include <rfb/CConnection.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010023#include <rfb/DecodeManager.h>
24#include <rfb/Decoder.h>
Pierre Ossman14127892015-11-12 13:17:42 +010025#include <rfb/Region.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010026
27#include <rfb/LogWriter.h>
28
29#include <rdr/Exception.h>
Pierre Ossman80b42092015-11-10 17:17:34 +010030#include <rdr/MemOutStream.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010031
Pierre Ossman504afa22015-11-12 12:21:58 +010032#include <os/Mutex.h>
33
Pierre Ossman9f273e92015-11-09 16:34:54 +010034using namespace rfb;
35
36static LogWriter vlog("DecodeManager");
37
38DecodeManager::DecodeManager(CConnection *conn) :
Pierre Ossman05604652015-11-17 09:37:57 +010039 conn(conn), threadException(NULL)
Pierre Ossman9f273e92015-11-09 16:34:54 +010040{
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010041 size_t cpuCount;
Pierre Ossman504afa22015-11-12 12:21:58 +010042
Pierre Ossman9f273e92015-11-09 16:34:54 +010043 memset(decoders, 0, sizeof(decoders));
Pierre Ossman504afa22015-11-12 12:21:58 +010044
45 queueMutex = new os::Mutex();
46 producerCond = new os::Condition(queueMutex);
47 consumerCond = new os::Condition(queueMutex);
48
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010049 cpuCount = os::Thread::getSystemCPUCount();
50 if (cpuCount == 0) {
51 vlog.error("Unable to determine the number of CPU cores on this system");
52 cpuCount = 1;
53 } else {
Pierre Ossmana0eb1e82015-11-20 16:14:48 +010054 vlog.info("Detected %d CPU core(s)", (int)cpuCount);
55 // No point creating more threads than this, they'll just end up
56 // wasting CPU fighting for locks
57 if (cpuCount > 4)
58 cpuCount = 4;
Pierre Ossman2b8aa352015-11-24 17:15:16 +010059 // The overhead of threading is small, but not small enough to
60 // ignore on single CPU systems
61 if (cpuCount == 1)
62 vlog.info("Decoding data on main thread");
63 else
64 vlog.info("Creating %d decoder thread(s)", (int)cpuCount);
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010065 }
66
67 while (cpuCount--) {
Pierre Ossman504afa22015-11-12 12:21:58 +010068 // Twice as many possible entries in the queue as there
69 // are worker threads to make sure they don't stall
70 freeBuffers.push_back(new rdr::MemOutStream());
71 freeBuffers.push_back(new rdr::MemOutStream());
72
73 threads.push_back(new DecodeThread(this));
74 }
Pierre Ossman9f273e92015-11-09 16:34:54 +010075}
76
77DecodeManager::~DecodeManager()
78{
Pierre Ossman504afa22015-11-12 12:21:58 +010079 while (!threads.empty()) {
80 delete threads.back();
81 threads.pop_back();
82 }
83
Pierre Ossman05604652015-11-17 09:37:57 +010084 delete threadException;
85
Pierre Ossman504afa22015-11-12 12:21:58 +010086 while (!freeBuffers.empty()) {
87 delete freeBuffers.back();
88 freeBuffers.pop_back();
89 }
90
91 delete consumerCond;
92 delete producerCond;
93 delete queueMutex;
94
Pierre Ossman9f273e92015-11-09 16:34:54 +010095 for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
96 delete decoders[i];
97}
98
99void DecodeManager::decodeRect(const Rect& r, int encoding,
100 ModifiablePixelBuffer* pb)
101{
Pierre Ossman504afa22015-11-12 12:21:58 +0100102 Decoder *decoder;
103 rdr::MemOutStream *bufferStream;
104
105 QueueEntry *entry;
106
Pierre Ossman9f273e92015-11-09 16:34:54 +0100107 assert(pb != NULL);
108
109 if (!Decoder::supported(encoding)) {
110 vlog.error("Unknown encoding %d", encoding);
111 throw rdr::Exception("Unknown encoding");
112 }
113
114 if (!decoders[encoding]) {
Pierre Ossman86350622015-11-10 13:02:12 +0100115 decoders[encoding] = Decoder::createDecoder(encoding);
Pierre Ossman9f273e92015-11-09 16:34:54 +0100116 if (!decoders[encoding]) {
117 vlog.error("Unknown encoding %d", encoding);
118 throw rdr::Exception("Unknown encoding");
119 }
120 }
Pierre Ossman80b42092015-11-10 17:17:34 +0100121
Pierre Ossman504afa22015-11-12 12:21:58 +0100122 decoder = decoders[encoding];
123
Pierre Ossman2b8aa352015-11-24 17:15:16 +0100124 // Fast path for single CPU machines to avoid the context
125 // switching overhead
126 if (threads.size() == 1) {
127 bufferStream = freeBuffers.front();
128 bufferStream->clear();
129 decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
130 decoder->decodeRect(r, bufferStream->data(), bufferStream->length(),
131 conn->cp, pb);
132 return;
133 }
134
Pierre Ossman504afa22015-11-12 12:21:58 +0100135 // Wait for an available memory buffer
136 queueMutex->lock();
137
138 while (freeBuffers.empty())
139 producerCond->wait();
140
141 // Don't pop the buffer in case we throw an exception
142 // whilst reading
143 bufferStream = freeBuffers.front();
144
145 queueMutex->unlock();
146
Pierre Ossman05604652015-11-17 09:37:57 +0100147 // First check if any thread has encountered a problem
148 throwThreadException();
149
Pierre Ossman504afa22015-11-12 12:21:58 +0100150 // Read the rect
Pierre Ossman80b42092015-11-10 17:17:34 +0100151 bufferStream->clear();
Pierre Ossman504afa22015-11-12 12:21:58 +0100152 decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
153
154 // Then try to put it on the queue
155 entry = new QueueEntry;
156
157 entry->active = false;
158 entry->rect = r;
159 entry->encoding = encoding;
160 entry->decoder = decoder;
161 entry->cp = &conn->cp;
162 entry->pb = pb;
163 entry->bufferStream = bufferStream;
164
Pierre Ossman14127892015-11-12 13:17:42 +0100165 decoder->getAffectedRegion(r, bufferStream->data(),
166 bufferStream->length(), conn->cp,
167 &entry->affectedRegion);
168
Pierre Ossman504afa22015-11-12 12:21:58 +0100169 queueMutex->lock();
170
171 // The workers add buffers to the end so it's safe to assume
172 // the front is still the same buffer
173 freeBuffers.pop_front();
174
175 workQueue.push_back(entry);
176
177 // We only put a single entry on the queue so waking a single
178 // thread is sufficient
179 consumerCond->signal();
180
181 queueMutex->unlock();
182}
183
184void DecodeManager::flush()
185{
186 queueMutex->lock();
187
188 while (!workQueue.empty())
189 producerCond->wait();
190
191 queueMutex->unlock();
Pierre Ossman05604652015-11-17 09:37:57 +0100192
193 throwThreadException();
194}
195
196void DecodeManager::setThreadException(const rdr::Exception& e)
197{
198 os::AutoMutex a(queueMutex);
199
200 if (threadException == NULL)
201 return;
202
203 threadException = new rdr::Exception("Exception on worker thread: %s", e.str());
204}
205
206void DecodeManager::throwThreadException()
207{
208 os::AutoMutex a(queueMutex);
209
210 if (threadException == NULL)
211 return;
212
213 rdr::Exception e(*threadException);
214
215 delete threadException;
216 threadException = NULL;
217
218 throw e;
Pierre Ossman504afa22015-11-12 12:21:58 +0100219}
220
221DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
222{
223 this->manager = manager;
224
225 stopRequested = false;
226
227 start();
228}
229
230DecodeManager::DecodeThread::~DecodeThread()
231{
232 stop();
233 wait();
234}
235
236void DecodeManager::DecodeThread::stop()
237{
238 os::AutoMutex a(manager->queueMutex);
239
240 if (!isRunning())
241 return;
242
243 stopRequested = true;
244
245 // We can't wake just this thread, so wake everyone
246 manager->consumerCond->broadcast();
247}
248
249void DecodeManager::DecodeThread::worker()
250{
251 manager->queueMutex->lock();
252
253 while (!stopRequested) {
254 DecodeManager::QueueEntry *entry;
255
256 // Look for an available entry in the work queue
257 entry = findEntry();
258 if (entry == NULL) {
259 // Wait and try again
260 manager->consumerCond->wait();
261 continue;
262 }
263
264 // This is ours now
265 entry->active = true;
266
267 manager->queueMutex->unlock();
268
269 // Do the actual decoding
270 try {
271 entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
272 entry->bufferStream->length(),
273 *entry->cp, entry->pb);
Pierre Ossman05604652015-11-17 09:37:57 +0100274 } catch (rdr::Exception e) {
275 manager->setThreadException(e);
Pierre Ossman504afa22015-11-12 12:21:58 +0100276 } catch(...) {
Pierre Ossman504afa22015-11-12 12:21:58 +0100277 assert(false);
278 }
279
280 manager->queueMutex->lock();
281
282 // Remove the entry from the queue and give back the memory buffer
283 manager->freeBuffers.push_back(entry->bufferStream);
284 manager->workQueue.remove(entry);
285 delete entry;
286
287 // Wake the main thread in case it is waiting for a memory buffer
288 manager->producerCond->signal();
289 // This rect might have been blocking multiple other rects, so
290 // wake up every worker thread
291 if (manager->workQueue.size() > 1)
292 manager->consumerCond->broadcast();
293 }
294
295 manager->queueMutex->unlock();
296}
297
298DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
299{
300 std::list<DecodeManager::QueueEntry*>::iterator iter;
Pierre Ossman14127892015-11-12 13:17:42 +0100301 Region lockedRegion;
Pierre Ossman504afa22015-11-12 12:21:58 +0100302
303 if (manager->workQueue.empty())
304 return NULL;
305
306 if (!manager->workQueue.front()->active)
307 return manager->workQueue.front();
308
309 for (iter = manager->workQueue.begin();
310 iter != manager->workQueue.end();
311 ++iter) {
Pierre Ossman14127892015-11-12 13:17:42 +0100312 DecodeManager::QueueEntry* entry;
313
Pierre Ossmana862add2015-11-12 13:18:22 +0100314 std::list<DecodeManager::QueueEntry*>::iterator iter2;
315
Pierre Ossman14127892015-11-12 13:17:42 +0100316 entry = *iter;
317
Pierre Ossman504afa22015-11-12 12:21:58 +0100318 // Another thread working on this?
Pierre Ossman14127892015-11-12 13:17:42 +0100319 if (entry->active)
320 goto next;
321
Pierre Ossmana862add2015-11-12 13:18:22 +0100322 // If this is an ordered decoder then make sure this is the first
323 // rectangle in the queue for that decoder
324 if (entry->decoder->flags & DecoderOrdered) {
325 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
326 if (entry->encoding == (*iter2)->encoding)
327 goto next;
328 }
329 }
330
Pierre Ossmane6ad4452015-11-13 10:47:28 +0100331 // For a partially ordered decoder we must ask the decoder for each
332 // pair of rectangles.
333 if (entry->decoder->flags & DecoderPartiallyOrdered) {
334 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
335 if (entry->encoding != (*iter2)->encoding)
336 continue;
337 if (entry->decoder->doRectsConflict(entry->rect,
338 entry->bufferStream->data(),
339 entry->bufferStream->length(),
340 (*iter2)->rect,
341 (*iter2)->bufferStream->data(),
342 (*iter2)->bufferStream->length(),
343 *entry->cp))
344 goto next;
345 }
346 }
347
Pierre Ossman14127892015-11-12 13:17:42 +0100348 // Check overlap with earlier rectangles
349 if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
350 goto next;
Pierre Ossman504afa22015-11-12 12:21:58 +0100351
Pierre Ossman14127892015-11-12 13:17:42 +0100352 return entry;
353
354next:
355 lockedRegion.assign_union(entry->affectedRegion);
Pierre Ossman504afa22015-11-12 12:21:58 +0100356 }
357
358 return NULL;
Pierre Ossman9f273e92015-11-09 16:34:54 +0100359}