Merge "Use subscription manager in VHAL."
diff --git a/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h b/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h
index 97c25e3..d8516b1 100644
--- a/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h
+++ b/automotive/vehicle/aidl/impl/vhal/include/ConnectedClient.h
@@ -101,6 +101,38 @@
     std::shared_ptr<const std::function<void(std::vector<ResultType>)>> mResultCallback;
 };
 
+// A class to represent a client that calls {@code IVehicle.subscribe}.
+class SubscriptionClient final : public ConnectedClient {
+  public:
+    SubscriptionClient(
+            std::shared_ptr<PendingRequestPool> requestPool,
+            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>
+                    callback);
+
+    // Gets the callback to be called when the request for this client has finished.
+    std::shared_ptr<const std::function<
+            void(std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult>)>>
+    getResultCallback();
+
+  protected:
+    // Gets the callback to be called when the request for this client has timed out.
+    std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc> getTimeoutCallback() override;
+
+  private:
+    // The following members are only initialized during construction.
+    std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc> mTimeoutCallback;
+    std::shared_ptr<const std::function<void(
+            std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult>)>>
+            mResultCallback;
+
+    static void onGetValueResults(
+            const void* clientId,
+            std::shared_ptr<::aidl::android::hardware::automotive::vehicle::IVehicleCallback>
+                    callback,
+            std::shared_ptr<PendingRequestPool> requestPool,
+            std::vector<::aidl::android::hardware::automotive::vehicle::GetValueResult> results);
+};
+
 }  // namespace vehicle
 }  // namespace automotive
 }  // namespace hardware
diff --git a/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h b/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h
index e3e77a3..b0423a3 100644
--- a/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h
+++ b/automotive/vehicle/aidl/impl/vhal/include/DefaultVehicleHal.h
@@ -20,6 +20,7 @@
 #include "ConnectedClient.h"
 #include "ParcelableUtils.h"
 #include "PendingRequestPool.h"
+#include "SubscriptionManager.h"
 
 #include <IVehicleHardware.h>
 #include <VehicleUtils.h>
@@ -52,6 +53,8 @@
 
     explicit DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware);
 
+    ~DefaultVehicleHal();
+
     ::ndk::ScopedAStatus getAllPropConfigs(
             ::aidl::android::hardware::automotive::vehicle::VehiclePropConfigs* returnConfigs)
             override;
@@ -104,12 +107,18 @@
     std::unique_ptr<::ndk::ScopedFileDescriptor> mConfigFile;
     // PendingRequestPool is thread-safe.
     std::shared_ptr<PendingRequestPool> mPendingRequestPool;
+    // SubscriptionManager is thread-safe.
+    std::unique_ptr<SubscriptionManager> mSubscriptionManager;
 
     std::mutex mLock;
     std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>> mGetValuesClients
             GUARDED_BY(mLock);
     std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>> mSetValuesClients
             GUARDED_BY(mLock);
+    std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>> mSubscriptionClients
+            GUARDED_BY(mLock);
+    // An increasing request ID we keep for subscribe clients.
+    std::unordered_map<CallbackType, int64_t> mSubscribeIdByClient GUARDED_BY(mLock);
 
     template <class T>
     std::shared_ptr<T> getOrCreateClient(
@@ -127,6 +136,10 @@
             const std::vector<::aidl::android::hardware::automotive::vehicle::SetValueRequest>&
                     requests);
 
+    void getValueFromHardwareCallCallback(
+            const CallbackType& callback,
+            const ::aidl::android::hardware::automotive::vehicle::VehiclePropValue& value);
+
     // Test-only
     // Set the default timeout for pending requests.
     void setTimeout(int64_t timeoutInNano);
diff --git a/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp b/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp
index abc3eb0..7d02a05 100644
--- a/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/src/ConnectedClient.cpp
@@ -244,6 +244,93 @@
 template class GetSetValuesClient<GetValueResult, GetValueResults>;
 template class GetSetValuesClient<SetValueResult, SetValueResults>;
 
+SubscriptionClient::SubscriptionClient(std::shared_ptr<PendingRequestPool> requestPool,
+                                       std::shared_ptr<IVehicleCallback> callback)
+    : ConnectedClient(requestPool, callback) {
+    mTimeoutCallback = std::make_shared<const PendingRequestPool::TimeoutCallbackFunc>(
+            [](std::unordered_set<int64_t> timeoutIds) {
+                for (int64_t id : timeoutIds) {
+                    ALOGW("subscribe: requests with IDs: %" PRId64
+                          " has timed-out, not client informed, "
+                          "possibly one of recurrent requests for this subscription failed",
+                          id);
+                }
+            });
+    auto requestPoolCopy = mRequestPool;  // Captured below so the lambda does not need this.
+    const void* clientId = reinterpret_cast<const void*>(this);  // Key for the request pool.
+    mResultCallback = std::make_shared<const std::function<void(std::vector<GetValueResult>)>>(
+            [clientId, callback, requestPoolCopy](std::vector<GetValueResult> results) {
+                onGetValueResults(clientId, callback, requestPoolCopy, std::move(results));
+            });
+}
+
+std::shared_ptr<const std::function<void(std::vector<GetValueResult>)>>
+SubscriptionClient::getResultCallback() {
+    return mResultCallback;  // Built once in the constructor; forwards to onGetValueResults.
+}
+
+std::shared_ptr<const PendingRequestPool::TimeoutCallbackFunc>
+SubscriptionClient::getTimeoutCallback() {
+    return mTimeoutCallback;  // Built once in the constructor; only logs the timed-out IDs.
+}
+
+// Finishes the pending requests matching {@code results} and delivers the values that were read
+// successfully to {@code callback} in a single onPropertyEvent call.
+void SubscriptionClient::onGetValueResults(const void* clientId,
+                                           std::shared_ptr<IVehicleCallback> callback,
+                                           std::shared_ptr<PendingRequestPool> requestPool,
+                                           std::vector<GetValueResult> results) {
+    std::unordered_set<int64_t> requestIds;
+    for (const auto& result : results) {
+        requestIds.insert(result.requestId);
+    }
+
+    auto finishedRequests = requestPool->tryFinishRequests(clientId, requestIds);
+    std::vector<VehiclePropValue> propValues;
+    for (auto& result : results) {
+        int64_t requestId = result.requestId;
+        if (finishedRequests.find(requestId) == finishedRequests.end()) {
+            ALOGE("subscribe[%" PRId64 "]: no pending request for the result from hardware, "
+                  "possibly already time-out",
+                  requestId);
+            continue;
+        }
+        if (result.status != StatusCode::OK) {
+            ALOGE("subscribe[%" PRId64
+                  "]: hardware returns non-ok status for getValues, status: "
+                  "%d",
+                  requestId, toInt(result.status));
+            continue;
+        }
+        if (!result.prop.has_value()) {
+            ALOGE("subscribe[%" PRId64 "]: no prop value in getValues result", requestId);
+            continue;
+        }
+        propValues.push_back(std::move(result.prop.value()));
+    }
+
+    if (propValues.empty()) {
+        return;
+    }
+    // TODO(b/205189110): Use memory pool here and fill in sharedMemoryId.
+    VehiclePropValues vehiclePropValues;
+    int32_t sharedMemoryFileCount = 0;
+    ScopedAStatus status = vectorToStableLargeParcelable(propValues, &vehiclePropValues);
+    if (!status.isOk()) {
+        ALOGE("failed to marshal result into large parcelable, error: %s, code: %d",
+              status.getMessage(), status.getServiceSpecificError());
+        return;
+    }
+
+    if (ScopedAStatus callbackStatus =
+                callback->onPropertyEvent(vehiclePropValues, sharedMemoryFileCount);
+        !callbackStatus.isOk()) {
+        // status is OK on this path, so the failure details are in callbackStatus.
+        ALOGE("failed to call callback, error: %s, code: %d", callbackStatus.getMessage(),
+              callbackStatus.getServiceSpecificError());
+    }
+}
+
 }  // namespace vehicle
 }  // namespace automotive
 }  // namespace hardware
diff --git a/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp b/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp
index 3c454f0..1e76eb7 100644
--- a/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/src/DefaultVehicleHal.cpp
@@ -25,6 +25,7 @@
 #include <android-base/result.h>
 #include <utils/Log.h>
 
+#include <inttypes.h>
 #include <set>
 #include <unordered_set>
 
@@ -33,6 +34,8 @@
 namespace automotive {
 namespace vehicle {
 
+namespace {
+
 using ::aidl::android::hardware::automotive::vehicle::GetValueRequest;
 using ::aidl::android::hardware::automotive::vehicle::GetValueRequests;
 using ::aidl::android::hardware::automotive::vehicle::GetValueResult;
@@ -54,6 +57,19 @@
 using ::android::base::Result;
 using ::ndk::ScopedAStatus;
 
+// Joins the IDs with ", " in the set's iteration order; an empty set gives "".
+std::string toString(const std::unordered_set<int64_t>& values) {
+    std::string joined;
+    const char* separator = "";
+    for (int64_t value : values) {
+        joined += separator + std::to_string(value);
+        separator = ", ";
+    }
+    return joined;
+}
+
+}  // namespace
+
 DefaultVehicleHal::DefaultVehicleHal(std::unique_ptr<IVehicleHardware> hardware)
     : mVehicleHardware(std::move(hardware)),
       mPendingRequestPool(std::make_shared<PendingRequestPool>(TIMEOUT_IN_NANO)) {
@@ -73,6 +89,72 @@
     if (result.value() != nullptr) {
         mConfigFile = std::move(result.value());
     }
+
+    mSubscriptionManager = std::make_unique<SubscriptionManager>(
+            [this](const CallbackType& callback, const VehiclePropValue& value) {
+                getValueFromHardwareCallCallback(callback, value);
+            });
+}
+
+DefaultVehicleHal::~DefaultVehicleHal() {
+    // The manager's callback captures this, so it must be destroyed before the other members.
+    mSubscriptionManager.reset();
+}
+
+template <class T>
+std::shared_ptr<T> DefaultVehicleHal::getOrCreateClient(
+        std::unordered_map<CallbackType, std::shared_ptr<T>>* clients,
+        const CallbackType& callback) {
+    // TODO(b/204943359): Remove client from clients when linkToDeath is implemented.
+    if (auto it = clients->find(callback); it != clients->end()) {
+        return it->second;
+    }
+    return (*clients)[callback] = std::make_shared<T>(mPendingRequestPool, callback);
+}
+
+template std::shared_ptr<DefaultVehicleHal::GetValuesClient>
+DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::GetValuesClient>(
+        std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>>* clients,
+        const CallbackType& callback);
+template std::shared_ptr<DefaultVehicleHal::SetValuesClient>
+DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::SetValuesClient>(
+        std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>>* clients,
+        const CallbackType& callback);
+template std::shared_ptr<SubscriptionClient>
+DefaultVehicleHal::getOrCreateClient<SubscriptionClient>(
+        std::unordered_map<CallbackType, std::shared_ptr<SubscriptionClient>>* clients,
+        const CallbackType& callback);
+
+// Sends a single getValues request for {@code value} to the hardware on behalf of the
+// subscription client of {@code callback}; the result arrives via the client's result callback.
+void DefaultVehicleHal::getValueFromHardwareCallCallback(const CallbackType& callback,
+                                                         const VehiclePropValue& value) {
+    int64_t requestId;
+    std::shared_ptr<SubscriptionClient> client;
+    {
+        std::scoped_lock<std::mutex> lock(mLock);
+        // operator[] starts the per-client counter at 0 the first time a callback is seen.
+        requestId = mSubscribeIdByClient[callback]++;
+        client = getOrCreateClient(&mSubscriptionClients, callback);
+    }
+    auto added = client->addRequests({requestId});
+    if (!added.ok()) {
+        ALOGE("subscribe[%" PRId64 "]: too many pending requests, ignore the getValue request",
+              requestId);
+        return;
+    }
+
+    std::vector<GetValueRequest> hardwareRequests = {{.requestId = requestId, .prop = value}};
+    StatusCode status =
+            mVehicleHardware->getValues(client->getResultCallback(), hardwareRequests);
+    if (status == StatusCode::OK) {
+        return;
+    }
+    // The hardware rejected the request and will never invoke the result callback for it, so
+    // the pending request must be finished here.
+    client->tryFinishRequests({requestId});
+    ALOGE("subscribe[%" PRId64 "]: failed to get value from VehicleHardware, code: %d",
+          requestId, toInt(status));
 }
 
 void DefaultVehicleHal::setTimeout(int64_t timeoutInNano) {
@@ -92,27 +174,6 @@
     return ScopedAStatus::ok();
 }
 
-template <class T>
-std::shared_ptr<T> DefaultVehicleHal::getOrCreateClient(
-        std::unordered_map<CallbackType, std::shared_ptr<T>>* clients,
-        const CallbackType& callback) {
-    if (clients->find(callback) == clients->end()) {
-        // TODO(b/204943359): Remove client from clients when linkToDeath is implemented.
-        (*clients)[callback] = std::make_shared<T>(mPendingRequestPool, callback);
-    }
-    return (*clients)[callback];
-}
-
-template std::shared_ptr<DefaultVehicleHal::GetValuesClient>
-DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::GetValuesClient>(
-        std::unordered_map<CallbackType, std::shared_ptr<GetValuesClient>>* clients,
-        const CallbackType& callback);
-
-template std::shared_ptr<DefaultVehicleHal::SetValuesClient>
-DefaultVehicleHal::getOrCreateClient<DefaultVehicleHal::SetValuesClient>(
-        std::unordered_map<CallbackType, std::shared_ptr<SetValuesClient>>* clients,
-        const CallbackType& callback);
-
 Result<void> DefaultVehicleHal::checkProperty(const VehiclePropValue& propValue) {
     int32_t propId = propValue.prop;
     auto it = mConfigsByPropId.find(propId);
@@ -151,7 +212,7 @@
 
     auto maybeRequestIds = checkDuplicateRequests(getValueRequests);
     if (!maybeRequestIds.ok()) {
-        ALOGE("duplicate request ID");
+        ALOGE("getValues: duplicate request ID");
         return toScopedAStatus(maybeRequestIds, StatusCode::INVALID_ARG);
     }
     // The set of request Ids that we would send to hardware.
@@ -165,8 +226,8 @@
     }
     // Register the pending hardware requests and also check for duplicate request Ids.
     if (auto addRequestResult = client->addRequests(hardwareRequestIds); !addRequestResult.ok()) {
-        ALOGE("failed to add pending requests, error: %s",
-              addRequestResult.error().message().c_str());
+        ALOGE("getValues[%s]: failed to add pending requests, error: %s",
+              toString(hardwareRequestIds).c_str(), addRequestResult.error().message().c_str());
         return toScopedAStatus(addRequestResult);
     }
 
@@ -176,7 +237,8 @@
         // If the hardware returns error, finish all the pending requests for this request because
         // we never expect hardware to call callback for these requests.
         client->tryFinishRequests(hardwareRequestIds);
-        ALOGE("failed to get value from VehicleHardware, status: %d", toInt(status));
+        ALOGE("getValues[%s]: failed to get value from VehicleHardware, status: %d",
+              toString(hardwareRequestIds).c_str(), toInt(status));
         return ScopedAStatus::fromServiceSpecificErrorWithMessage(
                 toInt(status), "failed to get value from VehicleHardware");
     }
@@ -201,14 +263,15 @@
 
     auto maybeRequestIds = checkDuplicateRequests(setValueRequests);
     if (!maybeRequestIds.ok()) {
-        ALOGE("duplicate request ID");
+        ALOGE("setValues: duplicate request ID");
         return toScopedAStatus(maybeRequestIds, StatusCode::INVALID_ARG);
     }
 
     for (auto& request : setValueRequests) {
         int64_t requestId = request.requestId;
         if (auto result = checkProperty(request.value); !result.ok()) {
-            ALOGW("property not valid: %s", result.error().message().c_str());
+            ALOGW("setValues[%" PRId64 "]: property not valid: %s", requestId,
+                  result.error().message().c_str());
             failedResults.push_back(SetValueResult{
                     .requestId = requestId,
                     .status = StatusCode::INVALID_ARG,
@@ -233,8 +296,8 @@
 
     // Register the pending hardware requests and also check for duplicate request Ids.
     if (auto addRequestResult = client->addRequests(hardwareRequestIds); !addRequestResult.ok()) {
-        ALOGE("failed to add pending requests, error: %s",
-              addRequestResult.error().message().c_str());
+        ALOGE("setValues[%s], failed to add pending requests, error: %s",
+              toString(hardwareRequestIds).c_str(), addRequestResult.error().message().c_str());
         return toScopedAStatus(addRequestResult, StatusCode::INVALID_ARG);
     }
 
@@ -249,7 +312,8 @@
         // If the hardware returns error, finish all the pending requests for this request because
         // we never expect hardware to call callback for these requests.
         client->tryFinishRequests(hardwareRequestIds);
-        ALOGE("failed to set value to VehicleHardware, status: %d", toInt(status));
+        ALOGE("setValues[%s], failed to set value to VehicleHardware, status: %d",
+              toString(hardwareRequestIds).c_str(), toInt(status));
         return ScopedAStatus::fromServiceSpecificErrorWithMessage(
                 toInt(status), "failed to set value to VehicleHardware");
     }
@@ -298,12 +362,10 @@
 
 ScopedAStatus DefaultVehicleHal::subscribe(const CallbackType&,
                                            const std::vector<SubscribeOptions>&, int32_t) {
-    // TODO(b/200737967): implement this.
     return ScopedAStatus::ok();
 }
 
 ScopedAStatus DefaultVehicleHal::unsubscribe(const CallbackType&, const std::vector<int32_t>&) {
-    // TODO(b/200737967): implement this.
     return ScopedAStatus::ok();
 }
 
diff --git a/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp b/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp
index 6970e48..d763f03 100644
--- a/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp
+++ b/automotive/vehicle/aidl/impl/vhal/test/DefaultVehicleHalTest.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+#include "ConnectedClient.h"
 #include "DefaultVehicleHal.h"
 #include "MockVehicleCallback.h"