Improved congestion control handling
Refine the previous method by interpolating the values we need
between measurements. This reduces the impact of not being able to
send ping packets often enough, which otherwise leaves us with very
coarse measurements.
diff --git a/common/rfb/Congestion.cxx b/common/rfb/Congestion.cxx
index c4c4d96..94d78e3 100644
--- a/common/rfb/Congestion.cxx
+++ b/common/rfb/Congestion.cxx
@@ -24,12 +24,16 @@
* The basic principle is TCP Congestion Control (RFC 5618), with the
* addition of using the TCP Vegas algorithm. The reason we use Vegas
* is that we run on top of a reliable transport so we need a latency
- * based algorithm rather than a loss based one.
+ * based algorithm rather than a loss based one. There is also a lot of
+ * interpolation of values. This is because we have rather horrible
+ * granularity in our measurements.
*/
+#include <assert.h>
#include <sys/time.h>
#include <rfb/Congestion.h>
+#include <rfb/LogWriter.h>
#include <rfb/util.h>
// Debug output on what the congestion control is up to
@@ -49,158 +53,332 @@
// limit for now...
static const unsigned MAXIMUM_WINDOW = 4194304;
-struct Congestion::RTTInfo {
- struct timeval tv;
- int offset;
- unsigned inFlight;
-};
+static LogWriter vlog("Congestion");
Congestion::Congestion() :
+ lastPosition(0), extraBuffer(0),
baseRTT(-1), congWindow(INITIAL_WINDOW),
- ackedOffset(0), sentOffset(0),
- minRTT(-1), seenCongestion(false),
- congestionTimer(this)
+ measurements(0), minRTT(-1), minCongestedRTT(-1)
{
+ gettimeofday(&lastUpdate, NULL);
+ gettimeofday(&lastSent, NULL);
+ memset(&lastPong, 0, sizeof(lastPong));
+ gettimeofday(&lastPongArrival, NULL);
+ gettimeofday(&lastAdjustment, NULL);
}
Congestion::~Congestion()
{
}
+void Congestion::updatePosition(unsigned pos)
+{
+ struct timeval now;
+ unsigned delta, consumed;
-void Congestion::sentPing(int offset)
+ gettimeofday(&now, NULL);
+
+ delta = pos - lastPosition;
+ if ((delta > 0) || (extraBuffer > 0))
+ lastSent = now;
+
+ // Idle for too long?
+ // We use a very crude RTO calculation in order to keep things simple
+ // FIXME: should implement RFC 2861
+ if (msBetween(&lastSent, &now) > __rfbmax(baseRTT*2, 100)) {
+
+#ifdef CONGESTION_DEBUG
+ vlog.debug("Connection idle for %d ms, resetting congestion control",
+ msBetween(&lastSent, &now));
+#endif
+
+ // Close congestion window and redo wire latency measurement
+ congWindow = __rfbmin(INITIAL_WINDOW, congWindow);
+ baseRTT = -1;
+ measurements = 0;
+ gettimeofday(&lastAdjustment, NULL);
+ minRTT = minCongestedRTT = -1;
+ }
+
+ // Commonly we will be in a state of overbuffering. We need to
+ // estimate the extra delay that causes so we can separate it from
+ // the delay caused by an incorrect congestion window.
+ // (we cannot do this until we have a RTT measurement though)
+ if (baseRTT != (unsigned)-1) {
+ extraBuffer += delta;
+ consumed = msBetween(&lastUpdate, &now) * congWindow / baseRTT;
+ if (extraBuffer < consumed)
+ extraBuffer = 0;
+ else
+ extraBuffer -= consumed;
+ }
+
+ lastPosition = pos;
+ lastUpdate = now;
+}
+
+void Congestion::sentPing()
{
struct RTTInfo rttInfo;
- if (ackedOffset == 0)
- ackedOffset = offset;
-
memset(&rttInfo, 0, sizeof(struct RTTInfo));
gettimeofday(&rttInfo.tv, NULL);
- rttInfo.offset = offset;
- rttInfo.inFlight = rttInfo.offset - ackedOffset;
+ rttInfo.pos = lastPosition;
+ rttInfo.extra = getExtraBuffer();
+ rttInfo.congested = isCongested();
pings.push_back(rttInfo);
-
- sentOffset = offset;
-
- // Let some data flow before we adjust the settings
- if (!congestionTimer.isStarted())
- congestionTimer.start(__rfbmin(baseRTT * 2, 100));
}
void Congestion::gotPong()
{
+ struct timeval now;
struct RTTInfo rttInfo;
unsigned rtt, delay;
if (pings.empty())
return;
+ gettimeofday(&now, NULL);
+
rttInfo = pings.front();
pings.pop_front();
- rtt = msSince(&rttInfo.tv);
+ lastPong = rttInfo;
+ lastPongArrival = now;
+
+ rtt = msBetween(&rttInfo.tv, &now);
if (rtt < 1)
rtt = 1;
- ackedOffset = rttInfo.offset;
-
// Try to estimate wire latency by tracking lowest seen latency
if (rtt < baseRTT)
baseRTT = rtt;
- if (rttInfo.inFlight > congWindow) {
- seenCongestion = true;
+ // Pings sent before the last adjustment aren't interesting as they
+ // aren't a measurement of the current congestion window
+ if (isBefore(&rttInfo.tv, &lastAdjustment))
+ return;
- // Estimate added delay because of overtaxed buffers
- delay = (rttInfo.inFlight - congWindow) * baseRTT / congWindow;
+ // Estimate added delay because of overtaxed buffers (see above)
+ delay = rttInfo.extra * baseRTT / congWindow;
+ if (delay < rtt)
+ rtt -= delay;
+ else
+ rtt = 1;
- if (delay < rtt)
- rtt -= delay;
- else
- rtt = 1;
+ // A latency less than the wire latency means that we've
+ // underestimated the congestion window. We can't really determine
+ // how much, so pretend that we got no buffer latency at all.
+ if (rtt < baseRTT)
+ rtt = baseRTT;
- // If we underestimate the congestion window, then we'll get a latency
- // that's less than the wire latency, which will confuse other portions
- // of the code.
- if (rtt < baseRTT)
- rtt = baseRTT;
- }
-
- // We only keep track of the minimum latency seen (for a given interval)
- // on the basis that we want to avoid continuous buffer issue, but don't
- // mind (or even approve of) bursts.
+ // Record the minimum seen delay (hopefully ignores jitter) and let
+ // the congestion control do its thing.
+ //
+ // Note: We are delay based rather than loss based, which means we
+ // need to look at pongs even if they weren't limited by the
+ // current window ("congested"). Otherwise we will fail to
+ // detect increasing congestion until the application exceeds
+ // the congestion window.
if (rtt < minRTT)
minRTT = rtt;
-}
-
-bool Congestion::isCongested(int offset, unsigned idleTime)
-{
- // Idle for too long? (and no data on the wire)
- //
- // FIXME: This should really just be one baseRTT, but we're getting
- // problems with triggering the idle timeout on each update.
- // Maybe we need to use a moving average for the wire latency
- // instead of baseRTT.
- if ((sentOffset == ackedOffset) && (idleTime > 2 * baseRTT)) {
-
-#ifdef CONGESTION_DEBUG
- if (congWindow > INITIAL_WINDOW)
- fprintf(stderr, "Reverting to initial window (%d KiB) after %d ms\n",
- INITIAL_WINDOW / 1024, sock->outStream().getIdleTime());
-#endif
-
- // Close congestion window and allow a transfer
- // FIXME: Reset baseRTT like Linux Vegas?
- congWindow = __rfbmin(INITIAL_WINDOW, congWindow);
-
- return false;
+ if (rttInfo.congested) {
+ if (rtt < minCongestedRTT)
+ minCongestedRTT = rtt;
}
- // FIXME: Should we compensate for non-update data?
- // (i.e. use sentOffset instead of offset)
- if ((offset - ackedOffset) < congWindow)
- return false;
+ measurements++;
+ updateCongestion();
+}
- // If we just have one outstanding "ping", that means the client has
- // started receiving our update. In order to not regress compared to
- // before we had congestion avoidance, we allow another update here.
- // This could further clog up the tubes, but congestion control isn't
- // really working properly right now anyway as the wire would otherwise
- // be idle for at least RTT/2.
- if (pings.size() == 1)
+bool Congestion::isCongested()
+{
+ if (getInFlight() < congWindow)
return false;
return true;
}
-bool Congestion::handleTimeout(Timer* t)
+// Estimate how many milliseconds remain until enough data has been
+// acknowledged for the amount in flight to drop below the congestion
+// window. Returns 0 if that is (estimated to be) already the case and
+// -1 if no RTT measurement is available yet.
+int Congestion::getUncongestedETA()
+{
+  unsigned targetAcked;
+
+  const struct RTTInfo* prevPing;
+  unsigned eta, elapsed;
+  unsigned etaNext, delay;
+
+  std::list<struct RTTInfo>::const_iterator iter;
+
+  // Position the client must have acknowledged before the data in
+  // flight fits inside the congestion window again
+  targetAcked = lastPosition - congWindow;
+
+  // Simple case?
+  if (lastPong.pos > targetAcked)
+    return 0;
+
+  // No measurements yet?
+  if (baseRTT == (unsigned)-1)
+    return -1;
+
+  prevPing = &lastPong;
+  eta = 0;
+  elapsed = msSince(&lastPongArrival);
+
+  // Walk the ping queue and figure out which one we are waiting for to
+  // get to an uncongested state
+
+  for (iter = pings.begin(); ;++iter) {
+    struct RTTInfo curPing;
+
+    // If we aren't waiting for a pong that will clear the congested
+    // state then we have to estimate the final bit by pretending that
+    // we had a ping just after the last position update.
+    if (iter == pings.end()) {
+      curPing.tv = lastUpdate;
+      curPing.pos = lastPosition;
+      curPing.extra = extraBuffer;
+    } else {
+      curPing = *iter;
+    }
+
+    etaNext = msBetween(&prevPing->tv, &curPing.tv);
+    // Compensate for buffering delays. bytes * ms is computed in
+    // 64 bits since it easily exceeds the range of 32 bits.
+    delay = static_cast<unsigned>(
+      static_cast<unsigned long long>(curPing.extra) * baseRTT / congWindow);
+    etaNext += delay;
+    delay = static_cast<unsigned>(
+      static_cast<unsigned long long>(prevPing->extra) * baseRTT / congWindow);
+    if (delay >= etaNext)
+      etaNext = 0;
+    else
+      etaNext -= delay;
+
+    // Found it? Interpolate how far into this interval targetAcked
+    // lies. The ratio is at most 1, so the result fits in 32 bits, but
+    // the intermediate product (ms * bytes) needs 64 bits.
+    if (curPing.pos > targetAcked) {
+      eta += static_cast<unsigned>(
+        static_cast<unsigned long long>(etaNext) *
+        (curPing.pos - targetAcked) / (curPing.pos - prevPing->pos));
+      if (elapsed > eta)
+        return 0;
+      else
+        return eta - elapsed;
+    }
+
+    // The synthetic final entry is always past targetAcked, so we can
+    // never run off the end of the queue here
+    assert(iter != pings.end());
+
+    eta += etaNext;
+    prevPing = &*iter;
+  }
+}
+
+// Return the estimated number of bytes still queued beyond what the
+// congestion window accounts for (extraBuffer), after subtracting what
+// the link should have drained since the last position update at a
+// rate of congWindow bytes per baseRTT ms. Returns 0 until we have a
+// RTT measurement.
+unsigned Congestion::getExtraBuffer()
+{
+  unsigned elapsed;
+  // 64 bits since elapsed ms * congWindow bytes overflows 32 bits after
+  // about 1024 ms with a maximum-sized (4 MiB) window
+  unsigned long long consumed;
+
+  if (baseRTT == (unsigned)-1)
+    return 0;
+
+  elapsed = msSince(&lastUpdate);
+  consumed = static_cast<unsigned long long>(elapsed) * congWindow / baseRTT;
+
+  if (consumed >= extraBuffer)
+    return 0;
+  else
+    return static_cast<unsigned>(extraBuffer - consumed);
+}
+
+// Estimate how many bytes have been sent but not yet acknowledged.
+// Pongs arrive too rarely for a precise answer, so the acknowledged
+// position is interpolated between the last received pong and the
+// next expected one. Before any RTT measurement exists, this falls
+// back to counting from the oldest outstanding ping.
+unsigned Congestion::getInFlight()
+{
+  struct RTTInfo nextPong;
+  unsigned etaNext, delay, elapsed, acked;
+
+  // Simple case?
+  if (lastPosition == lastPong.pos)
+    return 0;
+
+  // No measurements yet?
+  if (baseRTT == (unsigned)-1) {
+    if (!pings.empty())
+      return lastPosition - pings.front().pos;
+    return 0;
+  }
+
+  // If we aren't waiting for any pong then we have to estimate things
+  // by pretending that we had a ping just after the last position
+  // update.
+  if (pings.empty()) {
+    nextPong.tv = lastUpdate;
+    nextPong.pos = lastPosition;
+    nextPong.extra = extraBuffer;
+  } else {
+    nextPong = pings.front();
+  }
+
+  // First we need to estimate how many bytes have made it through
+  // completely. Look at the next ping that should arrive and figure
+  // out how far behind it should be and interpolate the positions.
+
+  etaNext = msBetween(&lastPong.tv, &nextPong.tv);
+  // Compensate for buffering delays. bytes * ms is computed in 64 bits
+  // since it easily exceeds the range of 32 bits.
+  delay = static_cast<unsigned>(
+    static_cast<unsigned long long>(nextPong.extra) * baseRTT / congWindow);
+  etaNext += delay;
+  delay = static_cast<unsigned>(
+    static_cast<unsigned long long>(lastPong.extra) * baseRTT / congWindow);
+  if (delay >= etaNext)
+    etaNext = 0;
+  else
+    etaNext -= delay;
+
+  elapsed = msSince(&lastPongArrival);
+
+  // The pong should be here any second. Be optimistic and assume
+  // we can already use its value.
+  if (etaNext <= elapsed)
+    acked = nextPong.pos;
+  else {
+    // elapsed < etaNext here, so the interpolated offset is smaller
+    // than the position difference and fits in 32 bits; only the
+    // intermediate bytes * ms product needs 64 bits
+    acked = lastPong.pos;
+    acked += static_cast<unsigned>(
+      static_cast<unsigned long long>(nextPong.pos - lastPong.pos) *
+      elapsed / etaNext);
+  }
+
+  return lastPosition - acked;
+}
+
+void Congestion::updateCongestion()
{
unsigned diff;
- if (!seenCongestion)
- return false;
+ // We want at least three measurements to avoid noise
+ if (measurements < 3)
+ return;
+
+ assert(minRTT >= baseRTT);
+ assert(minCongestedRTT >= baseRTT);
// The goal is to have a slightly too large congestion window since
// a "perfect" one cannot be distinguished from a too small one. This
// translates to a goal of a few extra milliseconds of delay.
+ // First we check all pongs to make sure we're not having a too large
+ // congestion window.
diff = minRTT - baseRTT;
- if (diff > __rfbmin(100, baseRTT)) {
+ // FIXME: Should we do slow start?
+ if (diff > 100) {
// Way too fast
congWindow = congWindow * baseRTT / minRTT;
- } else if (diff > __rfbmin(50, baseRTT/2)) {
+ } else if (diff > 50) {
// Slightly too fast
congWindow -= 4096;
- } else if (diff < 5) {
- // Way too slow
- congWindow += 8192;
- } else if (diff < 25) {
- // Too slow
- congWindow += 4096;
+ } else {
+ // Secondly only the "congested" pongs are checked to see if the
+ // window is too small.
+
+ diff = minCongestedRTT - baseRTT;
+
+ if (diff < 5) {
+ // Way too slow
+ congWindow += 8192;
+ } else if (diff < 25) {
+ // Too slow
+ congWindow += 4096;
+ }
}
if (congWindow < MINIMUM_WINDOW)
@@ -209,14 +387,13 @@
congWindow = MAXIMUM_WINDOW;
#ifdef CONGESTION_DEBUG
- fprintf(stderr, "RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps\n",
- minRTT, baseRTT, congWindow / 1024,
- congWindow * 8.0 / baseRTT / 1000.0);
+ vlog.debug("RTT: %d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps",
+ minRTT, baseRTT, congWindow / 1024,
+ congWindow * 8.0 / baseRTT / 1000.0);
#endif
- minRTT = -1;
- seenCongestion = false;
-
- return false;
+ measurements = 0;
+ gettimeofday(&lastAdjustment, NULL);
+ minRTT = minCongestedRTT = -1;
}