media.c2 aidl GraphicsTracker: Synchronize mDequeueable and pipe fds

Keep mDequeueable synchronized with the number of bytes pending in the
internal buffer of the pipe fds.

Bug: 254050314
Change-Id: Ic2f3fc0464531b26ef9476869f3923cd35b31fee
diff --git a/media/codec2/hal/client/GraphicsTracker.cpp b/media/codec2/hal/client/GraphicsTracker.cpp
index 1c9c04c..ca2f6e6 100644
--- a/media/codec2/hal/client/GraphicsTracker.cpp
+++ b/media/codec2/hal/client/GraphicsTracker.cpp
@@ -192,20 +192,15 @@
 
     mReadPipeFd.reset(pipefd[0]);
     mWritePipeFd.reset(pipefd[1]);
-    mIncDequeueable = 0;
 
-    mEventQueueThread = std::thread([this](){processEvent();});
-    writeIncDequeueable(mDequeueable);
+    // ctor does not require lock to be held.
+    writeIncDequeueableLocked(mDequeueable);
 
     CHECK(ret >= 0);
-    CHECK(mEventQueueThread.joinable());
 }
 
 GraphicsTracker::~GraphicsTracker() {
     stop();
-    if (mEventQueueThread.joinable()) {
-        mEventQueueThread.join();
-    }
 }
 
 bool GraphicsTracker::adjustDequeueConfLocked(bool *updateDequeue) {
@@ -238,6 +233,7 @@
 
 c2_status_t GraphicsTracker::configureGraphics(
         const sp<IGraphicBufferProducer>& igbp, uint32_t generation) {
+    // TODO: wait until operations on the previous IGBP are completed.
     std::shared_ptr<BufferCache> prevCache;
     int prevDequeueCommitted;
 
@@ -363,8 +359,7 @@
             int delta = mMaxDequeueCommitted - oldMaxDequeue;
             if (delta > 0) {
                 mDequeueable += delta;
-                l.unlock();
-                writeIncDequeueable(delta);
+                writeIncDequeueableLocked(delta);
             }
         }
     }
@@ -446,43 +441,42 @@
 }
 
 void GraphicsTracker::stop() {
-    bool expected = false;
-    std::unique_lock<std::mutex> l(mEventLock);
-    bool updated = mStopped.compare_exchange_strong(expected, true);
-    if (updated) {
-        // TODO: synchronize close and other i/o.
-        int writeFd = mWritePipeFd.release();
+   // TODO: wait until all operations on the current IGBP
+   // have completed.
+    std::unique_lock<std::mutex> l(mLock);
+    if (mStopped) {
+        return;
+    }
+    mStopped = true;
+    int writeFd = mWritePipeFd.release();
+    if (writeFd >= 0) {
         ::close(writeFd);
-        int readFd = mReadPipeFd.release();
-        ::close(readFd);
-        mEventCv.notify_one();
     }
 }
 
-void GraphicsTracker::writeIncDequeueable(int inc) {
+void GraphicsTracker::writeIncDequeueableLocked(int inc) {
     CHECK(inc > 0 && inc < kMaxDequeueMax);
     thread_local char buf[kMaxDequeueMax];
-    int diff = 0;
-    {
-        std::unique_lock<std::mutex> l(mEventLock);
-        if (mStopped) {
-            return;
-        }
-        CHECK(mWritePipeFd.get() >= 0);
-        int ret = ::write(mWritePipeFd.get(), buf, inc);
-        if (ret == inc) {
-            return;
-        }
-        diff = ret < 0 ? inc : inc - ret;
-
-        // Partial write or EINTR. This will not happen in a real scenario.
-        mIncDequeueable += diff;
-        if (mIncDequeueable > 0) {
-            l.unlock();
-            mEventCv.notify_one();
-            ALOGW("updating dequeueable to pipefd pending");
-        }
+    if (mStopped) { // writing end is closed (see stop()).
+        return;
     }
+    int writeFd = mWritePipeFd.get();
+    if (writeFd < 0) {
+        // Initialization failed, so the fd is not valid.
+        return;
+    }
+    int ret = ::write(writeFd, buf, inc);
+    // Since this is non-blocking i/o, it never returns EINTR.
+    //
+    // A ::write() to a pipe is guaranteed to succeed atomically if it writes
+    // fewer than PIPE_BUF bytes. The pipe/fifo buffer is at least 4K and our total
+    // max pending buffer size is 64, so it never returns EAGAIN here either.
+    // See pipe(7) for further information.
+    //
+    // Any other error is serious: we can no longer keep mDequeueable in sync
+    // with the length of the pending buffer in the pipe/fifo, so abort here.
+    // TODO: do not abort here. (b/318717399)
+    CHECK(ret == inc);
 }
 
 void GraphicsTracker::drainDequeueableLocked(int dec) {
@@ -491,43 +485,14 @@
     if (mStopped) {
         return;
     }
-    CHECK(mReadPipeFd.get() >= 0);
-    int ret = ::read(mReadPipeFd.get(), buf, dec);
-    if (ret < 0 && errno == EINTR) {
-        // signal cancel try again.
-        ret = ::read(mReadPipeFd.get(), buf, dec);
+    int readFd = mReadPipeFd.get();
+    if (readFd < 0) {
+        // Initialization failed, so the fd is not valid.
+        return;
     }
-    CHECK(mStopped || ret == dec);
-    // TODO: remove CHECK
-    // if ret < dec, drain the amount from EventThread.
-}
-
-void GraphicsTracker::processEvent() {
-    // This is for partial/failed writes to the writing end.
-    // This may not happen in the real scenario.
-    thread_local char buf[kMaxDequeueMax];
-    while (true) {
-        std::unique_lock<std::mutex> l(mEventLock);
-        if (mStopped) {
-            break;
-        }
-        if (mIncDequeueable > 0) {
-            int inc = mIncDequeueable > kMaxDequeueMax ? kMaxDequeueMax : mIncDequeueable;
-            int ret = ::write(mWritePipeFd.get(), buf, inc);
-            int written = ret <= 0 ? 0 : ret;
-            mIncDequeueable -= written;
-            if (mIncDequeueable > 0) {
-                l.unlock();
-                if (ret < 0) {
-                    ALOGE("write to writing end failed %d", errno);
-                } else {
-                    ALOGW("partial write %d(%d)", inc, written);
-                }
-                continue;
-            }
-        }
-        mEventCv.wait(l);
-    }
+    int ret = ::read(readFd, buf, dec);
+    // TODO: do not abort here. (b/318717399)
+    CHECK(ret == dec);
 }
 
 c2_status_t GraphicsTracker::getWaitableFd(int *pipeFd) {
@@ -603,8 +568,7 @@
             return;
         }
         mDequeueable++;
-        l.unlock();
-        writeIncDequeueable(1);
+        writeIncDequeueableLocked(1);
     }
 }
 
@@ -779,8 +743,7 @@
             return C2_OK;
         }
         mDequeueable++;
-        l.unlock();
-        writeIncDequeueable(1);
+        writeIncDequeueableLocked(1);
     }
     return C2_OK;
 }
@@ -798,8 +761,7 @@
         return;
     }
     mDequeueable++;
-    l.unlock();
-    writeIncDequeueable(1);
+    writeIncDequeueableLocked(1);
 }
 
 
@@ -855,8 +817,7 @@
             return C2_BAD_STATE;
         }
         mDequeueable++;
-        l.unlock();
-        writeIncDequeueable(1);
+        writeIncDequeueableLocked(1);
         return C2_BAD_STATE;
     }
     std::shared_ptr<BufferItem> buffer = it->second;
@@ -898,8 +859,7 @@
             return;
         }
         mDequeueable++;
-        l.unlock();
-        writeIncDequeueable(1);
+        writeIncDequeueableLocked(1);
         return;
     }
 }
@@ -990,8 +950,7 @@
         if (mBufferCache->mGeneration == generation) {
             if (!adjustDequeueConfLocked(&updateDequeue)) {
                 mDequeueable++;
-                l.unlock();
-                writeIncDequeueable(1);
+                writeIncDequeueableLocked(1);
             }
         }
     }