incidentd can now handle multiple callers asking it for incident reports

Test: bit incident_test:* GtsIncidentManagerTestCases:*
Bug: 123543706
Change-Id: I9f671dd5d8b2ad139f952a23e575c2be16120459
diff --git a/cmds/incidentd/src/Reporter.cpp b/cmds/incidentd/src/Reporter.cpp
index 8f62da2..7a08dd6 100644
--- a/cmds/incidentd/src/Reporter.cpp
+++ b/cmds/incidentd/src/Reporter.cpp
@@ -18,12 +18,18 @@
 
 #include "Reporter.h"
 
+#include "incidentd_util.h"
 #include "Privacy.h"
+#include "PrivacyFilter.h"
+#include "proto_util.h"
 #include "report_directory.h"
 #include "section_list.h"
 
+#include <android-base/file.h>
 #include <android-base/properties.h>
 #include <android/os/DropBoxManager.h>
+#include <android/util/protobuf.h>
+#include <android/util/ProtoOutputStream.h>
 #include <private/android_filesystem_config.h>
 #include <utils/SystemClock.h>
 
@@ -33,308 +39,673 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <string>
-
-/**
- * The directory where the incident reports are stored.
- */
-static const char* INCIDENT_DIRECTORY = "/data/misc/incidents/";
+#include <time.h>
 
 namespace android {
 namespace os {
 namespace incidentd {
 
+using namespace android::util;
+
+/**
+ * The field id of the metadata section from
+ *      frameworks/base/core/proto/android/os/incident.proto
+ */
+const int FIELD_ID_METADATA = 2;
+
+IncidentMetadata_Destination privacy_policy_to_dest(uint8_t privacyPolicy) {
+    switch (privacyPolicy) {
+        case PRIVACY_POLICY_AUTOMATIC:
+            return IncidentMetadata_Destination_AUTOMATIC;
+        case PRIVACY_POLICY_EXPLICIT:
+            return IncidentMetadata_Destination_EXPLICIT;
+        case PRIVACY_POLICY_LOCAL:
+            return IncidentMetadata_Destination_LOCAL;
+        default:
+            // Anything else reverts to automatic
+            return IncidentMetadata_Destination_AUTOMATIC;
+    }
+}
+
+// TODO: remove — unused in this file and duplicates the make_metadata
+// template below (which all in-file callers use); the "poo_" prefix looks
+// like leftover debugging. Kept for now in case of out-of-file callers.
+void poo_make_metadata(IncidentMetadata* result, const IncidentMetadata& full,
+        int64_t reportId, int32_t privacyPolicy, const IncidentReportArgs& args) {
+    result->set_report_id(reportId);
+    result->set_dest(privacy_policy_to_dest(privacyPolicy));
+    const int sectionCount = full.sections_size();  // int: matches sections(int), avoids sign-compare
+    for (int sectionIndex = 0; sectionIndex < sectionCount; sectionIndex++) {
+        const IncidentMetadata::SectionStats& sectionStats = full.sections(sectionIndex);
+        if (args.containsSection(sectionStats.id())) *result->add_sections() = sectionStats;
+    }
+}
+
+// ARGS must have a containsSection(int) method
+template <typename ARGS> void make_metadata(IncidentMetadata* result, const IncidentMetadata& full,
+        int64_t reportId, int32_t privacyPolicy, ARGS args) {
+    result->set_report_id(reportId);
+    result->set_dest(privacy_policy_to_dest(privacyPolicy));
+
+    size_t sectionCount = full.sections_size();
+    for (int sectionIndex = 0; sectionIndex < sectionCount; sectionIndex++) {
+        const IncidentMetadata::SectionStats& sectionStats = full.sections(sectionIndex);
+        if (args->containsSection(sectionStats.id())) {
+            *result->add_sections() = sectionStats;
+        }
+    }
+}
+
+// ================================================================================
+class StreamingFilterFd : public FilterFd {
+public:
+    StreamingFilterFd(uint8_t privacyPolicy, int fd, const sp<ReportRequest>& request);
+
+    virtual void onWriteError(status_t err);
+
+private:
+    sp<ReportRequest> mRequest;
+};
+
+StreamingFilterFd::StreamingFilterFd(uint8_t privacyPolicy, int fd,
+            const sp<ReportRequest>& request)
+        :FilterFd(privacyPolicy, fd),
+         mRequest(request) {
+}
+
+void StreamingFilterFd::onWriteError(status_t err) {
+    mRequest->setStatus(err);
+}
+
+
+// ================================================================================
+class PersistedFilterFd : public FilterFd {
+public:
+    PersistedFilterFd(uint8_t privacyPolicy, int fd, const sp<ReportFile>& reportFile);
+
+    virtual void onWriteError(status_t err);
+
+private:
+    sp<ReportFile> mReportFile;
+};
+
+PersistedFilterFd::PersistedFilterFd(uint8_t privacyPolicy, int fd,
+            const sp<ReportFile>& reportFile)
+        :FilterFd(privacyPolicy, fd),
+         mReportFile(reportFile) {
+}
+
+void PersistedFilterFd::onWriteError(status_t err) {
+    mReportFile->setWriteError(err);
+}
+
+
 // ================================================================================
 ReportRequest::ReportRequest(const IncidentReportArgs& a,
-                             const sp<IIncidentReportStatusListener>& l, int f)
-    : args(a), listener(l), fd(f), err(NO_ERROR) {}
+                             const sp<IIncidentReportStatusListener>& listener, int fd)
+        :args(a),
+         mListener(listener),
+         mFd(fd),
+         mIsStreaming(fd >= 0),
+         mStatus(NO_ERROR) {
+}
 
 ReportRequest::~ReportRequest() {
-    if (fd >= 0) {
+    if (mIsStreaming && mFd >= 0) {
         // clean up the opened file descriptor
-        close(fd);
+        close(mFd);
     }
 }
 
-bool ReportRequest::ok() { return fd >= 0 && err == NO_ERROR; }
-
-// ================================================================================
-ReportRequestSet::ReportRequestSet()
-    : mRequests(), mSections(), mMainFd(-1), mMainDest(-1), mMetadata(), mSectionStats() {}
-
-ReportRequestSet::~ReportRequestSet() {}
-
-// TODO: dedup on exact same args and fd, report the status back to listener!
-void ReportRequestSet::add(const sp<ReportRequest>& request) {
-    mRequests.push_back(request);
-    mSections.merge(request->args);
-    mMetadata.set_request_size(mMetadata.request_size() + 1);
+bool ReportRequest::ok() {
+    return mFd >= 0 && mStatus == NO_ERROR;
 }
 
-void ReportRequestSet::setMainFd(int fd) {
-    mMainFd = fd;
-    mMetadata.set_use_dropbox(fd > 0);
-}
-
-void ReportRequestSet::setMainDest(int dest) {
-    mMainDest = dest;
-    PrivacySpec spec = PrivacySpec::new_spec(dest);
-    switch (spec.dest) {
-        case android::os::DEST_AUTOMATIC:
-            mMetadata.set_dest(IncidentMetadata_Destination_AUTOMATIC);
-            break;
-        case android::os::DEST_EXPLICIT:
-            mMetadata.set_dest(IncidentMetadata_Destination_EXPLICIT);
-            break;
-        case android::os::DEST_LOCAL:
-            mMetadata.set_dest(IncidentMetadata_Destination_LOCAL);
-            break;
+void ReportRequest::closeFd() {
+    if (mIsStreaming && mFd >= 0) {
+        close(mFd);
+        mFd = -1;
     }
 }
 
-bool ReportRequestSet::containsSection(int id) { return mSections.containsSection(id); }
-
-IncidentMetadata::SectionStats* ReportRequestSet::sectionStats(int id) {
-    if (mSectionStats.find(id) == mSectionStats.end()) {
-        IncidentMetadata::SectionStats stats;
-        stats.set_id(id);
-        mSectionStats[id] = stats;
-    }
-    return &mSectionStats[id];
-}
-
 // ================================================================================
-Reporter::Reporter() : Reporter(INCIDENT_DIRECTORY) { isTest = false; };
+ReportBatch::ReportBatch() {}
 
-Reporter::Reporter(const char* directory) : batch() {
-    char buf[100];
+ReportBatch::~ReportBatch() {}
 
-    mMaxSize = 30 * 1024 * 1024;  // incident reports can take up to 30MB on disk
-    mMaxCount = 100;
-
-    // string ends up with '/' is a directory
-    String8 dir = String8(directory);
-    if (directory[dir.size() - 1] != '/') dir += "/";
-    mIncidentDirectory = dir.string();
-
-    // There can't be two at the same time because it's on one thread.
-    mStartTime = time(NULL);
-    strftime(buf, sizeof(buf), "incident-%Y%m%d-%H%M%S", localtime(&mStartTime));
-    mFilename = mIncidentDirectory + buf;
+void ReportBatch::addPersistedReport(const IncidentReportArgs& args) {
+    ComponentName component(args.receiverPkg(), args.receiverCls());
+    map<ComponentName, sp<ReportRequest>>::iterator found = mPersistedRequests.find(component);
+    if (found == mPersistedRequests.end()) {
+        // not found
+        mPersistedRequests[component] = new ReportRequest(args, nullptr, -1);
+    } else {
+        // found
+        sp<ReportRequest> request = found->second;
+        request->args.merge(args);
+    }
 }
 
-Reporter::~Reporter() {}
+void ReportBatch::addStreamingReport(const IncidentReportArgs& args,
+        const sp<IIncidentReportStatusListener>& listener, int streamFd) {
+    mStreamingRequests.push_back(new ReportRequest(args, listener, streamFd));
+}
 
-Reporter::run_report_status_t Reporter::runReport(size_t* reportByteSize) {
+bool ReportBatch::empty() const {
+    return mPersistedRequests.size() == 0 && mStreamingRequests.size() == 0;
+}
+
+sp<ReportRequest> ReportBatch::getPersistedRequest(const ComponentName& component) {
+    map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.find(component);
+    if (it != mPersistedRequests.end()) {  // was 'mPersistedRequests.find(component)': always false, so this always returned nullptr
+        return it->second;
+    } else {
+        return nullptr;
+    }
+}
+
+void ReportBatch::forEachPersistedRequest(const function<void (const sp<ReportRequest>&)>& func) {
+    for (map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.begin();
+            it != mPersistedRequests.end(); it++) {
+        func(it->second);
+    }
+}
+
+void ReportBatch::forEachStreamingRequest(const function<void (const sp<ReportRequest>&)>& func) {
+    for (vector<sp<ReportRequest>>::iterator request = mStreamingRequests.begin();
+            request != mStreamingRequests.end(); request++) {
+        func(*request);
+    }
+}
+
+void ReportBatch::forEachListener(
+        const function<void (const sp<IIncidentReportStatusListener>&)>& func) {
+    for (map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.begin();
+            it != mPersistedRequests.end(); it++) {
+        sp<IIncidentReportStatusListener> listener = it->second->getListener();
+        if (listener != nullptr) {
+            func(listener);
+        }
+    }
+    for (vector<sp<ReportRequest>>::iterator request = mStreamingRequests.begin();
+            request != mStreamingRequests.end(); request++) {
+        sp<IIncidentReportStatusListener> listener = (*request)->getListener();
+        if (listener != nullptr) {
+            func(listener);
+        }
+    }
+}
+
+void ReportBatch::forEachListener(int sectionId,
+        const function<void (const sp<IIncidentReportStatusListener>&)>& func) {
+    for (map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.begin();
+            it != mPersistedRequests.end(); it++) {
+        if (it->second->containsSection(sectionId)) {
+            sp<IIncidentReportStatusListener> listener = it->second->getListener();
+            if (listener != nullptr) {
+                func(listener);
+            }
+        }
+    }
+    for (vector<sp<ReportRequest>>::iterator request = mStreamingRequests.begin();
+            request != mStreamingRequests.end(); request++) {
+        if ((*request)->containsSection(sectionId)) {
+            sp<IIncidentReportStatusListener> listener = (*request)->getListener();
+            if (listener != nullptr) {
+                func(listener);
+            }
+        }
+    }
+}
+
+void ReportBatch::getCombinedPersistedArgs(IncidentReportArgs* result) {
+    for (map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.begin();
+            it != mPersistedRequests.end(); it++) {
+        result->merge(it->second->args);
+    }
+}
+
+bool ReportBatch::containsSection(int sectionId) {
+    // We don't cache this, because in case of error, we remove requests
+    // from the batch, and this is easier than recomputing the set.
+    for (map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.begin();
+            it != mPersistedRequests.end(); it++) {
+        if (it->second->containsSection(sectionId)) {
+            return true;
+        }
+    }
+    for (vector<sp<ReportRequest>>::iterator request = mStreamingRequests.begin();
+            request != mStreamingRequests.end(); request++) {
+        if ((*request)->containsSection(sectionId)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+void ReportBatch::clearPersistedRequests() {
+    mPersistedRequests.clear();
+}
+
+void ReportBatch::getFailedRequests(vector<sp<ReportRequest>>* requests) {
+    for (map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.begin();
+            it != mPersistedRequests.end(); it++) {
+        if (it->second->getStatus() != NO_ERROR) {
+            requests->push_back(it->second);
+        }
+    }
+    for (vector<sp<ReportRequest>>::iterator request = mStreamingRequests.begin();
+            request != mStreamingRequests.end(); request++) {
+        if ((*request)->getStatus() != NO_ERROR) {
+            requests->push_back(*request);
+        }
+    }
+}
+
+void ReportBatch::removeRequest(const sp<ReportRequest>& request) {
+    for (map<ComponentName, sp<ReportRequest>>::iterator it = mPersistedRequests.begin();
+            it != mPersistedRequests.end(); it++) {
+        if (it->second == request) {
+            mPersistedRequests.erase(it);
+            return;
+        }
+    }
+    for (vector<sp<ReportRequest>>::iterator it = mStreamingRequests.begin();
+            it != mStreamingRequests.end(); it++) {
+        if (*it == request) {
+            mStreamingRequests.erase(it);
+            return;
+        }
+    }
+}
+
+// ================================================================================
+ReportWriter::ReportWriter(const sp<ReportBatch>& batch)
+        :mBatch(batch),
+         mPersistedFile(),
+         mMaxPersistedPrivacyPolicy(PRIVACY_POLICY_UNSET) {
+}
+
+ReportWriter::~ReportWriter() {
+}
+
+void ReportWriter::setPersistedFile(sp<ReportFile> file) {
+    mPersistedFile = file;
+}
+
+void ReportWriter::setMaxPersistedPrivacyPolicy(uint8_t privacyPolicy) {
+    mMaxPersistedPrivacyPolicy = privacyPolicy;
+}
+
+void ReportWriter::startSection(int sectionId) {
+    mCurrentSectionId = sectionId;
+    mSectionStartTimeMs = uptimeMillis();
+
+    mSectionStatsCalledForSectionId = -1;
+    mDumpSizeBytes = 0;
+    mDumpDurationMs = 0;
+    mSectionTimedOut = false;
+    mSectionTruncated = false;
+    mSectionBufferSuccess = false;
+    mHadError = false;
+    mSectionErrors.clear();
+    mMaxSectionDataFilteredSize = 0;  // reset per section: endSection() reports it even when writeSection() never ran
+}
+
+void ReportWriter::setSectionStats(const FdBuffer& buffer) {
+    mSectionStatsCalledForSectionId = mCurrentSectionId;
+    mDumpSizeBytes = buffer.size();
+    mDumpDurationMs = buffer.durationMs();
+    mSectionTimedOut = buffer.timedOut();
+    mSectionTruncated = buffer.truncated();
+    mSectionBufferSuccess = !buffer.timedOut() && !buffer.truncated();
+}
+
+void ReportWriter::endSection(IncidentMetadata::SectionStats* sectionMetadata) {
+    long endTime = uptimeMillis();
+
+    if (mSectionStatsCalledForSectionId != mCurrentSectionId) {
+        ALOGW("setSectionStats not called for section %d", mCurrentSectionId);
+    }
+
+    sectionMetadata->set_id(mCurrentSectionId);
+    sectionMetadata->set_success((!mHadError) && mSectionBufferSuccess);
+    sectionMetadata->set_report_size_bytes(mMaxSectionDataFilteredSize);
+    sectionMetadata->set_exec_duration_ms(endTime - mSectionStartTimeMs);
+    sectionMetadata->set_dump_size_bytes(mDumpSizeBytes);
+    sectionMetadata->set_dump_duration_ms(mDumpDurationMs);
+    sectionMetadata->set_timed_out(mSectionTimedOut);
+    sectionMetadata->set_is_truncated(mSectionTruncated);
+    sectionMetadata->set_error_msg(mSectionErrors);
+}
+
+void ReportWriter::warning(const Section* section, status_t err, const char* format, ...) {
+    va_list args;
+    va_start(args, format);
+    vflog(section, err, ANDROID_LOG_WARN, "warning", format, args);  // warning logs at WARN (was swapped with error())
+    va_end(args);
+}
+
+void ReportWriter::error(const Section* section, status_t err, const char* format, ...) {
+    va_list args;
+    va_start(args, format);
+    vflog(section, err, ANDROID_LOG_ERROR, "error", format, args);  // must be ERROR so vflog() sets mHadError (was WARN)
+    va_end(args);
+}
+
+void ReportWriter::vflog(const Section* section, status_t err, int level, const char* levelText,
+        const char* format, va_list args) {
+    const char* prefixFormat = "%s in section %d (%d) '%s': ";
+    int prefixLen = snprintf(NULL, 0, prefixFormat, levelText, section->id,
+            err, strerror(-err));
+
+    va_list measureArgs;
+    va_copy(measureArgs, args);
+    int messageLen = vsnprintf(NULL, 0, format, measureArgs);  // measure with the copy: 'args' is reused by vsprintf below (consuming it here is UB)
+    va_end(measureArgs);
+
+    char* line = (char*)malloc(prefixLen + messageLen + 1);
+    if (line == NULL) {
+        // All hope is lost, just give up.
+        return;
+    }
+
+    sprintf(line, prefixFormat, levelText, section->id, err, strerror(-err));
+
+    vsprintf(line + prefixLen, format, args);
+
+    __android_log_write(level, LOG_TAG, line);
+
+    if (mSectionErrors.length() == 0) {
+        mSectionErrors = line;
+    } else {
+        mSectionErrors += '\n';
+        mSectionErrors += line;
+    }
+
+    free(line);
+
+    if (level >= ANDROID_LOG_ERROR) {
+        mHadError = true;
+    }
+}
+
+// Reads data from FdBuffer and writes it to the requests file descriptor.
+status_t ReportWriter::writeSection(const FdBuffer& buffer) {
+    PrivacyFilter filter(mCurrentSectionId, get_privacy_of_section(mCurrentSectionId));
+
+    // Add the fd for the persisted requests
+    if (mPersistedFile != nullptr) {
+        filter.addFd(new PersistedFilterFd(mMaxPersistedPrivacyPolicy,
+                    mPersistedFile->getDataFileFd(), mPersistedFile));
+    }
+
+    // Add the fds for the streamed requests
+    mBatch->forEachStreamingRequest([&filter, this](const sp<ReportRequest>& request) {
+        if (request->ok() && request->args.containsSection(mCurrentSectionId)) {
+            filter.addFd(new StreamingFilterFd(request->args.getPrivacyPolicy(),
+                        request->getFd(), request));
+        }
+    });
+
+    return filter.writeData(buffer, PRIVACY_POLICY_LOCAL, &mMaxSectionDataFilteredSize);
+}
+
+
+// ================================================================================
+Reporter::Reporter(const sp<WorkDirectory>& workDirectory, const sp<ReportBatch>& batch)
+        :mWorkDirectory(workDirectory),
+         mWriter(batch),
+         mBatch(batch) {
+}
+
+Reporter::~Reporter() {
+}
+
+void Reporter::runReport(size_t* reportByteSize) {
     status_t err = NO_ERROR;
-    bool needMainFd = false;
-    int mainFd = -1;
-    int mainDest = -1;
-    int sectionCount = 0;
-    HeaderSection headers;
-    MetadataSection metadataSection;
+
+    IncidentMetadata metadata;
+    int persistedPrivacyPolicy = PRIVACY_POLICY_UNSET;
     std::string buildType = android::base::GetProperty("ro.build.type", "");
     const bool isUserdebugOrEng = buildType == "userdebug" || buildType == "eng";
 
-    // See if we need the main file
-    for (ReportRequestSet::iterator it = batch.begin(); it != batch.end(); it++) {
-        if ((*it)->fd < 0 && mainFd < 0) {
-            needMainFd = true;
-            mainDest = (*it)->args.dest();
-            break;
-        }
-    }
-    if (needMainFd) {
-        // Create the directory
-        if (!isTest) err = create_directory(mIncidentDirectory);
-        if (err != NO_ERROR) {
-            goto DONE;
-        }
-
-        // If there are too many files in the directory (for whatever reason),
-        // delete the oldest ones until it's under the limit. Doing this first
-        // does mean that we can go over, so the max size is not a hard limit.
-        if (!isTest) clean_directory(mIncidentDirectory, mMaxSize, mMaxCount);
-
-        // Open the file.
-        err = create_file(&mainFd);
-        if (err != NO_ERROR) {
-            goto DONE;
-        }
-
-        // Add to the set
-        batch.setMainFd(mainFd);
-        batch.setMainDest(mainDest);
-    }
+    (*reportByteSize) = 0;
 
     // Tell everyone that we're starting.
-    for (ReportRequestSet::iterator it = batch.begin(); it != batch.end(); it++) {
-        if ((*it)->listener != NULL) {
-            (*it)->listener->onReportStarted();
+    ALOGI("Starting incident report");
+    mBatch->forEachListener([](const auto& listener) { listener->onReportStarted(); });
+
+    if (mBatch->hasPersistedReports()) {
+        // Open a work file to contain the contents of all of the persisted reports.
+        // For this block, if we can't initialize the report file for some reason,
+        // then we will remove the persisted ReportRequests from the report, but
+        // continue with the streaming ones.
+        mPersistedFile = mWorkDirectory->createReportFile();
+        ALOGI("Report will be persisted: envelope: %s  data: %s",
+                mPersistedFile->getEnvelopeFileName().c_str(),
+                mPersistedFile->getDataFileName().c_str());
+
+        // Record all of the metadata to the persisted file's metadata file.
+        // It will be read from there and reconstructed as the actual reports
+        // are sent out.
+        if (mPersistedFile != nullptr) {
+            mBatch->forEachPersistedRequest([this, &persistedPrivacyPolicy](
+                        const sp<ReportRequest>& request) {
+                mPersistedFile->addReport(request->args);
+                if (request->args.getPrivacyPolicy() < persistedPrivacyPolicy) {
+                    persistedPrivacyPolicy = request->args.getPrivacyPolicy();
+                }
+            });
+            mPersistedFile->setMaxPersistedPrivacyPolicy(persistedPrivacyPolicy);
+            err = mPersistedFile->saveEnvelope();
+            if (err != NO_ERROR) {
+                mWorkDirectory->remove(mPersistedFile);
+                mPersistedFile = nullptr;
+            }
+            mWriter.setMaxPersistedPrivacyPolicy(persistedPrivacyPolicy);
+        }
+
+        if (mPersistedFile != nullptr) {
+            err = mPersistedFile->startWritingDataFile();
+            if (err != NO_ERROR) {
+                mWorkDirectory->remove(mPersistedFile);
+                mPersistedFile = nullptr;
+            }
+        }
+
+        if (mPersistedFile != nullptr) {
+            mWriter.setPersistedFile(mPersistedFile);
+        } else {
+            ALOGW("Error creating the persisted file, so clearing persisted reports.");
+            // If we couldn't open the file (permissions err, etc), then
+            // we still want to proceed with any streaming reports, but
+            // cancel all of the persisted ones.
+            mBatch->forEachPersistedRequest([](const sp<ReportRequest>& request) {
+                sp<IIncidentReportStatusListener> listener = request->getListener();
+                if (listener != nullptr) {
+                    listener->onReportFailed();
+                }
+            });
+            mBatch->clearPersistedRequests();
         }
     }
 
-    // Write the incident headers
-    headers.Execute(&batch);
+    // If we have a persisted ID, then we allow all the readers to see that.  There's
+    // enough in the data to allow for a join, and nothing in here that intrisincally
+    // could ever prevent that, so just give them the ID.  If we don't have that then we
+    // make and ID that's extremely likely to be unique, but clock resetting could allow
+    // it to be duplicate.
+    int64_t reportId;
+    if (mPersistedFile != nullptr) {
+        reportId = mPersistedFile->getTimestampNs();
+    } else {
+        struct timespec spec;
+        clock_gettime(CLOCK_REALTIME, &spec);
+        reportId = (spec.tv_sec) * 1000 + spec.tv_nsec;
+    }
+
+    // Write the incident report headers - each request gets its own headers.  It's different
+    // from the other top-level fields in IncidentReport that are the sections where the rest
+    // is all shared data (although with their own individual privacy filtering).
+    mBatch->forEachStreamingRequest([](const sp<ReportRequest>& request) {
+        const vector<vector<uint8_t>>& headers = request->args.headers();
+        for (vector<vector<uint8_t>>::const_iterator buf = headers.begin(); buf != headers.end();
+             buf++) {
+            // If there was an error now, there will be an error later and we will remove
+            // it from the list then.
+            write_header_section(request->getFd(), *buf);
+        }
+    });
+
+    // If writing to any of the headers failed, we don't want to keep processing
+    // sections for it.
+    cancel_and_remove_failed_requests();
 
     // For each of the report fields, see if we need it, and if so, execute the command
     // and report to those that care that we're doing it.
     for (const Section** section = SECTION_LIST; *section; section++) {
-        const int id = (*section)->id;
+        const int sectionId = (*section)->id;
+
+        // If this section is too private for user builds, skip it.
         if ((*section)->userdebugAndEngOnly && !isUserdebugOrEng) {
             VLOG("Skipping incident report section %d '%s' because it's limited to userdebug/eng",
-                  id, (*section)->name.string());
+                  sectionId, (*section)->name.string());
             continue;
         }
-        if (this->batch.containsSection(id)) {
-            VLOG("Taking incident report section %d '%s'", id, (*section)->name.string());
-            for (ReportRequestSet::iterator it = batch.begin(); it != batch.end(); it++) {
-                if ((*it)->listener != NULL && (*it)->args.containsSection(id)) {
-                    (*it)->listener->onReportSectionStatus(
-                            id, IIncidentReportStatusListener::STATUS_STARTING);
-                }
-            }
 
-            // Execute - go get the data and write it into the file descriptors.
-            IncidentMetadata::SectionStats* stats = batch.sectionStats(id);
-            int64_t startTime = uptimeMillis();
-            err = (*section)->Execute(&batch);
-            int64_t endTime = uptimeMillis();
-            stats->set_exec_duration_ms(endTime - startTime);
-            if (err != NO_ERROR) {
-                ALOGW("Incident section %s (%d) failed: %s. Stopping report.",
-                      (*section)->name.string(), id, strerror(-err));
-                // Execute() has already recorded this status. Only update if there's new failure.
-                stats->set_success(false);
-                goto DONE;
-            }
-            (*reportByteSize) += stats->report_size_bytes();
-
-            // Notify listener of starting
-            for (ReportRequestSet::iterator it = batch.begin(); it != batch.end(); it++) {
-                if ((*it)->listener != NULL && (*it)->args.containsSection(id)) {
-                    (*it)->listener->onReportSectionStatus(
-                            id, IIncidentReportStatusListener::STATUS_FINISHED);
-                }
-            }
-            VLOG("Finish incident report section %d '%s'", id, (*section)->name.string());
-            sectionCount++;
+        // If nobody wants this section, skip it.
+        if (!mBatch->containsSection(sectionId)) {
+            continue;
         }
+
+        ALOGD("Start incident report section %d '%s'", sectionId, (*section)->name.string());
+        IncidentMetadata::SectionStats* sectionMetadata = metadata.add_sections();
+
+        // Notify listener of starting
+        mBatch->forEachListener(sectionId, [sectionId](const auto& listener) {
+            listener->onReportSectionStatus(
+                    sectionId, IIncidentReportStatusListener::STATUS_STARTING);
+        });
+
+        // Go get the data and write it into the file descriptors.
+        mWriter.startSection(sectionId);
+        err = (*section)->Execute(&mWriter);
+        mWriter.endSection(sectionMetadata);
+
+        // Sections returning errors are fatal. Most errors should not be fatal.
+        if (err != NO_ERROR) {
+            mWriter.error((*section), err, "Section failed. Stopping report.");
+            goto DONE;
+        }
+
+        // The returned max data size is used for throttling too many incident reports.
+        (*reportByteSize) += sectionMetadata->report_size_bytes();
+
+        // For any requests that failed during this section, remove them now.  We do this
+        // before calling back about section finished, so listeners do not erroniously get the
+        // impression that the section succeeded.  But we do it here instead of inside
+        // writeSection so that the callback is done from a known context and not from the
+        // bowels of a section, where changing the batch could cause odd errors.
+        cancel_and_remove_failed_requests();
+
+        // Notify listener of finishing
+        mBatch->forEachListener(sectionId, [sectionId](const auto& listener) {
+                listener->onReportSectionStatus(
+                        sectionId, IIncidentReportStatusListener::STATUS_FINISHED);
+        });
+
+        ALOGD("Finish incident report section %d '%s'", sectionId, (*section)->name.string());
     }
 
 DONE:
-    ALOGD("Incident reporting took %d sections.", sectionCount);
-    // Reports the metdadata when taking the incident report.
-    if (!isTest) metadataSection.Execute(&batch);
+    // Finish up the persisted file.
+    if (mPersistedFile != nullptr) {
+        mPersistedFile->closeDataFile();
 
-    // Close the file.
-    if (mainFd >= 0) {
-        close(mainFd);
-    }
+        // Set the stored metadata
+        IncidentReportArgs combinedArgs;
+        mBatch->getCombinedPersistedArgs(&combinedArgs);
+        IncidentMetadata persistedMetadata;
+        make_metadata(&persistedMetadata, metadata, mPersistedFile->getTimestampNs(),
+                persistedPrivacyPolicy, &combinedArgs);
+        mPersistedFile->setMetadata(persistedMetadata);
 
-    // Tell everyone that we're done.
-    for (ReportRequestSet::iterator it = batch.begin(); it != batch.end(); it++) {
-        if ((*it)->listener != NULL) {
-            if (err == NO_ERROR) {
-                (*it)->listener->onReportFinished();
-            } else {
-                (*it)->listener->onReportFailed();
-            }
+        mPersistedFile->markCompleted();
+        err = mPersistedFile->saveEnvelope();
+        if (err != NO_ERROR) {
+            ALOGW("mPersistedFile->saveEnvelope returned %s. Won't send broadcast",
+                    strerror(-err));
+            // Abandon ship.
+            mWorkDirectory->remove(mPersistedFile);
         }
     }
 
-    // Put the report into dropbox.
-    if (needMainFd && err == NO_ERROR) {
-        sp<DropBoxManager> dropbox = new DropBoxManager();
-        Status status = dropbox->addFile(String16("incident"), mFilename, 0);
-        ALOGD("Incident report done. dropbox status=%s\n", status.toString8().string());
-        if (!status.isOk()) {
-            return REPORT_NEEDS_DROPBOX;
+    // Write the metadata to the streaming ones
+    mBatch->forEachStreamingRequest([reportId, &metadata](const sp<ReportRequest>& request) {
+        IncidentMetadata streamingMetadata;
+        // make_metadata() takes the request's IncidentReportArgs as its last
+        // parameter, not the request itself.
+        make_metadata(&streamingMetadata, metadata, reportId,
+                request->args.getPrivacyPolicy(), request->args);
+        status_t nonFatalErr = write_section(request->getFd(), FIELD_ID_METADATA,
+                streamingMetadata);
+        if (nonFatalErr != NO_ERROR) {
+            // status_t holds negated errno values; negate before strerror()
+            // so the log shows the real error, not "Unknown error".
+            ALOGW("Error writing the metadata to streaming incident report.  This is the last"
+                    " thing so we won't return an error: %s", strerror(-nonFatalErr));
         }
+    });
 
-        // If the status was ok, delete the file. If not, leave it around until the next
-        // boot or the next checkin. If the directory gets too big older files will
-        // be rotated out.
-        if (!isTest) unlink(mFilename.c_str());
+    // Finish up the streaming ones.
+    mBatch->forEachStreamingRequest([](const sp<ReportRequest>& request) {
+        request->closeFd();
+    });
+
+    // Tell the listeners that we're done.
+    if (err == NO_ERROR) {
+        mBatch->forEachListener([](const auto& listener) {
+            listener->onReportFinished();
+        });
+    } else {
+        mBatch->forEachListener([](const auto& listener) {
+            listener->onReportFailed();
+        });
     }
 
-    return REPORT_FINISHED;
+    ALOGI("Done taking incident report err=%s", strerror(-err));
 }
 
-/**
- * Create our output file and set the access permissions to -rw-rw----
- */
-status_t Reporter::create_file(int* fd) {
-    const char* filename = mFilename.c_str();
-
-    *fd = open(filename, O_CREAT | O_TRUNC | O_RDWR | O_CLOEXEC, 0660);
-    if (*fd < 0) {
-        ALOGE("Couldn't open incident file: %s (%s)", filename, strerror(errno));
-        return -errno;
-    }
-
-    // Override umask. Not super critical. If it fails go on with life.
-    chmod(filename, 0660);
-
-    if (chown(filename, AID_INCIDENTD, AID_INCIDENTD)) {
-        ALOGE("Unable to change ownership of incident file %s: %s\n", filename, strerror(errno));
-        status_t err = -errno;
-        unlink(mFilename.c_str());
-        return err;
-    }
-
-    return NO_ERROR;
-}
-
-Reporter::run_report_status_t Reporter::upload_backlog() {
-    DIR* dir;
-    struct dirent* entry;
-    struct stat st;
-    status_t err;
-
-    ALOGD("Start uploading backlogs in %s", INCIDENT_DIRECTORY);
-    if ((err = create_directory(INCIDENT_DIRECTORY)) != NO_ERROR) {
-        ALOGE("directory doesn't exist: %s", strerror(-err));
-        return REPORT_FINISHED;
-    }
-
-    if ((dir = opendir(INCIDENT_DIRECTORY)) == NULL) {
-        ALOGE("Couldn't open incident directory: %s", INCIDENT_DIRECTORY);
-        return REPORT_NEEDS_DROPBOX;
-    }
-
-    sp<DropBoxManager> dropbox = new DropBoxManager();
-
-    // Enumerate, count and add up size
-    int count = 0;
-    while ((entry = readdir(dir)) != NULL) {
-        if (entry->d_name[0] == '.') {
-            continue;
+/**
+ * Drop any requests whose output has already failed.
+ *
+ * A write error on the persisted file is fatal for every persisted request:
+ * each request's listener (if any) gets onReportFailed(), the requests are
+ * removed from the batch, and the on-disk file is detached from the writer,
+ * closed, and removed from the work directory. A write error on an individual
+ * streaming request cancels only that request: its listener is notified, its
+ * fd is closed, and it is removed from the batch.
+ */
+void Reporter::cancel_and_remove_failed_requests() {
+    // Handle a failure in the persisted file
+    if (mPersistedFile != nullptr) {
+        if (mPersistedFile->getWriteError() != NO_ERROR) {
+            ALOGW("Error writing to the persisted file (%s). Closing it and canceling.",
+                    strerror(-mPersistedFile->getWriteError()));
+            mBatch->forEachPersistedRequest([this](const sp<ReportRequest>& request) {
+                sp<IIncidentReportStatusListener> listener = request->getListener();
+                if (listener != nullptr) {
+                    listener->onReportFailed();
+                }
+                mBatch->removeRequest(request);
+            });
+            // Detach the dead file from the writer before closing and
+            // deleting it, so nothing can write to it afterwards.
+            mWriter.setPersistedFile(nullptr);
+            mPersistedFile->closeDataFile();
+            mWorkDirectory->remove(mPersistedFile);
+            mPersistedFile = nullptr;
         }
-        String8 filename = String8(INCIDENT_DIRECTORY) + entry->d_name;
-        if (stat(filename.string(), &st) != 0) {
-            ALOGE("Unable to stat file %s", filename.string());
-            continue;
-        }
-        if (!S_ISREG(st.st_mode)) {
-            continue;
-        }
-
-        Status status = dropbox->addFile(String16("incident"), filename.string(), 0);
-        ALOGD("Incident report done. dropbox status=%s\n", status.toString8().string());
-        if (!status.isOk()) {
-            return REPORT_NEEDS_DROPBOX;
-        }
-
-        // If the status was ok, delete the file. If not, leave it around until the next
-        // boot or the next checkin. If the directory gets too big older files will
-        // be rotated out.
-        unlink(filename.string());
-        count++;
     }
-    ALOGD("Successfully uploaded %d files to Dropbox.", count);
-    closedir(dir);
 
-    return REPORT_FINISHED;
+    // Handle failures in the streaming files
+    vector<sp<ReportRequest>> failed;
+    mBatch->getFailedRequests(&failed);
+    for (sp<ReportRequest>& request: failed) {
+        ALOGW("Error writing to a request stream (%s). Closing it and canceling.",
+                strerror(-request->getStatus()));
+        sp<IIncidentReportStatusListener> listener = request->getListener();
+        if (listener != nullptr) {
+            listener->onReportFailed();
+        }
+        request->closeFd();  // Will only close the streaming ones.
+        mBatch->removeRequest(request);
+    }
+}
 
 }  // namespace incidentd