blob: ba2b2d2f33bc9c1a62dd53b07e02066872be6e74 [file] [log] [blame]
Pierre Ossman9f273e92015-11-09 16:34:54 +01001/* Copyright 2015 Pierre Ossman for Cendio AB
2 *
3 * This is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This software is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this software; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
16 * USA.
17 */
18
19#include <assert.h>
20#include <string.h>
21
Pierre Ossman86350622015-11-10 13:02:12 +010022#include <rfb/CConnection.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010023#include <rfb/DecodeManager.h>
24#include <rfb/Decoder.h>
Pierre Ossman14127892015-11-12 13:17:42 +010025#include <rfb/Region.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010026
27#include <rfb/LogWriter.h>
28
29#include <rdr/Exception.h>
Pierre Ossman80b42092015-11-10 17:17:34 +010030#include <rdr/MemOutStream.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010031
Pierre Ossman504afa22015-11-12 12:21:58 +010032#include <os/Mutex.h>
33
Pierre Ossman9f273e92015-11-09 16:34:54 +010034using namespace rfb;
35
36static LogWriter vlog("DecodeManager");
37
38DecodeManager::DecodeManager(CConnection *conn) :
39 conn(conn)
40{
Pierre Ossman504afa22015-11-12 12:21:58 +010041 int i;
42
Pierre Ossman9f273e92015-11-09 16:34:54 +010043 memset(decoders, 0, sizeof(decoders));
Pierre Ossman504afa22015-11-12 12:21:58 +010044
45 queueMutex = new os::Mutex();
46 producerCond = new os::Condition(queueMutex);
47 consumerCond = new os::Condition(queueMutex);
48
49 // Just a single thread for now as we haven't sorted out the
50 // dependencies between rects
51 for (i = 0;i < 1;i++) {
52 // Twice as many possible entries in the queue as there
53 // are worker threads to make sure they don't stall
54 freeBuffers.push_back(new rdr::MemOutStream());
55 freeBuffers.push_back(new rdr::MemOutStream());
56
57 threads.push_back(new DecodeThread(this));
58 }
Pierre Ossman9f273e92015-11-09 16:34:54 +010059}
60
61DecodeManager::~DecodeManager()
62{
Pierre Ossman504afa22015-11-12 12:21:58 +010063 while (!threads.empty()) {
64 delete threads.back();
65 threads.pop_back();
66 }
67
68 while (!freeBuffers.empty()) {
69 delete freeBuffers.back();
70 freeBuffers.pop_back();
71 }
72
73 delete consumerCond;
74 delete producerCond;
75 delete queueMutex;
76
Pierre Ossman9f273e92015-11-09 16:34:54 +010077 for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
78 delete decoders[i];
79}
80
81void DecodeManager::decodeRect(const Rect& r, int encoding,
82 ModifiablePixelBuffer* pb)
83{
Pierre Ossman504afa22015-11-12 12:21:58 +010084 Decoder *decoder;
85 rdr::MemOutStream *bufferStream;
86
87 QueueEntry *entry;
88
Pierre Ossman9f273e92015-11-09 16:34:54 +010089 assert(pb != NULL);
90
91 if (!Decoder::supported(encoding)) {
92 vlog.error("Unknown encoding %d", encoding);
93 throw rdr::Exception("Unknown encoding");
94 }
95
96 if (!decoders[encoding]) {
Pierre Ossman86350622015-11-10 13:02:12 +010097 decoders[encoding] = Decoder::createDecoder(encoding);
Pierre Ossman9f273e92015-11-09 16:34:54 +010098 if (!decoders[encoding]) {
99 vlog.error("Unknown encoding %d", encoding);
100 throw rdr::Exception("Unknown encoding");
101 }
102 }
Pierre Ossman80b42092015-11-10 17:17:34 +0100103
Pierre Ossman504afa22015-11-12 12:21:58 +0100104 decoder = decoders[encoding];
105
106 // Wait for an available memory buffer
107 queueMutex->lock();
108
109 while (freeBuffers.empty())
110 producerCond->wait();
111
112 // Don't pop the buffer in case we throw an exception
113 // whilst reading
114 bufferStream = freeBuffers.front();
115
116 queueMutex->unlock();
117
118 // Read the rect
Pierre Ossman80b42092015-11-10 17:17:34 +0100119 bufferStream->clear();
Pierre Ossman504afa22015-11-12 12:21:58 +0100120 decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
121
122 // Then try to put it on the queue
123 entry = new QueueEntry;
124
125 entry->active = false;
126 entry->rect = r;
127 entry->encoding = encoding;
128 entry->decoder = decoder;
129 entry->cp = &conn->cp;
130 entry->pb = pb;
131 entry->bufferStream = bufferStream;
132
Pierre Ossman14127892015-11-12 13:17:42 +0100133 decoder->getAffectedRegion(r, bufferStream->data(),
134 bufferStream->length(), conn->cp,
135 &entry->affectedRegion);
136
Pierre Ossman504afa22015-11-12 12:21:58 +0100137 queueMutex->lock();
138
139 // The workers add buffers to the end so it's safe to assume
140 // the front is still the same buffer
141 freeBuffers.pop_front();
142
143 workQueue.push_back(entry);
144
145 // We only put a single entry on the queue so waking a single
146 // thread is sufficient
147 consumerCond->signal();
148
149 queueMutex->unlock();
150}
151
152void DecodeManager::flush()
153{
154 queueMutex->lock();
155
156 while (!workQueue.empty())
157 producerCond->wait();
158
159 queueMutex->unlock();
160}
161
162DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
163{
164 this->manager = manager;
165
166 stopRequested = false;
167
168 start();
169}
170
171DecodeManager::DecodeThread::~DecodeThread()
172{
173 stop();
174 wait();
175}
176
177void DecodeManager::DecodeThread::stop()
178{
179 os::AutoMutex a(manager->queueMutex);
180
181 if (!isRunning())
182 return;
183
184 stopRequested = true;
185
186 // We can't wake just this thread, so wake everyone
187 manager->consumerCond->broadcast();
188}
189
190void DecodeManager::DecodeThread::worker()
191{
192 manager->queueMutex->lock();
193
194 while (!stopRequested) {
195 DecodeManager::QueueEntry *entry;
196
197 // Look for an available entry in the work queue
198 entry = findEntry();
199 if (entry == NULL) {
200 // Wait and try again
201 manager->consumerCond->wait();
202 continue;
203 }
204
205 // This is ours now
206 entry->active = true;
207
208 manager->queueMutex->unlock();
209
210 // Do the actual decoding
211 try {
212 entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
213 entry->bufferStream->length(),
214 *entry->cp, entry->pb);
215 } catch(...) {
216 // FIXME: Try to get the exception back to the main thread
217 assert(false);
218 }
219
220 manager->queueMutex->lock();
221
222 // Remove the entry from the queue and give back the memory buffer
223 manager->freeBuffers.push_back(entry->bufferStream);
224 manager->workQueue.remove(entry);
225 delete entry;
226
227 // Wake the main thread in case it is waiting for a memory buffer
228 manager->producerCond->signal();
229 // This rect might have been blocking multiple other rects, so
230 // wake up every worker thread
231 if (manager->workQueue.size() > 1)
232 manager->consumerCond->broadcast();
233 }
234
235 manager->queueMutex->unlock();
236}
237
238DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
239{
240 std::list<DecodeManager::QueueEntry*>::iterator iter;
Pierre Ossman14127892015-11-12 13:17:42 +0100241 Region lockedRegion;
Pierre Ossman504afa22015-11-12 12:21:58 +0100242
243 if (manager->workQueue.empty())
244 return NULL;
245
246 if (!manager->workQueue.front()->active)
247 return manager->workQueue.front();
248
249 for (iter = manager->workQueue.begin();
250 iter != manager->workQueue.end();
251 ++iter) {
Pierre Ossman14127892015-11-12 13:17:42 +0100252 DecodeManager::QueueEntry* entry;
253
Pierre Ossmana862add2015-11-12 13:18:22 +0100254 std::list<DecodeManager::QueueEntry*>::iterator iter2;
255
Pierre Ossman14127892015-11-12 13:17:42 +0100256 entry = *iter;
257
Pierre Ossman504afa22015-11-12 12:21:58 +0100258 // Another thread working on this?
Pierre Ossman14127892015-11-12 13:17:42 +0100259 if (entry->active)
260 goto next;
261
Pierre Ossmana862add2015-11-12 13:18:22 +0100262 // If this is an ordered decoder then make sure this is the first
263 // rectangle in the queue for that decoder
264 if (entry->decoder->flags & DecoderOrdered) {
265 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
266 if (entry->encoding == (*iter2)->encoding)
267 goto next;
268 }
269 }
270
Pierre Ossmane6ad4452015-11-13 10:47:28 +0100271 // For a partially ordered decoder we must ask the decoder for each
272 // pair of rectangles.
273 if (entry->decoder->flags & DecoderPartiallyOrdered) {
274 for (iter2 = manager->workQueue.begin(); iter2 != iter; ++iter2) {
275 if (entry->encoding != (*iter2)->encoding)
276 continue;
277 if (entry->decoder->doRectsConflict(entry->rect,
278 entry->bufferStream->data(),
279 entry->bufferStream->length(),
280 (*iter2)->rect,
281 (*iter2)->bufferStream->data(),
282 (*iter2)->bufferStream->length(),
283 *entry->cp))
284 goto next;
285 }
286 }
287
Pierre Ossman14127892015-11-12 13:17:42 +0100288 // Check overlap with earlier rectangles
289 if (!lockedRegion.intersect(entry->affectedRegion).is_empty())
290 goto next;
Pierre Ossman504afa22015-11-12 12:21:58 +0100291
Pierre Ossman14127892015-11-12 13:17:42 +0100292 return entry;
293
294next:
295 lockedRegion.assign_union(entry->affectedRegion);
Pierre Ossman504afa22015-11-12 12:21:58 +0100296 }
297
298 return NULL;
Pierre Ossman9f273e92015-11-09 16:34:54 +0100299}