blob: a444eb7b1500d15719c51e34abc7fb6b1d31bc10 [file] [log] [blame]
Pierre Ossman9f273e92015-11-09 16:34:54 +01001/* Copyright 2015 Pierre Ossman for Cendio AB
2 *
3 * This is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
7 *
8 * This software is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this software; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
16 * USA.
17 */
18
19#include <assert.h>
20#include <string.h>
21
Pierre Ossman86350622015-11-10 13:02:12 +010022#include <rfb/CConnection.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010023#include <rfb/DecodeManager.h>
24#include <rfb/Decoder.h>
25
26#include <rfb/LogWriter.h>
27
28#include <rdr/Exception.h>
Pierre Ossman80b42092015-11-10 17:17:34 +010029#include <rdr/MemOutStream.h>
Pierre Ossman9f273e92015-11-09 16:34:54 +010030
Pierre Ossman504afa22015-11-12 12:21:58 +010031#include <os/Mutex.h>
32
Pierre Ossman9f273e92015-11-09 16:34:54 +010033using namespace rfb;
34
35static LogWriter vlog("DecodeManager");
36
37DecodeManager::DecodeManager(CConnection *conn) :
38 conn(conn)
39{
Pierre Ossman504afa22015-11-12 12:21:58 +010040 int i;
41
Pierre Ossman9f273e92015-11-09 16:34:54 +010042 memset(decoders, 0, sizeof(decoders));
Pierre Ossman504afa22015-11-12 12:21:58 +010043
44 queueMutex = new os::Mutex();
45 producerCond = new os::Condition(queueMutex);
46 consumerCond = new os::Condition(queueMutex);
47
48 // Just a single thread for now as we haven't sorted out the
49 // dependencies between rects
50 for (i = 0;i < 1;i++) {
51 // Twice as many possible entries in the queue as there
52 // are worker threads to make sure they don't stall
53 freeBuffers.push_back(new rdr::MemOutStream());
54 freeBuffers.push_back(new rdr::MemOutStream());
55
56 threads.push_back(new DecodeThread(this));
57 }
Pierre Ossman9f273e92015-11-09 16:34:54 +010058}
59
60DecodeManager::~DecodeManager()
61{
Pierre Ossman504afa22015-11-12 12:21:58 +010062 while (!threads.empty()) {
63 delete threads.back();
64 threads.pop_back();
65 }
66
67 while (!freeBuffers.empty()) {
68 delete freeBuffers.back();
69 freeBuffers.pop_back();
70 }
71
72 delete consumerCond;
73 delete producerCond;
74 delete queueMutex;
75
Pierre Ossman9f273e92015-11-09 16:34:54 +010076 for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
77 delete decoders[i];
78}
79
80void DecodeManager::decodeRect(const Rect& r, int encoding,
81 ModifiablePixelBuffer* pb)
82{
Pierre Ossman504afa22015-11-12 12:21:58 +010083 Decoder *decoder;
84 rdr::MemOutStream *bufferStream;
85
86 QueueEntry *entry;
87
Pierre Ossman9f273e92015-11-09 16:34:54 +010088 assert(pb != NULL);
89
90 if (!Decoder::supported(encoding)) {
91 vlog.error("Unknown encoding %d", encoding);
92 throw rdr::Exception("Unknown encoding");
93 }
94
95 if (!decoders[encoding]) {
Pierre Ossman86350622015-11-10 13:02:12 +010096 decoders[encoding] = Decoder::createDecoder(encoding);
Pierre Ossman9f273e92015-11-09 16:34:54 +010097 if (!decoders[encoding]) {
98 vlog.error("Unknown encoding %d", encoding);
99 throw rdr::Exception("Unknown encoding");
100 }
101 }
Pierre Ossman80b42092015-11-10 17:17:34 +0100102
Pierre Ossman504afa22015-11-12 12:21:58 +0100103 decoder = decoders[encoding];
104
105 // Wait for an available memory buffer
106 queueMutex->lock();
107
108 while (freeBuffers.empty())
109 producerCond->wait();
110
111 // Don't pop the buffer in case we throw an exception
112 // whilst reading
113 bufferStream = freeBuffers.front();
114
115 queueMutex->unlock();
116
117 // Read the rect
Pierre Ossman80b42092015-11-10 17:17:34 +0100118 bufferStream->clear();
Pierre Ossman504afa22015-11-12 12:21:58 +0100119 decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
120
121 // Then try to put it on the queue
122 entry = new QueueEntry;
123
124 entry->active = false;
125 entry->rect = r;
126 entry->encoding = encoding;
127 entry->decoder = decoder;
128 entry->cp = &conn->cp;
129 entry->pb = pb;
130 entry->bufferStream = bufferStream;
131
132 queueMutex->lock();
133
134 // The workers add buffers to the end so it's safe to assume
135 // the front is still the same buffer
136 freeBuffers.pop_front();
137
138 workQueue.push_back(entry);
139
140 // We only put a single entry on the queue so waking a single
141 // thread is sufficient
142 consumerCond->signal();
143
144 queueMutex->unlock();
145}
146
147void DecodeManager::flush()
148{
149 queueMutex->lock();
150
151 while (!workQueue.empty())
152 producerCond->wait();
153
154 queueMutex->unlock();
155}
156
157DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
158{
159 this->manager = manager;
160
161 stopRequested = false;
162
163 start();
164}
165
166DecodeManager::DecodeThread::~DecodeThread()
167{
168 stop();
169 wait();
170}
171
172void DecodeManager::DecodeThread::stop()
173{
174 os::AutoMutex a(manager->queueMutex);
175
176 if (!isRunning())
177 return;
178
179 stopRequested = true;
180
181 // We can't wake just this thread, so wake everyone
182 manager->consumerCond->broadcast();
183}
184
185void DecodeManager::DecodeThread::worker()
186{
187 manager->queueMutex->lock();
188
189 while (!stopRequested) {
190 DecodeManager::QueueEntry *entry;
191
192 // Look for an available entry in the work queue
193 entry = findEntry();
194 if (entry == NULL) {
195 // Wait and try again
196 manager->consumerCond->wait();
197 continue;
198 }
199
200 // This is ours now
201 entry->active = true;
202
203 manager->queueMutex->unlock();
204
205 // Do the actual decoding
206 try {
207 entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
208 entry->bufferStream->length(),
209 *entry->cp, entry->pb);
210 } catch(...) {
211 // FIXME: Try to get the exception back to the main thread
212 assert(false);
213 }
214
215 manager->queueMutex->lock();
216
217 // Remove the entry from the queue and give back the memory buffer
218 manager->freeBuffers.push_back(entry->bufferStream);
219 manager->workQueue.remove(entry);
220 delete entry;
221
222 // Wake the main thread in case it is waiting for a memory buffer
223 manager->producerCond->signal();
224 // This rect might have been blocking multiple other rects, so
225 // wake up every worker thread
226 if (manager->workQueue.size() > 1)
227 manager->consumerCond->broadcast();
228 }
229
230 manager->queueMutex->unlock();
231}
232
233DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
234{
235 std::list<DecodeManager::QueueEntry*>::iterator iter;
236
237 if (manager->workQueue.empty())
238 return NULL;
239
240 if (!manager->workQueue.front()->active)
241 return manager->workQueue.front();
242
243 for (iter = manager->workQueue.begin();
244 iter != manager->workQueue.end();
245 ++iter) {
246 // Another thread working on this?
247 if ((*iter)->active)
248 continue;
249
250 // FIXME: check dependencies between rects
251
252 return *iter;
253 }
254
255 return NULL;
Pierre Ossman9f273e92015-11-09 16:34:54 +0100256}