Merge "Revert "Add timeout logic to TestWakeupClientServiceImpl.""
diff --git a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
index 9d6ef0a..4c440b8 100644
--- a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
+++ b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h
@@ -17,7 +17,6 @@
#pragma once
#include <android-base/thread_annotations.h>
-#include <utils/Looper.h>
#include <wakeup_client.grpc.pb.h>
#include <condition_variable>
#include <mutex>
@@ -42,60 +41,20 @@
constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef};
};
-struct TaskInfo {
- // This is unique per-task. Note that a task might be popped and put back into the task queue,
- // it will have a new task ID but the same clientId in the task data.
- int taskId;
- long timestampInMs;
- GetRemoteTasksResponse taskData;
-};
-
-struct TaskInfoComparator {
- // We want the smallest timestamp and smallest task ID on top.
- bool operator()(const TaskInfo& l, const TaskInfo& r) {
- return l.timestampInMs > r.timestampInMs ||
- (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId);
- }
-};
-
-// forward-declaration.
-class TaskQueue;
-
-class TaskTimeoutMessageHandler final : public android::MessageHandler {
- public:
- TaskTimeoutMessageHandler(TaskQueue* taskQueue);
- void handleMessage(const android::Message& message) override;
-
- private:
- TaskQueue* mTaskQueue;
-};
-
// TaskQueue is thread-safe.
class TaskQueue final {
public:
- TaskQueue();
- ~TaskQueue();
-
void add(const GetRemoteTasksResponse& response);
std::optional<GetRemoteTasksResponse> maybePopOne();
void waitForTask();
void stopWait();
- void handleTaskTimeout();
private:
- std::thread mCheckTaskTimeoutThread;
std::mutex mLock;
- std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks
- GUARDED_BY(mLock);
+ std::queue<GetRemoteTasksResponse> mTasks GUARDED_BY(mLock);
// A variable to notify mTasks is not empty.
std::condition_variable mTasksNotEmptyCv;
bool mStopped GUARDED_BY(mLock);
- android::sp<Looper> mLooper;
- android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler;
- std::atomic<int> mTaskIdCounter = 0;
-
- void checkForTestTimeoutLoop();
- void waitForTaskWithLock(std::unique_lock<std::mutex>& lock);
};
class TestWakeupClientServiceImpl final : public WakeupClient::Service {
diff --git a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
index 8e6669f..1eb87e2 100644
--- a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
+++ b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp
@@ -18,8 +18,6 @@
#include <android-base/stringprintf.h>
#include <utils/Log.h>
-#include <utils/Looper.h>
-#include <utils/SystemClock.h>
#include <chrono>
#include <thread>
@@ -30,15 +28,13 @@
namespace {
-using ::android::uptimeMillis;
using ::android::base::ScopedLockAssertion;
using ::android::base::StringPrintf;
using ::grpc::ServerContext;
using ::grpc::ServerWriter;
using ::grpc::Status;
-constexpr int kTaskIntervalInMs = 5'000;
-constexpr int KTaskTimeoutInMs = 20'000;
+constexpr int kTaskIntervalInSec = 5;
} // namespace
@@ -51,68 +47,24 @@
return response;
}
-TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue)
- : mTaskQueue(taskQueue) {}
-
-void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
- mTaskQueue->handleTaskTimeout();
-}
-
-TaskQueue::TaskQueue() {
- mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
- mLooper = Looper::prepare(/*opts=*/0);
- mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
-}
-
-TaskQueue::~TaskQueue() {
- {
- std::lock_guard<std::mutex> lockGuard(mLock);
- mStopped = true;
- }
- while (true) {
- // Remove all pending timeout handlers from queue.
- if (!maybePopOne().has_value()) {
- break;
- }
- }
- if (mCheckTaskTimeoutThread.joinable()) {
- mCheckTaskTimeoutThread.join();
- }
-}
-
std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
std::lock_guard<std::mutex> lockGuard(mLock);
if (mTasks.size() == 0) {
return std::nullopt;
}
- TaskInfo response = std::move(mTasks.top());
+ GetRemoteTasksResponse response = mTasks.front();
mTasks.pop();
- mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId);
- return std::move(response.taskData);
+ return std::move(response);
}
-
void TaskQueue::add(const GetRemoteTasksResponse& task) {
+ // TODO (b/246841306): add timeout to tasks.
std::lock_guard<std::mutex> lockGuard(mLock);
- if (mStopped) {
- return;
- }
- int taskId = mTaskIdCounter++;
- mTasks.push(TaskInfo{
- .taskId = taskId,
- .timestampInMs = uptimeMillis(),
- .taskData = task,
- });
- android::Message message(taskId);
- mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
+ mTasks.push(task);
mTasksNotEmptyCv.notify_all();
}
void TaskQueue::waitForTask() {
std::unique_lock<std::mutex> lock(mLock);
- waitForTaskWithLock(lock);
-}
-
-void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
mTasksNotEmptyCv.wait(lock, [this] {
ScopedLockAssertion lockAssertion(mLock);
return mTasks.size() > 0 || mStopped;
@@ -125,41 +77,6 @@
mTasksNotEmptyCv.notify_all();
}
-void TaskQueue::checkForTestTimeoutLoop() {
- Looper::setForThread(mLooper);
-
- while (true) {
- {
- std::unique_lock<std::mutex> lock(mLock);
- if (mStopped) {
- ALOGW("The TestWakeupClientServiceImpl is stopping, "
- "exiting checkForTestTimeoutLoop");
- return;
- }
- }
-
- mLooper->pollAll(/*timeoutMillis=*/-1);
- }
-}
-
-void TaskQueue::handleTaskTimeout() {
- // We know which task timed-out from the taskId in the message. However, there is no easy way
- // to remove a specific task with the task ID from the priority_queue, so we just check from
- // the top of the queue (which have the oldest tasks).
- std::lock_guard<std::mutex> lockGuard(mLock);
- long now = uptimeMillis();
- while (mTasks.size() > 0) {
- const TaskInfo& taskInfo = mTasks.top();
- if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
- break;
- }
- // In real implementation, this should report task failure to remote wakeup server.
- ALOGW("Task for client ID: %s timed-out, added at %ld ms, now %ld ms",
- taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
- mTasks.pop();
- }
-}
-
TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
mThread = std::thread([this] { fakeTaskGenerateLoop(); });
}
@@ -178,13 +95,13 @@
void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() {
// In actual implementation, this should communicate with the remote server and receives tasks
- // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
+ // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s.
while (true) {
mTaskQueue.add(mFakeTaskGenerator.generateTask());
- ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs);
+ ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec);
std::unique_lock lk(mLock);
- if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
+ if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] {
ScopedLockAssertion lockAssertion(mLock);
return mServerStopped;
})) {