Add timeout logic to TestWakeupClientServiceImpl.

Add timeout logic for fake tasks. They will timeout after 20s and
print an error message if not received by the remote access HAL.

Test: Manually run TestWakeupClientServiceImpl and verify the log:
Task for client ID: [ID] timed-out
is printed.
Bug: 246841306

Change-Id: I2173c931da9e0ea40c7b16f9e25a75592fa255c0
diff --git a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
index 4c440b8..9d6ef0a 100644
--- a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
+++ b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <android-base/thread_annotations.h>
+#include <utils/Looper.h>
 #include <wakeup_client.grpc.pb.h>
 #include <condition_variable>
 #include <mutex>
@@ -41,20 +42,60 @@
     constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef};
 };
 
+struct TaskInfo {
+    // This is unique per-task. Note that a task might be popped and put back into the task queue,
+    // it will have a new task ID but the same clientId in the task data.
+    int taskId;
+    long timestampInMs;
+    GetRemoteTasksResponse taskData;
+};
+
+struct TaskInfoComparator {
+    // We want the smallest timestamp and smallest task ID on top.
+    bool operator()(const TaskInfo& l, const TaskInfo& r) {
+        return l.timestampInMs > r.timestampInMs ||
+               (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId);
+    }
+};
+
+// forward-declaration.
+class TaskQueue;
+
+class TaskTimeoutMessageHandler final : public android::MessageHandler {
+  public:
+    TaskTimeoutMessageHandler(TaskQueue* taskQueue);
+    void handleMessage(const android::Message& message) override;
+
+  private:
+    TaskQueue* mTaskQueue;
+};
+
 // TaskQueue is thread-safe.
 class TaskQueue final {
   public:
+    TaskQueue();
+    ~TaskQueue();
+
     void add(const GetRemoteTasksResponse& response);
     std::optional<GetRemoteTasksResponse> maybePopOne();
     void waitForTask();
     void stopWait();
+    void handleTaskTimeout();
 
   private:
+    std::thread mCheckTaskTimeoutThread;
     std::mutex mLock;
-    std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock);
+    std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks
+            GUARDED_BY(mLock);
     // A variable to notify mTasks is not empty.
     std::condition_variable mTasksNotEmptyCv;
     bool mStopped GUARDED_BY(mLock);
+    android::sp<Looper> mLooper;
+    android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler;
+    std::atomic<int> mTaskIdCounter = 0;
+
+    void checkForTestTimeoutLoop();
+    void waitForTaskWithLock(std::unique_lock<std::mutex>& lock);
 };
 
 class TestWakeupClientServiceImpl final : public WakeupClient::Service {
diff --git a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
index 1eb87e2..8e6669f 100644
--- a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
+++ b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
@@ -18,6 +18,8 @@
 
 #include <android-base/stringprintf.h>
 #include <utils/Log.h>
+#include <utils/Looper.h>
+#include <utils/SystemClock.h>
 #include <chrono>
 #include <thread>
 
@@ -28,13 +30,15 @@
 
 namespace {
 
+using ::android::uptimeMillis;
 using ::android::base::ScopedLockAssertion;
 using ::android::base::StringPrintf;
 using ::grpc::ServerContext;
 using ::grpc::ServerWriter;
 using ::grpc::Status;
 
-constexpr int kTaskIntervalInSec = 5;
+constexpr int kTaskIntervalInMs = 5'000;
+constexpr int KTaskTimeoutInMs = 20'000;
 
 }  // namespace
 
@@ -47,24 +51,68 @@
     return response;
 }
 
+TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue)
+    : mTaskQueue(taskQueue) {}
+
+void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
+    mTaskQueue->handleTaskTimeout();
+}
+
+TaskQueue::TaskQueue() {
+    mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
+    mLooper = Looper::prepare(/*opts=*/0);
+    mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
+}
+
+TaskQueue::~TaskQueue() {
+    {
+        std::lock_guard<std::mutex> lockGuard(mLock);
+        mStopped = true;
+    }
+    while (true) {
+        // Remove all pending timeout handlers from queue.
+        if (!maybePopOne().has_value()) {
+            break;
+        }
+    }
+    if (mCheckTaskTimeoutThread.joinable()) {
+        mCheckTaskTimeoutThread.join();
+    }
+}
+
 std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
     std::lock_guard<std::mutex> lockGuard(mLock);
     if (mTasks.size() == 0) {
         return std::nullopt;
     }
-    GetRemoteTasksResponse response = mTasks.front();
+    TaskInfo response = std::move(mTasks.top());
     mTasks.pop();
-    return std::move(response);
+    mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId);
+    return std::move(response.taskData);
 }
+
 void TaskQueue::add(const GetRemoteTasksResponse& task) {
-    // TODO (b/246841306): add timeout to tasks.
     std::lock_guard<std::mutex> lockGuard(mLock);
-    mTasks.push(task);
+    if (mStopped) {
+        return;
+    }
+    int taskId = mTaskIdCounter++;
+    mTasks.push(TaskInfo{
+            .taskId = taskId,
+            .timestampInMs = uptimeMillis(),
+            .taskData = task,
+    });
+    android::Message message(taskId);
+    mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
     mTasksNotEmptyCv.notify_all();
 }
 
 void TaskQueue::waitForTask() {
     std::unique_lock<std::mutex> lock(mLock);
+    waitForTaskWithLock(lock);
+}
+
+void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
     mTasksNotEmptyCv.wait(lock, [this] {
         ScopedLockAssertion lockAssertion(mLock);
         return mTasks.size() > 0 || mStopped;
@@ -77,6 +125,41 @@
     mTasksNotEmptyCv.notify_all();
 }
 
+void TaskQueue::checkForTestTimeoutLoop() {
+    Looper::setForThread(mLooper);
+
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(mLock);
+            if (mStopped) {
+                ALOGW("The TestWakeupClientServiceImpl is stopping, "
+                      "exiting checkForTestTimeoutLoop");
+                return;
+            }
+        }
+
+        mLooper->pollAll(/*timeoutMillis=*/-1);
+    }
+}
+
+void TaskQueue::handleTaskTimeout() {
+    // We know which task timed-out from the taskId in the message. However, there is no easy way
+    // to remove a specific task with the task ID from the priority_queue, so we just check from
+    // the top of the queue (which have the oldest tasks).
+    std::lock_guard<std::mutex> lockGuard(mLock);
+    long now = uptimeMillis();
+    while (mTasks.size() > 0) {
+        const TaskInfo& taskInfo = mTasks.top();
+        if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
+            break;
+        }
+        // In real implementation, this should report task failure to remote wakeup server.
+        ALOGW("Task for client ID: %s timed-out, added at %ld ms, now %ld ms",
+              taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
+        mTasks.pop();
+    }
+}
+
 TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
     mThread = std::thread([this] { fakeTaskGenerateLoop(); });
 }
@@ -95,13 +178,13 @@
 
 void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() {
     // In actual implementation, this should communicate with the remote server and receives tasks
-    // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s.
+    // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
     while (true) {
         mTaskQueue.add(mFakeTaskGenerator.generateTask());
-        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec);
+        ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs);
 
         std::unique_lock lk(mLock);
-        if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] {
+        if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
                 ScopedLockAssertion lockAssertion(mLock);
                 return mServerStopped;
             })) {