HLS: make disconnect faster to prevent ANR

disconnect HTTP connection when we absolutely won't resume

bug: 19890444
Change-Id: Idee36b48741f6f8eb1d65bca32156e9e18349c67
diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerDecoder.cpp b/media/libmediaplayerservice/nuplayer/NuPlayerDecoder.cpp
index 65e80c3..acc9ef5 100644
--- a/media/libmediaplayerservice/nuplayer/NuPlayerDecoder.cpp
+++ b/media/libmediaplayerservice/nuplayer/NuPlayerDecoder.cpp
@@ -658,7 +658,7 @@
 #if 0
     int64_t mediaTimeUs;
     CHECK(accessUnit->meta()->findInt64("timeUs", &mediaTimeUs));
-    ALOGV("[%s] feeding input buffer at media time %" PRId64,
+    ALOGV("[%s] feeding input buffer at media time %.3f",
          mIsAudio ? "audio" : "video",
          mediaTimeUs / 1E6);
 #endif
diff --git a/media/libstagefright/httplive/Android.mk b/media/libstagefright/httplive/Android.mk
index 2639deb..fc85835 100644
--- a/media/libstagefright/httplive/Android.mk
+++ b/media/libstagefright/httplive/Android.mk
@@ -3,6 +3,7 @@
 include $(CLEAR_VARS)
 
 LOCAL_SRC_FILES:=               \
+        HTTPDownloader.cpp      \
         LiveDataSource.cpp      \
         LiveSession.cpp         \
         M3UParser.cpp           \
diff --git a/media/libstagefright/httplive/HTTPDownloader.cpp b/media/libstagefright/httplive/HTTPDownloader.cpp
new file mode 100644
index 0000000..3b44bae
--- /dev/null
+++ b/media/libstagefright/httplive/HTTPDownloader.cpp
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2015 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "HTTPDownloader"
+#include <utils/Log.h>
+
+#include "HTTPDownloader.h"
+#include "M3UParser.h"
+
+#include <media/IMediaHTTPConnection.h>
+#include <media/IMediaHTTPService.h>
+#include <media/stagefright/foundation/ABuffer.h>
+#include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/MediaHTTP.h>
+#include <media/stagefright/DataSource.h>
+#include <media/stagefright/FileSource.h>
+#include <openssl/aes.h>
+#include <openssl/md5.h>
+#include <utils/Mutex.h>
+
+namespace android {
+
+HTTPDownloader::HTTPDownloader(
+        const sp<IMediaHTTPService> &httpService,
+        const KeyedVector<String8, String8> &headers) :
+    mHTTPDataSource(new MediaHTTP(httpService->makeHTTPConnection())),
+    mExtraHeaders(headers),
+    mDisconnecting(false) {
+}
+
+void HTTPDownloader::reconnect() {
+    AutoMutex _l(mLock);
+    mDisconnecting = false;
+}
+
+void HTTPDownloader::disconnect() {
+    {
+        AutoMutex _l(mLock);
+        mDisconnecting = true;
+    }
+    mHTTPDataSource->disconnect();
+}
+
+bool HTTPDownloader::isDisconnecting() {
+    AutoMutex _l(mLock);
+    return mDisconnecting;
+}
+
+/*
+ * Illustration of parameters:
+ *
+ * 0      `range_offset`
+ * +------------+-------------------------------------------------------+--+--+
+ * |            |                                 | next block to fetch |  |  |
+ * |            | `source` handle => `out` buffer |                     |  |  |
+ * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
+ * |            |<----------- `range_length` / buffer capacity ----------->|  |
+ * |<------------------------------ file_size ------------------------------->|
+ *
+ * Special parameter values:
+ * - range_length == -1 means entire file
+ * - block_size == 0 means entire range
+ *
+ */
+ssize_t HTTPDownloader::fetchBlock(
+        const char *url, sp<ABuffer> *out,
+        int64_t range_offset, int64_t range_length,
+        uint32_t block_size, /* download block size */
+        String8 *actualUrl,
+        bool reconnect /* force connect HTTP when resuing source */) {
+    if (isDisconnecting()) {
+        return ERROR_NOT_CONNECTED;
+    }
+
+    off64_t size;
+
+    if (reconnect) {
+        if (!strncasecmp(url, "file://", 7)) {
+            mDataSource = new FileSource(url + 7);
+        } else if (strncasecmp(url, "http://", 7)
+                && strncasecmp(url, "https://", 8)) {
+            return ERROR_UNSUPPORTED;
+        } else {
+            KeyedVector<String8, String8> headers = mExtraHeaders;
+            if (range_offset > 0 || range_length >= 0) {
+                headers.add(
+                        String8("Range"),
+                        String8(
+                            AStringPrintf(
+                                "bytes=%lld-%s",
+                                range_offset,
+                                range_length < 0
+                                    ? "" : AStringPrintf("%lld",
+                                            range_offset + range_length - 1).c_str()).c_str()));
+            }
+
+            status_t err = mHTTPDataSource->connect(url, &headers);
+
+            if (isDisconnecting()) {
+                return ERROR_NOT_CONNECTED;
+            }
+
+            if (err != OK) {
+                return err;
+            }
+
+            mDataSource = mHTTPDataSource;
+        }
+    }
+
+    status_t getSizeErr = mDataSource->getSize(&size);
+
+    if (isDisconnecting()) {
+        return ERROR_NOT_CONNECTED;
+    }
+
+    if (getSizeErr != OK) {
+        size = 65536;
+    }
+
+    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
+    if (*out == NULL) {
+        buffer->setRange(0, 0);
+    }
+
+    ssize_t bytesRead = 0;
+    // adjust range_length if only reading partial block
+    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
+        range_length = buffer->size() + block_size;
+    }
+    for (;;) {
+        // Only resize when we don't know the size.
+        size_t bufferRemaining = buffer->capacity() - buffer->size();
+        if (bufferRemaining == 0 && getSizeErr != OK) {
+            size_t bufferIncrement = buffer->size() / 2;
+            if (bufferIncrement < 32768) {
+                bufferIncrement = 32768;
+            }
+            bufferRemaining = bufferIncrement;
+
+            ALOGV("increasing download buffer to %zu bytes",
+                 buffer->size() + bufferRemaining);
+
+            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
+            memcpy(copy->data(), buffer->data(), buffer->size());
+            copy->setRange(0, buffer->size());
+
+            buffer = copy;
+        }
+
+        size_t maxBytesToRead = bufferRemaining;
+        if (range_length >= 0) {
+            int64_t bytesLeftInRange = range_length - buffer->size();
+            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
+                maxBytesToRead = bytesLeftInRange;
+
+                if (bytesLeftInRange == 0) {
+                    break;
+                }
+            }
+        }
+
+        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
+        // to help us break out of the loop.
+        ssize_t n = mDataSource->readAt(
+                buffer->size(), buffer->data() + buffer->size(),
+                maxBytesToRead);
+
+        if (isDisconnecting()) {
+            return ERROR_NOT_CONNECTED;
+        }
+
+        if (n < 0) {
+            return n;
+        }
+
+        if (n == 0) {
+            break;
+        }
+
+        buffer->setRange(0, buffer->size() + (size_t)n);
+        bytesRead += n;
+    }
+
+    *out = buffer;
+    if (actualUrl != NULL) {
+        *actualUrl = mDataSource->getUri();
+        if (actualUrl->isEmpty()) {
+            *actualUrl = url;
+        }
+    }
+
+    return bytesRead;
+}
+
+ssize_t HTTPDownloader::fetchFile(
+        const char *url, sp<ABuffer> *out, String8 *actualUrl) {
+    ssize_t err = fetchBlock(url, out, 0, -1, 0, actualUrl, true /* reconnect */);
+
+    // close off the connection after use
+    mHTTPDataSource->disconnect();
+
+    return err;
+}
+
+sp<M3UParser> HTTPDownloader::fetchPlaylist(
+        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
+    ALOGV("fetchPlaylist '%s'", url);
+
+    *unchanged = false;
+
+    sp<ABuffer> buffer;
+    String8 actualUrl;
+    ssize_t err = fetchFile(url, &buffer, &actualUrl);
+
+    // close off the connection after use
+    mHTTPDataSource->disconnect();
+
+    if (err <= 0) {
+        return NULL;
+    }
+
+    // MD5 functionality is not available on the simulator, treat all
+    // playlists as changed.
+
+#if defined(HAVE_ANDROID_OS)
+    uint8_t hash[16];
+
+    MD5_CTX m;
+    MD5_Init(&m);
+    MD5_Update(&m, buffer->data(), buffer->size());
+
+    MD5_Final(hash, &m);
+
+    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
+        // playlist unchanged
+        *unchanged = true;
+
+        return NULL;
+    }
+
+    if (curPlaylistHash != NULL) {
+        memcpy(curPlaylistHash, hash, sizeof(hash));
+    }
+#endif
+
+    sp<M3UParser> playlist =
+        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
+
+    if (playlist->initCheck() != OK) {
+        ALOGE("failed to parse .m3u8 playlist");
+
+        return NULL;
+    }
+
+    return playlist;
+}
+
+}  // namespace android
diff --git a/media/libstagefright/httplive/HTTPDownloader.h b/media/libstagefright/httplive/HTTPDownloader.h
new file mode 100644
index 0000000..1db4a48
--- /dev/null
+++ b/media/libstagefright/httplive/HTTPDownloader.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2015 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HTTP_DOWNLOADER_H_
+
+#define HTTP_DOWNLOADER_H_
+
+#include <media/stagefright/foundation/ADebug.h>
+#include <utils/KeyedVector.h>
+#include <utils/Mutex.h>
+#include <utils/RefBase.h>
+
+namespace android {
+
+struct ABuffer;
+class DataSource;
+struct HTTPBase;
+struct IMediaHTTPService;
+struct M3UParser;
+
+struct HTTPDownloader : public RefBase {
+    HTTPDownloader(
+            const sp<IMediaHTTPService> &httpService,
+            const KeyedVector<String8, String8> &headers);
+
+    void reconnect();
+    void disconnect();
+    bool isDisconnecting();
+    // If given a non-zero block_size (default 0), it is used to cap the number of
+    // bytes read in from the DataSource. If given a non-NULL buffer, new content
+    // is read into the end.
+    //
+    // The DataSource we read from is responsible for signaling error or EOF to help us
+    // break out of the read loop. The DataSource can be returned to the caller, so
+    // that the caller can reuse it for subsequent fetches (within the initially
+    // requested range).
+    //
+    // For reused HTTP sources, the caller must download a file sequentially without
+    // any overlaps or gaps to prevent reconnection.
+    ssize_t fetchBlock(
+            const char *url,
+            sp<ABuffer> *out,
+            int64_t range_offset, /* open file at range_offset */
+            int64_t range_length, /* open file for range_length (-1: entire file) */
+            uint32_t block_size,  /* download block size (0: entire range) */
+            String8 *actualUrl,   /* returns actual URL */
+            bool reconnect        /* force connect http */
+            );
+
+    // simplified version to fetch a single file
+    ssize_t fetchFile(
+            const char *url,
+            sp<ABuffer> *out,
+            String8 *actualUrl = NULL);
+
+    // fetch a playlist file
+    sp<M3UParser> fetchPlaylist(
+            const char *url, uint8_t *curPlaylistHash, bool *unchanged);
+
+private:
+    sp<HTTPBase> mHTTPDataSource;
+    sp<DataSource> mDataSource;
+    KeyedVector<String8, String8> mExtraHeaders;
+
+    Mutex mLock;
+    bool mDisconnecting;
+
+    DISALLOW_EVIL_CONSTRUCTORS(HTTPDownloader);
+};
+
+}  // namespace android
+
+#endif  // HTTP_DOWNLOADER_H_
diff --git a/media/libstagefright/httplive/LiveSession.cpp b/media/libstagefright/httplive/LiveSession.cpp
index 9ac764c..d8c38e7 100644
--- a/media/libstagefright/httplive/LiveSession.cpp
+++ b/media/libstagefright/httplive/LiveSession.cpp
@@ -19,25 +19,18 @@
 #include <utils/Log.h>
 
 #include "LiveSession.h"
-
+#include "HTTPDownloader.h"
 #include "M3UParser.h"
 #include "PlaylistFetcher.h"
 
-#include "include/HTTPBase.h"
 #include "mpeg2ts/AnotherPacketSource.h"
 
 #include <cutils/properties.h>
-#include <media/IMediaHTTPConnection.h>
 #include <media/IMediaHTTPService.h>
-#include <media/stagefright/foundation/hexdump.h>
 #include <media/stagefright/foundation/ABuffer.h>
 #include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/foundation/AMessage.h>
 #include <media/stagefright/foundation/AUtils.h>
-#include <media/stagefright/DataSource.h>
-#include <media/stagefright/FileSource.h>
-#include <media/stagefright/MediaErrors.h>
-#include <media/stagefright/MediaHTTP.h>
 #include <media/stagefright/MediaDefs.h>
 #include <media/stagefright/MetaData.h>
 #include <media/stagefright/Utils.h>
@@ -46,8 +39,6 @@
 
 #include <ctype.h>
 #include <inttypes.h>
-#include <openssl/aes.h>
-#include <openssl/md5.h>
 
 namespace android {
 
@@ -257,7 +248,6 @@
       mInPreparationPhase(true),
       mPollBufferingGeneration(0),
       mPrevBufferPercentage(-1),
-      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
       mCurBandwidthIndex(-1),
       mOrigBandwidthIndex(-1),
       mLastBandwidthBps(-1ll),
@@ -317,7 +307,7 @@
 
     ssize_t streamIdx = typeToIndex(stream);
     if (streamIdx < 0) {
-        return INVALID_VALUE;
+        return BAD_VALUE;
     }
     const char *streamStr = getNameForStream(stream);
     // Do not let client pull data if we don't have data packets yet.
@@ -465,8 +455,8 @@
     return convertMetaDataToMessage(meta, format);
 }
 
-sp<HTTPBase> LiveSession::getHTTPDataSource() {
-    return new MediaHTTP(mHTTPService->makeHTTPConnection());
+sp<HTTPDownloader> LiveSession::getHTTPDownloader() {
+    return new HTTPDownloader(mHTTPService, mExtraHeaders);
 }
 
 void LiveSession::connectAsync(
@@ -838,6 +828,12 @@
                     break;
                 }
 
+                case PlaylistFetcher::kWhatPlaylistFetched:
+                {
+                    onMasterPlaylistFetched(msg);
+                    break;
+                }
+
                 case PlaylistFetcher::kWhatMetadataDetected:
                 {
                     if (!mHasMetadata) {
@@ -874,12 +870,6 @@
             break;
         }
 
-        case kWhatFinishDisconnect2:
-        {
-            onFinishDisconnect2();
-            break;
-        }
-
         case kWhatPollBuffering:
         {
             int32_t generation;
@@ -931,8 +921,10 @@
 }
 
 void LiveSession::onConnect(const sp<AMessage> &msg) {
-    AString url;
-    CHECK(msg->findString("url", &url));
+    CHECK(msg->findString("url", &mMasterURL));
+
+    // TODO currently we don't know if we are coming here from incognito mode
+    ALOGI("onConnect %s", uriDebugString(mMasterURL).c_str());
 
     KeyedVector<String8, String8> *headers = NULL;
     if (!msg->findPointer("headers", (void **)&headers)) {
@@ -944,21 +936,6 @@
         headers = NULL;
     }
 
-    // TODO currently we don't know if we are coming here from incognito mode
-    ALOGI("onConnect %s", uriDebugString(url).c_str());
-
-    mMasterURL = url;
-
-    bool dummy;
-    mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
-
-    if (mPlaylist == NULL) {
-        ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
-
-        postPrepared(ERROR_IO);
-        return;
-    }
-
     // create looper for fetchers
     if (mFetcherLooper == NULL) {
         mFetcherLooper = new ALooper();
@@ -967,6 +944,31 @@
         mFetcherLooper->start(false, false);
     }
 
+    // create fetcher to fetch the master playlist
+    addFetcher(mMasterURL.c_str())->fetchPlaylistAsync();
+}
+
+void LiveSession::onMasterPlaylistFetched(const sp<AMessage> &msg) {
+    AString uri;
+    CHECK(msg->findString("uri", &uri));
+    ssize_t index = mFetcherInfos.indexOfKey(uri);
+    if (index < 0) {
+        ALOGW("fetcher for master playlist is gone.");
+        return;
+    }
+
+    // no longer useful, remove
+    mFetcherLooper->unregisterHandler(mFetcherInfos[index].mFetcher->id());
+    mFetcherInfos.removeItemsAt(index);
+
+    CHECK(msg->findObject("playlist", (sp<RefBase> *)&mPlaylist));
+    if (mPlaylist == NULL) {
+        ALOGE("unable to fetch master playlist %s.",
+                uriDebugString(mMasterURL).c_str());
+
+        postPrepared(ERROR_IO);
+        return;
+    }
     // We trust the content provider to make a reasonable choice of preferred
     // initial bandwidth by listing it first in the variant playlist.
     // At startup we really don't have a good estimate on the available
@@ -1050,22 +1052,26 @@
     // cancel buffer polling
     cancelPollBuffering();
 
+    // TRICKY: don't wait for all fetcher to be stopped when disconnecting
+    //
+    // Some fetchers might be stuck in connect/getSize at this point. These
+    // operations will eventually timeout (as we have a timeout set in
+    // MediaHTTPConnection), but we don't want to block the main UI thread
+    // until then. Here we just need to make sure we clear all references
+    // to the fetchers, so that when they finally exit from the blocking
+    // operation, they can be destructed.
+    //
+    // There is one very tricky point though. For this scheme to work, the
+    // fecther must hold a reference to LiveSession, so that LiveSession is
+    // destroyed after fetcher. Otherwise LiveSession would get stuck in its
+    // own destructor when it waits for mFetcherLooper to stop, which still
+    // blocks main UI thread.
     for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
         mFetcherInfos.valueAt(i).mFetcher->stopAsync();
+        mFetcherLooper->unregisterHandler(
+                mFetcherInfos.valueAt(i).mFetcher->id());
     }
-
-    sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, this);
-
-    mContinuationCounter = mFetcherInfos.size();
-    mContinuation = msg;
-
-    if (mContinuationCounter == 0) {
-        msg->post();
-    }
-}
-
-void LiveSession::onFinishDisconnect2() {
-    mContinuation.clear();
+    mFetcherInfos.clear();
 
     mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
     mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
@@ -1104,198 +1110,6 @@
     return info.mFetcher;
 }
 
-/*
- * Illustration of parameters:
- *
- * 0      `range_offset`
- * +------------+-------------------------------------------------------+--+--+
- * |            |                                 | next block to fetch |  |  |
- * |            | `source` handle => `out` buffer |                     |  |  |
- * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
- * |            |<----------- `range_length` / buffer capacity ----------->|  |
- * |<------------------------------ file_size ------------------------------->|
- *
- * Special parameter values:
- * - range_length == -1 means entire file
- * - block_size == 0 means entire range
- *
- */
-ssize_t LiveSession::fetchFile(
-        const char *url, sp<ABuffer> *out,
-        int64_t range_offset, int64_t range_length,
-        uint32_t block_size, /* download block size */
-        sp<DataSource> *source, /* to return and reuse source */
-        String8 *actualUrl,
-        bool forceConnectHTTP /* force connect HTTP when resuing source */) {
-    off64_t size;
-    sp<DataSource> temp_source;
-    if (source == NULL) {
-        source = &temp_source;
-    }
-
-    if (*source == NULL || forceConnectHTTP) {
-        if (!strncasecmp(url, "file://", 7)) {
-            *source = new FileSource(url + 7);
-        } else if (strncasecmp(url, "http://", 7)
-                && strncasecmp(url, "https://", 8)) {
-            return ERROR_UNSUPPORTED;
-        } else {
-            KeyedVector<String8, String8> headers = mExtraHeaders;
-            if (range_offset > 0 || range_length >= 0) {
-                headers.add(
-                        String8("Range"),
-                        String8(
-                            AStringPrintf(
-                                "bytes=%lld-%s",
-                                range_offset,
-                                range_length < 0
-                                    ? "" : AStringPrintf("%lld",
-                                            range_offset + range_length - 1).c_str()).c_str()));
-            }
-
-            HTTPBase* httpDataSource =
-                    (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get();
-            status_t err = httpDataSource->connect(url, &headers);
-
-            if (err != OK) {
-                return err;
-            }
-
-            if (*source == NULL) {
-                *source = mHTTPDataSource;
-            }
-        }
-    }
-
-    status_t getSizeErr = (*source)->getSize(&size);
-    if (getSizeErr != OK) {
-        size = 65536;
-    }
-
-    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
-    if (*out == NULL) {
-        buffer->setRange(0, 0);
-    }
-
-    ssize_t bytesRead = 0;
-    // adjust range_length if only reading partial block
-    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
-        range_length = buffer->size() + block_size;
-    }
-    for (;;) {
-        // Only resize when we don't know the size.
-        size_t bufferRemaining = buffer->capacity() - buffer->size();
-        if (bufferRemaining == 0 && getSizeErr != OK) {
-            size_t bufferIncrement = buffer->size() / 2;
-            if (bufferIncrement < 32768) {
-                bufferIncrement = 32768;
-            }
-            bufferRemaining = bufferIncrement;
-
-            ALOGV("increasing download buffer to %zu bytes",
-                 buffer->size() + bufferRemaining);
-
-            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
-            memcpy(copy->data(), buffer->data(), buffer->size());
-            copy->setRange(0, buffer->size());
-
-            buffer = copy;
-        }
-
-        size_t maxBytesToRead = bufferRemaining;
-        if (range_length >= 0) {
-            int64_t bytesLeftInRange = range_length - buffer->size();
-            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
-                maxBytesToRead = bytesLeftInRange;
-
-                if (bytesLeftInRange == 0) {
-                    break;
-                }
-            }
-        }
-
-        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
-        // to help us break out of the loop.
-        ssize_t n = (*source)->readAt(
-                buffer->size(), buffer->data() + buffer->size(),
-                maxBytesToRead);
-
-        if (n < 0) {
-            return n;
-        }
-
-        if (n == 0) {
-            break;
-        }
-
-        buffer->setRange(0, buffer->size() + (size_t)n);
-        bytesRead += n;
-    }
-
-    *out = buffer;
-    if (actualUrl != NULL) {
-        *actualUrl = (*source)->getUri();
-        if (actualUrl->isEmpty()) {
-            *actualUrl = url;
-        }
-    }
-
-    return bytesRead;
-}
-
-sp<M3UParser> LiveSession::fetchPlaylist(
-        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
-    ALOGV("fetchPlaylist '%s'", url);
-
-    *unchanged = false;
-
-    sp<ABuffer> buffer;
-    String8 actualUrl;
-    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
-
-    // close off the connection after use
-    mHTTPDataSource->disconnect();
-
-    if (err <= 0) {
-        return NULL;
-    }
-
-    // MD5 functionality is not available on the simulator, treat all
-    // playlists as changed.
-
-#if defined(HAVE_ANDROID_OS)
-    uint8_t hash[16];
-
-    MD5_CTX m;
-    MD5_Init(&m);
-    MD5_Update(&m, buffer->data(), buffer->size());
-
-    MD5_Final(hash, &m);
-
-    if (curPlaylistHash != NULL && !memcmp(hash, curPlaylistHash, 16)) {
-        // playlist unchanged
-        *unchanged = true;
-
-        return NULL;
-    }
-
-    if (curPlaylistHash != NULL) {
-        memcpy(curPlaylistHash, hash, sizeof(hash));
-    }
-#endif
-
-    sp<M3UParser> playlist =
-        new M3UParser(actualUrl.string(), buffer->data(), buffer->size());
-
-    if (playlist->initCheck() != OK) {
-        ALOGE("failed to parse .m3u8 playlist");
-
-        return NULL;
-    }
-
-    return playlist;
-}
-
 #if 0
 static double uniformRand() {
     return (double)rand() / RAND_MAX;
@@ -1690,9 +1504,11 @@
             fetcher->stopAsync();
         } else {
             float threshold = -1.0f; // always finish fetching by default
+            bool disconnect = false;
             if (timeUs >= 0ll) {
                 // seeking, no need to finish fetching
                 threshold = 0.0f;
+                disconnect = true;
             } else if (delayRemoval) {
                 // adapting, abort if remaining of current segment is over threshold
                 threshold = getAbortThreshold(
@@ -1701,7 +1517,7 @@
 
             ALOGV("pausing fetcher-%d, threshold=%.2f",
                     fetcher->getFetcherID(), threshold);
-            fetcher->pauseAsync(threshold);
+            fetcher->pauseAsync(threshold, disconnect);
         }
     }
 
diff --git a/media/libstagefright/httplive/LiveSession.h b/media/libstagefright/httplive/LiveSession.h
index 4e7ccac..21be413 100644
--- a/media/libstagefright/httplive/LiveSession.h
+++ b/media/libstagefright/httplive/LiveSession.h
@@ -37,6 +37,7 @@
 struct M3UParser;
 struct PlaylistFetcher;
 struct HLSTime;
+struct HTTPDownloader;
 
 struct LiveSession : public AHandler {
     enum Flags {
@@ -76,7 +77,7 @@
 
     status_t getStreamFormat(StreamType stream, sp<AMessage> *format);
 
-    sp<HTTPBase> getHTTPDataSource();
+    sp<HTTPDownloader> getHTTPDownloader();
 
     void connectAsync(
             const char *url,
@@ -127,7 +128,6 @@
         kWhatChangeConfiguration        = 'chC0',
         kWhatChangeConfiguration2       = 'chC2',
         kWhatChangeConfiguration3       = 'chC3',
-        kWhatFinishDisconnect2          = 'fin2',
         kWhatPollBuffering              = 'poll',
     };
 
@@ -191,7 +191,6 @@
     int32_t mPollBufferingGeneration;
     int32_t mPrevBufferPercentage;
 
-    sp<HTTPBase> mHTTPDataSource;
     KeyedVector<String8, String8> mExtraHeaders;
 
     AString mMasterURL;
@@ -253,34 +252,8 @@
     sp<PlaylistFetcher> addFetcher(const char *uri);
 
     void onConnect(const sp<AMessage> &msg);
+    void onMasterPlaylistFetched(const sp<AMessage> &msg);
     void onSeek(const sp<AMessage> &msg);
-    void onFinishDisconnect2();
-
-    // If given a non-zero block_size (default 0), it is used to cap the number of
-    // bytes read in from the DataSource. If given a non-NULL buffer, new content
-    // is read into the end.
-    //
-    // The DataSource we read from is responsible for signaling error or EOF to help us
-    // break out of the read loop. The DataSource can be returned to the caller, so
-    // that the caller can reuse it for subsequent fetches (within the initially
-    // requested range).
-    //
-    // For reused HTTP sources, the caller must download a file sequentially without
-    // any overlaps or gaps to prevent reconnection.
-    ssize_t fetchFile(
-            const char *url, sp<ABuffer> *out,
-            /* request/open a file starting at range_offset for range_length bytes */
-            int64_t range_offset = 0, int64_t range_length = -1,
-            /* download block size */
-            uint32_t block_size = 0,
-            /* reuse DataSource if doing partial fetch */
-            sp<DataSource> *source = NULL,
-            String8 *actualUrl = NULL,
-            /* force connect http even when resuing DataSource */
-            bool forceConnectHTTP = false);
-
-    sp<M3UParser> fetchPlaylist(
-            const char *url, uint8_t *curPlaylistHash, bool *unchanged);
 
     bool UriIsSameAsIndex( const AString &uri, int32_t index, bool newUri);
     sp<AnotherPacketSource> getPacketSourceForStreamIndex(size_t trackIndex, bool newUri);
diff --git a/media/libstagefright/httplive/PlaylistFetcher.cpp b/media/libstagefright/httplive/PlaylistFetcher.cpp
index 8350c1b..53087b6 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.cpp
+++ b/media/libstagefright/httplive/PlaylistFetcher.cpp
@@ -20,23 +20,16 @@
 #include <utils/misc.h>
 
 #include "PlaylistFetcher.h"
-
-#include "LiveDataSource.h"
+#include "HTTPDownloader.h"
 #include "LiveSession.h"
 #include "M3UParser.h"
-
 #include "include/avc_utils.h"
-#include "include/HTTPBase.h"
 #include "include/ID3.h"
 #include "mpeg2ts/AnotherPacketSource.h"
 
-#include <media/IStreamSource.h>
 #include <media/stagefright/foundation/ABitReader.h>
 #include <media/stagefright/foundation/ABuffer.h>
 #include <media/stagefright/foundation/ADebug.h>
-#include <media/stagefright/foundation/AUtils.h>
-#include <media/stagefright/foundation/hexdump.h>
-#include <media/stagefright/FileSource.h>
 #include <media/stagefright/MediaDefs.h>
 #include <media/stagefright/MetaData.h>
 #include <media/stagefright/Utils.h>
@@ -44,7 +37,6 @@
 #include <ctype.h>
 #include <inttypes.h>
 #include <openssl/aes.h>
-#include <openssl/md5.h>
 
 #define FLOGV(fmt, ...) ALOGV("[fetcher-%d] " fmt, mFetcherID, ##__VA_ARGS__)
 #define FSLOGV(stream, fmt, ...) ALOGV("[fetcher-%d] [%s] " fmt, mFetcherID, \
@@ -179,7 +171,7 @@
       mDownloadState(new DownloadState()),
       mHasMetadata(false) {
     memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
-    mHTTPDataSource = mSession->getHTTPDataSource();
+    mHTTPDownloader = mSession->getHTTPDownloader();
 }
 
 PlaylistFetcher::~PlaylistFetcher() {
@@ -338,9 +330,11 @@
     if (index >= 0) {
         key = mAESKeyForURI.valueAt(index);
     } else {
-        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);
+        ssize_t err = mHTTPDownloader->fetchFile(keyURI.c_str(), &key);
 
-        if (err < 0) {
+        if (err == ERROR_NOT_CONNECTED) {
+            return ERROR_NOT_CONNECTED;
+        } else if (err < 0) {
             ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
             return ERROR_IO;
         } else if (key->size() != 16) {
@@ -448,12 +442,32 @@
     ++mMonitorQueueGeneration;
 }
 
-void PlaylistFetcher::setStoppingThreshold(float thresholdRatio) {
-    AutoMutex _l(mThresholdLock);
-    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
-        return;
+void PlaylistFetcher::setStoppingThreshold(float thresholdRatio, bool disconnect) {
+    {
+        AutoMutex _l(mThresholdLock);
+        mThresholdRatio = thresholdRatio;
     }
-    mThresholdRatio = thresholdRatio;
+    if (disconnect) {
+        mHTTPDownloader->disconnect();
+    }
+}
+
+void PlaylistFetcher::resetStoppingThreshold(bool disconnect) {
+    {
+        AutoMutex _l(mThresholdLock);
+        mThresholdRatio = -1.0f;
+    }
+    if (disconnect) {
+        mHTTPDownloader->disconnect();
+    } else {
+        // allow reconnect
+        mHTTPDownloader->reconnect();
+    }
+}
+
+float PlaylistFetcher::getStoppingThreshold() {
+    AutoMutex _l(mThresholdLock);
+    return mThresholdRatio;
 }
 
 void PlaylistFetcher::startAsync(
@@ -497,15 +511,15 @@
     msg->post();
 }
 
-void PlaylistFetcher::pauseAsync(float thresholdRatio) {
-    if (thresholdRatio >= 0.0f) {
-        setStoppingThreshold(thresholdRatio);
-    }
+void PlaylistFetcher::pauseAsync(
+        float thresholdRatio, bool disconnect) {
+    setStoppingThreshold(thresholdRatio, disconnect);
+
     (new AMessage(kWhatPause, this))->post();
 }
 
 void PlaylistFetcher::stopAsync(bool clear) {
-    setStoppingThreshold(0.0f);
+    setStoppingThreshold(0.0f, true /* disconncect */);
 
     sp<AMessage> msg = new AMessage(kWhatStop, this);
     msg->setInt32("clear", clear);
@@ -520,6 +534,10 @@
     msg->post();
 }
 
+void PlaylistFetcher::fetchPlaylistAsync() {
+    (new AMessage(kWhatFetchPlaylist, this))->post();
+}
+
 void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {
     switch (msg->what()) {
         case kWhatStart:
@@ -557,6 +575,19 @@
             break;
         }
 
+        case kWhatFetchPlaylist:
+        {
+            bool unchanged;
+            sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
+                    mURI.c_str(), NULL /* curPlaylistHash */, &unchanged);
+
+            sp<AMessage> notify = mNotify->dup();
+            notify->setInt32("what", kWhatPlaylistFetched);
+            notify->setObject("playlist", playlist);
+            notify->post();
+            break;
+        }
+
         case kWhatMonitorQueue:
         case kWhatDownloadNext:
         {
@@ -676,7 +707,7 @@
     cancelMonitorQueue();
     mLastDiscontinuitySeq = mDiscontinuitySeq;
 
-    setStoppingThreshold(-1.0f);
+    resetStoppingThreshold(false /* disconnect */);
 }
 
 void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
@@ -691,14 +722,11 @@
         }
     }
 
-    // close off the connection after use
-    mHTTPDataSource->disconnect();
-
     mDownloadState->resetState();
     mPacketSources.clear();
     mStreamTypeMask = 0;
 
-    setStoppingThreshold(-1.0f);
+    resetStoppingThreshold(true /* disconnect */);
 }
 
 // Resume until we have reached the boundary timestamps listed in `msg`; when
@@ -815,7 +843,7 @@
 status_t PlaylistFetcher::refreshPlaylist() {
     if (delayUsToRefreshPlaylist() <= 0) {
         bool unchanged;
-        sp<M3UParser> playlist = mSession->fetchPlaylist(
+        sp<M3UParser> playlist = mHTTPDownloader->fetchPlaylist(
                 mURI.c_str(), mPlaylistHash, &unchanged);
 
         if (playlist == NULL) {
@@ -864,18 +892,12 @@
     }
 
     // Calculate threshold to abort current download
-    int64_t targetDurationUs = mPlaylist->getTargetDuration();
-    int64_t thresholdUs = -1;
-    {
-        AutoMutex _l(mThresholdLock);
-        thresholdUs = (mThresholdRatio < 0.0f) ?
-                -1ll : mThresholdRatio * targetDurationUs;
-    }
+    float thresholdRatio = getStoppingThreshold();
 
-    if (thresholdUs < 0) {
+    if (thresholdRatio < 0.0f) {
         // never abort
         return false;
-    } else if (thresholdUs == 0) {
+    } else if (thresholdRatio == 0.0f) {
         // immediately abort
         return true;
     }
@@ -905,6 +927,9 @@
     }
     lastEnqueueUs -= mSegmentFirstPTS;
 
+    int64_t targetDurationUs = mPlaylist->getTargetDuration();
+    int64_t thresholdUs = thresholdRatio * targetDurationUs;
+
     FLOGV("%spausing now, thresholdUs %lld, remaining %lld",
             targetDurationUs - lastEnqueueUs > thresholdUs ? "" : "not ",
             (long long)thresholdUs,
@@ -1101,7 +1126,9 @@
         junk->setRange(0, 16);
         status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
                 true /* first */);
-        if (err != OK) {
+        if (err == ERROR_NOT_CONNECTED) {
+            return false;
+        } else if (err != OK) {
             notifyError(err);
             return false;
         }
@@ -1202,12 +1229,21 @@
     bool shouldPause = false;
     ssize_t bytesRead;
     do {
-        sp<DataSource> source = mHTTPDataSource;
-
         int64_t startUs = ALooper::GetNowUs();
-        bytesRead = mSession->fetchFile(
+        bytesRead = mHTTPDownloader->fetchBlock(
                 uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize,
-                &source, NULL, connectHTTP);
+                NULL /* actualURL */, connectHTTP);
+        int64_t delayUs = ALooper::GetNowUs() - startUs;
+
+        if (bytesRead == ERROR_NOT_CONNECTED) {
+            return;
+        }
+        if (bytesRead < 0) {
+            status_t err = bytesRead;
+            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
+            notifyError(err);
+            return;
+        }
 
         // add sample for bandwidth estimation, excluding samples from subtitles (as
         // its too small), or during startup/resumeUntil (when we could have more than
@@ -1216,9 +1252,7 @@
                 && (mStreamTypeMask
                         & (LiveSession::STREAMTYPE_AUDIO
                         | LiveSession::STREAMTYPE_VIDEO))) {
-            int64_t delayUs = ALooper::GetNowUs() - startUs;
             mSession->addBandwidthMeasurement(bytesRead, delayUs);
-
             if (delayUs > 2000000ll) {
                 FLOGV("bytesRead %zd took %.2f seconds - abnormal bandwidth dip",
                         bytesRead, (double)delayUs / 1.0e6);
@@ -1227,13 +1261,6 @@
 
         connectHTTP = false;
 
-        if (bytesRead < 0) {
-            status_t err = bytesRead;
-            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
-            notifyError(err);
-            return;
-        }
-
         CHECK(buffer != NULL);
 
         size_t size = buffer->size();
diff --git a/media/libstagefright/httplive/PlaylistFetcher.h b/media/libstagefright/httplive/PlaylistFetcher.h
index 1f5e9b0..c8ca457 100644
--- a/media/libstagefright/httplive/PlaylistFetcher.h
+++ b/media/libstagefright/httplive/PlaylistFetcher.h
@@ -49,6 +49,7 @@
         kWhatPreparationFailed,
         kWhatStartedAt,
         kWhatStopReached,
+        kWhatPlaylistFetched,
         kWhatMetadataDetected,
     };
 
@@ -61,8 +62,6 @@
 
     int32_t getFetcherID() const;
 
-    sp<DataSource> getDataSource();
-
     void startAsync(
             const sp<AnotherPacketSource> &audioSource,
             const sp<AnotherPacketSource> &videoSource,
@@ -74,12 +73,14 @@
             int32_t startDiscontinuitySeq = -1,
             LiveSession::SeekMode seekMode = LiveSession::kSeekModeExactPosition);
 
-    void pauseAsync(float thresholdRatio);
+    void pauseAsync(float thresholdRatio, bool disconnect);
 
     void stopAsync(bool clear = true);
 
     void resumeUntilAsync(const sp<AMessage> &params);
 
+    void fetchPlaylistAsync();
+
     uint32_t getStreamTypeMask() const {
         return mStreamTypeMask;
     }
@@ -100,6 +101,7 @@
         kWhatMonitorQueue   = 'moni',
         kWhatResumeUntil    = 'rsme',
         kWhatDownloadNext   = 'dlnx',
+        kWhatFetchPlaylist  = 'flst'
     };
 
     struct DownloadState;
@@ -114,7 +116,7 @@
     sp<AMessage> mNotify;
     sp<AMessage> mStartTimeUsNotify;
 
-    sp<HTTPBase> mHTTPDataSource;
+    sp<HTTPDownloader> mHTTPDownloader;
     sp<LiveSession> mSession;
     AString mURI;
 
@@ -197,7 +199,9 @@
 
     void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0);
     void cancelMonitorQueue();
-    void setStoppingThreshold(float thresholdRatio);
+    void setStoppingThreshold(float thresholdRatio, bool disconnect);
+    void resetStoppingThreshold(bool disconnect);
+    float getStoppingThreshold();
     bool shouldPauseDownload();
 
     int64_t delayUsToRefreshPlaylist() const;