blob: 724cf215a474aa750b22afaa249a03aac7bd1cc1 [file] [log] [blame]
/* Copyright 2015 Pierre Ossman for Cendio AB
 *
 * This is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this software; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307,
 * USA.
 */
18
19#include <assert.h>
20#include <string.h>
21
Pierre Ossman86350622015-11-10 13:02:12 +010022#include <rfb/CConnection.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010023#include <rfb/DecodeManager.h>
24#include <rfb/Decoder.h>
Pierre Ossman14127892015-11-12 13:17:42 +010025#include <rfb/Region.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010026
27#include <rfb/LogWriter.h>
28
29#include <rdr/Exception.h>
Pierre Ossman80b42092015-11-10 17:17:34 +010030#include <rdr/MemOutStream.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010031
Pierre Ossman504afa22015-11-12 12:21:58 +010032#include <os/Mutex.h>
33
Pierre Ossman9f273e92015-11-09 16:34:54 +010034using namespace rfb;
35
36static LogWriter vlog("DecodeManager");
37
38DecodeManager::DecodeManager(CConnection *conn) :
39 conn(conn)
40{
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010041 size_t cpuCount;
Pierre Ossman504afa22015-11-12 12:21:58 +010042
Pierre Ossman9f273e92015-11-09 16:34:54 +010043 memset(decoders, 0, sizeof(decoders));
Pierre Ossman504afa22015-11-12 12:21:58 +010044
45 queueMutex = new os::Mutex();
46 producerCond = new os::Condition(queueMutex);
47 consumerCond = new os::Condition(queueMutex);
48
Pierre Ossman7b63a7c2015-11-13 14:07:52 +010049 cpuCount = os::Thread::getSystemCPUCount();
50 if (cpuCount == 0) {
51 vlog.error("Unable to determine the number of CPU cores on this system");
52 cpuCount = 1;
53 } else {
54 vlog.info("Detected %d CPU core(s) available for decoding", (int)cpuCount);
55 }
56
57 while (cpuCount--) {
Pierre Ossman504afa22015-11-12 12:21:58 +010058 // Twice as many possible entries in the queue as there
59 // are worker threads to make sure they don't stall
60 freeBuffers.push_back(new rdr::MemOutStream());
61 freeBuffers.push_back(new rdr::MemOutStream());
62
63 threads.push_back(new DecodeThread(this));
64 }
Pierre Ossman9f273e92015-11-09 16:34:54 +010065}
66
67DecodeManager::~DecodeManager()
68{
Pierre Ossman504afa22015-11-12 12:21:58 +010069 while (!threads.empty()) {
70 delete threads.back();
71 threads.pop_back();
72 }
73
74 while (!freeBuffers.empty()) {
75 delete freeBuffers.back();
76 freeBuffers.pop_back();
77 }
78
79 delete consumerCond;
80 delete producerCond;
81 delete queueMutex;
82
Pierre Ossman9f273e92015-11-09 16:34:54 +010083 for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
84 delete decoders[i];
85}
86
87void DecodeManager::decodeRect(const Rect& r, int encoding,
88 ModifiablePixelBuffer* pb)
89{
Pierre Ossman504afa22015-11-12 12:21:58 +010090 Decoder *decoder;
91 rdr::MemOutStream *bufferStream;
92
93 QueueEntry *entry;
94
Pierre Ossman9f273e92015-11-09 16:34:54 +010095 assert(pb != NULL);
96
97 if (!Decoder::supported(encoding)) {
98 vlog.error("Unknown encoding %d", encoding);
99 throw rdr::Exception("Unknown encoding");
100 }
101
102 if (!decoders[encoding]) {
Pierre Ossman86350622015-11-10 13:02:12 +0100103 decoders[encoding] = Decoder::createDecoder(encoding);
Pierre Ossman9f273e92015-11-09 16:34:54 +0100104 if (!decoders[encoding]) {
105 vlog.error("Unknown encoding %d", encoding);
106 throw rdr::Exception("Unknown encoding");
107 }
108 }
Pierre Ossman80b42092015-11-10 17:17:34 +0100109
Pierre Ossman504afa22015-11-12 12:21:58 +0100110 decoder = decoders[encoding];
111
112 // Wait for an available memory buffer
113 queueMutex->lock();
114
115 while (freeBuffers.empty())
116 producerCond->wait();
117
118 // Don't pop the buffer in case we throw an exception
119 // whilst reading
120 bufferStream = freeBuffers.front();
121
122 queueMutex->unlock();
123
124 // Read the rect
Pierre Ossman80b42092015-11-10 17:17:34 +0100125 bufferStream->clear();
Pierre Ossman504afa22015-11-12 12:21:58 +0100126 decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
127
128 // Then try to put it on the queue
129 entry = new QueueEntry;
130
131 entry->active = false;
132 entry->rect = r;
133 entry->encoding = encoding;
134 entry->decoder = decoder;
135 entry->cp = &conn->cp;
136 entry->pb = pb;
137 entry->bufferStream = bufferStream;
138
Pierre Ossman14127892015-11-12 13:17:42 +0100139 decoder->getAffectedRegion(r, bufferStream->data(),
140 bufferStream->length(), conn->cp,
141 &entry->affectedRegion);
142
Pierre Ossman504afa22015-11-12 12:21:58 +0100143 queueMutex->lock();
144
145 // The workers add buffers to the end so it's safe to assume
146 // the front is still the same buffer
147 freeBuffers.pop_front();
148
149 workQueue.push_back(entry);
150
151 // We only put a single entry on the queue so waking a single
152 // thread is sufficient
153 consumerCond->signal();
154
155 queueMutex->unlock();
156}
157
158void DecodeManager::flush()
159{
160 queueMutex->lock();
161
162 while (!workQueue.empty())
163 producerCond->wait();
164
165 queueMutex->unlock();
166}
167
168DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
169{
170 this->manager = manager;
171
172 stopRequested = false;
173
174 start();
175}
176
177DecodeManager::DecodeThread::~DecodeThread()
178{
179 stop();
180 wait();
181}
182
183void DecodeManager::DecodeThread::stop()
184{
185 os::AutoMutex a(manager->queueMutex);
186
187 if (!isRunning())
188 return;
189
190 stopRequested = true;
191
192 // We can't wake just this thread, so wake everyone
193 manager->consumerCond->broadcast();
194}
195
196void DecodeManager::DecodeThread::worker()
197{
198 manager->queueMutex->lock();
199
200 while (!stopRequested) {
201 DecodeManager::QueueEntry *entry;
202
203 // Look for an available entry in the work queue
204 entry = findEntry();
205 if (entry == NULL) {
206 // Wait and try again
207 manager->consumerCond->wait();
208 continue;
209 }
210
211 // This is ours now
212 entry->active = true;
213
214 manager->queueMutex->unlock();
215
216 // Do the actual decoding
217 try {
218 entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
219 entry->bufferStream->length(),
220 *entry->cp, entry->pb);
221 } catch(...) {
222 // FIXME: Try to get the exception back to the main thread
223 assert(false);
224 }
225
226 manager->queueMutex->lock();
227
228 // Remove the entry from the queue and give back the memory buffer
229 manager->freeBuffers.push_back(entry->bufferStream);
230 manager->workQueue.remove(entry);
231 delete entry;
232
233 // Wake the main thread in case it is waiting for a memory buffer
234 manager->producerCond->signal();
235 // This rect might have been blocking multiple other rects, so
236 // wake up every worker thread
237 if (manager->workQueue.size() > 1)
238 manager->consumerCond->broadcast();
239 }
240
241 manager->queueMutex->unlock();
242}
243
244DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
245{
246 std::list<DecodeManager::QueueEntry*>::iterator iter;
Pierre Ossman14127892015-11-12 13:17:42 +0100247 Region lockedRegion;
Pierre Ossman504afa22015-11-12 12:21:58 +0100248
249 if (manager->workQueue.empty())
250 return NULL;
251
252 if (!manager->workQueue.front()->active)
253 return manager->workQueue.front();
254
255 for (iter = manager->workQueue.begin();
256 iter != manager->workQueue.end();
257 ++iter) {
Pierre Ossman14127892015-11-12 13:17:42 +0100258 DecodeManager::QueueEntry* entry;
259
Pierre Ossmana862add2015-11-12 13:18:22 +0100260 std::list<DecodeManager::QueueEntry*>::iterator iter2;
261
Pierre Ossman14127892015-11-12 13:17:42 +0100262 entry = *iter;
263
Pierre Ossman504afa22015-11-12 12:21:58 +0100264 // Another thread working on this?
Pierre Ossman14127892015-11-12 13:17:42 +0100265 if (entry->active)
266 goto next;
267
Pierre Ossmana862add2015-11-12 13:18:22 +0100268 // If this is an ordered decoder then make sure this is the first
269 // rectangle in the queue for that decoder
270 if (entry->decoder->flags & DecoderOrdered) {
271 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
272 if (entry->encoding == (*iter2)->encoding)
273 goto next;
274 }
275 }
276
Pierre Ossmane6ad4452015-11-13 10:47:28 +0100277 // For a partially ordered decoder we must ask the decoder for each
278 // pair of rectangles.
279 if (entry->decoder->flags & DecoderPartiallyOrdered) {
280 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
281 if (entry->encoding != (*iter2)->encoding)
282 continue;
283 if (entry->decoder->doRectsConflict(entry->rect,
284 entry->bufferStream->data(),
285 entry->bufferStream->length(),
286 (*iter2)->rect,
287 (*iter2)->bufferStream->data(),
288 (*iter2)->bufferStream->length(),
289 *entry->cp))
290 goto next;
291 }
292 }
293
Pierre Ossman14127892015-11-12 13:17:42 +0100294 // Check overlap with earlier rectangles
295 if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
296 goto next;
Pierre Ossman504afa22015-11-12 12:21:58 +0100297
Pierre Ossman14127892015-11-12 13:17:42 +0100298 return entry;
299
300next:
301 lockedRegion.assign_union(entry->affectedRegion);
Pierre Ossman504afa22015-11-12 12:21:58 +0100302 }
303
304 return NULL;
Pierre Ossman9f273e92015-11-09 16:34:54 +0100305}