blob: d7cffcfd6cf3edc20fd41c3ab3a66e2b54ef9853 [file] [log] [blame]
Pierre Ossman9f273e92015-11-09 16:34:54 +01001/* Copyright 2015 Pierre Ossman for Cendio AB
2 *
3 * This is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This software is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this software; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
16 * USA.
17 */
18
19#include <assert.h>
20#include <string.h>
21
Pierre Ossman86350622015-11-10 13:02:12 +010022#include <rfb/CConnection.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010023#include <rfb/DecodeManager.h>
24#include <rfb/Decoder.h>
Pierre Ossman14127892015-11-12 13:17:42 +010025#include <rfb/Region.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010026
27#include <rfb/LogWriter.h>
28
29#include <rdr/Exception.h>
Pierre Ossman80b42092015-11-10 17:17:34 +010030#include <rdr/MemOutStream.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010031
Pierre Ossman504afa22015-11-12 12:21:58 +010032#include <os/Mutex.h>
33
Pierre Ossman9f273e92015-11-09 16:34:54 +010034using namespace rfb;
35
36static LogWriter vlog("DecodeManager");
37
38DecodeManager::DecodeManager(CConnection *conn) :
Pierre Ossman05604652015-11-17 09:37:57 +010039 conn(conn), threadException(NULL)
Pierre Ossman9f273e92015-11-09 16:34:54 +010040{
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010041 size_t cpuCount;
Pierre Ossman504afa22015-11-12 12:21:58 +010042
Pierre Ossman9f273e92015-11-09 16:34:54 +010043 memset(decoders, 0, sizeof(decoders));
Pierre Ossman504afa22015-11-12 12:21:58 +010044
45 queueMutex = new os::Mutex();
46 producerCond = new os::Condition(queueMutex);
47 consumerCond = new os::Condition(queueMutex);
48
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010049 cpuCount = os::Thread::getSystemCPUCount();
50 if (cpuCount == 0) {
51 vlog.error("Unable to determine the number of CPU cores on this system");
52 cpuCount = 1;
53 } else {
Pierre Ossmana0eb1e82015-11-20 16:14:48 +010054 vlog.info("Detected %d CPU core(s)", (int)cpuCount);
55 // No point creating more threads than this, they'll just end up
56 // wasting CPU fighting for locks
57 if (cpuCount > 4)
58 cpuCount = 4;
59 vlog.info("Creating %d decoder thread(s)", (int)cpuCount);
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010060 }
61
62 while (cpuCount--) {
Pierre Ossman504afa22015-11-12 12:21:58 +010063 // Twice as many possible entries in the queue as there
64 // are worker threads to make sure they don't stall
65 freeBuffers.push_back(new rdr::MemOutStream());
66 freeBuffers.push_back(new rdr::MemOutStream());
67
68 threads.push_back(new DecodeThread(this));
69 }
Pierre Ossman9f273e92015-11-09 16:34:54 +010070}
71
72DecodeManager::~DecodeManager()
73{
Pierre Ossman504afa22015-11-12 12:21:58 +010074 while (!threads.empty()) {
75 delete threads.back();
76 threads.pop_back();
77 }
78
Pierre Ossman05604652015-11-17 09:37:57 +010079 delete threadException;
80
Pierre Ossman504afa22015-11-12 12:21:58 +010081 while (!freeBuffers.empty()) {
82 delete freeBuffers.back();
83 freeBuffers.pop_back();
84 }
85
86 delete consumerCond;
87 delete producerCond;
88 delete queueMutex;
89
Pierre Ossman9f273e92015-11-09 16:34:54 +010090 for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
91 delete decoders[i];
92}
93
94void DecodeManager::decodeRect(const Rect& r, int encoding,
95 ModifiablePixelBuffer* pb)
96{
Pierre Ossman504afa22015-11-12 12:21:58 +010097 Decoder *decoder;
98 rdr::MemOutStream *bufferStream;
99
100 QueueEntry *entry;
101
Pierre Ossman9f273e92015-11-09 16:34:54 +0100102 assert(pb != NULL);
103
104 if (!Decoder::supported(encoding)) {
105 vlog.error("Unknown encoding %d", encoding);
106 throw rdr::Exception("Unknown encoding");
107 }
108
109 if (!decoders[encoding]) {
Pierre Ossman86350622015-11-10 13:02:12 +0100110 decoders[encoding] = Decoder::createDecoder(encoding);
Pierre Ossman9f273e92015-11-09 16:34:54 +0100111 if (!decoders[encoding]) {
112 vlog.error("Unknown encoding %d", encoding);
113 throw rdr::Exception("Unknown encoding");
114 }
115 }
Pierre Ossman80b42092015-11-10 17:17:34 +0100116
Pierre Ossman504afa22015-11-12 12:21:58 +0100117 decoder = decoders[encoding];
118
119 // Wait for an available memory buffer
120 queueMutex->lock();
121
122 while (freeBuffers.empty())
123 producerCond->wait();
124
125 // Don't pop the buffer in case we throw an exception
126 // whilst reading
127 bufferStream = freeBuffers.front();
128
129 queueMutex->unlock();
130
Pierre Ossman05604652015-11-17 09:37:57 +0100131 // First check if any thread has encountered a problem
132 throwThreadException();
133
Pierre Ossman504afa22015-11-12 12:21:58 +0100134 // Read the rect
Pierre Ossman80b42092015-11-10 17:17:34 +0100135 bufferStream->clear();
Pierre Ossman504afa22015-11-12 12:21:58 +0100136 decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
137
138 // Then try to put it on the queue
139 entry = new QueueEntry;
140
141 entry->active = false;
142 entry->rect = r;
143 entry->encoding = encoding;
144 entry->decoder = decoder;
145 entry->cp = &conn->cp;
146 entry->pb = pb;
147 entry->bufferStream = bufferStream;
148
Pierre Ossman14127892015-11-12 13:17:42 +0100149 decoder->getAffectedRegion(r, bufferStream->data(),
150 bufferStream->length(), conn->cp,
151 &entry->affectedRegion);
152
Pierre Ossman504afa22015-11-12 12:21:58 +0100153 queueMutex->lock();
154
155 // The workers add buffers to the end so it's safe to assume
156 // the front is still the same buffer
157 freeBuffers.pop_front();
158
159 workQueue.push_back(entry);
160
161 // We only put a single entry on the queue so waking a single
162 // thread is sufficient
163 consumerCond->signal();
164
165 queueMutex->unlock();
166}
167
168void DecodeManager::flush()
169{
170 queueMutex->lock();
171
172 while (!workQueue.empty())
173 producerCond->wait();
174
175 queueMutex->unlock();
Pierre Ossman05604652015-11-17 09:37:57 +0100176
177 throwThreadException();
178}
179
180void DecodeManager::setThreadException(const rdr::Exception& e)
181{
182 os::AutoMutex a(queueMutex);
183
184 if (threadException == NULL)
185 return;
186
187 threadException = new rdr::Exception("Exception on worker thread: %s", e.str());
188}
189
190void DecodeManager::throwThreadException()
191{
192 os::AutoMutex a(queueMutex);
193
194 if (threadException == NULL)
195 return;
196
197 rdr::Exception e(*threadException);
198
199 delete threadException;
200 threadException = NULL;
201
202 throw e;
Pierre Ossman504afa22015-11-12 12:21:58 +0100203}
204
205DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
206{
207 this->manager = manager;
208
209 stopRequested = false;
210
211 start();
212}
213
214DecodeManager::DecodeThread::~DecodeThread()
215{
216 stop();
217 wait();
218}
219
220void DecodeManager::DecodeThread::stop()
221{
222 os::AutoMutex a(manager->queueMutex);
223
224 if (!isRunning())
225 return;
226
227 stopRequested = true;
228
229 // We can't wake just this thread, so wake everyone
230 manager->consumerCond->broadcast();
231}
232
233void DecodeManager::DecodeThread::worker()
234{
235 manager->queueMutex->lock();
236
237 while (!stopRequested) {
238 DecodeManager::QueueEntry *entry;
239
240 // Look for an available entry in the work queue
241 entry = findEntry();
242 if (entry == NULL) {
243 // Wait and try again
244 manager->consumerCond->wait();
245 continue;
246 }
247
248 // This is ours now
249 entry->active = true;
250
251 manager->queueMutex->unlock();
252
253 // Do the actual decoding
254 try {
255 entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
256 entry->bufferStream->length(),
257 *entry->cp, entry->pb);
Pierre Ossman05604652015-11-17 09:37:57 +0100258 } catch (rdr::Exception e) {
259 manager->setThreadException(e);
Pierre Ossman504afa22015-11-12 12:21:58 +0100260 } catch(...) {
Pierre Ossman504afa22015-11-12 12:21:58 +0100261 assert(false);
262 }
263
264 manager->queueMutex->lock();
265
266 // Remove the entry from the queue and give back the memory buffer
267 manager->freeBuffers.push_back(entry->bufferStream);
268 manager->workQueue.remove(entry);
269 delete entry;
270
271 // Wake the main thread in case it is waiting for a memory buffer
272 manager->producerCond->signal();
273 // This rect might have been blocking multiple other rects, so
274 // wake up every worker thread
275 if (manager->workQueue.size() > 1)
276 manager->consumerCond->broadcast();
277 }
278
279 manager->queueMutex->unlock();
280}
281
282DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
283{
284 std::list<DecodeManager::QueueEntry*>::iterator iter;
Pierre Ossman14127892015-11-12 13:17:42 +0100285 Region lockedRegion;
Pierre Ossman504afa22015-11-12 12:21:58 +0100286
287 if (manager->workQueue.empty())
288 return NULL;
289
290 if (!manager->workQueue.front()->active)
291 return manager->workQueue.front();
292
293 for (iter = manager->workQueue.begin();
294 iter != manager->workQueue.end();
295 ++iter) {
Pierre Ossman14127892015-11-12 13:17:42 +0100296 DecodeManager::QueueEntry* entry;
297
Pierre Ossmana862add2015-11-12 13:18:22 +0100298 std::list<DecodeManager::QueueEntry*>::iterator iter2;
299
Pierre Ossman14127892015-11-12 13:17:42 +0100300 entry = *iter;
301
Pierre Ossman504afa22015-11-12 12:21:58 +0100302 // Another thread working on this?
Pierre Ossman14127892015-11-12 13:17:42 +0100303 if (entry->active)
304 goto next;
305
Pierre Ossmana862add2015-11-12 13:18:22 +0100306 // If this is an ordered decoder then make sure this is the first
307 // rectangle in the queue for that decoder
308 if (entry->decoder->flags & DecoderOrdered) {
309 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
310 if (entry->encoding == (*iter2)->encoding)
311 goto next;
312 }
313 }
314
Pierre Ossmane6ad4452015-11-13 10:47:28 +0100315 // For a partially ordered decoder we must ask the decoder for each
316 // pair of rectangles.
317 if (entry->decoder->flags & DecoderPartiallyOrdered) {
318 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
319 if (entry->encoding != (*iter2)->encoding)
320 continue;
321 if (entry->decoder->doRectsConflict(entry->rect,
322 entry->bufferStream->data(),
323 entry->bufferStream->length(),
324 (*iter2)->rect,
325 (*iter2)->bufferStream->data(),
326 (*iter2)->bufferStream->length(),
327 *entry->cp))
328 goto next;
329 }
330 }
331
Pierre Ossman14127892015-11-12 13:17:42 +0100332 // Check overlap with earlier rectangles
333 if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
334 goto next;
Pierre Ossman504afa22015-11-12 12:21:58 +0100335
Pierre Ossman14127892015-11-12 13:17:42 +0100336 return entry;
337
338next:
339 lockedRegion.assign_union(entry->affectedRegion);
Pierre Ossman504afa22015-11-12 12:21:58 +0100340 }
341
342 return NULL;
Pierre Ossman9f273e92015-11-09 16:34:54 +0100343}