lshal: PipeRelay: use modern classes.

- Use android::base helpers
- Use std::thread instead of utils/Thread
- Use poll() instead of select()
- Use a separate fd trigger instead of an atomic_bool
  so that poll() finishes sooner.
- Also removes useless READ_TIMEOUT and error message
  for select() timeout and ~PipeRelay.

Test: lshal_test
Test: manually run lshal debug repeatedly to see if it
      is stuck or output is not complete

Bug: 182306776

Change-Id: Ie623bf1b979654a30b360819c9a787c88fd8d91d
diff --git a/cmds/lshal/Lshal.cpp b/cmds/lshal/Lshal.cpp
index bc99f4d..a5f98c2 100644
--- a/cmds/lshal/Lshal.cpp
+++ b/cmds/lshal/Lshal.cpp
@@ -142,12 +142,10 @@
         }
     }
 
-    PipeRelay relay(out, err, interfaceName, instanceName);
-
-    if (relay.initCheck() != OK) {
-        std::string msg = "PipeRelay::initCheck() FAILED w/ " + std::to_string(relay.initCheck());
-        err << msg << std::endl;
-        LOG(ERROR) << msg;
+    auto relay = PipeRelay::create(out, err, interfaceName + "/" + instanceName);
+    if (!relay.ok()) {
+        err << "Unable to create PipeRelay: " << relay.error() << std::endl;
+        LOG(ERROR) << "Unable to create PipeRelay: " << relay.error();
         return IO_ERROR;
     }
 
@@ -155,7 +153,7 @@
         native_handle_create(1 /* numFds */, 0 /* numInts */),
         native_handle_delete);
 
-    fdHandle->data[0] = relay.fd();
+    fdHandle->data[0] = relay.value()->fd().get();
 
     hardware::Return<void> ret = base->debug(fdHandle.get(), convert(options));
 
diff --git a/cmds/lshal/PipeRelay.cpp b/cmds/lshal/PipeRelay.cpp
index 4e97636..0c3fb96 100644
--- a/cmds/lshal/PipeRelay.cpp
+++ b/cmds/lshal/PipeRelay.cpp
@@ -16,143 +16,93 @@
 
 #include "PipeRelay.h"
 
-#include <sys/select.h>
-#include <sys/time.h>
+#include <sys/poll.h>
 #include <sys/types.h>
 #include <unistd.h>
 
-#include <atomic>
+#include <chrono>
+#include <optional>
 
-#include <utils/Thread.h>
+#include <android-base/unique_fd.h>
+
+using android::base::borrowed_fd;
+using android::base::Result;
+using android::base::unique_fd;
+using std::chrono_literals::operator""ms;
 
 namespace android {
 namespace lshal {
-
-static constexpr struct timeval READ_TIMEOUT { .tv_sec = 1, .tv_usec = 0 };
-
-static std::string getThreadName(std::string interfaceName, const std::string &instanceName) {
-    auto dot = interfaceName.rfind(".");
-    if (dot != std::string::npos) interfaceName = interfaceName.substr(dot + 1);
-    return "RelayThread_" + interfaceName + "_" + instanceName;
+Result<std::unique_ptr<PipeRelay>> PipeRelay::create(std::ostream& os,
+                                                     const NullableOStream<std::ostream>& err,
+                                                     const std::string& fqName) {
+    auto pipeRelay = std::unique_ptr<PipeRelay>(new PipeRelay());
+    unique_fd rfd;
+    if (!android::base::Pipe(&rfd, &pipeRelay->mWrite)) {
+        return android::base::ErrnoError() << "pipe()";
+    }
+    // Workaround for b/111997867: need a separate FD trigger because rfd can't receive POLLHUP
+    // when the write end is closed after the write end was sent through hwbinder.
+    unique_fd rfdTrigger;
+    if (!android::base::Pipe(&rfdTrigger, &pipeRelay->mWriteTrigger)) {
+        return android::base::ErrnoError() << "pipe() for trigger";
+    }
+    pipeRelay->mThread =
+            std::make_unique<std::thread>(&PipeRelay::thread, std::move(rfd), std::move(rfdTrigger),
+                                          &os, &err, fqName);
+    return pipeRelay;
 }
 
-struct PipeRelay::RelayThread : public Thread {
-    explicit RelayThread(int fd, std::ostream &os, const NullableOStream<std::ostream> &err,
-                         const std::string &fqName);
+void PipeRelay::thread(unique_fd rfd, unique_fd rfdTrigger, std::ostream* out,
+                       const NullableOStream<std::ostream>* err, std::string fqName) {
+    while (true) {
+        pollfd pfd[2];
+        pfd[0] = {.fd = rfd.get(), .events = POLLIN};
+        pfd[1] = {.fd = rfdTrigger.get(), .events = 0};
 
-    bool threadLoop() override;
-    void setFinished();
-
-private:
-    int mFd;
-    std::ostream &mOutStream;
-    NullableOStream<std::ostream> mErrStream;
-
-    // If we were to use requestExit() and exitPending() instead, threadLoop()
-    // may not run at all by the time ~PipeRelay is called (i.e. debug() has
-    // returned from HAL). By using our own flag, we ensure that select() and
-    // read() are executed until data are drained.
-    std::atomic_bool mFinished;
-
-    std::string mFqName;
-
-    DISALLOW_COPY_AND_ASSIGN(RelayThread);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-PipeRelay::RelayThread::RelayThread(int fd, std::ostream &os,
-                                    const NullableOStream<std::ostream> &err,
-                                    const std::string &fqName)
-      : mFd(fd), mOutStream(os), mErrStream(err), mFinished(false), mFqName(fqName) {}
-
-bool PipeRelay::RelayThread::threadLoop() {
-    char buffer[1024];
-
-    fd_set set;
-    FD_ZERO(&set);
-    FD_SET(mFd, &set);
-
-    struct timeval timeout = READ_TIMEOUT;
-
-    int res = TEMP_FAILURE_RETRY(select(mFd + 1, &set, nullptr, nullptr, &timeout));
-    if (res < 0) {
-        mErrStream << "debug " << mFqName << ": select() failed";
-        return false;
-    }
-
-    if (res == 0 || !FD_ISSET(mFd, &set)) {
-        if (mFinished) {
-            mErrStream << "debug " << mFqName
-                       << ": timeout reading from pipe, output may be truncated.";
-            return false;
+        int pollRes = poll(pfd, arraysize(pfd), -1 /* infinite timeout */);
+        if (pollRes < 0) {
+            int savedErrno = errno;
+            (*err) << "debug " << fqName << ": poll() failed: " << strerror(savedErrno)
+                   << std::endl;
+            break;
         }
-        // timeout, but debug() has not returned, so wait for HAL to finish.
-        return true;
-    }
 
-    // FD_ISSET(mFd, &set) == true. Data available, start reading
-    ssize_t n = TEMP_FAILURE_RETRY(read(mFd, buffer, sizeof(buffer)));
-
-    if (n < 0) {
-        mErrStream << "debug " << mFqName << ": read() failed";
-    }
-
-    if (n <= 0) {
-        return false;
-    }
-
-    mOutStream.write(buffer, n);
-
-    return true;
-}
-
-void PipeRelay::RelayThread::setFinished() {
-    mFinished = true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-PipeRelay::PipeRelay(std::ostream &os, const NullableOStream<std::ostream> &err,
-                     const std::string &interfaceName, const std::string &instanceName)
-      : mInitCheck(NO_INIT) {
-    int res = pipe(mFds);
-
-    if (res < 0) {
-        mInitCheck = -errno;
-        return;
-    }
-
-    mThread = new RelayThread(mFds[0], os, err, interfaceName + "/" + instanceName);
-    mInitCheck = mThread->run(getThreadName(interfaceName, instanceName).c_str());
-}
-
-void PipeRelay::CloseFd(int *fd) {
-    if (*fd >= 0) {
-        close(*fd);
-        *fd = -1;
+        if (pfd[0].revents & POLLIN) {
+            char buffer[1024];
+            ssize_t n = TEMP_FAILURE_RETRY(read(rfd.get(), buffer, sizeof(buffer)));
+            if (n < 0) {
+                int savedErrno = errno;
+                (*err) << "debug " << fqName << ": read() failed: " << strerror(savedErrno)
+                       << std::endl;
+                break;
+            }
+            if (n == 0) {
+                (*err) << "Warning: debug " << fqName << ": poll() indicates POLLIN but no data"
+                       << std::endl;
+                continue;
+            }
+            out->write(buffer, n);
+        }
+        if (pfd[0].revents & POLLHUP) {
+            break;
+        }
+        if (pfd[1].revents & POLLHUP) {
+            // ~PipeRelay is called on the main thread. |mWrite| has been flushed and closed.
+            // Ensure that our read end of the pipe doesn't have pending data, then exit.
+            if ((pfd[0].revents & POLLIN) == 0) {
+                break;
+            }
+        }
     }
 }
 
 PipeRelay::~PipeRelay() {
-    CloseFd(&mFds[1]);
-
-    if (mThread != nullptr) {
-        mThread->setFinished();
+    mWrite.reset();
+    mWriteTrigger.reset();
+    if (mThread != nullptr && mThread->joinable()) {
         mThread->join();
-        mThread.clear();
     }
-
-    CloseFd(&mFds[0]);
 }
 
-status_t PipeRelay::initCheck() const {
-    return mInitCheck;
-}
-
-int PipeRelay::fd() const {
-    return mFds[1];
-}
-
-}  // namespace lshal
-}  // namespace android
+} // namespace lshal
+} // namespace android
diff --git a/cmds/lshal/PipeRelay.h b/cmds/lshal/PipeRelay.h
index bd994b4..45ba982 100644
--- a/cmds/lshal/PipeRelay.h
+++ b/cmds/lshal/PipeRelay.h
@@ -16,42 +16,43 @@
 
 #pragma once
 
+#include <thread>
+
 #include <android-base/macros.h>
-#include <ostream>
+#include <android-base/result.h>
+#include <android-base/unique_fd.h>
 #include <utils/Errors.h>
 #include <utils/RefBase.h>
+#include <ostream>
 
 #include "NullableOStream.h"
 
 namespace android {
 namespace lshal {
 
-/* Creates an AF_UNIX socketpair and spawns a thread that relays any data
+/**
+ * Creates a pipe and spawns a thread that relays any data
  * written to the "write"-end of the pair to the specified output stream "os".
  */
 struct PipeRelay {
-    explicit PipeRelay(std::ostream& os,
-                       const NullableOStream<std::ostream>& err,
-                       const std::string& interfaceName,
-                       const std::string& instanceName);
+    static android::base::Result<std::unique_ptr<PipeRelay>> create(
+            std::ostream& os, const NullableOStream<std::ostream>& err, const std::string& fqName);
     ~PipeRelay();
 
-    status_t initCheck() const;
-
     // Returns the file descriptor corresponding to the "write"-end of the
     // connection.
-    int fd() const;
+    android::base::borrowed_fd fd() const { return mWrite; }
 
 private:
-    struct RelayThread;
-
-    status_t mInitCheck;
-    int mFds[2];
-    sp<RelayThread> mThread;
-
-    static void CloseFd(int *fd);
-
+    PipeRelay() = default;
     DISALLOW_COPY_AND_ASSIGN(PipeRelay);
+    static void thread(android::base::unique_fd rfd, android::base::unique_fd rfdTrigger,
+                       std::ostream* out, const NullableOStream<std::ostream>* err,
+                       std::string fqName);
+
+    android::base::unique_fd mWrite;
+    android::base::unique_fd mWriteTrigger;
+    std::unique_ptr<std::thread> mThread;
 };
 
 }  // namespace lshal