Make the decoder multi-threaded

This implements the basic infrastructure for multi-threaded
decoding of rects. However there is just one thread reading data
and one thread decoding it. More logic is needed to safely decode
multiple rects at the same time.
diff --git a/common/os/CMakeLists.txt b/common/os/CMakeLists.txt
index b574959..7644341 100644
--- a/common/os/CMakeLists.txt
+++ b/common/os/CMakeLists.txt
@@ -7,5 +7,9 @@
   os.cxx)
 
 if(UNIX)
+  target_link_libraries(os pthread)
+endif()
+
+if(UNIX)
   libtool_create_control_file(os)
 endif()
diff --git a/common/rfb/CConnection.cxx b/common/rfb/CConnection.cxx
index 2ddfc33..7e9fd31 100644
--- a/common/rfb/CConnection.cxx
+++ b/common/rfb/CConnection.cxx
@@ -64,6 +64,8 @@
 
 void CConnection::setFramebuffer(ModifiablePixelBuffer* fb)
 {
+  decoder.flush();
+
   if ((framebuffer != NULL) && (fb != NULL)) {
     Rect rect;
 
@@ -303,6 +305,8 @@
 
 void CConnection::setDesktopSize(int w, int h)
 {
+  decoder.flush();
+
   CMsgHandler::setDesktopSize(w,h);
 }
 
@@ -311,6 +315,8 @@
                                          int w, int h,
                                          const ScreenSet& layout)
 {
+  decoder.flush();
+
   CMsgHandler::setExtendedDesktopSize(reason, result, w, h, layout);
 }
 
@@ -321,6 +327,8 @@
 
 void CConnection::framebufferUpdateEnd()
 {
+  decoder.flush();
+
   CMsgHandler::framebufferUpdateEnd();
 }
 
diff --git a/common/rfb/DecodeManager.cxx b/common/rfb/DecodeManager.cxx
index ffae18b..a444eb7 100644
--- a/common/rfb/DecodeManager.cxx
+++ b/common/rfb/DecodeManager.cxx
@@ -28,6 +28,8 @@
 #include <rdr/Exception.h>
 #include <rdr/MemOutStream.h>
 
+#include <os/Mutex.h>
+
 using namespace rfb;
 
 static LogWriter vlog("DecodeManager");
@@ -35,20 +37,54 @@
 DecodeManager::DecodeManager(CConnection *conn) :
   conn(conn)
 {
+  int i;
+
   memset(decoders, 0, sizeof(decoders));
-  bufferStream = new rdr::MemOutStream();
+
+  queueMutex = new os::Mutex();
+  producerCond = new os::Condition(queueMutex);
+  consumerCond = new os::Condition(queueMutex);
+
+  // Just a single thread for now as we haven't sorted out the
+  // dependencies between rects
+  for (i = 0;i < 1;i++) {
+    // Twice as many possible entries in the queue as there
+    // are worker threads to make sure they don't stall
+    freeBuffers.push_back(new rdr::MemOutStream());
+    freeBuffers.push_back(new rdr::MemOutStream());
+
+    threads.push_back(new DecodeThread(this));
+  }
 }
 
 DecodeManager::~DecodeManager()
 {
+  while (!threads.empty()) {
+    delete threads.back();
+    threads.pop_back();
+  }
+
+  while (!freeBuffers.empty()) {
+    delete freeBuffers.back();
+    freeBuffers.pop_back();
+  }
+
+  delete consumerCond;
+  delete producerCond;
+  delete queueMutex;
+
   for (size_t i = 0; i < sizeof(decoders)/sizeof(decoders[0]); i++)
     delete decoders[i];
-  delete bufferStream;
 }
 
 void DecodeManager::decodeRect(const Rect& r, int encoding,
                                ModifiablePixelBuffer* pb)
 {
+  Decoder *decoder;
+  rdr::MemOutStream *bufferStream;
+
+  QueueEntry *entry;
+
   assert(pb != NULL);
 
   if (!Decoder::supported(encoding)) {
@@ -64,10 +100,157 @@
     }
   }
 
+  decoder = decoders[encoding];
+
+  // Wait for an available memory buffer
+  queueMutex->lock();
+
+  while (freeBuffers.empty())
+    producerCond->wait();
+
+  // Don't pop the buffer in case we throw an exception
+  // whilst reading
+  bufferStream = freeBuffers.front();
+
+  queueMutex->unlock();
+
+  // Read the rect
   bufferStream->clear();
-  decoders[encoding]->readRect(r, conn->getInStream(),
-                               conn->cp, bufferStream);
-  decoders[encoding]->decodeRect(r, bufferStream->data(),
-                                 bufferStream->length(),
-                                 conn->cp, pb);
+  decoder->readRect(r, conn->getInStream(), conn->cp, bufferStream);
+
+  // Then try to put it on the queue
+  entry = new QueueEntry;
+
+  entry->active = false;
+  entry->rect = r;
+  entry->encoding = encoding;
+  entry->decoder = decoder;
+  entry->cp = &conn->cp;
+  entry->pb = pb;
+  entry->bufferStream = bufferStream;
+
+  queueMutex->lock();
+
+  // The workers add buffers to the end so it's safe to assume
+  // the front is still the same buffer
+  freeBuffers.pop_front();
+
+  workQueue.push_back(entry);
+
+  // We only put a single entry on the queue so waking a single
+  // thread is sufficient
+  consumerCond->signal();
+
+  queueMutex->unlock();
+}
+
+void DecodeManager::flush()
+{
+  queueMutex->lock();
+
+  while (!workQueue.empty())
+    producerCond->wait();
+
+  queueMutex->unlock();
+}
+
+DecodeManager::DecodeThread::DecodeThread(DecodeManager* manager)
+{
+  this->manager = manager;
+
+  stopRequested = false;
+
+  start();
+}
+
+DecodeManager::DecodeThread::~DecodeThread()
+{
+  stop();
+  wait();
+}
+
+void DecodeManager::DecodeThread::stop()
+{
+  os::AutoMutex a(manager->queueMutex);
+
+  if (!isRunning())
+    return;
+
+  stopRequested = true;
+
+  // We can't wake just this thread, so wake everyone
+  manager->consumerCond->broadcast();
+}
+
+void DecodeManager::DecodeThread::worker()
+{
+  manager->queueMutex->lock();
+
+  while (!stopRequested) {
+    DecodeManager::QueueEntry *entry;
+
+    // Look for an available entry in the work queue
+    entry = findEntry();
+    if (entry == NULL) {
+      // Wait and try again
+      manager->consumerCond->wait();
+      continue;
+    }
+
+    // This is ours now
+    entry->active = true;
+
+    manager->queueMutex->unlock();
+
+    // Do the actual decoding
+    try {
+      entry->decoder->decodeRect(entry->rect, entry->bufferStream->data(),
+                                 entry->bufferStream->length(),
+                                 *entry->cp, entry->pb);
+    } catch(...) {
+      // FIXME: Try to get the exception back to the main thread
+      assert(false);
+    }
+
+    manager->queueMutex->lock();
+
+    // Remove the entry from the queue and give back the memory buffer
+    manager->freeBuffers.push_back(entry->bufferStream);
+    manager->workQueue.remove(entry);
+    delete entry;
+
+    // Wake the main thread in case it is waiting for a memory buffer
+    manager->producerCond->signal();
+    // This rect might have been blocking multiple other rects, so
+    // wake up every worker thread
+    if (manager->workQueue.size() > 1)
+      manager->consumerCond->broadcast();
+  }
+
+  manager->queueMutex->unlock();
+}
+
+DecodeManager::QueueEntry* DecodeManager::DecodeThread::findEntry()
+{
+  std::list<DecodeManager::QueueEntry*>::iterator iter;
+
+  if (manager->workQueue.empty())
+    return NULL;
+
+  if (!manager->workQueue.front()->active)
+    return manager->workQueue.front();
+
+  for (iter = manager->workQueue.begin();
+       iter != manager->workQueue.end();
+       ++iter) {
+    // Another thread working on this?
+    if ((*iter)->active)
+      continue;
+
+    // FIXME: check dependencies between rects
+
+    return *iter;
+  }
+
+  return NULL;
 }
diff --git a/common/rfb/DecodeManager.h b/common/rfb/DecodeManager.h
index 63a4120..1a974ea 100644
--- a/common/rfb/DecodeManager.h
+++ b/common/rfb/DecodeManager.h
@@ -19,8 +19,17 @@
 #ifndef __RFB_DECODEMANAGER_H__
 #define __RFB_DECODEMANAGER_H__
 
+#include <list>
+
+#include <os/Thread.h>
+
 #include <rfb/encodings.h>
 
+namespace os {
+  class Condition;
+  class Mutex;
+}
+
 namespace rdr { class MemOutStream; }
 
 namespace rfb {
@@ -37,10 +46,48 @@
     void decodeRect(const Rect& r, int encoding,
                     ModifiablePixelBuffer* pb);
 
+    void flush();
+
   private:
     CConnection *conn;
     Decoder *decoders[encodingMax+1];
-    rdr::MemOutStream *bufferStream;
+
+    struct QueueEntry {
+      bool active;
+      Rect rect;
+      int encoding;
+      Decoder* decoder;
+      const ConnParams* cp;
+      ModifiablePixelBuffer* pb;
+      rdr::MemOutStream* bufferStream;
+    };
+
+    std::list<rdr::MemOutStream*> freeBuffers;
+    std::list<QueueEntry*> workQueue;
+
+    os::Mutex* queueMutex;
+    os::Condition* producerCond;
+    os::Condition* consumerCond;
+
+  private:
+    class DecodeThread : public os::Thread {
+    public:
+      DecodeThread(DecodeManager* manager);
+      ~DecodeThread();
+
+      void stop();
+
+    protected:
+      void worker();
+      DecodeManager::QueueEntry* findEntry();
+
+    private:
+      DecodeManager* manager;
+
+      bool stopRequested;
+    };
+
+    std::list<DecodeThread*> threads;
   };
 }
 
diff --git a/common/rfb/Decoder.h b/common/rfb/Decoder.h
index 4195a80..7286b41 100644
--- a/common/rfb/Decoder.h
+++ b/common/rfb/Decoder.h
@@ -38,9 +38,15 @@
 
     // readRect() transfers data for the given rectangle from the
     // InStream to the OutStream, possibly changing it along the way to
-    // make it easier to decode.
+    // make it easier to decode. This function will always be called in
+    // a serial manner on the main thread.
     virtual void readRect(const Rect& r, rdr::InStream* is,
                           const ConnParams& cp, rdr::OutStream* os)=0;
+
+    // These functions will be called from any of the worker threads.
+    // A lock will be held whilst these are called so it is safe to
+    // read and update internal state as necessary.
+
     // decodeRect() decodes the given rectangle with data from the
     // given buffer, onto the ModifiablePixelBuffer. The PixelFormat of
     // the PixelBuffer might not match the ConnParams and it is up to
@@ -49,6 +55,7 @@
                             size_t buflen, const ConnParams& cp,
                             ModifiablePixelBuffer* pb)=0;
 
+  public:
     static bool supported(int encoding);
     static Decoder* createDecoder(int encoding);
   };