Change ANetworkSession implementation to optionally attach timestamps

to fragments of data to be transferred and to log statistics when data
is finally submitted to the POSIX layer.

Change-Id: Icbfcac203cdc5c9eac1634e84d34bb380b316a01
diff --git a/media/libstagefright/wifi-display/ANetworkSession.cpp b/media/libstagefright/wifi-display/ANetworkSession.cpp
index 23bb04e..df20ae2 100644
--- a/media/libstagefright/wifi-display/ANetworkSession.cpp
+++ b/media/libstagefright/wifi-display/ANetworkSession.cpp
@@ -81,7 +81,8 @@
     status_t readMore();
     status_t writeMore();
 
-    status_t sendRequest(const void *data, ssize_t size);
+    status_t sendRequest(
+            const void *data, ssize_t size, bool timeValid, int64_t timeUs);
 
     void setIsRTSPConnection(bool yesno);
 
@@ -89,6 +90,15 @@
     virtual ~Session();
 
 private:
+    enum {
+        FRAGMENT_FLAG_TIME_VALID = 1,
+    };
+    struct Fragment {
+        uint32_t mFlags;
+        int64_t mTimeUs;
+        sp<ABuffer> mBuffer;
+    };
+
     int32_t mSessionID;
     State mState;
     bool mIsRTSPConnection;
@@ -96,11 +106,7 @@
     sp<AMessage> mNotify;
     bool mSawReceiveFailure, mSawSendFailure;
 
-    // for TCP / stream data
-    AString mOutBuffer;
-
-    // for UDP / datagrams
-    List<sp<ABuffer> > mOutDatagrams;
+    List<Fragment> mOutFragments;
 
     AString mInBuffer;
 
@@ -109,6 +115,8 @@
     void notifyError(bool send, status_t err, const char *detail);
     void notify(NotificationReason reason);
 
+    void dumpFragmentStats(const Fragment &frag);
+
     DISALLOW_EVIL_CONSTRUCTORS(Session);
 };
 ////////////////////////////////////////////////////////////////////////////////
@@ -221,8 +229,8 @@
 bool ANetworkSession::Session::wantsToWrite() {
     return !mSawSendFailure
         && (mState == CONNECTING
-            || (mState == CONNECTED && !mOutBuffer.empty())
-            || (mState == DATAGRAM && !mOutDatagrams.empty()));
+            || (mState == CONNECTED && !mOutFragments.empty())
+            || (mState == DATAGRAM && !mOutFragments.empty()));
 }
 
 status_t ANetworkSession::Session::readMore() {
@@ -407,13 +415,41 @@
     return err;
 }
 
+void ANetworkSession::Session::dumpFragmentStats(const Fragment &frag) {
+#if 0
+    int64_t nowUs = ALooper::GetNowUs();
+    int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
+
+    static const int64_t kMinDelayMs = 0;
+    static const int64_t kMaxDelayMs = 300;
+
+    const char *kPattern = "########################################";
+    size_t kPatternSize = strlen(kPattern);
+
+    int n = (kPatternSize * (delayMs - kMinDelayMs))
+                / (kMaxDelayMs - kMinDelayMs);
+
+    if (n < 0) {
+        n = 0;
+    } else if ((size_t)n > kPatternSize) {
+        n = kPatternSize;
+    }
+
+    ALOGI("[%lld]: (%4lld ms) %s\n",
+          frag.mTimeUs / 1000,
+          delayMs,
+          kPattern + kPatternSize - n);
+#endif
+}
+
 status_t ANetworkSession::Session::writeMore() {
     if (mState == DATAGRAM) {
-        CHECK(!mOutDatagrams.empty());
+        CHECK(!mOutFragments.empty());
 
         status_t err;
         do {
-            const sp<ABuffer> &datagram = *mOutDatagrams.begin();
+            const Fragment &frag = *mOutFragments.begin();
+            const sp<ABuffer> &datagram = frag.mBuffer;
 
             uint8_t *data = datagram->data();
             if (data[0] == 0x80 && (data[1] & 0x7f) == 33) {
@@ -441,17 +477,21 @@
             err = OK;
 
             if (n > 0) {
-                mOutDatagrams.erase(mOutDatagrams.begin());
+                if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
+                    dumpFragmentStats(frag);
+                }
+
+                mOutFragments.erase(mOutFragments.begin());
             } else if (n < 0) {
                 err = -errno;
             } else if (n == 0) {
                 err = -ECONNRESET;
             }
-        } while (err == OK && !mOutDatagrams.empty());
+        } while (err == OK && !mOutFragments.empty());
 
         if (err == -EAGAIN) {
-            if (!mOutDatagrams.empty()) {
-                ALOGI("%d datagrams remain queued.", mOutDatagrams.size());
+            if (!mOutFragments.empty()) {
+                ALOGI("%d datagrams remain queued.", mOutFragments.size());
             }
             err = OK;
         }
@@ -484,23 +524,37 @@
     }
 
     CHECK_EQ(mState, CONNECTED);
-    CHECK(!mOutBuffer.empty());
+    CHECK(!mOutFragments.empty());
 
     ssize_t n;
-    do {
-        n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0);
-    } while (n < 0 && errno == EINTR);
+    while (!mOutFragments.empty()) {
+        const Fragment &frag = *mOutFragments.begin();
+
+        do {
+            n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
+        } while (n < 0 && errno == EINTR);
+
+        if (n <= 0) {
+            break;
+        }
+
+        frag.mBuffer->setRange(
+                frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
+
+        if (frag.mBuffer->size() > 0) {
+            break;
+        }
+
+        if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
+            dumpFragmentStats(frag);
+        }
+
+        mOutFragments.erase(mOutFragments.begin());
+    }
 
     status_t err = OK;
 
-    if (n > 0) {
-#if 0
-        ALOGI("out:");
-        hexdump(mOutBuffer.c_str(), n);
-#endif
-
-        mOutBuffer.erase(0, n);
-    } else if (n < 0) {
+    if (n < 0) {
         err = -errno;
     } else if (n == 0) {
         err = -ECONNRESET;
@@ -537,32 +591,43 @@
     return err;
 }
 
-status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) {
+status_t ANetworkSession::Session::sendRequest(
+        const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
     CHECK(mState == CONNECTED || mState == DATAGRAM);
 
-    if (mState == DATAGRAM) {
-        CHECK_GE(size, 0);
+    if (size < 0) {
+        size = strlen((const char *)data);
+    }
 
-        sp<ABuffer> datagram = new ABuffer(size);
-        memcpy(datagram->data(), data, size);
-
-        mOutDatagrams.push_back(datagram);
+    if (size == 0) {
         return OK;
     }
 
+    sp<ABuffer> buffer;
+
     if (mState == CONNECTED && !mIsRTSPConnection) {
         CHECK_LE(size, 65535);
 
-        uint8_t prefix[2];
-        prefix[0] = size >> 8;
-        prefix[1] = size & 0xff;
-
-        mOutBuffer.append((const char *)prefix, sizeof(prefix));
+        buffer = new ABuffer(size + 2);
+        buffer->data()[0] = size >> 8;
+        buffer->data()[1] = size & 0xff;
+        memcpy(buffer->data() + 2, data, size);
+    } else {
+        buffer = new ABuffer(size);
+        memcpy(buffer->data(), data, size);
     }
 
-    mOutBuffer.append(
-            (const char *)data,
-            (size >= 0) ? size : strlen((const char *)data));
+    Fragment frag;
+
+    frag.mFlags = 0;
+    if (timeValid) {
+        frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
+        frag.mTimeUs = timeUs;
+    }
+
+    frag.mBuffer = buffer;
+
+    mOutFragments.push_back(frag);
 
     return OK;
 }
@@ -985,7 +1050,8 @@
 }
 
 status_t ANetworkSession::sendRequest(
-        int32_t sessionID, const void *data, ssize_t size) {
+        int32_t sessionID, const void *data, ssize_t size,
+        bool timeValid, int64_t timeUs) {
     Mutex::Autolock autoLock(mLock);
 
     ssize_t index = mSessions.indexOfKey(sessionID);
@@ -996,7 +1062,7 @@
 
     const sp<Session> session = mSessions.valueAt(index);
 
-    status_t err = session->sendRequest(data, size);
+    status_t err = session->sendRequest(data, size, timeValid, timeUs);
 
     interrupt();
 
diff --git a/media/libstagefright/wifi-display/ANetworkSession.h b/media/libstagefright/wifi-display/ANetworkSession.h
index 0d7cbd6..7c62b29 100644
--- a/media/libstagefright/wifi-display/ANetworkSession.h
+++ b/media/libstagefright/wifi-display/ANetworkSession.h
@@ -74,7 +74,8 @@
     status_t destroySession(int32_t sessionID);
 
     status_t sendRequest(
-            int32_t sessionID, const void *data, ssize_t size = -1);
+            int32_t sessionID, const void *data, ssize_t size = -1,
+            bool timeValid = false, int64_t timeUs = -1ll);
 
     enum NotificationReason {
         kWhatError,
diff --git a/media/libstagefright/wifi-display/MediaSender.cpp b/media/libstagefright/wifi-display/MediaSender.cpp
index a41f81b..d13a92e 100644
--- a/media/libstagefright/wifi-display/MediaSender.cpp
+++ b/media/libstagefright/wifi-display/MediaSender.cpp
@@ -252,6 +252,10 @@
                     fwrite(tsPackets->data(), 1, tsPackets->size(), mLogFile);
                 }
 
+                int64_t timeUs;
+                CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
+                tsPackets->meta()->setInt64("timeUs", timeUs);
+
                 err = mTSSender->queueBuffer(
                         tsPackets,
                         33 /* packetType */,
diff --git a/media/libstagefright/wifi-display/rtp/RTPSender.cpp b/media/libstagefright/wifi-display/rtp/RTPSender.cpp
index 8cd712d..c8e265c 100644
--- a/media/libstagefright/wifi-display/rtp/RTPSender.cpp
+++ b/media/libstagefright/wifi-display/rtp/RTPSender.cpp
@@ -194,6 +194,9 @@
         const sp<ABuffer> &tsPackets, uint8_t packetType) {
     CHECK_EQ(0, tsPackets->size() % 188);
 
+    int64_t timeUs;
+    CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));
+
     const size_t numTSPackets = tsPackets->size() / 188;
 
     size_t srcOffset = 0;
@@ -232,13 +235,19 @@
         memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
 
         udpPacket->setRange(0, 12 + numTSPackets * 188);
-        status_t err = sendRTPPacket(udpPacket, true /* storeInHistory */);
+
+        srcOffset += numTSPackets * 188;
+        bool isLastPacket = (srcOffset == tsPackets->size());
+
+        status_t err = sendRTPPacket(
+                udpPacket,
+                true /* storeInHistory */,
+                isLastPacket /* timeValid */,
+                timeUs);
 
         if (err != OK) {
             return err;
         }
-
-        srcOffset += numTSPackets * 188;
     }
 
     return OK;
@@ -395,11 +404,13 @@
 }
 
 status_t RTPSender::sendRTPPacket(
-        const sp<ABuffer> &buffer, bool storeInHistory) {
+        const sp<ABuffer> &buffer, bool storeInHistory,
+        bool timeValid, int64_t timeUs) {
     CHECK(mRTPConnected);
 
     status_t err = mNetSession->sendRequest(
-            mRTPSessionID, buffer->data(), buffer->size());
+            mRTPSessionID, buffer->data(), buffer->size(),
+            timeValid, timeUs);
 
     if (err != OK) {
         return err;
diff --git a/media/libstagefright/wifi-display/rtp/RTPSender.h b/media/libstagefright/wifi-display/rtp/RTPSender.h
index 83c6223..90b1796 100644
--- a/media/libstagefright/wifi-display/rtp/RTPSender.h
+++ b/media/libstagefright/wifi-display/rtp/RTPSender.h
@@ -94,7 +94,9 @@
     status_t queueTSPackets(const sp<ABuffer> &tsPackets, uint8_t packetType);
     status_t queueAVCBuffer(const sp<ABuffer> &accessUnit, uint8_t packetType);
 
-    status_t sendRTPPacket(const sp<ABuffer> &packet, bool storeInHistory);
+    status_t sendRTPPacket(
+            const sp<ABuffer> &packet, bool storeInHistory,
+            bool timeValid = false, int64_t timeUs = -1ll);
 
     void onNetNotify(bool isRTP, const sp<AMessage> &msg);
 
diff --git a/media/libstagefright/wifi-display/wfd.cpp b/media/libstagefright/wifi-display/wfd.cpp
index 3a7a6e2..4f7dcc8 100644
--- a/media/libstagefright/wifi-display/wfd.cpp
+++ b/media/libstagefright/wifi-display/wfd.cpp
@@ -200,6 +200,8 @@
     CHECK_EQ((status_t)OK, source->start(iface.c_str()));
 
     client->waitUntilDone();
+
+    source->stop();
 }
 
 }  // namespace android