Implement receiving remote task.

Test: atest RemoteAccessServiceUnitTest
Bug: 241483300
Change-Id: I3d7f54f154beebba1bb1bdb7640dfe973ad012e4
diff --git a/automotive/remoteaccess/impl/default/test/RemoteAccessServiceUnitTest.cpp b/automotive/remoteaccess/impl/default/test/RemoteAccessServiceUnitTest.cpp
index de25927..11523f6 100644
--- a/automotive/remoteaccess/impl/default/test/RemoteAccessServiceUnitTest.cpp
+++ b/automotive/remoteaccess/impl/default/test/RemoteAccessServiceUnitTest.cpp
@@ -16,15 +16,25 @@
 
 #include "RemoteAccessService.h"
 
+#include <aidl/android/hardware/automotive/remoteaccess/ApState.h>
+#include <aidl/android/hardware/automotive/remoteaccess/BnRemoteTaskCallback.h>
 #include <gmock/gmock.h>
+#include <grpcpp/test/mock_stream.h>
 #include <gtest/gtest.h>
 #include <wakeup_client.grpc.pb.h>
+#include <chrono>
+#include <thread>
 
 namespace android {
 namespace hardware {
 namespace automotive {
 namespace remoteaccess {
 
+using ::android::base::ScopedLockAssertion;
+
+using ::aidl::android::hardware::automotive::remoteaccess::ApState;
+using ::aidl::android::hardware::automotive::remoteaccess::BnRemoteTaskCallback;
+
 using ::grpc::ClientAsyncReaderInterface;
 using ::grpc::ClientAsyncResponseReaderInterface;
 using ::grpc::ClientContext;
@@ -32,8 +42,12 @@
 using ::grpc::ClientReaderInterface;
 using ::grpc::CompletionQueue;
 using ::grpc::Status;
-
+using ::grpc::testing::MockClientReader;
 using ::ndk::ScopedAStatus;
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::Return;
+using ::testing::SetArgPointee;
 
 class MockGrpcClientStub : public WakeupClient::StubInterface {
   public:
@@ -59,9 +73,37 @@
                  CompletionQueue* cq));
 };
 
+class FakeRemoteTaskCallback : public BnRemoteTaskCallback {
+  public:
+    ScopedAStatus onRemoteTaskRequested(const std::string& clientId,
+                                        const std::vector<uint8_t>& data) override {
+        std::lock_guard<std::mutex> lockGuard(mLock);
+        mDataByClientId[clientId] = data;
+        mTaskCount++;
+        mCv.notify_all();
+        return ScopedAStatus::ok();
+    }
+
+    std::vector<uint8_t> getData(const std::string& clientId) { return mDataByClientId[clientId]; }
+
+    bool wait(size_t taskCount, size_t timeoutInSec) {
+        std::unique_lock<std::mutex> lock(mLock);
+        return mCv.wait_for(lock, std::chrono::seconds(timeoutInSec), [taskCount, this] {
+            ScopedLockAssertion lockAssertion(mLock);
+            return mTaskCount >= taskCount;
+        });
+    }
+
+  private:
+    std::mutex mLock;
+    std::unordered_map<std::string, std::vector<uint8_t>> mDataByClientId GUARDED_BY(mLock);
+    size_t mTaskCount GUARDED_BY(mLock) = 0;
+    std::condition_variable mCv;
+};
+
 class RemoteAccessServiceUnitTest : public ::testing::Test {
   public:
-    RemoteAccessServiceUnitTest() {
+    virtual void SetUp() override {
         mGrpcWakeupClientStub = std::make_unique<MockGrpcClientStub>();
         mService = ndk::SharedRefBase::make<RemoteAccessService>(mGrpcWakeupClientStub.get());
     }
@@ -70,9 +112,12 @@
 
     RemoteAccessService* getService() { return mService.get(); }
 
+    void setRetryWaitInMs(size_t retryWaitInMs) { mService->setRetryWaitInMs(retryWaitInMs); }
+
   private:
     std::unique_ptr<MockGrpcClientStub> mGrpcWakeupClientStub;
     std::shared_ptr<RemoteAccessService> mService;
+    MockClientReader<GetRemoteTasksResponse>* mMockTaskReader;
 };
 
 TEST_F(RemoteAccessServiceUnitTest, TestGetWakeupServiceName) {
@@ -84,6 +129,159 @@
     EXPECT_EQ(serviceName, "com.google.vehicle.wakeup");
 }
 
+TEST_F(RemoteAccessServiceUnitTest, TestNotifyApStateChangeWakeupRequired) {
+    bool isWakeupRequired = false;
+    EXPECT_CALL(*getGrpcWakeupClientStub(), NotifyWakeupRequired)
+            .WillOnce([&isWakeupRequired]([[maybe_unused]] ClientContext* context,
+                                          const NotifyWakeupRequiredRequest& request,
+                                          [[maybe_unused]] NotifyWakeupRequiredResponse* response) {
+                isWakeupRequired = request.iswakeuprequired();
+                return Status();
+            });
+
+    ApState newState = {
+            .isWakeupRequired = true,
+    };
+    ScopedAStatus status = getService()->notifyApStateChange(newState);
+
+    EXPECT_TRUE(status.isOk());
+    EXPECT_TRUE(isWakeupRequired);
+}
+
+TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasks) {
+    GetRemoteTasksResponse response1;
+    std::vector<uint8_t> testData = {0xde, 0xad, 0xbe, 0xef};
+    response1.set_clientid("1");
+    response1.set_data(testData.data(), testData.size());
+    GetRemoteTasksResponse response2;
+    response2.set_clientid("2");
+    std::shared_ptr<FakeRemoteTaskCallback> callback =
+            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();
+
+    ON_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw)
+            .WillByDefault(
+                    [response1, response2]([[maybe_unused]] ClientContext* context,
+                                           [[maybe_unused]] const GetRemoteTasksRequest& request) {
+                        // mockReader ownership will be transferred to the client so we don't own it
+                        // here.
+                        MockClientReader<GetRemoteTasksResponse>* mockClientReader =
+                                new MockClientReader<GetRemoteTasksResponse>();
+                        EXPECT_CALL(*mockClientReader, Finish()).WillOnce(Return(Status::OK));
+                        EXPECT_CALL(*mockClientReader, Read(_))
+                                .WillOnce(DoAll(SetArgPointee<0>(response1), Return(true)))
+                                .WillOnce(DoAll(SetArgPointee<0>(response2), Return(true)))
+                                .WillRepeatedly(Return(false));
+                        return mockClientReader;
+                    });
+
+    getService()->setRemoteTaskCallback(callback);
+    // Start the long live connection to receive tasks.
+    ApState newState = {
+            .isReadyForRemoteTask = true,
+    };
+    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());
+
+    ASSERT_TRUE(callback->wait(/*taskCount=*/2, /*timeoutInSec=*/10))
+            << "Did not receive enough tasks";
+    EXPECT_EQ(callback->getData("1"), testData);
+    EXPECT_EQ(callback->getData("2"), std::vector<uint8_t>());
+}
+
+TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasksRetryConnection) {
+    GetRemoteTasksResponse response;
+    std::shared_ptr<FakeRemoteTaskCallback> callback =
+            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();
+
+    ON_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw)
+            .WillByDefault([response]([[maybe_unused]] ClientContext* context,
+                                      [[maybe_unused]] const GetRemoteTasksRequest& request) {
+                // mockReader ownership will be transferred to the client so we don't own it here.
+                MockClientReader<GetRemoteTasksResponse>* mockClientReader =
+                        new MockClientReader<GetRemoteTasksResponse>();
+                EXPECT_CALL(*mockClientReader, Finish()).WillOnce(Return(Status::OK));
+                // Connection fails after receiving one task. Should retry after some time.
+                EXPECT_CALL(*mockClientReader, Read(_))
+                        .WillOnce(DoAll(SetArgPointee<0>(response), Return(true)))
+                        .WillRepeatedly(Return(false));
+                return mockClientReader;
+            });
+
+    getService()->setRemoteTaskCallback(callback);
+    setRetryWaitInMs(100);
+    // Start the long live connection to receive tasks.
+    ApState newState = {
+            .isReadyForRemoteTask = true,
+    };
+    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());
+
+    ASSERT_TRUE(callback->wait(/*taskCount=*/2, /*timeoutInSec=*/10))
+            << "Did not receive enough tasks";
+}
+
+TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasksDefaultNotReady) {
+    GetRemoteTasksResponse response1;
+    std::vector<uint8_t> testData = {0xde, 0xad, 0xbe, 0xef};
+    response1.set_clientid("1");
+    response1.set_data(testData.data(), testData.size());
+    GetRemoteTasksResponse response2;
+    response2.set_clientid("2");
+    std::shared_ptr<FakeRemoteTaskCallback> callback =
+            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();
+
+    EXPECT_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw).Times(0);
+
+    // Default state is not ready for remote tasks, so no callback will be called.
+    getService()->setRemoteTaskCallback(callback);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+}
+
+TEST_F(RemoteAccessServiceUnitTest, TestGetRemoteTasksNotReadyAfterReady) {
+    GetRemoteTasksResponse response1;
+    std::vector<uint8_t> testData = {0xde, 0xad, 0xbe, 0xef};
+    response1.set_clientid("1");
+    response1.set_data(testData.data(), testData.size());
+    GetRemoteTasksResponse response2;
+    response2.set_clientid("2");
+    std::shared_ptr<FakeRemoteTaskCallback> callback =
+            ndk::SharedRefBase::make<FakeRemoteTaskCallback>();
+
+    ON_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw)
+            .WillByDefault(
+                    [response1, response2]([[maybe_unused]] ClientContext* context,
+                                           [[maybe_unused]] const GetRemoteTasksRequest& request) {
+                        // mockReader ownership will be transferred to the client so we don't own it
+                        // here.
+                        MockClientReader<GetRemoteTasksResponse>* mockClientReader =
+                                new MockClientReader<GetRemoteTasksResponse>();
+                        EXPECT_CALL(*mockClientReader, Finish()).WillOnce(Return(Status::OK));
+                        EXPECT_CALL(*mockClientReader, Read(_))
+                                .WillOnce(DoAll(SetArgPointee<0>(response1), Return(true)))
+                                .WillOnce(DoAll(SetArgPointee<0>(response2), Return(true)))
+                                .WillRepeatedly(Return(false));
+                        return mockClientReader;
+                    });
+    // Should only be called once when is is ready for remote task.
+    EXPECT_CALL(*getGrpcWakeupClientStub(), GetRemoteTasksRaw).Times(1);
+
+    getService()->setRemoteTaskCallback(callback);
+    setRetryWaitInMs(100);
+    // Start the long live connection to receive tasks.
+    ApState newState = {
+            .isReadyForRemoteTask = true,
+    };
+    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());
+    ASSERT_TRUE(callback->wait(/*taskCount=*/2, /*timeoutInSec=*/10))
+            << "Did not receive enough tasks";
+
+    // Stop the long live connection.
+    newState.isReadyForRemoteTask = false;
+    ASSERT_TRUE(getService()->notifyApStateChange(newState).isOk());
+
+    // Wait for the retry delay, but the loop should already exit.
+    std::this_thread::sleep_for(std::chrono::milliseconds(150));
+}
+
 }  // namespace remoteaccess
 }  // namespace automotive
 }  // namespace hardware