Add timeout logic to TestWakeupClientServiceImpl.
Add timeout logic for fake tasks. They will timeout after 20s and
print an error message if not received by the remote access HAL.
Test: Manually run TestWakeupClientServiceImpl and verify the log:
Task for client ID: [ID] timed-out
is printed.
Bug: 246841306
Change-Id: I2173c931da9e0ea40c7b16f9e25a75592fa255c0
diff --git a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
index 4c440b8..9d6ef0a 100644
--- a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
+++ b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
@@ -17,6 +17,7 @@
#pragma once
#include <android-base/thread_annotations.h>
+#include <utils/Looper.h>
#include <wakeup_client.grpc.pb.h>
#include <condition_variable>
#include <mutex>
@@ -41,20 +42,60 @@
constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef};
};
+struct TaskInfo {
+ // This is unique per-task. Note that a task might be popped and put back into the task queue,
+ // it will have a new task ID but the same clientId in the task data.
+ int taskId;
+ long timestampInMs;
+ GetRemoteTasksResponse taskData;
+};
+
+struct TaskInfoComparator {
+ // We want the smallest timestamp and smallest task ID on top.
+ bool operator()(const TaskInfo& l, const TaskInfo& r) {
+ return l.timestampInMs > r.timestampInMs ||
+ (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId);
+ }
+};
+
+// forward-declaration.
+class TaskQueue;
+
+class TaskTimeoutMessageHandler final : public android::MessageHandler {
+ public:
+ TaskTimeoutMessageHandler(TaskQueue* taskQueue);
+ void handleMessage(const android::Message& message) override;
+
+ private:
+ TaskQueue* mTaskQueue;
+};
+
// TaskQueue is thread-safe.
class TaskQueue final {
public:
+ TaskQueue();
+ ~TaskQueue();
+
void add(const GetRemoteTasksResponse& response);
std::optional<GetRemoteTasksResponse> maybePopOne();
void waitForTask();
void stopWait();
+ void handleTaskTimeout();
private:
+ std::thread mCheckTaskTimeoutThread;
std::mutex mLock;
- std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock);
+ std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks
+ GUARDED_BY(mLock);
// A variable to notify mTasks is not empty.
std::condition_variable mTasksNotEmptyCv;
bool mStopped GUARDED_BY(mLock);
+ android::sp<Looper> mLooper;
+ android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler;
+ std::atomic<int> mTaskIdCounter = 0;
+
+ void checkForTestTimeoutLoop();
+ void waitForTaskWithLock(std::unique_lock<std::mutex>& lock);
};
class TestWakeupClientServiceImpl final : public WakeupClient::Service {
diff --git a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
index 1eb87e2..8e6669f 100644
--- a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
+++ b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
@@ -18,6 +18,8 @@
#include <android-base/stringprintf.h>
#include <utils/Log.h>
+#include <utils/Looper.h>
+#include <utils/SystemClock.h>
#include <chrono>
#include <thread>
@@ -28,13 +30,15 @@
namespace {
+using ::android::uptimeMillis;
using ::android::base::ScopedLockAssertion;
using ::android::base::StringPrintf;
using ::grpc::ServerContext;
using ::grpc::ServerWriter;
using ::grpc::Status;
-constexpr int kTaskIntervalInSec = 5;
+constexpr int kTaskIntervalInMs = 5'000;
+constexpr int KTaskTimeoutInMs = 20'000;
} // namespace
@@ -47,24 +51,68 @@
return response;
}
+TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue)
+ : mTaskQueue(taskQueue) {}
+
+void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
+ mTaskQueue->handleTaskTimeout();
+}
+
+TaskQueue::TaskQueue() {
+ mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
+ mLooper = Looper::prepare(/*opts=*/0);
+ mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
+}
+
+TaskQueue::~TaskQueue() {
+ {
+ std::lock_guard<std::mutex> lockGuard(mLock);
+ mStopped = true;
+ }
+ while (true) {
+ // Remove all pending timeout handlers from queue.
+ if (!maybePopOne().has_value()) {
+ break;
+ }
+ }
+ if (mCheckTaskTimeoutThread.joinable()) {
+ mCheckTaskTimeoutThread.join();
+ }
+}
+
std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
std::lock_guard<std::mutex> lockGuard(mLock);
if (mTasks.size() == 0) {
return std::nullopt;
}
- GetRemoteTasksResponse response = mTasks.front();
+ TaskInfo response = std::move(mTasks.top());
mTasks.pop();
- return std::move(response);
+ mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId);
+ return std::move(response.taskData);
}
+
void TaskQueue::add(const GetRemoteTasksResponse& task) {
- // TODO (b/246841306): add timeout to tasks.
std::lock_guard<std::mutex> lockGuard(mLock);
- mTasks.push(task);
+ if (mStopped) {
+ return;
+ }
+ int taskId = mTaskIdCounter++;
+ mTasks.push(TaskInfo{
+ .taskId = taskId,
+ .timestampInMs = uptimeMillis(),
+ .taskData = task,
+ });
+ android::Message message(taskId);
+ mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
mTasksNotEmptyCv.notify_all();
}
void TaskQueue::waitForTask() {
std::unique_lock<std::mutex> lock(mLock);
+ waitForTaskWithLock(lock);
+}
+
+void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
mTasksNotEmptyCv.wait(lock, [this] {
ScopedLockAssertion lockAssertion(mLock);
return mTasks.size() > 0 || mStopped;
@@ -77,6 +125,41 @@
mTasksNotEmptyCv.notify_all();
}
+void TaskQueue::checkForTestTimeoutLoop() {
+ Looper::setForThread(mLooper);
+
+ while (true) {
+ {
+ std::unique_lock<std::mutex> lock(mLock);
+ if (mStopped) {
+ ALOGW("The TestWakeupClientServiceImpl is stopping, "
+ "exiting checkForTestTimeoutLoop");
+ return;
+ }
+ }
+
+ mLooper->pollAll(/*timeoutMillis=*/-1);
+ }
+}
+
+void TaskQueue::handleTaskTimeout() {
+ // We know which task timed-out from the taskId in the message. However, there is no easy way
+ // to remove a specific task with the task ID from the priority_queue, so we just check from
+ // the top of the queue (which have the oldest tasks).
+ std::lock_guard<std::mutex> lockGuard(mLock);
+ long now = uptimeMillis();
+ while (mTasks.size() > 0) {
+ const TaskInfo& taskInfo = mTasks.top();
+ if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
+ break;
+ }
+ // In real implementation, this should report task failure to remote wakeup server.
+ ALOGW("Task for client ID: %s timed-out, added at %ld ms, now %ld ms",
+ taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
+ mTasks.pop();
+ }
+}
+
TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
mThread = std::thread([this] { fakeTaskGenerateLoop(); });
}
@@ -95,13 +178,13 @@
void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() {
// In actual implementation, this should communicate with the remote server and receives tasks
- // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s.
+ // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
while (true) {
mTaskQueue.add(mFakeTaskGenerator.generateTask());
- ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec);
+ ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs);
std::unique_lock lk(mLock);
- if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] {
+ if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
ScopedLockAssertion lockAssertion(mLock);
return mServerStopped;
})) {